Merge pull request #4126 from pypeclub/feature/OP-4499_Backend---editorial-asset-sync-issue

Ftrack: Editorial asset sync issue
This commit is contained in:
Jakub Trllo 2022-11-23 17:56:03 +01:00 committed by GitHub
commit 9fb726d62b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -7,10 +7,8 @@ import pyblish.api
from openpype.client import get_asset_by_id
from openpype.lib import filter_profiles
from openpype.pipeline import KnownPublishError
# Copy of constant `openpype_modules.ftrack.lib.avalon_sync.CUST_ATTR_AUTO_SYNC`
CUST_ATTR_AUTO_SYNC = "avalon_auto_sync"
CUST_ATTR_GROUP = "openpype"
@ -19,7 +17,6 @@ CUST_ATTR_GROUP = "openpype"
def get_pype_attr(session, split_hierarchical=True):
custom_attributes = []
hier_custom_attributes = []
# TODO remove deprecated "avalon" group from query
cust_attrs_query = (
"select id, entity_type, object_type_id, is_hierarchical, default"
" from CustomAttributeConfiguration"
@ -79,120 +76,284 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
create_task_status_profiles = []
def process(self, context):
self.context = context
if "hierarchyContext" not in self.context.data:
if "hierarchyContext" not in context.data:
return
hierarchy_context = self._get_active_assets(context)
self.log.debug("__ hierarchy_context: {}".format(hierarchy_context))
session = self.context.data["ftrackSession"]
project_name = self.context.data["projectEntity"]["name"]
query = 'Project where full_name is "{}"'.format(project_name)
project = session.query(query).one()
auto_sync_state = project["custom_attributes"][CUST_ATTR_AUTO_SYNC]
session = context.data["ftrackSession"]
project_name = context.data["projectName"]
project = session.query(
'select id, full_name from Project where full_name is "{}"'.format(
project_name
)
).first()
if not project:
raise KnownPublishError(
"Project \"{}\" was not found on ftrack.".format(project_name)
)
self.context = context
self.session = session
self.ft_project = project
self.task_types = self.get_all_task_types(project)
self.task_statuses = self.get_task_statuses(project)
# disable termporarily ftrack project's autosyncing
if auto_sync_state:
self.auto_sync_off(project)
# import ftrack hierarchy
self.import_to_ftrack(project_name, hierarchy_context)
try:
# import ftrack hierarchy
self.import_to_ftrack(project_name, hierarchy_context)
except Exception:
raise
finally:
if auto_sync_state:
self.auto_sync_on(project)
def query_ftrack_entitites(self, session, ft_project):
project_id = ft_project["id"]
entities = session.query((
"select id, name, parent_id"
" from TypedContext where project_id is \"{}\""
).format(project_id)).all()
def import_to_ftrack(self, project_name, input_data, parent=None):
entities_by_id = {}
entities_by_parent_id = collections.defaultdict(list)
for entity in entities:
entities_by_id[entity["id"]] = entity
parent_id = entity["parent_id"]
entities_by_parent_id[parent_id].append(entity)
ftrack_hierarchy = []
ftrack_id_queue = collections.deque()
ftrack_id_queue.append((project_id, ftrack_hierarchy))
while ftrack_id_queue:
item = ftrack_id_queue.popleft()
ftrack_id, parent_list = item
if ftrack_id == project_id:
entity = ft_project
name = entity["full_name"]
else:
entity = entities_by_id[ftrack_id]
name = entity["name"]
children = []
parent_list.append({
"name": name,
"low_name": name.lower(),
"entity": entity,
"children": children,
})
for child in entities_by_parent_id[ftrack_id]:
ftrack_id_queue.append((child["id"], children))
return ftrack_hierarchy
def find_matching_ftrack_entities(
self, hierarchy_context, ftrack_hierarchy
):
walk_queue = collections.deque()
for entity_name, entity_data in hierarchy_context.items():
walk_queue.append(
(entity_name, entity_data, ftrack_hierarchy)
)
matching_ftrack_entities = []
while walk_queue:
item = walk_queue.popleft()
entity_name, entity_data, ft_children = item
matching_ft_child = None
for ft_child in ft_children:
if ft_child["low_name"] == entity_name.lower():
matching_ft_child = ft_child
break
if matching_ft_child is None:
continue
entity = matching_ft_child["entity"]
entity_data["ft_entity"] = entity
matching_ftrack_entities.append(entity)
hierarchy_children = entity_data.get("childs")
if not hierarchy_children:
continue
for child_name, child_data in hierarchy_children.items():
walk_queue.append(
(child_name, child_data, matching_ft_child["children"])
)
return matching_ftrack_entities
def query_custom_attribute_values(self, session, entities, hier_attrs):
attr_ids = {
attr["id"]
for attr in hier_attrs
}
entity_ids = {
entity["id"]
for entity in entities
}
output = {
entity_id: {}
for entity_id in entity_ids
}
if not attr_ids or not entity_ids:
return {}
joined_attr_ids = ",".join(
['"{}"'.format(attr_id) for attr_id in attr_ids]
)
# Query values in chunks
chunk_size = int(5000 / len(attr_ids))
# Make sure entity_ids is `list` for chunk selection
entity_ids = list(entity_ids)
results = []
for idx in range(0, len(entity_ids), chunk_size):
joined_entity_ids = ",".join([
'"{}"'.format(entity_id)
for entity_id in entity_ids[idx:idx + chunk_size]
])
results.extend(
session.query(
(
"select value, entity_id, configuration_id"
" from CustomAttributeValue"
" where entity_id in ({}) and configuration_id in ({})"
).format(
joined_entity_ids,
joined_attr_ids
)
).all()
)
for result in results:
attr_id = result["configuration_id"]
entity_id = result["entity_id"]
output[entity_id][attr_id] = result["value"]
return output
def import_to_ftrack(self, project_name, hierarchy_context):
# Prequery hiearchical custom attributes
hier_custom_attributes = get_pype_attr(self.session)[1]
hier_attrs = get_pype_attr(self.session)[1]
hier_attr_by_key = {
attr["key"]: attr
for attr in hier_custom_attributes
for attr in hier_attrs
}
# Query user entity (for comments)
user = self.session.query(
"User where username is \"{}\"".format(self.session.api_user)
).first()
if not user:
self.log.warning(
"Was not able to query current User {}".format(
self.session.api_user
)
)
# Query ftrack hierarchy with parenting
ftrack_hierarchy = self.query_ftrack_entitites(
self.session, self.ft_project)
# Fill ftrack entities to hierarchy context
# - there is no need to query entities again
matching_entities = self.find_matching_ftrack_entities(
hierarchy_context, ftrack_hierarchy)
# Query custom attribute values of each entity
custom_attr_values_by_id = self.query_custom_attribute_values(
self.session, matching_entities, hier_attrs)
# Get ftrack api module (as they are different per python version)
ftrack_api = self.context.data["ftrackPythonModule"]
for entity_name in input_data:
entity_data = input_data[entity_name]
# Use queue of hierarchy items to process
import_queue = collections.deque()
for entity_name, entity_data in hierarchy_context.items():
import_queue.append(
(entity_name, entity_data, None)
)
while import_queue:
item = import_queue.popleft()
entity_name, entity_data, parent = item
entity_type = entity_data['entity_type']
self.log.debug(entity_data)
self.log.debug(entity_type)
if entity_type.lower() == 'project':
entity = self.ft_project
elif self.ft_project is None or parent is None:
entity = entity_data.get("ft_entity")
if entity is None and entity_type.lower() == "project":
raise AssertionError(
"Collected items are not in right order!"
)
# try to find if entity already exists
else:
query = (
'TypedContext where name is "{0}" and '
'project_id is "{1}"'
).format(entity_name, self.ft_project["id"])
try:
entity = self.session.query(query).one()
except Exception:
entity = None
# Create entity if not exists
if entity is None:
entity = self.create_entity(
name=entity_name,
type=entity_type,
parent=parent
)
entity = self.session.create(entity_type, {
"name": entity_name,
"parent": parent
})
entity_data["ft_entity"] = entity
# self.log.info('entity: {}'.format(dict(entity)))
# CUSTOM ATTRIBUTES
custom_attributes = entity_data.get('custom_attributes', [])
instances = [
instance
for instance in self.context
if instance.data.get("asset") == entity["name"]
]
custom_attributes = entity_data.get('custom_attributes', {})
instances = []
for instance in self.context:
instance_asset_name = instance.data.get("asset")
if (
instance_asset_name
and instance_asset_name.lower() == entity["name"].lower()
):
instances.append(instance)
for instance in instances:
instance.data["ftrackEntity"] = entity
for key in custom_attributes:
for key, cust_attr_value in custom_attributes.items():
if cust_attr_value is None:
continue
hier_attr = hier_attr_by_key.get(key)
# Use simple method if key is not hierarchical
if not hier_attr:
assert (key in entity['custom_attributes']), (
'Missing custom attribute key: `{0}` in attrs: '
'`{1}`'.format(key, entity['custom_attributes'].keys())
if key not in entity["custom_attributes"]:
raise KnownPublishError((
"Missing custom attribute in ftrack with name '{}'"
).format(key))
entity['custom_attributes'][key] = cust_attr_value
continue
attr_id = hier_attr["id"]
entity_values = custom_attr_values_by_id.get(entity["id"], {})
# New value is defined by having id in values
# - it can be set to 'None' (ftrack allows that using API)
is_new_value = attr_id not in entity_values
attr_value = entity_values.get(attr_id)
# Use ftrack operations method to set hiearchical
# attribute value.
# - this is because there may be non hiearchical custom
# attributes with different properties
entity_key = collections.OrderedDict((
("configuration_id", hier_attr["id"]),
("entity_id", entity["id"])
))
op = None
if is_new_value:
op = ftrack_api.operation.CreateEntityOperation(
"CustomAttributeValue",
entity_key,
{"value": cust_attr_value}
)
entity['custom_attributes'][key] = custom_attributes[key]
else:
# Use ftrack operations method to set hiearchical
# attribute value.
# - this is because there may be non hiearchical custom
# attributes with different properties
entity_key = collections.OrderedDict()
entity_key["configuration_id"] = hier_attr["id"]
entity_key["entity_id"] = entity["id"]
self.session.recorded_operations.push(
ftrack_api.operation.UpdateEntityOperation(
"ContextCustomAttributeValue",
entity_key,
"value",
ftrack_api.symbol.NOT_SET,
custom_attributes[key]
)
elif attr_value != cust_attr_value:
op = ftrack_api.operation.UpdateEntityOperation(
"CustomAttributeValue",
entity_key,
"value",
attr_value,
cust_attr_value
)
if op is not None:
self.session.recorded_operations.push(op)
if self.session.recorded_operations:
try:
self.session.commit()
except Exception:
@ -206,7 +367,7 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
for instance in instances:
task_name = instance.data.get("task")
if task_name:
instances_by_task_name[task_name].append(instance)
instances_by_task_name[task_name.lower()].append(instance)
tasks = entity_data.get('tasks', [])
existing_tasks = []
@ -247,30 +408,28 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
six.reraise(tp, value, tb)
# Create notes.
user = self.session.query(
"User where username is \"{}\"".format(self.session.api_user)
).first()
if user:
for comment in entity_data.get("comments", []):
entity_comments = entity_data.get("comments")
if user and entity_comments:
for comment in entity_comments:
entity.create_note(comment, user)
else:
self.log.warning(
"Was not able to query current User {}".format(
self.session.api_user
)
)
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
# Import children.
if 'childs' in entity_data:
self.import_to_ftrack(
project_name, entity_data['childs'], entity)
children = entity_data.get("childs")
if not children:
continue
for entity_name, entity_data in children.items():
import_queue.append(
(entity_name, entity_data, entity)
)
def create_links(self, project_name, entity_data, entity):
# Clear existing links.
@ -366,48 +525,6 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
return task
def create_entity(self, name, type, parent):
entity = self.session.create(type, {
'name': name,
'parent': parent
})
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
return entity
def auto_sync_off(self, project):
project["custom_attributes"][CUST_ATTR_AUTO_SYNC] = False
self.log.info("Ftrack autosync swithed off")
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
def auto_sync_on(self, project):
project["custom_attributes"][CUST_ATTR_AUTO_SYNC] = True
self.log.info("Ftrack autosync swithed on")
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
self.session._configure_locations()
six.reraise(tp, value, tb)
def _get_active_assets(self, context):
""" Returns only asset dictionary.
Usually the last part of deep dictionary which
@ -429,19 +546,17 @@ class IntegrateHierarchyToFtrack(pyblish.api.ContextPlugin):
hierarchy_context = context.data["hierarchyContext"]
active_assets = []
active_assets = set()
# filter only the active publishing insatnces
for instance in context:
if instance.data.get("publish") is False:
continue
if not instance.data.get("asset"):
continue
active_assets.append(instance.data["asset"])
asset_name = instance.data.get("asset")
if asset_name:
active_assets.add(asset_name)
# remove duplicity in list
active_assets = list(set(active_assets))
self.log.debug("__ active_assets: {}".format(active_assets))
self.log.debug("__ active_assets: {}".format(list(active_assets)))
return get_pure_hierarchy_data(hierarchy_context)