Authentication changed to 'service account'

Added error propagation for permission issues
Added dummy file creation to test_mongo_performance.py
Petr Kalis 2020-09-22 14:21:53 +02:00
parent e3c8d9fd7c
commit 8eb5524d64
4 changed files with 160 additions and 158 deletions

View file

@@ -1666,4 +1666,4 @@ def timeit(method):
log.debug('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
print('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
return result
return timed
return timed
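
For reference, a minimal sketch of how the timeit decorator above is applied (the wrapped function is illustrative):

    from pype.lib import timeit

    @timeit
    def list_all_files():
        ...  # any call worth profiling; logs and prints "'list_all_files' 12.34 ms"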

View file

@@ -1,12 +1,9 @@
from __future__ import print_function
import pickle
import os.path
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import google.oauth2.service_account as service_account
from googleapiclient import errors
from .abstract_provider import AbstractProvider
# If modifying these scopes, delete the file token.pickle.
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from pype.api import Logger
from pype.lib import timeit
@@ -27,6 +24,7 @@ class GDriveHandler(AbstractProvider):
lazy creation, created only after first call when necessary
"""
FOLDER_STR = 'application/vnd.google-apps.folder'
CREDENTIALS_FILE_URL = os.path.dirname(__file__) + '/credentials.json'
def __init__(self, tree=None):
self.service = self._get_gd_service()
@@ -35,29 +33,16 @@ class GDriveHandler(AbstractProvider):
def _get_gd_service(self):
"""
Authorize client with 'credentials.json', stores token into
'token.pickle'.
Authorize client with 'credentials.json', uses service account.
Service account needs to have the target folder shared with it.
Produces service that communicates with GDrive API.
:return:
Returns:
(Resource) service object communicating with the GDrive API
"""
creds = None
# The file token.pickle stores the user's access and refresh tokens,
# and is created automatically when the authorization flow completes
# for the first time.
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
os.path.dirname(__file__) + '/credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
creds = service_account.Credentials.from_service_account_file(
self.CREDENTIALS_FILE_URL,
scopes=SCOPES)
service = build('drive', 'v3',
credentials=creds, cache_discovery=False)
return service
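
For context, the service-account flow above reduces to this standalone sketch (a hedged example: the exact SCOPES value is not shown in the diff, so the full drive scope is an assumption; the key file must belong to an account the target folder is shared with):

    from google.oauth2 import service_account
    from googleapiclient.discovery import build

    SCOPES = ['https://www.googleapis.com/auth/drive']  # assumed scope

    creds = service_account.Credentials.from_service_account_file(
        'credentials.json', scopes=SCOPES)
    service = build('drive', 'v3', credentials=creds, cache_discovery=False)

    # Smoke test: list a few files visible to the service account.
    response = service.files().list(pageSize=5).execute()
    for f in response.get('files', []):
        print(f['id'], f['name'])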
@@ -137,7 +122,8 @@ class GDriveHandler(AbstractProvider):
"""
Return name of the root folder. Needs to be used as the beginning of
an absolute GDrive path
:return: <string> - plain name, no '/'
Returns:
(string) - plain name, no '/'
"""
return self.root["name"]
@@ -186,10 +172,13 @@ class GDriveHandler(AbstractProvider):
Uploads single file from 'source_path' to destination 'path'.
It creates all folders on the path if they do not exist.
:param source_path:
:param path: absolute path with or without name of the file
:param overwrite: replace existing file
:return: <string> file_id of created/modified file ,
Args:
source_path (string): absolute path to the local source file
path (string): absolute path with or without name of the file
overwrite (boolean): replace existing file
Returns:
(string) file_id of the created/modified file;
raises FileExistsError or FileNotFoundError exceptions
"""
if not os.path.isfile(source_path):
@@ -230,6 +219,7 @@ class GDriveHandler(AbstractProvider):
fields='id').execute()
else:
log.debug("update file {}".format(file["id"]))
file = self.service.files().update(fileId=file["id"],
body=file_metadata,
media_body=media,
@@ -239,6 +229,10 @@ class GDriveHandler(AbstractProvider):
if ex.resp['status'] == '404':
return False
if ex.resp['status'] == '403':
# real permission issue
if 'has not granted' in ex._get_reason().strip():
raise PermissionError(ex._get_reason().strip())
log.warning("Forbidden received, hit quota. "
"Injecting 60s delay.")
import time
@@ -254,10 +248,13 @@ class GDriveHandler(AbstractProvider):
It creates all folders on the local_path if they do not exist.
By default, an existing file on 'local_path' will trigger an exception
:param source_path: <string> - absolute path on provider
:param local_path: absolute path with or without name of the file
:param overwrite: replace existing file
:return: <string> file_id of created/modified file ,
Args:
source_path (string): absolute path on provider
local_path (string): absolute path with or without name of the file
overwrite (boolean): replace existing file
Returns:
(string) file_id of the created/modified file;
raises FileExistsError or FileNotFoundError exceptions
"""
remote_file = self.file_path_exists(source_path)
@@ -296,9 +293,12 @@ class GDriveHandler(AbstractProvider):
'force' argument.
In that case it deletes the folder on 'path' and all its children.
:param path: absolute path on GDrive
:param force: delete even if children in folder
:return: None
Args:
path (string): absolute path on GDrive
force (boolean): delete even if children in folder
Returns:
None
"""
folder_id = self.folder_path_exists(path)
if not folder_id:
@@ -321,8 +321,12 @@ class GDriveHandler(AbstractProvider):
def delete_file(self, path):
"""
Deletes file from 'path'. Expects a path to a specific file.
:param path: absolute path to particular file
:return: None
Args:
path (string): absolute path to a particular file
Returns:
None
"""
file = self.file_path_exists(path)
if not file:
@@ -332,8 +336,11 @@ class GDriveHandler(AbstractProvider):
def _get_folder_metadata(self, path):
"""
Get info about the folder at 'path'
:param path: <string>
:return: <dictionary> with metadata or raises ValueError
Args:
path (string): absolute path to the folder
Returns:
(dictionary) with metadata or raises ValueError
"""
try:
return self.get_tree()[path]
@@ -343,8 +350,11 @@ class GDriveHandler(AbstractProvider):
def list_folder(self, folder_path):
"""
List all files and subfolders of particular path non-recursively.
:param folder_path: absolut path on provider
:return: <list>
Args:
folder_path (string): absolute path on provider
Returns:
(list)
"""
pass
@@ -352,7 +362,9 @@ class GDriveHandler(AbstractProvider):
def list_folders(self):
""" Lists all folders in GDrive.
Used to build in-memory structure of path to folder ids model.
:return: list of dictionaries('id', 'name', [parents])
Returns:
(list) of dictionaries('id', 'name', [parents])
"""
folders = []
page_token = None
@@ -376,7 +388,8 @@ class GDriveHandler(AbstractProvider):
""" Lists all files in GDrive
Runs loop through possibly multiple pages. Result could be large;
if that becomes a problem, change it to a generator
:return: list of dictionaries('id', 'name', [parents])
Returns:
(list) of dictionaries('id', 'name', [parents])
"""
files = []
page_token = None
@@ -422,8 +435,11 @@ class GDriveHandler(AbstractProvider):
def file_path_exists(self, file_path):
"""
Checks if 'file_path' exists on GDrive
:param file_path: separated by '/', from root, with file name
:return: file metadata | False if not found
Args:
file_path (string): separated by '/', from root, with file name
Returns:
(dictionary|boolean) file metadata | False if not found
"""
folder_id = self.folder_path_exists(file_path)
if folder_id:
@@ -433,9 +449,13 @@ class GDriveHandler(AbstractProvider):
def file_exists(self, file_name, folder_id):
"""
Checks if 'file_name' exists in 'folder_id'
:param file_name:
:param folder_id: google drive folder id
:return: file metadata, False if not found
Args:
file_name (string): name of the file to look for
folder_id (string): google drive folder id
Returns:
(dictionary|boolean) file metadata, False if not found
"""
q = self._handle_q("name = '{}' and '{}' in parents"
.format(file_name, folder_id))
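
The raw query assembled above follows the Drive API search syntax; _handle_q then appends the trashed filter. Illustratively (the folder id is made up and the appended clause is assumed to be the API's standard 'trashed = false'):

    q = handler._handle_q("name = 'test.mb' and '1AbCdEfG' in parents")
    # -> "name = 'test.mb' and '1AbCdEfG' in parents and trashed = false"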
@@ -456,9 +476,13 @@ class GDriveHandler(AbstractProvider):
def _handle_q(self, q, trashed=False):
""" API list call contain trashed and hidden files/folder by default.
Usually we dont want those, must be included in query explicitly.
:param q: <string> query portion
:param trashed: False|True
:return: <string>
Args:
q (string): query portion
trashed (boolean): if False (default), trashed items are filtered out
Returns:
(string) - modified query
"""
parts = [q]
if not trashed:
@@ -466,59 +490,6 @@ class GDriveHandler(AbstractProvider):
return " and ".join(parts)
def _iterfiles(self, name=None, is_folder=None, parent=None,
order_by='folder,name,createdTime'):
"""
Function to list resources in folders, used by _walk
:param name:
:param is_folder:
:param parent:
:param order_by:
:return:
"""
q = []
if name is not None:
q.append("name = '%s'" % name.replace("'", "\\'"))
if is_folder is not None:
q.append("mimeType %s '%s'" % (
'=' if is_folder else '!=', self.FOLDER_STR))
if parent is not None:
q.append("'%s' in parents" % parent.replace("'", "\\'"))
params = {'pageToken': None, 'orderBy': order_by}
if q:
params['q'] = ' and '.join(q)
while True:
response = self.service.files().list(**params).execute()
for f in response['files']:
yield f
try:
params['pageToken'] = response['nextPageToken']
except KeyError:
return
def _walk(self, top='root', by_name=False):
"""
Recursively walk through folders; can be expensive in API requests.
:param top: <string> folder id to start walking, 'root' is total root
:param by_name:
:return: <generator>
"""
if by_name:
top, = self._iterfiles(name=top, is_folder=True)
else:
top = self.service.files().get(fileId=top).execute()
if top['mimeType'] != self.FOLDER_STR:
raise ValueError('not a folder: %r' % top)
stack = [((top['name'],), top)]
while stack:
path, top = stack.pop()
dirs, files = is_file = [], []
for f in self._iterfiles(parent=top['id']):
is_file[f['mimeType'] != self.FOLDER_STR].append(f)
yield path, top, dirs, files
if dirs:
stack.extend((path + (d['name'],), d) for d in reversed(dirs))
if __name__ == '__main__':
gd = GDriveHandler()
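
With the error propagation added above, a caller can now distinguish a genuine sharing problem from a quota hit, which is only delayed and retried. A hypothetical caller-side sketch (paths are illustrative):

    handler = GDriveHandler()
    try:
        file_id = handler.upload_file('C:/projects/test.mb',
                                      '/root_folder/test.mb',
                                      overwrite=True)
    except PermissionError as exc:
        # The target folder was never shared with the service account;
        # retrying will not help, so surface the reason to the user.
        print('Share the target folder with the service account:', exc)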

View file

@@ -70,9 +70,10 @@ class SyncServer():
classes and registered in 'providers.py'.
"""
# TODO all these move to presets
RETRY_CNT = 3 # number of attempts to sync specific file
LOCAL_PROVIDER = 'studio'
LOCAL_ID = 'local_0' # personal id of this tray TODO - from Env or preset
LOCAL_ID = 'local_0' # personal id of this tray
# limit querying DB to look for X number of representations that should
# be synced; we try to run more loops with fewer records
# actual number of files synced could be lower as providers can have
@@ -95,11 +96,10 @@ class SyncServer():
io.Session['AVALON_PROJECT'] = 'performance_test' # temp TODO
try:
self.presets = config.get_presets()["services"]["sync_server"]
except Exception:
except KeyError:
log.debug((
"There are not set presets for SyncServer."
" No credentials provided, no synching possible"
"There are not set presets for SyncServer."
" No credentials provided, no synching possible"
).format(str(self.presets)))
self.sync_server_thread = SynchServerThread(self)
@@ -185,7 +185,8 @@ class SyncServer():
if tries < self.RETRY_CNT:
return SyncStatus.DO_UPLOAD
else:
local_rec = self._get_provider_rec(sites, self.LOCAL_ID) or {}
_, local_rec = self._get_provider_rec(sites, self.LOCAL_ID) \
or {}
if not local_rec or not local_rec.get("created_dt"):
tries = self._get_tries_count_from_rec(local_rec)
# file will be skipped if unsuccessfully tried over
@@ -311,11 +312,17 @@ class SyncServer():
query,
update
)
status = 'failed'
error_str = 'with error {}'.format(error)
if new_file_id:
status = 'succeeded with id {}'.format(new_file_id)
error_str = ''
source_file = file.get("path", "")
log.debug("File {} process {} {}".format(status, source_file, status))
log.debug("File {} process {} {}".format(status,
source_file,
error_str))
def tray_start(self):
self.sync_server_thread.start()
@@ -567,11 +574,10 @@ class SynchServerThread(threading.Thread):
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.upload(
file,
sync,
provider,
tree))
self.module.upload(file,
sync,
provider,
tree))
task_files_to_process.append(task)
# store info for exception handling
files_processed_info.append((file,
@@ -582,11 +588,10 @@ class SynchServerThread(threading.Thread):
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.download
(file,
sync,
provider,
tree))
self.module.download(file,
sync,
provider,
tree))
task_files_to_process.append(task)
files_processed_info.append((file,
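
For orientation, the scheduling pattern in both loops above reduces to this sketch (assuming module.upload is a coroutine; all names are illustrative):

    import asyncio

    async def sync_loop(module, files, sync, provider, tree, limit):
        tasks = []
        for file in files:
            if limit <= 0:
                break
            limit -= 1
            # Schedule the coroutine right away; keep the Task object so
            # failures can be matched back to their source files later.
            tasks.append(asyncio.create_task(
                module.upload(file, sync, provider, tree)))
        return await asyncio.gather(*tasks, return_exceptions=True)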

View file

@@ -2,6 +2,7 @@ import pymongo
import bson
import random
from datetime import datetime
import os
class TestPerformance():
@@ -34,6 +35,10 @@ class TestPerformance():
MONGO_DB = 'performance_test'
MONGO_COLLECTION = 'performance_test'
MAX_FILE_SIZE_B = 5000
MAX_NUMBER_OF_SITES = 50
ROOT_DIR = "C:/projects"
inserted_ids = []
def __init__(self, version='array'):
@@ -57,7 +62,7 @@ class TestPerformance():
self.ids = [] # for testing
self.inserted_ids = []
def prepare(self, no_of_records=100000):
def prepare(self, no_of_records=100000, create_files=False):
'''
Produce 'no_of_records' representations with a 'files' segment.
It depends on the 'version' value in the constructor, 'array' or 'doc'
@@ -75,9 +80,13 @@ class TestPerformance():
file_id3 = bson.objectid.ObjectId()
self.inserted_ids.extend([file_id, file_id2, file_id3])
version_str = "v{0:03}".format(i+1)
file_name = "test_Cylinder_workfileLookdev_{}.mb".\
format(version_str)
document = {"files": self.get_files(self.version, i,
file_id, file_id2, file_id3)
document = {"files": self.get_files(self.version, i+1,
file_id, file_id2, file_id3,
create_files)
,
"context": {
"subset": "workfileLookdev",
@@ -89,13 +98,13 @@ class TestPerformance():
"version": 1,
"asset": "Cylinder",
"representation": "mb",
"root": "C:/projects"
"root": self.ROOT_DIR
},
"dependencies": [],
"name": "mb",
"parent": {"oid": '{}'.format(id)},
"data": {
"path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\v001\\test_Cylinder_workfileLookdev_v001.mb",
"path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\{}\\{}".format(version_str, file_name),
"template": "{root}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}"
},
"type": "representation",
@@ -158,7 +167,8 @@ class TestPerformance():
print('duration per loop {}'.format(end - start))
print("found_cnt {}".format(found_cnt))
def get_files(self, mode, i, file_id, file_id2, file_id3):
def get_files(self, mode, i, file_id, file_id2, file_id3,
create_files=False):
'''
Wrapper to decide if 'array' or document version should be used
:param mode: 'array'|'doc'
@@ -169,46 +179,60 @@ class TestPerformance():
:return:
'''
if mode == 'array':
return self.get_files_array(i, file_id, file_id2, file_id3)
return self.get_files_array(i, file_id, file_id2, file_id3,
create_files)
else:
return self.get_files_doc(i, file_id, file_id2, file_id3)
def get_files_array(self, i, file_id, file_id2, file_id3):
return [
def get_files_array(self, i, file_id, file_id2, file_id3,
create_files=False):
ret = [
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
"path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v{0:03}/"
"test_Cylinder_A_workfileLookdev_v{0:03}.dat"
.format(i, i),
"_id": '{}'.format(file_id),
"hash": "temphash",
"sites": self.get_sites(50),
"size": 87236
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
},
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
"path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v{0:03}/"
"test_Cylinder_B_workfileLookdev_v{0:03}.dat"
.format(i, i),
"_id": '{}'.format(file_id2),
"hash": "temphash",
"sites": self.get_sites(50),
"size": 87236
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
},
{
"path": "c:/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v001/"
"test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
"path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/"
"workfileLookdev/v{0:03}/"
"test_Cylinder_C_workfileLookdev_v{0:03}.dat"
.format(i, i),
"_id": '{}'.format(file_id3),
"hash": "temphash",
"sites": self.get_sites(50),
"size": 87236
"sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
"size": random.randint(0, self.MAX_FILE_SIZE_B)
}
]
if create_files:
for f in ret:
path = f.get("path").replace("{root}", self.ROOT_DIR)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, 'wb') as fp:
fp.write(os.urandom(f.get("size")))
return ret
def get_files_doc(self, i, file_id, file_id2, file_id3):
ret = {}
ret['{}'.format(file_id)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"path": "{root}" +
"/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
@@ -216,14 +240,16 @@ class TestPerformance():
}
ret['{}'.format(file_id2)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"path": "{root}" +
"/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
"size": 87236
}
ret['{}'.format(file_id3)] = {
"path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"path": "{root}" +
"/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
"v001/test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
"hash": "temphash",
"sites": ["studio"],
@@ -261,11 +287,11 @@ class TestPerformance():
if __name__ == '__main__':
tp = TestPerformance('array')
tp.prepare() # enable to prepare data
tp.run(10, 3)
tp.prepare(no_of_records=10, create_files=True) # enable to prepare data
# tp.run(10, 3)
print('-'*50)
tp = TestPerformance('doc')
tp.prepare() # enable to prepare data
tp.run(1000, 3)
# print('-'*50)
#
# tp = TestPerformance('doc')
# tp.prepare() # enable to prepare data
# tp.run(1000, 3)
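
The dummy-file creation wired in above boils down to this standalone pattern (ROOT_DIR and the record are illustrative):

    import os

    ROOT_DIR = "C:/projects"
    record = {"path": "{root}/Test/dummy_v001.dat", "size": 4096}

    path = record["path"].replace("{root}", ROOT_DIR)
    os.makedirs(os.path.dirname(path), exist_ok=True)  # build the folder chain
    with open(path, "wb") as fp:
        fp.write(os.urandom(record["size"]))  # random payload of the stored size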