Merge pull request #623 from pypeclub/bugfix/task_processing_in_event_sync

Task processing in event sync
This commit is contained in:
Milan Kolar 2020-10-09 10:49:50 +02:00 committed by GitHub
commit a25bf0916b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -42,10 +42,13 @@ class SyncToAvalonEvent(BaseEvent):
)
# useful for getting all tasks for asset
entities_query_by_parent_id = (
"select id, name, parent_id, link, custom_attributes from TypedContext"
task_entities_query_by_parent_id = (
"select id, name, parent_id, type_id from Task"
" where project_id is \"{}\" and parent_id in ({})"
)
task_types_query = (
"select id, name from Type"
)
entities_name_query_by_name = (
"select id, name from TypedContext"
" where project_id is \"{}\" and name in ({})"
@ -555,8 +558,16 @@ class SyncToAvalonEvent(BaseEvent):
ftrack_id = ftrack_id[0]
# task modified, collect parent id of task, handle separately
if entityType.lower() == 'task':
self.modified_tasks_ftrackids.add(ent_info["parentId"])
if entity_type.lower() == "task":
changes = ent_info.get("changes") or {}
if action == "move":
parent_changes = changes["parent_id"]
self.modified_tasks_ftrackids.add(parent_changes["new"])
self.modified_tasks_ftrackids.add(parent_changes["old"])
elif "typeid" in changes or "name" in changes:
self.modified_tasks_ftrackids.add(ent_info["parentId"])
continue
if action == "move":
ent_keys = ent_info["keys"]
@ -565,31 +576,15 @@ class SyncToAvalonEvent(BaseEvent):
_ent_info = ent_info.copy()
for ent_key in ent_keys:
if ent_key == "parent_id":
# task parents modified, collect both
if entityType.lower() == 'task':
self.modified_tasks_ftrackids.add(
ent_info["changes"]["new"])
self.modified_tasks_ftrackids.add(
ent_info["changes"]["old"])
_ent_info["changes"].pop(ent_key, None)
_ent_info["keys"].remove(ent_key)
else:
ent_info["changes"].pop(ent_key, None)
ent_info["keys"].remove(ent_key)
if entityType.lower() != 'task':
entities_by_action["update"][ftrack_id] = _ent_info
else:
if entityType.lower() == 'task':
self.modified_tasks_ftrackids.add(
ent_info["changes"]["parent_id"]["new"])
self.modified_tasks_ftrackids.add(
ent_info["changes"]["parent_id"]["old"]
)
entities_by_action["update"][ftrack_id] = _ent_info
# regular change process handles all other than Tasks
if entityType.lower() != 'task':
found_actions.add(action)
entities_by_action[action][ftrack_id] = ent_info
found_actions.add(action)
entities_by_action[action][ftrack_id] = ent_info
found_actions = list(found_actions)
if not found_actions and not self.modified_tasks_ftrackids:
@ -631,9 +626,10 @@ class SyncToAvalonEvent(BaseEvent):
# skip most of events where nothing has changed for avalon
if (
len(found_actions) == 1 and
found_actions[0] == "update" and
not updated
len(found_actions) == 1
and found_actions[0] == "update"
and not updated
and not self.modified_tasks_ftrackids
):
return True
@ -673,12 +669,16 @@ class SyncToAvalonEvent(BaseEvent):
for action, _ftrack_ids in entities_by_action.items():
# skip updated (already prepared) and removed (not exist in ftrack)
if action not in ("remove", "update"):
ftrack_ids.union(set(_ftrack_ids))
ftrack_ids |= set(_ftrack_ids)
# collect entity records data which might not be in event
for entity in self._get_entities_for_ftrack_ids(ft_project["id"],
ftrack_ids):
self.ftrack_ents_by_id[entity["id"]] = entity
if ftrack_ids:
joined_ids = ", ".join(["\"{}\"".format(id) for id in ftrack_ids])
ftrack_entities = self.process_session.query(
self.entities_query_by_id.format(ft_project["id"], joined_ids)
).all()
for entity in ftrack_entities:
self.ftrack_ents_by_id[entity["id"]] = entity
# Filter updates where name is changing
for ftrack_id, ent_info in updated.items():
@ -739,13 +739,13 @@ class SyncToAvalonEvent(BaseEvent):
time_cleanup = time_7 - time_6
time_task_updates = time_8 - time_7
time_total = time_8 - time_1
self.log.debug(
"Process time: {:.2f} <{:.2f}, {:.2f}, {:.2f}, ".format(
time_total, time_removed, time_renamed, time_added) +
"{:.2f}, {:.2f}, {:.2f}, {:.2f}>".format(
time_moved, time_updated, time_cleanup, time_task_updates
)
)
self.log.debug((
"Process time: {:.2f} <{:.2f}, {:.2f}, {:.2f}, "
"{:.2f}, {:.2f}, {:.2f}, {:.2f}>"
).format(
time_total, time_removed, time_renamed, time_added,
time_moved, time_updated, time_cleanup, time_task_updates
))
except Exception:
msg = "An error has happened during synchronization"
@ -770,10 +770,10 @@ class SyncToAvalonEvent(BaseEvent):
recreate_ents = []
removed_names = []
for ftrack_id, removed in ent_infos.items():
if entity_type == "Task":
entity_type = removed["entity_type"]
if entity_type.lower() == "task":
continue
entity_type = removed["entity_type"]
removed_name = removed["changes"]["name"]["old"]
avalon_ent = self.avalon_ents_by_ftrack_id.get(ftrack_id)
@ -1095,15 +1095,8 @@ class SyncToAvalonEvent(BaseEvent):
)
)
# Tasks
tasks = {}
for child in ftrack_ent["children"]:
if child.entity_type.lower() != "task":
continue
self.log.debug("child:: {}".format(child))
task_type = self._get_task_type(self.cur_project["id"],
child['entityId'])
tasks[child["name"]] = {"type": task_type}
# Add entity to modified so tasks are added at the end
self.modified_tasks_ftrackids.add(ftrack_ent["id"])
# Visual Parent
vis_par = None
@ -1123,7 +1116,7 @@ class SyncToAvalonEvent(BaseEvent):
"entityType": ftrack_ent.entity_type,
"parents": parents,
"hierarchy": hierarchy,
"tasks": tasks,
"tasks": {},
"visualParent": vis_par
}
}
@ -2195,31 +2188,50 @@ class SyncToAvalonEvent(BaseEvent):
)
if not self.modified_tasks_ftrackids:
return
entities = self._get_entities_for_ftrack_ids(
self.cur_project["id"],
self.modified_tasks_ftrackids)
joined_ids = ", ".join([
"\"{}\"".format(ftrack_id)
for ftrack_id in self.modified_tasks_ftrackids
])
task_entities = self.process_session.query(
self.task_entities_query_by_parent_id.format(
self.cur_project["id"], joined_ids
)
).all()
ftrack_mongo_mapping_found = {}
not_found_ids = []
tasks_per_ftrack_id = {}
# Make sure all parents have updated tasks, as they may not have any
tasks_per_ftrack_id = {
ftrack_id: {}
for ftrack_id in self.modified_tasks_ftrackids
}
# Query all task types at once
task_types = self.process_session.query(self.task_types_query).all()
task_types_by_id = {
task_type["id"]: task_type
for task_type in task_types
}
# prepare all tasks per parentId, eg. Avalon asset record
for entity in entities:
ftrack_id = entity["parent_id"]
for task_entity in task_entities:
task_type = task_types_by_id[task_entity["type_id"]]
ftrack_id = task_entity["parent_id"]
if ftrack_id not in tasks_per_ftrack_id:
tasks_per_ftrack_id[ftrack_id] = {}
passed_regex = avalon_sync.check_regex(
entity["name"], "task",
schema_patterns=self.regex_schemas
)
task_entity["name"], "task",
schema_patterns=self.regex_schemas
)
if not passed_regex:
entity_id = entity["id"]
self.regex_failed.append(entity_id)
self.regex_failed.append(task_entity["id"])
continue
task = {"type": entity["type"]["name"]}
tasks_per_ftrack_id[ftrack_id][entity["name"]] = task
tasks_per_ftrack_id[ftrack_id][task_entity["name"]] = {
"type": task_type["name"]
}
# find avalon entity by parentId
# should be there as create was run first
@ -2230,8 +2242,10 @@ class SyncToAvalonEvent(BaseEvent):
continue
ftrack_mongo_mapping_found[ftrack_id] = avalon_entity["_id"]
self._update_avalon_tasks(ftrack_mongo_mapping_found,
tasks_per_ftrack_id)
self._update_avalon_tasks(
ftrack_mongo_mapping_found,
tasks_per_ftrack_id
)
def update_entities(self):
"""
@ -2429,8 +2443,9 @@ class SyncToAvalonEvent(BaseEvent):
)
return True
def _update_avalon_tasks(self, ftrack_mongo_mapping_found,
tasks_per_ftrack_id):
def _update_avalon_tasks(
self, ftrack_mongo_mapping_found, tasks_per_ftrack_id
):
"""
Prepare new "tasks" content for existing records in Avalon.
Args:
@ -2448,10 +2463,9 @@ class SyncToAvalonEvent(BaseEvent):
change_data = {"$set": {}}
change_data["$set"]["data.tasks"] = tasks_per_ftrack_id[ftrack_id]
mongo_changes_bulk.append(UpdateOne(filter, change_data))
if not mongo_changes_bulk:
return
self.dbcon.bulk_write(mongo_changes_bulk)
if mongo_changes_bulk:
self.dbcon.bulk_write(mongo_changes_bulk)
def _mongo_id_configuration(
self,
@ -2500,50 +2514,6 @@ class SyncToAvalonEvent(BaseEvent):
return mongo_id_configuration_id
def _get_task_type(self, project_id, entityId):
"""
Returns task type ('Props', 'Art') from Task 'entityId'.
Args:
project_id (string):
entityId (string): entityId of Task
Returns:
(string) - None if Task not found
"""
task_type = None
entity = self.process_session.query(
self.entities_query_by_id.format(
project_id, entityId
)
).first()
if entity:
task_type = entity["type"]["name"]
return task_type
def _get_entities_for_ftrack_ids(self, ft_project_id, ftrack_ids):
"""
Query Ftrack API and return all entities for particular
'ft_project' and their parent_id in 'ftrack_ids'.
It is much faster to run this once for multiple ids than run it
for each separately.
Used mainly for collecting task information
Args:
ft_project_id (string):
ftrack_ids (list): of strings
Returns:
(list) of Ftrack entities
"""
ftrack_entities = []
if ftrack_ids:
joined_ids = ", ".join(["\"{}\"".format(id) for id in ftrack_ids])
ftrack_entities = self.process_session.query(
self.entities_query_by_parent_id.format(ft_project_id,
joined_ids)
).all()
return ftrack_entities
def register(session, plugins_presets):
'''Register plugin. Called when used as an plugin.'''