diff --git a/pype/modules/sync_server/README.md b/pype/modules/sync_server/README.md
index 4ee8310085..e6c055c1fb 100644
--- a/pype/modules/sync_server/README.md
+++ b/pype/modules/sync_server/README.md
@@ -2,9 +2,47 @@ Synchronization server
 ---------------------
 This server is scheduled at start of Pype, it periodically checks avalon DB
 for 'representation' records which have in theirs files.sites record with
-name: 'gdrive' without field 'created_dt'.
-This denotes that this representation should be sync to GDrive.
+name: 'gdrive' (or any other site name from 'gdrive.json') without
+field 'created_dt'.
+
+This denotes that this representation should be synced to GDrive.
 Records like these are created by IntegrateNew process based on configuration.
+Leave 'config.json.remote_site' empty to disable synchronization entirely.
+
+One provider can have multiple sites. (The GDrive implementation is
+'a provider', a target folder on it is 'a site'.)
+
+Quick HOWTOs:
+-------------
+I want to start syncing my newly published files:
+------------------------------------------------
+
+Get credentials for a service account and share the target folder on GDrive
+with it.
+Set the path to the stored credentials file in 'gdrive.json'.
+Set the name of the site and the root folder in 'gdrive.json'.
+Update 'config.json/remote_site' to the site name you set in the previous step.
+Start Pype and publish.
+
+My published file is not syncing:
+--------------------------------
+
+Check that the representation record contains, in each of its 'files.sites',
+a skeleton in the format: {name: "MY_CONFIGURED_REMOTE_SITE"}.
+Check whether that record already has 'created_dt' filled. That would denote
+that the file was synced, but someone might have removed it on the remote
+site.
+If that record contains an "error" field, check whether the "tries" field has
+reached the threshold in 'config.json.retry_cnt'. If it has, fix the problem
+mentioned in the 'error' field and delete the 'tries' field.
+
+I want to sync my already published files:
+-----------------------------------------
+
+Configure your Pype for syncing (see the first section of the HOWTOs).
+Manually add the skeleton {name: "MY_CONFIGURED_REMOTE_SITE"} to all
+representation 'files.sites':
+db.getCollection('MY_PROJECT').update({type: "representation"},
+{$push: {"files.$[].sites": {name: "MY_CONFIGURED_REMOTE_SITE"}}},
+false, true)
 
 Needed configuration:
 --------------------
@@ -16,16 +54,23 @@ pype-config/presets/config.json:
                     could by same as 'local_id' if user is working from home
                     without connection to studio infrastructure
 
-    "remote_site": "gdrive" -- key for site to synchronize to (currently only
-                    'gdrive' implemented, but could be any provider
-                    implemented in 'pype/modules/sync_server')
+    "remote_site": "gdrive" -- key for site to synchronize to. Must match a
+                    site configured in 'gdrive.json'.
+                    Used in IntegrateNew to prepare the skeleton for syncing
+                    in the representation record.
+                    Leave empty if no syncing is wanted.
+
+    This is a general configuration; 'local_id', 'active_site' and
+    'remote_site' will be set and changed by some GUI in the future.
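+
+    A minimal 'config.json' example (values are illustrative; only the keys
+    mentioned above are shown, the real preset may contain more):
+
+        {
+            "local_id": "local_0",
+            "active_site": "studio",
+            "remote_site": "gdrive",
+            "retry_cnt": 3
+        }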
+
+pype-config/presets/gdrive.json:
-    "credentials_url": "/my_secret_folder/credentials.json",
-                -- path to credentials for service account
-    "root": {  -- "root": "/My Drive" in simple scenario, this could be for
-                  multiroot projects
-        "root_one": "/My Drive/work_folder",
-        "root_tow": "/My Drive/publish_folder"
+    "gdrive": {  -- site name, must be unique
+        "credentials_url": "/my_secret_folder/credentials.json",
+                    -- path to credentials for service account
+        "root": {  -- "root": "/My Drive" in simple scenario, config here for
+                   -- multiroot projects
+            "root_one": "/My Drive/work_folder",
+            "root_two": "/My Drive/publish_folder"
+        }
     }
diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py
index de33246777..8e5be274d8 100644
--- a/pype/modules/sync_server/providers/gdrive.py
+++ b/pype/modules/sync_server/providers/gdrive.py
@@ -22,19 +22,30 @@ class GDriveHandler(AbstractProvider):
         structure is build in constructor to map folder paths to folder ids,
         which are used in API. Building of this tree might be expensive and
         slow and should be run only when necessary. Currently is set to
-        lazy creation, created only after first call when necessary
+        lazy creation, created only after first call when necessary.
+
+        Configuration for the provider is in pype-config/presets/gdrive.json.
+
+        Example of config:
+          "gdrive": {  -- site name
+            "credentials_url": "/my_secret_folder/credentials.json",
+            "root": {  -- could be "root": "/My Drive" for a single root
+                "root_one": "/My Drive",
+                "root_two": "/My Drive/different_folder"
+            }
+          }
     """
     FOLDER_STR = 'application/vnd.google-apps.folder'
 
-    def __init__(self, tree=None):
+    def __init__(self, site_name, tree=None):
         self.presets = None
         self.active = False
-        try:
-            self.presets = config.get_presets()["sync_server"]["gdrive"]
-        except KeyError:
-            log.info(("Sync Server: There are no presets for Gdrive " +
-                      "provider.").
-                     format(str(self.presets)))
+        self.site_name = site_name
+
+        self.presets = self.get_presets().get(site_name)
+        if not self.presets:
+            log.info("Sync Server: There are no presets for {}.".
+                     format(site_name))
             return
 
         if not os.path.exists(self.presets["credentials_url"]):
@@ -501,6 +512,24 @@ class GDriveHandler(AbstractProvider):
             return False
         return file[0]
 
+    @classmethod
+    def get_presets(cls):
+        """
+            Get presets for this provider.
+        Returns:
+            (dictionary) of configured sites, empty if no presets found
+        """
+        try:
+            provider_presets = config.get_presets()["sync_server"]["gdrive"]
+        except KeyError:
+            log.info("Sync Server: There are no presets for Gdrive "
+                     "provider.")
+            return {}
+        return provider_presets
+
     def _handle_q(self, q, trashed=False):
         """ API list call contain trashed and hidden files/folder by default.
             Usually we dont want those, must be included in query explicitly.
@@ -520,6 +549,6 @@ class GDriveHandler(AbstractProvider):
 
 if __name__ == '__main__':
-    gd = GDriveHandler()
+    gd = GDriveHandler('gdrive')
     print(gd.root)
     print(gd.get_tree())
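For orientation, a hedged sketch of what GDriveHandler.get_presets() would
return for a gdrive.json defining two sites (the second site, its name and
all paths are invented for illustration):

    {
        "gdrive": {
            "credentials_url": "/my_secret_folder/credentials.json",
            "root": "/My Drive"
        },
        "gdrive_backup": {
            "credentials_url": "/my_secret_folder/backup_credentials.json",
            "root": {
                "root_one": "/My Drive/work_folder",
                "root_two": "/My Drive/publish_folder"
            }
        }
    }

Each key is a site name; GDriveHandler(site_name) picks one entry and stays
inactive when the site's credentials file does not exist.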
""" def __init__(self): - self.providers = {} - self.creators = {} + self.providers = {} # {'PROVIDER_LABEL: {cls, int},..} def register_provider(self, provider, creator, batch_limit): """ Provide all necessary information for one specific remote provider - :param provider: - name of provider - :param creator: - class implementing AbstractProvider - :param batch_limit: - number of files that could be processed in + Args: + provider (string): name of provider + creator (class): class implementing AbstractProvider + batch_limit (int): number of files that could be processed in one loop (based on provider API quota) - :return: modifies self.providers + Returns: + modifies self.providers and self.sites """ self.providers[provider] = (creator, batch_limit) - def get_provider(self, provider, tree=None): + def get_provider(self, provider, site_name, tree=None): """ - Returns new instance of provider client. + Returns new instance of provider client for specific site. + One provider could have multiple sites. + 'tree' is used for injecting already created memory structure, without it constructor of provider would need to calculate it from scratch, which could be expensive. - :param provider: 'gdrive','S3' - :param tree: - folder paths to folder id structure - :return: + Args: + provider (string): 'gdrive','S3' + site_name (string): descriptor of site, different service accounts + must have different site name + tree (dictionary): - folder paths to folder id structure + Returns: + (implementation of AbstractProvider) """ creator_info = self._get_creator_info(provider) + site = creator_info[0](site_name, tree) # call init - return creator_info[0](tree) + return site def get_provider_batch_limit(self, provider): """ @@ -50,8 +58,9 @@ class ProviderFactory: (For example 'gdrive' has 1000 queries for 100 sec, one file could be multiple queries (one for each level of path + check if file exists) - :param provider: 'gdrive','S3' - :return: + Args: + provider (string): 'gdrive','S3' + Returns: """ info = self._get_creator_info(provider) return info[1] @@ -60,8 +69,9 @@ class ProviderFactory: """ Collect all necessary info for provider. 
diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py
index fe3ef5c774..88c89ded5f 100644
--- a/pype/modules/sync_server/sync_server.py
+++ b/pype/modules/sync_server/sync_server.py
@@ -1,5 +1,4 @@
 from pype.api import config, Logger
-from pypeapp.lib.anatomy import Roots
 from pype.lib import timeit
 
 import threading
@@ -93,7 +92,6 @@ class SyncServer():
 
         self.lock = threading.Lock()
         self.connection = AvalonMongoDB()
-        log.debug("connection {}".format(self.connection))
 
         try:
             self.presets = config.get_presets()["sync_server"]["config"]
@@ -108,11 +106,13 @@ class SyncServer():
         self.remote_site = self.presets["remote_site"]
 
         # try to activate providers, need to have valid credentials
-        self.active_provider_names = []
+        self.active_sites = []
         for provider in lib.factory.providers.keys():
-            handler = lib.factory.get_provider(provider)
-            if handler.is_active():
-                self.active_provider_names.append(provider)
+            site_presets = lib.factory.providers[provider][0].get_presets()
+            for site in site_presets.keys():
+                handler = lib.factory.get_provider(provider, site)
+                if handler.is_active():
+                    self.active_sites.append((provider, site))
 
     @property
     def active_site(self):
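The status check in the next hunk inspects representation records shaped
roughly as below (a hedged sketch assembled from the README and this module;
the exact schema may differ):

    representation = {
        "type": "representation",
        "files": [{
            "path": "{root}/MY_PROJECT/asset/file_v001.exr",
            "sites": [
                # active (local) site has the file, so 'created_dt' is set
                {"name": "studio", "created_dt": "2020-11-01T10:00:00"},
                # remote skeleton without 'created_dt' -> DO_UPLOAD
                {"name": "gdrive"}
            ]
        }]
    }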
Check permissions.".\ - format(target_folder) + format(target_folder) raise NotADirectoryError(err) loop = asyncio.get_running_loop() @@ -326,7 +329,8 @@ class SyncServer(): True) return file_id - async def download(self, file, representation, provider_name, tree=None): + async def download(self, file, representation, provider_name, + site_name, tree=None): """ Downloads file to local folder denoted in representation.Context. @@ -334,13 +338,15 @@ class SyncServer(): file (dictionary) : info about processed file representation (dictionary): repr that 'file' belongs to provider_name (string): 'gdrive' etc + site_name (string): site on provider, single provider(gdrive) could + have multiple sites (different accounts, credentials) tree (dictionary): injected memory structure for performance Returns: (string) - 'name' of local file """ with self.lock: - handler = lib.factory.get_provider(provider_name, tree) + handler = lib.factory.get_provider(provider_name, site_name, tree) remote_file = self._get_remote_file_path(file, handler.get_roots_config() ) @@ -411,7 +417,9 @@ class SyncServer(): source_file = file.get("path", "") log.debug("File {source_file} process {status} {error_str}". - format(status, source_file, error_str)) + format(status=status, + source_file=source_file, + error_str=error_str)) def tray_start(self): """ @@ -421,7 +429,7 @@ class SyncServer(): Returns: None """ - if self.presets and self.active_provider_names: + if self.presets and self.active_sites: self.sync_server_thread.start() else: log.debug("No presets or active providers. " + @@ -612,11 +620,10 @@ class SyncServer(): local_root (string): value of {root} for local projects Returns: - - absolute path on local system + (string) - absolute path on local system """ if not local_root: raise ValueError("Unknown local root for file {}") - roots = Roots().default_roots() path = file.get("path", "") return path.format(**{"root": local_root}) @@ -631,7 +638,6 @@ class SyncServer(): Returns: (string) - absolute path on remote location """ - log.debug("root_config::{}".format(root_config)) if isinstance(root_config, str): root_config = {'root': root_config} @@ -720,8 +726,9 @@ class SyncServerThread(threading.Thread): # upload process can find already uploaded file and # reuse same id processed_file_path = set() - for provider in self.module.active_provider_names: - handler = lib.factory.get_provider(provider) + for active_site in self.module.active_sites: + provider, site = active_site + handler = lib.factory.get_provider(provider, site) limit = lib.factory.get_provider_batch_limit(provider) # first call to get_provider could be expensive, its # building folder tree structure in memory @@ -743,15 +750,16 @@ class SyncServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.upload(file, - sync, - provider, - tree)) + self.module.upload(file, + sync, + provider, + site, + tree)) task_files_to_process.append(task) # store info for exception handling files_processed_info.append((file, sync, - provider)) + site)) processed_file_path.add(file_path) if status == SyncStatus.DO_DOWNLOAD: tree = handler.get_tree() @@ -760,6 +768,7 @@ class SyncServerThread(threading.Thread): self.module.download(file, sync, provider, + site, tree)) task_files_to_process.append(task) @@ -771,11 +780,11 @@ class SyncServerThread(threading.Thread): log.debug("Sync tasks count {}". 
@@ -631,7 +638,6 @@ class SyncServer():
         Returns:
             (string) - absolute path on remote location
         """
-        log.debug("root_config::{}".format(root_config))
 
         if isinstance(root_config, str):
             root_config = {'root': root_config}
 
@@ -720,8 +726,9 @@ class SyncServerThread(threading.Thread):
                     # upload process can find already uploaded file and
                     # reuse same id
                     processed_file_path = set()
-                    for provider in self.module.active_provider_names:
-                        handler = lib.factory.get_provider(provider)
+                    for provider, site in self.module.active_sites:
+                        handler = lib.factory.get_provider(provider, site)
                         limit = lib.factory.get_provider_batch_limit(provider)
                         # first call to get_provider could be expensive, its
                         # building folder tree structure in memory
@@ -743,15 +750,16 @@ class SyncServerThread(threading.Thread):
                                 tree = handler.get_tree()
                                 limit -= 1
                                 task = asyncio.create_task(
-                                        self.module.upload(file,
-                                                           sync,
-                                                           provider,
-                                                           tree))
+                                    self.module.upload(file,
+                                                       sync,
+                                                       provider,
+                                                       site,
+                                                       tree))
                                 task_files_to_process.append(task)
                                 # store info for exception handling
                                 files_processed_info.append((file,
                                                              sync,
-                                                             provider))
+                                                             site))
                                 processed_file_path.add(file_path)
                             if status == SyncStatus.DO_DOWNLOAD:
                                 tree = handler.get_tree()
                                 task = asyncio.create_task(
                                     self.module.download(file,
                                                          sync,
                                                          provider,
+                                                         site,
                                                          tree))
                                 task_files_to_process.append(task)
 
@@ -771,11 +780,11 @@ class SyncServerThread(threading.Thread):
                         log.debug("Sync tasks count {}".
                                   format(len(task_files_to_process)))
                         files_created = await asyncio.gather(
-                                *task_files_to_process,
-                                return_exceptions=True)
+                            *task_files_to_process,
+                            return_exceptions=True)
 
                         for file_id, info in zip(files_created,
                                                  files_processed_info):
-                            file, representation, provider = info
+                            file, representation, site = info
                             error = None
                             if isinstance(file_id, BaseException):
                                 error = str(file_id)
@@ -783,7 +792,7 @@ class SyncServerThread(threading.Thread):
                             self.module.update_db(file_id,
                                                   file,
                                                   representation,
-                                                  provider,
+                                                  site,
                                                   error)
 
                     duration = time.time() - start_time
diff --git a/pype/modules/websocket_server/websocket_server.py b/pype/modules/websocket_server/websocket_server.py
index 1152c65e00..e3a14c0b1d 100644
--- a/pype/modules/websocket_server/websocket_server.py
+++ b/pype/modules/websocket_server/websocket_server.py
@@ -30,7 +30,7 @@ class WebSocketServer():
         WebSocketServer._instance = self
         self.client = None
         self.handlers = {}
-
+        port = 8099
         websocket_url = os.getenv("WEBSOCKET_URL")
         if websocket_url:
             parsed = urllib.parse.urlparse(websocket_url)
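For the websocket change above: the new default of 8099 applies only when
WEBSOCKET_URL is unset; when the variable is set, the port comes from the
parsed URL. A hedged sketch of that override (the fallback expression is an
assumption about the surrounding code):

    import urllib.parse

    port = 8099
    websocket_url = "ws://localhost:8098"  # e.g. from os.getenv("WEBSOCKET_URL")
    parsed = urllib.parse.urlparse(websocket_url)
    port = parsed.port or port  # -> 8098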