diff --git a/openpype/client/__init__.py b/openpype/client/__init__.py index 64a82334d9..d080425e3c 100644 --- a/openpype/client/__init__.py +++ b/openpype/client/__init__.py @@ -45,6 +45,13 @@ from .entities import ( get_workfile_info, ) +from .entity_links import ( + get_linked_asset_ids, + get_linked_assets, + get_linked_representation_ids, +) + + __all__ = ( "OpenPypeMongoConnection", @@ -88,4 +95,8 @@ __all__ = ( "get_thumbnail_id_from_source", "get_workfile_info", + + "get_linked_asset_ids", + "get_linked_assets", + "get_linked_representation_ids", ) diff --git a/openpype/client/entity_links.py b/openpype/client/entity_links.py new file mode 100644 index 0000000000..66214f469c --- /dev/null +++ b/openpype/client/entity_links.py @@ -0,0 +1,232 @@ +from .mongo import get_project_connection +from .entities import ( + get_assets, + get_asset_by_id, + get_representation_by_id, + convert_id, +) + + +def get_linked_asset_ids(project_name, asset_doc=None, asset_id=None): + """Extract linked asset ids from asset document. + + One of asset document or asset id must be passed. + + Note: + Asset links now works only from asset to assets. + + Args: + asset_doc (dict): Asset document from DB. + + Returns: + List[Union[ObjectId, str]]: Asset ids of input links. + """ + + output = [] + if not asset_doc and not asset_id: + return output + + if not asset_doc: + asset_doc = get_asset_by_id( + project_name, asset_id, fields=["data.inputLinks"] + ) + + input_links = asset_doc["data"].get("inputLinks") + if not input_links: + return output + + for item in input_links: + # Backwards compatibility for "_id" key which was replaced with + # "id" + if "_id" in item: + link_id = item["_id"] + else: + link_id = item["id"] + output.append(link_id) + return output + + +def get_linked_assets( + project_name, asset_doc=None, asset_id=None, fields=None +): + """Return linked assets based on passed asset document. + + One of asset document or asset id must be passed. + + Args: + project_name (str): Name of project where to look for queried entities. + asset_doc (Dict[str, Any]): Asset document from database. + asset_id (Union[ObjectId, str]): Asset id. Can be used instead of + asset document. + fields (Iterable[str]): Fields that should be returned. All fields are + returned if 'None' is passed. + + Returns: + List[Dict[str, Any]]: Asset documents of input links for passed + asset doc. + """ + + if not asset_doc: + if not asset_id: + return [] + asset_doc = get_asset_by_id( + project_name, + asset_id, + fields=["data.inputLinks"] + ) + if not asset_doc: + return [] + + link_ids = get_linked_asset_ids(project_name, asset_doc=asset_doc) + if not link_ids: + return [] + + return list(get_assets(project_name, asset_ids=link_ids, fields=fields)) + + +def get_linked_representation_id( + project_name, repre_doc=None, repre_id=None, link_type=None, max_depth=None +): + """Returns list of linked ids of particular type (if provided). + + One of representation document or representation id must be passed. + Note: + Representation links now works only from representation through version + back to representations. + + Args: + project_name (str): Name of project where look for links. + repre_doc (Dict[str, Any]): Representation document. + repre_id (Union[ObjectId, str]): Representation id. + link_type (str): Type of link (e.g. 'reference', ...). + max_depth (int): Limit recursion level. Default: 0 + + Returns: + List[ObjectId] Linked representation ids. + """ + + if repre_doc: + repre_id = repre_doc["_id"] + + if repre_id: + repre_id = convert_id(repre_id) + + if not repre_id and not repre_doc: + return [] + + version_id = None + if repre_doc: + version_id = repre_doc.get("parent") + + if not version_id: + repre_doc = get_representation_by_id( + project_name, repre_id, fields=["parent"] + ) + version_id = repre_doc["parent"] + + if not version_id: + return [] + + if max_depth is None: + max_depth = 0 + + match = { + "_id": version_id, + "type": {"$in": ["version", "hero_version"]} + } + + graph_lookup = { + "from": project_name, + "startWith": "$data.inputLinks.id", + "connectFromField": "data.inputLinks.id", + "connectToField": "_id", + "as": "outputs_recursive", + "depthField": "depth" + } + if max_depth != 0: + # We offset by -1 since 0 basically means no recursion + # but the recursion only happens after the initial lookup + # for outputs. + graph_lookup["maxDepth"] = max_depth - 1 + + query_pipeline = [ + # Match + {"$match": match}, + # Recursive graph lookup for inputs + {"$graphLookup": graph_lookup} + ] + + conn = get_project_connection(project_name) + result = conn.aggregate(query_pipeline) + referenced_version_ids = _process_referenced_pipeline_result( + result, link_type + ) + if not referenced_version_ids: + return [] + + ref_ids = conn.distinct( + "_id", + filter={ + "parent": {"$in": list(referenced_version_ids)}, + "type": "representation" + } + ) + + return list(ref_ids) + + +def _process_referenced_pipeline_result(result, link_type): + """Filters result from pipeline for particular link_type. + + Pipeline cannot use link_type directly in a query. + + Returns: + (list) + """ + + referenced_version_ids = set() + correctly_linked_ids = set() + for item in result: + input_links = item["data"].get("inputLinks") + if not input_links: + continue + + _filter_input_links( + input_links, + link_type, + correctly_linked_ids + ) + + # outputs_recursive in random order, sort by depth + outputs_recursive = item.get("outputs_recursive") + if not outputs_recursive: + continue + + for output in sorted(outputs_recursive, key=lambda o: o["depth"]): + output_links = output["data"].get("inputLinks") + if not output_links: + continue + + # Leaf + if output["_id"] not in correctly_linked_ids: + continue + + _filter_input_links( + output_links, + link_type, + correctly_linked_ids + ) + + referenced_version_ids.add(output["_id"]) + + return referenced_version_ids + + +def _filter_input_links(input_links, link_type, correctly_linked_ids): + for input_link in input_links: + if link_type and input_link["type"] != link_type: + continue + + link_id = input_link.get("id") or input_link.get("_id") + if link_id is not None: + correctly_linked_ids.add(link_id)