From 043fed0d007f10435fdf434cfad085a41157932b Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 3 Nov 2021 17:16:08 +0100 Subject: [PATCH 1/3] replaced Queue with deque in event sync to avalon --- .../event_sync_to_avalon.py | 71 ++++++++++--------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/openpype/modules/default_modules/ftrack/event_handlers_server/event_sync_to_avalon.py b/openpype/modules/default_modules/ftrack/event_handlers_server/event_sync_to_avalon.py index 93a0404c0b..b4a6c517fa 100644 --- a/openpype/modules/default_modules/ftrack/event_handlers_server/event_sync_to_avalon.py +++ b/openpype/modules/default_modules/ftrack/event_handlers_server/event_sync_to_avalon.py @@ -1,8 +1,6 @@ -import os import collections import copy import json -import queue import time import datetime import atexit @@ -340,13 +338,13 @@ class SyncToAvalonEvent(BaseEvent): self._avalon_archived_by_id[mongo_id] = entity def _bubble_changeability(self, unchangeable_ids): - unchangeable_queue = queue.Queue() + unchangeable_queue = collections.deque() for entity_id in unchangeable_ids: - unchangeable_queue.put((entity_id, False)) + unchangeable_queue.append((entity_id, False)) processed_parents_ids = [] - while not unchangeable_queue.empty(): - entity_id, child_is_archived = unchangeable_queue.get() + while unchangeable_queue: + entity_id, child_is_archived = unchangeable_queue.popleft() # skip if already processed if entity_id in processed_parents_ids: continue @@ -388,7 +386,7 @@ class SyncToAvalonEvent(BaseEvent): parent_id = entity["data"]["visualParent"] if parent_id is None: continue - unchangeable_queue.put((parent_id, child_is_archived)) + unchangeable_queue.append((parent_id, child_is_archived)) def reset_variables(self): """Reset variables so each event callback has clear env.""" @@ -1050,7 +1048,7 @@ class SyncToAvalonEvent(BaseEvent): key=(lambda entity: len(entity["link"])) ) - children_queue = queue.Queue() + children_queue = collections.deque() for entity in synchronizable_ents: parent_avalon_ent = self.avalon_ents_by_ftrack_id[ entity["parent_id"] @@ -1060,10 +1058,10 @@ class SyncToAvalonEvent(BaseEvent): for child in entity["children"]: if child.entity_type.lower() == "task": continue - children_queue.put(child) + children_queue.append(child) - while not children_queue.empty(): - entity = children_queue.get() + while children_queue: + entity = children_queue.popleft() ftrack_id = entity["id"] name = entity["name"] ent_by_ftrack_id = self.avalon_ents_by_ftrack_id.get(ftrack_id) @@ -1093,7 +1091,7 @@ class SyncToAvalonEvent(BaseEvent): for child in entity["children"]: if child.entity_type.lower() == "task": continue - children_queue.put(child) + children_queue.append(child) def create_entity_in_avalon(self, ftrack_ent, parent_avalon): proj, ents = self.avalon_entities @@ -1278,7 +1276,7 @@ class SyncToAvalonEvent(BaseEvent): "Processing renamed entities: {}".format(str(ent_infos)) ) - changeable_queue = queue.Queue() + changeable_queue = collections.deque() for ftrack_id, ent_info in ent_infos.items(): entity_type = ent_info["entity_type"] if entity_type == "Task": @@ -1306,7 +1304,7 @@ class SyncToAvalonEvent(BaseEvent): mongo_id = avalon_ent["_id"] if self.changeability_by_mongo_id[mongo_id]: - changeable_queue.put((ftrack_id, avalon_ent, new_name)) + changeable_queue.append((ftrack_id, avalon_ent, new_name)) else: ftrack_ent = self.ftrack_ents_by_id[ftrack_id] ftrack_ent["name"] = avalon_ent["name"] @@ -1348,8 +1346,8 @@ class SyncToAvalonEvent(BaseEvent): old_names = 
[] # Process renaming in Avalon DB - while not changeable_queue.empty(): - ftrack_id, avalon_ent, new_name = changeable_queue.get() + while changeable_queue: + ftrack_id, avalon_ent, new_name = changeable_queue.popleft() mongo_id = avalon_ent["_id"] old_name = avalon_ent["name"] @@ -1390,13 +1388,13 @@ class SyncToAvalonEvent(BaseEvent): # - it's name may be changed in next iteration same_name_ftrack_id = same_name_avalon_ent["data"]["ftrackId"] same_is_unprocessed = False - for item in list(changeable_queue.queue): + for item in changeable_queue: if same_name_ftrack_id == item[0]: same_is_unprocessed = True break if same_is_unprocessed: - changeable_queue.put((ftrack_id, avalon_ent, new_name)) + changeable_queue.append((ftrack_id, avalon_ent, new_name)) continue self.duplicated.append(ftrack_id) @@ -2008,12 +2006,12 @@ class SyncToAvalonEvent(BaseEvent): # ftrack_parenting = collections.defaultdict(list) entities_dict = collections.defaultdict(dict) - children_queue = queue.Queue() - parent_queue = queue.Queue() + children_queue = collections.deque() + parent_queue = collections.deque() for mongo_id in hier_cust_attrs_ids: avalon_ent = self.avalon_ents_by_id[mongo_id] - parent_queue.put(avalon_ent) + parent_queue.append(avalon_ent) ftrack_id = avalon_ent["data"]["ftrackId"] if ftrack_id not in entities_dict: entities_dict[ftrack_id] = { @@ -2040,10 +2038,10 @@ class SyncToAvalonEvent(BaseEvent): entities_dict[_ftrack_id]["parent_id"] = ftrack_id if _ftrack_id not in entities_dict[ftrack_id]["children"]: entities_dict[ftrack_id]["children"].append(_ftrack_id) - children_queue.put(children_ent) + children_queue.append(children_ent) - while not children_queue.empty(): - avalon_ent = children_queue.get() + while children_queue: + avalon_ent = children_queue.popleft() mongo_id = avalon_ent["_id"] ftrack_id = avalon_ent["data"]["ftrackId"] if ftrack_id in cust_attrs_ftrack_ids: @@ -2066,10 +2064,10 @@ class SyncToAvalonEvent(BaseEvent): entities_dict[_ftrack_id]["parent_id"] = ftrack_id if _ftrack_id not in entities_dict[ftrack_id]["children"]: entities_dict[ftrack_id]["children"].append(_ftrack_id) - children_queue.put(children_ent) + children_queue.append(children_ent) - while not parent_queue.empty(): - avalon_ent = parent_queue.get() + while parent_queue: + avalon_ent = parent_queue.popleft() if avalon_ent["type"].lower() == "project": continue @@ -2100,7 +2098,7 @@ class SyncToAvalonEvent(BaseEvent): # if ftrack_id not in ftrack_parenting[parent_ftrack_id]: # ftrack_parenting[parent_ftrack_id].append(ftrack_id) - parent_queue.put(parent_ent) + parent_queue.append(parent_ent) # Prepare values to query configuration_ids = set() @@ -2174,11 +2172,13 @@ class SyncToAvalonEvent(BaseEvent): if value is not None: project_values[key] = value - hier_down_queue = queue.Queue() - hier_down_queue.put((project_values, ftrack_project_id)) + hier_down_queue = collections.deque() + hier_down_queue.append( + (project_values, ftrack_project_id) + ) - while not hier_down_queue.empty(): - hier_values, parent_id = hier_down_queue.get() + while hier_down_queue: + hier_values, parent_id = hier_down_queue.popleft() for child_id in entities_dict[parent_id]["children"]: _hier_values = hier_values.copy() for name in hier_cust_attrs_keys: @@ -2187,7 +2187,7 @@ class SyncToAvalonEvent(BaseEvent): _hier_values[name] = value entities_dict[child_id]["hier_attrs"].update(_hier_values) - hier_down_queue.put((_hier_values, child_id)) + hier_down_queue.append((_hier_values, child_id)) ftrack_mongo_mapping = {} for mongo_id, 
ftrack_id in mongo_ftrack_mapping.items(): @@ -2302,11 +2302,12 @@ class SyncToAvalonEvent(BaseEvent): """ mongo_changes_bulk = [] for mongo_id, changes in self.updates.items(): - filter = {"_id": mongo_id} avalon_ent = self.avalon_ents_by_id[mongo_id] is_project = avalon_ent["type"] == "project" change_data = avalon_sync.from_dict_to_set(changes, is_project) - mongo_changes_bulk.append(UpdateOne(filter, change_data)) + mongo_changes_bulk.append( + UpdateOne({"_id": mongo_id}, change_data) + ) if not mongo_changes_bulk: return From 0094e956ffcbd14635f70cde2d90af8d21659356 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 3 Nov 2021 17:16:24 +0100 Subject: [PATCH 2/3] replaced Queue with deque in delete asset action --- .../ftrack/event_handlers_user/action_delete_asset.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/openpype/modules/default_modules/ftrack/event_handlers_user/action_delete_asset.py b/openpype/modules/default_modules/ftrack/event_handlers_user/action_delete_asset.py index f860065b26..d3cc0ad971 100644 --- a/openpype/modules/default_modules/ftrack/event_handlers_user/action_delete_asset.py +++ b/openpype/modules/default_modules/ftrack/event_handlers_user/action_delete_asset.py @@ -1,7 +1,6 @@ import collections import uuid from datetime import datetime -from queue import Queue from bson.objectid import ObjectId from openpype_modules.ftrack.lib import BaseAction, statics_icon @@ -473,12 +472,12 @@ class DeleteAssetSubset(BaseAction): continue ftrack_ids_to_delete.append(ftrack_id) - children_queue = Queue() + children_queue = collections.deque() for mongo_id in assets_to_delete: - children_queue.put(mongo_id) + children_queue.append(mongo_id) - while not children_queue.empty(): - mongo_id = children_queue.get() + while children_queue: + mongo_id = children_queue.popleft() if mongo_id in asset_ids_to_archive: continue @@ -494,7 +493,7 @@ class DeleteAssetSubset(BaseAction): for child in children: child_id = child["_id"] if child_id not in asset_ids_to_archive: - children_queue.put(child_id) + children_queue.append(child_id) # Prepare names of assets in ftrack and ids of subsets in mongo asset_names_to_delete = [] From 9b0dc76c2e139b75ad4199ad3a0cec1c06a5b2f7 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Wed, 3 Nov 2021 17:16:41 +0100 Subject: [PATCH 3/3] replaced Queue with deque in avalon sync lib --- .../default_modules/ftrack/lib/avalon_sync.py | 143 +++++++++--------- 1 file changed, 74 insertions(+), 69 deletions(-) diff --git a/openpype/modules/default_modules/ftrack/lib/avalon_sync.py b/openpype/modules/default_modules/ftrack/lib/avalon_sync.py index 2458308af5..1667031f29 100644 --- a/openpype/modules/default_modules/ftrack/lib/avalon_sync.py +++ b/openpype/modules/default_modules/ftrack/lib/avalon_sync.py @@ -6,11 +6,6 @@ import copy import six -if six.PY3: - from queue import Queue -else: - from Queue import Queue - from avalon.api import AvalonMongoDB import avalon @@ -146,11 +141,11 @@ def from_dict_to_set(data, is_project): data.pop("data") result = {"$set": {}} - dict_queue = Queue() - dict_queue.put((None, data)) + dict_queue = collections.deque() + dict_queue.append((None, data)) - while not dict_queue.empty(): - _key, _data = dict_queue.get() + while dict_queue: + _key, _data = dict_queue.popleft() for key, value in _data.items(): new_key = key if _key is not None: @@ -160,7 +155,7 @@ def from_dict_to_set(data, is_project): (isinstance(value, dict) and not bool(value)): # empty dic result["$set"][new_key] = value continue - 
dict_queue.put((new_key, value)) + dict_queue.append((new_key, value)) if task_changes is not not_set and task_changes_key: result["$set"][task_changes_key] = task_changes @@ -714,7 +709,7 @@ class SyncEntitiesFactory: self.filter_by_duplicate_regex() def filter_by_duplicate_regex(self): - filter_queue = Queue() + filter_queue = collections.deque() failed_regex_msg = "{} - Entity has invalid symbols in the name" duplicate_msg = "There are multiple entities with the name: \"{}\":" @@ -722,18 +717,18 @@ class SyncEntitiesFactory: for id in ids: ent_path = self.get_ent_path(id) self.log.warning(failed_regex_msg.format(ent_path)) - filter_queue.put(id) + filter_queue.append(id) for name, ids in self.duplicates.items(): self.log.warning(duplicate_msg.format(name)) for id in ids: ent_path = self.get_ent_path(id) self.log.warning(ent_path) - filter_queue.put(id) + filter_queue.append(id) filtered_ids = [] - while not filter_queue.empty(): - ftrack_id = filter_queue.get() + while filter_queue: + ftrack_id = filter_queue.popleft() if ftrack_id in filtered_ids: continue @@ -749,7 +744,7 @@ class SyncEntitiesFactory: filtered_ids.append(ftrack_id) for child_id in entity_dict.get("children", []): - filter_queue.put(child_id) + filter_queue.append(child_id) for name, ids in self.tasks_failed_regex.items(): for id in ids: @@ -768,10 +763,10 @@ class SyncEntitiesFactory: ) == "_notset_": return - self.filter_queue = Queue() - self.filter_queue.put((self.ft_project_id, False)) - while not self.filter_queue.empty(): - parent_id, remove = self.filter_queue.get() + filter_queue = collections.deque() + filter_queue.append((self.ft_project_id, False)) + while filter_queue: + parent_id, remove = filter_queue.popleft() if remove: parent_dict = self.entities_dict.pop(parent_id, {}) self.all_filtered_entities[parent_id] = parent_dict @@ -790,7 +785,7 @@ class SyncEntitiesFactory: child_id ) _remove = True - self.filter_queue.put((child_id, _remove)) + filter_queue.append((child_id, _remove)) def filter_by_selection(self, event): # BUGGY!!!! 
cause that entities are in deleted list @@ -805,47 +800,51 @@ class SyncEntitiesFactory: selected_ids.append(entity["entityId"]) sync_ids = [self.ft_project_id] - parents_queue = Queue() - children_queue = Queue() - for id in selected_ids: + parents_queue = collections.deque() + children_queue = collections.deque() + for selected_id in selected_ids: # skip if already filtered with ignore sync custom attribute - if id in self.filtered_ids: + if selected_id in self.filtered_ids: continue - parents_queue.put(id) - children_queue.put(id) + parents_queue.append(selected_id) + children_queue.append(selected_id) - while not parents_queue.empty(): - id = parents_queue.get() + while parents_queue: + ftrack_id = parents_queue.popleft() while True: # Stops when parent is in sync_ids - if id in self.filtered_ids or id in sync_ids or id is None: + if ( + ftrack_id in self.filtered_ids + or ftrack_id in sync_ids + or ftrack_id is None + ): break - sync_ids.append(id) - id = self.entities_dict[id]["parent_id"] + sync_ids.append(ftrack_id) + ftrack_id = self.entities_dict[ftrack_id]["parent_id"] - while not children_queue.empty(): - parent_id = children_queue.get() + while children_queue: + parent_id = children_queue.popleft() for child_id in self.entities_dict[parent_id]["children"]: if child_id in sync_ids or child_id in self.filtered_ids: continue sync_ids.append(child_id) - children_queue.put(child_id) + children_queue.append(child_id) # separate not selected and to process entities for key, value in self.entities_dict.items(): if key not in sync_ids: self.not_selected_ids.append(key) - for id in self.not_selected_ids: + for ftrack_id in self.not_selected_ids: # pop from entities - value = self.entities_dict.pop(id) + value = self.entities_dict.pop(ftrack_id) # remove entity from parent's children parent_id = value["parent_id"] if parent_id not in sync_ids: continue - self.entities_dict[parent_id]["children"].remove(id) + self.entities_dict[parent_id]["children"].remove(ftrack_id) def _query_custom_attributes(self, session, conf_ids, entity_ids): output = [] @@ -1117,11 +1116,11 @@ class SyncEntitiesFactory: if value is not None: project_values[key] = value - hier_down_queue = Queue() - hier_down_queue.put((project_values, top_id)) + hier_down_queue = collections.deque() + hier_down_queue.append((project_values, top_id)) - while not hier_down_queue.empty(): - hier_values, parent_id = hier_down_queue.get() + while hier_down_queue: + hier_values, parent_id = hier_down_queue.popleft() for child_id in self.entities_dict[parent_id]["children"]: _hier_values = copy.deepcopy(hier_values) for key in attributes_by_key.keys(): @@ -1134,7 +1133,7 @@ class SyncEntitiesFactory: _hier_values[key] = value self.entities_dict[child_id]["hier_attrs"].update(_hier_values) - hier_down_queue.put((_hier_values, child_id)) + hier_down_queue.append((_hier_values, child_id)) def remove_from_archived(self, mongo_id): entity = self.avalon_archived_by_id.pop(mongo_id, None) @@ -1303,15 +1302,15 @@ class SyncEntitiesFactory: create_ftrack_ids.append(self.ft_project_id) # make it go hierarchically - prepare_queue = Queue() + prepare_queue = collections.deque() for child_id in self.entities_dict[self.ft_project_id]["children"]: - prepare_queue.put(child_id) + prepare_queue.append(child_id) - while not prepare_queue.empty(): - ftrack_id = prepare_queue.get() + while prepare_queue: + ftrack_id = prepare_queue.popleft() for child_id in self.entities_dict[ftrack_id]["children"]: - prepare_queue.put(child_id) + 
prepare_queue.append(child_id) entity_dict = self.entities_dict[ftrack_id] ent_path = self.get_ent_path(ftrack_id) @@ -1426,25 +1425,25 @@ class SyncEntitiesFactory: parent_id = ent_dict["parent_id"] self.entities_dict[parent_id]["children"].remove(ftrack_id) - children_queue = Queue() - children_queue.put(ftrack_id) - while not children_queue.empty(): - _ftrack_id = children_queue.get() + children_queue = collections.deque() + children_queue.append(ftrack_id) + while children_queue: + _ftrack_id = children_queue.popleft() entity_dict = self.entities_dict.pop(_ftrack_id, {"children": []}) for child_id in entity_dict["children"]: - children_queue.put(child_id) + children_queue.append(child_id) def prepare_changes(self): self.log.debug("* Preparing changes for avalon/ftrack") hierarchy_changing_ids = [] ignore_keys = collections.defaultdict(list) - update_queue = Queue() + update_queue = collections.deque() for ftrack_id in self.update_ftrack_ids: - update_queue.put(ftrack_id) + update_queue.append(ftrack_id) - while not update_queue.empty(): - ftrack_id = update_queue.get() + while update_queue: + ftrack_id = update_queue.popleft() if ftrack_id == self.ft_project_id: changes = self.prepare_project_changes() if changes: @@ -1720,7 +1719,7 @@ class SyncEntitiesFactory: new_entity_id = self.create_ftrack_ent_from_avalon_ent( av_entity, parent_id ) - update_queue.put(new_entity_id) + update_queue.append(new_entity_id) if new_entity_id: ftrack_ent_dict["entity"]["parent_id"] = new_entity_id @@ -2024,14 +2023,14 @@ class SyncEntitiesFactory: entity["custom_attributes"][CUST_ATTR_ID_KEY] = str(new_id) def _bubble_changeability(self, unchangeable_ids): - unchangeable_queue = Queue() + unchangeable_queue = collections.deque() for entity_id in unchangeable_ids: - unchangeable_queue.put((entity_id, False)) + unchangeable_queue.append((entity_id, False)) processed_parents_ids = [] subsets_to_remove = [] - while not unchangeable_queue.empty(): - entity_id, child_is_archived = unchangeable_queue.get() + while unchangeable_queue: + entity_id, child_is_archived = unchangeable_queue.popleft() # skip if already processed if entity_id in processed_parents_ids: continue @@ -2067,7 +2066,9 @@ class SyncEntitiesFactory: parent_id = entity["data"]["visualParent"] if parent_id is None: continue - unchangeable_queue.put((str(parent_id), child_is_archived)) + unchangeable_queue.append( + (str(parent_id), child_is_archived) + ) self._delete_subsets_without_asset(subsets_to_remove) @@ -2150,16 +2151,18 @@ class SyncEntitiesFactory: self.dbcon.bulk_write(mongo_changes_bulk) def reload_parents(self, hierarchy_changing_ids): - parents_queue = Queue() - parents_queue.put((self.ft_project_id, [], False)) - while not parents_queue.empty(): - ftrack_id, parent_parents, changed = parents_queue.get() + parents_queue = collections.deque() + parents_queue.append((self.ft_project_id, [], False)) + while parents_queue: + ftrack_id, parent_parents, changed = parents_queue.popleft() _parents = copy.deepcopy(parent_parents) if ftrack_id not in hierarchy_changing_ids and not changed: if ftrack_id != self.ft_project_id: _parents.append(self.entities_dict[ftrack_id]["name"]) for child_id in self.entities_dict[ftrack_id]["children"]: - parents_queue.put((child_id, _parents, changed)) + parents_queue.append( + (child_id, _parents, changed) + ) continue changed = True @@ -2170,7 +2173,9 @@ class SyncEntitiesFactory: _parents.append(self.entities_dict[ftrack_id]["name"]) for child_id in self.entities_dict[ftrack_id]["children"]: - 
parents_queue.put((child_id, _parents, changed)) + parents_queue.append( + (child_id, _parents, changed) + ) if ftrack_id in self.create_ftrack_ids: mongo_id = self.ftrack_avalon_mapper[ftrack_id]
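
All three patches apply the same refactor: single-threaded breadth-first traversals that previously used queue.Queue (a thread-safe, blocking structure with lock overhead) now use collections.deque, which supports a plain truthiness check instead of .empty(), popleft() instead of .get(), and direct iteration without reaching into an internal .queue attribute. The following standalone sketch illustrates that pattern only; children_by_id and collect_descendants are hypothetical names for illustration and are not part of the patched modules.

    import collections

    # Hypothetical parent -> children mapping standing in for the
    # ftrack/avalon hierarchies traversed in the patched modules.
    children_by_id = {
        "project": ["shot_01", "shot_02"],
        "shot_01": ["task_a"],
        "shot_02": [],
        "task_a": [],
    }

    def collect_descendants(root_id):
        """Breadth-first walk using collections.deque instead of queue.Queue."""
        visited = []
        children_queue = collections.deque()
        children_queue.append(root_id)

        # An empty deque is falsy, so no .empty() call is needed, and
        # popleft() keeps FIFO order without queue.Queue's locking.
        while children_queue:
            entity_id = children_queue.popleft()
            if entity_id in visited:
                continue
            visited.append(entity_id)
            for child_id in children_by_id.get(entity_id, []):
                children_queue.append(child_id)
        return visited

    print(collect_descendants("project"))
    # ['project', 'shot_01', 'shot_02', 'task_a']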