From 0ecb2d393e4435e02c2ab8bede2a692186825792 Mon Sep 17 00:00:00 2001
From: "petr.kalis"
Date: Thu, 30 Jul 2020 13:11:13 +0200
Subject: [PATCH] Refactored build tree

Added download functionality
---
 pype/modules/sync_server/providers/gdrive.py |  99 +++++--
 pype/modules/sync_server/providers/lib.py    |  21 +-
 pype/modules/sync_server/sync_server.py      | 259 ++++++++++++-------
 3 files changed, 270 insertions(+), 109 deletions(-)

diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py
index 287a4f1cf8..8da311fa97 100644
--- a/pype/modules/sync_server/providers/gdrive.py
+++ b/pype/modules/sync_server/providers/gdrive.py
@@ -5,11 +5,10 @@ from googleapiclient.discovery import build
 from google_auth_oauthlib.flow import InstalledAppFlow
 from google.auth.transport.requests import Request
 from googleapiclient import errors
-import random
 from .abstract_provider import AbstractProvider
 # If modifying these scopes, delete the file token.pickle.
-from googleapiclient.http import MediaFileUpload
-from pype.api import Logger
+from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
+from pype.api import Logger
 
 SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly',
           'https://www.googleapis.com/auth/drive.file']  # for write|delete
@@ -18,6 +17,13 @@ log = Logger().get_logger("SyncServer")
 
 
 class GDriveHandler(AbstractProvider):
+    """
+    Implementation of Google Drive API.
+    As the GD API doesn't have a real folder structure, an in-memory
+    'tree' structure is built in the constructor to map folder paths to
+    the folder ids used by the API. Building this tree might be expensive
+    and slow and should be run only when necessary.
+    """
     FOLDER_STR = 'application/vnd.google-apps.folder'
 
     def __init__(self, tree=None):
@@ -56,8 +62,8 @@ class GDriveHandler(AbstractProvider):
 
     def _build_tree(self, folders):
         """
-        Create in-memory structure resolving paths to folder id as recursive
-        quering might be slower.
+        Create in-memory structure resolving paths to folder id as
+        recursive querying might be slower.
         Initialized in the time of class initialization.
         Maybe should be persisted
         Tree is structure of path to id:
@@ -70,14 +76,15 @@ class GDriveHandler(AbstractProvider):
         log.debug("build_tree len {}".format(len(folders)))
         tree = {"/": {"id": self.root["id"]}}
         ending_by = {self.root["id"]: "/" + self.root["name"]}
-        not_changed_times = 0
-        folders_cnt = len(folders) * 5
-        # exit loop for weird unresolved folders, raise ValueError, safety
-        while folders and not_changed_times < folders_cnt:
+        no_parents_yet = {}
+        while folders:
             folder = folders.pop(0)
-            # weird cases without parents, shared folders, etc,
-            # parent under root
-            parent = folder.get("parents", [self.root["id"]])[0]
+            parents = folder.get("parents", [])
+            # weird cases, shared folders, etc., parent under root
+            if not parents:
+                parent = self.root["id"]
+            else:
+                parent = parents[0]
 
             if folder["id"] == self.root["id"]:  # do not process root
                 continue
@@ -87,14 +94,24 @@
                 ending_by[folder["id"]] = path_key
                 tree[path_key] = {"id": folder["id"]}
             else:
-                not_changed_times += 1
-                if not_changed_times % 10 == 0:  # try to reshuffle deadlocks
-                    random.shuffle(folders)
-                folders.append(folder)  # dont know parent, wait until shows up
+                no_parents_yet.setdefault(parent, []).append((folder["id"],
+                                                              folder["name"]))
+        loop_cnt = 0
+        # break if looped more than X times - safety against infinite loop
+        while no_parents_yet and loop_cnt < 20:
 
-        if len(folders) > 0:
+            loop_cnt += 1
+            for parent in list(no_parents_yet.keys()):
+                if parent in ending_by.keys():
+                    subfolders = no_parents_yet.pop(parent)
+                    for folder_id, folder_name in subfolders:
+                        path_key = ending_by[parent] + "/" + folder_name
+                        ending_by[folder_id] = path_key
+                        tree[path_key] = {"id": folder_id}
+
+        if len(no_parents_yet) > 0:
             raise ValueError("Some folders path are not resolved {}"
-                             .format(folders))
+                             .format(no_parents_yet))
 
         return tree
 
@@ -212,7 +229,8 @@ class GDriveHandler(AbstractProvider):
             if ex.resp['status'] == '404':
                 return False
             if ex.resp['status'] == '403':
-                log.info("Forbidden received, hit quota. Injecting 60s delay.")
+                log.warning("Forbidden received, hit quota. "
+                            "Injecting 60s delay.")
                 import time
                 time.sleep(60)
                 return False
@@ -220,8 +238,46 @@ class GDriveHandler(AbstractProvider):
 
         return file["id"]
 
-    def download_file(self, source_path, local_path):
-        pass
+    def download_file(self, source_path, local_path, overwrite=False):
+        """
+        Downloads single file from 'source_path' (remote) to 'local_path'.
+        It creates all folders on the 'local_path' if they do not exist.
+        By default an existing file on 'local_path' triggers an exception.
+
+        :param source_path: - absolute path on provider
+        :param local_path: absolute path with or without name of the file
+        :param overwrite: replace existing file
+        :return: file_id of created/modified file,
+            throws FileExistsError, FileNotFoundError exceptions
+        """
+        remote_file = self.file_path_exists(source_path)
+        if not remote_file:
+            raise FileNotFoundError("Source file {} doesn't exist."
+ .format(source_path)) + + root, ext = os.path.splitext(local_path) + if ext: + # full path with file name + target_name = os.path.basename(local_path) + local_path = os.path.dirname(local_path) + else: # just folder, get file name from source + target_name = os.path.basename(source_path) + + file = os.path.isfile(local_path + "/" + target_name) + + if file and not overwrite: + raise FileExistsError("File already exists, " + "use 'overwrite' argument") + + request = self.service.files().get_media(fileId=remote_file["id"]) + + with open(local_path + "/" + target_name, "wb") as fh: + downloader = MediaIoBaseDownload(fh, request) + done = False + while done is False: + status, done = downloader.next_chunk() + + return target_name def delete_folder(self, path, force=False): """ @@ -292,6 +348,7 @@ class GDriveHandler(AbstractProvider): while True: q = self._handle_q("mimeType='application/vnd.google-apps.folder'") response = self.service.files().list(q=q, + pageSize=1000, spaces='drive', fields=fields, pageToken=page_token).execute() diff --git a/pype/modules/sync_server/providers/lib.py b/pype/modules/sync_server/providers/lib.py index e2a65a117a..07d8deb01c 100644 --- a/pype/modules/sync_server/providers/lib.py +++ b/pype/modules/sync_server/providers/lib.py @@ -3,6 +3,7 @@ from .gdrive import GDriveHandler class Providers(Enum): + LOCAL = 'studio' GDRIVE = 'gdrive' @@ -17,12 +18,24 @@ class ProviderFactory: self.creators = {} def register_provider(self, provider, creator, batch_limit): + """ + Provide all necessary information for one specific remote provider + :param provider: - name of provider + :param creator: - class implementing AbstractProvider + :param batch_limit: - number of files that could be processed in + one loop (based on provider API quota) + :return: modifies self.providers + """ self.providers[provider] = (creator, batch_limit) def get_provider(self, provider, tree=None): """ - Returns new instance of provider client + Returns new instance of provider client. + 'tree' is used for injecting already created memory structure, + without it constructor of provider would need to calculate it + from scratch, which could be expensive. :param provider: 'gdrive','S3' + :param tree: - folder paths to folder id structure :return: """ creator_info = self._get_creator_info(provider) @@ -44,6 +57,12 @@ class ProviderFactory: return info[1] def _get_creator_info(self, provider): + """ + Collect all necessary info for provider. Currently only creator + class and batch limit + :param provider: + :return: + """ creator_info = self.providers.get(provider) if not creator_info: raise ValueError( diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py index 446195fa36..fcf03d00a3 100644 --- a/pype/modules/sync_server/sync_server.py +++ b/pype/modules/sync_server/sync_server.py @@ -4,6 +4,7 @@ from avalon import io import threading import asyncio import concurrent.futures +from concurrent.futures._base import CancelledError from enum import Enum from datetime import datetime @@ -29,7 +30,7 @@ class SyncServer(): any of implemented providers (like GDrive, S3 etc.) Runs in the background and checks all representations, looks for files that are marked to be in different location than 'studio' (temporary), - checks if 'created_dt' field is present denoting successfull synch + checks if 'created_dt' field is present denoting successful sync with provider destination. 
''' - example of synced file test_Cylinder_lookMain_v010.ma to GDrive @@ -51,11 +52,14 @@ class SyncServer(): } }, ''' + Remote files "id" is real id that could be used in appropriate API. + Local files have "id" too, for conformity, contains just file name. It is expected that multiple providers will be implemented in separate classes and registered in 'providers.py'. """ RETRY_CNT = 3 # number of attempts to sync specific file + LOCAL_PROVIDER = 'studio' def __init__(self): self.qaction = None @@ -76,7 +80,6 @@ class SyncServer(): "There are not set presets for SyncServer." " No credentials provided, no synching possible" ).format(str(self.presets))) - handler = lib.factory.get_provider('gdrive') # prime handler TEMP self.sync_server_thread = SynchServerThread(self) def get_sync_representations(self): @@ -95,32 +98,40 @@ class SyncServer(): return representations - def check_status(self, file, representation, provider): + def check_status(self, file, provider_name): """ Check synchronization status for single 'file' of single 'representation' by single 'provider'. (Eg. check if 'scene.ma' of lookdev.v10 should be synched to GDrive :param file: of file from representation in Mongo - :param representation: of representation - :param provider: - gdrive, gdc etc. + :param provider_name: - gdrive, gdc etc. :return: - one of SyncStatus """ sites = file.get("sites") or {} if isinstance(sites, list): # temporary, old format of 'sites' return SyncStatus.DO_NOTHING - provider_rec = sites.get(provider) or {} + provider_rec = sites.get(provider_name) or {} if provider_rec: created_dt = provider_rec.get("created_dt") if not created_dt: - tries = self._get_tries_count(file, provider) + tries = self._get_tries_count(file, provider_name) # file will be skipped if unsuccessfully tried over threshold # error metadata needs to be purged manually in DB to reset if tries < self.RETRY_CNT: return SyncStatus.DO_UPLOAD + else: + local_rec = sites.get(lib.Providers.LOCAL.value) or {} + if not local_rec or not local_rec.get("created_dt"): + tries = self._get_tries_count(file, self.LOCAL_PROVIDER) + # file will be skipped if unsuccessfully tried over + # threshold error metadata needs to be purged manually + # in DB to reset + if tries < self.RETRY_CNT: + return SyncStatus.DO_DOWNLOAD return SyncStatus.DO_NOTHING - async def upload(self, file, representation, provider, tree=None): + async def upload(self, file, representation, provider_name, tree=None): """ Upload single 'file' of a 'representation' to 'provider'. Source url is taken from 'file' portion, where {root} placeholder @@ -132,20 +143,19 @@ class SyncServer(): :param file: of file from representation in Mongo :param representation: of representation - :param provider: - gdrive, gdc etc. + :param provider_name: - gdrive, gdc etc. 
+ :param tree: - injected memory structure for performance :return: """ # create ids sequentially, upload file in parallel later with self.lock: - handler = lib.factory.get_provider(provider, tree) + handler = lib.factory.get_provider(provider_name, tree) + remote_file = self._get_remote_file_path(file, + handler.get_root_name()) local_root = representation.get("context", {}).get("root") - if not local_root: - raise ValueError("Unknown local root for file {}") - source_file = file.get("path", "").replace('{root}', local_root) - target_root = '/{}'.format(handler.get_root_name()) - target_file = file.get("path", "").replace('{root}', target_root) + local_file = self._get_local_file_path(file, local_root) - target_folder = os.path.dirname(target_file) + target_folder = os.path.dirname(remote_file) folder_id = handler.create_folder(target_folder) if not folder_id: @@ -155,19 +165,47 @@ class SyncServer(): loop = asyncio.get_running_loop() file_id = await loop.run_in_executor(None, handler.upload_file, - source_file, - target_file, + local_file, + remote_file, True) return file_id - def update_db(self, new_file_id, file, representation,provider,error=None): + async def download(self, file, representation, provider_name, tree=None): + """ + Downloads file to local folder denoted in representation.Context. + :param file: - info about processed file + :param representation: - repr that 'file' belongs to + :param provider_name: - 'gdrive' etc + :param tree: - injected memory structure for performance + :return: - 'name' of local file + """ + with self.lock: + handler = lib.factory.get_provider(provider_name, tree) + remote_file = self._get_remote_file_path(file, + handler.get_root_name()) + local_root = representation.get("context", {}).get("root") + local_file = self._get_local_file_path(file, local_root) + + local_folder = os.path.dirname(local_file) + os.makedirs(local_folder, exist_ok=True) + + loop = asyncio.get_running_loop() + file_id = await loop.run_in_executor(None, + handler.download_file, + remote_file, + local_file, + False) + return file_id + + def update_db(self, new_file_id, file, representation, provider_name, + error=None): """ Update 'provider' portion of records in DB with success (file_id) or error (exception) :param new_file_id: :param file: - info about processed file (pulled from DB) :param representation: - parent repr of file (from DB) - :param provider: - label ('gdrive', 'S3') + :param provider_name: - label ('gdrive', 'S3') :param error: - exception message :return: None """ @@ -180,14 +218,14 @@ class SyncServer(): update = {} if new_file_id: - update["$set"] = self._get_success_dict(provider, new_file_id) + update["$set"] = self._get_success_dict(provider_name, new_file_id) # reset previous errors if any - update["$unset"] = self._get_error_dict(provider, "", "") + update["$unset"] = self._get_error_dict(provider_name, "", "") else: - tries = self._get_tries_count(file, provider) + tries = self._get_tries_count(file, provider_name) tries += 1 - update["$set"] = self._get_error_dict(provider, error, tries) + update["$set"] = self._get_error_dict(provider_name, error, tries) # it actually modifies single _id, but io.update_one not implemented io.update_many( @@ -198,10 +236,7 @@ class SyncServer(): if new_file_id: status = 'succeeded with id {}'.format(new_file_id) source_file = file.get("path", "") - log.debug("File {} upload {} {}".format(status, source_file, status)) - - async def download(self, file, representation, provider): - pass + log.debug("File {} process {} 
{}".format(status, source_file, status)) def tray_start(self): self.sync_server_thread.start() @@ -286,6 +321,27 @@ class SyncServer(): """ return file.get("sites", {}).get(provider, {}).get("tries", 0) + def _get_local_file_path(self, file, local_root): + """ + Auxiliary function for replacing rootless path with real path + :param file: url to file with {root} + :param local_root: value of {root} for local projects + :return: - absolute path on local system + """ + if not local_root: + raise ValueError("Unknown local root for file {}") + return file.get("path", "").replace('{root}', local_root) + + def _get_remote_file_path(self, file, root_name): + """ + Auxiliary function for replacing rootless path with real path + :param file: url to file with {root} + :param root_name: value of {root} for remote location + :return: - absolute path on remote location + """ + target_root = '/{}'.format(root_name) + return file.get("path", "").replace('{root}', target_root) + class SynchServerThread(threading.Thread): """ @@ -319,69 +375,98 @@ class SynchServerThread(threading.Thread): self.loop.close() # optional async def sync_loop(self): - while self.is_running: - import time - from datetime import datetime - start_time = time.time() - sync_representations = self.module.get_sync_representations() + try: + while self.is_running: + import time + from datetime import datetime + start_time = time.time() + sync_representations = self.module.get_sync_representations() - task_files_to_upload = [] - files_created_info = [] - # process only unique file paths in one batch - # multiple representation could have same file path (textures), - # upload process can find already uploaded file and reuse same id - processed_file_path = set() - for provider in lib.factory.providers.keys(): - tree = lib.factory.get_provider(provider).get_tree() - limit = lib.factory.get_provider_batch_limit(provider) - for sync in sync_representations: - if limit <= 0: - continue - files = sync.get("files") or {} - if files: - for file in files: - # skip already processed files - file_path = file.get('path', '') - if file_path in processed_file_path: - continue + local_label = lib.Providers.LOCAL.value + task_files_to_process = [] + files_processed_info = [] + # process only unique file paths in one batch + # multiple representation could have same file path (textures), + # upload process can find already uploaded file and reuse same + # id + processed_file_path = set() + for provider in lib.factory.providers.keys(): + # first call to get_provider could be expensive, its + # building folder tree structure in memory + handler = lib.factory.get_provider(provider) + tree = handler.get_tree() + limit = lib.factory.get_provider_batch_limit(provider) + for sync in sync_representations: + if limit <= 0: + continue + files = sync.get("files") or {} + if files: + for file in files: + # skip already processed files + file_path = file.get('path', '') + if file_path in processed_file_path: + continue - status = self.module.check_status(file, sync, - provider) + status = self.module.check_status(file, + provider) - if status == SyncStatus.DO_UPLOAD: - limit -= 1 - task_files_to_upload.append(asyncio.create_task( - self.module.upload(file, - sync, - provider, - tree))) - # store info for exception handling - files_created_info.append((file, - sync, - provider)) - processed_file_path.add(file_path) - if status == SyncStatus.DO_DOWNLOAD: - limit -= 1 - await self.module.download(file, sync, provider) - processed_file_path.add(file_path) - files_created 
= [] - files_created = await asyncio.gather(*task_files_to_upload, - return_exceptions=True) - for file_id, info in zip(files_created, files_created_info): - file, representation, provider = info - error = None - if isinstance(file_id, BaseException): - error = str(file_id) - file_id = None - self.module.update_db(file_id, - file, - representation, - provider, - error) + if status == SyncStatus.DO_UPLOAD: + limit -= 1 + task = asyncio.create_task( + self.module.upload( + file, + sync, + provider, + tree)) + task_files_to_process.append(task) + # store info for exception handling + files_processed_info.append((file, + sync, + provider)) + processed_file_path.add(file_path) + if status == SyncStatus.DO_DOWNLOAD: + limit -= 1 + task = asyncio.create_task( + self.module.download + (file, + sync, + provider)) + task_files_to_process.append(task) - duration = time.time() - start_time - log.debug("One loop took {}".format(duration)) - await asyncio.sleep(60) + files_processed_info.append((file, + sync, + local_label)) + processed_file_path.add(file_path) + + log.debug("gather tasks len {}". + format(len(task_files_to_process))) + files_created = await asyncio.gather(*task_files_to_process, + return_exceptions=True) + for file_id, info in zip(files_created, files_processed_info): + file, representation, provider = info + error = None + if isinstance(file_id, BaseException): + error = str(file_id) + file_id = None + self.module.update_db(file_id, + file, + representation, + provider, + error) + + duration = time.time() - start_time + log.debug("One loop took {}".format(duration)) + await asyncio.sleep(60) + except ConnectionResetError: + log.warning("ConnectionResetError in sync loop, trying next loop", + exc_info=True) + except CancelledError: + # just stopping server + pass + except Exception: + self.stop() + log.warning("Unhandled exception in sync loop, stopping server", + exc_info=True) def stop(self): """Sets is_running flag to false, 'check_shutdown' shuts server down"""
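
Review note: the heart of this change is the path-to-id 'tree' that
GDriveHandler._build_tree produces and that upload and download both reuse
through the provider factory. The sketch below is illustrative only and is
not part of the patch; ROOT, resolve_tree and the sample folder listing are
hypothetical names chosen to mirror the two-pass resolution above on fake
data, not code from this PR.

# Illustrative sketch only, not part of the patch: mirrors the two-pass
# folder resolution used by GDriveHandler._build_tree above.
ROOT = {"id": "root-id", "name": "gdrive-root"}   # hypothetical root folder


def resolve_tree(folders):
    """Resolve GDrive's flat parent references into a path -> id map."""
    tree = {"/": {"id": ROOT["id"]}}
    ending_by = {ROOT["id"]: "/" + ROOT["name"]}
    no_parents_yet = {}

    # first pass: attach folders whose parent path is already known
    for folder in folders:
        parent = (folder.get("parents") or [ROOT["id"]])[0]
        if folder["id"] == ROOT["id"]:
            continue
        if parent in ending_by:
            path_key = ending_by[parent] + "/" + folder["name"]
            ending_by[folder["id"]] = path_key
            tree[path_key] = {"id": folder["id"]}
        else:
            no_parents_yet.setdefault(parent, []).append(folder)

    # second pass: resolve deferred folders, bounded to avoid endless looping
    for _ in range(20):
        if not no_parents_yet:
            break
        for parent in list(no_parents_yet.keys()):
            if parent in ending_by:
                for folder in no_parents_yet.pop(parent):
                    path_key = ending_by[parent] + "/" + folder["name"]
                    ending_by[folder["id"]] = path_key
                    tree[path_key] = {"id": folder["id"]}

    if no_parents_yet:
        raise ValueError("Some folder paths are not resolved {}"
                         .format(no_parents_yet))
    return tree


if __name__ == "__main__":
    # hypothetical listing, parent "A" appears after its child on purpose
    sample = [
        {"id": "B", "name": "shots", "parents": ["A"]},
        {"id": "A", "name": "project", "parents": [ROOT["id"]]},
    ]
    print(resolve_tree(sample))
    # {'/': {'id': 'root-id'},
    #  '/gdrive-root/project': {'id': 'A'},
    #  '/gdrive-root/project/shots': {'id': 'B'}}

Running this prints a mapping of the same shape that get_tree() hands to the
sync loop, which is why the expensive build is done once per loop and the
resulting tree is injected into each handler instead of being rebuilt per file.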