# ayon-core/pype/modules/ftrack/lib/avalon_sync.py

import collections
import copy
import json
import os
import queue
import re
import avalon
import avalon.api
import ftrack_api
from avalon.api import AvalonMongoDB
from avalon.vendor import toml
from bson.errors import InvalidId
from bson.objectid import ObjectId
from pymongo import UpdateOne
from pype.api import Logger, Anatomy
from pype.api import config
log = Logger().get_logger(__name__)

# Current schemas for avalon types
EntitySchemas = {
    "project": "pype:project-2.1",
    "asset": "pype:asset-3.0",
    "config": "pype:config-1.1"
}

# Group name of custom attributes
CUST_ATTR_GROUP = "pype"

# name of Custom attribute that stores mongo_id from avalon db
CUST_ATTR_ID_KEY = "avalon_mongo_id"
# Name of Custom attribute that toggles automatic synchronization
CUST_ATTR_AUTO_SYNC = "avalon_auto_sync"
def default_custom_attributes_definition():
    """Load the bundled custom attribute definitions.

    Reads ``custom_attributes.json`` located next to this module.

    Returns:
        dict: Parsed JSON content of the definitions file.
    """
    json_file_path = os.path.join(
        os.path.dirname(__file__), "custom_attributes.json"
    )
    with open(json_file_path, "r") as json_stream:
        return json.load(json_stream)
def check_regex(name, entity_type, in_schema=None, schema_patterns=None):
    """Validate entity ``name`` against the avalon schema name pattern.

    Args:
        name (str): Entity name to validate.
        entity_type (str): Lowered entity type ("project", "task", ...).
        in_schema (str, optional): Explicit schema name to validate against.
        schema_patterns (dict, optional): Cache of resolved patterns keyed
            by schema name; updated in place when a pattern is resolved.

    Returns:
        bool: True when the name matches the schema pattern.
    """
    if in_schema:
        schema_name = in_schema
    elif entity_type == "project":
        schema_name = "project-2.1"
    elif entity_type == "task":
        schema_name = "task"
    else:
        schema_name = "asset-3.0"

    pattern = None
    if schema_patterns is not None:
        pattern = schema_patterns.get(schema_name)

    if not pattern:
        default_pattern = "^[a-zA-Z0-9_.]*$"
        schema_obj = avalon.schema._cache.get(schema_name + ".json")
        if schema_obj:
            pattern = (
                schema_obj
                .get("properties", {})
                .get("name", {})
                .get("pattern", default_pattern)
            )
        else:
            pattern = default_pattern

        # Remember resolved pattern for subsequent calls
        if schema_patterns is not None:
            schema_patterns[schema_name] = pattern

    return bool(re.match(pattern, name))
def get_pype_attr(session, split_hierarchical=True):
    """Query pype/avalon custom attribute configurations from ftrack.

    Args:
        session: ftrack api session used for the query.
        split_hierarchical (bool): When True hierarchical attributes are
            returned separately from the non-hierarchical ones.

    Returns:
        tuple[list, list] when ``split_hierarchical`` is True, otherwise a
        single list containing all attribute configurations.
    """
    # TODO remove deprecated "avalon" group from query
    cust_attrs_query = (
        "select id, entity_type, object_type_id, is_hierarchical, default"
        " from CustomAttributeConfiguration"
        " where group.name in (\"avalon\", \"pype\")"
    )
    flat_attrs = []
    hier_attrs = []
    for attr_conf in session.query(cust_attrs_query).all():
        if split_hierarchical and attr_conf["is_hierarchical"]:
            hier_attrs.append(attr_conf)
        else:
            flat_attrs.append(attr_conf)

    if split_hierarchical:
        # return tuple
        return flat_attrs, hier_attrs
    return flat_attrs
def from_dict_to_set(data, is_project):
    """Convert ``data`` into the ``$set`` part of a MongoDB update command.

    Nested dictionaries are flattened into dot-separated keys. The "tasks"
    subtree is always written as a whole (a change in any task results in a
    full update of "tasks" from Ftrack).

    Args:
        data (dict): Up-to-date data from Ftrack. May be mutated (the
            "tasks" subtree is popped out).
        is_project (bool): True when the document is a project.

    Returns:
        dict: ``{"$set": {...}}`` update payload.
    """
    not_set = object()
    task_changes = not_set
    task_changes_key = None
    if is_project and "config" in data and "tasks" in data["config"]:
        task_changes = data["config"].pop("tasks")
        task_changes_key = "config.tasks"
        if not data["config"]:
            data.pop("config")
    elif not is_project and "data" in data and "tasks" in data["data"]:
        task_changes = data["data"].pop("tasks")
        task_changes_key = "data.tasks"
        if not data["data"]:
            data.pop("data")

    set_part = {}
    stack = [(None, data)]
    while stack:
        prefix, subdict = stack.pop()
        for key, value in subdict.items():
            full_key = key if prefix is None else "{}.{}".format(prefix, key)
            if isinstance(value, dict) and value:
                # Non-empty dict: recurse into it
                stack.append((full_key, value))
            else:
                # Scalar or empty dict: set directly
                set_part[full_key] = value

    if task_changes is not not_set and task_changes_key:
        set_part[task_changes_key] = task_changes
    return {"$set": set_part}
def get_avalon_project_template(project_name):
    """Collect avalon path templates for a project.

    Args:
        project_name (str): Name of the project.

    Returns:
        dict: Mapping with "workfile", "work" and "publish" templates.
    """
    templates = Anatomy(project_name).templates
    avalon_templates = templates["avalon"]
    return {
        "workfile": avalon_templates["workfile"],
        "work": avalon_templates["work"],
        "publish": avalon_templates["publish"]
    }
def get_project_apps(in_app_list):
    """Collect application metadata from their toml config files.

    Args:
        in_app_list (list): Application names to look up.

    Returns:
        tuple(list, dict): List of ``{"name", "label"}`` dicts for
            resolvable apps and a mapping of warning message -> list of
            application names that triggered it.
    """
    apps = []
    # TODO report
    missing_toml_msg = "Missing config file for application"
    error_msg = (
        "Unexpected error happend during preparation of application"
    )
    warnings = collections.defaultdict(list)

    for app_name in in_app_list:
        try:
            toml_path = avalon.lib.which_app(app_name)
            if not toml_path:
                log.warning(missing_toml_msg + ' "{}"'.format(app_name))
                warnings[missing_toml_msg].append(app_name)
                continue

            toml_data = toml.load(toml_path)
            apps.append({
                "name": app_name,
                "label": toml_data["label"]
            })
        except Exception:
            # Best-effort: collect the failure instead of aborting the sync
            warnings[error_msg].append(app_name)
            log.warning((
                "Error has happened during preparing application \"{}\""
            ).format(app_name), exc_info=True)
    return apps, warnings
def get_hierarchical_attributes(session, entity, attr_names, attr_defaults=None):
    """Resolve hierarchical custom attribute values for an entity.

    Values are looked up on the entity itself first, then up through its
    ancestors to the project; attribute defaults are used as the final
    fallback (queried from ftrack when not supplied).

    Args:
        session: ftrack api session.
        entity: ftrack entity (project or TypedContext) to resolve for.
        attr_names (list): Hierarchical custom attribute keys to resolve.
        attr_defaults (dict, optional): Known defaults per attribute key.
            When provided, it is updated in place with queried defaults.

    Returns:
        dict: Resolved value per attribute key.
    """
    # BUG FIX: the previous signature used a mutable default argument
    # (`attr_defaults={}`), so defaults queried in one call leaked into
    # every subsequent call sharing the default object.
    if attr_defaults is None:
        attr_defaults = {}

    # Order matters: entity first, then ancestors bottom-up, project last.
    entity_ids = []
    if entity.entity_type.lower() == "project":
        entity_ids.append(entity["id"])
    else:
        typed_context = session.query((
            "select ancestors.id, project from TypedContext where id is \"{}\""
        ).format(entity["id"])).one()
        entity_ids.append(typed_context["id"])
        entity_ids.extend(
            [ent["id"] for ent in reversed(typed_context["ancestors"])]
        )
        entity_ids.append(typed_context["project"]["id"])

    missing_defaults = [
        attr_name for attr_name in attr_names
        if attr_name not in attr_defaults
    ]

    join_ent_ids = ", ".join(
        ["\"{}\"".format(entity_id) for entity_id in entity_ids]
    )
    join_attribute_names = ", ".join(
        ["\"{}\"".format(key) for key in attr_names]
    )
    queries = [{
        "action": "query",
        "expression": (
            "select value, entity_id from CustomAttributeValue "
            "where entity_id in ({}) and configuration.key in ({})"
        ).format(join_ent_ids, join_attribute_names)
    }]

    if not missing_defaults:
        # `call` is public in newer ftrack_api versions, `_call` in older
        if hasattr(session, "call"):
            [values] = session.call(queries)
        else:
            [values] = session._call(queries)
    else:
        # Also fetch defaults for attributes we do not know yet
        join_missing_names = ", ".join(
            ["\"{}\"".format(key) for key in missing_defaults]
        )
        queries.append({
            "action": "query",
            "expression": (
                "select default from CustomAttributeConfiguration "
                "where key in ({})"
            ).format(join_missing_names)
        })
        [values, default_values] = session.call(queries)
        for default_value in default_values:
            key = default_value["data"][0]["key"]
            attr_defaults[key] = default_value["data"][0]["default"]

    # Start from defaults; explicit values override them below
    hier_values = dict(attr_defaults)
    if not values["data"]:
        return hier_values

    values_by_key = collections.defaultdict(list)
    for value_item in values["data"]:
        values_by_key[value_item["configuration"]["key"]].append(value_item)

    # For each key pick the value set closest to the entity in hierarchy
    for key, value_items in values_by_key.items():
        closest = min(
            value_items,
            key=lambda item: entity_ids.index(item["entity_id"])
        )
        hier_values[key] = closest["value"]
    return hier_values
def get_task_short_name(task_type):
    """Return the short name (code) configured for a task type.

    Short names are stored in presets
    ('pype-config/presets/ftrack/project_defaults.json') and may be used
    in anatomy templates, paths etc. When no mapping exists for
    ``task_type``, the type itself is returned unchanged.

    Args:
        task_type (str): Task type name, e.g. "Animation" or "Modeling".

    Returns:
        str: Short name, e.g. "anim" or "model".
    """
    short_names = (
        config.get_presets()['ftrack']['project_defaults']
        .get("task_short_names")
    )
    return short_names.get(task_type, task_type)
class SyncEntitiesFactory:
    """Synchronizes an Ftrack project hierarchy into the avalon database.

    The factory loads all entities of an Ftrack project, validates and
    filters them, and prepares create/update operations against MongoDB.
    """
    # NOTE(review): class-level attribute — this connection object is
    # created at import time and shared by all instances; confirm this
    # is intentional.
    dbcon = AvalonMongoDB()

    # Query for an Ftrack project by its full name (incl. task type schema)
    project_query = (
        "select full_name, name, custom_attributes"
        ", project_schema._task_type_schema.types.name"
        " from Project where full_name is \"{}\""
    )
    # Query for all non-project entities of a project
    entities_query = (
        "select id, name, parent_id, link"
        " from TypedContext where project_id is \"{}\""
    )
    # Custom attribute key marking entities excluded from synchronization
    ignore_custom_attr_key = "avalon_ignore_sync"
    # Entity types (lowered) that are never synchronized
    ignore_entity_types = ["milestone"]
    # Separator item used when composing report widgets
    report_splitter = {"type": "label", "value": "---"}
