diff --git a/openpype/client/entities.py b/openpype/client/entities.py index dd5d831ecf..0e94b99ae6 100644 --- a/openpype/client/entities.py +++ b/openpype/client/entities.py @@ -6,38 +6,12 @@ that has project name as a context (e.g. on 'ProjectEntity'?). + We will need more specific functions doing wery specific queires really fast. """ -import os import collections import six from bson.objectid import ObjectId -from .mongo import OpenPypeMongoConnection - - -def _get_project_database(): - db_name = os.environ.get("AVALON_DB") or "avalon" - return OpenPypeMongoConnection.get_mongo_client()[db_name] - - -def get_project_connection(project_name): - """Direct access to mongo collection. - - We're trying to avoid using direct access to mongo. This should be used - only for Create, Update and Remove operations until there are implemented - api calls for that. - - Args: - project_name(str): Project name for which collection should be - returned. - - Returns: - pymongo.Collection: Collection realated to passed project. 
- """ - - if not project_name: - raise ValueError("Invalid project name {}".format(str(project_name))) - return _get_project_database()[project_name] +from .mongo import get_project_database, get_project_connection def _prepare_fields(fields, required_fields=None): @@ -72,7 +46,7 @@ def _convert_ids(in_ids): def get_projects(active=True, inactive=False, fields=None): - mongodb = _get_project_database() + mongodb = get_project_database() for project_name in mongodb.collection_names(): if project_name in ("system.indexes",): continue diff --git a/openpype/client/mongo.py b/openpype/client/mongo.py index a747250107..72acbc5476 100644 --- a/openpype/client/mongo.py +++ b/openpype/client/mongo.py @@ -208,3 +208,28 @@ class OpenPypeMongoConnection: mongo_url, time.time() - t1 )) return mongo_client + + +def get_project_database(): + db_name = os.environ.get("AVALON_DB") or "avalon" + return OpenPypeMongoConnection.get_mongo_client()[db_name] + + +def get_project_connection(project_name): + """Direct access to mongo collection. + + We're trying to avoid using direct access to mongo. This should be used + only for Create, Update and Remove operations until there are implemented + api calls for that. + + Args: + project_name(str): Project name for which collection should be + returned. + + Returns: + pymongo.Collection: Collection realated to passed project. 
"""Operations used to create, update and delete entity documents.

Module contains functions creating skeletons of entity documents, functions
preparing update data from two documents and operation objects that can be
collected in 'OperationsSession' and written to database at once.
"""

import uuid
import copy
import collections
from abc import ABCMeta, abstractmethod, abstractproperty

import six
from bson.objectid import ObjectId
from pymongo import DeleteOne, InsertOne, UpdateOne

from .mongo import get_project_connection

# Sentinel used in update data to mark a key that should be removed from
#   document. It is never sent to database (see 'UpdateOperation').
REMOVED_VALUE = object()

CURRENT_PROJECT_SCHEMA = "openpype:project-3.0"
CURRENT_PROJECT_CONFIG_SCHEMA = "openpype:config-2.0"
CURRENT_ASSET_DOC_SCHEMA = "openpype:asset-3.0"
CURRENT_SUBSET_SCHEMA = "openpype:subset-3.0"
CURRENT_VERSION_SCHEMA = "openpype:version-3.0"
CURRENT_REPRESENTATION_SCHEMA = "openpype:representation-2.0"


def _create_or_convert_to_mongo_id(mongo_id):
    """Return 'ObjectId' for passed id or create new one.

    Args:
        mongo_id (Union[str, ObjectId, None]): Existing id or 'None'.

    Returns:
        ObjectId: New id if 'None' was passed, otherwise passed id converted
            to 'ObjectId'.
    """

    if mongo_id is None:
        return ObjectId()
    return ObjectId(mongo_id)


