diff --git a/pype/lib.py b/pype/lib.py
index e009ac1940..3dcfacb0e2 100644
--- a/pype/lib.py
+++ b/pype/lib.py
@@ -1666,4 +1666,4 @@ def timeit(method):
         log.debug('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
         print('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
         return result
-    return timed
\ No newline at end of file
+    return timed
diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py
index 162e23afd0..fbb48e18a7 100644
--- a/pype/modules/sync_server/providers/gdrive.py
+++ b/pype/modules/sync_server/providers/gdrive.py
@@ -1,12 +1,9 @@
 from __future__ import print_function
-import pickle
 import os.path
 from googleapiclient.discovery import build
-from google_auth_oauthlib.flow import InstalledAppFlow
-from google.auth.transport.requests import Request
+import google.oauth2.service_account as service_account
 from googleapiclient import errors
 from .abstract_provider import AbstractProvider
-# If modifying these scopes, delete the file token.pickle.
 from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
 from pype.api import Logger
 from pype.lib import timeit
@@ -27,6 +24,7 @@ class GDriveHandler(AbstractProvider):
         lazy creation, created only after first call when necessary
     """
     FOLDER_STR = 'application/vnd.google-apps.folder'
+    CREDENTIALS_FILE_URL = os.path.dirname(__file__) + '/credentials.json'

     def __init__(self, tree=None):
         self.service = self._get_gd_service()
@@ -35,29 +33,16 @@ class GDriveHandler(AbstractProvider):

     def _get_gd_service(self):
         """
-            Authorize client with 'credentials.json', stores token into
-            'token.pickle'.
+            Authorize client with 'credentials.json', uses service account.
+            The service account must have the target folder shared with it.
             Produces service that communicates with GDrive API.
-        :return:
+
+        Returns:
+            (service) Google Drive API service
         """
-        creds = None
-        # The file token.pickle stores the user's access and refresh tokens,
-        # and is created automatically when the authorization flow completes
-        # for the first time.
-        if os.path.exists('token.pickle'):
-            with open('token.pickle', 'rb') as token:
-                creds = pickle.load(token)
-        # If there are no (valid) credentials available, let the user log in.
-        if not creds or not creds.valid:
-            if creds and creds.expired and creds.refresh_token:
-                creds.refresh(Request())
-            else:
-                flow = InstalledAppFlow.from_client_secrets_file(
-                    os.path.dirname(__file__) + '/credentials.json', SCOPES)
-                creds = flow.run_local_server(port=0)
-            # Save the credentials for the next run
-            with open('token.pickle', 'wb') as token:
-                pickle.dump(creds, token)
+        creds = service_account.Credentials.from_service_account_file(
+            self.CREDENTIALS_FILE_URL,
+            scopes=SCOPES)
         service = build('drive', 'v3',
                         credentials=creds, cache_discovery=False)
         return service
@@ -137,7 +122,8 @@ class GDriveHandler(AbstractProvider):
         """
             Return name of root folder. Needs to be used as a beginning of
            absolute gdrive path
-        :return: - plain name, no '/'
+        Returns:
+            (string) - plain name, no '/'
         """
         return self.root["name"]

@@ -186,10 +172,13 @@ class GDriveHandler(AbstractProvider):
         """
             Uploads single file from 'source_path' to destination 'path'.
             It creates all folders on the path if are not existing.
-        :param source_path:
-        :param path: absolute path with or without name of the file
-        :param overwrite: replace existing file
-        :return: file_id of created/modified file ,
+        Args:
+            source_path (string):
+            path (string): absolute path with or without name of the file
+            overwrite (boolean): replace existing file
+
+        Returns:
+            (string) file_id of created/modified file,
                 throws FileExistsError, FileNotFoundError exceptions
         """
         if not os.path.isfile(source_path):
@@ -230,6 +219,7 @@ class GDriveHandler(AbstractProvider):
                                                   fields='id').execute()

             else:
+                log.debug("update file {}".format(file["id"]))
                 file = self.service.files().update(fileId=file["id"],
                                                    body=file_metadata,
                                                    media_body=media,
@@ -239,6 +229,10 @@ class GDriveHandler(AbstractProvider):
             if ex.resp['status'] == '404':
                 return False
             if ex.resp['status'] == '403':
+                # real permission issue
+                if 'has not granted' in ex._get_reason().strip():
+                    raise PermissionError(ex._get_reason().strip())
+
                 log.warning("Forbidden received, hit quota. "
                             "Injecting 60s delay.")
                 import time
@@ -254,10 +248,13 @@ class GDriveHandler(AbstractProvider):
             It creates all folders on the local_path if are not existing.
             By default existing file on 'local_path' will trigger an exception

-        :param source_path: - absolute path on provider
-        :param local_path: absolute path with or without name of the file
-        :param overwrite: replace existing file
-        :return: file_id of created/modified file ,
+        Args:
+            source_path (string): absolute path on provider
+            local_path (string): absolute path with or without name of the file
+            overwrite (boolean): replace existing file
+
+        Returns:
+            (string) file_id of created/modified file,
                 throws FileExistsError, FileNotFoundError exceptions
         """
         remote_file = self.file_path_exists(source_path)
@@ -296,9 +293,12 @@ class GDriveHandler(AbstractProvider):
             'force' argument. In that case deletes folder on 'path' and all
             its children.

-        :param path: absolute path on GDrive
-        :param force: delete even if children in folder
-        :return: None
+        Args:
+            path (string): absolute path on GDrive
+            force (boolean): delete even if children in folder
+
+        Returns:
+            None
         """
         folder_id = self.folder_path_exists(path)
         if not folder_id:
@@ -321,8 +321,12 @@ class GDriveHandler(AbstractProvider):
     def delete_file(self, path):
         """
             Deletes file from 'path'. Expects path to specific file.
-        :param path: absolute path to particular file
-        :return: None
+
+        Args:
+            path (string): absolute path to particular file
+
+        Returns:
+            None
         """
         file = self.file_path_exists(path)
         if not file:
@@ -332,8 +336,11 @@ class GDriveHandler(AbstractProvider):
     def _get_folder_metadata(self, path):
         """
             Get info about folder with 'path'
-        :param path:
-        :return: with metadata or raises ValueError
+        Args:
+            path (string):
+
+        Returns:
+            (dictionary) with metadata or raises ValueError
         """
         try:
             return self.get_tree()[path]
@@ -343,8 +350,11 @@ class GDriveHandler(AbstractProvider):
     def list_folder(self, folder_path):
         """
             List all files and subfolders of particular path non-recursively.
-        :param folder_path: absolut path on provider
-        :return:
+
+        Args:
+            folder_path (string): absolute path on provider
+        Returns:
+            (list)
         """
         pass

@@ -352,7 +362,9 @@ class GDriveHandler(AbstractProvider):
     def list_folders(self):
         """
             Lists all folders in GDrive.
             Used to build in-memory structure of path to folder ids model.
-        :return: list of dictionaries('id', 'name', [parents])
+
+        Returns:
+            (list) of dictionaries('id', 'name', [parents])
         """
         folders = []
         page_token = None
@@ -376,7 +388,8 @@ class GDriveHandler(AbstractProvider):
         """
             Lists all files in GDrive
             Runs loop through possibly multiple pages. Result could be
            large, if it would be a problem, change it to generator
-        :return: list of dictionaries('id', 'name', [parents])
+        Returns:
+            (list) of dictionaries('id', 'name', [parents])
         """
         files = []
         page_token = None
@@ -422,8 +435,11 @@ class GDriveHandler(AbstractProvider):
     def file_path_exists(self, file_path):
         """
             Checks if 'file_path' exists on GDrive
-        :param file_path: separated by '/', from root, with file name
-        :return: file metadata | False if not found
+
+        Args:
+            file_path (string): separated by '/', from root, with file name
+        Returns:
+            (dictionary|boolean) file metadata | False if not found
         """
         folder_id = self.folder_path_exists(file_path)
         if folder_id:
@@ -433,9 +449,13 @@ class GDriveHandler(AbstractProvider):
     def file_exists(self, file_name, folder_id):
         """
             Checks if 'file_name' exists in 'folder_id'
-        :param file_name:
-        :param folder_id: google drive folder id
-        :return: file metadata, False if not found
+
+        Args:
+            file_name (string):
+            folder_id (string): google drive folder id
+
+        Returns:
+            (dictionary|boolean) file metadata, False if not found
         """
         q = self._handle_q("name = '{}' and '{}' in parents"
                            .format(file_name, folder_id))
@@ -456,9 +476,13 @@ class GDriveHandler(AbstractProvider):
         """
             API list call contain trashed and hidden files/folder by default.
            Usually we dont want those, must be included in query explicitly.
-        :param q: query portion
-        :param trashed: False|True
-        :return:
+
+        Args:
+            q (string): query portion
+            trashed (boolean): False|True
+
+        Returns:
+            (string) - modified query
         """
         parts = [q]
         if not trashed:
@@ -466,59 +490,6 @@ class GDriveHandler(AbstractProvider):
         return " and ".join(parts)

-    def _iterfiles(self, name=None, is_folder=None, parent=None,
-                   order_by='folder,name,createdTime'):
-        """
-            Function to list resources in folders, used by _walk
-        :param name:
-        :param is_folder:
-        :param parent:
-        :param order_by:
-        :return:
-        """
-        q = []
-        if name is not None:
-            q.append("name = '%s'" % name.replace("'", "\\'"))
-        if is_folder is not None:
-            q.append("mimeType %s '%s'" % (
-                '=' if is_folder else '!=', self.FOLDER_STR))
-        if parent is not None:
-            q.append("'%s' in parents" % parent.replace("'", "\\'"))
-        params = {'pageToken': None, 'orderBy': order_by}
-        if q:
-            params['q'] = ' and '.join(q)
-        while True:
-            response = self.service.files().list(**params).execute()
-            for f in response['files']:
-                yield f
-            try:
-                params['pageToken'] = response['nextPageToken']
-            except KeyError:
-                return
-
-    def _walk(self, top='root', by_name=False):
-        """
-            Recurcively walk through folders, could be api requests expensive.
-        :param top: folder id to start walking, 'root' is total root
-        :param by_name:
-        :return:
-        """
-        if by_name:
-            top, = self._iterfiles(name=top, is_folder=True)
-        else:
-            top = self.service.files().get(fileId=top).execute()
-            if top['mimeType'] != self.FOLDER_STR:
-                raise ValueError('not a folder: %r' % top)
-        stack = [((top['name'],), top)]
-        while stack:
-            path, top = stack.pop()
-            dirs, files = is_file = [], []
-            for f in self._iterfiles(parent=top['id']):
-                is_file[f['mimeType'] != self.FOLDER_STR].append(f)
-            yield path, top, dirs, files
-            if dirs:
-                stack.extend((path + (d['name'],), d) for d in reversed(dirs))
-

 if __name__ == '__main__':
     gd = GDriveHandler()
diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py
index c2cd57a98c..fc582d665f 100644
--- a/pype/modules/sync_server/sync_server.py
+++ b/pype/modules/sync_server/sync_server.py
@@ -70,9 +70,10 @@ class SyncServer():
         classes and registered in 'providers.py'.

    """
+    # TODO all these move to presets
    RETRY_CNT = 3  # number of attempts to sync specific file
    LOCAL_PROVIDER = 'studio'
-    LOCAL_ID = 'local_0'  # personal id of this tray TODO - from Env or preset
+    LOCAL_ID = 'local_0'  # personal id of this tray
    # limit querying DB to look for X number of representations that should
    # be sync, we try to run more loops with less records
    # actual number of files synced could be lower as providers can have
@@ -95,11 +96,10 @@ class SyncServer():
         io.Session['AVALON_PROJECT'] = 'performance_test'  # temp TODO
         try:
             self.presets = config.get_presets()["services"]["sync_server"]
-        except Exception:
-
+        except KeyError:
             log.debug((
-                "There are not set presets for SyncServer."
-                " No credentials provided, no synching possible"
+                "No presets set for SyncServer."
+ " No credentials provided, no synching possible" ).format(str(self.presets))) self.sync_server_thread = SynchServerThread(self) @@ -185,7 +185,8 @@ class SyncServer(): if tries < self.RETRY_CNT: return SyncStatus.DO_UPLOAD else: - local_rec = self._get_provider_rec(sites, self.LOCAL_ID) or {} + _, local_rec = self._get_provider_rec(sites, self.LOCAL_ID) \ + or {} if not local_rec or not local_rec.get("created_dt"): tries = self._get_tries_count_from_rec(local_rec) # file will be skipped if unsuccessfully tried over @@ -311,11 +312,17 @@ class SyncServer(): query, update ) + status = 'failed' + error_str = 'with error {}'.format(error) if new_file_id: status = 'succeeded with id {}'.format(new_file_id) + error_str = '' + source_file = file.get("path", "") - log.debug("File {} process {} {}".format(status, source_file, status)) + log.debug("File {} process {} {}".format(status, + source_file, + error_str)) def tray_start(self): self.sync_server_thread.start() @@ -567,11 +574,10 @@ class SynchServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.upload( - file, - sync, - provider, - tree)) + self.module.upload(file, + sync, + provider, + tree)) task_files_to_process.append(task) # store info for exception handling files_processed_info.append((file, @@ -582,11 +588,10 @@ class SynchServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.download - (file, - sync, - provider, - tree)) + self.module.download(file, + sync, + provider, + tree)) task_files_to_process.append(task) files_processed_info.append((file, diff --git a/pype/tests/test_mongo_performance.py b/pype/tests/test_mongo_performance.py index a94d0777bf..07ceaab7e6 100644 --- a/pype/tests/test_mongo_performance.py +++ b/pype/tests/test_mongo_performance.py @@ -2,6 +2,7 @@ import pymongo import bson import random from datetime import datetime +import os class TestPerformance(): @@ -34,6 +35,10 @@ class TestPerformance(): MONGO_DB = 'performance_test' MONGO_COLLECTION = 'performance_test' + MAX_FILE_SIZE_B = 5000 + MAX_NUMBER_OF_SITES = 50 + ROOT_DIR = "C:/projects" + inserted_ids = [] def __init__(self, version='array'): @@ -57,7 +62,7 @@ class TestPerformance(): self.ids = [] # for testing self.inserted_ids = [] - def prepare(self, no_of_records=100000): + def prepare(self, no_of_records=100000, create_files=False): ''' Produce 'no_of_records' of representations with 'files' segment. 
             It depends on 'version' value in constructor, 'arrray' or 'doc'
@@ -75,9 +80,13 @@ class TestPerformance():
             file_id3 = bson.objectid.ObjectId()
             self.inserted_ids.extend([file_id, file_id2, file_id3])
+            version_str = "v{0:03}".format(i+1)
+            file_name = "test_Cylinder_workfileLookdev_{}.mb".\
+                format(version_str)

-            document = {"files": self.get_files(self.version, i,
-                                                file_id, file_id2, file_id3)
+            document = {"files": self.get_files(self.version, i+1,
+                                                file_id, file_id2, file_id3,
+                                                create_files)
                         ,
                         "context": {
                             "subset": "workfileLookdev",
@@ -89,13 +98,13 @@ class TestPerformance():
                             "version": 1,
                             "asset": "Cylinder",
                             "representation": "mb",
-                            "root": "C:/projects"
+                            "root": self.ROOT_DIR
                         },
                         "dependencies": [],
                         "name": "mb",
                         "parent": {"oid": '{}'.format(id)},
                         "data": {
-                            "path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\v001\\test_Cylinder_workfileLookdev_v001.mb",
+                            "path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\{}\\{}".format(version_str, file_name),
                             "template": "{root}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}"
                         },
                         "type": "representation",
@@ -158,7 +167,8 @@ class TestPerformance():
         print('duration per loop {}'.format(end - start))
         print("found_cnt {}".format(found_cnt))

-    def get_files(self, mode, i, file_id, file_id2, file_id3):
+    def get_files(self, mode, i, file_id, file_id2, file_id3,
+                  create_files=False):
         '''
             Wrapper to decide if 'array' or document version should be used
         :param mode: 'array'|'doc'
@@ -169,46 +179,60 @@ class TestPerformance():
         :return:
         '''
         if mode == 'array':
-            return self.get_files_array(i, file_id, file_id2, file_id3)
+            return self.get_files_array(i, file_id, file_id2, file_id3,
+                                        create_files)
         else:
             return self.get_files_doc(i, file_id, file_id2, file_id3)

-    def get_files_array(self, i, file_id, file_id2, file_id3):
-        return [
+    def get_files_array(self, i, file_id, file_id2, file_id3,
+                        create_files=False):
+        ret = [
             {
-                "path": "c:/Test/Assets/Cylinder/publish/workfile/"
-                        "workfileLookdev/v001/"
-                        "test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
+                "path": "{root}"
+                        "/Test/Assets/Cylinder/publish/workfile/"
+                        "workfileLookdev/v{0:03}/"
+                        "test_Cylinder_A_workfileLookdev_v{0:03}.dat"
+                        .format(i, i),
                 "_id": '{}'.format(file_id),
                 "hash": "temphash",
-                "sites": self.get_sites(50),
-                "size": 87236
+                "sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
+                "size": random.randint(0, self.MAX_FILE_SIZE_B)
             },
             {
-                "path": "c:/Test/Assets/Cylinder/publish/workfile/"
-                        "workfileLookdev/v001/"
-                        "test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
+                "path": "{root}"
+                        "/Test/Assets/Cylinder/publish/workfile/"
+                        "workfileLookdev/v{0:03}/"
+                        "test_Cylinder_B_workfileLookdev_v{0:03}.dat"
+                        .format(i, i),
                 "_id": '{}'.format(file_id2),
                 "hash": "temphash",
-                "sites": self.get_sites(50),
-                "size": 87236
+                "sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
+                "size": random.randint(0, self.MAX_FILE_SIZE_B)
             },
             {
-                "path": "c:/Test/Assets/Cylinder/publish/workfile/"
-                        "workfileLookdev/v001/"
-                        "test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
+                "path": "{root}"
+                        "/Test/Assets/Cylinder/publish/workfile/"
+                        "workfileLookdev/v{0:03}/"
+                        "test_Cylinder_C_workfileLookdev_v{0:03}.dat"
+                        .format(i, i),
                 "_id": '{}'.format(file_id3),
                 "hash": "temphash",
-                "sites": self.get_sites(50),
-                "size": 87236
+                "sites": self.get_sites(self.MAX_NUMBER_OF_SITES),
+                "size": random.randint(0, self.MAX_FILE_SIZE_B)
             }
         ]
+        if create_files:
+            for f in ret:
+                path = f.get("path").replace("{root}", self.ROOT_DIR)
+                os.makedirs(os.path.dirname(path), exist_ok=True)
+                with open(path, 'wb') as fp:
+                    fp.write(os.urandom(f.get("size")))
+
+        return ret

     def get_files_doc(self, i, file_id, file_id2, file_id3):
         ret = {}
         ret['{}'.format(file_id)] = {
-            "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
+            "path": "{root}" +
+                    "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
                     "v001/test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
             "hash": "temphash",
             "sites": ["studio"],
             "size": 87236
         }
@@ -216,14 +240,16 @@ class TestPerformance():
         ret['{}'.format(file_id2)] = {
-            "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
+            "path": "{root}" +
+                    "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
                     "v001/test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
             "hash": "temphash",
             "sites": ["studio"],
             "size": 87236
         }
         ret['{}'.format(file_id3)] = {
-            "path": "c:/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
+            "path": "{root}" +
+                    "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/"
                     "v001/test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
             "hash": "temphash",
             "sites": ["studio"],
@@ -261,11 +287,11 @@ class TestPerformance():

 if __name__ == '__main__':
     tp = TestPerformance('array')
-    tp.prepare()  # enable to prepare data
-    tp.run(10, 3)
+    tp.prepare(no_of_records=10, create_files=True)  # enable to prepare data
+    # tp.run(10, 3)

-    print('-'*50)
-
-    tp = TestPerformance('doc')
-    tp.prepare()  # enable to prepare data
-    tp.run(1000, 3)
+    # print('-'*50)
+    #
+    # tp = TestPerformance('doc')
+    # tp.prepare()  # enable to prepare data
+    # tp.run(1000, 3)