def __init__(self, log_obj, session):
    """Store logger and the session credentials for later use.

    Args:
        log_obj: Logger used for progress and report messages.
        session: ftrack_api.Session whose connection credentials are
            copied so `launch_setup` can open a fresh session.
    """
    self.log = log_obj
    # Only credentials are kept; a clean session is created per run
    # in `launch_setup`.
    self._server_url = session.server_url
    self._api_key = session.api_key
    self._api_user = session.api_user
def launch_setup(self, project_full_name):
    """Reset sync state and load the whole Ftrack project hierarchy.

    Opens a fresh ftrack session, clears all caches/collectors, queries
    the project and all its entities and stores them in
    ``self.entities_dict`` keyed by ftrack id.

    Args:
        project_full_name (str): Full name of the Ftrack project.

    Returns:
        dict or None: An error payload (``{"success": False, ...}``)
            when the project is ignored or the required custom attribute
            is not accessible; None on success.
    """
    # Close a previously opened session, if any
    try:
        self.session.close()
    except Exception:
        pass

    self.session = ftrack_api.Session(
        server_url=self._server_url,
        api_key=self._api_key,
        api_user=self._api_user,
        auto_connect_event_hub=True
    )

    # Collectors for validation failures and report output
    self.duplicates = {}
    self.failed_regex = {}
    self.tasks_failed_regex = collections.defaultdict(list)
    self.report_items = {
        "info": collections.defaultdict(list),
        "warning": collections.defaultdict(list),
        "error": collections.defaultdict(list)
    }

    self.create_list = []
    self.updates = collections.defaultdict(dict)

    # Lazy caches backing the avalon_* properties
    self.avalon_project = None
    self.avalon_entities = None
    self._avalon_ents_by_id = None
    self._avalon_ents_by_ftrack_id = None
    self._avalon_ents_by_name = None
    self._avalon_ents_by_parent_id = None
    self._avalon_archived_ents = None
    self._avalon_archived_by_id = None
    self._avalon_archived_by_parent_id = None
    self._avalon_archived_by_name = None
    self._subsets_by_parent_id = None
    self._changeability_by_mongo_id = None
    self.all_filtered_entities = {}
    self.filtered_ids = []
    self.not_selected_ids = []
    self.hier_cust_attr_ids_by_key = {}
    self._ent_paths_by_ftrack_id = {}
    self.ftrack_avalon_mapper = None
    self.avalon_ftrack_mapper = None
    self.create_ftrack_ids = None
    self.update_ftrack_ids = None
    self.deleted_entities = None

    # Get Ftrack project
    ft_project = self.session.query(
        self.project_query.format(project_full_name)
    ).one()
    ft_project_id = ft_project["id"]

    # Skip if project is ignored
    if ft_project["custom_attributes"].get(
        self.ignore_custom_attr_key
    ) is True:
        msg = (
            "Project \"{}\" has set `Ignore Sync` custom attribute to True"
        ).format(project_full_name)
        self.log.warning(msg)
        return {"success": False, "message": msg}

    self.log.debug((
        "*** Synchronization initialization started <{}>."
    ).format(project_full_name))

    # Check if `avalon_mongo_id` custom attribute exist or is accessible
    if CUST_ATTR_ID_KEY not in ft_project["custom_attributes"]:
        items = []
        items.append({
            "type": "label",
            "value": "# Can't access Custom attribute <{}>".format(
                CUST_ATTR_ID_KEY
            )
        })
        items.append({
            "type": "label",
            "value": (
                "<p>- Check if user \"{}\" has permissions"
                " to access the Custom attribute</p>"
            # BUG FIX: the message refers to the *user* but previously
            # formatted `self._api_key`, leaking the secret API key into
            # the report label.
            ).format(self._api_user)
        })
        items.append({
            "type": "label",
            "value": "<p>- Check if the Custom attribute exist</p>"
        })
        return {
            "items": items,
            "title": "Synchronization failed",
            "success": False,
            "message": "Synchronization failed"
        }

    # Find all entities in project
    all_project_entities = self.session.query(
        self.entities_query.format(ft_project_id)
    ).all()

    # Store entities by `id` and `parent_id`
    entities_dict = collections.defaultdict(lambda: {
        "children": list(),
        "parent_id": None,
        "entity": None,
        "entity_type": None,
        "name": None,
        "custom_attributes": {},
        "hier_attrs": {},
        "avalon_attrs": {},
        "tasks": {}
    })

    for entity in all_project_entities:
        parent_id = entity["parent_id"]
        entity_type = entity.entity_type
        entity_type_low = entity_type.lower()
        if entity_type_low in self.ignore_entity_types:
            continue

        elif entity_type_low == "task":
            # Tasks are stored on their parent, not as separate entities
            task = {"type": entity["type"]["name"]}
            entities_dict[parent_id]["tasks"][entity["name"]] = task
            continue

        entity_id = entity["id"]
        entities_dict[entity_id].update({
            "entity": entity,
            "parent_id": parent_id,
            "entity_type": entity_type_low,
            "entity_type_orig": entity_type,
            "name": entity["name"]
        })
        entities_dict[parent_id]["children"].append(entity_id)

    # The project entity is not part of the TypedContext query result
    entities_dict[ft_project_id]["entity"] = ft_project
    entities_dict[ft_project_id]["entity_type"] = (
        ft_project.entity_type.lower()
    )
    entities_dict[ft_project_id]["entity_type_orig"] = (
        ft_project.entity_type
    )
    entities_dict[ft_project_id]["name"] = ft_project["full_name"]

    self.ft_project_id = ft_project_id
    self.entities_dict = entities_dict
@property
def avalon_ents_by_id(self):
    """Map avalon asset documents by their stringified Mongo ``_id``.

    Built lazily from ``self.avalon_entities`` and cached.

    Returns:
        dict: ``{str(_id): asset document}``
    """
    if self._avalon_ents_by_id is None:
        self._avalon_ents_by_id = {
            str(doc["_id"]): doc
            for doc in self.avalon_entities
        }
    return self._avalon_ents_by_id
@property
def avalon_ents_by_ftrack_id(self):
    """Map stringified Mongo ``_id`` values by ftrack id.

    Documents without ``data.ftrackId`` are skipped. Built lazily from
    ``self.avalon_entities`` and cached.

    Returns:
        dict: ``{ftrack id: str(_id)}``
    """
    if self._avalon_ents_by_ftrack_id is None:
        mapping = {}
        for doc in self.avalon_entities:
            ftrack_id = doc.get("data", {}).get("ftrackId")
            if ftrack_id:
                mapping[ftrack_id] = str(doc["_id"])
        self._avalon_ents_by_ftrack_id = mapping
    return self._avalon_ents_by_ftrack_id
@property
def avalon_ents_by_name(self):
    """Map stringified Mongo ``_id`` values by asset name.

    Built lazily from ``self.avalon_entities`` and cached.

    Returns:
        dict: ``{asset name: str(_id)}``
    """
    if self._avalon_ents_by_name is None:
        self._avalon_ents_by_name = {
            doc["name"]: str(doc["_id"])
            for doc in self.avalon_entities
        }
    return self._avalon_ents_by_name
@property
def avalon_ents_by_parent_id(self):
    """Group avalon asset documents by their ``data.visualParent``.

    Parent ids are stringified; a ``None`` parent is kept as key ``None``.
    Built lazily from ``self.avalon_entities`` and cached.

    Returns:
        collections.defaultdict(list): ``{str(parent _id): [documents]}``
    """
    if self._avalon_ents_by_parent_id is None:
        grouped = collections.defaultdict(list)
        for doc in self.avalon_entities:
            parent_id = doc["data"]["visualParent"]
            if parent_id is not None:
                parent_id = str(parent_id)
            grouped[parent_id].append(doc)
        self._avalon_ents_by_parent_id = grouped
    return self._avalon_ents_by_parent_id
@property
def avalon_archived_ents(self):
    """List of archived asset documents (``type == "archived_asset"``).

    Queried lazily from the database and cached.

    Returns:
        list: Archived asset documents.
    """
    if self._avalon_archived_ents is None:
        cursor = self.dbcon.find({"type": "archived_asset"})
        self._avalon_archived_ents = list(cursor)
    return self._avalon_archived_ents
@property
def avalon_archived_by_name(self):
    """Group archived asset documents by asset name.

    Built lazily from ``self.avalon_archived_ents`` and cached.

    Returns:
        collections.defaultdict(list): ``{name: [archived documents]}``
    """
    if self._avalon_archived_by_name is None:
        by_name = collections.defaultdict(list)
        for archived in self.avalon_archived_ents:
            by_name[archived["name"]].append(archived)
        self._avalon_archived_by_name = by_name
    return self._avalon_archived_by_name
@property
def avalon_archived_by_id(self):
    """Map archived asset documents by their stringified Mongo ``_id``.

    Built lazily from ``self.avalon_archived_ents`` and cached.

    Returns:
        dict: ``{str(_id): archived document}``
    """
    if self._avalon_archived_by_id is None:
        by_id = {}
        for archived in self.avalon_archived_ents:
            by_id[str(archived["_id"])] = archived
        self._avalon_archived_by_id = by_id
    return self._avalon_archived_by_id
@property
def avalon_archived_by_parent_id(self):
    """Group archived asset documents by their ``data.visualParent``.

    Parent ids are stringified; a ``None`` parent is kept as key ``None``.
    Built lazily from ``self.avalon_archived_ents`` and cached.

    Returns:
        collections.defaultdict(list): ``{str(parent _id): [documents]}``
    """
    if self._avalon_archived_by_parent_id is None:
        by_parent = collections.defaultdict(list)
        for archived in self.avalon_archived_ents:
            parent_id = archived["data"]["visualParent"]
            if parent_id is not None:
                parent_id = str(parent_id)
            by_parent[parent_id].append(archived)
        self._avalon_archived_by_parent_id = by_parent
    return self._avalon_archived_by_parent_id
@property
def subsets_by_parent_id(self):
    """Group subset documents (``type == "subset"``) by parent id.

    Parent ids are stringified. Queried lazily and cached.

    Returns:
        collections.defaultdict(list): ``{str(parent _id): [subsets]}``
    """
    if self._subsets_by_parent_id is None:
        grouped = collections.defaultdict(list)
        for subset in self.dbcon.find({"type": "subset"}):
            grouped[str(subset["parent"])].append(subset)
        self._subsets_by_parent_id = grouped
    return self._subsets_by_parent_id
@property
def changeability_by_mongo_id(self):
    """Map of mongo id -> whether the entity may be changed/removed.

    Defaults to True for unknown ids; the project itself is never
    changeable and entities with published subsets are locked via
    ``_bubble_changeability``. Built lazily and cached.
    """
    if self._changeability_by_mongo_id is None:
        changeability = collections.defaultdict(lambda: True)
        changeability[self.avalon_project_id] = False
        # Must be assigned before bubbling, which mutates the cache
        self._changeability_by_mongo_id = changeability
        self._bubble_changeability(list(self.subsets_by_parent_id.keys()))
    return self._changeability_by_mongo_id
