use task status profiles to change task status id on creation

This commit is contained in:
Jakub Trllo 2022-08-16 11:55:23 +02:00
parent 345a476159
commit 088a2d2003

View file

@ -1,9 +1,12 @@
import sys
import collections
import six
import pyblish.api
from copy import deepcopy
import pyblish.api
from openpype.client import get_asset_by_id
from openpype.lib import filter_profiles
# Copy of constant `openpype_modules.ftrack.lib.avalon_sync.CUST_ATTR_AUTO_SYNC`
@ -73,6 +76,7 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
"traypublisher"
]
optional = False
create_task_status_profiles = []
def process(self, context):
self.context = context
@ -82,14 +86,16 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
hierarchy_context = self._get_active_assets(context)
self.log.debug("__ hierarchy_context: {}".format(hierarchy_context))
self.session = self.context.data["ftrackSession"]
session = self.context.data["ftrackSession"]
project_name = self.context.data["projectEntity"]["name"]
query = 'Project where full_name is "{}"'.format(project_name)
project = self.session.query(query).one()
auto_sync_state = project[
"custom_attributes"][CUST_ATTR_AUTO_SYNC]
project = session.query(query).one()
auto_sync_state = project["custom_attributes"][CUST_ATTR_AUTO_SYNC]
self.ft_project = None
self.session = session
self.ft_project = project
self.task_types = self.get_all_task_types(project)
self.task_statuses = self.get_task_statuses(project)
# disable termporarily ftrack project's autosyncing
if auto_sync_state:
@ -121,10 +127,7 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
self.log.debug(entity_type)
if entity_type.lower() == 'project':
query = 'Project where full_name is "{}"'.format(entity_name)
entity = self.session.query(query).one()
self.ft_project = entity
self.task_types = self.get_all_task_types(entity)
entity = self.ft_project
elif self.ft_project is None or parent is None:
raise AssertionError(
@ -217,13 +220,6 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
task_type=task_type,
parent=entity
)
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
# Incoming links.
self.create_links(project_name, entity_data, entity)
@ -303,7 +299,37 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
return tasks
def get_task_statuses(self, project_entity):
project_schema = project_entity["project_schema"]
task_workflow_statuses = project_schema["_task_workflow"]["statuses"]
return {
status["id"]: status
for status in task_workflow_statuses
}
def create_task(self, name, task_type, parent):
filter_data = {
"task_names": name,
"task_types": task_type
}
profile = filter_profiles(
self.create_task_status_profiles,
filter_data
)
status_id = None
if profile:
status_name = profile["status_name"]
status_name_low = status_name.lower()
for _status_id, status in self.task_statuses.items():
if status["name"].lower() == status_name_low:
status_id = _status_id
break
if status_id is None:
self.log.warning(
"Task status \"{}\" was not found".format(status_name)
)
task = self.session.create('Task', {
'name': name,
'parent': parent
@ -312,6 +338,8 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
self.log.info(task_type)
self.log.info(self.task_types)
task['type'] = self.task_types[task_type]
if status_id is not None:
task["status_id"] = status_id
try:
self.session.commit()