From 4762ff30e30cfe9b797178c15fa618709fe584c8 Mon Sep 17 00:00:00 2001 From: Jakub Trllo Date: Thu, 5 Jan 2023 15:42:39 +0100 Subject: [PATCH] implemented rest of integration logic --- .../push_to_project/control_integrate.py | 1286 ++++++++++------- 1 file changed, 736 insertions(+), 550 deletions(-) diff --git a/openpype/tools/push_to_project/control_integrate.py b/openpype/tools/push_to_project/control_integrate.py index 36e78bfdc6..97de03ee1f 100644 --- a/openpype/tools/push_to_project/control_integrate.py +++ b/openpype/tools/push_to_project/control_integrate.py @@ -3,9 +3,13 @@ import re import copy import socket import itertools +import datetime + +from bson.objectid import ObjectId from openpype.client import ( get_project, + get_assets, get_asset_by_id, get_subset_by_id, get_subset_by_name, @@ -13,19 +17,25 @@ from openpype.client import ( get_last_version_by_subset_id, get_version_by_name, get_representations, - get_representation_by_name, ) from openpype.client.operations import ( OperationsSession, + new_asset_document, new_subset_document, new_version_doc, + new_representation_doc, prepare_version_update_data, + prepare_representation_update_data, ) +from openpype.modules import ModulesManager from openpype.lib import ( StringTemplate, get_openpype_username, get_formatted_current_time, + source_hash, ) + +from openpype.lib.file_transaction import FileTransaction from openpype.settings import get_project_settings from openpype.pipeline import Anatomy from openpype.pipeline.template_data import get_template_data @@ -35,10 +45,44 @@ from openpype.pipeline.create import get_subset_name UNKNOWN = object() -class RepublishError(Exception): +class PushToProjectError(Exception): pass +class FileItem(object): + def __init__(self, path): + self.path = path + + @property + def is_valid_file(self): + return os.path.exists(self.path) and os.path.isfile(self.path) + + +class SourceFile(FileItem): + def __init__(self, path, frame=None, udim=None): + super(SourceFile, self).__init__(path) + self.frame = frame + self.udim = udim + + def __repr__(self): + subparts = [self.__class__.__name__] + if self.frame is not None: + subparts.append("frame: {}".format(self.frame)) + if self.udim is not None: + subparts.append("UDIM: {}".format(self.udim)) + + return "<{}> '{}'".format(" - ".join(subparts), self.path) + + +class ResourceFile(FileItem): + def __init__(self, path, relative_path): + super(ResourceFile, self).__init__(path) + self.relative_path = relative_path + + def __repr__(self): + return "<{}> '{}'".format(self.__class__.__name__, self.relative_path) + + class ProjectPushItem: def __init__( self, @@ -47,6 +91,9 @@ class ProjectPushItem: dst_project_name, dst_asset_id, dst_task_name, + variant, + comment=None, + new_asset_name=None, dst_version=None ): self.src_project_name = src_project_name @@ -55,12 +102,17 @@ class ProjectPushItem: self.dst_asset_id = dst_asset_id self.dst_task_name = dst_task_name self.dst_version = dst_version + self.variant = variant + self.new_asset_name = new_asset_name + self.comment = comment or "" self._id = "|".join([ src_project_name, src_version_id, dst_project_name, - dst_asset_id, - dst_task_name + str(dst_asset_id), + str(new_asset_name), + str(dst_task_name), + str(dst_version) ]) @property @@ -68,13 +120,62 @@ class ProjectPushItem: return self._id def __repr__(self): - return "{} - {} -> {}/{}/{}".format( - self.src_project_name, - self.src_version_id, - self.dst_project_name, - self.dst_asset_id, - self.dst_task_name - ) + return "<{} - {}>".format(self.__class__.__name__, self.id) + + +class ProjectPushItemStatus: + def __init__( + self, failed=False, finished=False, error=None, messages=None + ): + if messages is None: + messages = [] + self._failed = failed + self._finished = finished + self._error = error + self._progress_messages = messages + self._last_message = None + + def get_failed(self): + return self._failed + + def set_failed(self, failed): + if failed == self._failed: + return + self._failed = failed + + def get_finished(self): + return self._finished + + def set_finished(self, finished): + if finished == self._finished: + return + self._finished = finished + + def get_error(self): + return self._error + + def set_error(self, error, failed=None): + if error == self._error: + return + self._error = error + if failed is None: + failed = error is not None + + if failed: + self.failed = failed + + failed = property(get_failed, set_failed) + finished = property(get_finished, set_finished) + error = property(get_error, set_error) + + def add_progress_message(self, message): + self._progress_messages.append(message) + self._last_message = message + print(message) + + @property + def last_message(self): + return self._last_message class ProjectPushRepreItem: @@ -90,6 +191,7 @@ class ProjectPushRepreItem: self._roots = roots self._src_files = None self._resource_files = None + self._frame = UNKNOWN @property def repre_doc(self): @@ -107,6 +209,20 @@ class ProjectPushRepreItem: self.get_source_files() return self._resource_files + @property + def frame(self): + if self._frame is UNKNOWN: + frame = None + for src_file in self.src_files: + src_frame = src_file.frame + if ( + src_frame is not None + and (frame is None or src_frame < frame) + ): + frame = src_frame + self._frame = frame + return self._frame + @staticmethod def validate_source_files(src_files, resource_files): if not src_files: @@ -233,623 +349,693 @@ class ProjectPushItemProcess: item (ProjectPushItem): Item which is being processed. """ - # TODO how to define 'variant'? - ask user - variant = "Main" # TODO where to get host?!!! host_name = "republisher" def __init__(self, item): self._item = item - self._src_project_doc = UNKNOWN - self._src_asset_doc = UNKNOWN - self._src_subset_doc = UNKNOWN - self._src_version_doc = UNKNOWN - self._src_repre_items = UNKNOWN + + self._src_project_doc = None + self._src_asset_doc = None + self._src_subset_doc = None + self._src_version_doc = None + self._src_repre_items = None self._src_anatomy = None - self._project_doc = UNKNOWN + self._project_doc = None self._anatomy = None - self._asset_doc = UNKNOWN - self._task_info = UNKNOWN + self._asset_doc = None + self._created_asset_doc = None + self._task_info = None self._subset_doc = None self._version_doc = None - self._family = UNKNOWN - self._subset_name = UNKNOWN + self._family = None + self._subset_name = None - self._project_settings = UNKNOWN - self._template_name = UNKNOWN + self._project_settings = None + self._template_name = None - self._src_files = UNKNOWN - self._src_resource_files = UNKNOWN + self._status = ProjectPushItemStatus() + self._operations = OperationsSession() + self._file_transaction = FileTransaction() - def get_src_project_doc(self): - if self._src_project_doc is UNKNOWN: - self._src_project_doc = get_project(self._item.src_project_name) + @property + def src_project_doc(self): return self._src_project_doc - def get_src_anatomy(self): - if self._src_anatomy is None: - self._src_anatomy = Anatomy(self._item.src_project_name) + @property + def src_anatomy(self): return self._src_anatomy - def get_src_asset_doc(self): - if self._src_asset_doc is UNKNOWN: - asset_doc = None - subset_doc = self.get_src_subset_doc() - if subset_doc: - asset_doc = get_asset_by_id( - self._item.src_project_name, - subset_doc["parent"] - ) - self._src_asset_doc = asset_doc + @property + def src_asset_doc(self): return self._src_asset_doc - def get_src_subset_doc(self): - if self._src_subset_doc is UNKNOWN: - version_doc = self.get_src_version_doc() - subset_doc = None - if version_doc: - subset_doc = get_subset_by_id( - self._item.src_project_name, - version_doc["parent"] - ) - self._src_subset_doc = subset_doc + @property + def src_subset_doc(self): return self._src_subset_doc - def get_src_version_doc(self): - if self._src_version_doc is UNKNOWN: - self._src_version_doc = get_version_by_id( - self._item.src_project_name, self._item.src_version_id - ) + @property + def src_version_doc(self): return self._src_version_doc - def get_src_repre_items(self): - if self._src_repre_items is UNKNOWN: - repre_items = None - version_doc = self.get_src_version_doc() - if version_doc: - repre_docs = get_representations( - self._item.src_project_name, - version_ids=[version_doc["_id"]] - ) - repre_items = [ - ProjectPushRepreItem(repre_doc, self.src_anatomy.roots) - for repre_doc in repre_docs - ] - self._src_repre_items = repre_items + @property + def src_repre_items(self): return self._src_repre_items - src_project_doc = property(get_src_project_doc) - src_anatomy = property(get_src_anatomy) - src_asset_doc = property(get_src_asset_doc) - src_subset_doc = property(get_src_subset_doc) - src_version_doc = property(get_src_version_doc) - src_repre_items = property(get_src_repre_items) - - def get_project_doc(self): - if self._project_doc is UNKNOWN: - self._project_doc = get_project(self._item.dst_project_name) + @property + def project_doc(self): return self._project_doc - def get_anatomy(self): - if self._anatomy is None: - self._anatomy = Anatomy(self._item.dst_project_name) + @property + def anatomy(self): return self._anatomy - def get_asset_doc(self): - if self._asset_doc is UNKNOWN: - self._asset_doc = get_asset_by_id( - self._item.dst_project_name, self._item.dst_asset_id - ) - return self._asset_doc - - def get_task_info(self): - if self._task_info is UNKNOWN: - task_name = self._item.dst_task_name - if not task_name: - self._task_info = {} - return self._task_info - - project_doc = self.get_project_doc() - asset_doc = self.get_asset_doc() - if not project_doc or not asset_doc: - self._task_info = None - return self._task_info - - asset_tasks = asset_doc.get("data", {}).get("tasks") or {} - task_info = asset_tasks.get(task_name) - if not task_info: - self._task_info = None - return self._task_info - - # Create copy of task info to avoid changing data in asset document - task_info = copy.deepcopy(task_info) - task_info["name"] = task_name - # Fill rest of task information based on task type - task_type = task_info["type"] - task_type_info = project_doc["config"]["tasks"].get(task_type, {}) - task_info.update(task_type_info) - self._task_info = task_info - - return self._task_info - - def get_subset_doc(self): - return self._subset_doc - - def set_subset_doc(self, subset_doc): - self._subset_doc = subset_doc - - def get_version_doc(self): - return self._version_doc - - def set_version_doc(self, version_doc): - self._version_doc = version_doc - - project_doc = property(get_project_doc) - anatomy = property(get_anatomy) - asset_doc = property(get_asset_doc) - task_info = property(get_task_info) - subset_doc = property(get_subset_doc) - version_doc = property(get_version_doc, set_version_doc) - - def get_project_settings(self): - if self._project_settings is UNKNOWN: - self._project_settings = get_project_settings( - self._item.dst_project_name - ) + @property + def project_settings(self): return self._project_settings - project_settings = property(get_project_settings) + @property + def asset_doc(self): + return self._asset_doc + + @property + def task_info(self): + return self._task_info + + @property + def subset_doc(self): + return self._subset_doc + + @property + def version_doc(self): + return self._version_doc + + @property + def variant(self): + return self._item.variant @property def family(self): - if self._family is UNKNOWN: - family = None - subset_doc = self.src_subset_doc - if subset_doc: - family = subset_doc["data"].get("family") - families = subset_doc["data"].get("families") - if not family and families: - family = families[0] - self._family = family return self._family @property def subset_name(self): - if self._subset_name is UNKNOWN: - subset_name = None - family = self.family - asset_doc = self.asset_doc - task_info = self.task_info - if family and asset_doc and task_info: - subset_name = get_subset_name( - family, - self.variant, - task_info["name"], - asset_doc, - project_name=self._item.dst_project_name, - host_name=self.host_name, - project_settings=self.project_settings - ) - self._subset_name = subset_name return self._subset_name @property def template_name(self): - if self._template_name is UNKNOWN: - task_info = self.task_info - family = self.family - template_name = None - if family and task_info: - template_name = get_publish_template_name( - self._item.dst_project_name, - self.host_name, - self.family, - task_info["name"], - task_info["type"], - project_settings=self.project_settings - ) - self._template_name = template_name return self._template_name + def fill_source_variables(self): + src_project_name = self._item.src_project_name + src_version_id = self._item.src_version_id -class ProjectPushItemStatus: - def __init__( - self, - item, - failed=False, - finished=False, - error=None - ): - self._item = item - self._failed = failed - self._finished = finished - self._error = error - self._progress_messages = [] - self._last_message = None - - def get_failed(self): - return self._failed - - def set_failed(self, failed): - if failed == self._failed: - return - self._failed = failed - - def get_finished(self): - return self._finished - - def set_finished(self, finished): - if finished == self._finished: - return - self._finished = finished - - def get_error(self): - return self._error - - def set_error(self, error, failed=None): - if error == self._error: - return - self._error = error - if failed is None: - failed = error is not None - - if failed: - self.failed = failed - - failed = property(get_failed, set_failed) - finished = property(get_finished, set_finished) - error = property(get_error, set_error) - - def add_progress_message(self, message): - self._progress_messages.append(message) - self._last_message = message - print(message) - - @property - def last_message(self): - return self._last_message - - -class RepublisherController: - def __init__(self): - self._items = {} - - def add_item(self, item): - if item.id in self._items: - raise RepublishError(f"Item is already in queue {item}") - self._items[item.id] = item - - def remote_item(self, item_id): - self._items.pop(item_id, None) - - def get_items(self): - return dict(self._items) - - -class FileItem(object): - def __init__(self, path): - self.path = path - - @property - def is_valid_file(self): - return os.path.exists(self.path) and os.path.isfile(self.path) - - -class SourceFile(FileItem): - def __init__(self, path, frame=None, udim=None): - super(SourceFile, self).__init__(path) - self.frame = frame - self.udim = udim - - def __repr__(self): - subparts = [self.__class__.__name__] - if self.frame is not None: - subparts.append("frame: {}".format(self.frame)) - if self.udim is not None: - subparts.append("UDIM: {}".format(self.udim)) - - return "<{}> '{}'".format(" - ".join(subparts), self.path) - - -class ResourceFile(FileItem): - def __init__(self, path, relative_path): - super(ResourceFile, self).__init__(path) - self.relative_path = relative_path - - def __repr__(self): - return "<{}> '{}'".format(self.__class__.__name__, self.relative_path) - - -def _make_sure_subset_exists(item_process, project_name, operations): - dst_asset_doc = item_process.asset_doc - subset_name = item_process.subset_name - family = item_process.family - asset_id = dst_asset_doc["_id"] - subset_doc = get_subset_by_name(project_name, subset_name, asset_id) - if subset_doc: - return subset_doc - - data = { - "families": [family] - } - subset_doc = new_subset_document( - subset_name, family, asset_id, data - ) - operations.create_entity(project_name, "subset", subset_doc) - item_process.set_subset_doc(subset_doc) - - -def _make_sure_version_exists( - item_process, - project_name, - version, - operations -): - """Make sure version document exits in database. - - Args: - item_process (ProjectPushItemProcess): Item handling process. - project_name (str): Name of project where version should live. - version (Union[int, None]): Number of version. Latest is used when - 'None' is passed. - operations (OperationsSession): Session which handler creation and - update of entities. - - Returns: - Tuple[Dict[str, Any], bool]: New version document and boolean if version - already existed in database. - """ - - src_version_doc = item_process.src_version_doc - subset_doc = item_process.subset_doc - subset_id = subset_doc["_id"] - src_data = src_version_doc["data"] - families = subset_doc["data"].get("families") - if not families: - families = [subset_doc["data"]["family"]] - - version_data = { - "families": list(families), - "fps": src_data.get("fps"), - "source": src_data.get("source"), - "machine": socket.gethostname(), - "comment": "", - "author": get_openpype_username(), - "time": get_formatted_current_time(), - } - if version is None: - last_version_doc = get_last_version_by_subset_id( - project_name, subset_id - ) - version = 1 - if last_version_doc: - version += int(last_version_doc["name"]) - - existing_version_doc = get_version_by_name( - project_name, version, subset_id - ) - # Update existing version - if existing_version_doc: - version_doc = new_version_doc( - version, subset_id, version_data, existing_version_doc["_id"] - ) - update_data = prepare_version_update_data( - existing_version_doc, version_doc - ) - if update_data: - operations.update_entity( - project_name, - "version", - existing_version_doc["_id"], - update_data + project_doc = get_project(src_project_name) + if not project_doc: + self._status.error = ( + f"Source project \"{src_project_name}\" was not found" ) - item_process.set_version_doc(version_doc) + raise PushToProjectError(self._status.error) - return - - if version is None: - last_version_doc = get_last_version_by_subset_id( - project_name, subset_id + self._status.add_progress_message( + f"Project '{src_project_name}' found" ) - version = 1 - if last_version_doc: - version += int(last_version_doc["name"]) - version_doc = new_version_doc( - version, subset_id, version_data - ) - operations.create_entity(project_name, "version", version_doc) + version_doc = get_version_by_id(src_project_name, src_version_id) + if not version_doc: + self._status.error = ( + f"Source version with id \"{src_version_id}\"" + f" was not found in project \"{src_project_name}\"" + ) + raise PushToProjectError(self._status.error) - item_process.set_version_doc(version_doc) + subset_id = version_doc["parent"] + subset_doc = get_subset_by_id(src_project_name, subset_id) + if not subset_doc: + self._status.error = ( + f"Could find subset with id \"{subset_id}\"" + f" in project \"{src_project_name}\"" + ) + raise PushToProjectError(self._status.error) + asset_id = subset_doc["parent"] + asset_doc = get_asset_by_id(src_project_name, asset_id) + if not asset_doc: + self._status.error = ( + f"Could find asset with id \"{asset_id}\"" + f" in project \"{src_project_name}\"" + ) + raise PushToProjectError(self._status.error) -def _integrate_representations(item, item_process, item_status, operations): - """ + anatomy = Anatomy(src_project_name) - Args: - item (ProjectPushItem): Item to be pushed to different project. - item_process (ProjectPushItemProcess): Process of push item. - """ - - version_id = item_process.version_doc["_id"] - repre_names = { - repre_item.repre_doc["name"] - for repre_item in item_process.src_repre_items - } - existing_repres = get_representations( - item.dst_project_name, - representation_names=repre_names, - version_ids=[version_id] - ) - existing_repres_by_name = { - repre_doc["name"] : repre_doc - for repre_doc in existing_repres - } - anatomy = item_process.anatomy - formatting_data = get_template_data( - item_process.project_doc, - item_process.asset_doc, - item.dst_task_name, - item_process.host_name - ) - - -def _republish_to(item, item_process, item_status): - """ - - Args: - item (ProjectPushItem): Item to process. - item_process (ProjectPushItemProcess): Item process information. - item_status (ProjectPushItemStatus): Item status information. - """ - - family = item_process.family - item_status.add_progress_message( - f"Republishing family '{family}' (Based on source subset)" - ) - - subset_name = item_process.subset_name - item_status.add_progress_message(f"Final subset name is '{subset_name}'") - - template_name = item_process.template_name - item_status.add_progress_message( - f"Using template '{template_name}' for integration" - ) - - repre_items = item_process.src_repre_items - file_count = sum( - len(repre_item.src_files) + len(repre_item.resource_files) - for repre_item in repre_items - ) - item_status.add_progress_message( - f"Representation has {file_count} files to integrate" - ) - - operations = OperationsSession() - item_status.add_progress_message( - f"Integration to {item.dst_project_name} begins." - ) - _make_sure_subset_exists( - item_process, - item.dst_project_name, - operations - ) - _make_sure_version_exists( - item_process, - item.dst_project_name, - item.dst_version, - operations - ) - _integrate_representations(item, item_process, item_status, operations) - - -def _process_item(item, item_process, item_status): - """ - - Args: - item (ProjectPushItem): Item defying the source and destination. - item_process (ProjectPushItemProcess): Process item. - item_status (ProjectPushItemStatus): Status of process item. - """ - - # Query all entities source and destination - # - all of them are required for processing to exist - # --- Source entities --- - # Project - we just need validation of existence - src_project_name = item.src_project_name - src_project_doc = item_process.get_src_project_doc() - if not src_project_doc: - item_status.error = ( - f"Source project '{src_project_name}' was not found" + repre_docs = get_representations( + src_project_name, + version_ids=[src_version_id] ) - return - item_status.add_progress_message(f"Project '{src_project_name}' found") + repre_items = [ + ProjectPushRepreItem(repre_doc, anatomy.roots) + for repre_doc in repre_docs + ] + self._status.add_progress_message(( + f"Found {len(repre_items)} representations on" + f" version {src_version_id} in project '{src_project_name}'" + )) - # Representation - contains information of source files and template data - repre_items = item_process.get_src_repre_items() - if not repre_items: - item_status.error = ( - f"Version {item.src_version_id} does not have any representations" + self._src_anatomy = anatomy + self._src_project_doc = project_doc + self._src_asset_doc = asset_doc + self._src_subset_doc = subset_doc + self._src_version_doc = version_doc + self._src_repre_items = repre_items + + def fill_destination_project(self): + # --- Destination entities --- + dst_project_name = self._item.dst_project_name + # Validate project existence + dst_project_doc = get_project(dst_project_name) + if not dst_project_doc: + self._status.error = ( + f"Destination project '{dst_project_name}' was not found" + ) + raise PushToProjectError(self._status.error) + + self._status.add_progress_message( + f"Destination project '{dst_project_name}' found" ) - return - - item_status.add_progress_message( - f"Found {len(repre_items)} representations on" - f" version {item.src_version_id} in project '{src_project_name}'" - ) - - # --- Destination entities --- - dst_project_name = item.dst_project_name - dst_asset_id = item.dst_asset_id - dst_task_name = item.dst_task_name - - # Validate project existence - dst_project_doc = item_process.get_project_doc() - if not dst_project_doc: - item_status.error = ( - f"Destination project '{dst_project_name}' was not found" + self._project_doc = dst_project_doc + self._anatomy = Anatomy(dst_project_name) + self._project_settings = get_project_settings( + self._item.dst_project_name ) - return - item_status.add_progress_message(f"Project '{dst_project_name}' found") - # Get asset document - if not item_process.asset_doc: - item_status.error = ( - f"Destination asset with id '{dst_asset_id}'" - f" was not found in project '{dst_project_name}'" + def _create_asset( + self, + src_asset_doc, + project_doc, + parent_asset_doc, + asset_name + ): + parent_id = None + parents = [] + tools = [] + if parent_asset_doc: + parent_id = parent_asset_doc["_id"] + parents = list(parent_asset_doc["data"]["parents"]) + parents.append(parent_asset_doc["name"]) + _tools = parent_asset_doc["data"].get("tools_env") + if _tools: + tools = list(_tools) + + asset_name_low = asset_name.lower() + other_asset_docs = get_assets( + project_doc["name"], parent_ids=[parent_id], fields=["name"] ) - return - item_status.add_progress_message(( - f"Asset with id '{dst_asset_id}'" - f" found in project '{dst_project_name}'" - )) + for other_asset_doc in other_asset_docs: + other_name = other_asset_doc["name"] + if other_name.lower() == asset_name_low: + self._status.add_progress_message( + f"Found already existing asset with name \"{other_name}\"" + f" which match requested name \"{asset_name}\"" + ) + return other_asset_doc - # Get task information from asset document - if not item_process.task_info: - item_status.error = ( - f"Destination task '{dst_task_name}'" - f" was not found on asset with id '{dst_asset_id}'" - f" in project '{dst_project_name}'" + data_keys = ( + "clipIn", + "clipOut", + "frameStart", + "frameEnd", + "handleStart", + "handleEnd", + "resolutionWidth", + "resolutionHeight", + "fps", + "pixelAspect", ) - return + asset_data = { + "visualParent": parent_id, + "parents": parents, + "tasks": {}, + "tools_env": tools + } + src_asset_data = src_asset_doc["data"] + for key in data_keys: + if key in src_asset_data: + asset_data[key] = src_asset_data[key] - item_status.add_progress_message(( - f"Task with name '{dst_task_name}'" - f" found on asset with id '{dst_asset_id}'" - f" in project '{dst_project_name}'" - )) + asset_doc = new_asset_document( + asset_name, + project_doc["_id"], + parent_id, + parents, + data=asset_data + ) + self._operations.create_entity( + project_doc["name"], + asset_doc["type"], + asset_doc + ) + self._status.add_progress_message( + f"Creating new asset with name \"{asset_name}\"" + ) + self._created_asset_doc = asset_doc + return asset_doc - _republish_to(item, item_process, item_status) + def fill_or_create_destination_asset(self): + dst_project_name = self._item.dst_project_name + dst_asset_id = self._item.dst_asset_id + dst_task_name = self._item.dst_task_name + new_asset_name = self._item.new_asset_name + if not dst_asset_id and not new_asset_name: + self._status.error = ( + "Push item does not have defined destination asset" + ) + raise PushToProjectError(self._status.error) + # Get asset document + parent_asset_doc = None + if dst_asset_id: + parent_asset_doc = get_asset_by_id( + self._item.dst_project_name, self._item.dst_asset_id + ) + if not parent_asset_doc: + self._status.error = ( + f"Could find asset with id \"{dst_asset_id}\"" + f" in project \"{dst_project_name}\"" + ) + raise PushToProjectError(self._status.error) -def fake_process(controller): - items = controller.get_items() - for item in items.values(): - item_process = ProjectPushItemProcess(item) - item_status = ProjectPushItemStatus(item) - _process_item(item, item_process, item_status) - if item_status.failed: - print("Process failed") + if not new_asset_name: + asset_doc = parent_asset_doc else: - print("Process Finished") + asset_doc = self._create_asset( + self.src_asset_doc, + self.project_doc, + parent_asset_doc, + new_asset_name + ) + self._asset_doc = asset_doc + if not dst_task_name: + self._task_info = {} + return + + asset_path_parts = list(asset_doc["data"]["parents"]) + asset_path_parts.append(asset_doc["name"]) + asset_path = "/".join(asset_path_parts) + asset_tasks = asset_doc.get("data", {}).get("tasks") or {} + task_info = asset_tasks.get(dst_task_name) + if not task_info: + self._status.error = ( + f"Could find task with name \"{dst_task_name}\"" + f" on asset \"{asset_path}\"" + f" in project \"{dst_project_name}\"" + ) + raise PushToProjectError(self._status.error) + + # Create copy of task info to avoid changing data in asset document + task_info = copy.deepcopy(task_info) + task_info["name"] = dst_task_name + # Fill rest of task information based on task type + task_type = task_info["type"] + task_type_info = self.project_doc["config"]["tasks"].get(task_type, {}) + task_info.update(task_type_info) + self._task_info = task_info + + def determine_family(self): + subset_doc = self.src_subset_doc + family = subset_doc["data"].get("family") + families = subset_doc["data"].get("families") + if not family and families: + family = families[0] + + if not family: + self._status.error = ( + "Couldn't figure out family from source subset" + ) + raise PushToProjectError(self._status.error) + + self._status.add_progress_message( + f"Publishing family is '{family}' (Based on source subset)" + ) + self._family = family + + def determine_publish_template_name(self): + template_name = get_publish_template_name( + self._item.dst_project_name, + self.host_name, + self.family, + self.task_info.get("name"), + self.task_info.get("type"), + project_settings=self.project_settings + ) + self._status.add_progress_message( + f"Using template '{template_name}' for integration" + ) + self._template_name = template_name + + def determine_subset_name(self): + family = self.family + asset_doc = self.asset_doc + task_info = self.task_info + subset_name = get_subset_name( + family, + self.variant, + task_info.get("name"), + asset_doc, + project_name=self._item.dst_project_name, + host_name=self.host_name, + project_settings=self.project_settings + ) + self._status.add_progress_message( + f"Push will be integrating to subet with name '{subset_name}'" + ) + self._subset_name = subset_name + + def make_sure_subset_exists(self): + project_name = self._item.dst_project_name + asset_id = self.asset_doc["_id"] + subset_name = self.subset_name + family = self.family + subset_doc = get_subset_by_name(project_name, subset_name, asset_id) + if subset_doc: + self._subset_doc = subset_doc + return subset_doc + + data = { + "families": [family] + } + subset_doc = new_subset_document( + subset_name, family, asset_id, data + ) + self._operations.create_entity(project_name, "subset", subset_doc) + self._subset_doc = subset_doc + + def make_sure_version_exists(self): + """Make sure version document exits in database. + + Args: + item_process (ProjectPushItemProcess): Item handling process. + project_name (str): Name of project where version should live. + version (Union[int, None]): Number of version. Latest is used when + 'None' is passed. + operations (OperationsSession): Session which handler creation and + update of entities. + + Returns: + Tuple[Dict[str, Any], bool]: New version document and boolean if version + already existed in database. + """ + + project_name = self._item.dst_project_name + version = self._item.dst_version + src_version_doc = self.src_version_doc + subset_doc = self.subset_doc + subset_id = subset_doc["_id"] + src_data = src_version_doc["data"] + families = subset_doc["data"].get("families") + if not families: + families = [subset_doc["data"]["family"]] + + version_data = { + "families": list(families), + "fps": src_data.get("fps"), + "source": src_data.get("source"), + "machine": socket.gethostname(), + "comment": "", + "author": get_openpype_username(), + "time": get_formatted_current_time(), + } + if version is None: + last_version_doc = get_last_version_by_subset_id( + project_name, subset_id + ) + version = 1 + if last_version_doc: + version += int(last_version_doc["name"]) + + existing_version_doc = get_version_by_name( + project_name, version, subset_id + ) + # Update existing version + if existing_version_doc: + version_doc = new_version_doc( + version, subset_id, version_data, existing_version_doc["_id"] + ) + update_data = prepare_version_update_data( + existing_version_doc, version_doc + ) + if update_data: + self._operations.update_entity( + project_name, + "version", + existing_version_doc["_id"], + update_data + ) + self._version_doc = version_doc + + return + + if version is None: + last_version_doc = get_last_version_by_subset_id( + project_name, subset_id + ) + version = 1 + if last_version_doc: + version += int(last_version_doc["name"]) + + version_doc = new_version_doc( + version, subset_id, version_data + ) + self._operations.create_entity(project_name, "version", version_doc) + + self._version_doc = version_doc + + def integrate_representations(self): + try: + self._integrate_representations() + except Exception: + self._operations.clear() + self._file_transaction.rollback() + raise + + def _integrate_representations(self): + version_doc = self.version_doc + version_id = version_doc["_id"] + existing_repres = get_representations( + self._item.dst_project_name, + version_ids=[version_id] + ) + existing_repres_by_low_name = { + repre_doc["name"].lower(): repre_doc + for repre_doc in existing_repres + } + template_name = self.template_name + anatomy = self.anatomy + formatting_data = get_template_data( + self.project_doc, + self.asset_doc, + self.task_info.get("name"), + self.host_name + ) + formatting_data.update({ + "subset": self.subset_name, + "family": self.family, + "version": version_doc["name"] + }) + + path_template = anatomy.templates[template_name]["path"].replace( + "\\", "/" + ) + file_template = StringTemplate( + anatomy.templates[template_name]["file"] + ) + processed_repre_items = self._prepare_file_transactions( + anatomy, template_name, formatting_data, file_template + ) + self._file_transaction.process() + self._prepare_database_operations( + version_id, + processed_repre_items, + path_template, + existing_repres_by_low_name + ) + self._operations.commit() + self._file_transaction.finalize() + + def _prepare_file_transactions( + self, anatomy, template_name, formatting_data, file_template + ): + processed_repre_items = [] + for repre_item in self.src_repre_items: + repre_doc = repre_item.repre_doc + repre_name = repre_doc["name"] + repre_format_data = copy.deepcopy(formatting_data) + repre_format_data["representation"] = repre_name + for src_file in repre_item.src_files: + ext = os.path.splitext(src_file.path)[-1] + repre_format_data["ext"] = ext[1:] + break + + tmp_result = anatomy.format(formatting_data) + folder_path = tmp_result[template_name]["folder"] + repre_context = folder_path.used_values + folder_path_rootless = folder_path.rootless + repre_filepaths = [] + published_path = None + for src_file in repre_item.src_files: + file_data = copy.deepcopy(repre_format_data) + frame = src_file.frame + if frame is not None: + file_data["frame"] = frame + + udim = src_file.udim + if udim is not None: + file_data["udim"] = udim + + filename = file_template.format_strict(file_data) + dst_filepath = os.path.normpath( + os.path.join(folder_path, filename) + ) + dst_rootless_path = os.path.normpath( + os.path.join(folder_path_rootless, filename) + ) + if published_path is None or frame == repre_item.frame: + published_path = dst_filepath + repre_context.update(filename.used_values) + + repre_filepaths.append((dst_filepath, dst_rootless_path)) + self._file_transaction.add(src_file.path, dst_filepath) + + for resource_file in repre_item.resource_files: + dst_filepath = os.path.normpath( + os.path.join(folder_path, resource_file.relative_path) + ) + dst_rootless_path = os.path.normpath( + os.path.join( + folder_path_rootless, resource_file.relative_path + ) + ) + repre_filepaths.append((dst_filepath, dst_rootless_path)) + self._file_transaction.add(resource_file.path, dst_filepath) + processed_repre_items.append( + (repre_item, repre_filepaths, repre_context, published_path) + ) + return processed_repre_items + + def _prepare_database_operations( + self, + version_id, + processed_repre_items, + path_template, + existing_repres_by_low_name + ): + modules_manager = ModulesManager() + sync_server_module = modules_manager.get("sync_server") + if sync_server_module is None or not sync_server_module.enabled: + sites = [{ + "name": "studio", + "created_dt": datetime.datetime.now() + }] + else: + sites = sync_server_module.compute_resource_sync_sites( + project_name=self._item.dst_project_name + ) + + added_repre_names = set() + for item in processed_repre_items: + (repre_item, repre_filepaths, repre_context, published_path) = item + repre_name = repre_item.repre_doc["name"] + added_repre_names.add(repre_name.lower()) + new_repre_data = { + "path": published_path, + "template": path_template + } + new_repre_files = [] + for (path, rootless_path) in repre_filepaths: + new_repre_files.append({ + "_id": ObjectId(), + "path": rootless_path, + "size": os.path.getsize(path), + "hash": source_hash(path), + "sites": sites + }) + + existing_repre = existing_repres_by_low_name.get( + repre_name.lower() + ) + entity_id = None + if existing_repre: + entity_id = existing_repre["_id"] + new_repre_doc = new_representation_doc( + repre_name, + version_id, + repre_context, + data=new_repre_data, + entity_id=entity_id + ) + new_repre_doc["files"] = new_repre_files + if not existing_repre: + self._operations.create_entity( + self._item.dst_project_name, + new_repre_doc["type"], + new_repre_doc + ) + else: + update_data = prepare_representation_update_data( + existing_repre, new_repre_doc + ) + if update_data: + self._operations.update_entity( + self._item.dst_project_name, + new_repre_doc["type"], + new_repre_doc["_id"], + update_data + ) + + existing_repre_names = set(existing_repres_by_low_name.keys()) + for repre_name in (existing_repre_names - added_repre_names): + repre_doc = existing_repres_by_low_name[repre_name] + self._operations.update_entity( + self._item.dst_project_name, + repre_doc["type"], + repre_doc["_id"], + {"type": "archived_representation"} + ) + + def process(self): + item_process.fill_source_variables() + item_process.fill_destination_project() + item_process.fill_or_create_destination_asset() + item_process.determine_family() + item_process.determine_publish_template_name() + item_process.determine_subset_name() + item_process.make_sure_subset_exists() + item_process.make_sure_version_exists() + item_process.integrate_representations() def main(): # NOTE For development purposes - controller = RepublisherController() project_name = "" - verssion_id = "" + version_id = "" dst_project_name = "" dst_asset_id = "" dst_task_name = "" - controller.add_item(ProjectPushItem( + version = None + variant = "" + comment = "" + + item = ProjectPushItem( project_name, version_id, dst_project_name, dst_asset_id, dst_task_name, + variant, + version, dst_version=1 - )) - fake_process(controller) \ No newline at end of file + ) + item_process = ProjectPushItemProcess(item) + item_process.process()