@property
def all_ftrack_names(self):
    """Names of all currently tracked Ftrack entities.

    Entries without a name are skipped.

    Returns:
        list: Entity names.
    """
    return [
        entity_dict["name"]
        for entity_dict in self.entities_dict.values()
        if entity_dict.get("name")
    ]
def duplicity_regex_check(self):
    """Detect duplicated names and names with invalid symbols.

    Fills ``self.duplicates``, ``self.failed_regex`` and
    ``self.tasks_failed_regex``, then removes the offending entities via
    ``filter_by_duplicate_regex``.
    """
    self.log.debug("* Checking duplicities and invalid symbols")

    entity_ids_by_name = {}
    duplicated_names = []
    invalid_names = []
    # Caches so each task name / schema pattern is validated only once
    checked_task_names = {}
    pattern_cache = {}

    for ftrack_id, entity_dict in self.entities_dict.items():
        name = entity_dict["name"]
        entity_type = entity_dict["entity_type"]

        # Tasks must be checked too
        for task_name in entity_dict["tasks"]:
            passed = checked_task_names.get(task_name)
            if passed is None:
                passed = check_regex(
                    task_name, "task", schema_patterns=pattern_cache
                )
                checked_task_names[task_name] = passed
            if not passed:
                self.tasks_failed_regex[task_name].append(ftrack_id)

        if name in entity_ids_by_name:
            # Name seen before: record duplicity, skip regex re-check
            duplicated_names.append(name)
            name_passed = True
        else:
            entity_ids_by_name[name] = []
            name_passed = check_regex(
                name, entity_type, schema_patterns=pattern_cache
            )
        entity_ids_by_name[name].append(ftrack_id)
        if not name_passed:
            invalid_names.append(name)

    for name in invalid_names:
        self.failed_regex[name] = entity_ids_by_name[name]
    for name in duplicated_names:
        self.duplicates[name] = entity_ids_by_name[name]

    self.filter_by_duplicate_regex()
def filter_by_duplicate_regex(self):
    """Remove entities (and their subtrees) that failed name validation.

    Entities listed in ``self.failed_regex`` and ``self.duplicates`` are
    popped from ``self.entities_dict`` together with all their children
    and stored in ``self.all_filtered_entities``. Tasks with invalid
    names are dropped from the remaining entities.
    """
    failed_regex_msg = "{} - Entity has invalid symbols in the name"
    duplicate_msg = "There are multiple entities with the name: \"{}\":"

    filter_queue = queue.Queue()
    # Queue entities whose names contain invalid symbols
    for ftrack_ids in self.failed_regex.values():
        for ftrack_id in ftrack_ids:
            ent_path = self.get_ent_path(ftrack_id)
            self.log.warning(failed_regex_msg.format(ent_path))
            filter_queue.put(ftrack_id)

    # Queue entities with duplicated names
    for name, ftrack_ids in self.duplicates.items():
        self.log.warning(duplicate_msg.format(name))
        for ftrack_id in ftrack_ids:
            ent_path = self.get_ent_path(ftrack_id)
            self.log.warning(ent_path)
            filter_queue.put(ftrack_id)

    # Was a list (O(n) membership) and shadowed builtin `id`
    filtered_ids = set()
    while not filter_queue.empty():
        ftrack_id = filter_queue.get()
        if ftrack_id in filtered_ids:
            continue

        entity_dict = self.entities_dict.pop(ftrack_id, {})
        if not entity_dict:
            continue

        self.all_filtered_entities[ftrack_id] = entity_dict
        # Detach from the parent's children list
        parent_id = entity_dict.get("parent_id")
        if parent_id and parent_id in self.entities_dict:
            if ftrack_id in self.entities_dict[parent_id]["children"]:
                self.entities_dict[parent_id]["children"].remove(ftrack_id)

        filtered_ids.add(ftrack_id)
        # Whole subtree is filtered out as well
        for child_id in entity_dict.get("children", []):
            filter_queue.put(child_id)

    # Drop tasks with invalid names from still-present entities
    for name, ftrack_ids in self.tasks_failed_regex.items():
        for ftrack_id in ftrack_ids:
            if ftrack_id not in self.entities_dict:
                continue
            self.entities_dict[ftrack_id]["tasks"].pop(name)
            ent_path = self.get_ent_path(ftrack_id)
            self.log.warning(failed_regex_msg.format(
                "/".join([ent_path, name])
            ))
def filter_by_ignore_sync(self):
    """Filter out subtrees flagged with the ignore-sync custom attribute.

    Walks the hierarchy top-down; an entity with a truthy
    ``avalon_ignore_sync`` value is detached from its parent and it and
    all its descendants are moved to ``self.all_filtered_entities``.
    """
    # Skip filtering when the `ignore_sync` attribute is not accessible
    # at all (the project value would be missing, not just unset).
    if self.entities_dict[self.ft_project_id]["avalon_attrs"].get(
        self.ignore_custom_attr_key, "_notset_"
    ) == "_notset_":
        return

    self.filter_queue = queue.Queue()
    self.filter_queue.put((self.ft_project_id, False))
    while not self.filter_queue.empty():
        parent_id, remove = self.filter_queue.get()
        if remove:
            parent_dict = self.entities_dict.pop(parent_id, {})
            self.all_filtered_entities[parent_id] = parent_dict
            self.filtered_ids.append(parent_id)
        else:
            parent_dict = self.entities_dict.get(parent_id, {})

        # BUG FIX: iterate a snapshot — children may be removed from the
        # original list inside the loop, which previously skipped the
        # sibling following each ignored child.
        for child_id in tuple(parent_dict.get("children", [])):
            # keep original `remove` value for all childs
            _remove = (remove is True)
            if not _remove:
                if self.entities_dict[child_id]["avalon_attrs"].get(
                    self.ignore_custom_attr_key
                ):
                    self.entities_dict[parent_id]["children"].remove(
                        child_id
                    )
                    _remove = True
            self.filter_queue.put((child_id, _remove))
def filter_by_selection(self, event):
    """Limit processing to entities selected in the triggering event.

    Keeps the selected entities, all their ancestors up to the project
    and all their descendants; everything else is popped into
    ``self.not_selected_ids``. Does nothing when the project itself is
    part of the selection.

    Args:
        event: Ftrack event carrying ``data.selection``.
    """
    # BUGGY!!!! cause that entities are in deleted list
    # TODO may be working when filtering happen after preparations
    # - But this part probably does not have any functional reason
    # - Time of synchronization probably won't be changed much
    selected_ids = []
    for entity in event["data"]["selection"]:
        # Skip if project is in selection
        if entity["entityType"] == "show":
            return
        selected_ids.append(entity["entityId"])

    sync_ids = [self.ft_project_id]
    parents_queue = queue.Queue()
    children_queue = queue.Queue()
    for id in selected_ids:
        # skip if already filtered with ignore sync custom attribute
        if id in self.filtered_ids:
            continue
        parents_queue.put(id)
        children_queue.put(id)

    # Walk upwards: include every ancestor of each selected entity
    while not parents_queue.empty():
        id = parents_queue.get()
        while True:
            # Stops when parent is in sync_ids
            if id in self.filtered_ids or id in sync_ids or id is None:
                break
            sync_ids.append(id)
            id = self.entities_dict[id]["parent_id"]

    # Walk downwards: include every descendant of each selected entity
    while not children_queue.empty():
        parent_id = children_queue.get()
        for child_id in self.entities_dict[parent_id]["children"]:
            if child_id in sync_ids or child_id in self.filtered_ids:
                continue
            sync_ids.append(child_id)
            children_queue.put(child_id)

    # separate not selected and to process entities
    for key, value in self.entities_dict.items():
        if key not in sync_ids:
            self.not_selected_ids.append(key)

    for id in self.not_selected_ids:
        # pop from entities
        value = self.entities_dict.pop(id)
        # remove entity from parent's children
        parent_id = value["parent_id"]
        if parent_id not in sync_ids:
            continue

        self.entities_dict[parent_id]["children"].remove(id)
def set_cutom_attributes(self):
    """Query custom attribute values and store them per entity.

    Fills "custom_attributes"/"avalon_attrs" (values) and
    "custom_attributes_id"/"avalon_attrs_id" (configuration ids) of each
    entry in ``self.entities_dict``, starting from per-entity-type
    defaults and overriding with explicitly set values. Finishes by
    delegating hierarchical attributes to ``set_hierarchical_attribute``.

    NOTE(review): method name typo ("cutom") is kept — renaming would
    break callers.
    """
    self.log.debug("* Preparing custom attributes")
    # Get custom attributes and values
    custom_attrs, hier_attrs = get_pype_attr(self.session)
    ent_types = self.session.query("select id, name from ObjectType").all()
    ent_types_by_name = {
        ent_type["name"]: ent_type["id"] for ent_type in ent_types
    }

    # store default values per entity type
    attrs_per_entity_type = collections.defaultdict(dict)
    avalon_attrs = collections.defaultdict(dict)
    # store also custom attribute configuration id for future use (create)
    attrs_per_entity_type_ca_id = collections.defaultdict(dict)
    avalon_attrs_ca_id = collections.defaultdict(dict)

    attribute_key_by_id = {}
    for cust_attr in custom_attrs:
        key = cust_attr["key"]
        attribute_key_by_id[cust_attr["id"]] = key
        ca_ent_type = cust_attr["entity_type"]
        # "avalon_"-prefixed attributes are bookkeeping values and are
        # kept separately from user-facing custom attributes
        if key.startswith("avalon_"):
            if ca_ent_type == "show":
                avalon_attrs[ca_ent_type][key] = cust_attr["default"]
                avalon_attrs_ca_id[ca_ent_type][key] = cust_attr["id"]
            elif ca_ent_type == "task":
                obj_id = cust_attr["object_type_id"]
                avalon_attrs[obj_id][key] = cust_attr["default"]
                avalon_attrs_ca_id[obj_id][key] = cust_attr["id"]
            continue

        if ca_ent_type == "show":
            attrs_per_entity_type[ca_ent_type][key] = cust_attr["default"]
            attrs_per_entity_type_ca_id[ca_ent_type][key] = cust_attr["id"]
        elif ca_ent_type == "task":
            obj_id = cust_attr["object_type_id"]
            attrs_per_entity_type[obj_id][key] = cust_attr["default"]
            attrs_per_entity_type_ca_id[obj_id][key] = cust_attr["id"]

    obj_id_ent_type_map = {}
    sync_ids = []
    for entity_id, entity_dict in self.entities_dict.items():
        sync_ids.append(entity_id)
        entity_type = entity_dict["entity_type"]
        entity_type_orig = entity_dict["entity_type_orig"]

        # Defaults are grouped by "show" for projects and by object type
        # id for everything else
        if entity_type == "project":
            attr_key = "show"
        else:
            map_key = obj_id_ent_type_map.get(entity_type_orig)
            if not map_key:
                # Put space between capitals
                # (e.g. 'AssetBuild' -> 'Asset Build')
                map_key = re.sub(
                    r"(\w)([A-Z])", r"\1 \2", entity_type_orig
                )
                obj_id_ent_type_map[entity_type_orig] = map_key

            # Get object id of entity type
            attr_key = ent_types_by_name.get(map_key)

            # Backup soluction when id is not found by prequeried objects
            if not attr_key:
                query = "ObjectType where name is \"{}\"".format(map_key)
                attr_key = self.session.query(query).one()["id"]
                ent_types_by_name[map_key] = attr_key

        prepared_attrs = attrs_per_entity_type.get(attr_key)
        prepared_avalon_attr = avalon_attrs.get(attr_key)
        prepared_attrs_ca_id = attrs_per_entity_type_ca_id.get(attr_key)
        prepared_avalon_attr_ca_id = avalon_attrs_ca_id.get(attr_key)
        # Deep copies so per-entity overrides don't mutate the defaults
        if prepared_attrs:
            self.entities_dict[entity_id]["custom_attributes"] = (
                copy.deepcopy(prepared_attrs)
            )
        if prepared_attrs_ca_id:
            self.entities_dict[entity_id]["custom_attributes_id"] = (
                copy.deepcopy(prepared_attrs_ca_id)
            )
        if prepared_avalon_attr:
            self.entities_dict[entity_id]["avalon_attrs"] = (
                copy.deepcopy(prepared_avalon_attr)
            )
        if prepared_avalon_attr_ca_id:
            self.entities_dict[entity_id]["avalon_attrs_id"] = (
                copy.deepcopy(prepared_avalon_attr_ca_id)
            )

    # TODO query custom attributes by entity_id
    entity_ids_joined = ", ".join([
        "\"{}\"".format(id) for id in sync_ids
    ])
    attributes_joined = ", ".join([
        "\"{}\"".format(attr_id) for attr_id in attribute_key_by_id.keys()
    ])

    cust_attr_query = (
        "select value, entity_id from ContextCustomAttributeValue "
        "where entity_id in ({}) and configuration_id in ({})"
    )
    call_expr = [{
        "action": "query",
        "expression": cust_attr_query.format(
            entity_ids_joined, attributes_joined
        )
    }]
    # `call` is public API in newer ftrack_api, `_call` in older versions
    if hasattr(self.session, "call"):
        [values] = self.session.call(call_expr)
    else:
        [values] = self.session._call(call_expr)

    # Override defaults with explicitly set values
    for item in values["data"]:
        entity_id = item["entity_id"]
        key = attribute_key_by_id[item["configuration_id"]]
        store_key = "custom_attributes"
        if key.startswith("avalon_"):
            store_key = "avalon_attrs"
        self.entities_dict[entity_id][store_key][key] = item["value"]

    # process hierarchical attributes
    self.set_hierarchical_attribute(hier_attrs, sync_ids)
