From b908ddcedb7e0c29d9d3ca3df35d5940d8e56d82 Mon Sep 17 00:00:00 2001 From: Jakub Trllo Date: Thu, 14 Mar 2024 18:10:22 +0100 Subject: [PATCH] split anatomy.py into multiple python files --- client/ayon_core/pipeline/anatomy.py | 1550 ----------------- client/ayon_core/pipeline/anatomy/__init__.py | 17 + client/ayon_core/pipeline/anatomy/anatomy.py | 502 ++++++ .../ayon_core/pipeline/anatomy/exceptions.py | 39 + client/ayon_core/pipeline/anatomy/roots.py | 534 ++++++ .../ayon_core/pipeline/anatomy/templates.py | 523 ++++++ 6 files changed, 1615 insertions(+), 1550 deletions(-) delete mode 100644 client/ayon_core/pipeline/anatomy.py create mode 100644 client/ayon_core/pipeline/anatomy/__init__.py create mode 100644 client/ayon_core/pipeline/anatomy/anatomy.py create mode 100644 client/ayon_core/pipeline/anatomy/exceptions.py create mode 100644 client/ayon_core/pipeline/anatomy/roots.py create mode 100644 client/ayon_core/pipeline/anatomy/templates.py diff --git a/client/ayon_core/pipeline/anatomy.py b/client/ayon_core/pipeline/anatomy.py deleted file mode 100644 index d6e09bad39..0000000000 --- a/client/ayon_core/pipeline/anatomy.py +++ /dev/null @@ -1,1550 +0,0 @@ -import os -import re -import copy -import platform -import collections -import numbers -import time - -import six -import ayon_api - -from ayon_core.lib import Logger, get_local_site_id -from ayon_core.lib.path_templates import ( - TemplateUnsolved, - TemplateResult, - StringTemplate, - TemplatesDict, - FormatObject, -) -from ayon_core.addon import AddonsManager - -log = Logger.get_logger(__name__) - - -class ProjectNotSet(Exception): - """Exception raised when is created Anatomy without project name.""" - - -class RootCombinationError(Exception): - """This exception is raised when templates has combined root types.""" - - def __init__(self, roots): - joined_roots = ", ".join( - ["\"{}\"".format(_root) for _root in roots] - ) - # TODO better error message - msg = ( - "Combination of root with and" - " without root name in AnatomyTemplates. {}" - ).format(joined_roots) - - super(RootCombinationError, self).__init__(msg) - - -class BaseAnatomy(object): - """Anatomy module helps to keep project settings. - - Wraps key project specifications, AnatomyTemplates and Roots. - """ - root_key_regex = re.compile(r"{(root?[^}]+)}") - root_name_regex = re.compile(r"root\[([^]]+)\]") - - def __init__(self, project_entity, root_overrides=None): - project_name = project_entity["name"] - self.project_name = project_name - self.project_code = project_entity["code"] - - self._data = self._prepare_anatomy_data( - project_entity, root_overrides - ) - self._templates_obj = AnatomyTemplates(self) - self._roots_obj = Roots(self) - - # Anatomy used as dictionary - # - implemented only getters returning copy - def __getitem__(self, key): - return copy.deepcopy(self._data[key]) - - def get(self, key, default=None): - return copy.deepcopy(self._data).get(key, default) - - def keys(self): - return copy.deepcopy(self._data).keys() - - def values(self): - return copy.deepcopy(self._data).values() - - def items(self): - return copy.deepcopy(self._data).items() - - def _prepare_anatomy_data(self, project_entity, root_overrides): - """Prepare anatomy data for further processing. - - Method added to replace `{task}` with `{task[name]}` in templates. - """ - - anatomy_data = self._project_entity_to_anatomy_data(project_entity) - - self._apply_local_settings_on_anatomy_data( - anatomy_data, - root_overrides - ) - - return anatomy_data - - @property - def templates(self): - """Wrap property `templates` of Anatomy's AnatomyTemplates instance.""" - return self._templates_obj.templates - - @property - def templates_obj(self): - """Return `AnatomyTemplates` object of current Anatomy instance.""" - return self._templates_obj - - def format(self, *args, **kwargs): - """Wrap `format` method of Anatomy's `templates_obj`.""" - return self._templates_obj.format(*args, **kwargs) - - def format_all(self, *args, **kwargs): - """Wrap `format_all` method of Anatomy's `templates_obj`.""" - return self._templates_obj.format_all(*args, **kwargs) - - @property - def roots(self): - """Wrap `roots` property of Anatomy's `roots_obj`.""" - return self._roots_obj.roots - - @property - def roots_obj(self): - """Return `Roots` object of current Anatomy instance.""" - return self._roots_obj - - def root_environments(self): - """Return AYON_PROJECT_ROOT_* environments for current project.""" - return self._roots_obj.root_environments() - - def root_environmets_fill_data(self, template=None): - """Environment variable values in dictionary for rootless path. - - Args: - template (str): Template for environment variable key fill. - By default is set to `"${}"`. - """ - return self.roots_obj.root_environmets_fill_data(template) - - def find_root_template_from_path(self, *args, **kwargs): - """Wrapper for Roots `find_root_template_from_path`.""" - return self.roots_obj.find_root_template_from_path(*args, **kwargs) - - def path_remapper(self, *args, **kwargs): - """Wrapper for Roots `path_remapper`.""" - return self.roots_obj.path_remapper(*args, **kwargs) - - def all_root_paths(self): - """Wrapper for Roots `all_root_paths`.""" - return self.roots_obj.all_root_paths() - - def set_root_environments(self): - """Set AYON_PROJECT_ROOT_* environments for current project.""" - self._roots_obj.set_root_environments() - - def root_names(self): - """Return root names for current project.""" - return self.root_names_from_templates(self.templates) - - def _root_keys_from_templates(self, data): - """Extract root key from templates in data. - - Args: - data (dict): Data that may contain templates as string. - - Return: - set: Set of all root names from templates as strings. - - Output example: `{"root[work]", "root[publish]"}` - """ - - output = set() - if isinstance(data, dict): - for value in data.values(): - for root in self._root_keys_from_templates(value): - output.add(root) - - elif isinstance(data, str): - for group in re.findall(self.root_key_regex, data): - output.add(group) - - return output - - def root_value_for_template(self, template): - """Returns value of root key from template.""" - root_templates = [] - for group in re.findall(self.root_key_regex, template): - root_templates.append("{" + group + "}") - - if not root_templates: - return None - - return root_templates[0].format(**{"root": self.roots}) - - def root_names_from_templates(self, templates): - """Extract root names form anatomy templates. - - Returns None if values in templates contain only "{root}". - Empty list is returned if there is no "root" in templates. - Else returns all root names from templates in list. - - RootCombinationError is raised when templates contain both root types, - basic "{root}" and with root name specification "{root[work]}". - - Args: - templates (dict): Anatomy templates where roots are not filled. - - Return: - list/None: List of all root names from templates as strings when - multiroot setup is used, otherwise None is returned. - """ - roots = list(self._root_keys_from_templates(templates)) - # Return empty list if no roots found in templates - if not roots: - return roots - - # Raise exception when root keys have roots with and without root name. - # Invalid output example: ["root", "root[project]", "root[render]"] - if len(roots) > 1 and "root" in roots: - raise RootCombinationError(roots) - - # Return None if "root" without root name in templates - if len(roots) == 1 and roots[0] == "root": - return None - - names = set() - for root in roots: - for group in re.findall(self.root_name_regex, root): - names.add(group) - return list(names) - - def fill_root(self, template_path): - """Fill template path where is only "root" key unfilled. - - Args: - template_path (str): Path with "root" key in. - Example path: "{root}/projects/MyProject/Shot01/Lighting/..." - - Return: - str: formatted path - """ - # NOTE does not care if there are different keys than "root" - return template_path.format(**{"root": self.roots}) - - @classmethod - def fill_root_with_path(cls, rootless_path, root_path): - """Fill path without filled "root" key with passed path. - - This is helper to fill root with different directory path than anatomy - has defined no matter if is single or multiroot. - - Output path is same as input path if `rootless_path` does not contain - unfilled root key. - - Args: - rootless_path (str): Path without filled "root" key. Example: - "{root[work]}/MyProject/..." - root_path (str): What should replace root key in `rootless_path`. - - Returns: - str: Path with filled root. - """ - output = str(rootless_path) - for group in re.findall(cls.root_key_regex, rootless_path): - replacement = "{" + group + "}" - output = output.replace(replacement, root_path) - - return output - - def replace_root_with_env_key(self, filepath, template=None): - """Replace root of path with environment key. - - # Example: - ## Project with roots: - ``` - { - "nas": { - "windows": P:/projects", - ... - } - ... - } - ``` - - ## Entered filepath - "P:/projects/project/folder/task/animation_v001.ma" - - ## Entered template - "<{}>" - - ## Output - "/project/folder/task/animation_v001.ma" - - Args: - filepath (str): Full file path where root should be replaced. - template (str): Optional template for environment key. Must - have one index format key. - Default value if not entered: "${}" - - Returns: - str: Path where root is replaced with environment root key. - - Raise: - ValueError: When project's roots were not found in entered path. - """ - success, rootless_path = self.find_root_template_from_path(filepath) - if not success: - raise ValueError( - "{}: Project's roots were not found in path: {}".format( - self.project_name, filepath - ) - ) - - data = self.root_environmets_fill_data(template) - return rootless_path.format(**data) - - def _project_entity_to_anatomy_data(self, project_entity): - """Convert project document to anatomy data. - - Probably should fill missing keys and values. - """ - - output = copy.deepcopy(project_entity["config"]) - # TODO remove AYON convertion - task_types = copy.deepcopy(project_entity["taskTypes"]) - new_task_types = {} - for task_type in task_types: - name = task_type["name"] - new_task_types[name] = task_type - output["tasks"] = new_task_types - output["attributes"] = copy.deepcopy(project_entity["attrib"]) - - return output - - def _apply_local_settings_on_anatomy_data( - self, anatomy_data, root_overrides - ): - """Apply local settings on anatomy data. - - ATM local settings can modify project roots. Project name is required - as local settings have data stored data by project's name. - - Local settings override root values in this order: - 1.) Check if local settings contain overrides for default project and - apply it's values on roots if there are any. - 2.) If passed `project_name` is not None then check project specific - overrides in local settings for the project and apply it's value on - roots if there are any. - - NOTE: Root values of default project from local settings are always - applied if are set. - - Args: - anatomy_data (dict): Data for anatomy. - root_overrides (dict): Data of local settings. - """ - - # Skip processing if roots for current active site are not available in - # local settings - if not root_overrides: - return - - current_platform = platform.system().lower() - - root_data = anatomy_data["roots"] - for root_name, path in root_overrides.items(): - if root_name not in root_data: - continue - anatomy_data["roots"][root_name][current_platform] = ( - path - ) - - -class CacheItem: - """Helper to cache data. - - Helper does not handle refresh of data and does not mark data as outdated. - Who uses the object should check of outdated state on his own will. - """ - - default_lifetime = 10 - - def __init__(self, lifetime=None): - self._data = None - self._cached = None - self._lifetime = lifetime or self.default_lifetime - - @property - def data(self): - """Cached data/object. - - Returns: - Any: Whatever was cached. - """ - - return self._data - - @property - def is_outdated(self): - """Item has outdated cache. - - Lifetime of cache item expired or was not yet set. - - Returns: - bool: Item is outdated. - """ - - if self._cached is None: - return True - return (time.time() - self._cached) > self._lifetime - - def update_data(self, data): - """Update cache of data. - - Args: - data (Any): Data to cache. - """ - - self._data = data - self._cached = time.time() - - -class Anatomy(BaseAnatomy): - _sync_server_addon_cache = CacheItem() - _project_cache = collections.defaultdict(CacheItem) - _default_site_id_cache = collections.defaultdict(CacheItem) - _root_overrides_cache = collections.defaultdict( - lambda: collections.defaultdict(CacheItem) - ) - - def __init__( - self, project_name=None, site_name=None, project_entity=None - ): - if not project_name: - project_name = os.environ.get("AYON_PROJECT_NAME") - - if not project_name: - raise ProjectNotSet(( - "Implementation bug: Project name is not set. Anatomy requires" - " to load data for specific project." - )) - - if not project_entity: - project_entity = self.get_project_entity_from_cache(project_name) - root_overrides = self._get_site_root_overrides( - project_name, site_name - ) - - super(Anatomy, self).__init__(project_entity, root_overrides) - - @classmethod - def get_project_entity_from_cache(cls, project_name): - project_cache = cls._project_cache[project_name] - if project_cache.is_outdated: - project_cache.update_data(ayon_api.get_project(project_name)) - return copy.deepcopy(project_cache.data) - - @classmethod - def get_sync_server_addon(cls): - if cls._sync_server_addon_cache.is_outdated: - manager = AddonsManager() - cls._sync_server_addon_cache.update_data( - manager.get_enabled_addon("sync_server") - ) - return cls._sync_server_addon_cache.data - - @classmethod - def _get_studio_roots_overrides(cls, project_name): - """This would return 'studio' site override by local settings. - - Notes: - This logic handles local overrides of studio site which may be - available even when sync server is not enabled. - Handling of 'studio' and 'local' site was separated as preparation - for AYON development where that will be received from - separated sources. - - Args: - project_name (str): Name of project. - - Returns: - Union[Dict[str, str], None]): Local root overrides. - """ - if not project_name: - return - return ayon_api.get_project_roots_for_site( - project_name, get_local_site_id() - ) - - @classmethod - def _get_site_root_overrides(cls, project_name, site_name): - """Get root overrides for site. - - Args: - project_name (str): Project name for which root overrides should be - received. - site_name (Union[str, None]): Name of site for which root overrides - should be returned. - """ - - # First check if sync server is available and enabled - sync_server = cls.get_sync_server_addon() - if sync_server is None or not sync_server.enabled: - # QUESTION is ok to force 'studio' when site sync is not enabled? - site_name = "studio" - - elif not site_name: - # Use sync server to receive active site name - project_cache = cls._default_site_id_cache[project_name] - if project_cache.is_outdated: - project_cache.update_data( - sync_server.get_active_site_type(project_name) - ) - site_name = project_cache.data - - site_cache = cls._root_overrides_cache[project_name][site_name] - if site_cache.is_outdated: - if site_name == "studio": - # Handle studio root overrides without sync server - # - studio root overrides can be done even without sync server - roots_overrides = cls._get_studio_roots_overrides( - project_name - ) - else: - # Ask sync server to get roots overrides - roots_overrides = sync_server.get_site_root_overrides( - project_name, site_name - ) - site_cache.update_data(roots_overrides) - return site_cache.data - - -class AnatomyTemplateUnsolved(TemplateUnsolved): - """Exception for unsolved template when strict is set to True.""" - - msg = "Anatomy template \"{0}\" is unsolved.{1}{2}" - - -class AnatomyTemplateResult(TemplateResult): - rootless = None - - def __new__(cls, result, rootless_path): - new_obj = super(AnatomyTemplateResult, cls).__new__( - cls, - str(result), - result.template, - result.solved, - result.used_values, - result.missing_keys, - result.invalid_types - ) - new_obj.rootless = rootless_path - return new_obj - - def validate(self): - if not self.solved: - raise AnatomyTemplateUnsolved( - self.template, - self.missing_keys, - self.invalid_types - ) - - def copy(self): - tmp = TemplateResult( - str(self), - self.template, - self.solved, - self.used_values, - self.missing_keys, - self.invalid_types - ) - return self.__class__(tmp, self.rootless) - - def normalized(self): - """Convert to normalized path.""" - - tmp = TemplateResult( - os.path.normpath(self), - self.template, - self.solved, - self.used_values, - self.missing_keys, - self.invalid_types - ) - return self.__class__(tmp, self.rootless) - - -class AnatomyStringTemplate(StringTemplate): - """String template which has access to anatomy.""" - - def __init__(self, anatomy_templates, template): - self.anatomy_templates = anatomy_templates - super(AnatomyStringTemplate, self).__init__(template) - - def format(self, data): - """Format template and add 'root' key to data if not available. - - Args: - data (dict[str, Any]): Formatting data for template. - - Returns: - AnatomyTemplateResult: Formatting result. - """ - - anatomy_templates = self.anatomy_templates - if not data.get("root"): - data = copy.deepcopy(data) - data["root"] = anatomy_templates.anatomy.roots - result = StringTemplate.format(self, data) - rootless_path = anatomy_templates.rootless_path_from_result(result) - return AnatomyTemplateResult(result, rootless_path) - - -class AnatomyTemplates(TemplatesDict): - inner_key_pattern = re.compile(r"(\{@.*?[^{}0]*\})") - inner_key_name_pattern = re.compile(r"\{@(.*?[^{}0]*)\}") - - def __init__(self, anatomy): - super(AnatomyTemplates, self).__init__() - self.anatomy = anatomy - self.loaded_project = None - - def reset(self): - self._raw_templates = None - self._templates = None - self._objected_templates = None - - @property - def project_name(self): - return self.anatomy.project_name - - @property - def roots(self): - return self.anatomy.roots - - @property - def templates(self): - self._validate_discovery() - return self._templates - - @property - def objected_templates(self): - self._validate_discovery() - return self._objected_templates - - def _validate_discovery(self): - if self.project_name != self.loaded_project: - self.reset() - - if self._templates is None: - self._discover() - self.loaded_project = self.project_name - - def _format_value(self, value, data): - if isinstance(value, RootItem): - return self._solve_dict(value, data) - return super(AnatomyTemplates, self)._format_value(value, data) - - @staticmethod - def _ayon_template_conversion(templates): - def _convert_template_item(template_item): - # Change 'directory' to 'folder' - if "directory" in template_item: - template_item["folder"] = template_item["directory"] - - if ( - "path" not in template_item - and "file" in template_item - and "folder" in template_item - ): - template_item["path"] = "/".join( - (template_item["folder"], template_item["file"]) - ) - - def _get_default_template_name(templates): - default_template = None - for name, template in templates.items(): - if name == "default": - return "default" - - if default_template is None: - default_template = name - - return default_template - - def _fill_template_category(templates, cat_templates, cat_key): - default_template_name = _get_default_template_name(cat_templates) - for template_name, cat_template in cat_templates.items(): - _convert_template_item(cat_template) - if template_name == default_template_name: - templates[cat_key] = cat_template - else: - new_name = "{}_{}".format(cat_key, template_name) - templates["others"][new_name] = cat_template - - others_templates = templates.pop("others", None) or {} - new_others_templates = {} - templates["others"] = new_others_templates - for name, template in others_templates.items(): - _convert_template_item(template) - new_others_templates[name] = template - - for key in ( - "work", - "publish", - "hero", - ): - cat_templates = templates.pop(key) - _fill_template_category(templates, cat_templates, key) - - delivery_templates = templates.pop("delivery", None) or {} - new_delivery_templates = {} - for name, delivery_template in delivery_templates.items(): - new_delivery_templates[name] = "/".join( - (delivery_template["directory"], delivery_template["file"]) - ) - templates["delivery"] = new_delivery_templates - - def set_templates(self, templates): - if not templates: - self.reset() - return - - templates = copy.deepcopy(templates) - # TODO remove AYON convertion - self._ayon_template_conversion(templates) - - self._raw_templates = copy.deepcopy(templates) - v_queue = collections.deque() - v_queue.append(templates) - while v_queue: - item = v_queue.popleft() - if not isinstance(item, dict): - continue - - for key in tuple(item.keys()): - value = item[key] - if isinstance(value, dict): - v_queue.append(value) - - elif ( - isinstance(value, six.string_types) - and "{task}" in value - ): - item[key] = value.replace("{task}", "{task[name]}") - - solved_templates = self.solve_template_inner_links(templates) - self._templates = solved_templates - self._objected_templates = self.create_objected_templates( - solved_templates - ) - - def _create_template_object(self, template): - return AnatomyStringTemplate(self, template) - - def default_templates(self): - """Return default templates data with solved inner keys.""" - return self.solve_template_inner_links( - self.anatomy["templates"] - ) - - def _discover(self): - """ Loads anatomy templates from yaml. - Default templates are loaded if project is not set or project does - not have set it's own. - TODO: create templates if not exist. - - Returns: - TemplatesResultDict: Contain templates data for current project of - default templates. - """ - - if self.project_name is None: - # QUESTION create project specific if not found? - raise AssertionError(( - "Project \"{0}\" does not have his own templates." - " Trying to use default." - ).format(self.project_name)) - - self.set_templates(self.anatomy["templates"]) - - @classmethod - def replace_inner_keys(cls, matches, value, key_values, key): - """Replacement of inner keys in template values.""" - for match in matches: - anatomy_sub_keys = ( - cls.inner_key_name_pattern.findall(match) - ) - if key in anatomy_sub_keys: - raise ValueError(( - "Unsolvable recursion in inner keys, " - "key: \"{}\" is in his own value." - " Can't determine source, please check Anatomy templates." - ).format(key)) - - for anatomy_sub_key in anatomy_sub_keys: - replace_value = key_values.get(anatomy_sub_key) - if replace_value is None: - raise KeyError(( - "Anatomy templates can't be filled." - " Anatomy key `{0}` has" - " invalid inner key `{1}`." - ).format(key, anatomy_sub_key)) - - if not ( - isinstance(replace_value, numbers.Number) - or isinstance(replace_value, six.string_types) - ): - raise ValueError(( - "Anatomy templates can't be filled." - " Anatomy key `{0}` has" - " invalid inner key `{1}`" - " with value `{2}`." - ).format(key, anatomy_sub_key, str(replace_value))) - - value = value.replace(match, str(replace_value)) - - return value - - @classmethod - def prepare_inner_keys(cls, key_values): - """Check values of inner keys. - - Check if inner key exist in template group and has valid value. - It is also required to avoid infinite loop with unsolvable recursion - when first inner key's value refers to second inner key's value where - first is used. - """ - keys_to_solve = set(key_values.keys()) - while True: - found = False - for key in tuple(keys_to_solve): - value = key_values[key] - - if isinstance(value, six.string_types): - matches = cls.inner_key_pattern.findall(value) - if not matches: - keys_to_solve.remove(key) - continue - - found = True - key_values[key] = cls.replace_inner_keys( - matches, value, key_values, key - ) - continue - - elif not isinstance(value, dict): - keys_to_solve.remove(key) - continue - - subdict_found = False - for _key, _value in tuple(value.items()): - matches = cls.inner_key_pattern.findall(_value) - if not matches: - continue - - subdict_found = True - found = True - key_values[key][_key] = cls.replace_inner_keys( - matches, _value, key_values, - "{}.{}".format(key, _key) - ) - - if not subdict_found: - keys_to_solve.remove(key) - - if not found: - break - - return key_values - - @classmethod - def solve_template_inner_links(cls, templates): - """Solve templates inner keys identified by "{@*}". - - Process is split into 2 parts. - First is collecting all global keys (keys in top hierarchy where value - is not dictionary). All global keys are set for all group keys (keys - in top hierarchy where value is dictionary). Value of a key is not - overridden in group if already contain value for the key. - - In second part all keys with "at" symbol in value are replaced with - value of the key afterward "at" symbol from the group. - - Args: - templates (dict): Raw templates data. - - Example: - templates:: - key_1: "value_1", - key_2: "{@key_1}/{filling_key}" - - group_1: - key_3: "value_3/{@key_2}" - - group_2: - key_2": "value_2" - key_4": "value_4/{@key_2}" - - output:: - key_1: "value_1" - key_2: "value_1/{filling_key}" - - group_1: { - key_1: "value_1" - key_2: "value_1/{filling_key}" - key_3: "value_3/value_1/{filling_key}" - - group_2: { - key_1: "value_1" - key_2: "value_2" - key_4: "value_3/value_2" - """ - default_key_values = templates.pop("common", {}) - for key, value in tuple(templates.items()): - if isinstance(value, dict): - continue - default_key_values[key] = templates.pop(key) - - # Pop "others" key before before expected keys are processed - other_templates = templates.pop("others") or {} - - keys_by_subkey = {} - for sub_key, sub_value in templates.items(): - key_values = {} - key_values.update(default_key_values) - key_values.update(sub_value) - keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) - - for sub_key, sub_value in other_templates.items(): - if sub_key in keys_by_subkey: - log.warning(( - "Key \"{}\" is duplicated in others. Skipping." - ).format(sub_key)) - continue - - key_values = {} - key_values.update(default_key_values) - key_values.update(sub_value) - keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) - - default_keys_by_subkeys = cls.prepare_inner_keys(default_key_values) - - for key, value in default_keys_by_subkeys.items(): - keys_by_subkey[key] = value - - return keys_by_subkey - - @classmethod - def _dict_to_subkeys_list(cls, subdict, pre_keys=None): - if pre_keys is None: - pre_keys = [] - output = [] - for key in subdict: - value = subdict[key] - result = list(pre_keys) - result.append(key) - if isinstance(value, dict): - for item in cls._dict_to_subkeys_list(value, result): - output.append(item) - else: - output.append(result) - return output - - def _keys_to_dicts(self, key_list, value): - if not key_list: - return None - if len(key_list) == 1: - return {key_list[0]: value} - return {key_list[0]: self._keys_to_dicts(key_list[1:], value)} - - @classmethod - def rootless_path_from_result(cls, result): - """Calculate rootless path from formatting result. - - Args: - result (TemplateResult): Result of StringTemplate formatting. - - Returns: - str: Rootless path if result contains one of anatomy roots. - """ - - used_values = result.used_values - missing_keys = result.missing_keys - template = result.template - invalid_types = result.invalid_types - if ( - "root" not in used_values - or "root" in missing_keys - or "{root" not in template - ): - return - - for invalid_type in invalid_types: - if "root" in invalid_type: - return - - root_keys = cls._dict_to_subkeys_list({"root": used_values["root"]}) - if not root_keys: - return - - output = str(result) - for used_root_keys in root_keys: - if not used_root_keys: - continue - - used_value = used_values - root_key = None - for key in used_root_keys: - used_value = used_value[key] - if root_key is None: - root_key = key - else: - root_key += "[{}]".format(key) - - root_key = "{" + root_key + "}" - output = output.replace(str(used_value), root_key) - - return output - - def format(self, data, strict=True): - copy_data = copy.deepcopy(data) - roots = self.roots - if roots: - copy_data["root"] = roots - result = super(AnatomyTemplates, self).format(copy_data) - result.strict = strict - return result - - def format_all(self, in_data, only_keys=True): - """ Solves templates based on entered data. - - Args: - data (dict): Containing keys to be filled into template. - - Returns: - TemplatesResultDict: Output `TemplateResult` have `strict` - attribute set to False so accessing unfilled keys in templates - won't raise any exceptions. - """ - return self.format(in_data, strict=False) - - -class RootItem(FormatObject): - """Represents one item or roots. - - Holds raw data of root item specification. Raw data contain value - for each platform, but current platform value is used when object - is used for formatting of template. - - Args: - root_raw_data (dict): Dictionary containing root values by platform - names. ["windows", "linux" and "darwin"] - name (str, optional): Root name which is representing. Used with - multi root setup otherwise None value is expected. - parent_keys (list, optional): All dictionary parent keys. Values of - `parent_keys` are used for get full key which RootItem is - representing. Used for replacing root value in path with - formattable key. e.g. parent_keys == ["work"] -> {root[work]} - parent (object, optional): It is expected to be `Roots` object. - Value of `parent` won't affect code logic much. - """ - - def __init__( - self, root_raw_data, name=None, parent_keys=None, parent=None - ): - lowered_platform_keys = {} - for key, value in root_raw_data.items(): - lowered_platform_keys[key.lower()] = value - self.raw_data = lowered_platform_keys - self.cleaned_data = self._clean_roots(lowered_platform_keys) - self.name = name - self.parent_keys = parent_keys or [] - self.parent = parent - - self.available_platforms = list(lowered_platform_keys.keys()) - self.value = lowered_platform_keys.get(platform.system().lower()) - self.clean_value = self.clean_root(self.value) - - def __format__(self, *args, **kwargs): - return self.value.__format__(*args, **kwargs) - - def __str__(self): - return str(self.value) - - def __repr__(self): - return self.__str__() - - def __getitem__(self, key): - if isinstance(key, numbers.Number): - return self.value[key] - - additional_info = "" - if self.parent and self.parent.project_name: - additional_info += " for project \"{}\"".format( - self.parent.project_name - ) - - raise AssertionError( - "Root key \"{}\" is missing{}.".format( - key, additional_info - ) - ) - - def full_key(self): - """Full key value for dictionary formatting in template. - - Returns: - str: Return full replacement key for formatting. This helps when - multiple roots are set. In that case e.g. `"root[work]"` is - returned. - """ - if not self.name: - return "root" - - joined_parent_keys = "".join( - ["[{}]".format(key) for key in self.parent_keys] - ) - return "root{}".format(joined_parent_keys) - - def clean_path(self, path): - """Just replace backslashes with forward slashes.""" - return str(path).replace("\\", "/") - - def clean_root(self, root): - """Makes sure root value does not end with slash.""" - if root: - root = self.clean_path(root) - while root.endswith("/"): - root = root[:-1] - return root - - def _clean_roots(self, raw_data): - """Clean all values of raw root item values.""" - cleaned = {} - for key, value in raw_data.items(): - cleaned[key] = self.clean_root(value) - return cleaned - - def path_remapper(self, path, dst_platform=None, src_platform=None): - """Remap path for specific platform. - - Args: - path (str): Source path which need to be remapped. - dst_platform (str, optional): Specify destination platform - for which remapping should happen. - src_platform (str, optional): Specify source platform. This is - recommended to not use and keep unset until you really want - to use specific platform. - roots (dict/RootItem/None, optional): It is possible to remap - path with different roots then instance where method was - called has. - - Returns: - str/None: When path does not contain known root then - None is returned else returns remapped path with "{root}" - or "{root[]}". - """ - cleaned_path = self.clean_path(path) - if dst_platform: - dst_root_clean = self.cleaned_data.get(dst_platform) - if not dst_root_clean: - key_part = "" - full_key = self.full_key() - if full_key != "root": - key_part += "\"{}\" ".format(full_key) - - log.warning( - "Root {}miss platform \"{}\" definition.".format( - key_part, dst_platform - ) - ) - return None - - if cleaned_path.startswith(dst_root_clean): - return cleaned_path - - if src_platform: - src_root_clean = self.cleaned_data.get(src_platform) - if src_root_clean is None: - log.warning( - "Root \"{}\" miss platform \"{}\" definition.".format( - self.full_key(), src_platform - ) - ) - return None - - if not cleaned_path.startswith(src_root_clean): - return None - - subpath = cleaned_path[len(src_root_clean):] - if dst_platform: - # `dst_root_clean` is used from upper condition - return dst_root_clean + subpath - return self.clean_value + subpath - - result, template = self.find_root_template_from_path(path) - if not result: - return None - - def parent_dict(keys, value): - if not keys: - return value - - key = keys.pop(0) - return {key: parent_dict(keys, value)} - - if dst_platform: - format_value = parent_dict(list(self.parent_keys), dst_root_clean) - else: - format_value = parent_dict(list(self.parent_keys), self.value) - - return template.format(**{"root": format_value}) - - def find_root_template_from_path(self, path): - """Replaces known root value with formattable key in path. - - All platform values are checked for this replacement. - - Args: - path (str): Path where root value should be found. - - Returns: - tuple: Tuple contain 2 values: `success` (bool) and `path` (str). - When success it True then path should contain replaced root - value with formattable key. - - Example: - When input path is:: - "C:/windows/path/root/projects/my_project/file.ext" - - And raw data of item looks like:: - { - "windows": "C:/windows/path/root", - "linux": "/mount/root" - } - - Output will be:: - (True, "{root}/projects/my_project/file.ext") - - If any of raw data value wouldn't match path's root output is:: - (False, "C:/windows/path/root/projects/my_project/file.ext") - """ - result = False - output = str(path) - - mod_path = self.clean_path(path) - for root_os, root_path in self.cleaned_data.items(): - # Skip empty paths - if not root_path: - continue - - _mod_path = mod_path # reset to original cleaned value - if root_os == "windows": - root_path = root_path.lower() - _mod_path = _mod_path.lower() - - if _mod_path.startswith(root_path): - result = True - replacement = "{" + self.full_key() + "}" - output = replacement + mod_path[len(root_path):] - break - - return (result, output) - - -class Roots: - """Object which should be used for formatting "root" key in templates. - - Args: - anatomy Anatomy: Anatomy object created for a specific project. - """ - - env_prefix = "AYON_PROJECT_ROOT" - roots_filename = "roots.json" - - def __init__(self, anatomy): - self.anatomy = anatomy - self.loaded_project = None - self._roots = None - - def __format__(self, *args, **kwargs): - return self.roots.__format__(*args, **kwargs) - - def __getitem__(self, key): - return self.roots[key] - - def reset(self): - """Reset current roots value.""" - self._roots = None - - def path_remapper( - self, path, dst_platform=None, src_platform=None, roots=None - ): - """Remap path for specific platform. - - Args: - path (str): Source path which need to be remapped. - dst_platform (str, optional): Specify destination platform - for which remapping should happen. - src_platform (str, optional): Specify source platform. This is - recommended to not use and keep unset until you really want - to use specific platform. - roots (dict/RootItem/None, optional): It is possible to remap - path with different roots then instance where method was - called has. - - Returns: - str/None: When path does not contain known root then - None is returned else returns remapped path with "{root}" - or "{root[]}". - """ - if roots is None: - roots = self.roots - - if roots is None: - raise ValueError("Roots are not set. Can't find path.") - - if "{root" in path: - path = path.format(**{"root": roots}) - # If `dst_platform` is not specified then return else continue. - if not dst_platform: - return path - - if isinstance(roots, RootItem): - return roots.path_remapper(path, dst_platform, src_platform) - - for _root in roots.values(): - result = self.path_remapper( - path, dst_platform, src_platform, _root - ) - if result is not None: - return result - - def find_root_template_from_path(self, path, roots=None): - """Find root value in entered path and replace it with formatting key. - - Args: - path (str): Source path where root will be searched. - roots (Roots/dict, optional): It is possible to use different - roots than instance where method was triggered has. - - Returns: - tuple: Output contains tuple with bool representing success as - first value and path with or without replaced root with - formatting key as second value. - - Raises: - ValueError: When roots are not entered and can't be loaded. - """ - if roots is None: - log.debug( - "Looking for matching root in path \"{}\".".format(path) - ) - roots = self.roots - - if roots is None: - raise ValueError("Roots are not set. Can't find path.") - - if isinstance(roots, RootItem): - return roots.find_root_template_from_path(path) - - for root_name, _root in roots.items(): - success, result = self.find_root_template_from_path(path, _root) - if success: - log.info("Found match in root \"{}\".".format(root_name)) - return success, result - - log.warning("No matching root was found in current setting.") - return (False, path) - - def set_root_environments(self): - """Set root environments for current project.""" - for key, value in self.root_environments().items(): - os.environ[key] = value - - def root_environments(self): - """Use root keys to create unique keys for environment variables. - - Concatenates prefix "AYON_PROJECT_ROOT_" with root keys to create - unique keys. - - Returns: - dict: Result is `{(str): (str)}` dicitonary where key represents - unique key concatenated by keys and value is root value of - current platform root. - - Example: - With raw root values:: - "work": { - "windows": "P:/projects/work", - "linux": "/mnt/share/projects/work", - "darwin": "/darwin/path/work" - }, - "publish": { - "windows": "P:/projects/publish", - "linux": "/mnt/share/projects/publish", - "darwin": "/darwin/path/publish" - } - - Result on windows platform:: - { - "AYON_PROJECT_ROOT_WORK": "P:/projects/work", - "AYON_PROJECT_ROOT_PUBLISH": "P:/projects/publish" - } - - """ - return self._root_environments() - - def all_root_paths(self, roots=None): - """Return all paths for all roots of all platforms.""" - if roots is None: - roots = self.roots - - output = [] - if isinstance(roots, RootItem): - for value in roots.raw_data.values(): - output.append(value) - return output - - for _roots in roots.values(): - output.extend(self.all_root_paths(_roots)) - return output - - def _root_environments(self, keys=None, roots=None): - if not keys: - keys = [] - if roots is None: - roots = self.roots - - if isinstance(roots, RootItem): - key_items = [self.env_prefix] - for _key in keys: - key_items.append(_key.upper()) - - key = "_".join(key_items) - # Make sure key and value does not contain unicode - # - can happen in Python 2 hosts - return {str(key): str(roots.value)} - - output = {} - for _key, _value in roots.items(): - _keys = list(keys) - _keys.append(_key) - output.update(self._root_environments(_keys, _value)) - return output - - def root_environmets_fill_data(self, template=None): - """Environment variable values in dictionary for rootless path. - - Args: - template (str): Template for environment variable key fill. - By default is set to `"${}"`. - """ - if template is None: - template = "${}" - return self._root_environmets_fill_data(template) - - def _root_environmets_fill_data(self, template, keys=None, roots=None): - if keys is None and roots is None: - return { - "root": self._root_environmets_fill_data( - template, [], self.roots - ) - } - - if isinstance(roots, RootItem): - key_items = [Roots.env_prefix] - for _key in keys: - key_items.append(_key.upper()) - key = "_".join(key_items) - return template.format(key) - - output = {} - for key, value in roots.items(): - _keys = list(keys) - _keys.append(key) - output[key] = self._root_environmets_fill_data( - template, _keys, value - ) - return output - - @property - def project_name(self): - """Return project name which will be used for loading root values.""" - return self.anatomy.project_name - - @property - def roots(self): - """Property for filling "root" key in templates. - - This property returns roots for current project or default root values. - Warning: - Default roots value may cause issues when project use different - roots settings. That may happen when project use multiroot - templates but default roots miss their keys. - """ - if self.project_name != self.loaded_project: - self._roots = None - - if self._roots is None: - self._roots = self._discover() - self.loaded_project = self.project_name - return self._roots - - def _discover(self): - """ Loads current project's roots or default. - - Default roots are loaded if project override's does not contain roots. - - Returns: - `RootItem` or `dict` with multiple `RootItem`s when multiroot - setting is used. - """ - - return self._parse_dict(self.anatomy["roots"], parent=self) - - @staticmethod - def _parse_dict(data, key=None, parent_keys=None, parent=None): - """Parse roots raw data into RootItem or dictionary with RootItems. - - Converting raw roots data to `RootItem` helps to handle platform keys. - This method is recursive to be able handle multiroot setup and - is static to be able to load default roots without creating new object. - - Args: - data (dict): Should contain raw roots data to be parsed. - key (str, optional): Current root key. Set by recursion. - parent_keys (list): Parent dictionary keys. Set by recursion. - parent (Roots, optional): Parent object set in `RootItem` - helps to keep RootItem instance updated with `Roots` object. - - Returns: - `RootItem` or `dict` with multiple `RootItem`s when multiroot - setting is used. - """ - if not parent_keys: - parent_keys = [] - is_last = False - for value in data.values(): - if isinstance(value, six.string_types): - is_last = True - break - - if is_last: - return RootItem(data, key, parent_keys, parent=parent) - - output = {} - for _key, value in data.items(): - _parent_keys = list(parent_keys) - _parent_keys.append(_key) - output[_key] = Roots._parse_dict(value, _key, _parent_keys, parent) - return output diff --git a/client/ayon_core/pipeline/anatomy/__init__.py b/client/ayon_core/pipeline/anatomy/__init__.py new file mode 100644 index 0000000000..336d09ccaa --- /dev/null +++ b/client/ayon_core/pipeline/anatomy/__init__.py @@ -0,0 +1,17 @@ +from .exceptions import ( + ProjectNotSet, + RootCombinationError, + TemplateMissingKey, + AnatomyTemplateUnsolved, +) +from .anatomy import Anatomy + + +__all__ = ( + "ProjectNotSet", + "RootCombinationError", + "TemplateMissingKey", + "AnatomyTemplateUnsolved", + + "Anatomy", +) diff --git a/client/ayon_core/pipeline/anatomy/anatomy.py b/client/ayon_core/pipeline/anatomy/anatomy.py new file mode 100644 index 0000000000..5eaf918663 --- /dev/null +++ b/client/ayon_core/pipeline/anatomy/anatomy.py @@ -0,0 +1,502 @@ +import os +import re +import copy +import platform +import collections +import time + +import ayon_api + +from ayon_core.lib import Logger, get_local_site_id +from ayon_core.addon import AddonsManager + +from .exceptions import RootCombinationError, ProjectNotSet +from .roots import Roots +from .templates import AnatomyTemplates + +log = Logger.get_logger(__name__) + + +class BaseAnatomy(object): + """Anatomy module helps to keep project settings. + + Wraps key project specifications, AnatomyTemplates and Roots. + """ + root_key_regex = re.compile(r"{(root?[^}]+)}") + root_name_regex = re.compile(r"root\[([^]]+)\]") + + def __init__(self, project_entity, root_overrides=None): + project_name = project_entity["name"] + self.project_name = project_name + self.project_code = project_entity["code"] + + self._data = self._prepare_anatomy_data( + project_entity, root_overrides + ) + self._templates_obj = AnatomyTemplates(self) + self._roots_obj = Roots(self) + + # Anatomy used as dictionary + # - implemented only getters returning copy + def __getitem__(self, key): + return copy.deepcopy(self._data[key]) + + def get(self, key, default=None): + return copy.deepcopy(self._data).get(key, default) + + def keys(self): + return copy.deepcopy(self._data).keys() + + def values(self): + return copy.deepcopy(self._data).values() + + def items(self): + return copy.deepcopy(self._data).items() + + def _prepare_anatomy_data(self, project_entity, root_overrides): + """Prepare anatomy data for further processing. + + Method added to replace `{task}` with `{task[name]}` in templates. + """ + + anatomy_data = self._project_entity_to_anatomy_data(project_entity) + + self._apply_local_settings_on_anatomy_data( + anatomy_data, + root_overrides + ) + + return anatomy_data + + @property + def templates(self): + """Wrap property `templates` of Anatomy's AnatomyTemplates instance.""" + return self._templates_obj.templates + + @property + def templates_obj(self): + """Return `AnatomyTemplates` object of current Anatomy instance.""" + return self._templates_obj + + def format(self, *args, **kwargs): + """Wrap `format` method of Anatomy's `templates_obj`.""" + return self._templates_obj.format(*args, **kwargs) + + def format_all(self, *args, **kwargs): + """Wrap `format_all` method of Anatomy's `templates_obj`.""" + return self._templates_obj.format_all(*args, **kwargs) + + @property + def roots(self): + """Wrap `roots` property of Anatomy's `roots_obj`.""" + return self._roots_obj.roots + + @property + def roots_obj(self): + """Return `Roots` object of current Anatomy instance.""" + return self._roots_obj + + def root_environments(self): + """Return AYON_PROJECT_ROOT_* environments for current project.""" + return self._roots_obj.root_environments() + + def root_environmets_fill_data(self, template=None): + """Environment variable values in dictionary for rootless path. + + Args: + template (str): Template for environment variable key fill. + By default is set to `"${}"`. + """ + return self.roots_obj.root_environmets_fill_data(template) + + def find_root_template_from_path(self, *args, **kwargs): + """Wrapper for Roots `find_root_template_from_path`.""" + return self.roots_obj.find_root_template_from_path(*args, **kwargs) + + def path_remapper(self, *args, **kwargs): + """Wrapper for Roots `path_remapper`.""" + return self.roots_obj.path_remapper(*args, **kwargs) + + def all_root_paths(self): + """Wrapper for Roots `all_root_paths`.""" + return self.roots_obj.all_root_paths() + + def set_root_environments(self): + """Set AYON_PROJECT_ROOT_* environments for current project.""" + self._roots_obj.set_root_environments() + + def root_names(self): + """Return root names for current project.""" + return self.root_names_from_templates(self.templates) + + def _root_keys_from_templates(self, data): + """Extract root key from templates in data. + + Args: + data (dict): Data that may contain templates as string. + + Return: + set: Set of all root names from templates as strings. + + Output example: `{"root[work]", "root[publish]"}` + """ + + output = set() + if isinstance(data, dict): + for value in data.values(): + for root in self._root_keys_from_templates(value): + output.add(root) + + elif isinstance(data, str): + for group in re.findall(self.root_key_regex, data): + output.add(group) + + return output + + def root_value_for_template(self, template): + """Returns value of root key from template.""" + root_templates = [] + for group in re.findall(self.root_key_regex, template): + root_templates.append("{" + group + "}") + + if not root_templates: + return None + + return root_templates[0].format(**{"root": self.roots}) + + def root_names_from_templates(self, templates): + """Extract root names form anatomy templates. + + Returns None if values in templates contain only "{root}". + Empty list is returned if there is no "root" in templates. + Else returns all root names from templates in list. + + RootCombinationError is raised when templates contain both root types, + basic "{root}" and with root name specification "{root[work]}". + + Args: + templates (dict): Anatomy templates where roots are not filled. + + Return: + list/None: List of all root names from templates as strings when + multiroot setup is used, otherwise None is returned. + """ + roots = list(self._root_keys_from_templates(templates)) + # Return empty list if no roots found in templates + if not roots: + return roots + + # Raise exception when root keys have roots with and without root name. + # Invalid output example: ["root", "root[project]", "root[render]"] + if len(roots) > 1 and "root" in roots: + raise RootCombinationError(roots) + + # Return None if "root" without root name in templates + if len(roots) == 1 and roots[0] == "root": + return None + + names = set() + for root in roots: + for group in re.findall(self.root_name_regex, root): + names.add(group) + return list(names) + + def fill_root(self, template_path): + """Fill template path where is only "root" key unfilled. + + Args: + template_path (str): Path with "root" key in. + Example path: "{root}/projects/MyProject/Shot01/Lighting/..." + + Return: + str: formatted path + """ + # NOTE does not care if there are different keys than "root" + return template_path.format(**{"root": self.roots}) + + @classmethod + def fill_root_with_path(cls, rootless_path, root_path): + """Fill path without filled "root" key with passed path. + + This is helper to fill root with different directory path than anatomy + has defined no matter if is single or multiroot. + + Output path is same as input path if `rootless_path` does not contain + unfilled root key. + + Args: + rootless_path (str): Path without filled "root" key. Example: + "{root[work]}/MyProject/..." + root_path (str): What should replace root key in `rootless_path`. + + Returns: + str: Path with filled root. + """ + output = str(rootless_path) + for group in re.findall(cls.root_key_regex, rootless_path): + replacement = "{" + group + "}" + output = output.replace(replacement, root_path) + + return output + + def replace_root_with_env_key(self, filepath, template=None): + """Replace root of path with environment key. + + # Example: + ## Project with roots: + ``` + { + "nas": { + "windows": P:/projects", + ... + } + ... + } + ``` + + ## Entered filepath + "P:/projects/project/folder/task/animation_v001.ma" + + ## Entered template + "<{}>" + + ## Output + "/project/folder/task/animation_v001.ma" + + Args: + filepath (str): Full file path where root should be replaced. + template (str): Optional template for environment key. Must + have one index format key. + Default value if not entered: "${}" + + Returns: + str: Path where root is replaced with environment root key. + + Raise: + ValueError: When project's roots were not found in entered path. + """ + success, rootless_path = self.find_root_template_from_path(filepath) + if not success: + raise ValueError( + "{}: Project's roots were not found in path: {}".format( + self.project_name, filepath + ) + ) + + data = self.root_environmets_fill_data(template) + return rootless_path.format(**data) + + def _project_entity_to_anatomy_data(self, project_entity): + """Convert project document to anatomy data. + + Probably should fill missing keys and values. + """ + + output = copy.deepcopy(project_entity["config"]) + # TODO remove AYON convertion + task_types = copy.deepcopy(project_entity["taskTypes"]) + new_task_types = {} + for task_type in task_types: + name = task_type["name"] + new_task_types[name] = task_type + output["tasks"] = new_task_types + output["attributes"] = copy.deepcopy(project_entity["attrib"]) + + return output + + def _apply_local_settings_on_anatomy_data( + self, anatomy_data, root_overrides + ): + """Apply local settings on anatomy data. + + ATM local settings can modify project roots. Project name is required + as local settings have data stored data by project's name. + + Local settings override root values in this order: + 1.) Check if local settings contain overrides for default project and + apply it's values on roots if there are any. + 2.) If passed `project_name` is not None then check project specific + overrides in local settings for the project and apply it's value on + roots if there are any. + + NOTE: Root values of default project from local settings are always + applied if are set. + + Args: + anatomy_data (dict): Data for anatomy. + root_overrides (dict): Data of local settings. + """ + + # Skip processing if roots for current active site are not available in + # local settings + if not root_overrides: + return + + current_platform = platform.system().lower() + + root_data = anatomy_data["roots"] + for root_name, path in root_overrides.items(): + if root_name not in root_data: + continue + anatomy_data["roots"][root_name][current_platform] = ( + path + ) + + +class CacheItem: + """Helper to cache data. + + Helper does not handle refresh of data and does not mark data as outdated. + Who uses the object should check of outdated state on his own will. + """ + + default_lifetime = 10 + + def __init__(self, lifetime=None): + self._data = None + self._cached = None + self._lifetime = lifetime or self.default_lifetime + + @property + def data(self): + """Cached data/object. + + Returns: + Any: Whatever was cached. + """ + + return self._data + + @property + def is_outdated(self): + """Item has outdated cache. + + Lifetime of cache item expired or was not yet set. + + Returns: + bool: Item is outdated. + """ + + if self._cached is None: + return True + return (time.time() - self._cached) > self._lifetime + + def update_data(self, data): + """Update cache of data. + + Args: + data (Any): Data to cache. + """ + + self._data = data + self._cached = time.time() + + +class Anatomy(BaseAnatomy): + _sync_server_addon_cache = CacheItem() + _project_cache = collections.defaultdict(CacheItem) + _default_site_id_cache = collections.defaultdict(CacheItem) + _root_overrides_cache = collections.defaultdict( + lambda: collections.defaultdict(CacheItem) + ) + + def __init__( + self, project_name=None, site_name=None, project_entity=None + ): + if not project_name: + project_name = os.environ.get("AYON_PROJECT_NAME") + + if not project_name: + raise ProjectNotSet(( + "Implementation bug: Project name is not set. Anatomy requires" + " to load data for specific project." + )) + + if not project_entity: + project_entity = self.get_project_entity_from_cache(project_name) + root_overrides = self._get_site_root_overrides( + project_name, site_name + ) + + super(Anatomy, self).__init__(project_entity, root_overrides) + + @classmethod + def get_project_entity_from_cache(cls, project_name): + project_cache = cls._project_cache[project_name] + if project_cache.is_outdated: + project_cache.update_data(ayon_api.get_project(project_name)) + return copy.deepcopy(project_cache.data) + + @classmethod + def get_sync_server_addon(cls): + if cls._sync_server_addon_cache.is_outdated: + manager = AddonsManager() + cls._sync_server_addon_cache.update_data( + manager.get_enabled_addon("sync_server") + ) + return cls._sync_server_addon_cache.data + + @classmethod + def _get_studio_roots_overrides(cls, project_name): + """This would return 'studio' site override by local settings. + + Notes: + This logic handles local overrides of studio site which may be + available even when sync server is not enabled. + Handling of 'studio' and 'local' site was separated as preparation + for AYON development where that will be received from + separated sources. + + Args: + project_name (str): Name of project. + + Returns: + Union[Dict[str, str], None]): Local root overrides. + """ + if not project_name: + return + return ayon_api.get_project_roots_for_site( + project_name, get_local_site_id() + ) + + @classmethod + def _get_site_root_overrides(cls, project_name, site_name): + """Get root overrides for site. + + Args: + project_name (str): Project name for which root overrides should be + received. + site_name (Union[str, None]): Name of site for which root overrides + should be returned. + """ + + # First check if sync server is available and enabled + sync_server = cls.get_sync_server_addon() + if sync_server is None or not sync_server.enabled: + # QUESTION is ok to force 'studio' when site sync is not enabled? + site_name = "studio" + + elif not site_name: + # Use sync server to receive active site name + project_cache = cls._default_site_id_cache[project_name] + if project_cache.is_outdated: + project_cache.update_data( + sync_server.get_active_site_type(project_name) + ) + site_name = project_cache.data + + site_cache = cls._root_overrides_cache[project_name][site_name] + if site_cache.is_outdated: + if site_name == "studio": + # Handle studio root overrides without sync server + # - studio root overrides can be done even without sync server + roots_overrides = cls._get_studio_roots_overrides( + project_name + ) + else: + # Ask sync server to get roots overrides + roots_overrides = sync_server.get_site_root_overrides( + project_name, site_name + ) + site_cache.update_data(roots_overrides) + return site_cache.data diff --git a/client/ayon_core/pipeline/anatomy/exceptions.py b/client/ayon_core/pipeline/anatomy/exceptions.py new file mode 100644 index 0000000000..39f116baf0 --- /dev/null +++ b/client/ayon_core/pipeline/anatomy/exceptions.py @@ -0,0 +1,39 @@ +from ayon_core.lib.path_templates import TemplateUnsolved + + +class ProjectNotSet(Exception): + """Exception raised when is created Anatomy without project name.""" + + +class RootCombinationError(Exception): + """This exception is raised when templates has combined root types.""" + + def __init__(self, roots): + joined_roots = ", ".join( + ["\"{}\"".format(_root) for _root in roots] + ) + # TODO better error message + msg = ( + "Combination of root with and" + " without root name in AnatomyTemplates. {}" + ).format(joined_roots) + + super(RootCombinationError, self).__init__(msg) + + +class TemplateMissingKey(Exception): + """Exception for cases when key does not exist in template.""" + + msg = "Template key '{}' was not found." + + def __init__(self, parents): + parent_join = "".join(["[\"{0}\"]".format(key) for key in parents]) + super(TemplateMissingKey, self).__init__( + self.msg.format(parent_join) + ) + + +class AnatomyTemplateUnsolved(TemplateUnsolved): + """Exception for unsolved template when strict is set to True.""" + + msg = "Anatomy template \"{0}\" is unsolved.{1}{2}" diff --git a/client/ayon_core/pipeline/anatomy/roots.py b/client/ayon_core/pipeline/anatomy/roots.py new file mode 100644 index 0000000000..9290a2ca38 --- /dev/null +++ b/client/ayon_core/pipeline/anatomy/roots.py @@ -0,0 +1,534 @@ +import os +import numbers +import platform + +import six + +from ayon_core.lib import Logger +from ayon_core.lib.path_templates import FormatObject + +class RootItem(FormatObject): + """Represents one item or roots. + + Holds raw data of root item specification. Raw data contain value + for each platform, but current platform value is used when object + is used for formatting of template. + + Args: + root_raw_data (dict): Dictionary containing root values by platform + names. ["windows", "linux" and "darwin"] + name (str, optional): Root name which is representing. Used with + multi root setup otherwise None value is expected. + parent_keys (list, optional): All dictionary parent keys. Values of + `parent_keys` are used for get full key which RootItem is + representing. Used for replacing root value in path with + formattable key. e.g. parent_keys == ["work"] -> {root[work]} + parent (object, optional): It is expected to be `Roots` object. + Value of `parent` won't affect code logic much. + """ + + def __init__( + self, root_raw_data, name=None, parent_keys=None, parent=None + ): + super(RootItem, self).__init__() + self._log = None + lowered_platform_keys = {} + for key, value in root_raw_data.items(): + lowered_platform_keys[key.lower()] = value + self.raw_data = lowered_platform_keys + self.cleaned_data = self._clean_roots(lowered_platform_keys) + self.name = name + self.parent_keys = parent_keys or [] + self.parent = parent + + self.available_platforms = list(lowered_platform_keys.keys()) + self.value = lowered_platform_keys.get(platform.system().lower()) + self.clean_value = self.clean_root(self.value) + + def __format__(self, *args, **kwargs): + return self.value.__format__(*args, **kwargs) + + def __str__(self): + return str(self.value) + + def __repr__(self): + return self.__str__() + + def __getitem__(self, key): + if isinstance(key, numbers.Number): + return self.value[key] + + additional_info = "" + if self.parent and self.parent.project_name: + additional_info += " for project \"{}\"".format( + self.parent.project_name + ) + + raise AssertionError( + "Root key \"{}\" is missing{}.".format( + key, additional_info + ) + ) + + @property + def log(self): + if self._log is None: + self._log = Logger.get_logger(self.__class__.__name__) + return self._log + + def full_key(self): + """Full key value for dictionary formatting in template. + + Returns: + str: Return full replacement key for formatting. This helps when + multiple roots are set. In that case e.g. `"root[work]"` is + returned. + """ + if not self.name: + return "root" + + joined_parent_keys = "".join( + ["[{}]".format(key) for key in self.parent_keys] + ) + return "root{}".format(joined_parent_keys) + + def clean_path(self, path): + """Just replace backslashes with forward slashes.""" + return str(path).replace("\\", "/") + + def clean_root(self, root): + """Makes sure root value does not end with slash.""" + if root: + root = self.clean_path(root) + while root.endswith("/"): + root = root[:-1] + return root + + def _clean_roots(self, raw_data): + """Clean all values of raw root item values.""" + cleaned = {} + for key, value in raw_data.items(): + cleaned[key] = self.clean_root(value) + return cleaned + + def path_remapper(self, path, dst_platform=None, src_platform=None): + """Remap path for specific platform. + + Args: + path (str): Source path which need to be remapped. + dst_platform (str, optional): Specify destination platform + for which remapping should happen. + src_platform (str, optional): Specify source platform. This is + recommended to not use and keep unset until you really want + to use specific platform. + roots (dict/RootItem/None, optional): It is possible to remap + path with different roots then instance where method was + called has. + + Returns: + str/None: When path does not contain known root then + None is returned else returns remapped path with "{root}" + or "{root[]}". + """ + cleaned_path = self.clean_path(path) + if dst_platform: + dst_root_clean = self.cleaned_data.get(dst_platform) + if not dst_root_clean: + key_part = "" + full_key = self.full_key() + if full_key != "root": + key_part += "\"{}\" ".format(full_key) + + self.log.warning( + "Root {}miss platform \"{}\" definition.".format( + key_part, dst_platform + ) + ) + return None + + if cleaned_path.startswith(dst_root_clean): + return cleaned_path + + if src_platform: + src_root_clean = self.cleaned_data.get(src_platform) + if src_root_clean is None: + self.log.warning( + "Root \"{}\" miss platform \"{}\" definition.".format( + self.full_key(), src_platform + ) + ) + return None + + if not cleaned_path.startswith(src_root_clean): + return None + + subpath = cleaned_path[len(src_root_clean):] + if dst_platform: + # `dst_root_clean` is used from upper condition + return dst_root_clean + subpath + return self.clean_value + subpath + + result, template = self.find_root_template_from_path(path) + if not result: + return None + + def parent_dict(keys, value): + if not keys: + return value + + key = keys.pop(0) + return {key: parent_dict(keys, value)} + + if dst_platform: + format_value = parent_dict(list(self.parent_keys), dst_root_clean) + else: + format_value = parent_dict(list(self.parent_keys), self.value) + + return template.format(**{"root": format_value}) + + def find_root_template_from_path(self, path): + """Replaces known root value with formattable key in path. + + All platform values are checked for this replacement. + + Args: + path (str): Path where root value should be found. + + Returns: + tuple: Tuple contain 2 values: `success` (bool) and `path` (str). + When success it True then path should contain replaced root + value with formattable key. + + Example: + When input path is:: + "C:/windows/path/root/projects/my_project/file.ext" + + And raw data of item looks like:: + { + "windows": "C:/windows/path/root", + "linux": "/mount/root" + } + + Output will be:: + (True, "{root}/projects/my_project/file.ext") + + If any of raw data value wouldn't match path's root output is:: + (False, "C:/windows/path/root/projects/my_project/file.ext") + """ + result = False + output = str(path) + + mod_path = self.clean_path(path) + for root_os, root_path in self.cleaned_data.items(): + # Skip empty paths + if not root_path: + continue + + _mod_path = mod_path # reset to original cleaned value + if root_os == "windows": + root_path = root_path.lower() + _mod_path = _mod_path.lower() + + if _mod_path.startswith(root_path): + result = True + replacement = "{" + self.full_key() + "}" + output = replacement + mod_path[len(root_path):] + break + + return (result, output) + + +class Roots: + """Object which should be used for formatting "root" key in templates. + + Args: + anatomy Anatomy: Anatomy object created for a specific project. + """ + + env_prefix = "AYON_PROJECT_ROOT" + roots_filename = "roots.json" + + def __init__(self, anatomy): + self._log = None + self.anatomy = anatomy + self.loaded_project = None + self._roots = None + + def __format__(self, *args, **kwargs): + return self.roots.__format__(*args, **kwargs) + + def __getitem__(self, key): + return self.roots[key] + + @property + def log(self): + if self._log is None: + self._log = Logger.get_logger(self.__class__.__name__) + return self._log + + def reset(self): + """Reset current roots value.""" + self._roots = None + + def path_remapper( + self, path, dst_platform=None, src_platform=None, roots=None + ): + """Remap path for specific platform. + + Args: + path (str): Source path which need to be remapped. + dst_platform (str, optional): Specify destination platform + for which remapping should happen. + src_platform (str, optional): Specify source platform. This is + recommended to not use and keep unset until you really want + to use specific platform. + roots (dict/RootItem/None, optional): It is possible to remap + path with different roots then instance where method was + called has. + + Returns: + str/None: When path does not contain known root then + None is returned else returns remapped path with "{root}" + or "{root[]}". + """ + if roots is None: + roots = self.roots + + if roots is None: + raise ValueError("Roots are not set. Can't find path.") + + if "{root" in path: + path = path.format(**{"root": roots}) + # If `dst_platform` is not specified then return else continue. + if not dst_platform: + return path + + if isinstance(roots, RootItem): + return roots.path_remapper(path, dst_platform, src_platform) + + for _root in roots.values(): + result = self.path_remapper( + path, dst_platform, src_platform, _root + ) + if result is not None: + return result + + def find_root_template_from_path(self, path, roots=None): + """Find root value in entered path and replace it with formatting key. + + Args: + path (str): Source path where root will be searched. + roots (Roots/dict, optional): It is possible to use different + roots than instance where method was triggered has. + + Returns: + tuple: Output contains tuple with bool representing success as + first value and path with or without replaced root with + formatting key as second value. + + Raises: + ValueError: When roots are not entered and can't be loaded. + """ + if roots is None: + self.log.debug( + "Looking for matching root in path \"{}\".".format(path) + ) + roots = self.roots + + if roots is None: + raise ValueError("Roots are not set. Can't find path.") + + if isinstance(roots, RootItem): + return roots.find_root_template_from_path(path) + + for root_name, _root in roots.items(): + success, result = self.find_root_template_from_path(path, _root) + if success: + self.log.info("Found match in root \"{}\".".format(root_name)) + return success, result + + self.log.warning("No matching root was found in current setting.") + return (False, path) + + def set_root_environments(self): + """Set root environments for current project.""" + for key, value in self.root_environments().items(): + os.environ[key] = value + + def root_environments(self): + """Use root keys to create unique keys for environment variables. + + Concatenates prefix "AYON_PROJECT_ROOT_" with root keys to create + unique keys. + + Returns: + dict: Result is `{(str): (str)}` dicitonary where key represents + unique key concatenated by keys and value is root value of + current platform root. + + Example: + With raw root values:: + "work": { + "windows": "P:/projects/work", + "linux": "/mnt/share/projects/work", + "darwin": "/darwin/path/work" + }, + "publish": { + "windows": "P:/projects/publish", + "linux": "/mnt/share/projects/publish", + "darwin": "/darwin/path/publish" + } + + Result on windows platform:: + { + "AYON_PROJECT_ROOT_WORK": "P:/projects/work", + "AYON_PROJECT_ROOT_PUBLISH": "P:/projects/publish" + } + + """ + return self._root_environments() + + def all_root_paths(self, roots=None): + """Return all paths for all roots of all platforms.""" + if roots is None: + roots = self.roots + + output = [] + if isinstance(roots, RootItem): + for value in roots.raw_data.values(): + output.append(value) + return output + + for _roots in roots.values(): + output.extend(self.all_root_paths(_roots)) + return output + + def _root_environments(self, keys=None, roots=None): + if not keys: + keys = [] + if roots is None: + roots = self.roots + + if isinstance(roots, RootItem): + key_items = [self.env_prefix] + for _key in keys: + key_items.append(_key.upper()) + + key = "_".join(key_items) + # Make sure key and value does not contain unicode + # - can happen in Python 2 hosts + return {str(key): str(roots.value)} + + output = {} + for _key, _value in roots.items(): + _keys = list(keys) + _keys.append(_key) + output.update(self._root_environments(_keys, _value)) + return output + + def root_environmets_fill_data(self, template=None): + """Environment variable values in dictionary for rootless path. + + Args: + template (str): Template for environment variable key fill. + By default is set to `"${}"`. + """ + if template is None: + template = "${}" + return self._root_environmets_fill_data(template) + + def _root_environmets_fill_data(self, template, keys=None, roots=None): + if keys is None and roots is None: + return { + "root": self._root_environmets_fill_data( + template, [], self.roots + ) + } + + if isinstance(roots, RootItem): + key_items = [Roots.env_prefix] + for _key in keys: + key_items.append(_key.upper()) + key = "_".join(key_items) + return template.format(key) + + output = {} + for key, value in roots.items(): + _keys = list(keys) + _keys.append(key) + output[key] = self._root_environmets_fill_data( + template, _keys, value + ) + return output + + @property + def project_name(self): + """Return project name which will be used for loading root values.""" + return self.anatomy.project_name + + @property + def roots(self): + """Property for filling "root" key in templates. + + This property returns roots for current project or default root values. + Warning: + Default roots value may cause issues when project use different + roots settings. That may happen when project use multiroot + templates but default roots miss their keys. + """ + if self.project_name != self.loaded_project: + self._roots = None + + if self._roots is None: + self._roots = self._discover() + self.loaded_project = self.project_name + return self._roots + + def _discover(self): + """ Loads current project's roots or default. + + Default roots are loaded if project override's does not contain roots. + + Returns: + `RootItem` or `dict` with multiple `RootItem`s when multiroot + setting is used. + """ + + return self._parse_dict(self.anatomy["roots"], parent=self) + + @staticmethod + def _parse_dict(data, key=None, parent_keys=None, parent=None): + """Parse roots raw data into RootItem or dictionary with RootItems. + + Converting raw roots data to `RootItem` helps to handle platform keys. + This method is recursive to be able handle multiroot setup and + is static to be able to load default roots without creating new object. + + Args: + data (dict): Should contain raw roots data to be parsed. + key (str, optional): Current root key. Set by recursion. + parent_keys (list): Parent dictionary keys. Set by recursion. + parent (Roots, optional): Parent object set in `RootItem` + helps to keep RootItem instance updated with `Roots` object. + + Returns: + `RootItem` or `dict` with multiple `RootItem`s when multiroot + setting is used. + """ + if not parent_keys: + parent_keys = [] + is_last = False + for value in data.values(): + if isinstance(value, six.string_types): + is_last = True + break + + if is_last: + return RootItem(data, key, parent_keys, parent=parent) + + output = {} + for _key, value in data.items(): + _parent_keys = list(parent_keys) + _parent_keys.append(_key) + output[_key] = Roots._parse_dict(value, _key, _parent_keys, parent) + return output diff --git a/client/ayon_core/pipeline/anatomy/templates.py b/client/ayon_core/pipeline/anatomy/templates.py new file mode 100644 index 0000000000..1eee5b0945 --- /dev/null +++ b/client/ayon_core/pipeline/anatomy/templates.py @@ -0,0 +1,523 @@ +import os +import copy +import re +import collections +import numbers + +import six + +from ayon_core.lib import Logger +from ayon_core.lib.path_templates import ( + TemplateResult, + StringTemplate, + TemplatesDict, +) + +from .exceptions import AnatomyTemplateUnsolved +from .roots import RootItem + + +class AnatomyTemplateResult(TemplateResult): + rootless = None + + def __new__(cls, result, rootless_path): + new_obj = super(AnatomyTemplateResult, cls).__new__( + cls, + str(result), + result.template, + result.solved, + result.used_values, + result.missing_keys, + result.invalid_types + ) + new_obj.rootless = rootless_path + return new_obj + + def validate(self): + if not self.solved: + raise AnatomyTemplateUnsolved( + self.template, + self.missing_keys, + self.invalid_types + ) + + def copy(self): + tmp = TemplateResult( + str(self), + self.template, + self.solved, + self.used_values, + self.missing_keys, + self.invalid_types + ) + return self.__class__(tmp, self.rootless) + + def normalized(self): + """Convert to normalized path.""" + + tmp = TemplateResult( + os.path.normpath(self), + self.template, + self.solved, + self.used_values, + self.missing_keys, + self.invalid_types + ) + return self.__class__(tmp, self.rootless) + + +class AnatomyStringTemplate(StringTemplate): + """String template which has access to anatomy.""" + + def __init__(self, anatomy_templates, template): + self.anatomy_templates = anatomy_templates + super(AnatomyStringTemplate, self).__init__(template) + + def format(self, data): + """Format template and add 'root' key to data if not available. + + Args: + data (dict[str, Any]): Formatting data for template. + + Returns: + AnatomyTemplateResult: Formatting result. + """ + + anatomy_templates = self.anatomy_templates + if not data.get("root"): + data = copy.deepcopy(data) + data["root"] = anatomy_templates.anatomy.roots + result = StringTemplate.format(self, data) + rootless_path = anatomy_templates.rootless_path_from_result(result) + return AnatomyTemplateResult(result, rootless_path) + + +class AnatomyTemplates(TemplatesDict): + inner_key_pattern = re.compile(r"(\{@.*?[^{}0]*\})") + inner_key_name_pattern = re.compile(r"\{@(.*?[^{}0]*)\}") + + def __init__(self, anatomy): + self._log = Logger.get_logger(self.__class__.__name__) + super(AnatomyTemplates, self).__init__() + self.anatomy = anatomy + self.loaded_project = None + + def reset(self): + self._raw_templates = None + self._templates = None + self._objected_templates = None + + @property + def project_name(self): + return self.anatomy.project_name + + @property + def roots(self): + return self.anatomy.roots + + @property + def templates(self): + self._validate_discovery() + return self._templates + + @property + def objected_templates(self): + self._validate_discovery() + return self._objected_templates + + def _validate_discovery(self): + if self.project_name != self.loaded_project: + self.reset() + + if self._templates is None: + self._discover() + self.loaded_project = self.project_name + + def _format_value(self, value, data): + if isinstance(value, RootItem): + return self._solve_dict(value, data) + return super(AnatomyTemplates, self)._format_value(value, data) + + @staticmethod + def _ayon_template_conversion(templates): + def _convert_template_item(template_item): + # Change 'directory' to 'folder' + if "directory" in template_item: + template_item["folder"] = template_item["directory"] + + if ( + "path" not in template_item + and "file" in template_item + and "folder" in template_item + ): + template_item["path"] = "/".join( + (template_item["folder"], template_item["file"]) + ) + + def _get_default_template_name(templates): + default_template = None + for name, template in templates.items(): + if name == "default": + return "default" + + if default_template is None: + default_template = name + + return default_template + + def _fill_template_category(templates, cat_templates, cat_key): + default_template_name = _get_default_template_name(cat_templates) + for template_name, cat_template in cat_templates.items(): + _convert_template_item(cat_template) + if template_name == default_template_name: + templates[cat_key] = cat_template + else: + new_name = "{}_{}".format(cat_key, template_name) + templates["others"][new_name] = cat_template + + others_templates = templates.pop("others", None) or {} + new_others_templates = {} + templates["others"] = new_others_templates + for name, template in others_templates.items(): + _convert_template_item(template) + new_others_templates[name] = template + + for key in ( + "work", + "publish", + "hero", + ): + cat_templates = templates.pop(key) + _fill_template_category(templates, cat_templates, key) + + delivery_templates = templates.pop("delivery", None) or {} + new_delivery_templates = {} + for name, delivery_template in delivery_templates.items(): + new_delivery_templates[name] = "/".join( + (delivery_template["directory"], delivery_template["file"]) + ) + templates["delivery"] = new_delivery_templates + + def set_templates(self, templates): + if not templates: + self.reset() + return + + templates = copy.deepcopy(templates) + # TODO remove AYON convertion + self._ayon_template_conversion(templates) + + self._raw_templates = copy.deepcopy(templates) + v_queue = collections.deque() + v_queue.append(templates) + while v_queue: + item = v_queue.popleft() + if not isinstance(item, dict): + continue + + for key in tuple(item.keys()): + value = item[key] + if isinstance(value, dict): + v_queue.append(value) + + elif ( + isinstance(value, six.string_types) + and "{task}" in value + ): + item[key] = value.replace("{task}", "{task[name]}") + + solved_templates = self.solve_template_inner_links(templates) + self._templates = solved_templates + self._objected_templates = self.create_objected_templates( + solved_templates + ) + + def _create_template_object(self, template): + return AnatomyStringTemplate(self, template) + + def default_templates(self): + """Return default templates data with solved inner keys.""" + return self.solve_template_inner_links( + self.anatomy["templates"] + ) + + def _discover(self): + """ Loads anatomy templates from yaml. + Default templates are loaded if project is not set or project does + not have set it's own. + TODO: create templates if not exist. + + Returns: + TemplatesResultDict: Contain templates data for current project of + default templates. + """ + + if self.project_name is None: + # QUESTION create project specific if not found? + raise AssertionError(( + "Project \"{0}\" does not have his own templates." + " Trying to use default." + ).format(self.project_name)) + + self.set_templates(self.anatomy["templates"]) + + @classmethod + def replace_inner_keys(cls, matches, value, key_values, key): + """Replacement of inner keys in template values.""" + for match in matches: + anatomy_sub_keys = ( + cls.inner_key_name_pattern.findall(match) + ) + if key in anatomy_sub_keys: + raise ValueError(( + "Unsolvable recursion in inner keys, " + "key: \"{}\" is in his own value." + " Can't determine source, please check Anatomy templates." + ).format(key)) + + for anatomy_sub_key in anatomy_sub_keys: + replace_value = key_values.get(anatomy_sub_key) + if replace_value is None: + raise KeyError(( + "Anatomy templates can't be filled." + " Anatomy key `{0}` has" + " invalid inner key `{1}`." + ).format(key, anatomy_sub_key)) + + if not ( + isinstance(replace_value, numbers.Number) + or isinstance(replace_value, six.string_types) + ): + raise ValueError(( + "Anatomy templates can't be filled." + " Anatomy key `{0}` has" + " invalid inner key `{1}`" + " with value `{2}`." + ).format(key, anatomy_sub_key, str(replace_value))) + + value = value.replace(match, str(replace_value)) + + return value + + @classmethod + def prepare_inner_keys(cls, key_values): + """Check values of inner keys. + + Check if inner key exist in template group and has valid value. + It is also required to avoid infinite loop with unsolvable recursion + when first inner key's value refers to second inner key's value where + first is used. + """ + keys_to_solve = set(key_values.keys()) + while True: + found = False + for key in tuple(keys_to_solve): + value = key_values[key] + + if isinstance(value, six.string_types): + matches = cls.inner_key_pattern.findall(value) + if not matches: + keys_to_solve.remove(key) + continue + + found = True + key_values[key] = cls.replace_inner_keys( + matches, value, key_values, key + ) + continue + + elif not isinstance(value, dict): + keys_to_solve.remove(key) + continue + + subdict_found = False + for _key, _value in tuple(value.items()): + matches = cls.inner_key_pattern.findall(_value) + if not matches: + continue + + subdict_found = True + found = True + key_values[key][_key] = cls.replace_inner_keys( + matches, _value, key_values, + "{}.{}".format(key, _key) + ) + + if not subdict_found: + keys_to_solve.remove(key) + + if not found: + break + + return key_values + + @classmethod + def solve_template_inner_links(cls, templates): + """Solve templates inner keys identified by "{@*}". + + Process is split into 2 parts. + First is collecting all global keys (keys in top hierarchy where value + is not dictionary). All global keys are set for all group keys (keys + in top hierarchy where value is dictionary). Value of a key is not + overridden in group if already contain value for the key. + + In second part all keys with "at" symbol in value are replaced with + value of the key afterward "at" symbol from the group. + + Args: + templates (dict): Raw templates data. + + Example: + templates:: + key_1: "value_1", + key_2: "{@key_1}/{filling_key}" + + group_1: + key_3: "value_3/{@key_2}" + + group_2: + key_2": "value_2" + key_4": "value_4/{@key_2}" + + output:: + key_1: "value_1" + key_2: "value_1/{filling_key}" + + group_1: { + key_1: "value_1" + key_2: "value_1/{filling_key}" + key_3: "value_3/value_1/{filling_key}" + + group_2: { + key_1: "value_1" + key_2: "value_2" + key_4: "value_3/value_2" + """ + default_key_values = templates.pop("common", {}) + for key, value in tuple(templates.items()): + if isinstance(value, dict): + continue + default_key_values[key] = templates.pop(key) + + # Pop "others" key before before expected keys are processed + other_templates = templates.pop("others") or {} + + keys_by_subkey = {} + for sub_key, sub_value in templates.items(): + key_values = {} + key_values.update(default_key_values) + key_values.update(sub_value) + keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) + + for sub_key, sub_value in other_templates.items(): + if sub_key in keys_by_subkey: + self.log.warning(( + "Key \"{}\" is duplicated in others. Skipping." + ).format(sub_key)) + continue + + key_values = {} + key_values.update(default_key_values) + key_values.update(sub_value) + keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values) + + default_keys_by_subkeys = cls.prepare_inner_keys(default_key_values) + + for key, value in default_keys_by_subkeys.items(): + keys_by_subkey[key] = value + + return keys_by_subkey + + @classmethod + def _dict_to_subkeys_list(cls, subdict, pre_keys=None): + if pre_keys is None: + pre_keys = [] + output = [] + for key in subdict: + value = subdict[key] + result = list(pre_keys) + result.append(key) + if isinstance(value, dict): + for item in cls._dict_to_subkeys_list(value, result): + output.append(item) + else: + output.append(result) + return output + + def _keys_to_dicts(self, key_list, value): + if not key_list: + return None + if len(key_list) == 1: + return {key_list[0]: value} + return {key_list[0]: self._keys_to_dicts(key_list[1:], value)} + + @classmethod + def rootless_path_from_result(cls, result): + """Calculate rootless path from formatting result. + + Args: + result (TemplateResult): Result of StringTemplate formatting. + + Returns: + str: Rootless path if result contains one of anatomy roots. + """ + + used_values = result.used_values + missing_keys = result.missing_keys + template = result.template + invalid_types = result.invalid_types + if ( + "root" not in used_values + or "root" in missing_keys + or "{root" not in template + ): + return + + for invalid_type in invalid_types: + if "root" in invalid_type: + return + + root_keys = cls._dict_to_subkeys_list({"root": used_values["root"]}) + if not root_keys: + return + + output = str(result) + for used_root_keys in root_keys: + if not used_root_keys: + continue + + used_value = used_values + root_key = None + for key in used_root_keys: + used_value = used_value[key] + if root_key is None: + root_key = key + else: + root_key += "[{}]".format(key) + + root_key = "{" + root_key + "}" + output = output.replace(str(used_value), root_key) + + return output + + def format(self, data, strict=True): + copy_data = copy.deepcopy(data) + roots = self.roots + if roots: + copy_data["root"] = roots + result = super(AnatomyTemplates, self).format(copy_data) + result.strict = strict + return result + + def format_all(self, in_data, only_keys=True): + """ Solves templates based on entered data. + + Args: + data (dict): Containing keys to be filled into template. + + Returns: + TemplatesResultDict: Output `TemplateResult` have `strict` + attribute set to False so accessing unfilled keys in templates + won't raise any exceptions. + """ + return self.format(in_data, strict=False) \ No newline at end of file