diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py index 9e30324432..2207fdf3a3 100644 --- a/pype/modules/sync_server/providers/gdrive.py +++ b/pype/modules/sync_server/providers/gdrive.py @@ -25,10 +25,14 @@ class GDriveHandler(AbstractProvider): slow and should be run only when necessary. Currently is set to lazy creation, created only after first call when necessary. - Configuration for provider is in pype-config/presets/gdrive.json + Configuration for provider is in + 'settings/defaults/project_settings/global.json' + + Settings could be overwritten per project. Example of config: "gdrive": { - site name + "provider": "gdrive", - type of provider, label must be registered "credentials_url": "/my_secret_folder/credentials.json", "root": { - could be "root": "/My Drive" for single root "root_one": "/My Drive", @@ -39,12 +43,12 @@ class GDriveHandler(AbstractProvider): FOLDER_STR = 'application/vnd.google-apps.folder' MY_DRIVE_STR = 'My Drive' # name of root folder of regular Google drive - def __init__(self, site_name, tree=None): + def __init__(self, site_name, tree=None, presets=None): self.presets = None self.active = False self.site_name = site_name - self.presets = self.get_presets().get(site_name, None) + self.presets = presets if not self.presets: log.info("Sync Server: There are no presets for {}.". format(site_name)) diff --git a/pype/modules/sync_server/providers/lib.py b/pype/modules/sync_server/providers/lib.py index 5aaa7a78fd..a6a52f0624 100644 --- a/pype/modules/sync_server/providers/lib.py +++ b/pype/modules/sync_server/providers/lib.py @@ -29,7 +29,7 @@ class ProviderFactory: """ self.providers[provider] = (creator, batch_limit) - def get_provider(self, provider, site_name, tree=None): + def get_provider(self, provider, site_name, tree=None, presets=None): """ Returns new instance of provider client for specific site. One provider could have multiple sites. 
@@ -42,11 +42,13 @@ class ProviderFactory: site_name (string): descriptor of site, different service accounts must have different site name tree (dictionary): - folder paths to folder id structure + presets (dictionary): config for provider and site (eg. + "credentials_url"..) Returns: (implementation of AbstractProvider) """ creator_info = self._get_creator_info(provider) - site = creator_info[0](site_name, tree) # call init + site = creator_info[0](site_name, tree, presets) # call init return site @@ -68,10 +70,16 @@ class ProviderFactory: def _get_creator_info(self, provider): """ Collect all necessary info for provider. Currently only creator - class and batch limit + class and batch limit. Args: provider (string): 'gdrive' etc Returns: + (tuple): (creator, batch_limit) + creator is class of a provider (ex: GDriveHandler) + batch_limit denotes how many files are synced in a single loop + it's provided via 'register_provider' as it's needed even + before provider class is initialized itself + (setting it as a class variable didn't work) """ creator_info = self.providers.get(provider) if not creator_info: @@ -81,4 +89,8 @@ class ProviderFactory: factory = ProviderFactory() +# this registers a provider implementation under the label 'gdrive', +# implemented by the 'GDriveHandler' class +# 7 denotes number of files that could be synced in a single loop - learned by +# trial and error factory.register_provider('gdrive', GDriveHandler, 7) diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py index ef7a5616b7..0eff2b3b20 100644 --- a/pype/modules/sync_server/sync_server.py +++ b/pype/modules/sync_server/sync_server.py @@ -1,4 +1,4 @@ -from pype.api import get_system_settings, Logger +from pype.api import get_system_settings, get_project_settings, Logger import threading import asyncio @@ -92,79 +92,89 @@ class SyncServer(): self.connection = AvalonMongoDB() try: - self.presets = get_system_settings()["sync_server"]["config"] + 
self.presets = self.get_synced_presets() + self.set_active_sites(self.presets) self.sync_server_thread = SyncServerThread(self) + except KeyError as exp: + log.debug(("There are not set presets for SyncServer OR " + "Credentials provided are invalid, " + "no syncing possible"). + format(str(self.presets)), exc_info=True) - self.active_site = self.presets["active_site"] - self.remote_site = self.presets["remote_site"] - - # try to activate providers, need to have valid credentials - self.active_sites = [] - for provider in lib.factory.providers.keys(): - for site in lib.factory.providers[provider][0].get_presets(). \ - keys(): - handler = lib.factory.get_provider(provider, site) - if handler.is_active(): - self.active_sites.append((provider, site)) - except KeyError: - log.debug(("There are not set presets for SyncServer." - " No credentials provided, no syncing possible"). - format(str(self.presets))) - - @property - def active_site(self): + def get_synced_presets(self): """ - Returns active 'local' site (could be personal location on user - laptop or general 'studio' mounted disk. - Its 'mine' part of synchronization. - + Collects all projects which have enabled syncing and their settings Returns: - (string) + (dict): of settings, keys are project names """ - return self._active_site + sync_presets = {} + for collection in self.connection.database.collection_names(False): + sync_settings = self.get_synced_preset(collection) + if sync_settings: + sync_presets[collection] = sync_settings - @active_site.setter - def active_site(self, value): + return sync_presets + + def get_synced_preset(self, project_name): + """ Handles pulling sync_server's settings for enabled 'project_name' + + Args: + project_name (str): used in project settings + Returns: + (dict): settings dictionary for the enabled project, + empty if no settings or sync is disabled """ - Sets 'mine' part of synchronization process. It is expected only - single site is active at the time. 
Active site could be changed - though on different location (user working in studio has - 'active_site' = 'studio', when user is at home changes - 'active_site' to 'john_doe_local_001'. + settings = get_project_settings(project_name) + sync_settings = settings.get("global")["sync_server"] + if not sync_settings: + log.debug("No project setting for sync_server, not syncing.") + return {} + if sync_settings.get("enabled"): + return sync_settings + return {} + + def set_active_sites(self, settings): + """ + Sets 'self.active_sites' as a dictionary from provided 'settings' + + Format: + { 'project_name' : [('provider_name', 'site_name')] } Args: - value (string): label for site, needs to match representation's - 'files.site'.keys() - - Returns: - (string) + settings (dict): all enabled project sync settings (site labels, + retries count etc.) """ - self._active_site = value + self.active_sites = {} + for project_name, project_setting in settings.items(): + for site_name, config in project_setting.get("sites").items(): + handler = lib.factory.get_provider(config["provider"], + site_name, + presets=config) + if handler.is_active(): + if not self.active_sites.get(project_name): + self.active_sites[project_name] = [] - @property - def remote_site(self): - """ - Remote side of synchronization, where "to synchronize to". - Currently expected only single remote destination ('gdrive'..), - but prepared for multiple. - Denotes 'theirs' side of synchronization. + self.active_sites[project_name].append( + (config["provider"], site_name)) - Returns: - (list) of strings (['gdrive']) - """ - return [self._remote_site] + if not self.active_sites: + log.debug("No sync sites active, no working credentials provided") - @remote_site.setter - def remote_site(self, value): - self._remote_site = value + def get_active_sites(self, project_name): + """ + Returns active sites (provider configured and able to connect) per + project. 
- def get_collections(self): + Args: + project_name (str): used as a key in dict + + Returns: + (list): + Format: + [('provider_name', 'site_name')] """ - Returns: - (list) of strings with collection names in avalon DB - """ - return self.connection.database.collection_names(False) + return self.active_sites[project_name] @time_function def get_sync_representations(self, collection, active_site, remote_site): @@ -191,8 +201,8 @@ class SyncServer(): log.debug("Check representations for : {}".format(collection)) self.connection.Session["AVALON_PROJECT"] = collection # retry_cnt - number of attempts to sync specific file before giving up - retries_arr = self._get_retries_arr() - active_providers_str = ",".join(remote_site) + retries_arr = self._get_retries_arr(collection) + #active_providers_str = ",".join(remote_site) query = { "type": "representation", "$or": [ @@ -206,7 +216,7 @@ class SyncServer(): }}, { "files.sites": { "$elemMatch": { - "name": {"$in": [active_providers_str]}, + "name": {"$in": [remote_site]}, "created_dt": {"$exists": False}, "tries": {"$in": retries_arr} } @@ -223,7 +233,7 @@ class SyncServer(): }}, { "files.sites": { "$elemMatch": { - "name": {"$in": [active_providers_str]}, + "name": {"$in": [remote_site]}, "created_dt": {"$exists": True} } } @@ -237,18 +247,19 @@ class SyncServer(): return representations - def check_status(self, file, provider_name): + def check_status(self, file, provider_name, config_preset): """ Check synchronization status for single 'file' of single 'representation' by single 'provider'. (Eg. check if 'scene.ma' of lookdev.v10 should be synced to GDrive Always is comparing local record, eg. site with - 'name' == self.presets["active_site"] + 'name' == self.presets[PROJECT_NAME]['config']["active_site"] Args: file (dictionary): of file from representation in Mongo provider_name (string): - gdrive etc. 
+ config_preset (dict): config about active site, retries Returns: (string) - one of SyncStatus """ @@ -262,25 +273,25 @@ class SyncServer(): tries = self._get_tries_count_from_rec(provider_rec) # file will be skipped if unsuccessfully tried over threshold # error metadata needs to be purged manually in DB to reset - if tries < self.presets["retry_cnt"]: + if tries < config_preset["retry_cnt"]: return SyncStatus.DO_UPLOAD else: _, local_rec = self._get_provider_rec( sites, - self.presets["active_site"]) or {} + config_preset["active_site"]) or {} if not local_rec or not local_rec.get("created_dt"): tries = self._get_tries_count_from_rec(local_rec) # file will be skipped if unsuccessfully tried over # threshold times, error metadata needs to be purged # manually in DB to reset - if tries < self.presets["retry_cnt"]: + if tries < config_preset["retry_cnt"]: return SyncStatus.DO_DOWNLOAD return SyncStatus.DO_NOTHING async def upload(self, file, representation, provider_name, site_name, - tree=None): + tree=None, preset=None): """ Upload single 'file' of a 'representation' to 'provider'. Source url is taken from 'file' portion, where {root} placeholder @@ -297,6 +308,7 @@ class SyncServer(): site_name (string): site on provider, single provider(gdrive) could have multiple sites (different accounts, credentials) tree (dictionary): injected memory structure for performance + preset (dictionary): site config ('credentials_url', 'root'...) 
""" # create ids sequentially, upload file in parallel later @@ -304,7 +316,8 @@ class SyncServer(): # this part modifies structure on 'remote_site', only single # thread can do that at a time, upload/download to prepared # structure should be run in parallel - handler = lib.factory.get_provider(provider_name, site_name, tree) + handler = lib.factory.get_provider(provider_name, site_name, + tree=tree, presets=preset) remote_file = self._get_remote_file_path(file, handler.get_roots_config() ) @@ -328,7 +341,7 @@ class SyncServer(): return file_id async def download(self, file, representation, provider_name, - site_name, tree=None): + site_name, tree=None, preset=None): """ Downloads file to local folder denoted in representation.Context. @@ -339,12 +352,14 @@ class SyncServer(): site_name (string): site on provider, single provider(gdrive) could have multiple sites (different accounts, credentials) tree (dictionary): injected memory structure for performance + preset (dictionary): site config ('credentials_url', 'root'...) Returns: (string) - 'name' of local file """ with self.lock: - handler = lib.factory.get_provider(provider_name, site_name, tree) + handler = lib.factory.get_provider(provider_name, site_name, + tree=tree, presets=preset) remote_file = self._get_remote_file_path(file, handler.get_roots_config() ) @@ -538,14 +553,14 @@ class SyncServer(): update ) - def get_loop_delay(self): + def get_loop_delay(self, project_name): """ Return count of seconds before next synchronization loop starts after finish of previous loop. Returns: (int): in seconds """ - return self.presets["loop_delay"] + return int(self.presets[project_name]["config"]["loop_delay"]) def _get_success_dict(self, file_index, site_index, new_file_id): """ @@ -642,7 +657,7 @@ class SyncServer(): path = path.format(**root_config) return path - def _get_retries_arr(self): + def _get_retries_arr(self, project_name): """ Returns array with allowed values in 'tries' field. 
If repre contains these values, it means it was tried to be synchronized @@ -651,7 +666,8 @@ class SyncServer(): Returns: (list) """ - arr = [i for i in range(self.presets["retry_cnt"])] + retry_cnt = self.presets[project_name].get("config")["retry_cnt"] + arr = [i for i in range(int(retry_cnt))] arr.append(None) return arr @@ -706,15 +722,16 @@ class SyncServerThread(threading.Thread): while self.is_running: import time start_time = None - for collection in self.module.get_collections(): + for collection, preset in self.module.get_synced_presets().\ + items(): start_time = time.time() sync_repres = self.module.get_sync_representations( collection, - self.module.active_site, - self.module.remote_site + preset.get('config')["active_site"], + preset.get('config')["remote_site"] ) - local = self.module.active_site + local = preset.get('config')["active_site"] task_files_to_process = [] files_processed_info = [] # process only unique file paths in one batch @@ -723,9 +740,12 @@ class SyncServerThread(threading.Thread): # upload process can find already uploaded file and # reuse same id processed_file_path = set() - for active_site in self.module.active_sites: - provider, site = active_site - handler = lib.factory.get_provider(provider, site) + for check_site in self.module.get_active_sites(collection): + provider, site = check_site + site_preset = preset.get('sites')[site] + handler = lib.factory.get_provider(provider, + site, + presets=site_preset) limit = lib.factory.get_provider_batch_limit(provider) # first call to get_provider could be expensive, its # building folder tree structure in memory @@ -741,8 +761,10 @@ class SyncServerThread(threading.Thread): if file_path in processed_file_path: continue - status = self.module.check_status(file, - provider) + status = self.module.check_status( + file, + provider, + preset.get('config')) if status == SyncStatus.DO_UPLOAD: tree = handler.get_tree() limit -= 1 @@ -751,7 +773,8 @@ class SyncServerThread(threading.Thread): 
sync, provider, site, - tree)) + tree, + site_preset)) task_files_to_process.append(task) # store info for exception handling files_processed_info.append((file, @@ -762,11 +785,12 @@ class SyncServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.download(file, - sync, - provider, - site, - tree)) + self.module.download(file, + sync, + provider, + site, + tree, + site_preset)) task_files_to_process.append(task) files_processed_info.append((file, @@ -794,7 +818,7 @@ class SyncServerThread(threading.Thread): duration = time.time() - start_time log.debug("One loop took {:.2f}s".format(duration)) - await asyncio.sleep(self.module.get_loop_delay()) + await asyncio.sleep(self.module.get_loop_delay(collection)) except ConnectionResetError: log.warning("ConnectionResetError in sync loop, trying next loop", exc_info=True) diff --git a/pype/settings/defaults/project_settings/global.json b/pype/settings/defaults/project_settings/global.json index b3cc1bcaae..6fd8c3d57f 100644 --- a/pype/settings/defaults/project_settings/global.json +++ b/pype/settings/defaults/project_settings/global.json @@ -180,16 +180,20 @@ } }, "sync_server": { - "enabled": false, + "enabled": true, "config": { - "local_id": "", - "retry_cnt": "", - "loop_delay": "", - "active_site": "", - "remote_site": "" + "local_id": "local_0", + "retry_cnt": "3", + "loop_delay": "60", + "active_site": "studio", + "remote_site": "gdrive" }, - "providers": { - "gdrive": {} + "sites": { + "gdrive": { + "provider": "gdrive", + "credentials_url": "", + "root": "" + } } } } diff --git a/pype/settings/defaults/system_settings/modules.json b/pype/settings/defaults/system_settings/modules.json index a985a789b5..9bf5936861 100644 --- a/pype/settings/defaults/system_settings/modules.json +++ b/pype/settings/defaults/system_settings/modules.json @@ -146,20 +146,8 @@ "enabled": false, "workspace_name": "studio name" }, - "sync_server": { - "enabled": false, - "config": { - 
"local_id": "local_0", - "retry_cnt": "3", - "loop_delay": "60", - "active_site": "studio", - "remote_site": "gdrive" - }, - "providers": { - "gdrive": { - "credentials_url": "" - } - } + "Sync Server": { + "enabled": true }, "deadline": { "enabled": true,