Extracted logic to methods

This commit is contained in:
Petr Kalis 2025-10-23 10:55:37 +02:00
parent 0d235ed8ca
commit 0ebbd0a232

View file

@ -482,6 +482,8 @@ class ProjectPushItemProcess:
self._log_info("Destination project was found")
self._fill_or_create_destination_folder()
self._log_info("Destination folder was determined")
self._fill_or_create_destination_task()
self._log_info("Destination task was determined")
self._determine_product_type()
self._determine_publish_template_name()
self._determine_product_name()
@ -730,7 +732,6 @@ class ProjectPushItemProcess:
def _fill_or_create_destination_folder(self):
dst_project_name = self._item.dst_project_name
dst_folder_id = self._item.dst_folder_id
dst_task_name = self._item.dst_task_name
new_folder_name = self._item.new_folder_name
if not dst_folder_id and not new_folder_name:
self._status.set_failed(
@ -761,12 +762,11 @@ class ProjectPushItemProcess:
new_folder_name
)
self._folder_entity = folder_entity
if not dst_task_name:
dst_task_name = self._make_sure_task_exists(folder_entity)
if not dst_task_name: # really no task selected nor on source
self._task_info = {}
return
def _fill_or_create_destination_task(self):
folder_entity = self._folder_entity
dst_task_name = self._item.dst_task_name
dst_project_name = self._item.dst_project_name
folder_path = folder_entity["path"]
folder_tasks = {
@ -775,6 +775,21 @@ class ProjectPushItemProcess:
dst_project_name, folder_ids=[folder_entity["id"]]
)
}
if not dst_task_name:
src_task_info = self._get_src_task_info()
if not src_task_info: # really no task selected nor on source
self._task_info = {}
return
dst_task_name = src_task_info["name"].lower()
if dst_task_name not in folder_tasks:
self._make_sure_task_exists(
folder_entity, src_task_info
)
task_info = copy.deepcopy(src_task_info)
folder_tasks[dst_task_name] = task_info
task_info = folder_tasks.get(dst_task_name.lower())
if not task_info:
self._status.set_failed(
@ -965,9 +980,22 @@ class ProjectPushItemProcess:
)
self._version_entity = version_entity
def _make_sure_task_exists(self, folder_entity: Dict[str, Any]) -> str:
def _make_sure_task_exists(
self,
folder_entity: Dict[str, Any],
task_info: Dict[str, Any],
):
"""Creates destination task from source task information"""
project_name = self._item.dst_project_name
_task_id = ayon_api.create_task(
project_name,
task_info["name"],
folder_id=folder_entity["id"],
task_type=task_info["taskType"],
attrib=task_info["attrib"],
)
def _get_src_task_info(self):
src_version_entity = self._src_version_entity
src_task = ayon_api.get_task_by_id(
self._item.src_project_name, src_version_entity["taskId"]
@ -977,15 +1005,7 @@ class ProjectPushItemProcess:
f"No task selected and couldn't find source task"
)
raise PushToProjectError(self._status.fail_reason)
_task_id = ayon_api.create_task(
project_name,
src_task["name"],
folder_id=folder_entity["id"],
task_type=src_task["taskType"],
attrib=src_task["attrib"],
)
return src_task["name"]
return src_task
def _integrate_representations(self):
try: