diff --git a/openpype/lib/avalon_context.py b/openpype/lib/avalon_context.py index e82dcc558f..077b8086d4 100644 --- a/openpype/lib/avalon_context.py +++ b/openpype/lib/avalon_context.py @@ -1973,3 +1973,119 @@ def get_last_workfile( return os.path.normpath(os.path.join(workdir, filename)) return filename + + +@with_avalon +def get_linked_ids_for_representations(project_name, repre_ids, dbcon=None, + link_type=None, max_depth=0): + """Returns list of linked ids of particular type (if provided). + + Goes from representations to version, back to representations + Args: + project_name (str) + repre_ids (list) or (ObjectId) + dbcon (avalon.mongodb.AvalonMongoDB, optional): Avalon Mongo connection + with Session. + link_type (str): ['reference', '..] + max_depth (int): limit how many levels of recursion + Returns: + (list) of ObjectId - linked representations + """ + # Create new dbcon if not passed and use passed project name + if not dbcon: + from avalon.api import AvalonMongoDB + dbcon = AvalonMongoDB() + dbcon.Session["AVALON_PROJECT"] = project_name + # Validate that passed dbcon has same project + elif dbcon.Session["AVALON_PROJECT"] != project_name: + raise ValueError("Passed connection does not have right project") + + if not isinstance(repre_ids, list): + repre_ids = [repre_ids] + + version_ids = dbcon.distinct("parent", { + "_id": {"$in": repre_ids}, + "type": "representation" + }) + + match = { + "_id": {"$in": version_ids}, + "type": "version" + } + + graph_lookup = { + "from": project_name, + "startWith": "$data.inputLinks.id", + "connectFromField": "data.inputLinks.id", + "connectToField": "_id", + "as": "outputs_recursive", + "depthField": "depth" + } + if max_depth != 0: + # We offset by -1 since 0 basically means no recursion + # but the recursion only happens after the initial lookup + # for outputs. + graph_lookup["maxDepth"] = max_depth - 1 + + pipeline_ = [ + # Match + {"$match": match}, + # Recursive graph lookup for inputs + {"$graphLookup": graph_lookup} + ] + + result = dbcon.aggregate(pipeline_) + referenced_version_ids = _process_referenced_pipeline_result(result, + link_type) + + ref_ids = dbcon.distinct( + "_id", + filter={ + "parent": {"$in": list(referenced_version_ids)}, + "type": "representation" + } + ) + + return list(ref_ids) + + +def _process_referenced_pipeline_result(result, link_type): + """Filters result from pipeline for particular link_type. + + Pipeline cannot use link_type directly in a query. + Returns: + (list) + """ + referenced_version_ids = set() + correctly_linked_ids = set() + for item in result: + input_links = item["data"].get("inputLinks", []) + correctly_linked_ids = _filter_input_links(input_links, + link_type, + correctly_linked_ids) + + # outputs_recursive in random order, sort by depth + outputs_recursive = sorted(item.get("outputs_recursive", []), + key=lambda d: d["depth"]) + + for output in outputs_recursive: + if output["_id"] not in correctly_linked_ids: # leaf + continue + + correctly_linked_ids = _filter_input_links( + output["data"].get("inputLinks", []), + link_type, + correctly_linked_ids) + + referenced_version_ids.add(output["_id"]) + + return referenced_version_ids + + +def _filter_input_links(input_links, link_type, correctly_linked_ids): + for input_link in input_links: + if not link_type or input_link["type"] == link_type: + correctly_linked_ids.add(input_link.get("id") or + input_link.get("_id")) # legacy + + return correctly_linked_ids diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index 3744a21b43..d7d7fde90c 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -23,7 +23,7 @@ from openpype.settings.lib import ( from .providers.local_drive import LocalDriveHandler from .providers import lib -from .utils import time_function, SyncStatus +from .utils import time_function, SyncStatus, SiteAlreadyPresentError log = PypeLogger().get_logger("SyncServer") @@ -131,21 +131,25 @@ class SyncServerModule(OpenPypeModule, ITrayModule): def add_site(self, collection, representation_id, site_name=None, force=False): """ - Adds new site to representation to be synced. + Adds new site to representation to be synced. - 'collection' must have synchronization enabled (globally or - project only) + 'collection' must have synchronization enabled (globally or + project only) - Used as a API endpoint from outside applications (Loader etc) + Used as a API endpoint from outside applications (Loader etc). - Args: - collection (string): project name (must match DB) - representation_id (string): MongoDB _id value - site_name (string): name of configured and active site - force (bool): reset site if exists + Use 'force' to reset existing site. - Returns: - throws ValueError if any issue + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + force (bool): reset site if exists + + Throws: + SiteAlreadyPresentError - if adding already existing site and + not 'force' + ValueError - other errors (repre not found, misconfiguration) """ if not self.get_sync_project_setting(collection): raise ValueError("Project not configured") @@ -155,7 +159,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self.reset_site_on_representation(collection, representation_id, - site_name=site_name, force=force) + site_name=site_name, + force=force) def remove_site(self, collection, representation_id, site_name, remove_local_files=False): @@ -351,36 +356,38 @@ class SyncServerModule(OpenPypeModule, ITrayModule): def create_validate_project_task(self, collection, site_name): """Adds metadata about project files validation on a queue. - This process will loop through all representation and check if - their files actually exist on an active site. + This process will loop through all representation and check if + their files actually exist on an active site. - This might be useful for edge cases when artists is switching - between sites, remote site is actually physically mounted and - active site has same file urls etc. + It also checks if site is set in DB, but file is physically not + present - Task will run on a asyncio loop, shouldn't be blocking. + This might be useful for edge cases when artists is switching + between sites, remote site is actually physically mounted and + active site has same file urls etc. + + Task will run on a asyncio loop, shouldn't be blocking. """ task = { "type": "validate", "project_name": collection, - "func": lambda: self.validate_project(collection, site_name) + "func": lambda: self.validate_project(collection, site_name, + reset_missing=True) } self.projects_processed.add(collection) self.long_running_tasks.append(task) - def validate_project(self, collection, site_name, remove_missing=False): - """ - Validate 'collection' of 'site_name' and its local files + def validate_project(self, collection, site_name, reset_missing=False): + """Validate 'collection' of 'site_name' and its local files - If file present and not marked with a 'site_name' in DB, DB is - updated with site name and file modified date. + If file present and not marked with a 'site_name' in DB, DB is + updated with site name and file modified date. - Args: - module (SyncServerModule) - collection (string): project name - site_name (string): active site name - remove_missing (bool): if True remove sites in DB if missing - physically + Args: + collection (string): project name + site_name (string): active site name + reset_missing (bool): if True reset site in DB if missing + physically """ self.log.debug("Validation of {} for {} started".format(collection, site_name)) @@ -395,29 +402,32 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return sites_added = 0 - sites_removed = 0 + sites_reset = 0 for repre in representations: repre_id = repre["_id"] for repre_file in repre.get("files", []): try: - has_site = site_name in [site["name"] - for site in repre_file["sites"]] - except TypeError: + is_on_site = site_name in [site["name"] + for site in repre_file["sites"] + if (site.get("created_dt") and + not site.get("error"))] + except (TypeError, AttributeError): self.log.debug("Structure error in {}".format(repre_id)) continue - if has_site and not remove_missing: - continue - file_path = repre_file.get("path", "") local_file_path = self.get_local_file_path(collection, site_name, file_path) - if local_file_path and os.path.exists(local_file_path): - self.log.debug("Adding site {} for {}".format(site_name, - repre_id)) - if not has_site: + file_exists = (local_file_path and + os.path.exists(local_file_path)) + if not is_on_site: + if file_exists: + self.log.debug( + "Adding site {} for {}".format(site_name, + repre_id)) + query = { "_id": repre_id } @@ -425,27 +435,27 @@ class SyncServerModule(OpenPypeModule, ITrayModule): os.path.getmtime(local_file_path)) elem = {"name": site_name, "created_dt": created_dt} - self._add_site(collection, query, [repre], elem, + self._add_site(collection, query, repre, elem, site_name=site_name, - file_id=repre_file["_id"]) + file_id=repre_file["_id"], + force=True) sites_added += 1 else: - if has_site and remove_missing: - self.log.debug("Removing site {} for {}". + if not file_exists and reset_missing: + self.log.debug("Resetting site {} for {}". format(site_name, repre_id)) - self.reset_provider_for_file(collection, - repre_id, - file_id=repre_file["_id"], - remove=True) - sites_removed += 1 + self.reset_site_on_representation( + collection, repre_id, site_name=site_name, + file_id=repre_file["_id"]) + sites_reset += 1 if sites_added % 100 == 0: self.log.debug("Sites added {}".format(sites_added)) self.log.debug("Validation of {} for {} ended".format(collection, site_name)) - self.log.info("Sites added {}, sites removed {}".format(sites_added, - sites_removed)) + self.log.info("Sites added {}, sites reset {}".format(sites_added, + reset_missing)) def pause_representation(self, collection, representation_id, site_name): """ @@ -963,7 +973,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self.log.debug("Adding alternate {} to {}".format( alt_site, representation["_id"])) self._add_site(collection, query, - [representation], elem, + representation, elem, alt_site, file_id=file_id, force=True) """ End of Public API """ @@ -1567,14 +1577,16 @@ class SyncServerModule(OpenPypeModule, ITrayModule): pause (bool or None): if True - pause, False - unpause force (bool): hard reset - currently only for add_site - Returns: - throws ValueError + Raises: + SiteAlreadyPresentError - if adding already existing site and + not 'force' + ValueError - other errors (repre not found, misconfiguration) """ query = { "_id": ObjectId(representation_id) } - representation = list(self.connection.database[collection].find(query)) + representation = self.connection.database[collection].find_one(query) if not representation: raise ValueError("Representation {} not found in {}". format(representation_id, collection)) @@ -1605,7 +1617,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): representation, site_name, pause) else: # add new site to all files for representation self._add_site(collection, query, representation, elem, site_name, - force) + force=force) def _update_site(self, collection, query, update, arr_filter): """ @@ -1660,7 +1672,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Throws ValueError if 'site_name' not found on 'representation' """ found = False - for repre_file in representation.pop().get("files"): + for repre_file in representation.get("files"): for site in repre_file.get("sites"): if site.get("name") == site_name: found = True @@ -1686,7 +1698,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): """ found = False site = None - for repre_file in representation.pop().get("files"): + for repre_file in representation.get("files"): for site in repre_file.get("sites"): if site["name"] == site_name: found = True @@ -1718,29 +1730,34 @@ class SyncServerModule(OpenPypeModule, ITrayModule): Adds 'site_name' to 'representation' on 'collection' Args: - representation (list of 1 dict) + representation (dict) file_id (ObjectId) Use 'force' to remove existing or raises ValueError """ - reseted_existing = False - for repre_file in representation.pop().get("files"): + reset_existing = False + files = representation.get("files", []) + if not files: + log.debug("No files for {}".format(representation["_id"])) + return + + for repre_file in files: if file_id and file_id != repre_file["_id"]: continue for site in repre_file.get("sites"): if site["name"] == site_name: - if force: + if force or site.get("error"): self._reset_site_for_file(collection, query, elem, repre_file["_id"], site_name) - reseted_existing = True + reset_existing = True else: msg = "Site {} already present".format(site_name) log.info(msg) - raise ValueError(msg) + raise SiteAlreadyPresentError(msg) - if reseted_existing: + if reset_existing: return if not file_id: @@ -1904,7 +1921,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): (int) - number of failed attempts """ _, rec = self._get_site_rec(file.get("sites", []), provider) - return rec.get("tries", 0) + return self._get_tries_count_from_rec(rec) def _get_progress_dict(self, progress): """ diff --git a/openpype/modules/sync_server/utils.py b/openpype/modules/sync_server/utils.py index 85e4e03f77..03f362202f 100644 --- a/openpype/modules/sync_server/utils.py +++ b/openpype/modules/sync_server/utils.py @@ -8,6 +8,11 @@ class ResumableError(Exception): pass +class SiteAlreadyPresentError(Exception): + """Representation has already site skeleton present.""" + pass + + class SyncStatus: DO_NOTHING = 0 DO_UPLOAD = 1 diff --git a/openpype/plugins/load/add_site.py b/openpype/plugins/load/add_site.py index 95001691e2..55fda55d17 100644 --- a/openpype/plugins/load/add_site.py +++ b/openpype/plugins/load/add_site.py @@ -1,9 +1,19 @@ from openpype.modules import ModulesManager from openpype.pipeline import load +from openpype.lib.avalon_context import get_linked_ids_for_representations +from openpype.modules.sync_server.utils import SiteAlreadyPresentError class AddSyncSite(load.LoaderPlugin): - """Add sync site to representation""" + """Add sync site to representation + + If family of synced representation is 'workfile', it looks for all + representations which are referenced (loaded) in workfile with content of + 'inputLinks'. + It doesn't do any checks for site, most common use case is when artist is + downloading workfile to his local site, but it might be helpful when + artist is re-uploading broken representation on remote site also. + """ representations = ["*"] families = ["*"] @@ -12,21 +22,42 @@ class AddSyncSite(load.LoaderPlugin): icon = "download" color = "#999999" + _sync_server = None + is_add_site_loader = True + + @property + def sync_server(self): + if not self._sync_server: + manager = ModulesManager() + self._sync_server = manager.modules_by_name["sync_server"] + + return self._sync_server + def load(self, context, name=None, namespace=None, data=None): self.log.info("Adding {} to representation: {}".format( data["site_name"], data["_id"])) - self.add_site_to_representation(data["project_name"], - data["_id"], - data["site_name"]) - self.log.debug("Site added.") + family = context["representation"]["context"]["family"] + project_name = data["project_name"] + repre_id = data["_id"] + site_name = data["site_name"] - @staticmethod - def add_site_to_representation(project_name, representation_id, site_name): - """Adds new site to representation_id, resets if exists""" - manager = ModulesManager() - sync_server = manager.modules_by_name["sync_server"] - sync_server.add_site(project_name, representation_id, site_name, - force=True) + self.sync_server.add_site(project_name, repre_id, site_name, + force=True) + + if family == "workfile": + links = get_linked_ids_for_representations(project_name, + [repre_id], + link_type="reference") + for link_repre_id in links: + try: + self.sync_server.add_site(project_name, link_repre_id, + site_name, + force=False) + except SiteAlreadyPresentError: + # do not add/reset working site for references + self.log.debug("Site present", exc_info=True) + + self.log.debug("Site added.") def filepath_from_context(self, context): """No real file loading""" diff --git a/openpype/plugins/load/remove_site.py b/openpype/plugins/load/remove_site.py index adffec9986..c5f442b2f5 100644 --- a/openpype/plugins/load/remove_site.py +++ b/openpype/plugins/load/remove_site.py @@ -12,22 +12,26 @@ class RemoveSyncSite(load.LoaderPlugin): icon = "download" color = "#999999" + _sync_server = None + is_remove_site_loader = True + + @property + def sync_server(self): + if not self._sync_server: + manager = ModulesManager() + self._sync_server = manager.modules_by_name["sync_server"] + + return self._sync_server + def load(self, context, name=None, namespace=None, data=None): self.log.info("Removing {} on representation: {}".format( data["site_name"], data["_id"])) - self.remove_site_on_representation(data["project_name"], - data["_id"], - data["site_name"]) + self.sync_server.remove_site(data["project_name"], + data["_id"], + data["site_name"], + True) self.log.debug("Site added.") - @staticmethod - def remove_site_on_representation(project_name, representation_id, - site_name): - manager = ModulesManager() - sync_server = manager.modules_by_name["sync_server"] - sync_server.remove_site(project_name, representation_id, - site_name, True) - def filepath_from_context(self, context): """No real file loading""" return "" diff --git a/openpype/tools/utils/lib.py b/openpype/tools/utils/lib.py index 8e2044482a..20fea6600b 100644 --- a/openpype/tools/utils/lib.py +++ b/openpype/tools/utils/lib.py @@ -727,11 +727,11 @@ def is_sync_loader(loader): def is_remove_site_loader(loader): - return hasattr(loader, "remove_site_on_representation") + return hasattr(loader, "is_remove_site_loader") def is_add_site_loader(loader): - return hasattr(loader, "add_site_to_representation") + return hasattr(loader, "is_add_site_loader") class WrappedCallbackItem: