From 7b64505042697654d0d6b3f8bd43eaad149e7bad Mon Sep 17 00:00:00 2001
From: "petr.kalis"
Date: Tue, 21 Jul 2020 20:21:30 +0200
Subject: [PATCH] Added better exceptions

Added error metadata
Added retries limit
---
 pype/modules/sync_server/providers/gdrive.py |  14 +-
 pype/modules/sync_server/sync_server.py      | 128 +++++++++++++++----
 2 files changed, 110 insertions(+), 32 deletions(-)

diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py
index b1aba3eb9f..02eac83bf2 100644
--- a/pype/modules/sync_server/providers/gdrive.py
+++ b/pype/modules/sync_server/providers/gdrive.py
@@ -47,7 +47,8 @@ class GDriveHandler(AbstractProvider):
             # Save the credentials for the next run
             with open('token.pickle', 'wb') as token:
                 pickle.dump(creds, token)
-        service = build('drive', 'v3', credentials=creds)
+        service = build('drive', 'v3',
+                        credentials=creds, cache_discovery=False)
         return service
 
     def _build_tree(self, folders):
@@ -146,11 +147,12 @@ class GDriveHandler(AbstractProvider):
         :param source_path:
         :param path: absolute path with or without name of the file
         :param overwrite: replace existing file
-        :return: file_id of created/modified file
+        :return: file_id of created/modified file,
+            raises FileExistsError or FileNotFoundError
         """
         if not os.path.isfile(source_path):
-            raise ValueError("Source file {} doesn't exist.".
-                             format(source_path))
+            raise FileNotFoundError("Source file {} doesn't exist."
+                                    .format(source_path))
 
         root, ext = os.path.splitext(path)
 
@@ -163,8 +165,8 @@ class GDriveHandler(AbstractProvider):
 
         file = self.file_path_exists(path + "/" + target_name)
         if file and not overwrite:
-            raise ValueError("File already exists, "
-                             "use 'overwrite' argument")
+            raise FileExistsError("File already exists, "
+                                  "use 'overwrite' argument")
 
         folder_id = self.create_folder(path)
         file_metadata = {
diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py
index d23c2befd7..68808759b8 100644
--- a/pype/modules/sync_server/sync_server.py
+++ b/pype/modules/sync_server/sync_server.py
@@ -53,6 +53,8 @@ class SyncServer():
         classes and registered in 'providers.py'.
     """
 
+    RETRY_CNT = 3  # number of attempts to sync a specific file
+
     def __init__(self):
        self.qaction = None
        self.failed_icon = None
@@ -76,8 +78,12 @@ class SyncServer():
 
    def get_sync_representations(self):
        """
-        Get representations.
-        TODO: filter out representations that shouldnt be synced
+        Get representations that should be synced. These are recognised by
+        the presence of a document in 'files.sites', where the key is
+        a provider (GDrive, S3) and the value is an empty document or
+        a document without a 'created_dt' field.
+        Currently returns all representations.
+        TODO: filter out representations that shouldn't be synced
         :return:
         """
         representations = io.find({
@@ -99,11 +105,15 @@ class SyncServer():
         sites = file.get("sites") or {}
         if isinstance(sites, list):  # temporary, old format of 'sites'
             return SyncStatus.DO_NOTHING
-        provider = sites.get(provider) or {}
-        if provider:
-            created_dt = provider.get("created_dt")
+        provider_rec = sites.get(provider) or {}
+        if provider_rec:
+            created_dt = provider_rec.get("created_dt")
             if not created_dt:
-                return SyncStatus.DO_UPLOAD
+                tries = self._get_tries_count(file, provider)
+                # file is skipped when failed attempts exceed RETRY_CNT
+                # error metadata must be purged manually in DB to reset
+                if tries <= self.RETRY_CNT:
+                    return SyncStatus.DO_UPLOAD
 
         return SyncStatus.DO_NOTHING
 
@@ -131,25 +141,53 @@ class SyncServer():
 
         target_root = '/{}'.format(handler.get_root_name())
         target_file = file.get("path", "").replace('{root}', target_root)
-        new_file_id = handler.upload_file(source_file,
-                                          target_file,
-                                          overwrite=True)
-        if new_file_id:
-            representation_id = representation.get("_id")
-            file_id = file.get("_id")
-            query = {
-                "_id": representation_id,
-                "files._id": file_id
-            }
-            io.update_many(
-                query
-                ,
-                {"$set": {"files.$.sites.gdrive.id": new_file_id,
-                          "files.$.sites.gdrive.created_dt":
-                              datetime.datetime.utcnow()}}
-            )
+        new_file_id = error = None
+        try:
+            import time
+            start = time.time()
+            new_file_id = handler.upload_file(source_file,
+                                              target_file,
+                                              overwrite=True)
+            duration = time.time() - start
+            log.info("one file took {}".format(duration))
+        except FileNotFoundError as exp:
+            error = str(exp)
+            log.warning("File to be uploaded not found {}"
+                        .format(source_file), exc_info=True)
+        except FileExistsError as exp:
+            error = str(exp)
+            log.warning("File already exists {}"
+                        .format(target_file), exc_info=True)
+        except Exception as exp:
+            error = str(exp)
+            log.warning("Error happened during upload", exc_info=True)
 
-            log.info("file {} uploaded {}".format(source_file, new_file_id))
+        representation_id = representation.get("_id")
+        file_id = file.get("_id")
+        query = {
+            "_id": representation_id,
+            "files._id": file_id
+        }
+
+        update = {}
+        if new_file_id:
+            update["$set"] = self._get_success_dict(provider, new_file_id)
+            # reset previous errors if any
+            update["$unset"] = self._get_error_dict(provider, "", "")
+        else:
+            tries = self._get_tries_count(file, provider)
+            tries += 1
+
+            update["$set"] = self._get_error_dict(provider, error, tries)
+
+        # updates a single document, but io.update_one is not implemented
+        io.update_many(
+            query,
+            update
+        )
+        if new_file_id:
+            log.info("file {} uploaded successfully {}".format(source_file,
                                                                new_file_id))
 
     async def download(self, file, representation, provider):
         pass
@@ -180,6 +218,42 @@ class SyncServer():
     def thread_stopped(self):
         self._is_running = False
 
+    def _get_success_dict(self, provider, new_file_id):
+        """
+        Provide success metadata ("id", "created_dt") to be stored in DB.
+        :param provider: used as part of path in DB (files.sites.gdrive)
+        :param new_file_id: id of created file
+        :return: (dict)
+        """
+        val = {"files.$.sites.{}.id".format(provider): new_file_id,
+               "files.$.sites.{}.created_dt".format(provider):
+                   datetime.datetime.utcnow()}
+        return val
+
+    def _get_error_dict(self, provider, error="", tries=""):
+        """
+        Provide error metadata to be stored in DB.
+        Used for $set (error and tries provided) or for $unset (empty values).
+        :param provider: used as part of path in DB (files.sites.gdrive)
+        :param error: error message
+        :param tries: how many times failed
+        :return: (dict)
+        """
+        val = {"files.$.sites.{}.last_failed_dt".format(provider):
+                   datetime.datetime.utcnow(),
+               "files.$.sites.{}.error".format(provider): error,
+               "files.$.sites.{}.tries".format(provider): tries}
+        return val
+
+    def _get_tries_count(self, file, provider):
+        """
+        Get number of failed attempts to sync.
+        :param file: info about specific file
+        :param provider: gdrive, S3 etc.
+        :return: number of failed attempts
+        """
+        return file.get("sites", {}).get(provider, {}).get("tries", 0)
+
 
 class SynchServerThread(threading.Thread):
     """
@@ -212,10 +286,12 @@ class SynchServerThread(threading.Thread):
 
     async def sync_loop(self):
         while self.is_running:
-
-            sync_representations = self.module.get_sync_representations()
             import time
             start_time = time.time()
+            sync_representations = self.module.get_sync_representations()
+            log.info("sync_representations duration {}"
+                     .format(time.time()-start_time))
+
             for provider in lib.factory.providers:  # TODO clumsy
                 for sync in sync_representations:
                     files = sync.get("files") or {}
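
Note (not part of the patch): below is a minimal, self-contained sketch of the per-site metadata this change stores under files.$.sites.<provider> and of the retry-limit check it introduces. Only the field names ("id", "created_dt", "last_failed_dt", "error", "tries") and the RETRY_CNT limit come from the diff above; the sample document, the should_upload helper and all concrete values are hypothetical.

import datetime

RETRY_CNT = 3  # same limit the patch adds to SyncServer

# Hypothetical single entry of a representation's "files" list after one
# failed upload to the 'gdrive' site; only the keys under "sites" mirror
# what _get_error_dict / _get_success_dict write.
file_doc = {
    "_id": "some-file-id",
    "path": "{root}/project/asset/v001/file.exr",
    "sites": {
        "gdrive": {
            "last_failed_dt": datetime.datetime.utcnow(),
            "error": "Source file /tmp/file.exr doesn't exist.",
            "tries": 1
        }
    }
}


def should_upload(file_doc, provider):
    """Rough equivalent of the patched status check for a single file."""
    site = (file_doc.get("sites") or {}).get(provider) or {}
    if not site:
        return False                  # nothing requested for this provider
    if site.get("created_dt"):
        return False                  # already synced successfully
    tries = site.get("tries", 0)      # what _get_tries_count reads
    return tries <= RETRY_CNT         # over the limit -> skip until reset


print(should_upload(file_doc, "gdrive"))  # True: one failure, under the limit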