removed functions from 'ayon_core.client'

This commit is contained in:
Jakub Trllo 2024-03-08 16:00:26 +01:00
parent c359d446de
commit 0dc27825ce
10 changed files with 1 additions and 3568 deletions

View file

@ -1,102 +1,6 @@
from .utils import get_ayon_server_api_connection
from .entities import (
get_asset_by_id,
get_asset_by_name,
get_assets,
get_archived_assets,
get_asset_ids_with_subsets,
get_subset_by_id,
get_subset_by_name,
get_subsets,
get_subset_families,
get_version_by_id,
get_version_by_name,
get_versions,
get_hero_version_by_id,
get_hero_version_by_subset_id,
get_hero_versions,
get_last_versions,
get_last_version_by_subset_id,
get_last_version_by_subset_name,
get_output_link_versions,
version_is_latest,
get_representation_by_id,
get_representation_by_name,
get_representations,
get_representation_parents,
get_representations_parents,
get_archived_representations,
get_thumbnail,
get_thumbnails,
get_thumbnail_id_from_source,
get_workfile_info,
get_asset_name_identifier,
)
from .entity_links import (
get_linked_asset_ids,
get_linked_assets,
get_linked_representation_id,
)
from .operations import (
create_project,
)
__all__ = (
"get_ayon_server_api_connection",
"get_asset_by_id",
"get_asset_by_name",
"get_assets",
"get_archived_assets",
"get_asset_ids_with_subsets",
"get_subset_by_id",
"get_subset_by_name",
"get_subsets",
"get_subset_families",
"get_version_by_id",
"get_version_by_name",
"get_versions",
"get_hero_version_by_id",
"get_hero_version_by_subset_id",
"get_hero_versions",
"get_last_versions",
"get_last_version_by_subset_id",
"get_last_version_by_subset_name",
"get_output_link_versions",
"version_is_latest",
"get_representation_by_id",
"get_representation_by_name",
"get_representations",
"get_representation_parents",
"get_representations_parents",
"get_archived_representations",
"get_thumbnail",
"get_thumbnails",
"get_thumbnail_id_from_source",
"get_workfile_info",
"get_linked_asset_ids",
"get_linked_assets",
"get_linked_representation_id",
"create_project",
"get_asset_name_identifier",
)

View file

@ -1,26 +0,0 @@
# --- Folders ---
DEFAULT_FOLDER_FIELDS = {
"id",
"name",
"path",
"parentId",
"active",
"parents",
"thumbnailId"
}
REPRESENTATION_FILES_FIELDS = {
"files.name",
"files.hash",
"files.id",
"files.path",
"files.size",
}
CURRENT_ASSET_DOC_SCHEMA = "openpype:asset-3.0"
CURRENT_SUBSET_SCHEMA = "openpype:subset-3.0"
CURRENT_VERSION_SCHEMA = "openpype:version-3.0"
CURRENT_HERO_VERSION_SCHEMA = "openpype:hero_version-1.0"
CURRENT_REPRESENTATION_SCHEMA = "openpype:representation-2.0"
CURRENT_WORKFILE_INFO_SCHEMA = "openpype:workfile-1.0"
CURRENT_THUMBNAIL_SCHEMA = "openpype:thumbnail-1.0"

File diff suppressed because it is too large Load diff

View file

