diff --git a/openpype/plugins/publish/integrate_new.py b/openpype/plugins/publish/integrate_new.py index ead00452da..7a3ca2bdf7 100644 --- a/openpype/plugins/publish/integrate_new.py +++ b/openpype/plugins/publish/integrate_new.py @@ -6,7 +6,7 @@ import clique import six from bson.objectid import ObjectId -from pymongo import DeleteOne, InsertOne, UpdateOne +from pymongo import DeleteMany, ReplaceOne, InsertOne, UpdateOne import pyblish.api from avalon import io import openpype.api @@ -28,6 +28,11 @@ def get_first_frame_padded(collection): return get_frame_padded(start_frame, padding=collection.padding) +def bulk_write(writes): + """Convenience function to bulk write into active project database""" + return io._database[io.Session["AVALON_PROJECT"]].bulk_write(writes) + + class IntegrateAssetNew(pyblish.api.InstancePlugin): """Resolve any dependency issues @@ -177,21 +182,17 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): template_name = self._get_template_name(instance) - subset, subset_writes = self.register_subset(instance) - version, version_writes = self.register_version(instance, subset) + subset, subset_writes = self.prepare_subset(instance) + version, version_writes = self.prepare_version(instance, subset) instance.data["versionEntity"] = version - # Bulk write to the database - # todo: Try to avoid writing already until after we've prepared - # representations to allow easier rollback? - io._database[io.Session["AVALON_PROJECT"]].bulk_write( - subset_writes + version_writes - ) - - archived_repres = list(io.find({ - "parent": version["_id"], - "type": "archived_representation" - })) + # Get existing representations (if any) + existing_repres_by_name = { + repres["name"].lower(): repres for repres in io.find({ + "parent": version["_id"], + "type": "representation" + }) + } # Prepare all representations prepared_representations = [] @@ -205,7 +206,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): # todo: reduce/simplify what is returned from this function prepared = self.prepare_representation(repre, template_name, - archived_repres, + existing_repres_by_name, version, instance_stagingdir, instance) @@ -225,40 +226,70 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): for src, dst in instance.data.get("hardlinks", []): file_transactions.add(src, dst, mode=FileTransaction.MODE_HARDLINK) + # Bulk write to the database + # todo: Can we move this even to after the file transfers? + bulk_write(subset_writes + version_writes) + self.log.info("Subset {subset[name]} and Version {version[name]} " + "written to database..".format(subset=subset, + version=version)) + # Process all file transfers of all integrations now self.log.debug("Integrating source files to destination ...") file_transactions.process() - self.log.debug("Backup files " + self.log.debug("Backed up existing files: " "{}".format(file_transactions.backups)) - self.log.debug("Integrated files " + self.log.debug("Transferred files: " "{}".format(file_transactions.transferred)) # Finalize the representations now the published files are integrated # Get 'files' info for representations and its attached resources - self.log.debug("Retrieving Representation files information ...") + self.log.debug("Retrieving Representation Site Sync information ...") sites = SiteSync.compute_resource_sync_sites( system_settings=instance.context.data["system_settings"], project_settings=instance.context.data["project_settings"] ) - log.debug("final sites:: {}".format(sites)) + self.log.debug("final sites:: {}".format(sites)) anatomy = instance.context.data["anatomy"] - representations = [] + representation_writes = [] + new_repre_names_low = set() for prepared in prepared_representations: transfers = prepared["transfers"] representation = prepared["representation"] representation["files"] = self.get_files_info( transfers, sites, anatomy ) - representations.append(representation) - # Remove all archived representations - if archived_repres: - repre_ids_to_remove = [repre["_id"] for repre in archived_repres] - io.delete_many({"_id": {"$in": repre_ids_to_remove}}) + # Set up representation for writing to the database. Since + # we *might* be overwriting an existing entry if the version + # already existed we'll use ReplaceOnce with `upsert=True` + representation_writes.append(ReplaceOne( + filter={"_id": representation["_id"]}, + replacement=representation, + upsert=True + )) - # Write the new representations to the database - io.insert_many(representations) + new_repre_names_low.add(representation["name"].lower()) + + # Delete any existing representations that didn't get any new data + # if the instance is not set to append mode + if not instance.data.get("append", False): + delete_names = set() + for name, existing_repres in existing_repres_by_name.items(): + if name not in new_repre_names_low: + # We add the exact representation name because `name` is + # lowercase for name matching only and not in the database + delete_names.add(existing_repres["name"]) + if delete_names: + representation_writes.append(DeleteMany( + filter={ + "parent": version["_id"], + "name": {"$in": list(delete_names)} + } + )) + + # Write representations to the database + bulk_write(representation_writes) # Backwards compatibility # todo: can we avoid the need to store this? @@ -267,12 +298,11 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): } self.log.info("Registered {} representations" - "".format(len(representations))) + "".format(len(prepared_representations))) - def register_version(self, instance, subset): + def prepare_version(self, instance, subset): version_number = instance.data["version"] - self.log.debug("Version: v{0:03d}".format(version_number)) version = { "schema": "openpype:version-3.0", @@ -288,61 +318,26 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): 'name': version_number }) - bulk_writes = [] - if existing_version is None: + if existing_version: + self.log.debug("Updating existing version ...") + version["_id"] = existing_version["_id"] + else: self.log.debug("Creating new version ...") version["_id"] = ObjectId() - bulk_writes.append(InsertOne(version)) - else: - self.log.debug("Updating existing version ...") - # Check if instance have set `append` mode which cause that - # only replicated representations are set to archive - append_repres = instance.data.get("append", False) - # Update version data - version_id = existing_version['_id'] - bulk_writes.append(UpdateOne({ - '_id': version_id - }, { - '$set': version - })) + bulk_writes = [ReplaceOne( + filter={"_id": version["_id"]}, + replacement=version, + upsert=True + )] - # Instead of directly writing and querying we reproduce what - # the resulting version would look like so we can hold off making - # changes to the database to avoid the need for 'rollback' - version = copy.deepcopy(version) - version["_id"] = existing_version["_id"] - - # Find representations of existing version and archive them - repres = instance.data.get("representations", []) - new_repre_names_low = [_repre["name"].lower() for _repre in repres] - current_repres = io.find({ - "type": "representation", - "parent": version_id - }) - for repre in current_repres: - if append_repres: - # archive only duplicated representations - if repre["name"].lower() not in new_repre_names_low: - continue - # Representation must change type, - # `_id` must be stored to other key and replaced with new - # - that is because new representations should have same ID - repre_id = repre["_id"] - bulk_writes.append(DeleteOne({"_id": repre_id})) - - repre["orig_id"] = repre_id - repre["_id"] = ObjectId() - repre["type"] = "archived_representation" - bulk_writes.append(InsertOne(repre)) - - self.log.info("Registered version: v{0:03d}".format(version["name"])) + self.log.info("Prepared version: v{0:03d}".format(version["name"])) return version, bulk_writes def prepare_representation(self, repre, template_name, - archived_repres, + existing_repres_by_name, version, instance_stagingdir, instance): @@ -516,15 +511,12 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): if repre.get("udim"): repre_context["udim"] = repre.get("udim") # store list - # Define representation id - repre_id = ObjectId() - # Use previous representation's id if there is a name match - repre_name_lower = repre["name"].lower() - for _archived_repres in archived_repres: - if repre_name_lower == _archived_repres["name"].lower(): - repre_id = _archived_repres["orig_id"] - break + existing = existing_repres_by_name.get(repre["name"].lower()) + if existing: + repre_id = existing["_id"] + else: + repre_id = ObjectId() # Backwards compatibility: # Store first transferred destination as published path data @@ -594,7 +586,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): template_name = profile["template_name"] return template_name - def register_subset(self, instance): + def prepare_subset(self, instance): asset = instance.data.get("assetEntity") subset_name = instance.data["subset"] self.log.debug("Subset: {}".format(subset_name)) @@ -631,7 +623,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): else: # Update existing subset data with new data and set in database. - # We also change the found subset in-place so we don't need to + # We also change the found subset in-place so we don't need to # re-query the subset afterwards subset["data"].update(data) bulk_writes.append(UpdateOne( @@ -641,7 +633,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): }} )) - self.log.info("Registered subset: {}".format(subset_name)) + self.log.info("Prepared subset: {}".format(subset_name)) return subset, bulk_writes def create_version_data(self, instance):