def set_hierarchical_attribute(self, hier_attrs, sync_ids):
    """Resolve hierarchical custom attribute values for all entities.

    Queries set values for all hierarchical attributes, stores them per
    entity ("hier_attrs", avalon ones in "avalon_attrs") and propagates
    non-None values from the project down through the hierarchy so each
    entity ends up with an inherited or own value.

    Args:
        hier_attrs (list): Hierarchical custom attribute configurations.
        sync_ids (list): Ftrack ids of entities to process.
    """
    # collect all hierarchical attribute keys
    # and prepare default values to project
    attributes_by_key = {}
    attribute_key_by_id = {}
    for attr in hier_attrs:
        key = attr["key"]
        attribute_key_by_id[attr["id"]] = key
        attributes_by_key[key] = attr
        self.hier_cust_attr_ids_by_key[key] = attr["id"]

        store_key = "hier_attrs"
        if key.startswith("avalon_"):
            store_key = "avalon_attrs"
        # Project level starts from the attribute default
        self.entities_dict[self.ft_project_id][store_key][key] = (
            attr["default"]
        )

    # Add attribute ids to entities dictionary
    avalon_attribute_id_by_key = {
        attr_key: attr_id
        for attr_id, attr_key in attribute_key_by_id.items()
        if attr_key.startswith("avalon_")
    }
    for entity_id in self.entities_dict.keys():
        if "avalon_attrs_id" not in self.entities_dict[entity_id]:
            self.entities_dict[entity_id]["avalon_attrs_id"] = {}

        for attr_key, attr_id in avalon_attribute_id_by_key.items():
            self.entities_dict[entity_id]["avalon_attrs_id"][attr_key] = (
                attr_id
            )

    # Prepare dict with all hier keys and None values
    prepare_dict = {}
    prepare_dict_avalon = {}
    for key in attributes_by_key.keys():
        if key.startswith("avalon_"):
            prepare_dict_avalon[key] = None
        else:
            prepare_dict[key] = None

    for id, entity_dict in self.entities_dict.items():
        # Skip project because has stored defaults at the moment
        if entity_dict["entity_type"] == "project":
            continue
        entity_dict["hier_attrs"] = copy.deepcopy(prepare_dict)
        for key, val in prepare_dict_avalon.items():
            entity_dict["avalon_attrs"][key] = val

    # Prepare values to query
    entity_ids_joined = ", ".join([
        "\"{}\"".format(id) for id in sync_ids
    ])
    attributes_joined = ", ".join([
        "\"{}\"".format(attr_id) for attr_id in attribute_key_by_id.keys()
    ])

    avalon_hier = []
    call_expr = [{
        "action": "query",
        "expression": (
            "select value, entity_id from ContextCustomAttributeValue "
            "where entity_id in ({}) and configuration_id in ({})"
        ).format(entity_ids_joined, attributes_joined)
    }]
    # `call` is public API in newer ftrack_api, `_call` in older versions
    if hasattr(self.session, "call"):
        [values] = self.session.call(call_expr)
    else:
        [values] = self.session._call(call_expr)

    for item in values["data"]:
        value = item["value"]
        # WARNING It is not possible to propagate enumerate hierarchical
        # attributes with multiselection 100% right. Unsetting all values
        # will cause inheritance from parent.
        if (
            value is None
            or (isinstance(value, (tuple, list)) and not value)
        ):
            continue
        entity_id = item["entity_id"]
        key = attribute_key_by_id[item["configuration_id"]]
        if key.startswith("avalon_"):
            store_key = "avalon_attrs"
            avalon_hier.append(key)
        else:
            store_key = "hier_attrs"
        self.entities_dict[entity_id][store_key][key] = value

    # Get dictionary with not None hierarchical values to pull to childs
    top_id = self.ft_project_id
    project_values = {}
    for key, value in self.entities_dict[top_id]["hier_attrs"].items():
        if value is not None:
            project_values[key] = value

    for key in avalon_hier:
        # The mongo id attribute must never be inherited by children
        if key == CUST_ATTR_ID_KEY:
            continue
        value = self.entities_dict[top_id]["avalon_attrs"][key]
        if value is not None:
            project_values[key] = value

    # Breadth-first propagation of inherited values down the hierarchy;
    # a child's own non-None value overrides the inherited one.
    hier_down_queue = queue.Queue()
    hier_down_queue.put((project_values, top_id))

    while not hier_down_queue.empty():
        hier_values, parent_id = hier_down_queue.get()
        for child_id in self.entities_dict[parent_id]["children"]:
            _hier_values = copy.deepcopy(hier_values)
            for key in attributes_by_key.keys():
                if key.startswith("avalon_"):
                    store_key = "avalon_attrs"
                else:
                    store_key = "hier_attrs"
                value = self.entities_dict[child_id][store_key][key]
                if value is not None:
                    _hier_values[key] = value

            self.entities_dict[child_id]["hier_attrs"].update(_hier_values)
            hier_down_queue.put((_hier_values, child_id))
def remove_from_archived(self, mongo_id):
    """Remove an entity from all archived-entity caches.

    Keeps the lazily built caches (`_avalon_archived_*`) consistent after
    an archived entity was restored/recreated.

    Args:
        mongo_id (str): Stringified Mongo ``_id`` of the entity.
    """
    entity = self.avalon_archived_by_id.pop(mongo_id, None)
    if not entity:
        return

    if self._avalon_archived_ents is not None:
        if entity in self._avalon_archived_ents:
            self._avalon_archived_ents.remove(entity)

    if self._avalon_archived_by_name is not None:
        name = entity["name"]
        if name in self._avalon_archived_by_name:
            name_ents = self._avalon_archived_by_name[name]
            if entity in name_ents:
                # Drop the whole key when this was the last entry
                if len(name_ents) == 1:
                    self._avalon_archived_by_name.pop(name)
                else:
                    self._avalon_archived_by_name[name].remove(entity)

    # TODO use custom None instead of __NOTSET__
    if self._avalon_archived_by_parent_id is not None:
        parent_id = entity.get("data", {}).get(
            "visualParent", "__NOTSET__"
        )
        if parent_id is not None:
            parent_id = str(parent_id)

        if parent_id in self._avalon_archived_by_parent_id:
            parent_list = self._avalon_archived_by_parent_id[parent_id]
            # BUG FIX: condition was inverted (`not in`), which raised
            # ValueError for absent entities and never removed present
            # ones from the cache.
            if entity in parent_list:
                parent_list.remove(entity)
def prepare_ftrack_ent_data(self):
    """Build the "final_entity" (avalon document) for every entity.

    For the project a ``type: "project"`` document with config (tasks,
    apps) is created; every other entity becomes a ``type: "asset"``
    document with parents/hierarchy/tasks data. Entities without a
    queried ftrack entity are dropped with a debug message.
    """
    not_set_ids = []
    for id, entity_dict in self.entities_dict.items():
        entity = entity_dict["entity"]
        if entity is None:
            # Leftover empty defaultdict entries (filtering artifact)
            not_set_ids.append(id)
            continue

        self.entities_dict[id]["final_entity"] = {}
        self.entities_dict[id]["final_entity"]["name"] = (
            entity_dict["name"]
        )
        data = {}
        data["ftrackId"] = entity["id"]
        data["entityType"] = entity_dict["entity_type_orig"]

        # NOTE(review): default is a list but `.items()` is called on it
        # — works only because entities_dict guarantees dict values;
        # confirm the defaults should not be `{}`.
        for key, val in entity_dict.get("custom_attributes", []).items():
            data[key] = val

        for key, val in entity_dict.get("hier_attrs", []).items():
            data[key] = val

        if id == self.ft_project_id:
            data["code"] = entity["name"]
            self.entities_dict[id]["final_entity"]["data"] = data
            self.entities_dict[id]["final_entity"]["type"] = "project"

            proj_schema = entity["project_schema"]
            task_types = proj_schema["_task_type_schema"]["types"]
            proj_apps, warnings = get_project_apps(
                (data.get("applications") or [])
            )
            for msg, items in warnings.items():
                if not msg or not items:
                    continue
                self.report_items["warning"][msg] = items

            tasks = {}
            for tt in task_types:
                tasks[tt["name"]] = {
                    "short_name": get_task_short_name(tt["name"])
                }

            self.entities_dict[id]["final_entity"]["config"] = {
                "tasks": tasks,
                "apps": proj_apps
            }
            continue

        # Path items: [project, ...parents..., entity]; keep the middle
        ent_path_items = [ent["name"] for ent in entity["link"]]
        parents = ent_path_items[1:len(ent_path_items) - 1:]
        hierarchy = ""
        if len(parents) > 0:
            hierarchy = os.path.sep.join(parents)

        data["parents"] = parents
        data["hierarchy"] = hierarchy
        data["tasks"] = self.entities_dict[id].pop("tasks", {})
        self.entities_dict[id]["final_entity"]["data"] = data
        self.entities_dict[id]["final_entity"]["type"] = "asset"

    if not_set_ids:
        self.log.debug((
            "- Debug information: Filtering bug, there are empty dicts"
            "in entities dict (functionality should not be affected) <{}>"
        ).format("| ".join(not_set_ids)))
        for id in not_set_ids:
            self.entities_dict.pop(id)