@ -1,702 +0,0 @@
import collections
from .constants import CURRENT_THUMBNAIL_SCHEMA
from .utils import get_ayon_server_api_connection
from .openpype_comp import get_folders_with_tasks
from .conversion_utils import (
folder_fields_v3_to_v4,
convert_v4_folder_to_v3,
subset_fields_v3_to_v4,
convert_v4_subset_to_v3,
version_fields_v3_to_v4,
convert_v4_version_to_v3,
representation_fields_v3_to_v4,
convert_v4_representation_to_v3,
workfile_info_fields_v3_to_v4,
convert_v4_workfile_info_to_v3,
)
def get_asset_name_identifier(asset_doc):
"""Get asset name identifier by asset document.
This function is added because of AYON implementation where name
identifier is not just a name but full path.
Asset document must have "name" key, and "data.parents" when in AYON mode.
Args:
asset_doc (dict[str, Any]): Asset document.
"""
parents = list(asset_doc["data"]["parents"])
parents.append(asset_doc["name"])
return "/" + "/".join(parents)
def _get_subsets(
project_name,
subset_ids=None,
subset_names=None,
folder_ids=None,
names_by_folder_ids=None,
archived=False,
fields=None
):
# Convert fields and add minimum required fields
con = get_ayon_server_api_connection()
fields = subset_fields_v3_to_v4(fields, con)
if fields is not None:
for key in (
"id",
"active"
):
fields.add(key)
active = True
if archived:
active = None
for subset in con.get_products(
project_name,
product_ids=subset_ids,
product_names=subset_names,
folder_ids=folder_ids,
names_by_folder_ids=names_by_folder_ids,
active=active,
fields=fields,
):
yield convert_v4_subset_to_v3(subset)
def _get_versions(
project_name,
version_ids=None,
subset_ids=None,
versions=None,
hero=True,
standard=True,
latest=None,
active=None,
fields=None
):
con = get_ayon_server_api_connection()
fields = version_fields_v3_to_v4(fields, con)
# Make sure 'productId' and 'version' are available when hero versions
# are queried
if fields and hero:
fields = set(fields)
fields |= {"productId", "version"}
queried_versions = con.get_versions(
project_name,
version_ids=version_ids,
product_ids=subset_ids,
versions=versions,
hero=hero,
standard=standard,
latest=latest,
active=active,
fields=fields
)
version_entities = []
hero_versions = []
for version in queried_versions:
if version["version"] < 0:
hero_versions.append(version)
else:
version_entities.append(convert_v4_version_to_v3(version))
if hero_versions:
subset_ids = set()
versions_nums = set()
for hero_version in hero_versions:
versions_nums.add(abs(hero_version["version"]))
subset_ids.add(hero_version["productId"])
hero_eq_versions = con.get_versions(
project_name,
product_ids=subset_ids,
versions=versions_nums,
hero=False,
fields=["id", "version", "productId"]
)
hero_eq_by_subset_id = collections.defaultdict(list)
for version in hero_eq_versions:
hero_eq_by_subset_id[version["productId"]].append(version)
for hero_version in hero_versions:
abs_version = abs(hero_version["version"])
subset_id = hero_version["productId"]
version_id = None
for version in hero_eq_by_subset_id.get(subset_id, []):
if version["version"] == abs_version:
version_id = version["id"]
break
conv_hero = convert_v4_version_to_v3(hero_version)
conv_hero["version_id"] = version_id
version_entities.append(conv_hero)
return version_entities
def get_asset_by_id(project_name, asset_id, fields=None):
assets = get_assets(
project_name, asset_ids=[asset_id], fields=fields
)
for asset in assets:
return asset
return None
def get_asset_by_name(project_name, asset_name, fields=None):
assets = get_assets(
project_name, asset_names=[asset_name], fields=fields
)
for asset in assets:
return asset
return None
def _folders_query(project_name, con, fields, **kwargs):
if fields is None or "tasks" in fields:
folders = get_folders_with_tasks(
con, project_name, fields=fields, **kwargs
)
else:
folders = con.get_folders(project_name, fields=fields, **kwargs)
for folder in folders:
yield folder
def get_assets(
project_name,
asset_ids=None,
asset_names=None,
parent_ids=None,
archived=False,
fields=None
):
if not project_name:
return
active = True
if archived:
active = None
con = get_ayon_server_api_connection()
fields = folder_fields_v3_to_v4(fields, con)
kwargs = dict(
folder_ids=asset_ids,
parent_ids=parent_ids,
active=active,
)
if not asset_names:
for folder in _folders_query(project_name, con, fields, **kwargs):
yield convert_v4_folder_to_v3(folder, project_name)
return
new_asset_names = set()
folder_paths = set()
for name in asset_names:
if "/" in name:
folder_paths.add(name)
else:
new_asset_names.add(name)
yielded_ids = set()
if folder_paths:
for folder in _folders_query(
project_name, con, fields, folder_paths=folder_paths, **kwargs
):
yielded_ids.add(folder["id"])
yield convert_v4_folder_to_v3(folder, project_name)
if not new_asset_names:
return
for folder in _folders_query(
project_name, con, fields, folder_names=new_asset_names, **kwargs
):
if folder["id"] not in yielded_ids:
yielded_ids.add(folder["id"])
yield convert_v4_folder_to_v3(folder, project_name)
def get_archived_assets(
project_name,
asset_ids=None,
asset_names=None,
parent_ids=None,
fields=None
):
return get_assets(
project_name,
asset_ids,
asset_names,
parent_ids,
True,
fields
)
def get_asset_ids_with_subsets(project_name, asset_ids=None):
con = get_ayon_server_api_connection()
return con.get_folder_ids_with_products(project_name, asset_ids)
def get_subset_by_id(project_name, subset_id, fields=None):
subsets = get_subsets(
project_name, subset_ids=[subset_id], fields=fields
)
for subset in subsets:
return subset
return None
def get_subset_by_name(project_name, subset_name, asset_id, fields=None):
subsets = get_subsets(
project_name,
subset_names=[subset_name],
asset_ids=[asset_id],
fields=fields
)
for subset in subsets:
return subset
return None
def get_subsets(
project_name,
subset_ids=None,
subset_names=None,
asset_ids=None,
names_by_asset_ids=None,
archived=False,
fields=None
):
return _get_subsets(
project_name,
subset_ids,
subset_names,
asset_ids,
names_by_asset_ids,
archived,
fields=fields
)
def get_subset_families(project_name, subset_ids=None):
con = get_ayon_server_api_connection()
return con.get_product_type_names(project_name, subset_ids)
def get_version_by_id(project_name, version_id, fields=None):
versions = get_versions(
project_name,
version_ids=[version_id],
fields=fields,
hero=True
)
for version in versions:
return version
return None
def get_version_by_name(project_name, version, subset_id, fields=None):
versions = get_versions(
project_name,
subset_ids=[subset_id],
versions=[version],
fields=fields
)
for version in versions:
return version
return None
def get_versions(
project_name,
version_ids=None,
subset_ids=None,
versions=None,
hero=False,
fields=None
):
return _get_versions(
project_name,
version_ids,
subset_ids,
versions,
hero=hero,
standard=True,
fields=fields
)
def get_hero_version_by_id(project_name, version_id, fields=None):
versions = get_hero_versions(
project_name,
version_ids=[version_id],
fields=fields
)
for version in versions:
return version
return None
def get_hero_version_by_subset_id(
project_name, subset_id, fields=None
):
versions = get_hero_versions(
project_name,
subset_ids=[subset_id],
fields=fields
)
for version in versions:
return version
return None
def get_hero_versions(
project_name, subset_ids=None, version_ids=None, fields=None
):
return _get_versions(
project_name,
version_ids=version_ids,
subset_ids=subset_ids,
hero=True,
standard=False,
fields=fields
)
def get_last_versions(project_name, subset_ids, active=None, fields=None):
if fields:
fields = set(fields)
fields.add("parent")
versions = _get_versions(
project_name,
subset_ids=subset_ids,
latest=True,
hero=False,
active=active,
fields=fields
)
return {
version["parent"]: version
for version in versions
}
def get_last_version_by_subset_id(project_name, subset_id, fields=None):
versions = _get_versions(
project_name,
subset_ids=[subset_id],
latest=True,
hero=False,
fields=fields
)
if not versions:
return None
return versions[0]
def get_last_version_by_subset_name(
project_name,
subset_name,
asset_id=None,
asset_name=None,
fields=None
):
if not asset_id and not asset_name:
return None
if not asset_id:
asset = get_asset_by_name(
project_name, asset_name, fields=["_id"]
)
if not asset:
return None
asset_id = asset["_id"]
subset = get_subset_by_name(
project_name, subset_name, asset_id, fields=["_id"]
)
if not subset:
return None
return get_last_version_by_subset_id(
project_name, subset["_id"], fields=fields
)
def get_output_link_versions(project_name, version_id, fields=None):
if not version_id:
return []
con = get_ayon_server_api_connection()
version_links = con.get_version_links(
project_name, version_id, link_direction="out")
version_ids = {
link["entityId"]
for link in version_links
if link["entityType"] == "version"
}
if not version_ids:
return []
return get_versions(project_name, version_ids=version_ids, fields=fields)
def version_is_latest(project_name, version_id):
con = get_ayon_server_api_connection()
return con.version_is_latest(project_name, version_id)
def get_representation_by_id(project_name, representation_id, fields=None):
representations = get_representations(
project_name,
representation_ids=[representation_id],
fields=fields
)
for representation in representations:
return representation
return None
def get_representation_by_name(
project_name, representation_name, version_id, fields=None
):
representations = get_representations(
project_name,
representation_names=[representation_name],
version_ids=[version_id],
fields=fields
)
for representation in representations:
return representation
return None
def get_representations(
project_name,
representation_ids=None,
representation_names=None,
version_ids=None,
context_filters=None,
names_by_version_ids=None,
archived=False,
standard=True,
fields=None
):
if context_filters is not None:
# TODO should we add the support?
# - there was ability to fitler using regex
raise ValueError("OP v4 can't filter by representation context.")
if not archived and not standard:
return
if archived and not standard:
active = False
elif not archived and standard:
active = True
else:
active = None
con = get_ayon_server_api_connection()
fields = representation_fields_v3_to_v4(fields, con)
if fields and active is not None:
fields.add("active")
representations = con.get_representations(
project_name,
representation_ids=representation_ids,
representation_names=representation_names,
version_ids=version_ids,
names_by_version_ids=names_by_version_ids,
active=active,
fields=fields
)
for representation in representations:
yield convert_v4_representation_to_v3(representation)
def get_representation_parents(project_name, representation):
if not representation:
return None
repre_id = representation["_id"]
parents_by_repre_id = get_representations_parents(
project_name, [representation]
)
return parents_by_repre_id[repre_id]
def get_representations_parents(project_name, representations):
repre_ids = {
repre["_id"]
for repre in representations
}
con = get_ayon_server_api_connection()
parents_by_repre_id = con.get_representations_parents(
project_name, repre_ids
)
new_parents = {}
for repre_id, parents in parents_by_repre_id.items():
version, subset, folder, project = parents
new_parents[repre_id] = (
version,
subset,
folder,
project
)
return new_parents
def get_archived_representations(
project_name,
representation_ids=None,
representation_names=None,
version_ids=None,
context_filters=None,
names_by_version_ids=None,
fields=None
):
return get_representations(
project_name,
representation_ids=representation_ids,
representation_names=representation_names,
version_ids=version_ids,
context_filters=context_filters,
names_by_version_ids=names_by_version_ids,
archived=True,
standard=False,
fields=fields
)
def get_thumbnail(
project_name, thumbnail_id, entity_type, entity_id, fields=None
):
"""Receive thumbnail entity data.
Args:
project_name (str): Name of project where to look for queried entities.
thumbnail_id (Union[str, ObjectId]): Id of thumbnail entity.
entity_type (str): Type of entity for which the thumbnail should be
received.
entity_id (str): Id of entity for which the thumbnail should be
received.
fields (Iterable[str]): Fields that should be returned. All fields are
returned if 'None' is passed.
Returns:
None: If thumbnail with specified id was not found.
Dict: Thumbnail entity data which can be reduced to specified 'fields'.
"""
if not thumbnail_id or not entity_type or not entity_id:
return None
if entity_type == "asset":
entity_type = "folder"
elif entity_type == "hero_version":
entity_type = "version"
return {
"_id": thumbnail_id,
"type": "thumbnail",
"schema": CURRENT_THUMBNAIL_SCHEMA,
"data": {
"entity_type": entity_type,
"entity_id": entity_id
}
}
def get_thumbnails(project_name, thumbnail_contexts, fields=None):
"""Get thumbnail entities.
Warning:
This function is not OpenPype compatible. There is none usage of this
function in codebase so there is nothing to convert. The previous
implementation cannot be AYON compatible without entity types.
"""
thumbnail_items = set()
for thumbnail_context in thumbnail_contexts:
thumbnail_id, entity_type, entity_id = thumbnail_context
thumbnail_item = get_thumbnail(
project_name, thumbnail_id, entity_type, entity_id
)
if thumbnail_item:
thumbnail_items.add(thumbnail_item)
return list(thumbnail_items)
def get_thumbnail_id_from_source(project_name, src_type, src_id):
"""Receive thumbnail id from source entity.
Args:
project_name (str): Name of project where to look for queried entities.
src_type (str): Type of source entity ('asset', 'version').
src_id (Union[str, ObjectId]): Id of source entity.
Returns:
ObjectId: Thumbnail id assigned to entity.
None: If Source entity does not have any thumbnail id assigned.
"""
if not src_type or not src_id:
return None
if src_type == "version":
version = get_version_by_id(
project_name, src_id, fields=["data.thumbnail_id"]
) or {}
return version.get("data", {}).get("thumbnail_id")
if src_type == "asset":
asset = get_asset_by_id(
project_name, src_id, fields=["data.thumbnail_id"]
) or {}
return asset.get("data", {}).get("thumbnail_id")
return None
def get_workfile_info(
project_name, asset_id, task_name, filename, fields=None
):
if not asset_id or not task_name or not filename:
return None
con = get_ayon_server_api_connection()
task = con.get_task_by_name(
project_name, asset_id, task_name, fields=["id", "name", "folderId"]
)
if not task:
return None
fields = workfile_info_fields_v3_to_v4(fields)
for workfile_info in con.get_workfiles_info(
project_name, task_ids=[task["id"]], fields=fields
):
if workfile_info["name"] == filename:
return convert_v4_workfile_info_to_v3(workfile_info, task)
return None

