Merge pull request #1503 from ynput/enhancement/YN-0132_create_source_task_if_no_selected

Library: copy and create source task if no task selected
This commit is contained in:
Petr Kalis 2025-10-24 16:07:41 +02:00 committed by GitHub
commit 1b201a755a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -5,7 +5,7 @@ import itertools
import sys
import traceback
import uuid
from typing import Optional, Dict, Any
from typing import Optional, Any
import ayon_api
from ayon_api.utils import create_entity_id
@ -225,8 +225,8 @@ class ProjectPushRepreItem:
but filenames are not template based.
Args:
repre_entity (Dict[str, Ant]): Representation entity.
roots (Dict[str, str]): Project roots (based on project anatomy).
repre_entity (dict[str, Ant]): Representation entity.
roots (dict[str, str]): Project roots (based on project anatomy).
"""
def __init__(self, repre_entity, roots):
@ -482,6 +482,8 @@ class ProjectPushItemProcess:
self._log_info("Destination project was found")
self._fill_or_create_destination_folder()
self._log_info("Destination folder was determined")
self._fill_or_create_destination_task()
self._log_info("Destination task was determined")
self._determine_product_type()
self._determine_publish_template_name()
self._determine_product_name()
@ -750,7 +752,6 @@ class ProjectPushItemProcess:
def _fill_or_create_destination_folder(self):
dst_project_name = self._item.dst_project_name
dst_folder_id = self._item.dst_folder_id
dst_task_name = self._item.dst_task_name
new_folder_name = self._item.new_folder_name
if not dst_folder_id and not new_folder_name:
self._status.set_failed(
@ -781,9 +782,11 @@ class ProjectPushItemProcess:
new_folder_name
)
self._folder_entity = folder_entity
if not dst_task_name:
self._task_info = {}
return
def _fill_or_create_destination_task(self):
folder_entity = self._folder_entity
dst_task_name = self._item.dst_task_name
dst_project_name = self._item.dst_project_name
folder_path = folder_entity["path"]
folder_tasks = {
@ -792,6 +795,20 @@ class ProjectPushItemProcess:
dst_project_name, folder_ids=[folder_entity["id"]]
)
}
if not dst_task_name:
src_task_info = self._get_src_task_info()
if not src_task_info: # really no task selected nor on source
self._task_info = {}
return
dst_task_name = src_task_info["name"]
if dst_task_name.lower() not in folder_tasks:
task_info = self._make_sure_task_exists(
folder_entity, src_task_info
)
folder_tasks[dst_task_name.lower()] = task_info
task_info = folder_tasks.get(dst_task_name.lower())
if not task_info:
self._status.set_failed(
@ -810,7 +827,10 @@ class ProjectPushItemProcess:
task_type["name"]: task_type
for task_type in self._project_entity["taskTypes"]
}
task_type_info = task_types_by_name.get(task_type_name, {})
task_type_info = copy.deepcopy(
task_types_by_name.get(task_type_name, {})
)
task_type_info.pop("name") # do not overwrite real task name
task_info.update(task_type_info)
self._task_info = task_info
@ -945,8 +965,8 @@ class ProjectPushItemProcess:
version = get_versioning_start(
project_name,
self.host_name,
task_name=self._task_info["name"],
task_type=self._task_info["taskType"],
task_name=self._task_info.get("name"),
task_type=self._task_info.get("taskType"),
product_type=product_type,
product_name=product_entity["name"],
)
@ -970,10 +990,16 @@ class ProjectPushItemProcess:
existing_version_entity["attrib"].update(dst_attrib)
self._version_entity = existing_version_entity
return
copied_tags = self._get_transferable_tags(src_version_entity)
copied_status = self._get_transferable_status(src_version_entity)
version_entity = new_version_entity(
version,
product_id,
author=src_version_entity["author"],
status=copied_status,
tags=copied_tags,
task_id=self._task_info.get("id"),
attribs=dst_attrib,
thumbnail_id=thumbnail_id,
)
@ -982,6 +1008,47 @@ class ProjectPushItemProcess:
)
self._version_entity = version_entity
def _make_sure_task_exists(
self,
folder_entity: dict[str, Any],
task_info: dict[str, Any],
) -> dict[str, Any]:
"""Creates destination task from source task information"""
project_name = self._item.dst_project_name
found_task_type = False
src_task_type = task_info["taskType"]
for task_type in self._project_entity["taskTypes"]:
if task_type["name"].lower() == src_task_type.lower():
found_task_type = True
break
if not found_task_type:
self._status.set_failed(
f"'{src_task_type}' task type is not configured in "
"project Anatomy."
)
raise PushToProjectError(self._status.fail_reason)
task_info = self._operations.create_task(
project_name,
task_info["name"],
folder_id=folder_entity["id"],
task_type=src_task_type,
attrib=task_info["attrib"],
)
self._task_info = task_info.data
return self._task_info
def _get_src_task_info(self):
src_version_entity = self._src_version_entity
if not src_version_entity["taskId"]:
return None
src_task = ayon_api.get_task_by_id(
self._item.src_project_name, src_version_entity["taskId"]
)
return src_task
def _integrate_representations(self):
try:
self._real_integrate_representations()
@ -1217,10 +1284,12 @@ class ProjectPushItemProcess:
if context_value and isinstance(context_value, dict):
for context_sub_key in context_value.keys():
value_to_update = formatting_data.get(context_key, {}).get(
context_sub_key)
context_sub_key
)
if value_to_update:
repre_context[context_key][
context_sub_key] = value_to_update
repre_context[context_key][context_sub_key] = (
value_to_update
)
else:
value_to_update = formatting_data.get(context_key)
if value_to_update:
@ -1229,6 +1298,28 @@ class ProjectPushItemProcess:
repre_context.pop("task", None)
return repre_context
def _get_transferable_tags(self, src_version_entity):
"""Copy over only tags present in destination project"""
dst_project_tags = [
tag["name"] for tag in self._project_entity["tags"]
]
copied_tags = []
for src_tag in src_version_entity["tags"]:
if src_tag in dst_project_tags:
copied_tags.append(src_tag)
return copied_tags
def _get_transferable_status(self, src_version_entity):
"""Copy over status, first status if not matching found"""
dst_project_statuses = {
status["name"]: status
for status in self._project_entity["statuses"]
}
copied_status = dst_project_statuses.get(src_version_entity["status"])
if copied_status:
return copied_status["name"]
return None
class IntegrateModel:
def __init__(self, controller):
@ -1301,6 +1392,6 @@ class IntegrateModel:
return
item.integrate()
def get_items(self) -> Dict[str, ProjectPushItemProcess]:
def get_items(self) -> dict[str, ProjectPushItemProcess]:
"""Returns dict of all ProjectPushItemProcess items """
return self._process_items