Merge pull request #2204 from pypeclub/feature/replace_queue_with_deque

Ftrack: Replace Queue with deque in event handlers logic
This commit is contained in:
Jakub Trllo 2021-11-03 18:02:29 +01:00 committed by GitHub
commit afedaf6c2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 110 deletions

View file

@ -1,8 +1,6 @@
import os
import collections
import copy
import json
import queue
import time
import datetime
import atexit
@ -340,13 +338,13 @@ class SyncToAvalonEvent(BaseEvent):
self._avalon_archived_by_id[mongo_id] = entity
def _bubble_changeability(self, unchangeable_ids):
unchangeable_queue = queue.Queue()
unchangeable_queue = collections.deque()
for entity_id in unchangeable_ids:
unchangeable_queue.put((entity_id, False))
unchangeable_queue.append((entity_id, False))
processed_parents_ids = []
while not unchangeable_queue.empty():
entity_id, child_is_archived = unchangeable_queue.get()
while unchangeable_queue:
entity_id, child_is_archived = unchangeable_queue.popleft()
# skip if already processed
if entity_id in processed_parents_ids:
continue
@ -388,7 +386,7 @@ class SyncToAvalonEvent(BaseEvent):
parent_id = entity["data"]["visualParent"]
if parent_id is None:
continue
unchangeable_queue.put((parent_id, child_is_archived))
unchangeable_queue.append((parent_id, child_is_archived))
def reset_variables(self):
"""Reset variables so each event callback has clear env."""
@ -1050,7 +1048,7 @@ class SyncToAvalonEvent(BaseEvent):
key=(lambda entity: len(entity["link"]))
)
children_queue = queue.Queue()
children_queue = collections.deque()
for entity in synchronizable_ents:
parent_avalon_ent = self.avalon_ents_by_ftrack_id[
entity["parent_id"]
@ -1060,10 +1058,10 @@ class SyncToAvalonEvent(BaseEvent):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)
while not children_queue.empty():
entity = children_queue.get()
while children_queue:
entity = children_queue.popleft()
ftrack_id = entity["id"]
name = entity["name"]
ent_by_ftrack_id = self.avalon_ents_by_ftrack_id.get(ftrack_id)
@ -1093,7 +1091,7 @@ class SyncToAvalonEvent(BaseEvent):
for child in entity["children"]:
if child.entity_type.lower() == "task":
continue
children_queue.put(child)
children_queue.append(child)
def create_entity_in_avalon(self, ftrack_ent, parent_avalon):
proj, ents = self.avalon_entities
@ -1278,7 +1276,7 @@ class SyncToAvalonEvent(BaseEvent):
"Processing renamed entities: {}".format(str(ent_infos))
)
changeable_queue = queue.Queue()
changeable_queue = collections.deque()
for ftrack_id, ent_info in ent_infos.items():
entity_type = ent_info["entity_type"]
if entity_type == "Task":
@ -1306,7 +1304,7 @@ class SyncToAvalonEvent(BaseEvent):
mongo_id = avalon_ent["_id"]
if self.changeability_by_mongo_id[mongo_id]:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
else:
ftrack_ent = self.ftrack_ents_by_id[ftrack_id]
ftrack_ent["name"] = avalon_ent["name"]
@ -1348,8 +1346,8 @@ class SyncToAvalonEvent(BaseEvent):
old_names = []
# Process renaming in Avalon DB
while not changeable_queue.empty():
ftrack_id, avalon_ent, new_name = changeable_queue.get()
while changeable_queue:
ftrack_id, avalon_ent, new_name = changeable_queue.popleft()
mongo_id = avalon_ent["_id"]
old_name = avalon_ent["name"]
@ -1390,13 +1388,13 @@ class SyncToAvalonEvent(BaseEvent):
# - it's name may be changed in next iteration
same_name_ftrack_id = same_name_avalon_ent["data"]["ftrackId"]
same_is_unprocessed = False
for item in list(changeable_queue.queue):
for item in changeable_queue:
if same_name_ftrack_id == item[0]:
same_is_unprocessed = True
break
if same_is_unprocessed:
changeable_queue.put((ftrack_id, avalon_ent, new_name))
changeable_queue.append((ftrack_id, avalon_ent, new_name))
continue
self.duplicated.append(ftrack_id)
@ -2008,12 +2006,12 @@ class SyncToAvalonEvent(BaseEvent):
# ftrack_parenting = collections.defaultdict(list)
entities_dict = collections.defaultdict(dict)
children_queue = queue.Queue()
parent_queue = queue.Queue()
children_queue = collections.deque()
parent_queue = collections.deque()
for mongo_id in hier_cust_attrs_ids:
avalon_ent = self.avalon_ents_by_id[mongo_id]
parent_queue.put(avalon_ent)
parent_queue.append(avalon_ent)
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id not in entities_dict:
entities_dict[ftrack_id] = {
@ -2040,10 +2038,10 @@ class SyncToAvalonEvent(BaseEvent):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)
while not children_queue.empty():
avalon_ent = children_queue.get()
while children_queue:
avalon_ent = children_queue.popleft()
mongo_id = avalon_ent["_id"]
ftrack_id = avalon_ent["data"]["ftrackId"]
if ftrack_id in cust_attrs_ftrack_ids:
@ -2066,10 +2064,10 @@ class SyncToAvalonEvent(BaseEvent):
entities_dict[_ftrack_id]["parent_id"] = ftrack_id
if _ftrack_id not in entities_dict[ftrack_id]["children"]:
entities_dict[ftrack_id]["children"].append(_ftrack_id)
children_queue.put(children_ent)
children_queue.append(children_ent)
while not parent_queue.empty():
avalon_ent = parent_queue.get()
while parent_queue:
avalon_ent = parent_queue.popleft()
if avalon_ent["type"].lower() == "project":
continue
@ -2100,7 +2098,7 @@ class SyncToAvalonEvent(BaseEvent):
# if ftrack_id not in ftrack_parenting[parent_ftrack_id]:
# ftrack_parenting[parent_ftrack_id].append(ftrack_id)
parent_queue.put(parent_ent)
parent_queue.append(parent_ent)
# Prepare values to query
configuration_ids = set()
@ -2174,11 +2172,13 @@ class SyncToAvalonEvent(BaseEvent):
if value is not None:
project_values[key] = value
hier_down_queue = queue.Queue()
hier_down_queue.put((project_values, ftrack_project_id))
hier_down_queue = collections.deque()
hier_down_queue.append(
(project_values, ftrack_project_id)
)
while not hier_down_queue.empty():
hier_values, parent_id = hier_down_queue.get()
while hier_down_queue:
hier_values, parent_id = hier_down_queue.popleft()
for child_id in entities_dict[parent_id]["children"]:
_hier_values = hier_values.copy()
for name in hier_cust_attrs_keys:
@ -2187,7 +2187,7 @@ class SyncToAvalonEvent(BaseEvent):
_hier_values[name] = value
entities_dict[child_id]["hier_attrs"].update(_hier_values)
hier_down_queue.put((_hier_values, child_id))
hier_down_queue.append((_hier_values, child_id))
ftrack_mongo_mapping = {}
for mongo_id, ftrack_id in mongo_ftrack_mapping.items():
@ -2302,11 +2302,12 @@ class SyncToAvalonEvent(BaseEvent):
"""
mongo_changes_bulk = []
for mongo_id, changes in self.updates.items():
filter = {"_id": mongo_id}
avalon_ent = self.avalon_ents_by_id[mongo_id]
is_project = avalon_ent["type"] == "project"
change_data = avalon_sync.from_dict_to_set(changes, is_project)
mongo_changes_bulk.append(UpdateOne(filter, change_data))
mongo_changes_bulk.append(
UpdateOne({"_id": mongo_id}, change_data)
)
if not mongo_changes_bulk:
return

View file

@ -1,7 +1,6 @@
import collections
import uuid
from datetime import datetime
from queue import Queue
from bson.objectid import ObjectId
from openpype_modules.ftrack.lib import BaseAction, statics_icon
@ -473,12 +472,12 @@ class DeleteAssetSubset(BaseAction):
continue
ftrack_ids_to_delete.append(ftrack_id)
children_queue = Queue()
children_queue = collections.deque()
for mongo_id in assets_to_delete:
children_queue.put(mongo_id)
children_queue.append(mongo_id)
while not children_queue.empty():
mongo_id = children_queue.get()
while children_queue:
mongo_id = children_queue.popleft()
if mongo_id in asset_ids_to_archive:
continue
@ -494,7 +493,7 @@ class DeleteAssetSubset(BaseAction):
for child in children:
child_id = child["_id"]
if child_id not in asset_ids_to_archive:
children_queue.put(child_id)
children_queue.append(child_id)
# Prepare names of assets in ftrack and ids of subsets in mongo
asset_names_to_delete = []

View file

@ -6,11 +6,6 @@ import copy
import six
if six.PY3:
from queue import Queue
else:
from Queue import Queue
from avalon.api import AvalonMongoDB
import avalon
@ -146,11 +141,11 @@ def from_dict_to_set(data, is_project):
data.pop("data")
result = {"$set": {}}
dict_queue = Queue()
dict_queue.put((None, data))
dict_queue = collections.deque()
dict_queue.append((None, data))
while not dict_queue.empty():
_key, _data = dict_queue.get()
while dict_queue:
_key, _data = dict_queue.popleft()
for key, value in _data.items():
new_key = key
if _key is not None:
@ -160,7 +155,7 @@ def from_dict_to_set(data, is_project):
(isinstance(value, dict) and not bool(value)): # empty dic
result["$set"][new_key] = value
continue
dict_queue.put((new_key, value))
dict_queue.append((new_key, value))
if task_changes is not not_set and task_changes_key:
result["$set"][task_changes_key] = task_changes
@ -714,7 +709,7 @@ class SyncEntitiesFactory:
self.filter_by_duplicate_regex()
def filter_by_duplicate_regex(self):
filter_queue = Queue()
filter_queue = collections.deque()
failed_regex_msg = "{} - Entity has invalid symbols in the name"
duplicate_msg = "There are multiple entities with the name: \"{}\":"
@ -722,18 +717,18 @@ class SyncEntitiesFactory:
for id in ids:
ent_path = self.get_ent_path(id)
self.log.warning(failed_regex_msg.format(ent_path))
filter_queue.put(id)
filter_queue.append(id)
for name, ids in self.duplicates.items():
self.log.warning(duplicate_msg.format(name))
for id in ids:
ent_path = self.get_ent_path(id)
self.log.warning(ent_path)
filter_queue.put(id)
filter_queue.append(id)
filtered_ids = []
while not filter_queue.empty():
ftrack_id = filter_queue.get()
while filter_queue:
ftrack_id = filter_queue.popleft()
if ftrack_id in filtered_ids:
continue
@ -749,7 +744,7 @@ class SyncEntitiesFactory:
filtered_ids.append(ftrack_id)
for child_id in entity_dict.get("children", []):
filter_queue.put(child_id)
filter_queue.append(child_id)
for name, ids in self.tasks_failed_regex.items():
for id in ids:
@ -768,10 +763,10 @@ class SyncEntitiesFactory:
) == "_notset_":
return
self.filter_queue = Queue()
self.filter_queue.put((self.ft_project_id, False))
while not self.filter_queue.empty():
parent_id, remove = self.filter_queue.get()
filter_queue = collections.deque()
filter_queue.append((self.ft_project_id, False))
while filter_queue:
parent_id, remove = filter_queue.popleft()
if remove:
parent_dict = self.entities_dict.pop(parent_id, {})
self.all_filtered_entities[parent_id] = parent_dict
@ -790,7 +785,7 @@ class SyncEntitiesFactory:
child_id
)
_remove = True
self.filter_queue.put((child_id, _remove))
filter_queue.append((child_id, _remove))
def filter_by_selection(self, event):
# BUGGY!!!! cause that entities are in deleted list
@ -805,47 +800,51 @@ class SyncEntitiesFactory:
selected_ids.append(entity["entityId"])
sync_ids = [self.ft_project_id]
parents_queue = Queue()
children_queue = Queue()
for id in selected_ids:
parents_queue = collections.deque()
children_queue = collections.deque()
for selected_id in selected_ids:
# skip if already filtered with ignore sync custom attribute
if id in self.filtered_ids:
if selected_id in self.filtered_ids:
continue
parents_queue.put(id)
children_queue.put(id)
parents_queue.append(selected_id)
children_queue.append(selected_id)
while not parents_queue.empty():
id = parents_queue.get()
while parents_queue:
ftrack_id = parents_queue.popleft()
while True:
# Stops when parent is in sync_ids
if id in self.filtered_ids or id in sync_ids or id is None:
if (
ftrack_id in self.filtered_ids
or ftrack_id in sync_ids
or ftrack_id is None
):
break
sync_ids.append(id)
id = self.entities_dict[id]["parent_id"]
sync_ids.append(ftrack_id)
ftrack_id = self.entities_dict[ftrack_id]["parent_id"]
while not children_queue.empty():
parent_id = children_queue.get()
while children_queue:
parent_id = children_queue.popleft()
for child_id in self.entities_dict[parent_id]["children"]:
if child_id in sync_ids or child_id in self.filtered_ids:
continue
sync_ids.append(child_id)
children_queue.put(child_id)
children_queue.append(child_id)
# separate not selected and to process entities
for key, value in self.entities_dict.items():
if key not in sync_ids:
self.not_selected_ids.append(key)
for id in self.not_selected_ids:
for ftrack_id in self.not_selected_ids:
# pop from entities
value = self.entities_dict.pop(id)
value = self.entities_dict.pop(ftrack_id)
# remove entity from parent's children
parent_id = value["parent_id"]
if parent_id not in sync_ids:
continue
self.entities_dict[parent_id]["children"].remove(id)
self.entities_dict[parent_id]["children"].remove(ftrack_id)
def _query_custom_attributes(self, session, conf_ids, entity_ids):
output = []
@ -1117,11 +1116,11 @@ class SyncEntitiesFactory:
if value is not None:
project_values[key] = value
hier_down_queue = Queue()
hier_down_queue.put((project_values, top_id))
hier_down_queue = collections.deque()
hier_down_queue.append((project_values, top_id))
while not hier_down_queue.empty():
hier_values, parent_id = hier_down_queue.get()
while hier_down_queue:
hier_values, parent_id = hier_down_queue.popleft()
for child_id in self.entities_dict[parent_id]["children"]:
_hier_values = copy.deepcopy(hier_values)
for key in attributes_by_key.keys():
@ -1134,7 +1133,7 @@ class SyncEntitiesFactory:
_hier_values[key] = value
self.entities_dict[child_id]["hier_attrs"].update(_hier_values)
hier_down_queue.put((_hier_values, child_id))
hier_down_queue.append((_hier_values, child_id))
def remove_from_archived(self, mongo_id):
entity = self.avalon_archived_by_id.pop(mongo_id, None)
@ -1303,15 +1302,15 @@ class SyncEntitiesFactory:
create_ftrack_ids.append(self.ft_project_id)
# make it go hierarchically
prepare_queue = Queue()
prepare_queue = collections.deque()
for child_id in self.entities_dict[self.ft_project_id]["children"]:
prepare_queue.put(child_id)
prepare_queue.append(child_id)
while not prepare_queue.empty():
ftrack_id = prepare_queue.get()
while prepare_queue:
ftrack_id = prepare_queue.popleft()
for child_id in self.entities_dict[ftrack_id]["children"]:
prepare_queue.put(child_id)
prepare_queue.append(child_id)
entity_dict = self.entities_dict[ftrack_id]
ent_path = self.get_ent_path(ftrack_id)
@ -1426,25 +1425,25 @@ class SyncEntitiesFactory:
parent_id = ent_dict["parent_id"]
self.entities_dict[parent_id]["children"].remove(ftrack_id)
children_queue = Queue()
children_queue.put(ftrack_id)
while not children_queue.empty():
_ftrack_id = children_queue.get()
children_queue = collections.deque()
children_queue.append(ftrack_id)
while children_queue:
_ftrack_id = children_queue.popleft()
entity_dict = self.entities_dict.pop(_ftrack_id, {"children": []})
for child_id in entity_dict["children"]:
children_queue.put(child_id)
children_queue.append(child_id)
def prepare_changes(self):
self.log.debug("* Preparing changes for avalon/ftrack")
hierarchy_changing_ids = []
ignore_keys = collections.defaultdict(list)
update_queue = Queue()
update_queue = collections.deque()
for ftrack_id in self.update_ftrack_ids:
update_queue.put(ftrack_id)
update_queue.append(ftrack_id)
while not update_queue.empty():
ftrack_id = update_queue.get()
while update_queue:
ftrack_id = update_queue.popleft()
if ftrack_id == self.ft_project_id:
changes = self.prepare_project_changes()
if changes:
@ -1720,7 +1719,7 @@ class SyncEntitiesFactory:
new_entity_id = self.create_ftrack_ent_from_avalon_ent(
av_entity, parent_id
)
update_queue.put(new_entity_id)
update_queue.append(new_entity_id)
if new_entity_id:
ftrack_ent_dict["entity"]["parent_id"] = new_entity_id
@ -2024,14 +2023,14 @@ class SyncEntitiesFactory:
entity["custom_attributes"][CUST_ATTR_ID_KEY] = str(new_id)
def _bubble_changeability(self, unchangeable_ids):
unchangeable_queue = Queue()
unchangeable_queue = collections.deque()
for entity_id in unchangeable_ids:
unchangeable_queue.put((entity_id, False))
unchangeable_queue.append((entity_id, False))
processed_parents_ids = []
subsets_to_remove = []
while not unchangeable_queue.empty():
entity_id, child_is_archived = unchangeable_queue.get()
while unchangeable_queue:
entity_id, child_is_archived = unchangeable_queue.popleft()
# skip if already processed
if entity_id in processed_parents_ids:
continue
@ -2067,7 +2066,9 @@ class SyncEntitiesFactory:
parent_id = entity["data"]["visualParent"]
if parent_id is None:
continue
unchangeable_queue.put((str(parent_id), child_is_archived))
unchangeable_queue.append(
(str(parent_id), child_is_archived)
)
self._delete_subsets_without_asset(subsets_to_remove)
@ -2150,16 +2151,18 @@ class SyncEntitiesFactory:
self.dbcon.bulk_write(mongo_changes_bulk)
def reload_parents(self, hierarchy_changing_ids):
parents_queue = Queue()
parents_queue.put((self.ft_project_id, [], False))
while not parents_queue.empty():
ftrack_id, parent_parents, changed = parents_queue.get()
parents_queue = collections.deque()
parents_queue.append((self.ft_project_id, [], False))
while parents_queue:
ftrack_id, parent_parents, changed = parents_queue.popleft()
_parents = copy.deepcopy(parent_parents)
if ftrack_id not in hierarchy_changing_ids and not changed:
if ftrack_id != self.ft_project_id:
_parents.append(self.entities_dict[ftrack_id]["name"])
for child_id in self.entities_dict[ftrack_id]["children"]:
parents_queue.put((child_id, _parents, changed))
parents_queue.append(
(child_id, _parents, changed)
)
continue
changed = True
@ -2170,7 +2173,9 @@ class SyncEntitiesFactory:
_parents.append(self.entities_dict[ftrack_id]["name"])
for child_id in self.entities_dict[ftrack_id]["children"]:
parents_queue.put((child_id, _parents, changed))
parents_queue.append(
(child_id, _parents, changed)
)
if ftrack_id in self.create_ftrack_ids:
mongo_id = self.ftrack_avalon_mapper[ftrack_id]