diff --git a/pype/modules/ftrack/actions/action_delivery.py b/pype/modules/ftrack/actions/action_delivery.py index 7dbb7c65e8..663a81aad4 100644 --- a/pype/modules/ftrack/actions/action_delivery.py +++ b/pype/modules/ftrack/actions/action_delivery.py @@ -1,5 +1,6 @@ import os import copy +import json import shutil import collections @@ -9,7 +10,7 @@ from bson.objectid import ObjectId from avalon import pipeline from avalon.vendor import filelink -from pype.api import Anatomy +from pype.api import Anatomy, config from pype.modules.ftrack.lib import BaseAction, statics_icon from pype.modules.ftrack.lib.avalon_sync import CustAttrIdKey from pype.modules.ftrack.lib.io_nonsingleton import DbConnector @@ -41,36 +42,22 @@ class Delivery(BaseAction): items = [] item_splitter = {"type": "label", "value": "---"} - # Prepare component names for processing - components = None - project = None - for entity in entities: - if project is None: - project_id = None - for ent_info in entity["link"]: - if ent_info["type"].lower() == "project": - project_id = ent_info["id"] - break + project_entity = self.get_project_from_entity(entities[0]) + project_name = project_entity["full_name"] + self.db_con.install() + self.db_con.Session["AVALON_PROJECT"] = project_name + project_doc = self.db_con.find_one({"type": "project"}) + if not project_doc: + return { + "success": False, + "message": ( + "Didn't found project \"{}\" in avalon." + ).format(project_name) + } - if project_id is None: - project = entity["asset"]["parent"]["project"] - else: - project = session.query(( - "select id, full_name from Project where id is \"{}\"" - ).format(project_id)).one() + repre_names = self._get_repre_names(entities) + self.db_con.uninstall() - _components = set( - [component["name"] for component in entity["components"]] - ) - if components is None: - components = _components - continue - - components = components.intersection(_components) - if not components: - break - - project_name = project["full_name"] items.append({ "type": "hidden", "name": "__project_name__", @@ -93,7 +80,7 @@ class Delivery(BaseAction): skipped = False # Add message if there are any common components - if not components or not new_anatomies: + if not repre_names or not new_anatomies: skipped = True items.append({ "type": "label", @@ -106,7 +93,7 @@ class Delivery(BaseAction): "value": skipped }) - if not components: + if not repre_names: if len(entities) == 1: items.append({ "type": "label", @@ -143,12 +130,12 @@ class Delivery(BaseAction): "type": "label" }) - for component in components: + for repre_name in repre_names: items.append({ "type": "boolean", "value": False, - "label": component, - "name": component + "label": repre_name, + "name": repre_name }) items.append(item_splitter) @@ -198,27 +185,233 @@ class Delivery(BaseAction): "title": title } + def _get_repre_names(self, entities): + version_ids = self._get_interest_version_ids(entities) + repre_docs = self.db_con.find({ + "type": "representation", + "parent": {"$in": version_ids} + }) + return list(sorted(repre_docs.distinct("name"))) + + def _get_interest_version_ids(self, entities): + parent_ent_by_id = {} + subset_names = set() + version_nums = set() + for entity in entities: + asset = entity["asset"] + parent = asset["parent"] + parent_ent_by_id[parent["id"]] = parent + + subset_name = asset["name"] + subset_names.add(subset_name) + + version = entity["version"] + version_nums.add(version) + + asset_docs_by_ftrack_id = self._get_asset_docs(parent_ent_by_id) + subset_docs = self._get_subset_docs( + asset_docs_by_ftrack_id, subset_names, entities + ) + version_docs = self._get_version_docs( + asset_docs_by_ftrack_id, subset_docs, version_nums, entities + ) + + return [version_doc["_id"] for version_doc in version_docs] + + def _get_version_docs( + self, asset_docs_by_ftrack_id, subset_docs, version_nums, entities + ): + subset_docs_by_id = { + subset_doc["_id"]: subset_doc + for subset_doc in subset_docs + } + version_docs = list(self.db_con.find({ + "type": "version", + "parent": {"$in": list(subset_docs_by_id.keys())}, + "name": {"$in": list(version_nums)} + })) + version_docs_by_parent_id = collections.defaultdict(dict) + for version_doc in version_docs: + subset_doc = subset_docs_by_id[version_doc["parent"]] + + asset_id = subset_doc["parent"] + subset_name = subset_doc["name"] + version = version_doc["name"] + if version_docs_by_parent_id[asset_id].get(subset_name) is None: + version_docs_by_parent_id[asset_id][subset_name] = {} + + version_docs_by_parent_id[asset_id][subset_name][version] = ( + version_doc + ) + + filtered_versions = [] + for entity in entities: + asset = entity["asset"] + + parent = asset["parent"] + asset_doc = asset_docs_by_ftrack_id[parent["id"]] + + subsets_by_name = version_docs_by_parent_id.get(asset_doc["_id"]) + if not subsets_by_name: + continue + + subset_name = asset["name"] + version_docs_by_version = subsets_by_name.get(subset_name) + if not version_docs_by_version: + continue + + version = entity["version"] + version_doc = version_docs_by_version.get(version) + if version_doc: + filtered_versions.append(version_doc) + return filtered_versions + + def _get_subset_docs( + self, asset_docs_by_ftrack_id, subset_names, entities + ): + asset_doc_ids = list() + for asset_doc in asset_docs_by_ftrack_id.values(): + asset_doc_ids.append(asset_doc["_id"]) + + subset_docs = list(self.db_con.find({ + "type": "subset", + "parent": {"$in": asset_doc_ids}, + "name": {"$in": list(subset_names)} + })) + subset_docs_by_parent_id = collections.defaultdict(dict) + for subset_doc in subset_docs: + asset_id = subset_doc["parent"] + subset_name = subset_doc["name"] + subset_docs_by_parent_id[asset_id][subset_name] = subset_doc + + filtered_subsets = [] + for entity in entities: + asset = entity["asset"] + + parent = asset["parent"] + asset_doc = asset_docs_by_ftrack_id[parent["id"]] + + subsets_by_name = subset_docs_by_parent_id.get(asset_doc["_id"]) + if not subsets_by_name: + continue + + subset_name = asset["name"] + subset_doc = subsets_by_name.get(subset_name) + if subset_doc: + filtered_subsets.append(subset_doc) + return filtered_subsets + + def _get_asset_docs(self, parent_ent_by_id): + asset_docs = list(self.db_con.find({ + "type": "asset", + "data.ftrackId": {"$in": list(parent_ent_by_id.keys())} + })) + asset_docs_by_ftrack_id = { + asset_doc["data"]["ftrackId"]: asset_doc + for asset_doc in asset_docs + } + + entities_by_mongo_id = {} + entities_by_names = {} + for ftrack_id, entity in parent_ent_by_id.items(): + if ftrack_id not in asset_docs_by_ftrack_id: + parent_mongo_id = entity["custom_attributes"].get( + CustAttrIdKey + ) + if parent_mongo_id: + entities_by_mongo_id[ObjectId(parent_mongo_id)] = entity + else: + entities_by_names[entity["name"]] = entity + + expressions = [] + if entities_by_mongo_id: + expression = { + "type": "asset", + "_id": {"$in": list(entities_by_mongo_id.keys())} + } + expressions.append(expression) + + if entities_by_names: + expression = { + "type": "asset", + "name": {"$in": list(entities_by_names.keys())} + } + expressions.append(expression) + + if expressions: + if len(expressions) == 1: + filter = expressions[0] + else: + filter = {"$or": expressions} + + asset_docs = self.db_con.find(filter) + for asset_doc in asset_docs: + if asset_doc["_id"] in entities_by_mongo_id: + entity = entities_by_mongo_id[asset_doc["_id"]] + asset_docs_by_ftrack_id[entity["id"]] = asset_doc + + elif asset_doc["name"] in entities_by_names: + entity = entities_by_names[asset_doc["name"]] + asset_docs_by_ftrack_id[entity["id"]] = asset_doc + + return asset_docs_by_ftrack_id + def launch(self, session, entities, event): if "values" not in event["data"]: return - self.report_items = collections.defaultdict(list) - values = event["data"]["values"] skipped = values.pop("__skipped__") if skipped: return None - component_names = [] + user_id = event["source"]["user"]["id"] + user_entity = session.query( + "User where id is {}".format(user_id) + ).one() + + job = session.create("Job", { + "user": user_entity, + "status": "running", + "data": json.dumps({ + "description": "Delivery processing." + }) + }) + session.commit() + + try: + self.db_con.install() + self.real_launch(session, entities, event) + job["status"] = "done" + + except Exception: + self.log.warning( + "Failed during processing delivery action.", + exc_info=True + ) + + finally: + if job["status"] != "done": + job["status"] = "failed" + session.commit() + self.db_con.uninstall() + + def real_launch(self, session, entities, event): + self.log.info("Delivery action just started.") + report_items = collections.defaultdict(list) + + values = event["data"]["values"] + location_path = values.pop("__location_path__") anatomy_name = values.pop("__new_anatomies__") project_name = values.pop("__project_name__") + repre_names = [] for key, value in values.items(): if value is True: - component_names.append(key) + repre_names.append(key) - if not component_names: + if not repre_names: return { "success": True, "message": "Not selected components to deliver." @@ -230,64 +423,15 @@ class Delivery(BaseAction): if not os.path.exists(location_path): os.makedirs(location_path) - self.db_con.install() self.db_con.Session["AVALON_PROJECT"] = project_name - repres_to_deliver = [] - for entity in entities: - asset = entity["asset"] - subset_name = asset["name"] - version = entity["version"] - - parent = asset["parent"] - parent_mongo_id = parent["custom_attributes"].get(CustAttrIdKey) - if parent_mongo_id: - parent_mongo_id = ObjectId(parent_mongo_id) - else: - asset_ent = self.db_con.find_one({ - "type": "asset", - "data.ftrackId": parent["id"] - }) - if not asset_ent: - ent_path = "/".join( - [ent["name"] for ent in parent["link"]] - ) - msg = "Not synchronized entities to avalon" - self.report_items[msg].append(ent_path) - self.log.warning("{} <{}>".format(msg, ent_path)) - continue - - parent_mongo_id = asset_ent["_id"] - - subset_ent = self.db_con.find_one({ - "type": "subset", - "parent": parent_mongo_id, - "name": subset_name - }) - - version_ent = self.db_con.find_one({ - "type": "version", - "name": version, - "parent": subset_ent["_id"] - }) - - repre_ents = self.db_con.find({ - "type": "representation", - "parent": version_ent["_id"] - }) - - repres_by_name = {} - for repre in repre_ents: - repre_name = repre["name"] - repres_by_name[repre_name] = repre - - for component in entity["components"]: - comp_name = component["name"] - if comp_name not in component_names: - continue - - repre = repres_by_name.get(comp_name) - repres_to_deliver.append(repre) + self.log.debug("Collecting representations to process.") + version_ids = self._get_interest_version_ids(entities) + repres_to_deliver = list(self.db_con.find({ + "type": "representation", + "parent": {"$in": version_ids}, + "name": {"$in": repre_names} + })) anatomy = Anatomy(project_name) @@ -304,9 +448,17 @@ class Delivery(BaseAction): for name in root_names: format_dict["root"][name] = location_path + datetime_data = config.get_datetime_data() for repre in repres_to_deliver: + source_path = repre.get("data", {}).get("path") + debug_msg = "Processing representation {}".format(repre["_id"]) + if source_path: + debug_msg += " with published path {}.".format(source_path) + self.log.debug(debug_msg) + # Get destination repre path anatomy_data = copy.deepcopy(repre["context"]) + anatomy_data.update(datetime_data) anatomy_filled = anatomy.format_all(anatomy_data) test_path = anatomy_filled["delivery"][anatomy_name] @@ -333,7 +485,7 @@ class Delivery(BaseAction): "- Invalid value DataType: \"{}\"
" ).format(str(repre["_id"]), keys) - self.report_items[msg].append(sub_msg) + report_items[msg].append(sub_msg) self.log.warning( "{} Representation: \"{}\" Filled: <{}>".format( msg, str(repre["_id"]), str(test_path) @@ -355,20 +507,19 @@ class Delivery(BaseAction): anatomy, anatomy_name, anatomy_data, - format_dict + format_dict, + report_items ) - if not frame: self.process_single_file(*args) else: self.process_sequence(*args) - self.db_con.uninstall() - - return self.report() + return self.report(report_items) def process_single_file( - self, repre_path, anatomy, anatomy_name, anatomy_data, format_dict + self, repre_path, anatomy, anatomy_name, anatomy_data, format_dict, + report_items ): anatomy_filled = anatomy.format(anatomy_data) if format_dict: @@ -384,7 +535,8 @@ class Delivery(BaseAction): self.copy_file(repre_path, delivery_path) def process_sequence( - self, repre_path, anatomy, anatomy_name, anatomy_data, format_dict + self, repre_path, anatomy, anatomy_name, anatomy_data, format_dict, + report_items ): dir_path, file_name = os.path.split(str(repre_path)) @@ -398,7 +550,7 @@ class Delivery(BaseAction): if not file_name_items: msg = "Source file was not found" - self.report_items[msg].append(repre_path) + report_items[msg].append(repre_path) self.log.warning("{} <{}>".format(msg, repre_path)) return @@ -418,7 +570,7 @@ class Delivery(BaseAction): if src_collection is None: # TODO log error! msg = "Source collection of files was not found" - self.report_items[msg].append(repre_path) + report_items[msg].append(repre_path) self.log.warning("{} <{}>".format(msg, repre_path)) return @@ -491,10 +643,10 @@ class Delivery(BaseAction): except OSError: shutil.copyfile(src_path, dst_path) - def report(self): + def report(self, report_items): items = [] title = "Delivery report" - for msg, _items in self.report_items.items(): + for msg, _items in report_items.items(): if not _items: continue