From 5c8eac6b6357fa80859ffbed45be41cf8ae106da Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Fri, 29 Jul 2022 17:07:57 +0200 Subject: [PATCH 1/8] OP-3405 - replaced find with get_representations --- .../modules/sync_server/sync_server_module.py | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index 4027561d22..81aff9368f 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -25,6 +25,8 @@ from .providers import lib from .utils import time_function, SyncStatus, SiteAlreadyPresentError +from openpype.client import get_representations + log = PypeLogger.get_logger("SyncServer") @@ -344,6 +346,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): "files.sites.name": site_name } + # TODO currently not possible to replace with get_representations representations = list( self.connection.database[collection].find(query)) if not representations: @@ -391,12 +394,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): """ self.log.debug("Validation of {} for {} started".format(collection, site_name)) - query = { - "type": "representation" - } - - representations = list( - self.connection.database[collection].find(query)) + representations = list(get_representations(collection)) if not representations: self.log.debug("No repre found") return @@ -1593,14 +1591,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule): not 'force' ValueError - other errors (repre not found, misconfiguration) """ - query = { - "_id": ObjectId(representation_id) - } - - representation = self.connection.database[collection].find_one(query) - if not representation: + representations = get_representations(collection, [representation_id]) + if not representations: raise ValueError("Representation {} not found in {}". format(representation_id, collection)) + representation = representations[0] if side and site_name: raise ValueError("Misconfiguration, only one of side and " + "site_name arguments should be passed.") @@ -1808,18 +1803,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule): provider_name = self.get_provider_for_site(site=site_name) if provider_name == 'local_drive': - query = { - "_id": ObjectId(representation_id) - } - - representation = list( - self.connection.database[collection].find(query)) - if not representation: + representations = list(get_representations(collection, + [representation_id], + fields=["files"])) + if not representations: self.log.debug("No repre {} found".format( representation_id)) return - representation = representation.pop() + representation = representations.pop() local_file_path = '' for file in representation.get("files"): local_file_path = self.get_local_file_path(collection, From c944ae35c9848045cfb73ccfc1b93f30f7af2989 Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Fri, 29 Jul 2022 17:17:03 +0200 Subject: [PATCH 2/8] OP-3405 - replaced find with get_representation_by_id --- openpype/modules/sync_server/tray/models.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/openpype/modules/sync_server/tray/models.py b/openpype/modules/sync_server/tray/models.py index 6d1e85c17a..a97797c920 100644 --- a/openpype/modules/sync_server/tray/models.py +++ b/openpype/modules/sync_server/tray/models.py @@ -11,6 +11,7 @@ from openpype.tools.utils.delegates import pretty_timestamp from openpype.lib import PypeLogger from openpype.api import get_local_site_id +from openpype.client import get_representation_by_id from . import lib @@ -919,8 +920,7 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel): repre_id = self.data(index, Qt.UserRole) - representation = list(self.dbcon.find({"type": "representation", - "_id": repre_id})) + representation = get_representation_by_id(self.project, repre_id) if representation: self.sync_server.update_db(self.project, None, None, representation.pop(), @@ -1357,11 +1357,10 @@ class SyncRepresentationDetailModel(_SyncRepresentationModel): file_id = self.data(index, Qt.UserRole) updated_file = None - # conversion from cursor to list - representations = list(self.dbcon.find({"type": "representation", - "_id": self._id})) + representation = get_representation_by_id(self.project, self._id) + if not representation: + return - representation = representations.pop() for repre_file in representation["files"]: if repre_file["_id"] == file_id: updated_file = repre_file From 292d071f442a494cabd2161512012b13e391a9f8 Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Fri, 29 Jul 2022 17:39:59 +0200 Subject: [PATCH 3/8] OP-3405 - query is required for updates --- openpype/modules/sync_server/sync_server_module.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index 81aff9368f..6a3dbf6095 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -1611,6 +1611,10 @@ class SyncServerModule(OpenPypeModule, ITrayModule): elem = {"name": site_name} + query = { + "_id": ObjectId(representation_id) + } + if file_id: # reset site for particular file self._reset_site_for_file(collection, query, elem, file_id, site_name) From 0f5ec0f0c4cbd4db8c4968db75f6375b6bdf7f59 Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Fri, 29 Jul 2022 17:54:51 +0200 Subject: [PATCH 4/8] OP-3405 - used get_representation_by_id --- .../modules/sync_server/sync_server_module.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index 6a3dbf6095..71e35c7839 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -25,7 +25,7 @@ from .providers import lib from .utils import time_function, SyncStatus, SiteAlreadyPresentError -from openpype.client import get_representations +from openpype.client import get_representations, get_representation_by_id log = PypeLogger.get_logger("SyncServer") @@ -1591,11 +1591,12 @@ class SyncServerModule(OpenPypeModule, ITrayModule): not 'force' ValueError - other errors (repre not found, misconfiguration) """ - representations = get_representations(collection, [representation_id]) - if not representations: + representation = get_representation_by_id(collection, + representation_id) + if not representation: raise ValueError("Representation {} not found in {}". format(representation_id, collection)) - representation = representations[0] + if side and site_name: raise ValueError("Misconfiguration, only one of side and " + "site_name arguments should be passed.") @@ -1807,15 +1808,14 @@ class SyncServerModule(OpenPypeModule, ITrayModule): provider_name = self.get_provider_for_site(site=site_name) if provider_name == 'local_drive': - representations = list(get_representations(collection, - [representation_id], - fields=["files"])) - if not representations: + representation = get_representation_by_id(collection, + representation_id, + fields=["files"]) + if not representation: self.log.debug("No repre {} found".format( representation_id)) return - representation = representations.pop() local_file_path = '' for file in representation.get("files"): local_file_path = self.get_local_file_path(collection, From 89bd23856c30e39f2493d99b2c743d3b918cccda Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 2 Aug 2022 12:25:51 +0200 Subject: [PATCH 5/8] OP-3405 - refactor - updated methods signature Renamed collection to project_name as when we are leaving MongoDB, collection doesnt make much sense. --- .../providers/abstract_provider.py | 8 +- .../modules/sync_server/providers/dropbox.py | 12 +- .../modules/sync_server/providers/gdrive.py | 16 +- .../sync_server/providers/local_drive.py | 12 +- .../modules/sync_server/providers/sftp.py | 16 +- openpype/modules/sync_server/sync_server.py | 71 +++---- .../modules/sync_server/sync_server_module.py | 189 +++++++++--------- openpype/modules/sync_server/tray/models.py | 2 +- 8 files changed, 164 insertions(+), 162 deletions(-) diff --git a/openpype/modules/sync_server/providers/abstract_provider.py b/openpype/modules/sync_server/providers/abstract_provider.py index 688a17f14f..8c2fe1cad9 100644 --- a/openpype/modules/sync_server/providers/abstract_provider.py +++ b/openpype/modules/sync_server/providers/abstract_provider.py @@ -62,7 +62,7 @@ class AbstractProvider: @abc.abstractmethod def upload_file(self, source_path, path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Copy file from 'source_path' to 'target_path' on provider. @@ -75,7 +75,7 @@ class AbstractProvider: arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): name of project_name file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -87,7 +87,7 @@ class AbstractProvider: @abc.abstractmethod def download_file(self, source_path, local_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Download file from provider into local system @@ -99,7 +99,7 @@ class AbstractProvider: arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name diff --git a/openpype/modules/sync_server/providers/dropbox.py b/openpype/modules/sync_server/providers/dropbox.py index dfc42fed75..89d6990841 100644 --- a/openpype/modules/sync_server/providers/dropbox.py +++ b/openpype/modules/sync_server/providers/dropbox.py @@ -224,7 +224,7 @@ class DropboxHandler(AbstractProvider): return False def upload_file(self, source_path, path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Copy file from 'source_path' to 'target_path' on provider. @@ -237,7 +237,7 @@ class DropboxHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -290,7 +290,7 @@ class DropboxHandler(AbstractProvider): cursor.offset = f.tell() server.update_db( - collection=collection, + project_name=project_name, new_file_id=None, file=file, representation=representation, @@ -301,7 +301,7 @@ class DropboxHandler(AbstractProvider): return path def download_file(self, source_path, local_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Download file from provider into local system @@ -313,7 +313,7 @@ class DropboxHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -337,7 +337,7 @@ class DropboxHandler(AbstractProvider): self.dbx.files_download_to_file(local_path, source_path) server.update_db( - collection=collection, + project_name=project_name, new_file_id=None, file=file, representation=representation, diff --git a/openpype/modules/sync_server/providers/gdrive.py b/openpype/modules/sync_server/providers/gdrive.py index aa7329b104..bef707788b 100644 --- a/openpype/modules/sync_server/providers/gdrive.py +++ b/openpype/modules/sync_server/providers/gdrive.py @@ -251,7 +251,7 @@ class GDriveHandler(AbstractProvider): return folder_id def upload_file(self, source_path, path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Uploads single file from 'source_path' to destination 'path'. @@ -264,7 +264,7 @@ class GDriveHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -324,7 +324,7 @@ class GDriveHandler(AbstractProvider): while response is None: if server.is_representation_paused(representation['_id'], check_parents=True, - project_name=collection): + project_name=project_name): raise ValueError("Paused during process, please redo.") if status: status_val = float(status.progress()) @@ -333,7 +333,7 @@ class GDriveHandler(AbstractProvider): last_tick = time.time() log.debug("Uploaded %d%%." % int(status_val * 100)) - server.update_db(collection=collection, + server.update_db(project_name=project_name, new_file_id=None, file=file, representation=representation, @@ -358,7 +358,7 @@ class GDriveHandler(AbstractProvider): return response['id'] def download_file(self, source_path, local_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Downloads single file from 'source_path' (remote) to 'local_path'. @@ -372,7 +372,7 @@ class GDriveHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -410,7 +410,7 @@ class GDriveHandler(AbstractProvider): while response is None: if server.is_representation_paused(representation['_id'], check_parents=True, - project_name=collection): + project_name=project_name): raise ValueError("Paused during process, please redo.") if status: status_val = float(status.progress()) @@ -419,7 +419,7 @@ class GDriveHandler(AbstractProvider): last_tick = time.time() log.debug("Downloaded %d%%." % int(status_val * 100)) - server.update_db(collection=collection, + server.update_db(project_name=project_name, new_file_id=None, file=file, representation=representation, diff --git a/openpype/modules/sync_server/providers/local_drive.py b/openpype/modules/sync_server/providers/local_drive.py index 172cb338cf..4951ef4d1a 100644 --- a/openpype/modules/sync_server/providers/local_drive.py +++ b/openpype/modules/sync_server/providers/local_drive.py @@ -82,7 +82,7 @@ class LocalDriveHandler(AbstractProvider): return editable def upload_file(self, source_path, target_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False, direction="Upload"): """ Copies file from 'source_path' to 'target_path' @@ -95,7 +95,7 @@ class LocalDriveHandler(AbstractProvider): thread = threading.Thread(target=self._copy, args=(source_path, target_path)) thread.start() - self._mark_progress(collection, file, representation, server, + self._mark_progress(project_name, file, representation, server, site, source_path, target_path, direction) else: if os.path.exists(target_path): @@ -105,13 +105,13 @@ class LocalDriveHandler(AbstractProvider): return os.path.basename(target_path) def download_file(self, source_path, local_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Download a file form 'source_path' to 'local_path' """ return self.upload_file(source_path, local_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite, direction="Download") def delete_file(self, path): @@ -188,7 +188,7 @@ class LocalDriveHandler(AbstractProvider): except shutil.SameFileError: print("same files, skipping") - def _mark_progress(self, collection, file, representation, server, site, + def _mark_progress(self, project_name, file, representation, server, site, source_path, target_path, direction): """ Updates progress field in DB by values 0-1. @@ -204,7 +204,7 @@ class LocalDriveHandler(AbstractProvider): status_val = target_file_size / source_file_size last_tick = time.time() log.debug(direction + "ed %d%%." % int(status_val * 100)) - server.update_db(collection=collection, + server.update_db(project_name=project_name, new_file_id=None, file=file, representation=representation, diff --git a/openpype/modules/sync_server/providers/sftp.py b/openpype/modules/sync_server/providers/sftp.py index 49b87b14ec..302ffae3e6 100644 --- a/openpype/modules/sync_server/providers/sftp.py +++ b/openpype/modules/sync_server/providers/sftp.py @@ -222,7 +222,7 @@ class SFTPHandler(AbstractProvider): return os.path.basename(path) def upload_file(self, source_path, target_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Uploads single file from 'source_path' to destination 'path'. @@ -235,7 +235,7 @@ class SFTPHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -256,7 +256,7 @@ class SFTPHandler(AbstractProvider): thread = threading.Thread(target=self._upload, args=(source_path, target_path)) thread.start() - self._mark_progress(collection, file, representation, server, + self._mark_progress(project_name, file, representation, server, site, source_path, target_path, "upload") return os.path.basename(target_path) @@ -267,7 +267,7 @@ class SFTPHandler(AbstractProvider): conn.put(source_path, target_path) def download_file(self, source_path, target_path, - server, collection, file, representation, site, + server, project_name, file, representation, site, overwrite=False): """ Downloads single file from 'source_path' (remote) to 'target_path'. @@ -281,7 +281,7 @@ class SFTPHandler(AbstractProvider): arguments for saving progress: server (SyncServer): server instance to call update_db on - collection (str): name of collection + project_name (str): file (dict): info about uploaded file (matches structure from db) representation (dict): complete repre containing 'file' site (str): site name @@ -302,7 +302,7 @@ class SFTPHandler(AbstractProvider): thread = threading.Thread(target=self._download, args=(source_path, target_path)) thread.start() - self._mark_progress(collection, file, representation, server, + self._mark_progress(project_name, file, representation, server, site, source_path, target_path, "download") return os.path.basename(target_path) @@ -425,7 +425,7 @@ class SFTPHandler(AbstractProvider): pysftp.exceptions.ConnectionException): log.warning("Couldn't connect", exc_info=True) - def _mark_progress(self, collection, file, representation, server, site, + def _mark_progress(self, project_name, file, representation, server, site, source_path, target_path, direction): """ Updates progress field in DB by values 0-1. @@ -446,7 +446,7 @@ class SFTPHandler(AbstractProvider): status_val = target_file_size / source_file_size last_tick = time.time() log.debug(direction + "ed %d%%." % int(status_val * 100)) - server.update_db(collection=collection, + server.update_db(project_name=project_name, new_file_id=None, file=file, representation=representation, diff --git a/openpype/modules/sync_server/sync_server.py b/openpype/modules/sync_server/sync_server.py index 356a75f99d..9cc55ec562 100644 --- a/openpype/modules/sync_server/sync_server.py +++ b/openpype/modules/sync_server/sync_server.py @@ -14,7 +14,7 @@ from .utils import SyncStatus, ResumableError log = PypeLogger().get_logger("SyncServer") -async def upload(module, collection, file, representation, provider_name, +async def upload(module, project_name, file, representation, provider_name, remote_site_name, tree=None, preset=None): """ Upload single 'file' of a 'representation' to 'provider'. @@ -31,7 +31,7 @@ async def upload(module, collection, file, representation, provider_name, Args: module(SyncServerModule): object to run SyncServerModule API - collection (str): source collection + project_name (str): source db file (dictionary): of file from representation in Mongo representation (dictionary): of representation provider_name (string): gdrive, gdc etc. @@ -47,7 +47,7 @@ async def upload(module, collection, file, representation, provider_name, # thread can do that at a time, upload/download to prepared # structure should be run in parallel remote_handler = lib.factory.get_provider(provider_name, - collection, + project_name, remote_site_name, tree=tree, presets=preset) @@ -55,7 +55,7 @@ async def upload(module, collection, file, representation, provider_name, file_path = file.get("path", "") try: local_file_path, remote_file_path = resolve_paths(module, - file_path, collection, remote_site_name, remote_handler + file_path, project_name, remote_site_name, remote_handler ) except Exception as exp: print(exp) @@ -74,27 +74,28 @@ async def upload(module, collection, file, representation, provider_name, local_file_path, remote_file_path, module, - collection, + project_name, file, representation, remote_site_name, True ) - module.handle_alternate_site(collection, representation, remote_site_name, + module.handle_alternate_site(project_name, representation, + remote_site_name, file["_id"], file_id) return file_id -async def download(module, collection, file, representation, provider_name, +async def download(module, project_name, file, representation, provider_name, remote_site_name, tree=None, preset=None): """ Downloads file to local folder denoted in representation.Context. Args: module(SyncServerModule): object to run SyncServerModule API - collection (str): source collection + project_name (str): source file (dictionary) : info about processed file representation (dictionary): repr that 'file' belongs to provider_name (string): 'gdrive' etc @@ -108,20 +109,20 @@ async def download(module, collection, file, representation, provider_name, """ with module.lock: remote_handler = lib.factory.get_provider(provider_name, - collection, + project_name, remote_site_name, tree=tree, presets=preset) file_path = file.get("path", "") local_file_path, remote_file_path = resolve_paths( - module, file_path, collection, remote_site_name, remote_handler + module, file_path, project_name, remote_site_name, remote_handler ) local_folder = os.path.dirname(local_file_path) os.makedirs(local_folder, exist_ok=True) - local_site = module.get_active_site(collection) + local_site = module.get_active_site(project_name) loop = asyncio.get_running_loop() file_id = await loop.run_in_executor(None, @@ -129,20 +130,20 @@ async def download(module, collection, file, representation, provider_name, remote_file_path, local_file_path, module, - collection, + project_name, file, representation, local_site, True ) - module.handle_alternate_site(collection, representation, local_site, + module.handle_alternate_site(project_name, representation, local_site, file["_id"], file_id) return file_id -def resolve_paths(module, file_path, collection, +def resolve_paths(module, file_path, project_name, remote_site_name=None, remote_handler=None): """ Returns tuple of local and remote file paths with {root} @@ -153,7 +154,7 @@ def resolve_paths(module, file_path, collection, Args: module(SyncServerModule): object to run SyncServerModule API file_path(string): path with {root} - collection(string): project name + project_name(string): project name remote_site_name(string): remote site remote_handler(AbstractProvider): implementation Returns: @@ -164,7 +165,7 @@ def resolve_paths(module, file_path, collection, remote_file_path = remote_handler.resolve_path(file_path) local_handler = lib.factory.get_provider( - 'local_drive', collection, module.get_active_site(collection)) + 'local_drive', project_name, module.get_active_site(project_name)) local_file_path = local_handler.resolve_path(file_path) return local_file_path, remote_file_path @@ -269,7 +270,7 @@ class SyncServerThread(threading.Thread): - gets list of collections in DB - gets list of active remote providers (has configuration, credentials) - - for each collection it looks for representations that should + - for each project_name it looks for representations that should be synced - synchronize found collections - update representations - fills error messages for exceptions @@ -282,17 +283,17 @@ class SyncServerThread(threading.Thread): import time start_time = time.time() self.module.set_sync_project_settings() # clean cache - collection = None + project_name = None enabled_projects = self.module.get_enabled_projects() - for collection in enabled_projects: - preset = self.module.sync_project_settings[collection] + for project_name in enabled_projects: + preset = self.module.sync_project_settings[project_name] - local_site, remote_site = self._working_sites(collection) + local_site, remote_site = self._working_sites(project_name) if not all([local_site, remote_site]): continue sync_repres = self.module.get_sync_representations( - collection, + project_name, local_site, remote_site ) @@ -310,7 +311,7 @@ class SyncServerThread(threading.Thread): remote_provider = \ self.module.get_provider_for_site(site=remote_site) handler = lib.factory.get_provider(remote_provider, - collection, + project_name, remote_site, presets=site_preset) limit = lib.factory.get_provider_batch_limit( @@ -341,7 +342,7 @@ class SyncServerThread(threading.Thread): limit -= 1 task = asyncio.create_task( upload(self.module, - collection, + project_name, file, sync, remote_provider, @@ -353,7 +354,7 @@ class SyncServerThread(threading.Thread): files_processed_info.append((file, sync, remote_site, - collection + project_name )) processed_file_path.add(file_path) if status == SyncStatus.DO_DOWNLOAD: @@ -361,7 +362,7 @@ class SyncServerThread(threading.Thread): limit -= 1 task = asyncio.create_task( download(self.module, - collection, + project_name, file, sync, remote_provider, @@ -373,7 +374,7 @@ class SyncServerThread(threading.Thread): files_processed_info.append((file, sync, local_site, - collection + project_name )) processed_file_path.add(file_path) @@ -384,12 +385,12 @@ class SyncServerThread(threading.Thread): return_exceptions=True) for file_id, info in zip(files_created, files_processed_info): - file, representation, site, collection = info + file, representation, site, project_name = info error = None if isinstance(file_id, BaseException): error = str(file_id) file_id = None - self.module.update_db(collection, + self.module.update_db(project_name, file_id, file, representation, @@ -399,7 +400,7 @@ class SyncServerThread(threading.Thread): duration = time.time() - start_time log.debug("One loop took {:.2f}s".format(duration)) - delay = self.module.get_loop_delay(collection) + delay = self.module.get_loop_delay(project_name) log.debug("Waiting for {} seconds to new loop".format(delay)) self.timer = asyncio.create_task(self.run_timer(delay)) await asyncio.gather(self.timer) @@ -458,19 +459,19 @@ class SyncServerThread(threading.Thread): self.timer.cancel() self.timer = None - def _working_sites(self, collection): - if self.module.is_project_paused(collection): + def _working_sites(self, project_name): + if self.module.is_project_paused(project_name): log.debug("Both sites same, skipping") return None, None - local_site = self.module.get_active_site(collection) - remote_site = self.module.get_remote_site(collection) + local_site = self.module.get_active_site(project_name) + remote_site = self.module.get_remote_site(project_name) if local_site == remote_site: log.debug("{}-{} sites same, skipping".format(local_site, remote_site)) return None, None - configured_sites = _get_configured_sites(self.module, collection) + configured_sites = _get_configured_sites(self.module, project_name) if not all([local_site in configured_sites, remote_site in configured_sites]): log.debug("Some of the sites {} - {} is not ".format(local_site, diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index 71e35c7839..c4d90416bb 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -130,12 +130,12 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self.projects_processed = set() """ Start of Public API """ - def add_site(self, collection, representation_id, site_name=None, + def add_site(self, project_name, representation_id, site_name=None, force=False): """ Adds new site to representation to be synced. - 'collection' must have synchronization enabled (globally or + 'project_name' must have synchronization enabled (globally or project only) Used as a API endpoint from outside applications (Loader etc). @@ -143,7 +143,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Use 'force' to reset existing site. Args: - collection (string): project name (must match DB) + project_name (string): project name (must match DB) representation_id (string): MongoDB _id value site_name (string): name of configured and active site force (bool): reset site if exists @@ -153,25 +153,25 @@ class SyncServerModule(OpenPypeModule, ITrayModule): not 'force' ValueError - other errors (repre not found, misconfiguration) """ - if not self.get_sync_project_setting(collection): + if not self.get_sync_project_setting(project_name): raise ValueError("Project not configured") if not site_name: site_name = self.DEFAULT_SITE - self.reset_site_on_representation(collection, + self.reset_site_on_representation(project_name, representation_id, site_name=site_name, force=force) - def remove_site(self, collection, representation_id, site_name, + def remove_site(self, project_name, representation_id, site_name, remove_local_files=False): """ Removes 'site_name' for particular 'representation_id' on - 'collection' + 'project_name' Args: - collection (string): project name (must match DB) + project_name (string): project name (must match DB) representation_id (string): MongoDB _id value site_name (string): name of configured and active site remove_local_files (bool): remove only files for 'local_id' @@ -180,15 +180,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Returns: throws ValueError if any issue """ - if not self.get_sync_project_setting(collection): + if not self.get_sync_project_setting(project_name): raise ValueError("Project not configured") - self.reset_site_on_representation(collection, + self.reset_site_on_representation(project_name, representation_id, site_name=site_name, remove=True) if remove_local_files: - self._remove_local_file(collection, representation_id, site_name) + self._remove_local_file(project_name, representation_id, site_name) def compute_resource_sync_sites(self, project_name): """Get available resource sync sites state for publish process. @@ -335,9 +335,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return alt_site_pairs - def clear_project(self, collection, site_name): + def clear_project(self, project_name, site_name): """ - Clear 'collection' of 'site_name' and its local files + Clear 'project_name' of 'site_name' and its local files Works only on real local sites, not on 'studio' """ @@ -348,15 +348,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule): # TODO currently not possible to replace with get_representations representations = list( - self.connection.database[collection].find(query)) + self.connection.database[project_name].find(query)) if not representations: self.log.debug("No repre found") return for repre in representations: - self.remove_site(collection, repre.get("_id"), site_name, True) + self.remove_site(project_name, repre.get("_id"), site_name, True) - def create_validate_project_task(self, collection, site_name): + def create_validate_project_task(self, project_name, site_name): """Adds metadata about project files validation on a queue. This process will loop through all representation and check if @@ -373,28 +373,28 @@ class SyncServerModule(OpenPypeModule, ITrayModule): """ task = { "type": "validate", - "project_name": collection, - "func": lambda: self.validate_project(collection, site_name, + "project_name": project_name, + "func": lambda: self.validate_project(project_name, site_name, reset_missing=True) } - self.projects_processed.add(collection) + self.projects_processed.add(project_name) self.long_running_tasks.append(task) - def validate_project(self, collection, site_name, reset_missing=False): - """Validate 'collection' of 'site_name' and its local files + def validate_project(self, project_name, site_name, reset_missing=False): + """Validate 'project_name' of 'site_name' and its local files If file present and not marked with a 'site_name' in DB, DB is updated with site name and file modified date. Args: - collection (string): project name + project_name (string): project name site_name (string): active site name reset_missing (bool): if True reset site in DB if missing physically """ - self.log.debug("Validation of {} for {} started".format(collection, + self.log.debug("Validation of {} for {} started".format(project_name, site_name)) - representations = list(get_representations(collection)) + representations = list(get_representations(project_name)) if not representations: self.log.debug("No repre found") return @@ -414,7 +414,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): continue file_path = repre_file.get("path", "") - local_file_path = self.get_local_file_path(collection, + local_file_path = self.get_local_file_path(project_name, site_name, file_path) @@ -426,14 +426,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule): "Adding site {} for {}".format(site_name, repre_id)) - query = { - "_id": repre_id - } created_dt = datetime.fromtimestamp( os.path.getmtime(local_file_path)) elem = {"name": site_name, "created_dt": created_dt} - self._add_site(collection, query, repre, elem, + self._add_site(project_name, repre, elem, site_name=site_name, file_id=repre_file["_id"], force=True) @@ -443,41 +440,42 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self.log.debug("Resetting site {} for {}". format(site_name, repre_id)) self.reset_site_on_representation( - collection, repre_id, site_name=site_name, + project_name, repre_id, site_name=site_name, file_id=repre_file["_id"]) sites_reset += 1 if sites_added % 100 == 0: self.log.debug("Sites added {}".format(sites_added)) - self.log.debug("Validation of {} for {} ended".format(collection, + self.log.debug("Validation of {} for {} ended".format(project_name, site_name)) self.log.info("Sites added {}, sites reset {}".format(sites_added, reset_missing)) - def pause_representation(self, collection, representation_id, site_name): + def pause_representation(self, project_name, representation_id, site_name): """ Sets 'representation_id' as paused, eg. no syncing should be happening on it. Args: - collection (string): project name + project_name (string): project name representation_id (string): MongoDB objectId value site_name (string): 'gdrive', 'studio' etc. """ log.info("Pausing SyncServer for {}".format(representation_id)) self._paused_representations.add(representation_id) - self.reset_site_on_representation(collection, representation_id, + self.reset_site_on_representation(project_name, representation_id, site_name=site_name, pause=True) - def unpause_representation(self, collection, representation_id, site_name): + def unpause_representation(self, project_name, + representation_id, site_name): """ Sets 'representation_id' as unpaused. Does not fail or warn if repre wasn't paused. Args: - collection (string): project name + project_name (string): project name representation_id (string): MongoDB objectId value site_name (string): 'gdrive', 'studio' etc. """ @@ -487,7 +485,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): except KeyError: pass # self.paused_representations is not persistent - self.reset_site_on_representation(collection, representation_id, + self.reset_site_on_representation(project_name, representation_id, site_name=site_name, pause=False) def is_representation_paused(self, representation_id, @@ -518,7 +516,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): happening on all representation inside. Args: - project_name (string): collection name + project_name (string): project_name name """ log.info("Pausing SyncServer for {}".format(project_name)) self._paused_projects.add(project_name) @@ -530,7 +528,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Does not fail or warn if project wasn't paused. Args: - project_name (string): collection name + project_name (string): """ log.info("Unpausing SyncServer for {}".format(project_name)) try: @@ -543,7 +541,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Returns if 'project_name' is paused or not. Args: - project_name (string): collection name + project_name (string): check_parents (bool): check if server itself is not paused Returns: @@ -942,8 +940,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return True return False - def handle_alternate_site(self, collection, representation, processed_site, - file_id, synced_file_id): + def handle_alternate_site(self, project_name, representation, + processed_site, file_id, synced_file_id): """ For special use cases where one site vendors another. @@ -956,7 +954,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): same location >> file is accesible on 'sftp' site right away. Args: - collection (str): name of project + project_name (str): name of project representation (dict) processed_site (str): real site_name of published/uploaded file file_id (ObjectId): DB id of file handled @@ -980,26 +978,23 @@ class SyncServerModule(OpenPypeModule, ITrayModule): alternate_sites = set(alternate_sites) for alt_site in alternate_sites: - query = { - "_id": representation["_id"] - } elem = {"name": alt_site, "created_dt": datetime.now(), "id": synced_file_id} self.log.debug("Adding alternate {} to {}".format( alt_site, representation["_id"])) - self._add_site(collection, query, + self._add_site(project_name, representation, elem, alt_site, file_id=file_id, force=True) """ End of Public API """ - def get_local_file_path(self, collection, site_name, file_path): + def get_local_file_path(self, project_name, site_name, file_path): """ Externalized for app """ - handler = LocalDriveHandler(collection, site_name) + handler = LocalDriveHandler(project_name, site_name) local_file_path = handler.resolve_path(file_path) return local_file_path @@ -1286,7 +1281,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return sites.get(site, 'N/A') @time_function - def get_sync_representations(self, collection, active_site, remote_site): + def get_sync_representations(self, project_name, active_site, remote_site): """ Get representations that should be synced, these could be recognised by presence of document in 'files.sites', where key is @@ -1297,8 +1292,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): better performance. Goal is to get as few representations as possible. Args: - collection (string): name of collection (in most cases matches - project name + project_name (string): active_site (string): identifier of current active site (could be 'local_0' when working from home, 'studio' when working in the studio (default) @@ -1307,10 +1301,10 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Returns: (list) of dictionaries """ - log.debug("Check representations for : {}".format(collection)) - self.connection.Session["AVALON_PROJECT"] = collection + log.debug("Check representations for : {}".format(project_name)) + self.connection.Session["AVALON_PROJECT"] = project_name # retry_cnt - number of attempts to sync specific file before giving up - retries_arr = self._get_retries_arr(collection) + retries_arr = self._get_retries_arr(project_name) match = { "type": "representation", "$or": [ @@ -1447,14 +1441,14 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return SyncStatus.DO_NOTHING - def update_db(self, collection, new_file_id, file, representation, + def update_db(self, project_name, new_file_id, file, representation, site, error=None, progress=None, priority=None): """ Update 'provider' portion of records in DB with success (file_id) or error (exception) Args: - collection (string): name of project - force to db connection as + project_name (string): name of project - force to db connection as each file might come from different collection new_file_id (string): file (dictionary): info about processed file (pulled from DB) @@ -1497,7 +1491,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): if file_id: arr_filter.append({'f._id': ObjectId(file_id)}) - self.connection.database[collection].update_one( + self.connection.database[project_name].update_one( query, update, upsert=True, @@ -1560,7 +1554,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return -1, None - def reset_site_on_representation(self, collection, representation_id, + def reset_site_on_representation(self, project_name, representation_id, side=None, file_id=None, site_name=None, remove=False, pause=None, force=False): """ @@ -1577,7 +1571,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Should be used when repre should be synced to new site. Args: - collection (string): name of project (eg. collection) in DB + project_name (string): name of project (eg. collection) in DB representation_id(string): _id of representation file_id (string): file _id in representation side (string): local or remote side @@ -1591,18 +1585,18 @@ class SyncServerModule(OpenPypeModule, ITrayModule): not 'force' ValueError - other errors (repre not found, misconfiguration) """ - representation = get_representation_by_id(collection, + representation = get_representation_by_id(project_name, representation_id) if not representation: raise ValueError("Representation {} not found in {}". - format(representation_id, collection)) + format(representation_id, project_name)) if side and site_name: raise ValueError("Misconfiguration, only one of side and " + "site_name arguments should be passed.") - local_site = self.get_active_site(collection) - remote_site = self.get_remote_site(collection) + local_site = self.get_active_site(project_name) + remote_site = self.get_remote_site(project_name) if side: if side == 'local': @@ -1612,42 +1606,44 @@ class SyncServerModule(OpenPypeModule, ITrayModule): elem = {"name": site_name} - query = { - "_id": ObjectId(representation_id) - } - if file_id: # reset site for particular file - self._reset_site_for_file(collection, query, + self._reset_site_for_file(project_name, representation_id, elem, file_id, site_name) elif side: # reset site for whole representation - self._reset_site(collection, query, elem, site_name) + self._reset_site(project_name, representation_id, elem, site_name) elif remove: # remove site for whole representation - self._remove_site(collection, query, representation, site_name) + self._remove_site(project_name, + representation, site_name) elif pause is not None: - self._pause_unpause_site(collection, query, + self._pause_unpause_site(project_name, representation, site_name, pause) else: # add new site to all files for representation - self._add_site(collection, query, representation, elem, site_name, + self._add_site(project_name, representation, elem, site_name, force=force) - def _update_site(self, collection, query, update, arr_filter): + def _update_site(self, project_name, representation_id, + update, arr_filter): """ Auxiliary method to call update_one function on DB Used for refactoring ugly reset_provider_for_file """ - self.connection.database[collection].update_one( + query = { + "_id": ObjectId(representation_id) + } + + self.connection.database[project_name].update_one( query, update, upsert=True, array_filters=arr_filter ) - def _reset_site_for_file(self, collection, query, + def _reset_site_for_file(self, project_name, representation_id, elem, file_id, site_name): """ Resets 'site_name' for 'file_id' on representation in 'query' on - 'collection' + 'project_name' """ update = { "$set": {"files.$[f].sites.$[s]": elem} @@ -1660,9 +1656,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule): {'f._id': file_id} ] - self._update_site(collection, query, update, arr_filter) + self._update_site(project_name, representation_id, update, arr_filter) - def _reset_site(self, collection, query, elem, site_name): + def _reset_site(self, project_name, representation_id, elem, site_name): """ Resets 'site_name' for all files of representation in 'query' """ @@ -1674,9 +1670,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule): {'s.name': site_name} ] - self._update_site(collection, query, update, arr_filter) + self._update_site(project_name, representation_id, update, arr_filter) - def _remove_site(self, collection, query, representation, site_name): + def _remove_site(self, project_name, representation, site_name): """ Removes 'site_name' for 'representation' in 'query' @@ -1698,10 +1694,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule): } arr_filter = [] - self._update_site(collection, query, update, arr_filter) + self._update_site(project_name, representation["_id"], + update, arr_filter) - def _pause_unpause_site(self, collection, query, - representation, site_name, pause): + def _pause_unpause_site(self, project_name, representation, + site_name, pause): """ Pauses/unpauses all files for 'representation' based on 'pause' @@ -1733,12 +1730,13 @@ class SyncServerModule(OpenPypeModule, ITrayModule): {'s.name': site_name} ] - self._update_site(collection, query, update, arr_filter) + self._update_site(project_name, representation["_id"], + update, arr_filter) - def _add_site(self, collection, query, representation, elem, site_name, + def _add_site(self, project_name, representation, elem, site_name, force=False, file_id=None): """ - Adds 'site_name' to 'representation' on 'collection' + Adds 'site_name' to 'representation' on 'project_name' Args: representation (dict) @@ -1746,10 +1744,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Use 'force' to remove existing or raises ValueError """ + representation_id = representation["_id"] reset_existing = False files = representation.get("files", []) if not files: - log.debug("No files for {}".format(representation["_id"])) + log.debug("No files for {}".format(representation_id)) return for repre_file in files: @@ -1759,7 +1758,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule): for site in repre_file.get("sites"): if site["name"] == site_name: if force or site.get("error"): - self._reset_site_for_file(collection, query, + self._reset_site_for_file(project_name, + representation_id, elem, repre_file["_id"], site_name) reset_existing = True @@ -1785,14 +1785,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule): {'f._id': file_id} ] - self._update_site(collection, query, update, arr_filter) + self._update_site(project_name, representation_id, + update, arr_filter) - def _remove_local_file(self, collection, representation_id, site_name): + def _remove_local_file(self, project_name, representation_id, site_name): """ Removes all local files for 'site_name' of 'representation_id' Args: - collection (string): project name (must match DB) + project_name (string): project name (must match DB) representation_id (string): MongoDB _id value site_name (string): name of configured and active site @@ -1808,7 +1809,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): provider_name = self.get_provider_for_site(site=site_name) if provider_name == 'local_drive': - representation = get_representation_by_id(collection, + representation = get_representation_by_id(project_name, representation_id, fields=["files"]) if not representation: @@ -1818,7 +1819,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): local_file_path = '' for file in representation.get("files"): - local_file_path = self.get_local_file_path(collection, + local_file_path = self.get_local_file_path(project_name, site_name, file.get("path", "") ) diff --git a/openpype/modules/sync_server/tray/models.py b/openpype/modules/sync_server/tray/models.py index a97797c920..f05a5bd8ea 100644 --- a/openpype/modules/sync_server/tray/models.py +++ b/openpype/modules/sync_server/tray/models.py @@ -441,7 +441,7 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel): full text filtering. Allows pagination, most of heavy lifting is being done on DB side. - Single model matches to single collection. When project is changed, + Single model matches to single project. When project is changed, model is reset and refreshed. Args: From eb2c82558888fe5650bdab4bee1a60a498b685fa Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 2 Aug 2022 16:09:59 +0200 Subject: [PATCH 6/8] OP-3405 - extracted aggregate query from Loader to Site Sync module --- .../modules/sync_server/sync_server_module.py | 89 +++++++++++++++++ openpype/tools/loader/model.py | 95 ++----------------- 2 files changed, 98 insertions(+), 86 deletions(-) diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index c4d90416bb..8fdfab9c2e 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -988,6 +988,95 @@ class SyncServerModule(OpenPypeModule, ITrayModule): representation, elem, alt_site, file_id=file_id, force=True) + def get_repre_info_for_versions(self, project_name, version_ids, + active_site, remote_site): + """Returns representation documents for versions and sites combi + + Args: + project_name (str) + version_ids (list): of version[_id] + active_site (string): 'local', 'studio' etc + remote_site (string): dtto + Returns: + + """ + self.connection.Session["AVALON_PROJECT"] = project_name + query = [ + {"$match": {"parent": {"$in": version_ids}, + "type": "representation", + "files.sites.name": {"$exists": 1}}}, + {"$unwind": "$files"}, + {'$addFields': { + 'order_local': { + '$filter': { + 'input': '$files.sites', 'as': 'p', + 'cond': {'$eq': ['$$p.name', active_site]} + } + } + }}, + {'$addFields': { + 'order_remote': { + '$filter': { + 'input': '$files.sites', 'as': 'p', + 'cond': {'$eq': ['$$p.name', remote_site]} + } + } + }}, + {'$addFields': { + 'progress_local': {"$arrayElemAt": [{ + '$cond': [ + {'$size': "$order_local.progress"}, + "$order_local.progress", + # if exists created_dt count is as available + {'$cond': [ + {'$size': "$order_local.created_dt"}, + [1], + [0] + ]} + ]}, + 0 + ]} + }}, + {'$addFields': { + 'progress_remote': {"$arrayElemAt": [{ + '$cond': [ + {'$size': "$order_remote.progress"}, + "$order_remote.progress", + # if exists created_dt count is as available + {'$cond': [ + {'$size': "$order_remote.created_dt"}, + [1], + [0] + ]} + ]}, + 0 + ]} + }}, + {'$group': { # first group by repre + '_id': '$_id', + 'parent': {'$first': '$parent'}, + 'avail_ratio_local': { + '$first': { + '$divide': [{'$sum': "$progress_local"}, {'$sum': 1}] + } + }, + 'avail_ratio_remote': { + '$first': { + '$divide': [{'$sum': "$progress_remote"}, {'$sum': 1}] + } + } + }}, + {'$group': { # second group by parent, eg version_id + '_id': '$parent', + 'repre_count': {'$sum': 1}, # total representations + # fully available representation for site + 'avail_repre_local': {'$sum': "$avail_ratio_local"}, + 'avail_repre_remote': {'$sum': "$avail_ratio_remote"}, + }}, + ] + # docs = list(self.connection.aggregate(query)) + return self.connection.aggregate(query) + """ End of Public API """ def get_local_file_path(self, project_name, site_name, file_path): diff --git a/openpype/tools/loader/model.py b/openpype/tools/loader/model.py index a5174bd804..3ce44ea6c8 100644 --- a/openpype/tools/loader/model.py +++ b/openpype/tools/loader/model.py @@ -272,15 +272,15 @@ class SubsetsModel(TreeModel, BaseRepresentationModel): # update availability on active site when version changes if self.sync_server.enabled and version_doc: - query = self._repre_per_version_pipeline( + repre_info = self.sync_server.get_repre_info_for_versions( + project_name, [version_doc["_id"]], self.active_site, self.remote_site ) - docs = list(self.dbcon.aggregate(query)) - if docs: - repre = docs.pop() - version_doc["data"].update(self._get_repre_dict(repre)) + if repre_info: + version_doc["data"].update( + self._get_repre_dict(repre_info[0])) self.set_version(index, version_doc) @@ -478,16 +478,16 @@ class SubsetsModel(TreeModel, BaseRepresentationModel): for _subset_id, doc in last_versions_by_subset_id.items(): version_ids.add(doc["_id"]) - query = self._repre_per_version_pipeline( + repres = self.sync_server.get_repre_info_for_versions( + project_name, list(version_ids), self.active_site, self.remote_site ) - - for doc in self.dbcon.aggregate(query): + for repre in repres: if self._doc_fetching_stop: return doc["active_provider"] = self.active_provider doc["remote_provider"] = self.remote_provider - repre_info[doc["_id"]] = doc + repre_info[repre["_id"]] = repre self._doc_payload = { "asset_docs_by_id": asset_docs_by_id, @@ -827,83 +827,6 @@ class SubsetsModel(TreeModel, BaseRepresentationModel): return data - def _repre_per_version_pipeline(self, version_ids, - active_site, remote_site): - query = [ - {"$match": {"parent": {"$in": version_ids}, - "type": "representation", - "files.sites.name": {"$exists": 1}}}, - {"$unwind": "$files"}, - {'$addFields': { - 'order_local': { - '$filter': { - 'input': '$files.sites', 'as': 'p', - 'cond': {'$eq': ['$$p.name', active_site]} - } - } - }}, - {'$addFields': { - 'order_remote': { - '$filter': { - 'input': '$files.sites', 'as': 'p', - 'cond': {'$eq': ['$$p.name', remote_site]} - } - } - }}, - {'$addFields': { - 'progress_local': {"$arrayElemAt": [{ - '$cond': [ - {'$size': "$order_local.progress"}, - "$order_local.progress", - # if exists created_dt count is as available - {'$cond': [ - {'$size': "$order_local.created_dt"}, - [1], - [0] - ]} - ]}, - 0 - ]} - }}, - {'$addFields': { - 'progress_remote': {"$arrayElemAt": [{ - '$cond': [ - {'$size': "$order_remote.progress"}, - "$order_remote.progress", - # if exists created_dt count is as available - {'$cond': [ - {'$size': "$order_remote.created_dt"}, - [1], - [0] - ]} - ]}, - 0 - ]} - }}, - {'$group': { # first group by repre - '_id': '$_id', - 'parent': {'$first': '$parent'}, - 'avail_ratio_local': { - '$first': { - '$divide': [{'$sum': "$progress_local"}, {'$sum': 1}] - } - }, - 'avail_ratio_remote': { - '$first': { - '$divide': [{'$sum': "$progress_remote"}, {'$sum': 1}] - } - } - }}, - {'$group': { # second group by parent, eg version_id - '_id': '$parent', - 'repre_count': {'$sum': 1}, # total representations - # fully available representation for site - 'avail_repre_local': {'$sum': "$avail_ratio_local"}, - 'avail_repre_remote': {'$sum': "$avail_ratio_remote"}, - }}, - ] - return query - class GroupMemberFilterProxyModel(QtCore.QSortFilterProxyModel): """Provide the feature of filtering group by the acceptance of members From 26c4a0f8ca19eeb4faaa85ceac1524c3bed71b7d Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 2 Aug 2022 16:15:17 +0200 Subject: [PATCH 7/8] OP-3405 - Hound --- openpype/modules/sync_server/providers/local_drive.py | 3 ++- openpype/modules/sync_server/sync_server.py | 9 +++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/openpype/modules/sync_server/providers/local_drive.py b/openpype/modules/sync_server/providers/local_drive.py index 4951ef4d1a..01bc891d08 100644 --- a/openpype/modules/sync_server/providers/local_drive.py +++ b/openpype/modules/sync_server/providers/local_drive.py @@ -111,7 +111,8 @@ class LocalDriveHandler(AbstractProvider): Download a file form 'source_path' to 'local_path' """ return self.upload_file(source_path, local_path, - server, project_name, file, representation, site, + server, project_name, file, + representation, site, overwrite, direction="Download") def delete_file(self, path): diff --git a/openpype/modules/sync_server/sync_server.py b/openpype/modules/sync_server/sync_server.py index 9cc55ec562..97538fcd4e 100644 --- a/openpype/modules/sync_server/sync_server.py +++ b/openpype/modules/sync_server/sync_server.py @@ -54,8 +54,9 @@ async def upload(module, project_name, file, representation, provider_name, file_path = file.get("path", "") try: - local_file_path, remote_file_path = resolve_paths(module, - file_path, project_name, remote_site_name, remote_handler + local_file_path, remote_file_path = resolve_paths( + module, file_path, project_name, + remote_site_name, remote_handler ) except Exception as exp: print(exp) @@ -270,8 +271,8 @@ class SyncServerThread(threading.Thread): - gets list of collections in DB - gets list of active remote providers (has configuration, credentials) - - for each project_name it looks for representations that should - be synced + - for each project_name it looks for representations that + should be synced - synchronize found collections - update representations - fills error messages for exceptions - waits X seconds and repeat From d7d8d45ee5589741092a66187f42f2332296420a Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Wed, 3 Aug 2022 18:27:08 +0200 Subject: [PATCH 8/8] OP-3405 - representation is not a list Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> --- openpype/modules/sync_server/tray/models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openpype/modules/sync_server/tray/models.py b/openpype/modules/sync_server/tray/models.py index f05a5bd8ea..629c4cbbf1 100644 --- a/openpype/modules/sync_server/tray/models.py +++ b/openpype/modules/sync_server/tray/models.py @@ -923,7 +923,7 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel): representation = get_representation_by_id(self.project, repre_id) if representation: self.sync_server.update_db(self.project, None, None, - representation.pop(), + representation, get_local_site_id(), priority=value) self.is_editing = False