View file

@ -1,157 +0,0 @@
from .utils import get_ayon_server_api_connection
from .entities import get_assets, get_representation_by_id
def get_linked_asset_ids(project_name, asset_doc=None, asset_id=None):
"""Extract linked asset ids from asset document.
One of asset document or asset id must be passed.
Note:
Asset links now works only from asset to assets.
Args:
project_name (str): Project where to look for asset.
asset_doc (dict): Asset document from DB.
asset_id (str): Asset id to find its document.
Returns:
List[Union[ObjectId, str]]: Asset ids of input links.
"""
output = []
if not asset_doc and not asset_id:
return output
if not asset_id:
asset_id = asset_doc["_id"]
con = get_ayon_server_api_connection()
links = con.get_folder_links(project_name, asset_id, link_direction="in")
return [
link["entityId"]
for link in links
if link["entityType"] == "folder"
]
def get_linked_assets(
project_name, asset_doc=None, asset_id=None, fields=None
):
"""Return linked assets based on passed asset document.
One of asset document or asset id must be passed.
Args:
project_name (str): Name of project where to look for queried entities.
asset_doc (Dict[str, Any]): Asset document from database.
asset_id (Union[ObjectId, str]): Asset id. Can be used instead of
asset document.
fields (Iterable[str]): Fields that should be returned. All fields are
returned if 'None' is passed.
Returns:
List[Dict[str, Any]]: Asset documents of input links for passed
asset doc.
"""
link_ids = get_linked_asset_ids(project_name, asset_doc, asset_id)
if not link_ids:
return []
return list(get_assets(project_name, asset_ids=link_ids, fields=fields))
def get_linked_representation_id(
project_name, repre_doc=None, repre_id=None, link_type=None, max_depth=None
):
"""Returns list of linked ids of particular type (if provided).
One of representation document or representation id must be passed.
Note:
Representation links now works only from representation through version
back to representations.
Todos:
Missing depth query. Not sure how it did find more representations in
depth, probably links to version?
Args:
project_name (str): Name of project where look for links.
repre_doc (Dict[str, Any]): Representation document.
repre_id (Union[ObjectId, str]): Representation id.
link_type (str): Type of link (e.g. 'reference', ...).
max_depth (int): Limit recursion level. Default: 0
Returns:
List[ObjectId] Linked representation ids.
"""
if repre_doc:
repre_id = repre_doc["_id"]
if not repre_id and not repre_doc:
return []
version_id = None
if repre_doc:
version_id = repre_doc.get("parent")
if not version_id:
repre_doc = get_representation_by_id(
project_name, repre_id, fields=["parent"]
)
if repre_doc:
version_id = repre_doc["parent"]
if not version_id:
return []
if max_depth is None or max_depth == 0:
max_depth = 1
link_types = None
if link_type:
link_types = [link_type]
con = get_ayon_server_api_connection()
# Store already found version ids to avoid recursion, and also to store
# output -> Don't forget to remove 'version_id' at the end!!!
linked_version_ids = {version_id}
# Each loop of depth will reset this variable
versions_to_check = {version_id}
for _ in range(max_depth):
if not versions_to_check:
break
versions_links = con.get_versions_links(
project_name,
versions_to_check,
link_types=link_types,
link_direction="out")
versions_to_check = set()
for links in versions_links.values():
for link in links:
# Care only about version links
if link["entityType"] != "version":
continue
entity_id = link["entityId"]
# Skip already found linked version ids
if entity_id in linked_version_ids:
continue
linked_version_ids.add(entity_id)
versions_to_check.add(entity_id)
linked_version_ids.remove(version_id)
if not linked_version_ids:
return []
con = get_ayon_server_api_connection()
representations = con.get_representations(
project_name,
version_ids=linked_version_ids,
fields=["id"])
return [
repre["id"]
for repre in representations
]

