From f2f11e444efe2a8c54b85205ee7fa6f2c8298e5c Mon Sep 17 00:00:00 2001 From: Jakub Trllo Date: Thu, 14 Mar 2024 18:42:51 +0100 Subject: [PATCH] modified templates --- .../ayon_core/pipeline/anatomy/templates.py | 687 +++++++++++------- 1 file changed, 408 insertions(+), 279 deletions(-) diff --git a/client/ayon_core/pipeline/anatomy/templates.py b/client/ayon_core/pipeline/anatomy/templates.py index f8dd8179ae..63dfd9b1b0 100644 --- a/client/ayon_core/pipeline/anatomy/templates.py +++ b/client/ayon_core/pipeline/anatomy/templates.py @@ -1,19 +1,19 @@ import os -import copy import re +import copy import collections import numbers -import six - -from ayon_core.lib import Logger from ayon_core.lib.path_templates import ( TemplateResult, StringTemplate, - TemplatesDict, ) -from .exceptions import AnatomyTemplateUnsolved +from .exceptions import ( + ProjectNotSet, + TemplateMissingKey, + AnatomyTemplateUnsolved, +) from .roots import RootItem @@ -67,7 +67,12 @@ class AnatomyTemplateResult(TemplateResult): class AnatomyStringTemplate(StringTemplate): - """String template which has access to anatomy.""" + """String template which has access to anatomy. + + Args: + anatomy_templates (AnatomyTemplates): Anatomy templates object. + template (str): Template string. + """ def __init__(self, anatomy_templates, template): self.anatomy_templates = anatomy_templates @@ -88,10 +93,154 @@ class AnatomyStringTemplate(StringTemplate): data = copy.deepcopy(data) data["root"] = anatomy_templates.anatomy.roots result = StringTemplate.format(self, data) - rootless_path = anatomy_templates.rootless_path_from_result(result) + rootless_path = anatomy_templates.get_rootless_path_from_result( + result + ) return AnatomyTemplateResult(result, rootless_path) +def _merge_dict(main_dict, enhance_dict): + """Merges dictionaries by keys. + + Function call itself if value on key is again dictionary. + + Args: + main_dict (dict): First dict to merge second one into. + enhance_dict (dict): Second dict to be merged. + + Returns: + dict: Merged result. + + .. note:: does not override whole value on first found key + but only values differences from enhance_dict + + """ + + merge_queue = collections.deque() + merge_queue.append((main_dict, enhance_dict)) + while merge_queue: + queue_item = merge_queue.popleft() + l_dict, r_dict = queue_item + + for key, value in r_dict.items(): + if key not in l_dict: + l_dict[key] = value + elif isinstance(value, dict) and isinstance(l_dict[key], dict): + merge_queue.append((l_dict[key], value)) + else: + l_dict[key] = value + return main_dict + + +class TemplatesResultDict(dict): + """Holds and wrap 'AnatomyTemplateResult' for easy bug report. + + Dictionary like object which holds 'AnatomyTemplateResult' in the same + data structure as base dictionary of anatomy templates. It can raise + + """ + + def __init__(self, in_data, key=None, parent=None, strict=None): + super(TemplatesResultDict, self).__init__() + for _key, _value in in_data.items(): + if isinstance(_value, TemplatesResultDict): + _value.parent = self + elif isinstance(_value, dict): + _value = self.__class__(_value, _key, self) + self[_key] = _value + + if strict is None and parent is None: + strict = True + + self.key = key + self.parent = parent + self._is_strict = strict + + def __getitem__(self, key): + if key not in self.keys(): + hier = self.get_hierarchy() + hier.append(key) + raise TemplateMissingKey(hier) + + value = super(TemplatesResultDict, self).__getitem__(key) + if isinstance(value, self.__class__): + return value + + # Raise exception when expected solved templates and it is not. + if self.is_strict and hasattr(value, "validate"): + value.validate() + return value + + def get_is_strict(self): + return self._is_strict + + def set_is_strict(self, is_strict): + if is_strict is None and self.parent is None: + is_strict = True + self._is_strict = is_strict + for child in self.values(): + if isinstance(child, self.__class__): + child.set_is_strict(is_strict) + elif isinstance(child, AnatomyTemplateResult): + child.strict = is_strict + + strict = property(get_is_strict, set_is_strict) + is_strict = property(get_is_strict, set_is_strict) + + def get_hierarchy(self): + """Return dictionary keys one by one to root parent.""" + if self.key is None: + return [] + + if self.parent is None: + return [self.key] + + par_hier = list(self.parent.get_hierarchy()) + par_hier.append(self.key) + return par_hier + + @property + def missing_keys(self): + """Return missing keys of all children templates.""" + missing_keys = set() + for value in self.values(): + missing_keys |= value.missing_keys + return missing_keys + + @property + def invalid_types(self): + """Return invalid types of all children templates.""" + invalid_types = {} + for value in self.values(): + invalid_types = _merge_dict(invalid_types, value.invalid_types) + return invalid_types + + @property + def used_values(self): + """Return used values for all children templates.""" + used_values = {} + for value in self.values(): + used_values = _merge_dict(used_values, value.used_values) + return used_values + + def get_solved(self): + """Get only solved key from templates.""" + result = {} + for key, value in self.items(): + if isinstance(value, self.__class__): + value = value.get_solved() + if not value: + continue + result[key] = value + + elif ( + not hasattr(value, "solved") or + value.solved + ): + result[key] = value + return self.__class__(result, key=self.key, parent=self.parent) + + class TemplateItem: """Template item under template category. @@ -146,7 +295,7 @@ class TemplateItem: if isinstance(value, AnatomyStringTemplate): value = value.format(data) output[key] = value - return output + return TemplatesResultDict(output, strict=strict) class TemplateCategory: @@ -199,8 +348,10 @@ class TemplateCategory: elif isinstance(value, AnatomyStringTemplate): value = value.format(data) + if isinstance(value, TemplatesResultDict): + value.key = key output[key] = value - return output + return TemplatesResultDict(output, key=self.name, strict=strict) def _convert_getter_key(self, key): """Convert key for backwards compatibility. @@ -229,164 +380,76 @@ class TemplateCategory: class AnatomyTemplates(TemplatesDict): +class AnatomyTemplates: inner_key_pattern = re.compile(r"(\{@.*?[^{}0]*\})") inner_key_name_pattern = re.compile(r"\{@(.*?[^{}0]*)\}") def __init__(self, anatomy): - self._log = Logger.get_logger(self.__class__.__name__) - super(AnatomyTemplates, self).__init__() - self.anatomy = anatomy - self.loaded_project = None + self._anatomy = anatomy + + self._loaded_project = None + self._raw_templates = None + self._templates = None + self._objected_templates = None + + def __getitem__(self, key): + self._validate_discovery() + return self._objected_templates[key] + + def get(self, key, default=None): + self._validate_discovery() + return self._objected_templates.get(key, default) + + def keys(self): + return self._objected_templates.keys() def reset(self): self._raw_templates = None self._templates = None self._objected_templates = None + @property + def anatomy(self): + """Anatomy instance. + + Returns: + Anatomy: Anatomy instance. + + """ + return self._anatomy + @property def project_name(self): - return self.anatomy.project_name + """Project name. + + Returns: + Union[str, None]: Project name if set, otherwise None. + + """ + return self._anatomy.project_name @property def roots(self): - return self.anatomy.roots + """Anatomy roots object. + + Returns: + RootItem: Anatomy roots data. + + """ + return self._anatomy.roots @property def templates(self): - self._validate_discovery() - return self._templates + """Templates data. - @property - def objected_templates(self): - self._validate_discovery() - return self._objected_templates - - def _validate_discovery(self): - if self.project_name != self.loaded_project: - self.reset() - - if self._templates is None: - self._discover() - self.loaded_project = self.project_name - - def _format_value(self, value, data): - if isinstance(value, RootItem): - return self._solve_dict(value, data) - return super(AnatomyTemplates, self)._format_value(value, data) - - @staticmethod - def _ayon_template_conversion(templates): - def _convert_template_item(template_item): - # Change 'directory' to 'folder' - if "directory" in template_item: - template_item["folder"] = template_item["directory"] - - if ( - "path" not in template_item - and "file" in template_item - and "folder" in template_item - ): - template_item["path"] = "/".join( - (template_item["folder"], template_item["file"]) - ) - - def _get_default_template_name(templates): - default_template = None - for name, template in templates.items(): - if name == "default": - return "default" - - if default_template is None: - default_template = name - - return default_template - - def _fill_template_category(templates, cat_templates, cat_key): - default_template_name = _get_default_template_name(cat_templates) - for template_name, cat_template in cat_templates.items(): - _convert_template_item(cat_template) - if template_name == default_template_name: - templates[cat_key] = cat_template - else: - new_name = "{}_{}".format(cat_key, template_name) - templates["others"][new_name] = cat_template - - others_templates = templates.pop("others", None) or {} - new_others_templates = {} - templates["others"] = new_others_templates - for name, template in others_templates.items(): - _convert_template_item(template) - new_others_templates[name] = template - - for key in ( - "work", - "publish", - "hero", - ): - cat_templates = templates.pop(key) - _fill_template_category(templates, cat_templates, key) - - delivery_templates = templates.pop("delivery", None) or {} - new_delivery_templates = {} - for name, delivery_template in delivery_templates.items(): - new_delivery_templates[name] = "/".join( - (delivery_template["directory"], delivery_template["file"]) - ) - templates["delivery"] = new_delivery_templates - - def set_templates(self, templates): - if not templates: - self.reset() - return - - templates = copy.deepcopy(templates) - # TODO remove AYON convertion - self._ayon_template_conversion(templates) - - self._raw_templates = copy.deepcopy(templates) - v_queue = collections.deque() - v_queue.append(templates) - while v_queue: - item = v_queue.popleft() - if not isinstance(item, dict): - continue - - for key in tuple(item.keys()): - value = item[key] - if isinstance(value, dict): - v_queue.append(value) - - elif ( - isinstance(value, six.string_types) - and "{task}" in value - ): - item[key] = value.replace("{task}", "{task[name]}") - - solved_templates = self.solve_template_inner_links(templates) - self._templates = solved_templates - self._objected_templates = self.create_objected_templates( - solved_templates - ) - - def _create_template_object(self, template): - return AnatomyStringTemplate(self, template) - - def default_templates(self): - """Return default templates data with solved inner keys.""" - return self.solve_template_inner_links( - self.anatomy["templates"] - ) - - def _discover(self): - """ Loads anatomy templates from yaml. - Default templates are loaded if project is not set or project does - not have set it's own. - TODO: create templates if not exist. + Templates data with replaced common data. Returns: - TemplatesResultDict: Contain templates data for current project of - default templates. + dict[str, Any]: Templates data. + """ + self._validate_discovery() + return self._templates @property def frame_padding(self): @@ -459,17 +522,152 @@ class AnatomyTemplates(TemplatesDict): return output - if self.project_name is None: - # QUESTION create project specific if not found? - raise AssertionError(( - "Project \"{0}\" does not have his own templates." - " Trying to use default." - ).format(self.project_name)) + def format(self, data, strict=True): + """Fill all templates based on entered data. - self.set_templates(self.anatomy["templates"]) + Args: + data (dict[str, Any]): Fill data used for template formatting. + strict (Optional[bool]): Raise exception is accessed value is + not fully filled. + + Returns: + TemplatesResultDict: Output `TemplateResult` have `strict` + attribute set to False so accessing unfilled keys in templates + won't raise any exceptions. + + """ + self._validate_discovery() + copy_data = copy.deepcopy(data) + roots = self._anatomy.roots + if roots: + copy_data["root"] = roots + + return self._solve_dict(copy_data, strict) + + def format_all(self, in_data): + """Fill all templates based on entered data. + + Deprecated: + Use `format` method with `strict=False` instead. + + Args: + in_data (dict): Containing keys to be filled into template. + + Returns: + TemplatesResultDict: Output `TemplateResult` have `strict` + attribute set to False so accessing unfilled keys in templates + won't raise any exceptions. + + """ + return self.format(in_data, strict=False) + + def get_template(self, category_name, template_name, subkey=None): + """Get template item from category. + + Args: + category_name (str): Category name. + template_name (str): Template name. + subkey (Optional[str]): Subkey name. + + Returns: + Any: Template item or subkey value. + + """ + self._validate_discovery() + category = self.get(category_name) + if category is None: + return None + + template_item = category.get(template_name) + if template_item is None: + return template_item + + if subkey is None: + return template_item + + return template_item.get(subkey) + + def _solve_dict(self, data, strict): + """ Solves templates with entered data. + + Args: + data (dict): Containing keys to be filled into template. + + Returns: + dict: With `TemplateResult` in values containing filled or + partially filled templates. + + """ + output = {} + for key, value in self._objected_templates.items(): + if isinstance(value, TemplateCategory): + value = value.format(data, strict) + elif isinstance(value, AnatomyStringTemplate): + value = value.format(data) + output[key] = value + return TemplatesResultDict(output, strict=strict) + + def _validate_discovery(self): + """Validate if templates are discovered and loaded for anatomy project. + + When project changes the cached data are reset and discovered again. + """ + if self.project_name != self._loaded_project: + self.reset() + + if self._templates is None: + self._discover() + self._loaded_project = self.project_name + + def _create_objected_templates(self, templates): + """Create objected templates from templates data. + + Args: + templates (dict[str, Any]): Templates data from project entity. + + Returns: + dict[str, Any]: Values are cnmverted to template objects. + + """ + objected_templates = {} + for category_name, category_value in copy.deepcopy(templates).items(): + if isinstance(category_value, dict): + category_value = TemplateCategory( + self, category_name, category_value + ) + elif isinstance(category_value, str): + category_value = AnatomyStringTemplate(self, category_value) + objected_templates[category_name] = category_value + return objected_templates + + def _discover(self): + """Load and cache templates from project entity.""" + if self.project_name is None: + raise ProjectNotSet("Anatomy project is not set.") + + templates = self.anatomy["templates"] + self._raw_templates = copy.deepcopy(templates) + + templates = copy.deepcopy(templates) + # Make sure all the keys are available + for key in ( + "publish", + "hero", + "work", + "delivery", + "staging", + "others", + ): + templates.setdefault(key, {}) + + solved_templates = self._solve_template_inner_links(templates) + self._templates = solved_templates + self._objected_templates = self._create_objected_templates( + solved_templates + ) @classmethod - def replace_inner_keys(cls, matches, value, key_values, key): + def _replace_inner_keys(cls, matches, value, key_values, key): """Replacement of inner keys in template values.""" for match in matches: anatomy_sub_keys = ( @@ -493,7 +691,7 @@ class AnatomyTemplates(TemplatesDict): if not ( isinstance(replace_value, numbers.Number) - or isinstance(replace_value, six.string_types) + or isinstance(replace_value, str) ): raise ValueError(( "Anatomy templates can't be filled." @@ -507,7 +705,7 @@ class AnatomyTemplates(TemplatesDict): return value @classmethod - def prepare_inner_keys(cls, key_values): + def _prepare_inner_keys(cls, key_values): """Check values of inner keys. Check if inner key exist in template group and has valid value. @@ -521,14 +719,14 @@ class AnatomyTemplates(TemplatesDict): for key in tuple(keys_to_solve): value = key_values[key] - if isinstance(value, six.string_types): + if isinstance(value, str): matches = cls.inner_key_pattern.findall(value) if not matches: keys_to_solve.remove(key) continue found = True - key_values[key] = cls.replace_inner_keys( + key_values[key] = cls._replace_inner_keys( matches, value, key_values, key ) continue @@ -545,7 +743,7 @@ class AnatomyTemplates(TemplatesDict): subdict_found = True found = True - key_values[key][_key] = cls.replace_inner_keys( + key_values[key][_key] = cls._replace_inner_keys( matches, _value, key_values, "{}.{}".format(key, _key) ) @@ -559,7 +757,7 @@ class AnatomyTemplates(TemplatesDict): return key_values @classmethod - def solve_template_inner_links(cls, templates): + def _solve_template_inner_links(cls, templates): """Solve templates inner keys identified by "{@*}". Process is split into 2 parts. @@ -599,132 +797,63 @@ class AnatomyTemplates(TemplatesDict): key_1: "value_1" key_2: "value_2" key_4: "value_3/value_2" + + Returns: + dict[str, Any]: Solved templates data. + """ default_key_values = templates.pop("common", {}) - for key, value in tuple(templates.items()): - if isinstance(value, dict): - continue - default_key_values[key] = templates.pop(key) - - # Pop "others" key before before expected keys are processed - other_templates = templates.pop("others") or {} - - keys_by_subkey = {} - for sub_key, sub_value in templates.items(): - key_values = {} - key_values.update(default_key_values) - key_values.update(sub_value) - keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) - - for sub_key, sub_value in other_templates.items(): - if sub_key in keys_by_subkey: - self.log.warning(( - "Key \"{}\" is duplicated in others. Skipping." - ).format(sub_key)) - continue - - key_values = {} - key_values.update(default_key_values) - key_values.update(sub_value) - keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) - - default_keys_by_subkeys = cls.prepare_inner_keys(default_key_values) + output = {} + for category_name, category_value in templates.items(): + new_category_value = {} + for key, value in category_value.items(): + key_values = copy.deepcopy(default_key_values) + key_values.update(value) + new_category_value[key] = cls._prepare_inner_keys(key_values) + output[category_name] = new_category_value + default_keys_by_subkeys = cls._prepare_inner_keys(default_key_values) for key, value in default_keys_by_subkeys.items(): - keys_by_subkey[key] = value + output[key] = value - return keys_by_subkey + return output @classmethod - def _dict_to_subkeys_list(cls, subdict, pre_keys=None): - if pre_keys is None: - pre_keys = [] + def _dict_to_subkeys_list(cls, subdict): + """Convert dictionary to list of subkeys. + + Example:: + + _dict_to_subkeys_list({ + "root": { + "work": "path/to/work", + "publish": "path/to/publish" + } + }) + [ + ["root", "work"], + ["root", "publish"] + ] + + + Args: + dict[str, Any]: Dictionary to be converted. + + Returns: + list[list[str]]: List of subkeys. + + """ output = [] - for key in subdict: - value = subdict[key] - result = list(pre_keys) - result.append(key) - if isinstance(value, dict): - for item in cls._dict_to_subkeys_list(value, result): - output.append(item) - else: - output.append(result) - return output - - def _keys_to_dicts(self, key_list, value): - if not key_list: - return None - if len(key_list) == 1: - return {key_list[0]: value} - return {key_list[0]: self._keys_to_dicts(key_list[1:], value)} - - @classmethod - def rootless_path_from_result(cls, result): - """Calculate rootless path from formatting result. - - Args: - result (TemplateResult): Result of StringTemplate formatting. - - Returns: - str: Rootless path if result contains one of anatomy roots. - """ - - used_values = result.used_values - missing_keys = result.missing_keys - template = result.template - invalid_types = result.invalid_types - if ( - "root" not in used_values - or "root" in missing_keys - or "{root" not in template - ): - return - - for invalid_type in invalid_types: - if "root" in invalid_type: - return - - root_keys = cls._dict_to_subkeys_list({"root": used_values["root"]}) - if not root_keys: - return - - output = str(result) - for used_root_keys in root_keys: - if not used_root_keys: - continue - - used_value = used_values - root_key = None - for key in used_root_keys: - used_value = used_value[key] - if root_key is None: - root_key = key + subkey_queue = collections.deque() + subkey_queue.append((subdict, [])) + while subkey_queue: + queue_item = subkey_queue.popleft() + data, pre_keys = queue_item + for key, value in data.items(): + result = list(pre_keys) + result.append(key) + if isinstance(value, dict): + subkey_queue.append((value, result)) else: - root_key += "[{}]".format(key) - - root_key = "{" + root_key + "}" - output = output.replace(str(used_value), root_key) - + output.append(result) return output - - def format(self, data, strict=True): - copy_data = copy.deepcopy(data) - roots = self.roots - if roots: - copy_data["root"] = roots - result = super(AnatomyTemplates, self).format(copy_data) - result.strict = strict - return result - - def format_all(self, in_data, only_keys=True): - """ Solves templates based on entered data. - - Args: - data (dict): Containing keys to be filled into template. - - Returns: - TemplatesResultDict: Output `TemplateResult` have `strict` - attribute set to False so accessing unfilled keys in templates - won't raise any exceptions. - """ - return self.format(in_data, strict=False) \ No newline at end of file