Also Bulk Write representation changes + more cleanup

- Don't create intermediate archived representations
- Move writing of Subset + Version to database to just before file transactions
- Write the version with ReplaceOne (upsert) instead of UpdateOne with "$set" of the full version document
This commit is contained in:
Roy Nieterau 2022-03-28 15:04:24 +02:00
parent 1844281c68
commit 8e0161bec7

View file

@ -6,7 +6,7 @@ import clique
import six
from bson.objectid import ObjectId
from pymongo import DeleteOne, InsertOne, UpdateOne
from pymongo import DeleteMany, ReplaceOne, InsertOne, UpdateOne
import pyblish.api
from avalon import io
import openpype.api
@ -28,6 +28,11 @@ def get_first_frame_padded(collection):
return get_frame_padded(start_frame, padding=collection.padding)
def bulk_write(writes):
    """Send a batch of write operations to the active project.

    The target is looked up in ``io._database`` under the name stored in
    the ``AVALON_PROJECT`` key of ``io.Session``.

    Args:
        writes (list): Write operations to execute in one bulk call,
            e.g. ``InsertOne``, ``UpdateOne`` or ``ReplaceOne`` instances.

    Returns:
        The result of the underlying ``bulk_write`` call.
    """
    project_name = io.Session["AVALON_PROJECT"]
    project_collection = io._database[project_name]
    return project_collection.bulk_write(writes)
