diff --git a/openpype/modules/ftrack/event_handlers_server/event_push_frame_values_to_task.py b/openpype/modules/ftrack/event_handlers_server/event_push_frame_values_to_task.py index c0b3137455..81719258e1 100644 --- a/openpype/modules/ftrack/event_handlers_server/event_push_frame_values_to_task.py +++ b/openpype/modules/ftrack/event_handlers_server/event_push_frame_values_to_task.py @@ -2,7 +2,10 @@ import collections import datetime import ftrack_api -from openpype.modules.ftrack.lib import BaseEvent +from openpype.modules.ftrack.lib import ( + BaseEvent, + query_custom_attributes +) class PushFrameValuesToTaskEvent(BaseEvent): @@ -55,10 +58,6 @@ class PushFrameValuesToTaskEvent(BaseEvent): if entity_info.get("entityType") != "task": continue - # Skip `Task` entity type - if entity_info["entity_type"].lower() == "task": - continue - # Care only about changes of status changes = entity_info.get("changes") if not changes: @@ -74,6 +73,14 @@ class PushFrameValuesToTaskEvent(BaseEvent): if project_id is None: continue + # Skip `Task` entity type if parent didn't change + if entity_info["entity_type"].lower() == "task": + if ( + "parent_id" not in changes + or changes["parent_id"]["new"] is None + ): + continue + if project_id not in entities_info_by_project_id: entities_info_by_project_id[project_id] = [] entities_info_by_project_id[project_id].append(entity_info) @@ -117,11 +124,24 @@ class PushFrameValuesToTaskEvent(BaseEvent): )) return + interest_attributes = set(interest_attributes) + interest_entity_types = set(interest_entity_types) + + # Separate value changes and task parent changes + _entities_info = [] + task_parent_changes = [] + for entity_info in entities_info: + if entity_info["entity_type"].lower() == "task": + task_parent_changes.append(entity_info) + else: + _entities_info.append(entity_info) + entities_info = _entities_info + # Filter entities info with changes interesting_data, changed_keys_by_object_id = self.filter_changes( session, event, entities_info, interest_attributes ) - if not interesting_data: + if not interesting_data and not task_parent_changes: return # Prepare object types @@ -131,6 +151,289 @@ class PushFrameValuesToTaskEvent(BaseEvent): name_low = object_type["name"].lower() object_types_by_name[name_low] = object_type + # NOTE it would be nice to check if `interesting_data` do not contain + # value changs of tasks that were created or moved + # - it is a complex way how to find out + if interesting_data: + self.process_attribute_changes( + session, object_types_by_name, + interesting_data, changed_keys_by_object_id, + interest_entity_types, interest_attributes + ) + + if task_parent_changes: + self.process_task_parent_change( + session, object_types_by_name, task_parent_changes, + interest_entity_types, interest_attributes + ) + + def process_task_parent_change( + self, session, object_types_by_name, task_parent_changes, + interest_entity_types, interest_attributes + ): + """Push custom attribute values if task parent has changed. + + Parent is changed if task is created or if is moved under different + entity. We don't care about all task changes only about those that + have it's parent in interest types (from settings). + + Tasks hierarchical value should be unset or set based on parents + real hierarchical value and non hierarchical custom attribute value + should be set to hierarchical value. + """ + # Store task ids which were created or moved under parent with entity + # type defined in settings (interest_entity_types). + task_ids = set() + # Store parent ids of matching task ids + matching_parent_ids = set() + # Store all entity ids of all entities to be able query hierarchical + # values. + whole_hierarchy_ids = set() + # Store parent id of each entity id + parent_id_by_entity_id = {} + for entity_info in task_parent_changes: + # Ignore entities with less parents than 2 + # NOTE entity itself is also part of "parents" value + parents = entity_info.get("parents") or [] + if len(parents) < 2: + continue + + parent_info = parents[1] + # Check if parent has entity type we care about. + if parent_info["entity_type"] not in interest_entity_types: + continue + + task_ids.add(entity_info["entityId"]) + matching_parent_ids.add(parent_info["entityId"]) + + # Store whole hierarchi of task entity + prev_id = None + for item in parents: + item_id = item["entityId"] + whole_hierarchy_ids.add(item_id) + + if prev_id is None: + prev_id = item_id + continue + + parent_id_by_entity_id[prev_id] = item_id + if item["entityType"] == "show": + break + prev_id = item_id + + # Just skip if nothing is interesting for our settings + if not matching_parent_ids: + return + + # Query object type ids of parent ids for custom attribute + # definitions query + entities = session.query( + "select object_type_id from TypedContext where id in ({})".format( + self.join_query_keys(matching_parent_ids) + ) + ) + + # Prepare task object id + task_object_id = object_types_by_name["task"]["id"] + + # All object ids for which we're querying custom attribute definitions + object_type_ids = set() + object_type_ids.add(task_object_id) + for entity in entities: + object_type_ids.add(entity["object_type_id"]) + + attrs_by_obj_id, hier_attrs = self.attrs_configurations( + session, object_type_ids, interest_attributes + ) + + # Skip if all task attributes are not available + task_attrs = attrs_by_obj_id.get(task_object_id) + if not task_attrs: + return + + # Skip attributes that is not in both hierarchical and nonhierarchical + # TODO be able to push values if hierarchical is available + for key in interest_attributes: + if key not in hier_attrs: + task_attrs.pop(key, None) + + elif key not in task_attrs: + hier_attrs.pop(key) + + # Skip if nothing remained + if not task_attrs: + return + + # Do some preparations for custom attribute values query + attr_key_by_id = {} + nonhier_id_by_key = {} + hier_attr_ids = [] + for key, attr_id in hier_attrs.items(): + attr_key_by_id[attr_id] = key + hier_attr_ids.append(attr_id) + + conf_ids = list(hier_attr_ids) + for key, attr_id in task_attrs.items(): + attr_key_by_id[attr_id] = key + nonhier_id_by_key[key] = attr_id + conf_ids.append(attr_id) + + # Query custom attribute values + # - result does not contain values for all entities only result of + # query callback to ftrack server + result = query_custom_attributes( + session, conf_ids, whole_hierarchy_ids + ) + + # Prepare variables where result will be stored + # - hierachical values should not contain attribute with value by + # default + hier_values_by_entity_id = { + entity_id: {} + for entity_id in whole_hierarchy_ids + } + # - real values of custom attributes + values_by_entity_id = { + entity_id: { + attr_id: None + for attr_id in conf_ids + } + for entity_id in whole_hierarchy_ids + } + for item in result: + attr_id = item["configuration_id"] + entity_id = item["entity_id"] + value = item["value"] + + values_by_entity_id[entity_id][attr_id] = value + + if attr_id in hier_attr_ids and value is not None: + hier_values_by_entity_id[entity_id][attr_id] = value + + # Prepare values for all task entities + # - going through all parents and storing first value value + # - store None to those that are already known that do not have set + # value at all + for task_id in tuple(task_ids): + for attr_id in hier_attr_ids: + entity_ids = [] + value = None + entity_id = task_id + while value is None: + entity_value = hier_values_by_entity_id[entity_id] + if attr_id in entity_value: + value = entity_value[attr_id] + if value is None: + break + + if value is None: + entity_ids.append(entity_id) + + entity_id = parent_id_by_entity_id.get(entity_id) + if entity_id is None: + break + + for entity_id in entity_ids: + hier_values_by_entity_id[entity_id][attr_id] = value + + # Prepare changes to commit + changes = [] + for task_id in tuple(task_ids): + parent_id = parent_id_by_entity_id[task_id] + for attr_id in hier_attr_ids: + attr_key = attr_key_by_id[attr_id] + nonhier_id = nonhier_id_by_key[attr_key] + + # Real value of hierarchical attribute on parent + # - If is none then should be unset + real_parent_value = values_by_entity_id[parent_id][attr_id] + # Current hierarchical value of a task + # - Will be compared to real parent value + hier_value = hier_values_by_entity_id[task_id][attr_id] + + # Parent value that can be inherited from it's parent entity + parent_value = hier_values_by_entity_id[parent_id][attr_id] + # Task value of nonhierarchical custom attribute + nonhier_value = values_by_entity_id[task_id][nonhier_id] + + if real_parent_value != hier_value: + changes.append({ + "new_value": real_parent_value, + "attr_id": attr_id, + "entity_id": task_id, + "attr_key": attr_key + }) + + if parent_value != nonhier_value: + changes.append({ + "new_value": parent_value, + "attr_id": nonhier_id, + "entity_id": task_id, + "attr_key": attr_key + }) + + self._commit_changes(session, changes) + + def _commit_changes(self, session, changes): + uncommited_changes = False + for idx, item in enumerate(changes): + new_value = item["new_value"] + attr_id = item["attr_id"] + entity_id = item["entity_id"] + attr_key = item["attr_key"] + + entity_key = collections.OrderedDict() + entity_key["configuration_id"] = attr_id + entity_key["entity_id"] = entity_id + self._cached_changes.append({ + "attr_key": attr_key, + "entity_id": entity_id, + "value": new_value, + "time": datetime.datetime.now() + }) + if new_value is None: + op = ftrack_api.operation.DeleteEntityOperation( + "CustomAttributeValue", + entity_key + ) + else: + op = ftrack_api.operation.UpdateEntityOperation( + "ContextCustomAttributeValue", + entity_key, + "value", + ftrack_api.symbol.NOT_SET, + new_value + ) + + session.recorded_operations.push(op) + self.log.info(( + "Changing Custom Attribute \"{}\" to value" + " \"{}\" on entity: {}" + ).format(attr_key, new_value, entity_id)) + + if (idx + 1) % 20 == 0: + uncommited_changes = False + try: + session.commit() + except Exception: + session.rollback() + self.log.warning( + "Changing of values failed.", exc_info=True + ) + else: + uncommited_changes = True + if uncommited_changes: + try: + session.commit() + except Exception: + session.rollback() + self.log.warning("Changing of values failed.", exc_info=True) + + def process_attribute_changes( + self, session, object_types_by_name, + interesting_data, changed_keys_by_object_id, + interest_entity_types, interest_attributes + ): # Prepare task object id task_object_id = object_types_by_name["task"]["id"] @@ -216,13 +519,13 @@ class PushFrameValuesToTaskEvent(BaseEvent): task_entity_ids.add(task_id) parent_id_by_task_id[task_id] = task_entity["parent_id"] - self.finalize( + self.finalize_attribute_changes( session, interesting_data, changed_keys, attrs_by_obj_id, hier_attrs, task_entity_ids, parent_id_by_task_id ) - def finalize( + def finalize_attribute_changes( self, session, interesting_data, changed_keys, attrs_by_obj_id, hier_attrs, task_entity_ids, parent_id_by_task_id @@ -248,6 +551,7 @@ class PushFrameValuesToTaskEvent(BaseEvent): session, attr_ids, entity_ids, task_entity_ids, hier_attrs ) + changes = [] for entity_id, current_values in current_values_by_id.items(): parent_id = parent_id_by_task_id.get(entity_id) if not parent_id: @@ -272,39 +576,13 @@ class PushFrameValuesToTaskEvent(BaseEvent): if new_value == old_value: continue - entity_key = collections.OrderedDict() - entity_key["configuration_id"] = attr_id - entity_key["entity_id"] = entity_id - self._cached_changes.append({ - "attr_key": attr_key, + changes.append({ + "new_value": new_value, + "attr_id": attr_id, "entity_id": entity_id, - "value": new_value, - "time": datetime.datetime.now() + "attr_key": attr_key }) - if new_value is None: - op = ftrack_api.operation.DeleteEntityOperation( - "CustomAttributeValue", - entity_key - ) - else: - op = ftrack_api.operation.UpdateEntityOperation( - "ContextCustomAttributeValue", - entity_key, - "value", - ftrack_api.symbol.NOT_SET, - new_value - ) - - session.recorded_operations.push(op) - self.log.info(( - "Changing Custom Attribute \"{}\" to value" - " \"{}\" on entity: {}" - ).format(attr_key, new_value, entity_id)) - try: - session.commit() - except Exception: - session.rollback() - self.log.warning("Changing of values failed.", exc_info=True) + self._commit_changes(session, changes) def filter_changes( self, session, event, entities_info, interest_attributes diff --git a/openpype/modules/ftrack/lib/__init__.py b/openpype/modules/ftrack/lib/__init__.py index ce6d5284b6..9dc2d67279 100644 --- a/openpype/modules/ftrack/lib/__init__.py +++ b/openpype/modules/ftrack/lib/__init__.py @@ -13,7 +13,8 @@ from .custom_attributes import ( default_custom_attributes_definition, app_definitions_from_app_manager, tool_definitions_from_app_manager, - get_openpype_attr + get_openpype_attr, + query_custom_attributes ) from . import avalon_sync @@ -37,6 +38,7 @@ __all__ = ( "app_definitions_from_app_manager", "tool_definitions_from_app_manager", "get_openpype_attr", + "query_custom_attributes", "avalon_sync", diff --git a/openpype/modules/ftrack/lib/custom_attributes.py b/openpype/modules/ftrack/lib/custom_attributes.py index f6b82c90b1..53facd4ab2 100644 --- a/openpype/modules/ftrack/lib/custom_attributes.py +++ b/openpype/modules/ftrack/lib/custom_attributes.py @@ -81,3 +81,60 @@ def get_openpype_attr(session, split_hierarchical=True, query_keys=None): return custom_attributes, hier_custom_attributes return custom_attributes + + +def join_query_keys(keys): + """Helper to join keys to query.""" + return ",".join(["\"{}\"".format(key) for key in keys]) + + +def query_custom_attributes(session, conf_ids, entity_ids, table_name=None): + """Query custom attribute values from ftrack database. + + Using ftrack call method result may differ based on used table name and + version of ftrack server. + + Args: + session(ftrack_api.Session): Connected ftrack session. + conf_id(list, set, tuple): Configuration(attribute) ids which are + queried. + entity_ids(list, set, tuple): Entity ids for which are values queried. + table_name(str): Table nam from which values are queried. Not + recommended to change until you know what it means. + """ + output = [] + # Just skip + if not conf_ids or not entity_ids: + return output + + if table_name is None: + table_name = "ContextCustomAttributeValue" + + # Prepare values to query + attributes_joined = join_query_keys(conf_ids) + attributes_len = len(conf_ids) + + # Query values in chunks + chunk_size = int(5000 / attributes_len) + # Make sure entity_ids is `list` for chunk selection + entity_ids = list(entity_ids) + for idx in range(0, len(entity_ids), chunk_size): + entity_ids_joined = join_query_keys( + entity_ids[idx:idx + chunk_size] + ) + + call_expr = [{ + "action": "query", + "expression": ( + "select value, entity_id from {}" + " where entity_id in ({}) and configuration_id in ({})" + ).format(table_name, entity_ids_joined, attributes_joined) + }] + if hasattr(session, "call"): + [result] = session.call(call_expr) + else: + [result] = session._call(call_expr) + + for item in result["data"]: + output.append(item) + return output