def get_ent_path(self, ftrack_id):
ent_path = self._ent_paths_by_ftrack_id.get(ftrack_id)
if not ent_path:
entity = self.entities_dict[ftrack_id]["entity"]
ent_path = "/".join(
[ent["name"] for ent in entity["link"]]
)
self._ent_paths_by_ftrack_id[ftrack_id] = ent_path
return ent_path
    def prepare_avalon_entities(self, ft_project_name):
        """Split ftrack entities into Create/Update/Delete groups.

        Loads the avalon project and its assets from mongo, then pairs
        each ftrack entity with an avalon document - first by the mongo id
        stored in the ftrack custom attribute, then by ftrack id stored in
        the avalon document, then by name. Ftrack ids without a match go
        to `create_ftrack_ids`; avalon documents without a match go to
        `deleted_entities`. The id mappings are stored on the object.

        Args:
            ft_project_name (str): Full name of the ftrack project, used
                as the avalon project name.
        """
        self.log.debug((
            "* Preparing avalon entities "
            "(separate to Create, Update and Deleted groups)"
        ))
        # Avalon entities
        self.dbcon.install()
        self.dbcon.Session["AVALON_PROJECT"] = ft_project_name
        avalon_project = self.dbcon.find_one({"type": "project"})
        avalon_entities = self.dbcon.find({"type": "asset"})
        self.avalon_project = avalon_project
        self.avalon_entities = avalon_entities

        ftrack_avalon_mapper = {}
        avalon_ftrack_mapper = {}
        create_ftrack_ids = []
        update_ftrack_ids = []

        # Collect mongo ids that are stored on more than one ftrack entity
        # (e.g. after entity duplication in ftrack)
        same_mongo_id = []
        all_mongo_ids = {}
        for ftrack_id, entity_dict in self.entities_dict.items():
            mongo_id = entity_dict["avalon_attrs"].get(CUST_ATTR_ID_KEY)
            if not mongo_id:
                continue
            if mongo_id in all_mongo_ids:
                same_mongo_id.append(mongo_id)
            else:
                all_mongo_ids[mongo_id] = []
            all_mongo_ids[mongo_id].append(ftrack_id)

        if avalon_project:
            mongo_id = str(avalon_project["_id"])
            ftrack_avalon_mapper[self.ft_project_id] = mongo_id
            avalon_ftrack_mapper[mongo_id] = self.ft_project_id
            update_ftrack_ids.append(self.ft_project_id)
        else:
            create_ftrack_ids.append(self.ft_project_id)

        # make it go hierarchically
        prepare_queue = queue.Queue()

        for child_id in self.entities_dict[self.ft_project_id]["children"]:
            prepare_queue.put(child_id)

        while not prepare_queue.empty():
            ftrack_id = prepare_queue.get()
            for child_id in self.entities_dict[ftrack_id]["children"]:
                prepare_queue.put(child_id)

            entity_dict = self.entities_dict[ftrack_id]
            ent_path = self.get_ent_path(ftrack_id)

            # 1) Try to pair by the mongo id stored in the custom attribute
            mongo_id = entity_dict["avalon_attrs"].get(CUST_ATTR_ID_KEY)
            av_ent_by_mongo_id = self.avalon_ents_by_id.get(mongo_id)
            if av_ent_by_mongo_id:
                av_ent_ftrack_id = av_ent_by_mongo_id.get("data", {}).get(
                    "ftrackId"
                )
                is_right = False
                else_match_better = False
                if av_ent_ftrack_id and av_ent_ftrack_id == ftrack_id:
                    # avalon document points back to this ftrack entity
                    is_right = True

                elif mongo_id not in same_mongo_id:
                    # mongo id is unique among ftrack entities
                    is_right = True

                else:
                    # Several ftrack entities carry the same mongo id -
                    # check whether another one is a better match
                    ftrack_ids_with_same_mongo = all_mongo_ids[mongo_id]
                    for _ftrack_id in ftrack_ids_with_same_mongo:
                        if _ftrack_id == av_ent_ftrack_id:
                            continue
                        _entity_dict = self.entities_dict[_ftrack_id]
                        _mongo_id = (
                            _entity_dict["avalon_attrs"][CUST_ATTR_ID_KEY]
                        )
                        _av_ent_by_mongo_id = self.avalon_ents_by_id.get(
                            _mongo_id
                        )
                        _av_ent_ftrack_id = _av_ent_by_mongo_id.get(
                            "data", {}
                        ).get("ftrackId")
                        if _av_ent_ftrack_id == ftrack_id:
                            else_match_better = True
                            break

                if not is_right and not else_match_better:
                    # Fall back to comparing hierarchy position and name
                    entity = entity_dict["entity"]
                    ent_path_items = [ent["name"] for ent in entity["link"]]
                    parents = ent_path_items[1:len(ent_path_items) - 1:]

                    av_parents = av_ent_by_mongo_id["data"]["parents"]
                    if av_parents == parents:
                        is_right = True
                    else:
                        name = entity_dict["name"]
                        av_name = av_ent_by_mongo_id["name"]
                        if name == av_name:
                            is_right = True

                if is_right:
                    self.log.debug(
                        "Existing (by MongoID) <{}>".format(ent_path)
                    )
                    ftrack_avalon_mapper[ftrack_id] = mongo_id
                    avalon_ftrack_mapper[mongo_id] = ftrack_id
                    update_ftrack_ids.append(ftrack_id)
                    continue

            # 2) Try to pair by ftrack id stored in avalon, then by name
            mongo_id = self.avalon_ents_by_ftrack_id.get(ftrack_id)
            if not mongo_id:
                mongo_id = self.avalon_ents_by_name.get(entity_dict["name"])
                if mongo_id:
                    self.log.debug(
                        "Existing (by matching name) <{}>".format(ent_path)
                    )
            else:
                self.log.debug(
                    "Existing (by FtrackID in mongo) <{}>".format(ent_path)
                )

            if mongo_id:
                ftrack_avalon_mapper[ftrack_id] = mongo_id
                avalon_ftrack_mapper[mongo_id] = ftrack_id
                update_ftrack_ids.append(ftrack_id)
                continue

            # 3) No match at all - entity must be created
            self.log.debug("New <{}>".format(ent_path))
            create_ftrack_ids.append(ftrack_id)

        # Avalon documents that were not paired with any ftrack entity
        # were deleted in ftrack
        deleted_entities = []
        for mongo_id in self.avalon_ents_by_id:
            if mongo_id in avalon_ftrack_mapper:
                continue
            deleted_entities.append(mongo_id)

            av_ent = self.avalon_ents_by_id[mongo_id]
            av_ent_path_items = [p for p in av_ent["data"]["parents"]]
            av_ent_path_items.append(av_ent["name"])
            self.log.debug("Deleted <{}>".format("/".join(av_ent_path_items)))

        self.ftrack_avalon_mapper = ftrack_avalon_mapper
        self.avalon_ftrack_mapper = avalon_ftrack_mapper
        self.create_ftrack_ids = create_ftrack_ids
        self.update_ftrack_ids = update_ftrack_ids
        self.deleted_entities = deleted_entities

        self.log.debug((
            "Ftrack -> Avalon comparison: New <{}> "
            "| Existing <{}> | Deleted <{}>"
        ).format(
            len(create_ftrack_ids),
            len(update_ftrack_ids),
            len(deleted_entities)
        ))
