Merge pull request #3601 from pypeclub/feature/OP-3392_Naive-implementation-of-document-updates-for-client

General: Naive implementation of document create, update, delete
This commit is contained in:
Jakub Trllo 2022-08-08 17:35:31 +02:00 committed by GitHub
commit 205ee30019
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 712 additions and 106 deletions

View file

@ -6,38 +6,12 @@ that has project name as a context (e.g. on 'ProjectEntity'?).
+ We will need more specific functions doing very specific queries really fast.
"""
import os
import collections
import six
from bson.objectid import ObjectId
from .mongo import OpenPypeMongoConnection
def _get_project_database():
db_name = os.environ.get("AVALON_DB") or "avalon"
return OpenPypeMongoConnection.get_mongo_client()[db_name]
def get_project_connection(project_name):
"""Direct access to mongo collection.
We're trying to avoid using direct access to mongo. This should be used
only for Create, Update and Remove operations until there are implemented
api calls for that.
Args:
project_name(str): Project name for which collection should be
returned.
Returns:
pymongo.Collection: Collection realated to passed project.
"""
if not project_name:
raise ValueError("Invalid project name {}".format(str(project_name)))
return _get_project_database()[project_name]
from .mongo import get_project_database, get_project_connection
def _prepare_fields(fields, required_fields=None):
@ -72,7 +46,7 @@ def _convert_ids(in_ids):
def get_projects(active=True, inactive=False, fields=None):
mongodb = _get_project_database()
mongodb = get_project_database()
for project_name in mongodb.collection_names():
if project_name in ("system.indexes",):
continue

View file

@ -208,3 +208,28 @@ class OpenPypeMongoConnection:
mongo_url, time.time() - t1
))
return mongo_client
def get_project_database():
    """Get mongo database containing project collections.

    Database name is defined by 'AVALON_DB' environment variable with
    fallback to "avalon".

    Returns:
        pymongo.database.Database: Database with projects.
    """

    db_name = os.environ.get("AVALON_DB") or "avalon"
    return OpenPypeMongoConnection.get_mongo_client()[db_name]
def get_project_connection(project_name):
    """Direct access to mongo collection of a project.

    We're trying to avoid using direct access to mongo. This should be used
    only for Create, Update and Remove operations until api calls for them
    are implemented.

    Args:
        project_name (str): Project name for which collection should be
            returned.

    Returns:
        pymongo.Collection: Collection related to passed project.

    Raises:
        ValueError: When passed project name is empty.
    """

    if not project_name:
        raise ValueError("Invalid project name {}".format(str(project_name)))
    return get_project_database()[project_name]

View file

@ -0,0 +1,587 @@
import uuid
import copy
import collections
from abc import ABCMeta, abstractmethod, abstractproperty
import six
from bson.objectid import ObjectId
from pymongo import DeleteOne, InsertOne, UpdateOne
from .mongo import get_project_connection
# Sentinel used in update data to mark a key for removal on update
#   (see 'UpdateOperation' and '_prepare_update_data')
REMOVED_VALUE = object()
# Currently used schema versions of entity documents
CURRENT_PROJECT_SCHEMA = "openpype:project-3.0"
CURRENT_PROJECT_CONFIG_SCHEMA = "openpype:config-2.0"
CURRENT_ASSET_DOC_SCHEMA = "openpype:asset-3.0"
CURRENT_SUBSET_SCHEMA = "openpype:subset-3.0"
CURRENT_VERSION_SCHEMA = "openpype:version-3.0"
CURRENT_REPRESENTATION_SCHEMA = "openpype:representation-2.0"
def _create_or_convert_to_mongo_id(mongo_id):
    """Create new mongo id or convert passed value to ObjectId.

    Args:
        mongo_id (Union[str, ObjectId, None]): Entity id to convert. New
            ObjectId is created when 'None' is passed.

    Returns:
        ObjectId: Converted or newly created mongo id.
    """

    if mongo_id is None:
        return ObjectId()
    return ObjectId(mongo_id)
def new_project_document(
    project_name, project_code, config, data=None, entity_id=None
):
    """Create skeleton data of project document.

    Args:
        project_name (str): Name of project. Used as identifier of a project.
        project_code (str): Shorter version of project name without spaces
            and special characters (in most of cases). Should be also
            considered as unique name across projects.
        config (Dict[str, Any]): Project config consist of roots, templates,
            applications and other project Anatomy related data.
        data (Dict[str, Any]): Project data with information about its
            attributes (e.g. 'fps' etc.) or integration specific keys.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of project document.
    """

    if data is None:
        data = {}

    data["code"] = project_code

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "name": project_name,
        # Keep keys consistent with the other 'new_*' functions: entity type
        #   goes to "type", schema version to "schema" and data to "data"
        #   (original put the schema string into "type" and used
        #   "entity_data" which no query or other document uses)
        "type": "project",
        "schema": CURRENT_PROJECT_SCHEMA,
        "data": data,
        "config": config
    }
def new_asset_document(
    name, project_id, parent_id, parents, data=None, entity_id=None
):
    """Create skeleton data of asset document.

    Args:
        name (str): Is considered as unique identifier of asset in project.
        project_id (Union[str, ObjectId]): Id of project document.
        parent_id (Union[str, ObjectId]): Id of parent asset. Can be 'None'
            for top level assets.
        parents (List[str]): List of parent assets names.
        data (Dict[str, Any]): Asset document data. Empty dictionary is used
            if not passed. Values of 'parent_id' and 'parents' are used to
            fill 'visualParent' and 'parents'.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of asset document.
    """

    if data is None:
        data = {}
    if parent_id is not None:
        parent_id = ObjectId(parent_id)
    data["visualParent"] = parent_id
    data["parents"] = parents

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "type": "asset",
        "name": name,
        "parent": ObjectId(project_id),
        "data": data,
        "schema": CURRENT_ASSET_DOC_SCHEMA
    }
def new_subset_document(name, family, asset_id, data=None, entity_id=None):
    """Create skeleton data of subset document.

    Args:
        name (str): Is considered as unique identifier of subset under asset.
        family (str): Subset's family.
        asset_id (Union[str, ObjectId]): Id of parent asset.
        data (Dict[str, Any]): Subset document data. Empty dictionary is used
            if not passed. Value of 'family' is used to fill 'family'.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of subset document.
    """

    subset_data = {} if data is None else data
    # Family is always stored in data
    subset_data["family"] = family
    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_SUBSET_SCHEMA,
        "type": "subset",
        "name": name,
        "data": subset_data,
        "parent": asset_id
    }
def new_version_doc(version, subset_id, data=None, entity_id=None):
    """Create skeleton data of version document.

    Args:
        version (int): Is considered as unique identifier of version
            under subset.
        subset_id (Union[str, ObjectId]): Id of parent subset.
        data (Dict[str, Any]): Version document data.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of version document.
    """

    version_data = data if data is not None else {}
    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_VERSION_SCHEMA,
        "type": "version",
        # Version name is always stored as integer
        "name": int(version),
        "parent": subset_id,
        "data": version_data
    }
def new_representation_doc(
    name, version_id, context, data=None, entity_id=None
):
    """Create skeleton data of representation document.

    Args:
        name (str): Representation name considered as unique identifier
            of representation under version.
        version_id (Union[str, ObjectId]): Id of parent version.
        context (Dict[str, Any]): Representation context used to fill
            template or to query.
        data (Dict[str, Any]): Representation document data.
        entity_id (Union[str, ObjectId]): Predefined id of document. New id is
            created if not passed.

    Returns:
        Dict[str, Any]: Skeleton of representation document.
    """

    if data is None:
        data = {}

    return {
        "_id": _create_or_convert_to_mongo_id(entity_id),
        "schema": CURRENT_REPRESENTATION_SCHEMA,
        "type": "representation",
        "parent": version_id,
        "name": name,
        "data": data,
        # Imprint shortcut to context for performance reasons.
        "context": context
    }
def _prepare_update_data(old_doc, new_doc, replace):
changes = {}
for key, value in new_doc.items():
if key not in old_doc or value != old_doc[key]:
changes[key] = value
if replace:
for key in old_doc.keys():
if key not in new_doc:
changes[key] = REMOVED_VALUE
return changes
def prepare_subset_update_data(old_doc, new_doc, replace=True):
    """Compare two subset documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.
    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing subset document.
        new_doc (Dict[str, Any]): New subset document to compare against.
        replace (bool): Keys missing in 'new_doc' are marked for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)