def new_project_document(
    project_name, project_code, config, data=None, entity_id=None
):
    """Create skeleton data of project document.

    Args:
        project_name (str): Name of project. Used as identifier of a project.
        project_code (str): Shorter version of project name without spaces
            and special characters (in most of cases). Should be also
            considered as unique name across projects.
        config (Dict[str, Any]): Project config consist of roots, templates,
            applications and other project Anatomy related data.
        data (Dict[str, Any]): Project data with information about its
            attributes (e.g. 'fps' etc.) or integration specific keys.
            Passed dictionary is used as is and 'code' is stored into it.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of project document.
    """

    if data is None:
        data = {}

    data["code"] = project_code

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "name": project_name,
        # Use same layout as other entity documents: entity type under
        #   'type', schema version under 'schema' and attributes under
        #   'data'. Schema was stored under 'type' before.
        "type": "project",
        "schema": CURRENT_PROJECT_SCHEMA,
        "data": data,
        "config": config
    }


def new_asset_document(
    name, project_id, parent_id, parents, data=None, entity_id=None
):
    """Create skeleton data of asset document.

    Args:
        name (str): Is considered as unique identifier of asset in project.
        project_id (Union[str, ObjectId]): Id of project document.
        parent_id (Union[str, ObjectId, None]): Id of parent asset.
        parents (List[str]): List of parent assets names.
        data (Dict[str, Any]): Asset document data. Empty dictionary is used
            if not passed. Passed dictionary is used as is. Value of
            'parent_id' is used to fill 'visualParent' and value of 'parents'
            to fill 'parents'.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of asset document.
    """

    if data is None:
        data = {}
    if parent_id is not None:
        parent_id = ObjectId(parent_id)
    # 'visualParent' is filled even if parent id is 'None'
    data["visualParent"] = parent_id
    data["parents"] = parents

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "type": "asset",
        "name": name,
        "parent": ObjectId(project_id),
        "data": data,
        "schema": CURRENT_ASSET_DOC_SCHEMA
    }


def new_subset_document(name, family, asset_id, data=None, entity_id=None):
    """Create skeleton data of subset document.

    Args:
        name (str): Is considered as unique identifier of subset under asset.
        family (str): Subset's family.
        asset_id (Union[str, ObjectId]): Id of parent asset.
        data (Dict[str, Any]): Subset document data. Empty dictionary is used
            if not passed. Passed dictionary is used as is. Value of 'family'
            is used to fill 'family'.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of subset document.
    """

    if data is None:
        data = {}
    data["family"] = family
    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_SUBSET_SCHEMA,
        "type": "subset",
        "name": name,
        "data": data,
        # Convert parent id the same way as 'new_asset_document' does so
        #   string ids are not stored as strings
        "parent": ObjectId(asset_id)
    }


def new_version_doc(version, subset_id, data=None, entity_id=None):
    """Create skeleton data of version document.

    Args:
        version (int): Is considered as unique identifier of version
            under subset. Value is converted to integer.
        subset_id (Union[str, ObjectId]): Id of parent subset.
        data (Dict[str, Any]): Version document data. Empty dictionary is
            used if not passed.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of version document.
    """

    if data is None:
        data = {}

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_VERSION_SCHEMA,
        "type": "version",
        "name": int(version),
        "parent": ObjectId(subset_id),
        "data": data
    }


def new_representation_doc(
    name, version_id, context, data=None, entity_id=None
):
    """Create skeleton data of representation document.

    Args:
        name (str): Name of representation.
        version_id (Union[str, ObjectId]): Id of parent version.
        context (Dict[str, Any]): Representation context used for fill
            template or to query.
        data (Dict[str, Any]): Representation document data. Empty dictionary
            is used if not passed.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of representation document.
    """

    if data is None:
        data = {}

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_REPRESENTATION_SCHEMA,
        "type": "representation",
        "parent": ObjectId(version_id),
        "name": name,
        "data": data,

        # Imprint shortcut to context for performance reasons.
        "context": context
    }


def _prepare_update_data(old_doc, new_doc, replace):
    """Compare first level of two documents and collect changes.

    Args:
        old_doc (Dict[str, Any]): Document to compare against.
        new_doc (Dict[str, Any]): Document with new values.
        replace (bool): Keys that are in old document but not in new document
            are marked with 'REMOVED_VALUE' when enabled.

    Returns:
        Dict[str, Any]: Keys with new or changed values. Empty dictionary
            if there are no changes.
    """

    changes = {
        key: value
        for key, value in new_doc.items()
        if key not in old_doc or value != old_doc[key]
    }

    if replace:
        for key in old_doc:
            if key not in new_doc:
                changes[key] = REMOVED_VALUE
    return changes