def filter_with_children(self, ftrack_id):
if ftrack_id not in self.entities_dict:
return
ent_dict = self.entities_dict[ftrack_id]
parent_id = ent_dict["parent_id"]
self.entities_dict[parent_id]["children"].remove(ftrack_id)
children_queue = queue.Queue()
children_queue.put(ftrack_id)
while not children_queue.empty():
_ftrack_id = children_queue.get()
entity_dict = self.entities_dict.pop(_ftrack_id, {"children": []})
for child_id in entity_dict["children"]:
children_queue.put(child_id)
def prepare_changes(self):
self.log.debug("* Preparing changes for avalon/ftrack")
hierarchy_changing_ids = []
ignore_keys = collections.defaultdict(list)
update_queue = queue.Queue()
for ftrack_id in self.update_ftrack_ids:
update_queue.put(ftrack_id)
while not update_queue.empty():
ftrack_id = update_queue.get()
if ftrack_id == self.ft_project_id:
changes = self.prepare_project_changes()
if changes:
self.updates[self.avalon_project_id] = changes
continue
ftrack_ent_dict = self.entities_dict[ftrack_id]
# *** check parents
parent_check = False
ftrack_parent_id = ftrack_ent_dict["parent_id"]
avalon_id = self.ftrack_avalon_mapper[ftrack_id]
avalon_entity = self.avalon_ents_by_id[avalon_id]
avalon_parent_id = avalon_entity["data"]["visualParent"]
if avalon_parent_id is not None:
avalon_parent_id = str(avalon_parent_id)
ftrack_parent_mongo_id = self.ftrack_avalon_mapper[
ftrack_parent_id
]
# if parent is project
if (ftrack_parent_mongo_id == avalon_parent_id) or (
ftrack_parent_id == self.ft_project_id and
avalon_parent_id is None
):
parent_check = True
# check name
ftrack_name = ftrack_ent_dict["name"]
avalon_name = avalon_entity["name"]
name_check = ftrack_name == avalon_name
# IDEAL STATE: both parent and name check passed
if parent_check and name_check:
continue
# If entity is changeable then change values of parent or name
if self.changeability_by_mongo_id[avalon_id]:
# TODO logging
if not parent_check:
if ftrack_parent_mongo_id == str(self.avalon_project_id):
new_parent_name = self.entities_dict[
self.ft_project_id]["name"]
new_parent_id = None
else:
new_parent_name = self.avalon_ents_by_id[
ftrack_parent_mongo_id]["name"]
new_parent_id = ObjectId(ftrack_parent_mongo_id)
if avalon_parent_id == str(self.avalon_project_id):
old_parent_name = self.entities_dict[
self.ft_project_id]["name"]
else:
old_parent_name = self.avalon_ents_by_id[
ftrack_parent_mongo_id]["name"]
self.updates[avalon_id]["data"] = {
"visualParent": new_parent_id
}
ignore_keys[ftrack_id].append("data.visualParent")
self.log.debug((
"Avalon entity \"{}\" changed parent \"{}\" -> \"{}\""
).format(avalon_name, old_parent_name, new_parent_name))
if not name_check:
self.updates[avalon_id]["name"] = ftrack_name
ignore_keys[ftrack_id].append("name")
self.log.debug(
"Avalon entity \"{}\" was renamed to \"{}\"".format(
avalon_name, ftrack_name
)
)
continue
# parents and hierarchy must be recalculated
hierarchy_changing_ids.append(ftrack_id)
# Parent is project if avalon_parent_id is set to None
if avalon_parent_id is None:
avalon_parent_id = str(self.avalon_project_id)
if not name_check:
ent_path = self.get_ent_path(ftrack_id)
# TODO report
# TODO logging
self.entities_dict[ftrack_id]["name"] = avalon_name
self.entities_dict[ftrack_id]["entity"]["name"] = (
avalon_name
)
self.entities_dict[ftrack_id]["final_entity"]["name"] = (
avalon_name
)
self.log.warning("Name was changed back to {} <{}>".format(
avalon_name, ent_path
))
self._ent_paths_by_ftrack_id.pop(ftrack_id, None)
msg = (
"<Entity renamed back> It is not possible to change"
" the name of an entity or it's parents, "
" if it already contained published data."
)
self.report_items["warning"][msg].append(ent_path)
# skip parent oricessing if hierarchy didn't change
if parent_check:
continue
# Logic when parenting(hierarchy) has changed and should not
old_ftrack_parent_id = self.avalon_ftrack_mapper.get(
avalon_parent_id
)
# If last ftrack parent id from mongo entity exist then just
# remap paren_id on entity
if old_ftrack_parent_id:
# TODO report
# TODO logging
ent_path = self.get_ent_path(ftrack_id)
msg = (
"<Entity moved back in hierachy> It is not possible"
" to change the hierarchy of an entity or it's parents,"
" if it already contained published data."
)
self.report_items["warning"][msg].append(ent_path)
self.log.warning((
" Entity contains published data so it was moved"
" back to it's original hierarchy <{}>"
).format(ent_path))
self.entities_dict[ftrack_id]["entity"]["parent_id"] = (
old_ftrack_parent_id
)
self.entities_dict[ftrack_id]["parent_id"] = (
old_ftrack_parent_id
)
self.entities_dict[old_ftrack_parent_id][
"children"
].append(ftrack_id)
continue
old_parent_ent = self.avalon_ents_by_id.get(avalon_parent_id)
if not old_parent_ent:
old_parent_ent = self.avalon_archived_by_id.get(
avalon_parent_id
)
# TODO report
# TODO logging
if not old_parent_ent:
self.log.warning((
"Parent entity was not found by id"
" - Trying to find by parent name"
))
ent_path = self.get_ent_path(ftrack_id)
parents = avalon_entity["data"]["parents"]
parent_name = parents[-1]
matching_entity_id = None
for id, entity_dict in self.entities_dict.items():
if entity_dict["name"] == parent_name:
matching_entity_id = id
break
if matching_entity_id is None:
# TODO logging
# TODO report (turn off auto-sync?)
self.log.error((
"The entity contains published data but it was moved"
" to a different place in the hierarchy and it's"
" previous parent cannot be found."
" It's impossible to solve this programmatically <{}>"
).format(ent_path))
msg = (
"<Entity can't be synchronised> Hierarchy of an entity"
" can't be changed due to published data and missing"
" previous parent"
)
self.report_items["error"][msg].append(ent_path)
self.filter_with_children(ftrack_id)
continue
matching_ent_dict = self.entities_dict.get(matching_entity_id)
match_ent_parents = matching_ent_dict.get(
"final_entity", {}).get(
"data", {}).get(
"parents", ["__NOTSET__"]
)
# TODO logging
# TODO report
if (
len(match_ent_parents) >= len(parents) or
match_ent_parents[:-1] != parents
):
ent_path = self.get_ent_path(ftrack_id)
self.log.error((
"The entity contains published data but it was moved"
" to a different place in the hierarchy and it's"
" previous parents were moved too."
" It's impossible to solve this programmatically <{}>"
).format(ent_path))
msg = (
"<Entity not synchronizable> Hierarchy of an entity"
" can't be changed due to published data and scrambled"
"hierarchy"
)
continue
old_parent_ent = matching_ent_dict["final_entity"]
parent_id = self.ft_project_id
entities_to_create = []
# TODO logging
self.log.warning(
"Ftrack entities must be recreated because they were deleted,"
" but they contain published data."
)
_avalon_ent = old_parent_ent
self.updates[avalon_parent_id] = {"type": "asset"}
success = True
while True:
_vis_par = _avalon_ent["data"]["visualParent"]
_name = _avalon_ent["name"]
if _name in self.all_ftrack_names:
av_ent_path_items = _avalon_ent["data"]["parents"]
av_ent_path_items.append(_name)
av_ent_path = "/".join(av_ent_path_items)
# TODO report
# TODO logging
self.log.error((
"Can't recreate the entity in Ftrack because an entity"
" with the same name already exists in a different"
" place in the hierarchy <{}>"
).format(av_ent_path))
msg = (
"<Entity not synchronizable> Hierarchy of an entity"
" can't be changed. I contains published data and it's"
" previous parent had a name, that is duplicated at a "
" different hierarchy level"
)
self.report_items["error"][msg].append(av_ent_path)
self.filter_with_children(ftrack_id)
success = False
break
entities_to_create.append(_avalon_ent)
if _vis_par is None:
break
_vis_par = str(_vis_par)
_mapped = self.avalon_ftrack_mapper.get(_vis_par)
if _mapped:
parent_id = _mapped
break
_avalon_ent = self.avalon_ents_by_id.get(_vis_par)
if not _avalon_ent:
_avalon_ent = self.avalon_archived_by_id.get(_vis_par)
if success is False:
continue
new_entity_id = None
for av_entity in reversed(entities_to_create):
new_entity_id = self.create_ftrack_ent_from_avalon_ent(
av_entity, parent_id
)
update_queue.put(new_entity_id)
if new_entity_id:
ftrack_ent_dict["entity"]["parent_id"] = new_entity_id
if hierarchy_changing_ids:
self.reload_parents(hierarchy_changing_ids)
for ftrack_id in self.update_ftrack_ids:
if ftrack_id == self.ft_project_id:
continue
avalon_id = self.ftrack_avalon_mapper[ftrack_id]
avalon_entity = self.avalon_ents_by_id[avalon_id]
avalon_attrs = self.entities_dict[ftrack_id]["avalon_attrs"]
if (
CUST_ATTR_ID_KEY not in avalon_attrs or
avalon_attrs[CUST_ATTR_ID_KEY] != avalon_id
):
configuration_id = self.entities_dict[ftrack_id][
"avalon_attrs_id"][CUST_ATTR_ID_KEY]
_entity_key = collections.OrderedDict({
"configuration_id": configuration_id,
"entity_id": ftrack_id
})
self.session.recorded_operations.push(
ftrack_api.operation.UpdateEntityOperation(
"ContextCustomAttributeValue",
_entity_key,
"value",
ftrack_api.symbol.NOT_SET,
avalon_id
)
)
# check rest of data
data_changes = self.compare_dict(
self.entities_dict[ftrack_id]["final_entity"],
avalon_entity,
ignore_keys[ftrack_id]
)
if data_changes:
self.updates[avalon_id] = self.merge_dicts(
data_changes,
self.updates[avalon_id]
)
# double check changes in tasks, some task could be renamed or
# deleted in Ftrack - not captured otherwise
final_entity = self.entities_dict[ftrack_id]["final_entity"]
if final_entity["data"].get("tasks", {}) != \
avalon_entity["data"].get("tasks", {}):
if "data" not in self.updates[avalon_id]:
self.updates[avalon_id]["data"] = {}
self.updates[avalon_id]["data"]["tasks"] = (
final_entity["data"]["tasks"]
)
def synchronize(self):
self.log.debug("* Synchronization begins")
avalon_project_id = self.ftrack_avalon_mapper.get(self.ft_project_id)
if avalon_project_id:
self.avalon_project_id = ObjectId(avalon_project_id)
# remove filtered ftrack ids from create/update list
for ftrack_id in self.all_filtered_entities:
if ftrack_id in self.create_ftrack_ids:
self.create_ftrack_ids.remove(ftrack_id)
elif ftrack_id in self.update_ftrack_ids:
self.update_ftrack_ids.remove(ftrack_id)
self.log.debug("* Processing entities for archivation")
self.delete_entities()
self.log.debug("* Processing new entities")
# Create not created entities
for ftrack_id in self.create_ftrack_ids:
# CHECK it is possible that entity was already created
# because is parent of another entity which was processed first
if ftrack_id in self.ftrack_avalon_mapper:
continue
self.create_avalon_entity(ftrack_id)
if len(self.create_list) > 0:
self.dbcon.insert_many(self.create_list)
self.session.commit()
self.log.debug("* Processing entities for update")
self.prepare_changes()
self.update_entities()
self.session.commit()
    def create_avalon_entity(self, ftrack_id):
        """Create (or unarchive) the avalon asset document for an entity.

        Fills the prepared "final_entity" with `_id`, `parent`, schema and
        visual parent, then either appends it to `self.create_list` (new
        document) or replaces an archived document (unarchiving). When the
        resulting mongo id differs from the one stored in ftrack, a custom
        attribute update is recorded on the session.
        """
        if ftrack_id == self.ft_project_id:
            self.create_avalon_project()
            return

        entity_dict = self.entities_dict[ftrack_id]
        parent_ftrack_id = entity_dict["parent_id"]
        # visualParent stays None when parent is the project itself
        avalon_parent = None
        if parent_ftrack_id != self.ft_project_id:
            avalon_parent = self.ftrack_avalon_mapper.get(parent_ftrack_id)
            # if not avalon_parent:
            #     self.create_avalon_entity(parent_ftrack_id)
            #     avalon_parent = self.ftrack_avalon_mapper[parent_ftrack_id]
            avalon_parent = ObjectId(avalon_parent)

        # avalon_archived_by_id avalon_archived_by_name
        current_id = (
            entity_dict["avalon_attrs"].get(CUST_ATTR_ID_KEY) or ""
        ).strip()
        mongo_id = current_id
        name = entity_dict["name"]

        # Check if exist archived asset in mongo - by ID
        unarchive = False
        unarchive_id = self.check_unarchivation(ftrack_id, mongo_id, name)
        if unarchive_id is not None:
            unarchive = True
            mongo_id = unarchive_id

        item = entity_dict["final_entity"]
        try:
            # Reuse stored mongo id unless another synchronized entity
            # already claimed it
            new_id = ObjectId(mongo_id)
            if mongo_id in self.avalon_ftrack_mapper:
                new_id = ObjectId()
        except InvalidId:
            new_id = ObjectId()

        item["_id"] = new_id
        item["parent"] = self.avalon_project_id
        item["schema"] = EntitySchemas["asset"]
        item["data"]["visualParent"] = avalon_parent

        new_id_str = str(new_id)
        self.ftrack_avalon_mapper[ftrack_id] = new_id_str
        self.avalon_ftrack_mapper[new_id_str] = ftrack_id

        self._avalon_ents_by_id[new_id_str] = item
        self._avalon_ents_by_ftrack_id[ftrack_id] = new_id_str
        self._avalon_ents_by_name[item["name"]] = new_id_str

        if current_id != new_id_str:
            # store mongo id to ftrack entity
            configuration_id = self.hier_cust_attr_ids_by_key.get(
                CUST_ATTR_ID_KEY
            )
            if not configuration_id:
                # NOTE this is for cases when CUST_ATTR_ID_KEY key is not
                # hierarchical custom attribute but per entity type
                configuration_id = self.entities_dict[ftrack_id][
                    "avalon_attrs_id"
                ][CUST_ATTR_ID_KEY]

            _entity_key = collections.OrderedDict({
                "configuration_id": configuration_id,
                "entity_id": ftrack_id
            })

            self.session.recorded_operations.push(
                ftrack_api.operation.UpdateEntityOperation(
                    "ContextCustomAttributeValue",
                    _entity_key,
                    "value",
                    ftrack_api.symbol.NOT_SET,
                    new_id_str
                )
            )

        if unarchive is False:
            self.create_list.append(item)
            return

        # If unarchive then replace entity data in database
        self.dbcon.replace_one({"_id": new_id}, item)
        self.remove_from_archived(mongo_id)

        av_ent_path_items = item["data"]["parents"]
        av_ent_path_items.append(item["name"])
        av_ent_path = "/".join(av_ent_path_items)
        self.log.debug("Entity was unarchived <{}>".format(av_ent_path))
