From ea4113c5e40f85c146a5f7f9ad89576ebdecbc8c Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 22 Sep 2020 18:13:58 +0200 Subject: [PATCH] Added presets Better handling of not active providers --- .../providers/abstract_provider.py | 8 ++ pype/modules/sync_server/providers/gdrive.py | 31 ++++++- pype/modules/sync_server/sync_server.py | 91 +++++++++++++------ 3 files changed, 98 insertions(+), 32 deletions(-) diff --git a/pype/modules/sync_server/providers/abstract_provider.py b/pype/modules/sync_server/providers/abstract_provider.py index f40a964b2d..6931373561 100644 --- a/pype/modules/sync_server/providers/abstract_provider.py +++ b/pype/modules/sync_server/providers/abstract_provider.py @@ -3,6 +3,14 @@ from abc import ABCMeta, abstractmethod class AbstractProvider(metaclass=ABCMeta): + @abstractmethod + def is_active(self): + """ + Returns True if provider is activated, eg. has working credentials. + Returns: + (boolean) + """ + @abstractmethod def upload_file(self, source_path, target_path, overwrite=True): """ diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py index fbb48e18a7..4d2958b3bd 100644 --- a/pype/modules/sync_server/providers/gdrive.py +++ b/pype/modules/sync_server/providers/gdrive.py @@ -7,6 +7,7 @@ from .abstract_provider import AbstractProvider from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload from pype.api import Logger from pype.lib import timeit +from pype.api import config SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly', 'https://www.googleapis.com/auth/drive.file'] # for write|delete @@ -24,12 +25,26 @@ class GDriveHandler(AbstractProvider): lazy creation, created only after first call when necessary """ FOLDER_STR = 'application/vnd.google-apps.folder' - CREDENTIALS_FILE_URL = os.path.dirname(__file__) + '/credentials.json' def __init__(self, tree=None): + self.presets = None + self.active = False + try: + self.presets = 
config.get_presets()["sync_server"]["gdrive"] + except KeyError: + log.info(("Sync Server: There are no presets for Gdrive " + + "provider."). + format(str(self.presets))) + return + + if not os.path.exists(self.presets["credentials_url"]): + log.info("Sync Server: No credentials for Gdrive provider! ") + return + self.service = self._get_gd_service() self.root = self.service.files().get(fileId='root').execute() self._tree = tree + self.active = True def _get_gd_service(self): """ @@ -41,7 +56,7 @@ class GDriveHandler(AbstractProvider): None """ creds = service_account.Credentials.from_service_account_file( - self.CREDENTIALS_FILE_URL, + self.presets["credentials_url"], scopes=SCOPES) service = build('drive', 'v3', credentials=creds, cache_discovery=False) @@ -105,6 +120,14 @@ class GDriveHandler(AbstractProvider): return tree + def is_active(self): + """ + Returns True if provider is activated, eg. has working credentials. + Returns: + (boolean) + """ + return self.active + def get_tree(self): """ Building of the folder tree could be potentially expensive, @@ -375,7 +398,7 @@ class GDriveHandler(AbstractProvider): pageSize=1000, spaces='drive', fields=fields, - pageToken=page_token)\ + pageToken=page_token) \ .execute() folders.extend(response.get('files', [])) page_token = response.get('nextPageToken', None) @@ -399,7 +422,7 @@ class GDriveHandler(AbstractProvider): response = self.service.files().list(q=q, spaces='drive', fields=fields, - pageToken=page_token).\ + pageToken=page_token). \ execute() files.extend(response.get('files', [])) page_token = response.get('nextPageToken', None) diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py index 0125c364da..f2912ff5d7 100644 --- a/pype/modules/sync_server/sync_server.py +++ b/pype/modules/sync_server/sync_server.py @@ -32,9 +32,10 @@ class SyncServer(): checks if 'created_dt' field is present denoting successful sync with provider destination. 
Sites structure is created during publish and by default it will - always contain 1 record with "name" == LOCAL_ID and filled "created_dt" - AND 1 or multiple records for all defined remote sites, where - "created_dt" is empty. This highlights that file should be uploaded to + always contain 1 record with "name" == self.presets["local_id"] and + filled "created_dt" AND 1 or multiple records for all defined + remote sites, where "created_dt" is not present. + This highlights that file should be uploaded to remote destination ''' - example of synced file test_Cylinder_lookMain_v010.ma to GDrive { "files" : [ { "path" : "c:/testing_files/test_Cylinder_lookMain_v010.ma", "_id" : ObjectId("5eeb25e411e06a16209ab78f"), "hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822", "size" : NumberLong(4822), "sites" : { "studio" : { "created_dt" : ISODate("2020-07-16T17:54:35.833Z") }, "gdrive" : { "id" : ObjectId("5eeb25e411e06a16209ab78f"), "created_dt" : ISODate("2020-07-25T17:54:35.833Z") } } }, ''' - Each Tray app has assigned its own LOCAL_ID (TODO from env) which is + Each Tray app has assigned its own self.presets["local_id"] used in sites as a name. Tray is searching only for records where - name matches its LOCAL_ID + any defined remote sites. + name matches its self.presets["local_id"] + any defined remote sites. If the local record has its "created_dt" filled, its a source and process will try to upload the file to all defined remote sites. - Remote files "id" is real id that could be used in appropriate API. + Remote files "id" is real id that could be used in appropriate API. Local files have "id" too, for conformity, contains just file name. It is expected that multiple providers will be implemented in separate classes and registered in 'providers.py'. 
""" # TODO all these move to presets - RETRY_CNT = 3 # number of attempts to sync specific file LOCAL_PROVIDER = 'studio' - LOCAL_ID = 'local_0' # personal id of this tray # limit querying DB to look for X number of representations that should # be sync, we try to run more loops with less records # actual number of files synced could be lower as providers can have # different limits imposed by its API # set 0 to no limit REPRESENTATION_LIMIT = 100 - # after how many seconds start next loop after end of previous - LOOP_DELAY = 60 def __init__(self): self.qaction = None @@ -95,40 +92,53 @@ class SyncServer(): io.Session['AVALON_PROJECT'] = 'performance_test' # temp TODO try: - self.presets = config.get_presets()["services"]["sync_server"] + self.presets = config.get_presets()["sync_server"]["config"] except KeyError: log.debug(("There are not set presets for SyncServer." " No credentials provided, no synching possible"). format(str(self.presets))) self.sync_server_thread = SynchServerThread(self) + # try to activate providers, need to have valid credentials + self.active_provider_names = [] + for provider in lib.factory.providers.keys(): + handler = lib.factory.get_provider(provider) + if handler.is_active(): + self.active_provider_names.append(provider) + @timeit def get_sync_representations(self): """ Get representations that should be synched, these could be recognised by presence of document in 'files.sites', where key is a provider (GDrive, S3) and value is empty document or document - without 'created_dt' field. (Don't put null to 'created_dt'!) + without 'created_dt' field. (Don't put null to 'created_dt'!). + Querying of 'to-be-synched' files is offloaded to Mongod for + better performance. Goal is to get as few representations as + possible. 
Returns: (list) """ + # retry_cnt - number of attempts to sync specific file before giving up retries_str = "null," + \ - ",".join([str(i) for i in range(self.RETRY_CNT)]) - representations = io.find({ + ",".join([str(i) + for i in range(self.presets["retry_cnt"])]) + active_providers_str = ",".join(self.active_provider_names) + query = { "type": "representation", "$or": [ {"$and": [ { "files.sites": { "$elemMatch": { - "name": self.LOCAL_ID, + "name": self.presets["local_id"], "created_dt": {"$exists": True} } }}, { "files.sites": { "$elemMatch": { - "name": "gdrive", + "name": {"$in": [active_providers_str]}, "created_dt": {"$exists": False}, "tries": {"$nin": [retries_str]} } @@ -138,21 +148,23 @@ class SyncServer(): { "files.sites": { "$elemMatch": { - "name": self.LOCAL_ID, + "name": self.presets["local_id"], "created_dt": {"$exists": False}, "tries": {"$nin": [retries_str]} } }}, { "files.sites": { "$elemMatch": { - "name": "gdrive", + "name": {"$in": [active_providers_str]}, "created_dt": {"$exists": True} } } } ]} ] - }).limit(self.REPRESENTATION_LIMIT) + } + log.debug("query: {}".format(query)) + representations = io.find(query).limit(self.REPRESENTATION_LIMIT) return representations @@ -163,7 +175,7 @@ class SyncServer(): (Eg. check if 'scene.ma' of lookdev.v10 should be synched to GDrive Always is comparing againts local record, eg. 
site with - 'name' == self.LOCAL_ID + 'name' == self.presets["local_id"] Args: file (dictionary): of file from representation in Mongo @@ -171,6 +183,7 @@ class SyncServer(): Returns: (string) - one of SyncStatus """ + log.debug("file: {}".format(file)) sites = file.get("sites") or [] # if isinstance(sites, list): # temporary, old format of 'sites' # return SyncStatus.DO_NOTHING @@ -181,17 +194,19 @@ class SyncServer(): tries = self._get_tries_count_from_rec(provider_rec) # file will be skipped if unsuccessfully tried over threshold # error metadata needs to be purged manually in DB to reset - if tries < self.RETRY_CNT: + if tries < self.presets["retry_cnt"]: return SyncStatus.DO_UPLOAD else: - _, local_rec = self._get_provider_rec(sites, self.LOCAL_ID)\ - or {} + _, local_rec = self._get_provider_rec( + sites, + self.presets["local_id"]) or {} + if not local_rec or not local_rec.get("created_dt"): tries = self._get_tries_count_from_rec(local_rec) # file will be skipped if unsuccessfully tried over # threshold times, error metadata needs to be purged # manually in DB to reset - if tries < self.RETRY_CNT: + if tries < self.presets["retry_cnt"]: return SyncStatus.DO_DOWNLOAD return SyncStatus.DO_NOTHING @@ -324,7 +339,18 @@ class SyncServer(): error_str)) def tray_start(self): - self.sync_server_thread.start() + """ + Triggered when Tray is started. Checks if configuration presets + are available and if there is any provider ('gdrive', 'S3') that + is activated (eg. has valid credentials). + Returns: + None + """ + if self.presets and self.active_provider_names: + self.sync_server_thread.start() + else: + log.debug("No presets or active providers. " + + "Synchronization not possible.") def tray_exit(self): self.stop() @@ -411,6 +437,15 @@ class SyncServer(): update ) + def get_loop_delay(self): + """ + Return count of seconds before next synchronization loop starts + after finish of previous loop. 
+ Returns: + (int): in seconds + """ + return self.presets["loop_delay"] + def _get_success_dict(self, file_index, site_index, new_file_id): """ Provide success metadata ("id", "created_dt") to be stored in Db. @@ -518,7 +553,7 @@ class SynchServerThread(threading.Thread): self.is_running = True try: - log.info("Starting synchserver server") + log.info("Starting Sync Server") self.loop = asyncio.new_event_loop() # create new loop for thread asyncio.set_event_loop(self.loop) self.loop.set_default_executor(self.executor) @@ -549,7 +584,7 @@ class SynchServerThread(threading.Thread): # id processed_file_path = set() cnt = 0 # TODO remove - for provider in lib.factory.providers.keys(): + for provider in self.module.active_provider_names: handler = lib.factory.get_provider(provider) limit = lib.factory.get_provider_batch_limit(provider) # first call to get_provider could be expensive, its @@ -617,7 +652,7 @@ class SynchServerThread(threading.Thread): duration = time.time() - start_time log.debug("One loop took {}".format(duration)) - await asyncio.sleep(self.module.LOOP_DELAY) + await asyncio.sleep(self.module.get_loop_delay()) except ConnectionResetError: log.warning("ConnectionResetError in sync loop, trying next loop", exc_info=True)