View file

@ -1,39 +0,0 @@
# Client functionality
## Reason
Preparation for OpenPype v4 server. Goal is to remove direct mongo calls in code to prepare a little bit for different source of data for code before. To start think about database calls less as mongo calls but more universally. To do so was implemented simple wrapper around database calls to not use pymongo specific code.
Current goal is not to make universal database model which can be easily replaced with any different source of data but to make it close as possible. Current implementation of OpenPype is too tightly connected to pymongo and it's abilities so we're trying to get closer with long term changes that can be used even in current state.
## Queries
Query functions don't use full potential of mongo queries like very specific queries based on subdictionaries or unknown structures. We try to avoid these calls as much as possible because they'll probably won't be available in future. If it's really necessary a new function can be added but only if it's reasonable for overall logic. All query functions were moved to `~/client/entities.py`. Each function has arguments with available filters and possible reduce of returned keys for each entity.
## Changes
Changes are a little bit complicated. Mongo has many options how update can happen which had to be reduced also it would be at this stage complicated to validate values which are created or updated thus automation is at this point almost none. Changes can be made using operations available in `~/client/operations.py`. Each operation require project name and entity type, but may require operation specific data.
### Create
Create operations expect already prepared document data, for that are prepared functions creating skeletal structures of documents (do not fill all required data), except `_id` all data should be right. Existence of entity is not validated so if the same creation operation is send n times it will create the entity n times which can cause issues.
### Update
Update operation require entity id and keys that should be changed, update dictionary must have {"key": value}. If value should be set in nested dictionary the key must have also all subkeys joined with dot `.` (e.g. `{"data": {"fps": 25}}` -> `{"data.fps": 25}`). To simplify update dictionaries were prepared functions which does that for you, their name has template `prepare_<entity type>_update_data` - they work on comparison of previous document and new document. If there is missing function for requested entity type it is because we didn't need it yet and require implementation.
### Delete
Delete operation need entity id. Entity will be deleted from mongo.
## What (probably) won't be replaced
Some parts of code are still using direct mongo calls. In most of cases it is for very specific calls that are module specific or their usage will completely change in future.
- Mongo calls that are not project specific (out of `avalon` collection) will be removed or will have to use different mechanism how the data are stored. At this moment it is related to OpenPype settings and logs, ftrack server events, some other data.
- Sync server queries. They're complex and very specific for sync server module. Their replacement will require specific calls to OpenPype server in v4 thus their abstraction with wrapper is irrelevant and would complicate production in v3.
- Project managers (ftrack, kitsu, shotgrid, embedded Project Manager, etc.). Project managers are creating, updating or removing assets in v3, but in v4 will create folders with different structure. Wrapping creation of assets would not help to prepare for v4 because of new data structures. The same can be said about editorial Extract Hierarchy Avalon plugin which create project structure.
- Code parts that is marked as deprecated in v3 or will be deprecated in v4.
- integrate asset legacy publish plugin - already is legacy kept for safety
- integrate thumbnail - thumbnails will be stored in different way in v4
- input links - link will be stored in different way and will have different mechanism of linking. In v3 are links limited to same entity type "asset <-> asset" or "representation <-> representation".
## Known missing replacements
- change subset group in loader tool
- integrate subset group
- query input links in openpype lib
- create project in openpype lib
- save/create workfile doc in openpype lib
- integrate hero version

View file

@ -1,159 +0,0 @@
import collections
import json
import six
from ayon_api.graphql import GraphQlQuery, FIELD_VALUE, fields_to_dict
from .constants import DEFAULT_FOLDER_FIELDS
def folders_tasks_graphql_query(fields):
query = GraphQlQuery("FoldersQuery")
project_name_var = query.add_variable("projectName", "String!")
folder_ids_var = query.add_variable("folderIds", "[String!]")
parent_folder_ids_var = query.add_variable("parentFolderIds", "[String!]")
folder_paths_var = query.add_variable("folderPaths", "[String!]")
folder_names_var = query.add_variable("folderNames", "[String!]")
has_products_var = query.add_variable("folderHasProducts", "Boolean!")
project_field = query.add_field("project")
project_field.set_filter("name", project_name_var)
folders_field = project_field.add_field_with_edges("folders")
folders_field.set_filter("ids", folder_ids_var)
folders_field.set_filter("parentIds", parent_folder_ids_var)
folders_field.set_filter("names", folder_names_var)
folders_field.set_filter("paths", folder_paths_var)
folders_field.set_filter("hasProducts", has_products_var)
fields = set(fields)
fields.discard("tasks")
tasks_field = folders_field.add_field_with_edges("tasks")
tasks_field.add_field("name")
tasks_field.add_field("taskType")
nested_fields = fields_to_dict(fields)
query_queue = collections.deque()
for key, value in nested_fields.items():
query_queue.append((key, value, folders_field))
while query_queue:
item = query_queue.popleft()
key, value, parent = item
field = parent.add_field(key)
if value is FIELD_VALUE:
continue
for k, v in value.items():
query_queue.append((k, v, field))
return query
def get_folders_with_tasks(
con,
project_name,
folder_ids=None,
folder_paths=None,
folder_names=None,
parent_ids=None,
active=True,
fields=None
):
"""Query folders with tasks from server.
This is for v4 compatibility where tasks were stored on assets. This is
an inefficient way how folders and tasks are queried so it was added only
as compatibility function.
Todos:
Folder name won't be unique identifier, so we should add folder path
filtering.
Notes:
Filter 'active' don't have direct filter in GraphQl.
Args:
con (ServerAPI): Connection to server.
project_name (str): Name of project where folders are.
folder_ids (Iterable[str]): Folder ids to filter.
folder_paths (Iterable[str]): Folder paths used for filtering.
folder_names (Iterable[str]): Folder names used for filtering.
parent_ids (Iterable[str]): Ids of folder parents. Use 'None'
if folder is direct child of project.
active (Union[bool, None]): Filter active/inactive folders. Both
are returned if is set to None.
fields (Union[Iterable(str), None]): Fields to be queried
for folder. All possible folder fields are returned if 'None'
is passed.
Yields:
Dict[str, Any]: Queried folder entities.
"""
if not project_name:
return
filters = {
"projectName": project_name
}
if folder_ids is not None:
folder_ids = set(folder_ids)
if not folder_ids:
return
filters["folderIds"] = list(folder_ids)
if folder_paths is not None:
folder_paths = set(folder_paths)
if not folder_paths:
return
filters["folderPaths"] = list(folder_paths)
if folder_names is not None:
folder_names = set(folder_names)
if not folder_names:
return
filters["folderNames"] = list(folder_names)
if parent_ids is not None:
parent_ids = set(parent_ids)
if not parent_ids:
return
if None in parent_ids:
# Replace 'None' with '"root"' which is used during GraphQl
# query for parent ids filter for folders without folder
# parent
parent_ids.remove(None)
parent_ids.add("root")
if project_name in parent_ids:
# Replace project name with '"root"' which is used during
# GraphQl query for parent ids filter for folders without
# folder parent
parent_ids.remove(project_name)
parent_ids.add("root")
filters["parentFolderIds"] = list(parent_ids)
if fields:
fields = set(fields)
else:
fields = con.get_default_fields_for_type("folder")
fields |= DEFAULT_FOLDER_FIELDS
if active is not None:
fields.add("active")
query = folders_tasks_graphql_query(fields)
for attr, filter_value in filters.items():
query.set_variable_value(attr, filter_value)
parsed_data = query.query(con)
folders = parsed_data["project"]["folders"]
for folder in folders:
if active is not None and folder["active"] is not active:
continue
folder_data = folder.get("data")
if isinstance(folder_data, six.string_types):
folder["data"] = json.loads(folder_data)
yield folder