def check_unarchivation(self, ftrack_id, mongo_id, name):
archived_by_id = self.avalon_archived_by_id.get(mongo_id)
archived_by_name = self.avalon_archived_by_name.get(name)
# if not found in archived then skip
if not archived_by_id and not archived_by_name:
return None
entity_dict = self.entities_dict[ftrack_id]
if archived_by_id:
# if is changeable then unarchive (nothing to check here)
if self.changeability_by_mongo_id[mongo_id]:
return mongo_id
# TODO replace `__NOTSET__` with custom None constant
archived_parent_id = archived_by_id["data"].get(
"visualParent", "__NOTSET__"
)
archived_parents = archived_by_id["data"].get("parents")
archived_name = archived_by_id["name"]
if (
archived_name != entity_dict["name"] or
archived_parents != entity_dict["final_entity"]["data"][
"parents"
]
):
return None
return mongo_id
# First check if there is any that have same parents
for archived in archived_by_name:
mongo_id = str(archived["_id"])
archived_parents = archived.get("data", {}).get("parents")
if (
archived_parents == entity_dict["final_entity"]["data"][
"parents"
]
):
return mongo_id
# Secondly try to find more close to current ftrack entity
first_changeable = None
for archived in archived_by_name:
mongo_id = str(archived["_id"])
if not self.changeability_by_mongo_id[mongo_id]:
continue
if first_changeable is None:
first_changeable = mongo_id
ftrack_parent_id = entity_dict["parent_id"]
map_ftrack_parent_id = self.ftrack_avalon_mapper.get(
ftrack_parent_id
)
# TODO replace `__NOTSET__` with custom None constant
archived_parent_id = archived.get("data", {}).get(
"visualParent", "__NOTSET__"
)
if archived_parent_id is not None:
archived_parent_id = str(archived_parent_id)
# skip if parent is archived - How this should be possible?
parent_entity = self.avalon_ents_by_id.get(archived_parent_id)
if (
parent_entity and (
map_ftrack_parent_id is not None and
map_ftrack_parent_id == str(parent_entity["_id"])
)
):
return mongo_id
# Last return first changeable with same name (or None)
return first_changeable
    def create_avalon_project(self):
        """Create the avalon project document from prepared ftrack data."""
        project_item = self.entities_dict[self.ft_project_id]["final_entity"]
        mongo_id = (
            self.entities_dict[self.ft_project_id]["avalon_attrs"].get(
                CUST_ATTR_ID_KEY
            ) or ""
        ).strip()
        # Reuse mongo id stored on the ftrack project when it is valid
        try:
            new_id = ObjectId(mongo_id)
        except InvalidId:
            new_id = ObjectId()

        project_item["_id"] = new_id
        project_item["parent"] = None
        project_item["schema"] = EntitySchemas["project"]
        project_item["config"]["schema"] = EntitySchemas["config"]

        # NOTE(review): mappers are filled with the raw ObjectId here while
        # other places use its string form - verify lookups that expect
        # string keys/values still work for the project.
        self.ftrack_avalon_mapper[self.ft_project_id] = new_id
        self.avalon_ftrack_mapper[new_id] = self.ft_project_id

        self.avalon_project_id = new_id

        self._avalon_ents_by_id[str(new_id)] = project_item
        if self._avalon_ents_by_ftrack_id is None:
            self._avalon_ents_by_ftrack_id = {}
        self._avalon_ents_by_ftrack_id[self.ft_project_id] = str(new_id)
        if self._avalon_ents_by_name is None:
            self._avalon_ents_by_name = {}
        self._avalon_ents_by_name[project_item["name"]] = str(new_id)

        self.create_list.append(project_item)

        # store mongo id to ftrack entity
        entity = self.entities_dict[self.ft_project_id]["entity"]
        entity["custom_attributes"][CUST_ATTR_ID_KEY] = str(new_id)
def _bubble_changeability(self, unchangeable_ids):
unchangeable_queue = queue.Queue()
for entity_id in unchangeable_ids:
unchangeable_queue.put((entity_id, False))
processed_parents_ids = []
subsets_to_remove = []
while not unchangeable_queue.empty():
entity_id, child_is_archived = unchangeable_queue.get()
# skip if already processed
if entity_id in processed_parents_ids:
continue
entity = self.avalon_ents_by_id.get(entity_id)
# if entity is not archived but unchageable child was then skip
# - archived entities should not affect not archived?
if entity and child_is_archived:
continue
# set changeability of current entity to False
self._changeability_by_mongo_id[entity_id] = False
processed_parents_ids.append(entity_id)
# if not entity then is probably archived
if not entity:
entity = self.avalon_archived_by_id.get(entity_id)
child_is_archived = True
if not entity:
# if entity is not found then it is subset without parent
if entity_id in unchangeable_ids:
subsets_to_remove.append(entity_id)
else:
# TODO logging - What is happening here?
self.log.warning((
"Avalon contains entities without valid parents that"
" lead to Project (should not cause errors)"
" - MongoId <{}>"
).format(str(entity_id)))
continue
# skip if parent is project
parent_id = entity["data"]["visualParent"]
if parent_id is None:
continue
unchangeable_queue.put((str(parent_id), child_is_archived))
self._delete_subsets_without_asset(subsets_to_remove)
    def _delete_subsets_without_asset(self, not_existing_parents):
        """Delete subsets (with versions and representations) whose parent
        asset does not exist.

        Args:
            not_existing_parents (list): Mongo ids (str) of parents that
                were not found among existing or archived assets.
        """
        subset_ids = []
        version_ids = []
        repre_ids = []
        to_delete = []

        for parent_id in not_existing_parents:
            subsets = self.subsets_by_parent_id.get(parent_id)
            if not subsets:
                continue
            for subset in subsets:
                if subset.get("type") != "subset":
                    continue
                subset_ids.append(subset["_id"])

        db_subsets = self.dbcon.find({
            "_id": {"$in": subset_ids},
            "type": "subset"
        })
        # NOTE(review): if `find` returns a pymongo Cursor these truthiness
        # checks are likely always True (Cursor has no __bool__) - verify
        # the intended early-return actually happens.
        if not db_subsets:
            return

        db_versions = self.dbcon.find({
            "parent": {"$in": subset_ids},
            "type": "version"
        })
        if db_versions:
            version_ids = [ver["_id"] for ver in db_versions]

        db_repres = self.dbcon.find({
            "parent": {"$in": version_ids},
            "type": "representation"
        })
        if db_repres:
            repre_ids = [repre["_id"] for repre in db_repres]

        # delete subsets, their versions and representations in one call
        to_delete.extend(subset_ids)
        to_delete.extend(version_ids)
        to_delete.extend(repre_ids)

        self.dbcon.delete_many({"_id": {"$in": to_delete}})
    # Probably deprecated
    def _check_changeability(self, parent_id=None):
        """Recursively fill `_changeability_by_mongo_id` from the hierarchy.

        NOTE(review): the inner loop iterates entities of `parent_id`
        (i.e. siblings) instead of children of `mongo_id`, and may read
        changeability values that were not computed yet - this looks
        broken, which matches the "Probably deprecated" note above.
        """
        for entity in self.avalon_ents_by_parent_id[parent_id]:
            mongo_id = str(entity["_id"])
            is_changeable = self._changeability_by_mongo_id.get(mongo_id)
            if is_changeable is not None:
                continue

            self._check_changeability(mongo_id)
            is_changeable = True
            for child in self.avalon_ents_by_parent_id[parent_id]:
                if not self._changeability_by_mongo_id[str(child["_id"])]:
                    is_changeable = False
                    break

            if is_changeable is True:
                is_changeable = (mongo_id in self.subsets_by_parent_id)
            self._changeability_by_mongo_id[mongo_id] = is_changeable
def update_entities(self):
"""
Runs changes converted to "$set" queries in bulk.
"""
mongo_changes_bulk = []
for mongo_id, changes in self.updates.items():
mongo_id = ObjectId(mongo_id)
is_project = mongo_id == self.avalon_project_id
change_data = from_dict_to_set(changes, is_project)
filter = {"_id": mongo_id}
mongo_changes_bulk.append(UpdateOne(filter, change_data))
if not mongo_changes_bulk:
# TODO LOG
return
self.dbcon.bulk_write(mongo_changes_bulk)
def reload_parents(self, hierarchy_changing_ids):
parents_queue = queue.Queue()
parents_queue.put((self.ft_project_id, [], False))
while not parents_queue.empty():
ftrack_id, parent_parents, changed = parents_queue.get()
_parents = copy.deepcopy(parent_parents)
if ftrack_id not in hierarchy_changing_ids and not changed:
if ftrack_id != self.ft_project_id:
_parents.append(self.entities_dict[ftrack_id]["name"])
for child_id in self.entities_dict[ftrack_id]["children"]:
parents_queue.put((child_id, _parents, changed))
continue
changed = True
parents = [par for par in _parents]
hierarchy = "/".join(parents)
self.entities_dict[ftrack_id][
"final_entity"]["data"]["parents"] = parents
self.entities_dict[ftrack_id][
"final_entity"]["data"]["hierarchy"] = hierarchy
_parents.append(self.entities_dict[ftrack_id]["name"])
for child_id in self.entities_dict[ftrack_id]["children"]:
parents_queue.put((child_id, _parents, changed))
if ftrack_id in self.create_ftrack_ids:
mongo_id = self.ftrack_avalon_mapper[ftrack_id]
if "data" not in self.updates[mongo_id]:
self.updates[mongo_id]["data"] = {}
self.updates[mongo_id]["data"]["parents"] = parents
self.updates[mongo_id]["data"]["hierarchy"] = hierarchy
    def prepare_project_changes(self):
        """Compare project's ftrack values with the avalon project document.

        A changed project code in ftrack is reverted back (code can't be
        changed after the first synchronization) and the revert is
        committed immediately.

        Returns:
            dict: New or changed values for the project document.
        """
        ftrack_ent_dict = self.entities_dict[self.ft_project_id]
        ftrack_entity = ftrack_ent_dict["entity"]
        avalon_code = self.avalon_project["data"]["code"]
        # TODO Is possible to sync if full name was changed?
        # if ftrack_ent_dict["name"] != self.avalon_project["name"]:
        #     ftrack_entity["full_name"] = avalon_name
        #     self.entities_dict[self.ft_project_id]["name"] = avalon_name
        #     self.entities_dict[self.ft_project_id]["final_entity"][
        #         "name"
        #     ] = avalon_name

        # TODO logging
        # TODO report
        # TODO May this happen? Is possible to change project code?
        if ftrack_entity["name"] != avalon_code:
            ftrack_entity["name"] = avalon_code
            self.entities_dict[self.ft_project_id]["final_entity"]["data"][
                "code"
            ] = avalon_code
            self.session.commit()

            sub_msg = (
                "Project code was changed back to \"{}\"".format(avalon_code)
            )
            msg = (
                "It is not possible to change"
                " project code after synchronization"
            )
            # NOTE(review): other report entries append to a list while this
            # assigns a plain string - verify report rendering handles it
            self.report_items["warning"][msg] = sub_msg
            self.log.warning(sub_msg)

        return self.compare_dict(
            self.entities_dict[self.ft_project_id]["final_entity"],
            self.avalon_project
        )