def prepare_subset_update_data(old_doc, new_doc, replace=True):
    """Compare two subset documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.

    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing subset document.
        new_doc (Dict[str, Any]): New subset document.
        replace (bool): Mark keys missing in new document for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)


def prepare_version_update_data(old_doc, new_doc, replace=True):
    """Compare two version documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.

    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing version document.
        new_doc (Dict[str, Any]): New version document.
        replace (bool): Mark keys missing in new document for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)


def prepare_representation_update_data(old_doc, new_doc, replace=True):
    """Compare two representation documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.

    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing representation document.
        new_doc (Dict[str, Any]): New representation document.
        replace (bool): Mark keys missing in new document for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)


@six.add_metaclass(ABCMeta)
class AbstractOperation(object):
    """Base operation class.

    Operation represents a call into database. The call can create, change or
    remove data.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
    """

    def __init__(self, project_name, entity_type):
        self._project_name = project_name
        self._entity_type = entity_type
        # Random identifier of the operation (not of the entity)
        self._id = str(uuid.uuid4())

    @property
    def project_name(self):
        """Name of project on which operation will happen."""

        return self._project_name

    @property
    def id(self):
        """Identifier of operation."""

        return self._id

    @property
    def entity_type(self):
        """Type of entity on which change happens."""

        return self._entity_type

    @abstractproperty
    def operation_name(self):
        """Stringified type of operation."""

        pass

    @abstractmethod
    def to_mongo_operation(self):
        """Convert operation to Mongo batch operation."""

        pass

    def to_data(self):
        """Convert operation to data that can be converted to json or others.

        Warning:
            Current state returns ObjectId objects which cannot be parsed by
                json.

        Returns:
            Dict[str, Any]: Description of operation.
        """

        return {
            "id": self._id,
            "entity_type": self.entity_type,
            "project_name": self.project_name,
            "operation": self.operation_name
        }


class CreateOperation(AbstractOperation):
    """Operation to create an entity.

    Passed data are deep copied. Id of entity is taken from '_id' in data
    (converted to 'ObjectId') or new id is created if it's not there.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        data (Dict[str, Any]): Data of entity that will be created.
    """

    operation_name = "create"

    def __init__(self, project_name, entity_type, data):
        super(CreateOperation, self).__init__(project_name, entity_type)

        if not data:
            data = {}
        else:
            data = copy.deepcopy(dict(data))

        if "_id" not in data:
            data["_id"] = ObjectId()
        else:
            data["_id"] = ObjectId(data["_id"])

        self._entity_id = data["_id"]
        self._data = data

    def __setitem__(self, key, value):
        self.set_value(key, value)

    def __getitem__(self, key):
        return self.data[key]

    def set_value(self, key, value):
        """Set value of a key in data of created entity."""

        self.data[key] = value

    def get(self, key, *args, **kwargs):
        """Get value from data of created entity (same as 'dict.get')."""

        return self.data.get(key, *args, **kwargs)

    @property
    def entity_id(self):
        """Id of entity that will be created."""

        return self._entity_id

    @property
    def data(self):
        """Data of entity that will be created."""

        return self._data

    def to_mongo_operation(self):
        """Convert operation to 'InsertOne' with copy of entity data."""

        return InsertOne(copy.deepcopy(self._data))

    def to_data(self):
        """Description of operation extended with copy of entity data."""

        output = super(CreateOperation, self).to_data()
        output["data"] = copy.deepcopy(self.data)
        return output


class UpdateOperation(AbstractOperation):
    """Operation to update an entity.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        entity_id (Union[str, ObjectId]): Identifier of an entity.
        update_data (Dict[str, Any]): Key -> value changes that will be set in
            database. If value is set to 'REMOVED_VALUE' the key will be
            removed. Only first level of dictionary is checked (on purpose).
            Passed dictionary is stored without copying.
    """

    operation_name = "update"

    def __init__(self, project_name, entity_type, entity_id, update_data):
        super(UpdateOperation, self).__init__(project_name, entity_type)

        self._entity_id = ObjectId(entity_id)
        self._update_data = update_data

    @property
    def entity_id(self):
        """Id of entity that will be updated."""

        return self._entity_id

    @property
    def update_data(self):
        """Changes that will be written to database."""

        return self._update_data

    def to_mongo_operation(self):
        """Convert operation to 'UpdateOne'.

        Returns:
            Union[UpdateOne, None]: Mongo operation or 'None' if update data
                are empty.
        """

        unset_data = {}
        set_data = {}
        for key, value in self._update_data.items():
            if value is REMOVED_VALUE:
                # Value of '$unset' is ignored by mongo but must be
                #   serializable, which 'REMOVED_VALUE' (plain 'object') is
                #   not.
                unset_data[key] = None
            else:
                set_data[key] = value

        op_data = {}
        if unset_data:
            op_data["$unset"] = unset_data
        if set_data:
            op_data["$set"] = set_data

        if not op_data:
            return None

        return UpdateOne(
            {"_id": self.entity_id},
            op_data
        )

    def to_data(self):
        """Description of operation extended with entity id and changes.

        Removed keys are stored with 'None' as value in 'changes'.
        """

        changes = {}
        for key, value in self._update_data.items():
            if value is REMOVED_VALUE:
                value = None
            changes[key] = value

        output = super(UpdateOperation, self).to_data()
        output.update({
            "entity_id": self.entity_id,
            "changes": changes
        })
        return output


class DeleteOperation(AbstractOperation):
    """Operation to delete an entity.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        entity_id (Union[str, ObjectId]): Entity id that will be removed.
    """

    operation_name = "delete"

    def __init__(self, project_name, entity_type, entity_id):
        super(DeleteOperation, self).__init__(project_name, entity_type)

        self._entity_id = ObjectId(entity_id)

    @property
    def entity_id(self):
        """Id of entity that will be removed."""

        return self._entity_id

    def to_mongo_operation(self):
        """Convert operation to 'DeleteOne' filtered by entity id."""

        return DeleteOne({"_id": self.entity_id})

    def to_data(self):
        """Description of operation extended with entity id."""

        output = super(DeleteOperation, self).to_data()
        output["entity_id"] = self.entity_id
        return output


class OperationsSession(object):
    """Session storing operations that should happen in an order.

    At this moment does not handle anything special and can be considered as
    a simple list of operations that will happen after each other. If creation
    of the same entity is there multiple times it's not handled in any way
    and document values are not validated.

    Each operation carries its own project name. Operations are grouped by
    project name on commit and each group is written in one bulk write.
    """

    def __init__(self):
        self._operations = []

    def add(self, operation):
        """Add operation to be processed.

        Args:
            operation (AbstractOperation): Operation that should be processed.

        Raises:
            TypeError: If passed object is not an operation.
        """

        if not isinstance(operation, AbstractOperation):
            raise TypeError("Expected Operation object got {}".format(
                str(type(operation))
            ))

        self._operations.append(operation)

    def append(self, operation):
        """Add operation to be processed.

        Args:
            operation (AbstractOperation): Operation that should be processed.
        """

        self.add(operation)

    def extend(self, operations):
        """Add operations to be processed.

        Args:
            operations (List[AbstractOperation]): Operations that should be
                processed.
        """

        for operation in operations:
            self.add(operation)

    def remove(self, operation):
        """Remove operation."""

        self._operations.remove(operation)

    def clear(self):
        """Clear all registered operations."""

        self._operations = []

    def to_data(self):
        """Convert all registered operations to their descriptions.

        Returns:
            List[Dict[str, Any]]: Output of 'to_data' of each operation.
        """

        return [
            operation.to_data()
            for operation in self._operations
        ]

    def commit(self):
        """Commit session operations.

        Registered operations are removed from the session before they are
        written, so they are not kept if the write raises an exception.
        """

        operations, self._operations = self._operations, []
        if not operations:
            return

        operations_by_project = collections.defaultdict(list)
        for operation in operations:
            operations_by_project[operation.project_name].append(operation)

        for project_name, project_ops in operations_by_project.items():
            bulk_writes = []
            for operation in project_ops:
                mongo_op = operation.to_mongo_operation()
                # Update operation without changes returns 'None'
                if mongo_op is not None:
                    bulk_writes.append(mongo_op)

            if bulk_writes:
                collection = get_project_connection(project_name)
                collection.bulk_write(bulk_writes)

    def create_entity(self, project_name, entity_type, data):
        """Fast access to 'CreateOperation'.

        Returns:
            CreateOperation: Object of create operation.
        """

        operation = CreateOperation(project_name, entity_type, data)
        self.add(operation)
        return operation

    def update_entity(self, project_name, entity_type, entity_id, update_data):
        """Fast access to 'UpdateOperation'.

        Returns:
            UpdateOperation: Object of update operation.
        """

        operation = UpdateOperation(
            project_name, entity_type, entity_id, update_data
        )
        self.add(operation)
        return operation

    def delete_entity(self, project_name, entity_type, entity_id):
        """Fast access to 'DeleteOperation'.

        Returns:
            DeleteOperation: Object of delete operation.
        """

        operation = DeleteOperation(project_name, entity_type, entity_id)
        self.add(operation)
        return operation
- legacy_io.bulk_write(subset_writes + version_writes) + op_session.commit() + self.log.info("Subset {subset[name]} and Version {version[name]} " "written to database..".format(subset=subset, version=version)) @@ -331,49 +343,49 @@ class IntegrateAsset(pyblish.api.InstancePlugin): # Finalize the representations now the published files are integrated # Get 'files' info for representations and its attached resources - representation_writes = [] new_repre_names_low = set() for prepared in prepared_representations: - representation = prepared["representation"] + repre_doc = prepared["representation"] + repre_update_data = prepared["repre_doc_update_data"] transfers = prepared["transfers"] destinations = [dst for src, dst in transfers] - representation["files"] = self.get_files_info( + repre_doc["files"] = self.get_files_info( destinations, sites=sites, anatomy=anatomy ) # Add the version resource file infos to each representation - representation["files"] += resource_file_infos + repre_doc["files"] += resource_file_infos # Set up representation for writing to the database. 
Since # we *might* be overwriting an existing entry if the version # already existed we'll use ReplaceOnce with `upsert=True` - representation_writes.append(ReplaceOne( - filter={"_id": representation["_id"]}, - replacement=representation, - upsert=True - )) + if repre_update_data is None: + op_session.create_entity( + project_name, repre_doc["type"], repre_doc + ) + else: + op_session.update_entity( + project_name, + repre_doc["type"], + repre_doc["_id"], + repre_update_data + ) - new_repre_names_low.add(representation["name"].lower()) + new_repre_names_low.add(repre_doc["name"].lower()) # Delete any existing representations that didn't get any new data # if the instance is not set to append mode if not instance.data.get("append", False): - delete_names = set() for name, existing_repres in existing_repres_by_name.items(): if name not in new_repre_names_low: # We add the exact representation name because `name` is # lowercase for name matching only and not in the database - delete_names.add(existing_repres["name"]) - if delete_names: - representation_writes.append(DeleteMany( - filter={ - "parent": version["_id"], - "name": {"$in": list(delete_names)} - } - )) + op_session.delete_entity( + project_name, "representation", existing_repres["_id"] + ) - # Write representations to the database - legacy_io.bulk_write(representation_writes) + self.log.debug("{}".format(op_session.to_data())) + op_session.commit() # Backwards compatibility # todo: can we avoid the need to store this? 
@@ -384,13 +396,14 @@ class IntegrateAsset(pyblish.api.InstancePlugin): self.log.info("Registered {} representations" "".format(len(prepared_representations))) - def prepare_subset(self, instance, project_name): + def prepare_subset(self, instance, op_session, project_name): asset_doc = instance.data["assetEntity"] subset_name = instance.data["subset"] + family = instance.data["family"] self.log.debug("Subset: {}".format(subset_name)) # Get existing subset if it exists - subset_doc = get_subset_by_name( + existing_subset_doc = get_subset_by_name( project_name, subset_name, asset_doc["_id"] ) @@ -403,69 +416,79 @@ class IntegrateAsset(pyblish.api.InstancePlugin): if subset_group: data["subsetGroup"] = subset_group - bulk_writes = [] - if subset_doc is None: + subset_id = None + if existing_subset_doc: + subset_id = existing_subset_doc["_id"] + subset_doc = new_subset_document( + subset_name, family, asset_doc["_id"], data, subset_id + ) + + if existing_subset_doc is None: # Create a new subset self.log.info("Subset '%s' not found, creating ..." % subset_name) - subset_doc = { - "_id": ObjectId(), - "schema": "openpype:subset-3.0", - "type": "subset", - "name": subset_name, - "data": data, - "parent": asset_doc["_id"] - } - bulk_writes.append(InsertOne(subset_doc)) + op_session.create_entity( + project_name, subset_doc["type"], subset_doc + ) else: # Update existing subset data with new data and set in database. 
# We also change the found subset in-place so we don't need to # re-query the subset afterwards subset_doc["data"].update(data) - bulk_writes.append(UpdateOne( - {"type": "subset", "_id": subset_doc["_id"]}, - {"$set": { - "data": subset_doc["data"] - }} - )) + update_data = prepare_subset_update_data( + existing_subset_doc, subset_doc + ) + op_session.update_entity( + project_name, + subset_doc["type"], + subset_doc["_id"], + update_data + ) self.log.info("Prepared subset: {}".format(subset_name)) - return subset_doc, bulk_writes + return subset_doc - def prepare_version(self, instance, subset_doc, project_name): + def prepare_version(self, instance, op_session, subset_doc, project_name): version_number = instance.data["version"] - version_doc = { - "schema": "openpype:version-3.0", - "type": "version", - "parent": subset_doc["_id"], - "name": version_number, - "data": self.create_version_data(instance) - } - existing_version = get_version_by_name( project_name, version_number, subset_doc["_id"], fields=["_id"] ) + version_id = None + if existing_version: + version_id = existing_version["_id"] + + version_data = self.create_version_data(instance) + version_doc = new_version_doc( + version_number, + subset_doc["_id"], + version_data, + version_id + ) if existing_version: self.log.debug("Updating existing version ...") - version_doc["_id"] = existing_version["_id"] + update_data = prepare_version_update_data( + existing_version, version_doc + ) + op_session.update_entity( + project_name, + version_doc["type"], + version_doc["_id"], + update_data + ) else: self.log.debug("Creating new version ...") - version_doc["_id"] = ObjectId() - - bulk_writes = [ReplaceOne( - filter={"_id": version_doc["_id"]}, - replacement=version_doc, - upsert=True - )] + op_session.create_entity( + project_name, version_doc["type"], version_doc + ) self.log.info("Prepared version: v{0:03d}".format(version_doc["name"])) - return version_doc, bulk_writes + return version_doc def 
prepare_representation(self, repre, template_name, @@ -676,10 +699,9 @@ class IntegrateAsset(pyblish.api.InstancePlugin): # Use previous representation's id if there is a name match existing = existing_repres_by_name.get(repre["name"].lower()) + repre_id = None if existing: repre_id = existing["_id"] - else: - repre_id = ObjectId() # Store first transferred destination as published path data # - used primarily for reviews that are integrated to custom modules @@ -693,20 +715,18 @@ class IntegrateAsset(pyblish.api.InstancePlugin): # and the actual representation entity for the database data = repre.get("data", {}) data.update({"path": published_path, "template": template}) - representation = { - "_id": repre_id, - "schema": "openpype:representation-2.0", - "type": "representation", - "parent": version["_id"], - "name": repre["name"], - "data": data, - - # Imprint shortcut to context for performance reasons. - "context": repre_context - } + repre_doc = new_representation_doc( + repre["name"], version["_id"], repre_context, data, repre_id + ) + update_data = None + if repre_id is not None: + update_data = prepare_representation_update_data( + existing, repre_doc + ) return { - "representation": representation, + "representation": repre_doc, + "repre_doc_update_data": update_data, "anatomy_data": template_data, "transfers": transfers, # todo: avoid the need for 'published_files' used by Integrate Hero