def prepare_version_update_data(old_doc, new_doc, replace=True):
    """Compare two version documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.
    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing version document.
        new_doc (Dict[str, Any]): New version document to compare against.
        replace (bool): Keys missing in 'new_doc' are marked for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)
def prepare_representation_update_data(old_doc, new_doc, replace=True):
    """Compare two representation documents and prepare update data.

    Based on compared values will create update data for 'UpdateOperation'.
    Empty output means that documents are identical.

    Args:
        old_doc (Dict[str, Any]): Existing representation document.
        new_doc (Dict[str, Any]): New representation document to compare
            against.
        replace (bool): Keys missing in 'new_doc' are marked for removal.

    Returns:
        Dict[str, Any]: Changes between old and new document.
    """

    return _prepare_update_data(old_doc, new_doc, replace)
@six.add_metaclass(ABCMeta)
class AbstractOperation(object):
    """Base operation class.

    Operation represents a call into database. The call can create, change
    or remove data.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
    """

    def __init__(self, project_name, entity_type):
        self._project_name = project_name
        self._entity_type = entity_type
        # Unique identifier of the operation itself
        self._id = str(uuid.uuid4())

    @property
    def project_name(self):
        return self._project_name

    @property
    def id(self):
        """Identifier of operation."""
        return self._id

    @property
    def entity_type(self):
        return self._entity_type

    @abstractproperty
    def operation_name(self):
        """Stringified type of operation."""
        pass

    @abstractmethod
    def to_mongo_operation(self):
        """Convert operation to Mongo batch operation."""
        pass

    def to_data(self):
        """Convert operation to data that can be converted to json or others.

        Warning:
            Current state returns ObjectId objects which cannot be parsed by
            json.

        Returns:
            Dict[str, Any]: Description of operation.
        """

        return {
            "id": self._id,
            "entity_type": self.entity_type,
            "project_name": self.project_name,
            "operation": self.operation_name
        }
class CreateOperation(AbstractOperation):
    """Operation to create an entity.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        data (Dict[str, Any]): Data of entity that will be created.
    """

    operation_name = "create"

    def __init__(self, project_name, entity_type, data):
        super(CreateOperation, self).__init__(project_name, entity_type)

        # Deep copy input so caller's dictionary can't change under us
        entity_data = copy.deepcopy(dict(data)) if data else {}
        # Make sure document contains a valid mongo id under "_id"
        if "_id" in entity_data:
            entity_data["_id"] = ObjectId(entity_data["_id"])
        else:
            entity_data["_id"] = ObjectId()
        self._entity_id = entity_data["_id"]
        self._data = entity_data

    def __setitem__(self, key, value):
        self.set_value(key, value)

    def __getitem__(self, key):
        return self.data[key]

    def set_value(self, key, value):
        self.data[key] = value

    def get(self, key, *args, **kwargs):
        return self.data.get(key, *args, **kwargs)

    @property
    def entity_id(self):
        return self._entity_id

    @property
    def data(self):
        return self._data

    def to_mongo_operation(self):
        # Copy again so later changes of operation data won't affect
        #   prepared write
        return InsertOne(copy.deepcopy(self._data))

    def to_data(self):
        output = super(CreateOperation, self).to_data()
        output["data"] = copy.deepcopy(self.data)
        return output
class UpdateOperation(AbstractOperation):
    """Operation to update an entity.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        entity_id (Union[str, ObjectId]): Identifier of an entity.
        update_data (Dict[str, Any]): Key -> value changes that will be set in
            database. If value is set to 'REMOVED_VALUE' the key will be
            removed. Only first level of dictionary is checked (on purpose).
    """

    operation_name = "update"

    def __init__(self, project_name, entity_type, entity_id, update_data):
        super(UpdateOperation, self).__init__(project_name, entity_type)

        self._entity_id = ObjectId(entity_id)
        self._update_data = update_data

    @property
    def entity_id(self):
        return self._entity_id

    @property
    def update_data(self):
        return self._update_data

    def to_mongo_operation(self):
        unset_data = {}
        set_data = {}
        for key, value in self._update_data.items():
            if value is REMOVED_VALUE:
                # Mongo ignores '$unset' values but they must be BSON
                #   serializable and the 'REMOVED_VALUE' sentinel is a plain
                #   'object()' which is not -> use empty string instead
                unset_data[key] = ""
            else:
                set_data[key] = value

        op_data = {}
        if unset_data:
            op_data["$unset"] = unset_data
        if set_data:
            op_data["$set"] = set_data

        # Nothing to change -> no mongo operation is needed
        if not op_data:
            return None

        return UpdateOne(
            {"_id": self.entity_id},
            op_data
        )

    def to_data(self):
        changes = {}
        for key, value in self._update_data.items():
            # Sentinel can't be serialized -> replace with 'None'
            if value is REMOVED_VALUE:
                value = None
            changes[key] = value

        output = super(UpdateOperation, self).to_data()
        output.update({
            "entity_id": self.entity_id,
            "changes": changes
        })
        return output
class DeleteOperation(AbstractOperation):
    """Operation to delete an entity.

    Args:
        project_name (str): On which project operation will happen.
        entity_type (str): Type of entity on which change happens.
            e.g. 'asset', 'representation' etc.
        entity_id (Union[str, ObjectId]): Entity id that will be removed.
    """

    operation_name = "delete"

    def __init__(self, project_name, entity_type, entity_id):
        super(DeleteOperation, self).__init__(project_name, entity_type)

        # Make sure entity id is 'ObjectId' no matter what was passed in
        self._entity_id = ObjectId(entity_id)

    @property
    def entity_id(self):
        return self._entity_id

    def to_mongo_operation(self):
        # Remove single document matching the entity id
        return DeleteOne({"_id": self.entity_id})

    def to_data(self):
        op_data = super(DeleteOperation, self).to_data()
        op_data["entity_id"] = self.entity_id
        return op_data
class OperationsSession(object):
    """Session storing operations that should happen in an order.

    At this moment does not handle anything special and can be considered as
    a simple list of operations that will happen one after another. If
    creation of the same entity is registered multiple times it is not
    handled in any special way and document values are not validated.

    Operations may be related to different projects. They're grouped by
    project name on commit.
    """

    def __init__(self):
        self._operations = []

    def add(self, operation):
        """Add operation to be processed.

        Args:
            operation (AbstractOperation): Operation that should be
                processed.

        Raises:
            TypeError: When object of unknown type is passed.
        """

        if not isinstance(
            operation,
            (CreateOperation, UpdateOperation, DeleteOperation)
        ):
            raise TypeError("Expected Operation object got {}".format(
                str(type(operation))
            ))

        self._operations.append(operation)

    def append(self, operation):
        """Add operation to be processed.

        Alias of 'add' to match list interface.

        Args:
            operation (AbstractOperation): Operation that should be
                processed.
        """

        self.add(operation)

    def extend(self, operations):
        """Add operations to be processed.

        Args:
            operations (List[AbstractOperation]): Operations that should be
                processed.
        """

        for operation in operations:
            self.add(operation)

    def remove(self, operation):
        """Remove operation."""

        self._operations.remove(operation)

    def clear(self):
        """Clear all registered operations."""

        self._operations = []

    def to_data(self):
        """Convert registered operations to raw data.

        Returns:
            List[Dict[str, Any]]: Operations converted using their
                'to_data' method.
        """

        return [
            operation.to_data()
            for operation in self._operations
        ]

    def commit(self):
        """Commit session operations.

        Operations are grouped by project name, converted to mongo bulk
        writes and executed per project. Registered operations are drained
        from the session before processing.
        """

        operations, self._operations = self._operations, []
        if not operations:
            return

        operations_by_project = collections.defaultdict(list)
        for operation in operations:
            operations_by_project[operation.project_name].append(operation)

        for project_name, project_operations in operations_by_project.items():
            bulk_writes = []
            for operation in project_operations:
                mongo_op = operation.to_mongo_operation()
                # Operation may not require any write (e.g. empty update)
                if mongo_op is not None:
                    bulk_writes.append(mongo_op)

            if bulk_writes:
                collection = get_project_connection(project_name)
                collection.bulk_write(bulk_writes)

    def create_entity(self, project_name, entity_type, data):
        """Fast access to 'CreateOperation'.

        Returns:
            CreateOperation: Object of create operation.
        """

        operation = CreateOperation(project_name, entity_type, data)
        self.add(operation)
        return operation

    def update_entity(self, project_name, entity_type, entity_id, update_data):
        """Fast access to 'UpdateOperation'.

        Returns:
            UpdateOperation: Object of update operation.
        """

        operation = UpdateOperation(
            project_name, entity_type, entity_id, update_data
        )
        self.add(operation)
        return operation

    def delete_entity(self, project_name, entity_type, entity_id):
        """Fast access to 'DeleteOperation'.

        Returns:
            DeleteOperation: Object of delete operation.
        """

        operation = DeleteOperation(project_name, entity_type, entity_id)
        self.add(operation)
        return operation

View file

@ -5,8 +5,16 @@ import copy
import clique
import six
from openpype.client.operations import (
OperationsSession,
new_subset_document,
new_version_doc,
new_representation_doc,
prepare_subset_update_data,
prepare_version_update_data,
prepare_representation_update_data,
)
from bson.objectid import ObjectId
from pymongo import DeleteMany, ReplaceOne, InsertOne, UpdateOne
import pyblish.api
from openpype.client import (
@ -247,9 +255,12 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
template_name = self.get_template_name(instance)
subset, subset_writes = self.prepare_subset(instance, project_name)
version, version_writes = self.prepare_version(
instance, subset, project_name
op_session = OperationsSession()
subset = self.prepare_subset(
instance, op_session, project_name
)
version = self.prepare_version(
instance, op_session, subset, project_name
)
instance.data["versionEntity"] = version
@ -299,7 +310,8 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
# Transaction to reduce the chances of another publish trying to
# publish to the same version number since that chance can greatly
# increase if the file transaction takes a long time.
legacy_io.bulk_write(subset_writes + version_writes)
op_session.commit()
self.log.info("Subset {subset[name]} and Version {version[name]} "
"written to database..".format(subset=subset,
version=version))
@ -331,49 +343,49 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
# Finalize the representations now the published files are integrated
# Get 'files' info for representations and its attached resources
representation_writes = []
new_repre_names_low = set()
for prepared in prepared_representations:
representation = prepared["representation"]
repre_doc = prepared["representation"]
repre_update_data = prepared["repre_doc_update_data"]
transfers = prepared["transfers"]
destinations = [dst for src, dst in transfers]
representation["files"] = self.get_files_info(
repre_doc["files"] = self.get_files_info(
destinations, sites=sites, anatomy=anatomy
)
# Add the version resource file infos to each representation
representation["files"] += resource_file_infos
repre_doc["files"] += resource_file_infos
# Set up representation for writing to the database. Since
# we *might* be overwriting an existing entry if the version
# already existed we'll use ReplaceOnce with `upsert=True`
representation_writes.append(ReplaceOne(
filter={"_id": representation["_id"]},
replacement=representation,
upsert=True
))
if repre_update_data is None:
op_session.create_entity(
project_name, repre_doc["type"], repre_doc
)
else:
op_session.update_entity(
project_name,
repre_doc["type"],
repre_doc["_id"],
repre_update_data
)
new_repre_names_low.add(representation["name"].lower())
new_repre_names_low.add(repre_doc["name"].lower())
# Delete any existing representations that didn't get any new data
# if the instance is not set to append mode
if not instance.data.get("append", False):
delete_names = set()
for name, existing_repres in existing_repres_by_name.items():
if name not in new_repre_names_low:
# We add the exact representation name because `name` is
# lowercase for name matching only and not in the database
delete_names.add(existing_repres["name"])
if delete_names:
representation_writes.append(DeleteMany(
filter={
"parent": version["_id"],
"name": {"$in": list(delete_names)}
}
))
op_session.delete_entity(
project_name, "representation", existing_repres["_id"]
)
# Write representations to the database
legacy_io.bulk_write(representation_writes)
self.log.debug("{}".format(op_session.to_data()))
op_session.commit()
# Backwards compatibility
# todo: can we avoid the need to store this?
@ -384,13 +396,14 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
self.log.info("Registered {} representations"
"".format(len(prepared_representations)))
def prepare_subset(self, instance, project_name):
def prepare_subset(self, instance, op_session, project_name):
asset_doc = instance.data["assetEntity"]
subset_name = instance.data["subset"]
family = instance.data["family"]
self.log.debug("Subset: {}".format(subset_name))
# Get existing subset if it exists
subset_doc = get_subset_by_name(
existing_subset_doc = get_subset_by_name(
project_name, subset_name, asset_doc["_id"]
)
@ -403,69 +416,79 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
if subset_group:
data["subsetGroup"] = subset_group
bulk_writes = []
if subset_doc is None:
subset_id = None
if existing_subset_doc:
subset_id = existing_subset_doc["_id"]
subset_doc = new_subset_document(
subset_name, family, asset_doc["_id"], data, subset_id
)
if existing_subset_doc is None:
# Create a new subset
self.log.info("Subset '%s' not found, creating ..." % subset_name)
subset_doc = {
"_id": ObjectId(),
"schema": "openpype:subset-3.0",
"type": "subset",
"name": subset_name,
"data": data,
"parent": asset_doc["_id"]
}
bulk_writes.append(InsertOne(subset_doc))
op_session.create_entity(
project_name, subset_doc["type"], subset_doc
)
else:
# Update existing subset data with new data and set in database.
# We also change the found subset in-place so we don't need to
# re-query the subset afterwards
subset_doc["data"].update(data)
bulk_writes.append(UpdateOne(
{"type": "subset", "_id": subset_doc["_id"]},
{"$set": {
"data": subset_doc["data"]
}}
))
update_data = prepare_subset_update_data(
existing_subset_doc, subset_doc
)
op_session.update_entity(
project_name,
subset_doc["type"],
subset_doc["_id"],
update_data
)
self.log.info("Prepared subset: {}".format(subset_name))
return subset_doc, bulk_writes
return subset_doc
def prepare_version(self, instance, subset_doc, project_name):
def prepare_version(self, instance, op_session, subset_doc, project_name):
version_number = instance.data["version"]
version_doc = {
"schema": "openpype:version-3.0",
"type": "version",
"parent": subset_doc["_id"],
"name": version_number,
"data": self.create_version_data(instance)
}
existing_version = get_version_by_name(
project_name,
version_number,
subset_doc["_id"],
fields=["_id"]
)
version_id = None
if existing_version:
version_id = existing_version["_id"]
version_data = self.create_version_data(instance)
version_doc = new_version_doc(
version_number,
subset_doc["_id"],
version_data,
version_id
)
if existing_version:
self.log.debug("Updating existing version ...")
version_doc["_id"] = existing_version["_id"]
update_data = prepare_version_update_data(
existing_version, version_doc
)
op_session.update_entity(
project_name,
version_doc["type"],
version_doc["_id"],
update_data
)
else:
self.log.debug("Creating new version ...")
version_doc["_id"] = ObjectId()
bulk_writes = [ReplaceOne(
filter={"_id": version_doc["_id"]},
replacement=version_doc,
upsert=True
)]
op_session.create_entity(
project_name, version_doc["type"], version_doc
)
self.log.info("Prepared version: v{0:03d}".format(version_doc["name"]))
return version_doc, bulk_writes
return version_doc
def prepare_representation(self, repre,
template_name,
@ -676,10 +699,9 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
# Use previous representation's id if there is a name match
existing = existing_repres_by_name.get(repre["name"].lower())
repre_id = None
if existing:
repre_id = existing["_id"]
else:
repre_id = ObjectId()
# Store first transferred destination as published path data
# - used primarily for reviews that are integrated to custom modules
@ -693,20 +715,18 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
# and the actual representation entity for the database
data = repre.get("data", {})
data.update({"path": published_path, "template": template})
representation = {
"_id": repre_id,
"schema": "openpype:representation-2.0",
"type": "representation",
"parent": version["_id"],
"name": repre["name"],
"data": data,
# Imprint shortcut to context for performance reasons.
"context": repre_context
}
repre_doc = new_representation_doc(
repre["name"], version["_id"], repre_context, data, repre_id
)
update_data = None
if repre_id is not None:
update_data = prepare_representation_update_data(
existing, repre_doc
)
return {
"representation": representation,
"representation": repre_doc,
"repre_doc_update_data": update_data,
"anatomy_data": template_data,
"transfers": transfers,
# todo: avoid the need for 'published_files' used by Integrate Hero