def compare_dict(self, dict_new, dict_old, _ignore_keys=[]):
"""
Recursively compares and list changes between dictionaries
'dict_new' and 'dict_old'.
Keys in '_ignore_keys' are skipped and not compared.
Args:
dict_new (dictionary):
dict_old (dictionary):
_ignore_keys (list):
Returns:
(dictionary) of new or updated keys and theirs values
"""
# _ignore_keys may be used for keys nested dict like"data.visualParent"
changes = {}
ignore_keys = []
for key_val in _ignore_keys:
key_items = key_val.split(".")
if len(key_items) == 1:
ignore_keys.append(key_items[0])
for key, value in dict_new.items():
if key in ignore_keys:
continue
if key not in dict_old:
changes[key] = value
continue
if isinstance(value, dict):
if not isinstance(dict_old[key], dict):
changes[key] = value
continue
_new_ignore_keys = []
for key_val in _ignore_keys:
key_items = key_val.split(".")
if len(key_items) <= 1:
continue
_new_ignore_keys.append(".".join(key_items[1:]))
_changes = self.compare_dict(
value, dict_old[key], _new_ignore_keys
)
if _changes:
changes[key] = _changes
continue
if value != dict_old[key]:
changes[key] = value
return changes
def merge_dicts(self, dict_new, dict_old):
"""
Apply all new or updated keys from 'dict_new' on 'dict_old'.
Recursively.
Doesn't recognise that 'dict_new' doesn't contain some keys
anymore.
Args:
dict_new (dictionary): from Ftrack most likely
dict_old (dictionary): current in DB
Returns:
(dictionary) of applied changes to original dictionary
"""
for key, value in dict_new.items():
if key not in dict_old:
dict_old[key] = value
continue
if isinstance(value, dict):
dict_old[key] = self.merge_dicts(value, dict_old[key])
continue
dict_old[key] = value
return dict_old
    def delete_entities(self):
        """Handle avalon entities whose ftrack counterpart was deleted.

        Changeable entities are archived in mongo. Entities with published
        data are either re-paired with a matching entity from the create
        list (same name and parents) or recreated in ftrack.
        """
        if not self.deleted_entities:
            return
        # Try to order so child is not processed before parent
        deleted_entities = []
        _deleted_entities = [id for id in self.deleted_entities]

        while True:
            if not _deleted_entities:
                break
            _ready = []
            for mongo_id in _deleted_entities:
                ent = self.avalon_ents_by_id[mongo_id]
                vis_par = ent["data"]["visualParent"]
                # postpone while its parent is still waiting in the list
                if (
                    vis_par is not None and
                    str(vis_par) in _deleted_entities
                ):
                    continue
                _ready.append(mongo_id)

            for id in _ready:
                deleted_entities.append(id)
                _deleted_entities.remove(id)

        delete_ids = []
        for mongo_id in deleted_entities:
            # delete if they are deletable
            if self.changeability_by_mongo_id[mongo_id]:
                delete_ids.append(ObjectId(mongo_id))
                continue

            # check if any new created entity match same entity
            # - name and parents must match
            deleted_entity = self.avalon_ents_by_id[mongo_id]
            name = deleted_entity["name"]
            parents = deleted_entity["data"]["parents"]
            similar_ent_id = None
            for ftrack_id in self.create_ftrack_ids:
                _ent_final = self.entities_dict[ftrack_id]["final_entity"]
                if _ent_final["name"] != name:
                    continue
                if _ent_final["data"]["parents"] != parents:
                    continue
                # If in create is "same" then we can "archive" current
                # since will be unarchived in create method
                similar_ent_id = ftrack_id
                break

            # If similar entity(same name and parents) is in create
            # entities list then just change from create to update
            if similar_ent_id is not None:
                self.create_ftrack_ids.remove(similar_ent_id)
                self.update_ftrack_ids.append(similar_ent_id)
                self.avalon_ftrack_mapper[mongo_id] = similar_ent_id
                self.ftrack_avalon_mapper[similar_ent_id] = mongo_id
                continue

            # A same-named entity anywhere in the project blocks recreation
            found_by_name_id = None
            for ftrack_id, ent_dict in self.entities_dict.items():
                if not ent_dict.get("name"):
                    continue
                if name == ent_dict["name"]:
                    found_by_name_id = ftrack_id
                    break

            if found_by_name_id is not None:
                # * THESE conditins are too complex to implement in first stage
                # - probably not possible to solve if this happen
                # if found_by_name_id in self.create_ftrack_ids:
                #     # reparent entity of the new one create?
                #     pass
                #
                # elif found_by_name_id in self.update_ftrack_ids:
                #     found_mongo_id = self.ftrack_avalon_mapper[found_by_name_id]
                #
                # ent_dict = self.entities_dict[found_by_name_id]

                # TODO report - CRITICAL entity with same name alread exists in
                # different hierarchy - can't recreate entity
                continue

            # Recreate the entity in ftrack under its previous parent
            _vis_parent = deleted_entity["data"]["visualParent"]
            if _vis_parent is None:
                _vis_parent = self.avalon_project_id
            _vis_parent = str(_vis_parent)
            ftrack_parent_id = self.avalon_ftrack_mapper[_vis_parent]
            self.create_ftrack_ent_from_avalon_ent(
                deleted_entity, ftrack_parent_id
            )

        # NOTE `filter` shadows the builtin of the same name
        filter = {"_id": {"$in": delete_ids}, "type": "asset"}
        self.dbcon.update_many(filter, {"$set": {"type": "archived_asset"}})
def create_ftrack_ent_from_avalon_ent(self, av_entity, parent_id):
new_entity = None
parent_entity = self.entities_dict[parent_id]["entity"]
_name = av_entity["name"]
_type = av_entity["data"].get("entityType", "folder")
self.log.debug((
"Re-ceating deleted entity {} <{}>"
).format(_name, _type))
new_entity = self.session.create(_type, {
"name": _name,
"parent": parent_entity
})
self.session.commit()
final_entity = {}
for k, v in av_entity.items():
final_entity[k] = v
if final_entity.get("type") != "asset":
final_entity["type"] = "asset"
new_entity_id = new_entity["id"]
new_entity_data = {
"entity": new_entity,
"parent_id": parent_id,
"entity_type": _type.lower(),
"entity_type_orig": _type,
"name": _name,
"final_entity": final_entity
}
for k, v in new_entity_data.items():
self.entities_dict[new_entity_id][k] = v
p_chilren = self.entities_dict[parent_id]["children"]
if new_entity_id not in p_chilren:
self.entities_dict[parent_id]["children"].append(new_entity_id)
cust_attr, hier_attrs = get_pype_attr(self.session)
for _attr in cust_attr:
key = _attr["key"]
if key not in av_entity["data"]:
continue
if key not in new_entity["custom_attributes"]:
continue
value = av_entity["data"][key]
if not value:
continue
new_entity["custom_attributes"][key] = value
av_entity_id = str(av_entity["_id"])
new_entity["custom_attributes"][CUST_ATTR_ID_KEY] = av_entity_id
self.ftrack_avalon_mapper[new_entity_id] = av_entity_id
self.avalon_ftrack_mapper[av_entity_id] = new_entity_id
self.session.commit()
ent_path = self.get_ent_path(new_entity_id)
msg = (
"Deleted entity was recreated because it or its children"
" contain published data"
)
self.report_items["info"][msg].append(ent_path)
return new_entity_id
def regex_duplicate_interface(self):
items = []
if self.failed_regex or self.tasks_failed_regex:
subtitle = "Entity names contain prohibited symbols:"
items.append({
"type": "label",
"value": "# {}".format(subtitle)
})
items.append({
"type": "label",
"value": (
"<p><i>NOTE: You can use Letters( a-Z ),"
" Numbers( 0-9 ) and Underscore( _ )</i></p>"
)
})
log_msgs = []
for name, ids in self.failed_regex.items():
error_title = {
"type": "label",
"value": "## {}".format(name)
}
items.append(error_title)
paths = []
for entity_id in ids:
ent_path = self.get_ent_path(entity_id)
paths.append(ent_path)
error_message = {
"type": "label",
"value": '<p>{}</p>'.format("<br>".join(paths))
}
items.append(error_message)
log_msgs.append("<{}> ({})".format(name, ",".join(paths)))
for name, ids in self.tasks_failed_regex.items():
error_title = {
"type": "label",
"value": "## Task: {}".format(name)
}
items.append(error_title)
paths = []
for entity_id in ids:
ent_path = self.get_ent_path(entity_id)
ent_path = "/".join([ent_path, name])
paths.append(ent_path)
error_message = {
"type": "label",
"value": '<p>{}</p>'.format("<br>".join(paths))
}
items.append(error_message)
log_msgs.append("<{}> ({})".format(name, ",".join(paths)))
self.log.warning("{}{}".format(subtitle, ", ".join(log_msgs)))
if self.duplicates:
subtitle = "Duplicated entity names:"
items.append({
"type": "label",
"value": "# {}".format(subtitle)
})
items.append({
"type": "label",
"value": (
"<p><i>NOTE: It is not allowed to use the same name"
" for multiple entities in the same project</i></p>"
)
})
log_msgs = []
for name, ids in self.duplicates.items():
error_title = {
"type": "label",
"value": "## {}".format(name)
}
items.append(error_title)
paths = []
for entity_id in ids:
ent_path = self.get_ent_path(entity_id)
paths.append(ent_path)
error_message = {
"type": "label",
"value": '<p>{}</p>'.format("<br>".join(paths))
}
items.append(error_message)
log_msgs.append("<{}> ({})".format(name, ", ".join(paths)))
self.log.warning("{}{}".format(subtitle, ", ".join(log_msgs)))
return items
def report(self):
items = []
project_name = self.entities_dict[self.ft_project_id]["name"]
title = "Synchronization report ({}):".format(project_name)
keys = ["error", "warning", "info"]
for key in keys:
subitems = []
if key == "warning":
for _item in self.regex_duplicate_interface():
subitems.append(_item)
for msg, _items in self.report_items[key].items():
if not _items:
continue
subitems.append({
"type": "label",
"value": "# {}".format(msg)
})
if isinstance(_items, str):
_items = [_items]
subitems.append({
"type": "label",
"value": '<p>{}</p>'.format("<br>".join(_items))
})
if items and subitems:
items.append(self.report_splitter)
items.extend(subitems)
return {
"items": items,
"title": title,
"success": False,
"message": "Synchronization Finished"
}