class IntegrateAssetNew(pyblish.api.InstancePlugin):
"""Resolve any dependency issues
@ -177,21 +182,17 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
template_name = self._get_template_name(instance)
subset, subset_writes = self.register_subset(instance)
version, version_writes = self.register_version(instance, subset)
subset, subset_writes = self.prepare_subset(instance)
version, version_writes = self.prepare_version(instance, subset)
instance.data["versionEntity"] = version
# Bulk write to the database
# todo: Try to avoid writing already until after we've prepared
# representations to allow easier rollback?
io._database[io.Session["AVALON_PROJECT"]].bulk_write(
subset_writes + version_writes
)
archived_repres = list(io.find({
"parent": version["_id"],
"type": "archived_representation"
}))
# Get existing representations (if any)
existing_repres_by_name = {
repres["name"].lower(): repres for repres in io.find({
"parent": version["_id"],
"type": "representation"
})
}
# Prepare all representations
prepared_representations = []
@ -205,7 +206,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
# todo: reduce/simplify what is returned from this function
prepared = self.prepare_representation(repre,
template_name,
archived_repres,
existing_repres_by_name,
version,
instance_stagingdir,
instance)
@ -225,40 +226,70 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
for src, dst in instance.data.get("hardlinks", []):
file_transactions.add(src, dst, mode=FileTransaction.MODE_HARDLINK)
# Bulk write to the database
# todo: Can we move this even to after the file transfers?
bulk_write(subset_writes + version_writes)
self.log.info("Subset {subset[name]} and Version {version[name]} "
"written to database..".format(subset=subset,
version=version))
# Process all file transfers of all integrations now
self.log.debug("Integrating source files to destination ...")
file_transactions.process()
self.log.debug("Backup files "
self.log.debug("Backed up existing files: "
"{}".format(file_transactions.backups))
self.log.debug("Integrated files "
self.log.debug("Transferred files: "
"{}".format(file_transactions.transferred))
# Finalize the representations now the published files are integrated
# Get 'files' info for representations and its attached resources
self.log.debug("Retrieving Representation files information ...")
self.log.debug("Retrieving Representation Site Sync information ...")
sites = SiteSync.compute_resource_sync_sites(
system_settings=instance.context.data["system_settings"],
project_settings=instance.context.data["project_settings"]
)
log.debug("final sites:: {}".format(sites))
self.log.debug("final sites:: {}".format(sites))
anatomy = instance.context.data["anatomy"]
representations = []
representation_writes = []
new_repre_names_low = set()
for prepared in prepared_representations:
transfers = prepared["transfers"]
representation = prepared["representation"]
representation["files"] = self.get_files_info(
transfers, sites, anatomy
)
representations.append(representation)
# Remove all archived representations
if archived_repres:
repre_ids_to_remove = [repre["_id"] for repre in archived_repres]
io.delete_many({"_id": {"$in": repre_ids_to_remove}})
# Set up representation for writing to the database. Since
# we *might* be overwriting an existing entry if the version
# already existed we'll use ReplaceOne with `upsert=True`
representation_writes.append(ReplaceOne(
filter={"_id": representation["_id"]},
replacement=representation,
upsert=True
))
# Write the new representations to the database
io.insert_many(representations)
new_repre_names_low.add(representation["name"].lower())
# Delete any existing representations that didn't get any new data
# if the instance is not set to append mode
if not instance.data.get("append", False):
delete_names = set()
for name, existing_repres in existing_repres_by_name.items():
if name not in new_repre_names_low:
# Use the exact stored representation name here: the dict key `name`
# is lowercased for matching only and may differ from the database
delete_names.add(existing_repres["name"])
if delete_names:
representation_writes.append(DeleteMany(
filter={
"parent": version["_id"],
"name": {"$in": list(delete_names)}
}
))
# Write representations to the database
bulk_write(representation_writes)
# Backwards compatibility
# todo: can we avoid the need to store this?
@ -267,12 +298,11 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
}
self.log.info("Registered {} representations"
"".format(len(representations)))
"".format(len(prepared_representations)))
def register_version(self, instance, subset):
def prepare_version(self, instance, subset):
version_number = instance.data["version"]
self.log.debug("Version: v{0:03d}".format(version_number))
version = {
"schema": "openpype:version-3.0",
@ -288,61 +318,26 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
'name': version_number
})
bulk_writes = []
if existing_version is None:
if existing_version:
self.log.debug("Updating existing version ...")
version["_id"] = existing_version["_id"]
else:
self.log.debug("Creating new version ...")
version["_id"] = ObjectId()
bulk_writes.append(InsertOne(version))
else:
self.log.debug("Updating existing version ...")
# Check whether the instance has `append` mode set, in which case
# only the duplicated representations are archived
append_repres = instance.data.get("append", False)
# Update version data
version_id = existing_version['_id']
bulk_writes.append(UpdateOne({
'_id': version_id
}, {
'$set': version
}))
bulk_writes = [ReplaceOne(
filter={"_id": version["_id"]},
replacement=version,
upsert=True
)]
# Instead of directly writing and querying we reproduce what
# the resulting version would look like so we can hold off making
# changes to the database to avoid the need for 'rollback'
version = copy.deepcopy(version)
version["_id"] = existing_version["_id"]
# Find representations of existing version and archive them
repres = instance.data.get("representations", [])
new_repre_names_low = [_repre["name"].lower() for _repre in repres]
current_repres = io.find({
"type": "representation",
"parent": version_id
})
for repre in current_repres:
if append_repres:
# archive only duplicated representations
if repre["name"].lower() not in new_repre_names_low:
continue
# Representation must change type,
# `_id` must be stored to other key and replaced with new
# - that is because new representations should have same ID
repre_id = repre["_id"]
bulk_writes.append(DeleteOne({"_id": repre_id}))
repre["orig_id"] = repre_id
repre["_id"] = ObjectId()
repre["type"] = "archived_representation"
bulk_writes.append(InsertOne(repre))
self.log.info("Registered version: v{0:03d}".format(version["name"]))
self.log.info("Prepared version: v{0:03d}".format(version["name"]))
return version, bulk_writes
def prepare_representation(self, repre,
template_name,
archived_repres,
existing_repres_by_name,
version,
instance_stagingdir,
instance):
@ -516,15 +511,12 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
if repre.get("udim"):
repre_context["udim"] = repre.get("udim") # store list
# Define representation id
repre_id = ObjectId()
# Use previous representation's id if there is a name match
repre_name_lower = repre["name"].lower()
for _archived_repres in archived_repres:
if repre_name_lower == _archived_repres["name"].lower():
repre_id = _archived_repres["orig_id"]
break
existing = existing_repres_by_name.get(repre["name"].lower())
if existing:
repre_id = existing["_id"]
else:
repre_id = ObjectId()
# Backwards compatibility:
# Store first transferred destination as published path data
@ -594,7 +586,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
template_name = profile["template_name"]
return template_name
def register_subset(self, instance):
def prepare_subset(self, instance):
asset = instance.data.get("assetEntity")
subset_name = instance.data["subset"]
self.log.debug("Subset: {}".format(subset_name))
@ -631,7 +623,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
else:
# Update existing subset data with new data and set in database.
# We also change the found subset in-place so we don't need to
# We also change the found subset in-place so we don't need to
# re-query the subset afterwards
subset["data"].update(data)
bulk_writes.append(UpdateOne(
@ -641,7 +633,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
}}
))
self.log.info("Registered subset: {}".format(subset_name))
self.log.info("Prepared subset: {}".format(subset_name))
return subset, bulk_writes
def create_version_data(self, instance):