View file

@ -1,838 +0,0 @@
import copy
import json
import collections
import uuid
import datetime
from .constants import (
CURRENT_ASSET_DOC_SCHEMA,
CURRENT_SUBSET_SCHEMA,
CURRENT_VERSION_SCHEMA,
CURRENT_HERO_VERSION_SCHEMA,
CURRENT_REPRESENTATION_SCHEMA,
CURRENT_WORKFILE_INFO_SCHEMA,
CURRENT_THUMBNAIL_SCHEMA,
)
from .operations_base import (
REMOVED_VALUE,
CreateOperation,
UpdateOperation,
DeleteOperation,
BaseOperationsSession
)
from .conversion_utils import (
convert_create_asset_to_v4,
convert_create_task_to_v4,
convert_create_subset_to_v4,
convert_create_version_to_v4,
convert_create_hero_version_to_v4,
convert_create_representation_to_v4,
convert_create_workfile_info_to_v4,
convert_update_folder_to_v4,
convert_update_subset_to_v4,
convert_update_version_to_v4,
convert_update_hero_version_to_v4,
convert_update_representation_to_v4,
convert_update_workfile_info_to_v4,
)
from .utils import create_entity_id, get_ayon_server_api_connection
def _create_or_convert_to_id(entity_id=None):
if entity_id is None:
return create_entity_id()
# Validate if can be converted to uuid
uuid.UUID(entity_id)
return entity_id
def new_asset_document(
name, project_id, parent_id, parents, data=None, entity_id=None
):
"""Create skeleton data of asset document.
Args:
name (str): Is considered as unique identifier of asset in project.
project_id (Union[str, ObjectId]): Id of project doument.
parent_id (Union[str, ObjectId]): Id of parent asset.
parents (List[str]): List of parent assets names.
data (Dict[str, Any]): Asset document data. Empty dictionary is used
if not passed. Value of 'parent_id' is used to fill 'visualParent'.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of asset document.
"""
if data is None:
data = {}
if parent_id is not None:
parent_id = _create_or_convert_to_id(parent_id)
data["visualParent"] = parent_id
data["parents"] = parents
return {
"_id": _create_or_convert_to_id(entity_id),
"type": "asset",
"name": name,
# This will be ignored
"parent": project_id,
"data": data,
"schema": CURRENT_ASSET_DOC_SCHEMA
}
def new_subset_document(name, family, asset_id, data=None, entity_id=None):
"""Create skeleton data of subset document.
Args:
name (str): Is considered as unique identifier of subset under asset.
family (str): Subset's family.
asset_id (Union[str, ObjectId]): Id of parent asset.
data (Dict[str, Any]): Subset document data. Empty dictionary is used
if not passed. Value of 'family' is used to fill 'family'.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of subset document.
"""
if data is None:
data = {}
data["family"] = family
return {
"_id": _create_or_convert_to_id(entity_id),
"schema": CURRENT_SUBSET_SCHEMA,
"type": "subset",
"name": name,
"data": data,
"parent": _create_or_convert_to_id(asset_id)
}
def new_version_doc(version, subset_id, data=None, entity_id=None):
"""Create skeleton data of version document.
Args:
version (int): Is considered as unique identifier of version
under subset.
subset_id (Union[str, ObjectId]): Id of parent subset.
data (Dict[str, Any]): Version document data.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of version document.
"""
if data is None:
data = {}
return {
"_id": _create_or_convert_to_id(entity_id),
"schema": CURRENT_VERSION_SCHEMA,
"type": "version",
"name": int(version),
"parent": _create_or_convert_to_id(subset_id),
"data": data
}
def new_hero_version_doc(subset_id, data, version=None, entity_id=None):
"""Create skeleton data of hero version document.
Args:
subset_id (Union[str, ObjectId]): Id of parent subset.
data (Dict[str, Any]): Version document data.
version (int): Version of source version.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of version document.
"""
if version is None:
version = -1
elif version > 0:
version = -version
return {
"_id": _create_or_convert_to_id(entity_id),
"schema": CURRENT_HERO_VERSION_SCHEMA,
"type": "hero_version",
"version": version,
"parent": _create_or_convert_to_id(subset_id),
"data": data
}
def new_representation_doc(
name, version_id, context, data=None, entity_id=None
):
"""Create skeleton data of representation document.
Args:
name (str): Representation name considered as unique identifier
of representation under version.
version_id (Union[str, ObjectId]): Id of parent version.
context (Dict[str, Any]): Representation context used for fill template
of to query.
data (Dict[str, Any]): Representation document data.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of version document.
"""
if data is None:
data = {}
return {
"_id": _create_or_convert_to_id(entity_id),
"schema": CURRENT_REPRESENTATION_SCHEMA,
"type": "representation",
"parent": _create_or_convert_to_id(version_id),
"name": name,
"data": data,
# Imprint shortcut to context for performance reasons.
"context": context
}
def new_thumbnail_doc(data=None, entity_id=None):
"""Create skeleton data of thumbnail document.
Args:
data (Dict[str, Any]): Thumbnail document data.
entity_id (Union[str, ObjectId]): Predefined id of document. New id is
created if not passed.
Returns:
Dict[str, Any]: Skeleton of thumbnail document.
"""
if data is None:
data = {}
return {
"_id": _create_or_convert_to_id(entity_id),
"type": "thumbnail",
"schema": CURRENT_THUMBNAIL_SCHEMA,
"data": data
}
def new_workfile_info_doc(
filename, asset_id, task_name, files, data=None, entity_id=None
):
"""Create skeleton data of workfile info document.
Workfile document is at this moment used primarily for artist notes.
Args:
filename (str): Filename of workfile.
asset_id (Union[str, ObjectId]): Id of asset under which workfile live.
task_name (str): Task under which was workfile created.
files (List[str]): List of rootless filepaths related to workfile.
data (Dict[str, Any]): Additional metadata.
Returns:
Dict[str, Any]: Skeleton of workfile info document.
"""
if not data:
data = {}
return {
"_id": _create_or_convert_to_id(entity_id),
"type": "workfile",
"parent": _create_or_convert_to_id(asset_id),
"task_name": task_name,
"filename": filename,
"data": data,
"files": files
}
def _prepare_update_data(old_doc, new_doc, replace):
changes = {}
for key, value in new_doc.items():
if key not in old_doc or value != old_doc[key]:
changes[key] = value
if replace:
for key in old_doc.keys():
if key not in new_doc:
changes[key] = REMOVED_VALUE
return changes
def prepare_subset_update_data(old_doc, new_doc, replace=True):
"""Compare two subset documents and prepare update data.
Based on compared values will create update data for
'MongoUpdateOperation'.
Empty output means that documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
return _prepare_update_data(old_doc, new_doc, replace)
def prepare_version_update_data(old_doc, new_doc, replace=True):
"""Compare two version documents and prepare update data.
Based on compared values will create update data for
'MongoUpdateOperation'.
Empty output means that documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
return _prepare_update_data(old_doc, new_doc, replace)
def prepare_hero_version_update_data(old_doc, new_doc, replace=True):
"""Compare two hero version documents and prepare update data.
Based on compared values will create update data for 'UpdateOperation'.
Empty output means that documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
changes = _prepare_update_data(old_doc, new_doc, replace)
changes.pop("version_id", None)
return changes
def prepare_representation_update_data(old_doc, new_doc, replace=True):
"""Compare two representation documents and prepare update data.
Based on compared values will create update data for
'MongoUpdateOperation'.
Empty output means that documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
changes = _prepare_update_data(old_doc, new_doc, replace)
context = changes.get("data", {}).get("context")
# Make sure that both 'family' and 'subset' are in changes if
# one of them changed (they'll both become 'product').
if (
context
and ("family" in context or "subset" in context)
):
context["family"] = new_doc["data"]["context"]["family"]
context["subset"] = new_doc["data"]["context"]["subset"]
return changes
def prepare_workfile_info_update_data(old_doc, new_doc, replace=True):
"""Compare two workfile info documents and prepare update data.
Based on compared values will create update data for
'MongoUpdateOperation'.
Empty output means that documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
return _prepare_update_data(old_doc, new_doc, replace)
class FailedOperations(Exception):
pass
def entity_data_json_default(value):
if isinstance(value, datetime.datetime):
return int(value.timestamp())
raise TypeError(
"Object of type {} is not JSON serializable".format(str(type(value)))
)
def failed_json_default(value):
return "< Failed value {} > {}".format(type(value), str(value))
class ServerCreateOperation(CreateOperation):
"""Operation to create an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
data (Dict[str, Any]): Data of entity that will be created.
"""
def __init__(self, project_name, entity_type, data, session):
self._session = session
if not data:
data = {}
data = copy.deepcopy(data)
if entity_type == "project":
raise ValueError("Project cannot be created using operations")
tasks = None
if entity_type in "asset":
# TODO handle tasks
entity_type = "folder"
if "data" in data:
tasks = data["data"].get("tasks")
project = self._session.get_project(project_name)
new_data = convert_create_asset_to_v4(data, project, self.con)
elif entity_type == "task":
project = self._session.get_project(project_name)
new_data = convert_create_task_to_v4(data, project, self.con)
elif entity_type == "subset":
new_data = convert_create_subset_to_v4(data, self.con)
entity_type = "product"
elif entity_type == "version":
new_data = convert_create_version_to_v4(data, self.con)
elif entity_type == "hero_version":
new_data = convert_create_hero_version_to_v4(
data, project_name, self.con
)
entity_type = "version"
elif entity_type in ("representation", "archived_representation"):
new_data = convert_create_representation_to_v4(data, self.con)
entity_type = "representation"
elif entity_type == "workfile":
new_data = convert_create_workfile_info_to_v4(
data, project_name, self.con
)
else:
raise ValueError(
"Unhandled entity type \"{}\"".format(entity_type)
)
# Simple check if data can be dumped into json
# - should raise error on 'ObjectId' object
try:
new_data = json.loads(
json.dumps(new_data, default=entity_data_json_default)
)
except:
raise ValueError("Couldn't json parse body: {}".format(
json.dumps(new_data, default=failed_json_default)
))
super(ServerCreateOperation, self).__init__(
project_name, entity_type, new_data
)
if "id" not in self._data:
self._data["id"] = create_entity_id()
if tasks:
copied_tasks = copy.deepcopy(tasks)
for task_name, task in copied_tasks.items():
task["name"] = task_name
task["folderId"] = self._data["id"]
self.session.create_entity(
project_name, "task", task, nested_id=self.id
)
@property
def con(self):
return self.session.con
@property
def session(self):
return self._session
@property
def entity_id(self):
return self._data["id"]
def to_server_operation(self):
return {
"id": self.id,
"type": "create",
"entityType": self.entity_type,
"entityId": self.entity_id,
"data": self._data
}
class ServerUpdateOperation(UpdateOperation):
"""Operation to update an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
entity_id (Union[str, ObjectId]): Identifier of an entity.
update_data (Dict[str, Any]): Key -> value changes that will be set in
database. If value is set to 'REMOVED_VALUE' the key will be
removed. Only first level of dictionary is checked (on purpose).
"""
def __init__(
self, project_name, entity_type, entity_id, update_data, session
):
self._session = session
update_data = copy.deepcopy(update_data)
if entity_type == "project":
raise ValueError("Project cannot be created using operations")
if entity_type in ("asset", "archived_asset"):
new_update_data = convert_update_folder_to_v4(
project_name, entity_id, update_data, self.con
)
entity_type = "folder"
elif entity_type == "subset":
new_update_data = convert_update_subset_to_v4(
project_name, entity_id, update_data, self.con
)
entity_type = "product"
elif entity_type == "version":
new_update_data = convert_update_version_to_v4(
project_name, entity_id, update_data, self.con
)
elif entity_type == "hero_version":
new_update_data = convert_update_hero_version_to_v4(
project_name, entity_id, update_data, self.con
)
entity_type = "version"
elif entity_type in ("representation", "archived_representation"):
new_update_data = convert_update_representation_to_v4(
project_name, entity_id, update_data, self.con
)
entity_type = "representation"
elif entity_type == "workfile":
new_update_data = convert_update_workfile_info_to_v4(
project_name, entity_id, update_data, self.con
)
else:
raise ValueError(
"Unhandled entity type \"{}\"".format(entity_type)
)
try:
new_update_data = json.loads(
json.dumps(new_update_data, default=entity_data_json_default)
)
except:
raise ValueError("Couldn't json parse body: {}".format(
json.dumps(new_update_data, default=failed_json_default)
))
super(ServerUpdateOperation, self).__init__(
project_name, entity_type, entity_id, new_update_data
)
@property
def con(self):
return self.session.con
@property
def session(self):
return self._session
def to_server_operation(self):
if not self._update_data:
return None
update_data = {}
for key, value in self._update_data.items():
if value is REMOVED_VALUE:
value = None
update_data[key] = value
return {
"id": self.id,
"type": "update",
"entityType": self.entity_type,
"entityId": self.entity_id,
"data": update_data
}
class ServerDeleteOperation(DeleteOperation):
"""Operation to delete an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
entity_id (Union[str, ObjectId]): Entity id that will be removed.
"""
def __init__(self, project_name, entity_type, entity_id, session):
self._session = session
if entity_type == "asset":
entity_type = "folder"
elif entity_type == "hero_version":
entity_type = "version"
elif entity_type == "subset":
entity_type = "product"
super(ServerDeleteOperation, self).__init__(
project_name, entity_type, entity_id
)
@property
def con(self):
return self.session.con
@property
def session(self):
return self._session
def to_server_operation(self):
return {
"id": self.id,
"type": self.operation_name,
"entityId": self.entity_id,
"entityType": self.entity_type,
}
class OperationsSession(BaseOperationsSession):
def __init__(self, con=None, *args, **kwargs):
super(OperationsSession, self).__init__(*args, **kwargs)
if con is None:
con = get_ayon_server_api_connection()
self._con = con
self._project_cache = {}
self._nested_operations = collections.defaultdict(list)
@property
def con(self):
return self._con
def get_project(self, project_name):
if project_name not in self._project_cache:
self._project_cache[project_name] = self.con.get_project(
project_name)
return copy.deepcopy(self._project_cache[project_name])
def commit(self):
"""Commit session operations."""
operations, self._operations = self._operations, []
if not operations:
return
operations_by_project = collections.defaultdict(list)
for operation in operations:
operations_by_project[operation.project_name].append(operation)
body_by_id = {}
results = []
for project_name, operations in operations_by_project.items():
operations_body = []
for operation in operations:
body = operation.to_server_operation()
if body is not None:
try:
json.dumps(body)
except:
raise ValueError("Couldn't json parse body: {}".format(
json.dumps(
body, indent=4, default=failed_json_default
)
))
body_by_id[operation.id] = body
operations_body.append(body)
if operations_body:
result = self._con.post(
"projects/{}/operations".format(project_name),
operations=operations_body,
canFail=False
)
results.append(result.data)
for result in results:
if result.get("success"):
continue
if "operations" not in result:
raise FailedOperations(
"Operation failed. Content: {}".format(str(result))
)
for op_result in result["operations"]:
if not op_result["success"]:
operation_id = op_result["id"]
raise FailedOperations((
"Operation \"{}\" failed with data:\n{}\nError: {}."
).format(
operation_id,
json.dumps(body_by_id[operation_id], indent=4),
op_result.get("error", "unknown"),
))
def create_entity(self, project_name, entity_type, data, nested_id=None):
"""Fast access to 'ServerCreateOperation'.
Args:
project_name (str): On which project the creation happens.
entity_type (str): Which entity type will be created.
data (Dicst[str, Any]): Entity data.
nested_id (str): Id of other operation from which is triggered
operation -> Operations can trigger suboperations but they
must be added to operations list after it's parent is added.
Returns:
ServerCreateOperation: Object of update operation.
"""
operation = ServerCreateOperation(
project_name, entity_type, data, self
)
if nested_id:
self._nested_operations[nested_id].append(operation)
else:
self.add(operation)
if operation.id in self._nested_operations:
self.extend(self._nested_operations.pop(operation.id))
return operation
def update_entity(
self, project_name, entity_type, entity_id, update_data, nested_id=None
):
"""Fast access to 'ServerUpdateOperation'.
Returns:
ServerUpdateOperation: Object of update operation.
"""
operation = ServerUpdateOperation(
project_name, entity_type, entity_id, update_data, self
)
if nested_id:
self._nested_operations[nested_id].append(operation)
else:
self.add(operation)
if operation.id in self._nested_operations:
self.extend(self._nested_operations.pop(operation.id))
return operation
def delete_entity(
self, project_name, entity_type, entity_id, nested_id=None
):
"""Fast access to 'ServerDeleteOperation'.
Returns:
ServerDeleteOperation: Object of delete operation.
"""
operation = ServerDeleteOperation(
project_name, entity_type, entity_id, self
)
if nested_id:
self._nested_operations[nested_id].append(operation)
else:
self.add(operation)
if operation.id in self._nested_operations:
self.extend(self._nested_operations.pop(operation.id))
return operation
def create_project(
project_name,
project_code,
library_project=False,
preset_name=None,
con=None
):
"""Create project using OpenPype settings.
This project creation function is not validating project document on
creation. It is because project document is created blindly with only
minimum required information about project which is it's name, code, type
and schema.
Entered project name must be unique and project must not exist yet.
Note:
This function is here to be OP v4 ready but in v3 has more logic
to do. That's why inner imports are in the body.
Args:
project_name (str): New project name. Should be unique.
project_code (str): Project's code should be unique too.
library_project (bool): Project is library project.
preset_name (str): Name of anatomy preset. Default is used if not
passed.
con (ServerAPI): Connection to server with logged user.
Raises:
ValueError: When project name already exists in MongoDB.
Returns:
dict: Created project document.
"""
if con is None:
con = get_ayon_server_api_connection()
return con.create_project(
project_name,
project_code,
library_project,
preset_name
)
def delete_project(project_name, con=None):
if con is None:
con = get_ayon_server_api_connection()
return con.delete_project(project_name)
def create_thumbnail(project_name, src_filepath, thumbnail_id=None, con=None):
if con is None:
con = get_ayon_server_api_connection()
return con.create_thumbnail(project_name, src_filepath, thumbnail_id)

View file

@ -1,289 +0,0 @@
import uuid
import copy
from abc import ABCMeta, abstractmethod, abstractproperty
import six
REMOVED_VALUE = object()
@six.add_metaclass(ABCMeta)
class AbstractOperation(object):
"""Base operation class.
Operation represent a call into database. The call can create, change or
remove data.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
"""
def __init__(self, project_name, entity_type):
self._project_name = project_name
self._entity_type = entity_type
self._id = str(uuid.uuid4())
@property
def project_name(self):
return self._project_name
@property
def id(self):
"""Identifier of operation."""
return self._id
@property
def entity_type(self):
return self._entity_type
@abstractproperty
def operation_name(self):
"""Stringified type of operation."""
pass
def to_data(self):
"""Convert operation to data that can be converted to json or others.
Warning:
Current state returns ObjectId objects which cannot be parsed by
json.
Returns:
Dict[str, Any]: Description of operation.
"""
return {
"id": self._id,
"entity_type": self.entity_type,
"project_name": self.project_name,
"operation": self.operation_name
}
class CreateOperation(AbstractOperation):
"""Operation to create an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
data (Dict[str, Any]): Data of entity that will be created.
"""
operation_name = "create"
def __init__(self, project_name, entity_type, data):
super(CreateOperation, self).__init__(project_name, entity_type)
if not data:
data = {}
else:
data = copy.deepcopy(dict(data))
self._data = data
def __setitem__(self, key, value):
self.set_value(key, value)
def __getitem__(self, key):
return self.data[key]
def set_value(self, key, value):
self.data[key] = value
def get(self, key, *args, **kwargs):
return self.data.get(key, *args, **kwargs)
@abstractproperty
def entity_id(self):
pass
@property
def data(self):
return self._data
def to_data(self):
output = super(CreateOperation, self).to_data()
output["data"] = copy.deepcopy(self.data)
return output
class UpdateOperation(AbstractOperation):
"""Operation to update an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
entity_id (Union[str, ObjectId]): Identifier of an entity.
update_data (Dict[str, Any]): Key -> value changes that will be set in
database. If value is set to 'REMOVED_VALUE' the key will be
removed. Only first level of dictionary is checked (on purpose).
"""
operation_name = "update"
def __init__(self, project_name, entity_type, entity_id, update_data):
super(UpdateOperation, self).__init__(project_name, entity_type)
self._entity_id = entity_id
self._update_data = update_data
@property
def entity_id(self):
return self._entity_id
@property
def update_data(self):
return self._update_data
def to_data(self):
changes = {}
for key, value in self._update_data.items():
if value is REMOVED_VALUE:
value = None
changes[key] = value
output = super(UpdateOperation, self).to_data()
output.update({
"entity_id": self.entity_id,
"changes": changes
})
return output
class DeleteOperation(AbstractOperation):
"""Operation to delete an entity.
Args:
project_name (str): On which project operation will happen.
entity_type (str): Type of entity on which change happens.
e.g. 'asset', 'representation' etc.
entity_id (Union[str, ObjectId]): Entity id that will be removed.
"""
operation_name = "delete"
def __init__(self, project_name, entity_type, entity_id):
super(DeleteOperation, self).__init__(project_name, entity_type)
self._entity_id = entity_id
@property
def entity_id(self):
return self._entity_id
def to_data(self):
output = super(DeleteOperation, self).to_data()
output["entity_id"] = self.entity_id
return output
class BaseOperationsSession(object):
"""Session storing operations that should happen in an order.
At this moment does not handle anything special can be considered as
stupid list of operations that will happen after each other. If creation
of same entity is there multiple times it's handled in any way and document
values are not validated.
"""
def __init__(self):
self._operations = []
def __len__(self):
return len(self._operations)
def add(self, operation):
"""Add operation to be processed.
Args:
operation (BaseOperation): Operation that should be processed.
"""
if not isinstance(
operation,
(CreateOperation, UpdateOperation, DeleteOperation)
):
raise TypeError("Expected Operation object got {}".format(
str(type(operation))
))
self._operations.append(operation)
def append(self, operation):
"""Add operation to be processed.
Args:
operation (BaseOperation): Operation that should be processed.
"""
self.add(operation)
def extend(self, operations):
"""Add operations to be processed.
Args:
operations (List[BaseOperation]): Operations that should be
processed.
"""
for operation in operations:
self.add(operation)
def remove(self, operation):
"""Remove operation."""
self._operations.remove(operation)
def clear(self):
"""Clear all registered operations."""
self._operations = []
def to_data(self):
return [
operation.to_data()
for operation in self._operations
]
@abstractmethod
def commit(self):
"""Commit session operations."""
pass
def create_entity(self, project_name, entity_type, data):
"""Fast access to 'CreateOperation'.
Returns:
CreateOperation: Object of update operation.
"""
operation = CreateOperation(project_name, entity_type, data)
self.add(operation)
return operation
def update_entity(self, project_name, entity_type, entity_id, update_data):
"""Fast access to 'UpdateOperation'.
Returns:
UpdateOperation: Object of update operation.
"""
operation = UpdateOperation(
project_name, entity_type, entity_id, update_data
)
self.add(operation)
return operation
def delete_entity(self, project_name, entity_type, entity_id):
"""Fast access to 'DeleteOperation'.
Returns:
DeleteOperation: Object of delete operation.
"""
operation = DeleteOperation(project_name, entity_type, entity_id)
self.add(operation)
return operation

View file

@ -1,134 +1,9 @@
import os
import uuid
import ayon_api
from ayon_core.client.operations_base import REMOVED_VALUE
class _GlobalCache:
initialized = False
def get_ayon_server_api_connection():
if _GlobalCache.initialized:
con = ayon_api.get_server_api_connection()
else:
from ayon_core.lib.local_settings import get_local_site_id
_GlobalCache.initialized = True
site_id = get_local_site_id()
version = os.getenv("AYON_VERSION")
if ayon_api.is_connection_created():
con = ayon_api.get_server_api_connection()
con.set_site_id(site_id)
con.set_client_version(version)
else:
con = ayon_api.create_connection(site_id, version)
return con
def create_entity_id():
return uuid.uuid1().hex
def prepare_attribute_changes(old_entity, new_entity, replace=False):
"""Prepare changes of attributes on entities.
Compare 'attrib' of old and new entity data to prepare only changed
values that should be sent to server for update.
Example:
>>> # Limited entity data to 'attrib'
>>> old_entity = {
... "attrib": {"attr_1": 1, "attr_2": "MyString", "attr_3": True}
... }
>>> new_entity = {
... "attrib": {"attr_1": 2, "attr_3": True, "attr_4": 3}
... }
>>> # Changes if replacement should not happen
>>> expected_changes = {
... "attr_1": 2,
... "attr_4": 3
... }
>>> changes = prepare_attribute_changes(old_entity, new_entity)
>>> changes == expected_changes
True
>>> # Changes if replacement should happen
>>> expected_changes_replace = {
... "attr_1": 2,
... "attr_2": REMOVED_VALUE,
... "attr_4": 3
... }
>>> changes_replace = prepare_attribute_changes(
... old_entity, new_entity, True)
>>> changes_replace == expected_changes_replace
True
Args:
old_entity (dict[str, Any]): Data of entity queried from server.
new_entity (dict[str, Any]): Entity data with applied changes.
replace (bool): New entity should fully replace all old entity values.
Returns:
Dict[str, Any]: Values from new entity only if value has changed.
"""
attrib_changes = {}
new_attrib = new_entity.get("attrib")
old_attrib = old_entity.get("attrib")
if new_attrib is None:
if not replace:
return attrib_changes
new_attrib = {}
if old_attrib is None:
return new_attrib
for attr, new_attr_value in new_attrib.items():
old_attr_value = old_attrib.get(attr)
if old_attr_value != new_attr_value:
attrib_changes[attr] = new_attr_value
if replace:
for attr in old_attrib:
if attr not in new_attrib:
attrib_changes[attr] = REMOVED_VALUE
return attrib_changes
def prepare_entity_changes(old_entity, new_entity, replace=False):
"""Prepare changes of AYON entities.
Compare old and new entity to filter values from new data that changed.
Args:
old_entity (dict[str, Any]): Data of entity queried from server.
new_entity (dict[str, Any]): Entity data with applied changes.
replace (bool): All attributes should be replaced by new values. So
all attribute values that are not on new entity will be removed.
Returns:
Dict[str, Any]: Only values from new entity that changed.
"""
changes = {}
for key, new_value in new_entity.items():
if key == "attrib":
continue
old_value = old_entity.get(key)
if old_value != new_value:
changes[key] = new_value
if replace:
for key in old_entity:
if key not in new_entity:
changes[key] = REMOVED_VALUE
attr_changes = prepare_attribute_changes(old_entity, new_entity, replace)
if attr_changes:
changes["attrib"] = attr_changes
return changes
return ayon_api.get_server_api_connection()