split anatomy.py into multiple python files

This commit is contained in:
Jakub Trllo 2024-03-14 18:10:22 +01:00
parent b591030d33
commit b908ddcedb
6 changed files with 1615 additions and 1550 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,17 @@
from .exceptions import (
ProjectNotSet,
RootCombinationError,
TemplateMissingKey,
AnatomyTemplateUnsolved,
)
from .anatomy import Anatomy
__all__ = (
"ProjectNotSet",
"RootCombinationError",
"TemplateMissingKey",
"AnatomyTemplateUnsolved",
"Anatomy",
)

View file

@ -0,0 +1,502 @@
import os
import re
import copy
import platform
import collections
import time
import ayon_api
from ayon_core.lib import Logger, get_local_site_id
from ayon_core.addon import AddonsManager
from .exceptions import RootCombinationError, ProjectNotSet
from .roots import Roots
from .templates import AnatomyTemplates
log = Logger.get_logger(__name__)
class BaseAnatomy(object):
"""Anatomy module helps to keep project settings.
Wraps key project specifications, AnatomyTemplates and Roots.
"""
root_key_regex = re.compile(r"{(root?[^}]+)}")
root_name_regex = re.compile(r"root\[([^]]+)\]")
def __init__(self, project_entity, root_overrides=None):
project_name = project_entity["name"]
self.project_name = project_name
self.project_code = project_entity["code"]
self._data = self._prepare_anatomy_data(
project_entity, root_overrides
)
self._templates_obj = AnatomyTemplates(self)
self._roots_obj = Roots(self)
# Anatomy used as dictionary
# - implemented only getters returning copy
def __getitem__(self, key):
return copy.deepcopy(self._data[key])
def get(self, key, default=None):
return copy.deepcopy(self._data).get(key, default)
def keys(self):
return copy.deepcopy(self._data).keys()
def values(self):
return copy.deepcopy(self._data).values()
def items(self):
return copy.deepcopy(self._data).items()
def _prepare_anatomy_data(self, project_entity, root_overrides):
"""Prepare anatomy data for further processing.
Method added to replace `{task}` with `{task[name]}` in templates.
"""
anatomy_data = self._project_entity_to_anatomy_data(project_entity)
self._apply_local_settings_on_anatomy_data(
anatomy_data,
root_overrides
)
return anatomy_data
@property
def templates(self):
"""Wrap property `templates` of Anatomy's AnatomyTemplates instance."""
return self._templates_obj.templates
@property
def templates_obj(self):
"""Return `AnatomyTemplates` object of current Anatomy instance."""
return self._templates_obj
def format(self, *args, **kwargs):
"""Wrap `format` method of Anatomy's `templates_obj`."""
return self._templates_obj.format(*args, **kwargs)
def format_all(self, *args, **kwargs):
"""Wrap `format_all` method of Anatomy's `templates_obj`."""
return self._templates_obj.format_all(*args, **kwargs)
@property
def roots(self):
"""Wrap `roots` property of Anatomy's `roots_obj`."""
return self._roots_obj.roots
@property
def roots_obj(self):
"""Return `Roots` object of current Anatomy instance."""
return self._roots_obj
def root_environments(self):
"""Return AYON_PROJECT_ROOT_* environments for current project."""
return self._roots_obj.root_environments()
def root_environmets_fill_data(self, template=None):
"""Environment variable values in dictionary for rootless path.
Args:
template (str): Template for environment variable key fill.
By default is set to `"${}"`.
"""
return self.roots_obj.root_environmets_fill_data(template)
def find_root_template_from_path(self, *args, **kwargs):
"""Wrapper for Roots `find_root_template_from_path`."""
return self.roots_obj.find_root_template_from_path(*args, **kwargs)
def path_remapper(self, *args, **kwargs):
"""Wrapper for Roots `path_remapper`."""
return self.roots_obj.path_remapper(*args, **kwargs)
def all_root_paths(self):
"""Wrapper for Roots `all_root_paths`."""
return self.roots_obj.all_root_paths()
def set_root_environments(self):
"""Set AYON_PROJECT_ROOT_* environments for current project."""
self._roots_obj.set_root_environments()
def root_names(self):
"""Return root names for current project."""
return self.root_names_from_templates(self.templates)
def _root_keys_from_templates(self, data):
"""Extract root key from templates in data.
Args:
data (dict): Data that may contain templates as string.
Return:
set: Set of all root names from templates as strings.
Output example: `{"root[work]", "root[publish]"}`
"""
output = set()
if isinstance(data, dict):
for value in data.values():
for root in self._root_keys_from_templates(value):
output.add(root)
elif isinstance(data, str):
for group in re.findall(self.root_key_regex, data):
output.add(group)
return output
def root_value_for_template(self, template):
"""Returns value of root key from template."""
root_templates = []
for group in re.findall(self.root_key_regex, template):
root_templates.append("{" + group + "}")
if not root_templates:
return None
return root_templates[0].format(**{"root": self.roots})
def root_names_from_templates(self, templates):
"""Extract root names form anatomy templates.
Returns None if values in templates contain only "{root}".
Empty list is returned if there is no "root" in templates.
Else returns all root names from templates in list.
RootCombinationError is raised when templates contain both root types,
basic "{root}" and with root name specification "{root[work]}".
Args:
templates (dict): Anatomy templates where roots are not filled.
Return:
list/None: List of all root names from templates as strings when
multiroot setup is used, otherwise None is returned.
"""
roots = list(self._root_keys_from_templates(templates))
# Return empty list if no roots found in templates
if not roots:
return roots
# Raise exception when root keys have roots with and without root name.
# Invalid output example: ["root", "root[project]", "root[render]"]
if len(roots) > 1 and "root" in roots:
raise RootCombinationError(roots)
# Return None if "root" without root name in templates
if len(roots) == 1 and roots[0] == "root":
return None
names = set()
for root in roots:
for group in re.findall(self.root_name_regex, root):
names.add(group)
return list(names)
def fill_root(self, template_path):
"""Fill template path where is only "root" key unfilled.
Args:
template_path (str): Path with "root" key in.
Example path: "{root}/projects/MyProject/Shot01/Lighting/..."
Return:
str: formatted path
"""
# NOTE does not care if there are different keys than "root"
return template_path.format(**{"root": self.roots})
@classmethod
def fill_root_with_path(cls, rootless_path, root_path):
"""Fill path without filled "root" key with passed path.
This is helper to fill root with different directory path than anatomy
has defined no matter if is single or multiroot.
Output path is same as input path if `rootless_path` does not contain
unfilled root key.
Args:
rootless_path (str): Path without filled "root" key. Example:
"{root[work]}/MyProject/..."
root_path (str): What should replace root key in `rootless_path`.
Returns:
str: Path with filled root.
"""
output = str(rootless_path)
for group in re.findall(cls.root_key_regex, rootless_path):
replacement = "{" + group + "}"
output = output.replace(replacement, root_path)
return output
def replace_root_with_env_key(self, filepath, template=None):
"""Replace root of path with environment key.
# Example:
## Project with roots:
```
{
"nas": {
"windows": P:/projects",
...
}
...
}
```
## Entered filepath
"P:/projects/project/folder/task/animation_v001.ma"
## Entered template
"<{}>"
## Output
"<AYON_PROJECT_ROOT_NAS>/project/folder/task/animation_v001.ma"
Args:
filepath (str): Full file path where root should be replaced.
template (str): Optional template for environment key. Must
have one index format key.
Default value if not entered: "${}"
Returns:
str: Path where root is replaced with environment root key.
Raise:
ValueError: When project's roots were not found in entered path.
"""
success, rootless_path = self.find_root_template_from_path(filepath)
if not success:
raise ValueError(
"{}: Project's roots were not found in path: {}".format(
self.project_name, filepath
)
)
data = self.root_environmets_fill_data(template)
return rootless_path.format(**data)
def _project_entity_to_anatomy_data(self, project_entity):
"""Convert project document to anatomy data.
Probably should fill missing keys and values.
"""
output = copy.deepcopy(project_entity["config"])
# TODO remove AYON convertion
task_types = copy.deepcopy(project_entity["taskTypes"])
new_task_types = {}
for task_type in task_types:
name = task_type["name"]
new_task_types[name] = task_type
output["tasks"] = new_task_types
output["attributes"] = copy.deepcopy(project_entity["attrib"])
return output
def _apply_local_settings_on_anatomy_data(
self, anatomy_data, root_overrides
):
"""Apply local settings on anatomy data.
ATM local settings can modify project roots. Project name is required
as local settings have data stored data by project's name.
Local settings override root values in this order:
1.) Check if local settings contain overrides for default project and
apply it's values on roots if there are any.
2.) If passed `project_name` is not None then check project specific
overrides in local settings for the project and apply it's value on
roots if there are any.
NOTE: Root values of default project from local settings are always
applied if are set.
Args:
anatomy_data (dict): Data for anatomy.
root_overrides (dict): Data of local settings.
"""
# Skip processing if roots for current active site are not available in
# local settings
if not root_overrides:
return
current_platform = platform.system().lower()
root_data = anatomy_data["roots"]
for root_name, path in root_overrides.items():
if root_name not in root_data:
continue
anatomy_data["roots"][root_name][current_platform] = (
path
)
class CacheItem:
"""Helper to cache data.
Helper does not handle refresh of data and does not mark data as outdated.
Who uses the object should check of outdated state on his own will.
"""
default_lifetime = 10
def __init__(self, lifetime=None):
self._data = None
self._cached = None
self._lifetime = lifetime or self.default_lifetime
@property
def data(self):
"""Cached data/object.
Returns:
Any: Whatever was cached.
"""
return self._data
@property
def is_outdated(self):
"""Item has outdated cache.
Lifetime of cache item expired or was not yet set.
Returns:
bool: Item is outdated.
"""
if self._cached is None:
return True
return (time.time() - self._cached) > self._lifetime
def update_data(self, data):
"""Update cache of data.
Args:
data (Any): Data to cache.
"""
self._data = data
self._cached = time.time()
class Anatomy(BaseAnatomy):
_sync_server_addon_cache = CacheItem()
_project_cache = collections.defaultdict(CacheItem)
_default_site_id_cache = collections.defaultdict(CacheItem)
_root_overrides_cache = collections.defaultdict(
lambda: collections.defaultdict(CacheItem)
)
def __init__(
self, project_name=None, site_name=None, project_entity=None
):
if not project_name:
project_name = os.environ.get("AYON_PROJECT_NAME")
if not project_name:
raise ProjectNotSet((
"Implementation bug: Project name is not set. Anatomy requires"
" to load data for specific project."
))
if not project_entity:
project_entity = self.get_project_entity_from_cache(project_name)
root_overrides = self._get_site_root_overrides(
project_name, site_name
)
super(Anatomy, self).__init__(project_entity, root_overrides)
@classmethod
def get_project_entity_from_cache(cls, project_name):
project_cache = cls._project_cache[project_name]
if project_cache.is_outdated:
project_cache.update_data(ayon_api.get_project(project_name))
return copy.deepcopy(project_cache.data)
@classmethod
def get_sync_server_addon(cls):
if cls._sync_server_addon_cache.is_outdated:
manager = AddonsManager()
cls._sync_server_addon_cache.update_data(
manager.get_enabled_addon("sync_server")
)
return cls._sync_server_addon_cache.data
@classmethod
def _get_studio_roots_overrides(cls, project_name):
"""This would return 'studio' site override by local settings.
Notes:
This logic handles local overrides of studio site which may be
available even when sync server is not enabled.
Handling of 'studio' and 'local' site was separated as preparation
for AYON development where that will be received from
separated sources.
Args:
project_name (str): Name of project.
Returns:
Union[Dict[str, str], None]): Local root overrides.
"""
if not project_name:
return
return ayon_api.get_project_roots_for_site(
project_name, get_local_site_id()
)
@classmethod
def _get_site_root_overrides(cls, project_name, site_name):
"""Get root overrides for site.
Args:
project_name (str): Project name for which root overrides should be
received.
site_name (Union[str, None]): Name of site for which root overrides
should be returned.
"""
# First check if sync server is available and enabled
sync_server = cls.get_sync_server_addon()
if sync_server is None or not sync_server.enabled:
# QUESTION is ok to force 'studio' when site sync is not enabled?
site_name = "studio"
elif not site_name:
# Use sync server to receive active site name
project_cache = cls._default_site_id_cache[project_name]
if project_cache.is_outdated:
project_cache.update_data(
sync_server.get_active_site_type(project_name)
)
site_name = project_cache.data
site_cache = cls._root_overrides_cache[project_name][site_name]
if site_cache.is_outdated:
if site_name == "studio":
# Handle studio root overrides without sync server
# - studio root overrides can be done even without sync server
roots_overrides = cls._get_studio_roots_overrides(
project_name
)
else:
# Ask sync server to get roots overrides
roots_overrides = sync_server.get_site_root_overrides(
project_name, site_name
)
site_cache.update_data(roots_overrides)
return site_cache.data

View file

@ -0,0 +1,39 @@
from ayon_core.lib.path_templates import TemplateUnsolved
class ProjectNotSet(Exception):
"""Exception raised when is created Anatomy without project name."""
class RootCombinationError(Exception):
"""This exception is raised when templates has combined root types."""
def __init__(self, roots):
joined_roots = ", ".join(
["\"{}\"".format(_root) for _root in roots]
)
# TODO better error message
msg = (
"Combination of root with and"
" without root name in AnatomyTemplates. {}"
).format(joined_roots)
super(RootCombinationError, self).__init__(msg)
class TemplateMissingKey(Exception):
"""Exception for cases when key does not exist in template."""
msg = "Template key '{}' was not found."
def __init__(self, parents):
parent_join = "".join(["[\"{0}\"]".format(key) for key in parents])
super(TemplateMissingKey, self).__init__(
self.msg.format(parent_join)
)
class AnatomyTemplateUnsolved(TemplateUnsolved):
"""Exception for unsolved template when strict is set to True."""
msg = "Anatomy template \"{0}\" is unsolved.{1}{2}"

View file

@ -0,0 +1,534 @@
import os
import numbers
import platform
import six
from ayon_core.lib import Logger
from ayon_core.lib.path_templates import FormatObject
class RootItem(FormatObject):
"""Represents one item or roots.
Holds raw data of root item specification. Raw data contain value
for each platform, but current platform value is used when object
is used for formatting of template.
Args:
root_raw_data (dict): Dictionary containing root values by platform
names. ["windows", "linux" and "darwin"]
name (str, optional): Root name which is representing. Used with
multi root setup otherwise None value is expected.
parent_keys (list, optional): All dictionary parent keys. Values of
`parent_keys` are used for get full key which RootItem is
representing. Used for replacing root value in path with
formattable key. e.g. parent_keys == ["work"] -> {root[work]}
parent (object, optional): It is expected to be `Roots` object.
Value of `parent` won't affect code logic much.
"""
def __init__(
self, root_raw_data, name=None, parent_keys=None, parent=None
):
super(RootItem, self).__init__()
self._log = None
lowered_platform_keys = {}
for key, value in root_raw_data.items():
lowered_platform_keys[key.lower()] = value
self.raw_data = lowered_platform_keys
self.cleaned_data = self._clean_roots(lowered_platform_keys)
self.name = name
self.parent_keys = parent_keys or []
self.parent = parent
self.available_platforms = list(lowered_platform_keys.keys())
self.value = lowered_platform_keys.get(platform.system().lower())
self.clean_value = self.clean_root(self.value)
def __format__(self, *args, **kwargs):
return self.value.__format__(*args, **kwargs)
def __str__(self):
return str(self.value)
def __repr__(self):
return self.__str__()
def __getitem__(self, key):
if isinstance(key, numbers.Number):
return self.value[key]
additional_info = ""
if self.parent and self.parent.project_name:
additional_info += " for project \"{}\"".format(
self.parent.project_name
)
raise AssertionError(
"Root key \"{}\" is missing{}.".format(
key, additional_info
)
)
@property
def log(self):
if self._log is None:
self._log = Logger.get_logger(self.__class__.__name__)
return self._log
def full_key(self):
"""Full key value for dictionary formatting in template.
Returns:
str: Return full replacement key for formatting. This helps when
multiple roots are set. In that case e.g. `"root[work]"` is
returned.
"""
if not self.name:
return "root"
joined_parent_keys = "".join(
["[{}]".format(key) for key in self.parent_keys]
)
return "root{}".format(joined_parent_keys)
def clean_path(self, path):
"""Just replace backslashes with forward slashes."""
return str(path).replace("\\", "/")
def clean_root(self, root):
"""Makes sure root value does not end with slash."""
if root:
root = self.clean_path(root)
while root.endswith("/"):
root = root[:-1]
return root
def _clean_roots(self, raw_data):
"""Clean all values of raw root item values."""
cleaned = {}
for key, value in raw_data.items():
cleaned[key] = self.clean_root(value)
return cleaned
def path_remapper(self, path, dst_platform=None, src_platform=None):
"""Remap path for specific platform.
Args:
path (str): Source path which need to be remapped.
dst_platform (str, optional): Specify destination platform
for which remapping should happen.
src_platform (str, optional): Specify source platform. This is
recommended to not use and keep unset until you really want
to use specific platform.
roots (dict/RootItem/None, optional): It is possible to remap
path with different roots then instance where method was
called has.
Returns:
str/None: When path does not contain known root then
None is returned else returns remapped path with "{root}"
or "{root[<name>]}".
"""
cleaned_path = self.clean_path(path)
if dst_platform:
dst_root_clean = self.cleaned_data.get(dst_platform)
if not dst_root_clean:
key_part = ""
full_key = self.full_key()
if full_key != "root":
key_part += "\"{}\" ".format(full_key)
self.log.warning(
"Root {}miss platform \"{}\" definition.".format(
key_part, dst_platform
)
)
return None
if cleaned_path.startswith(dst_root_clean):
return cleaned_path
if src_platform:
src_root_clean = self.cleaned_data.get(src_platform)
if src_root_clean is None:
self.log.warning(
"Root \"{}\" miss platform \"{}\" definition.".format(
self.full_key(), src_platform
)
)
return None
if not cleaned_path.startswith(src_root_clean):
return None
subpath = cleaned_path[len(src_root_clean):]
if dst_platform:
# `dst_root_clean` is used from upper condition
return dst_root_clean + subpath
return self.clean_value + subpath
result, template = self.find_root_template_from_path(path)
if not result:
return None
def parent_dict(keys, value):
if not keys:
return value
key = keys.pop(0)
return {key: parent_dict(keys, value)}
if dst_platform:
format_value = parent_dict(list(self.parent_keys), dst_root_clean)
else:
format_value = parent_dict(list(self.parent_keys), self.value)
return template.format(**{"root": format_value})
def find_root_template_from_path(self, path):
"""Replaces known root value with formattable key in path.
All platform values are checked for this replacement.
Args:
path (str): Path where root value should be found.
Returns:
tuple: Tuple contain 2 values: `success` (bool) and `path` (str).
When success it True then path should contain replaced root
value with formattable key.
Example:
When input path is::
"C:/windows/path/root/projects/my_project/file.ext"
And raw data of item looks like::
{
"windows": "C:/windows/path/root",
"linux": "/mount/root"
}
Output will be::
(True, "{root}/projects/my_project/file.ext")
If any of raw data value wouldn't match path's root output is::
(False, "C:/windows/path/root/projects/my_project/file.ext")
"""
result = False
output = str(path)
mod_path = self.clean_path(path)
for root_os, root_path in self.cleaned_data.items():
# Skip empty paths
if not root_path:
continue
_mod_path = mod_path # reset to original cleaned value
if root_os == "windows":
root_path = root_path.lower()
_mod_path = _mod_path.lower()
if _mod_path.startswith(root_path):
result = True
replacement = "{" + self.full_key() + "}"
output = replacement + mod_path[len(root_path):]
break
return (result, output)
class Roots:
"""Object which should be used for formatting "root" key in templates.
Args:
anatomy Anatomy: Anatomy object created for a specific project.
"""
env_prefix = "AYON_PROJECT_ROOT"
roots_filename = "roots.json"
def __init__(self, anatomy):
self._log = None
self.anatomy = anatomy
self.loaded_project = None
self._roots = None
def __format__(self, *args, **kwargs):
return self.roots.__format__(*args, **kwargs)
def __getitem__(self, key):
return self.roots[key]
@property
def log(self):
if self._log is None:
self._log = Logger.get_logger(self.__class__.__name__)
return self._log
def reset(self):
"""Reset current roots value."""
self._roots = None
def path_remapper(
self, path, dst_platform=None, src_platform=None, roots=None
):
"""Remap path for specific platform.
Args:
path (str): Source path which need to be remapped.
dst_platform (str, optional): Specify destination platform
for which remapping should happen.
src_platform (str, optional): Specify source platform. This is
recommended to not use and keep unset until you really want
to use specific platform.
roots (dict/RootItem/None, optional): It is possible to remap
path with different roots then instance where method was
called has.
Returns:
str/None: When path does not contain known root then
None is returned else returns remapped path with "{root}"
or "{root[<name>]}".
"""
if roots is None:
roots = self.roots
if roots is None:
raise ValueError("Roots are not set. Can't find path.")
if "{root" in path:
path = path.format(**{"root": roots})
# If `dst_platform` is not specified then return else continue.
if not dst_platform:
return path
if isinstance(roots, RootItem):
return roots.path_remapper(path, dst_platform, src_platform)
for _root in roots.values():
result = self.path_remapper(
path, dst_platform, src_platform, _root
)
if result is not None:
return result
def find_root_template_from_path(self, path, roots=None):
"""Find root value in entered path and replace it with formatting key.
Args:
path (str): Source path where root will be searched.
roots (Roots/dict, optional): It is possible to use different
roots than instance where method was triggered has.
Returns:
tuple: Output contains tuple with bool representing success as
first value and path with or without replaced root with
formatting key as second value.
Raises:
ValueError: When roots are not entered and can't be loaded.
"""
if roots is None:
self.log.debug(
"Looking for matching root in path \"{}\".".format(path)
)
roots = self.roots
if roots is None:
raise ValueError("Roots are not set. Can't find path.")
if isinstance(roots, RootItem):
return roots.find_root_template_from_path(path)
for root_name, _root in roots.items():
success, result = self.find_root_template_from_path(path, _root)
if success:
self.log.info("Found match in root \"{}\".".format(root_name))
return success, result
self.log.warning("No matching root was found in current setting.")
return (False, path)
def set_root_environments(self):
"""Set root environments for current project."""
for key, value in self.root_environments().items():
os.environ[key] = value
def root_environments(self):
"""Use root keys to create unique keys for environment variables.
Concatenates prefix "AYON_PROJECT_ROOT_" with root keys to create
unique keys.
Returns:
dict: Result is `{(str): (str)}` dicitonary where key represents
unique key concatenated by keys and value is root value of
current platform root.
Example:
With raw root values::
"work": {
"windows": "P:/projects/work",
"linux": "/mnt/share/projects/work",
"darwin": "/darwin/path/work"
},
"publish": {
"windows": "P:/projects/publish",
"linux": "/mnt/share/projects/publish",
"darwin": "/darwin/path/publish"
}
Result on windows platform::
{
"AYON_PROJECT_ROOT_WORK": "P:/projects/work",
"AYON_PROJECT_ROOT_PUBLISH": "P:/projects/publish"
}
"""
return self._root_environments()
def all_root_paths(self, roots=None):
"""Return all paths for all roots of all platforms."""
if roots is None:
roots = self.roots
output = []
if isinstance(roots, RootItem):
for value in roots.raw_data.values():
output.append(value)
return output
for _roots in roots.values():
output.extend(self.all_root_paths(_roots))
return output
def _root_environments(self, keys=None, roots=None):
if not keys:
keys = []
if roots is None:
roots = self.roots
if isinstance(roots, RootItem):
key_items = [self.env_prefix]
for _key in keys:
key_items.append(_key.upper())
key = "_".join(key_items)
# Make sure key and value does not contain unicode
# - can happen in Python 2 hosts
return {str(key): str(roots.value)}
output = {}
for _key, _value in roots.items():
_keys = list(keys)
_keys.append(_key)
output.update(self._root_environments(_keys, _value))
return output
def root_environmets_fill_data(self, template=None):
"""Environment variable values in dictionary for rootless path.
Args:
template (str): Template for environment variable key fill.
By default is set to `"${}"`.
"""
if template is None:
template = "${}"
return self._root_environmets_fill_data(template)
def _root_environmets_fill_data(self, template, keys=None, roots=None):
if keys is None and roots is None:
return {
"root": self._root_environmets_fill_data(
template, [], self.roots
)
}
if isinstance(roots, RootItem):
key_items = [Roots.env_prefix]
for _key in keys:
key_items.append(_key.upper())
key = "_".join(key_items)
return template.format(key)
output = {}
for key, value in roots.items():
_keys = list(keys)
_keys.append(key)
output[key] = self._root_environmets_fill_data(
template, _keys, value
)
return output
@property
def project_name(self):
"""Return project name which will be used for loading root values."""
return self.anatomy.project_name
@property
def roots(self):
"""Property for filling "root" key in templates.
This property returns roots for current project or default root values.
Warning:
Default roots value may cause issues when project use different
roots settings. That may happen when project use multiroot
templates but default roots miss their keys.
"""
if self.project_name != self.loaded_project:
self._roots = None
if self._roots is None:
self._roots = self._discover()
self.loaded_project = self.project_name
return self._roots
def _discover(self):
""" Loads current project's roots or default.
Default roots are loaded if project override's does not contain roots.
Returns:
`RootItem` or `dict` with multiple `RootItem`s when multiroot
setting is used.
"""
return self._parse_dict(self.anatomy["roots"], parent=self)
@staticmethod
def _parse_dict(data, key=None, parent_keys=None, parent=None):
"""Parse roots raw data into RootItem or dictionary with RootItems.
Converting raw roots data to `RootItem` helps to handle platform keys.
This method is recursive to be able handle multiroot setup and
is static to be able to load default roots without creating new object.
Args:
data (dict): Should contain raw roots data to be parsed.
key (str, optional): Current root key. Set by recursion.
parent_keys (list): Parent dictionary keys. Set by recursion.
parent (Roots, optional): Parent object set in `RootItem`
helps to keep RootItem instance updated with `Roots` object.
Returns:
`RootItem` or `dict` with multiple `RootItem`s when multiroot
setting is used.
"""
if not parent_keys:
parent_keys = []
is_last = False
for value in data.values():
if isinstance(value, six.string_types):
is_last = True
break
if is_last:
return RootItem(data, key, parent_keys, parent=parent)
output = {}
for _key, value in data.items():
_parent_keys = list(parent_keys)
_parent_keys.append(_key)
output[_key] = Roots._parse_dict(value, _key, _parent_keys, parent)
return output

View file

@ -0,0 +1,523 @@
import os
import copy
import re
import collections
import numbers
import six
from ayon_core.lib import Logger
from ayon_core.lib.path_templates import (
TemplateResult,
StringTemplate,
TemplatesDict,
)
from .exceptions import AnatomyTemplateUnsolved
from .roots import RootItem
class AnatomyTemplateResult(TemplateResult):
rootless = None
def __new__(cls, result, rootless_path):
new_obj = super(AnatomyTemplateResult, cls).__new__(
cls,
str(result),
result.template,
result.solved,
result.used_values,
result.missing_keys,
result.invalid_types
)
new_obj.rootless = rootless_path
return new_obj
def validate(self):
if not self.solved:
raise AnatomyTemplateUnsolved(
self.template,
self.missing_keys,
self.invalid_types
)
def copy(self):
tmp = TemplateResult(
str(self),
self.template,
self.solved,
self.used_values,
self.missing_keys,
self.invalid_types
)
return self.__class__(tmp, self.rootless)
def normalized(self):
"""Convert to normalized path."""
tmp = TemplateResult(
os.path.normpath(self),
self.template,
self.solved,
self.used_values,
self.missing_keys,
self.invalid_types
)
return self.__class__(tmp, self.rootless)
class AnatomyStringTemplate(StringTemplate):
"""String template which has access to anatomy."""
def __init__(self, anatomy_templates, template):
self.anatomy_templates = anatomy_templates
super(AnatomyStringTemplate, self).__init__(template)
def format(self, data):
"""Format template and add 'root' key to data if not available.
Args:
data (dict[str, Any]): Formatting data for template.
Returns:
AnatomyTemplateResult: Formatting result.
"""
anatomy_templates = self.anatomy_templates
if not data.get("root"):
data = copy.deepcopy(data)
data["root"] = anatomy_templates.anatomy.roots
result = StringTemplate.format(self, data)
rootless_path = anatomy_templates.rootless_path_from_result(result)
return AnatomyTemplateResult(result, rootless_path)
class AnatomyTemplates(TemplatesDict):
inner_key_pattern = re.compile(r"(\{@.*?[^{}0]*\})")
inner_key_name_pattern = re.compile(r"\{@(.*?[^{}0]*)\}")
def __init__(self, anatomy):
self._log = Logger.get_logger(self.__class__.__name__)
super(AnatomyTemplates, self).__init__()
self.anatomy = anatomy
self.loaded_project = None
def reset(self):
self._raw_templates = None
self._templates = None
self._objected_templates = None
@property
def project_name(self):
return self.anatomy.project_name
@property
def roots(self):
return self.anatomy.roots
@property
def templates(self):
self._validate_discovery()
return self._templates
@property
def objected_templates(self):
self._validate_discovery()
return self._objected_templates
def _validate_discovery(self):
if self.project_name != self.loaded_project:
self.reset()
if self._templates is None:
self._discover()
self.loaded_project = self.project_name
def _format_value(self, value, data):
if isinstance(value, RootItem):
return self._solve_dict(value, data)
return super(AnatomyTemplates, self)._format_value(value, data)
@staticmethod
def _ayon_template_conversion(templates):
def _convert_template_item(template_item):
# Change 'directory' to 'folder'
if "directory" in template_item:
template_item["folder"] = template_item["directory"]
if (
"path" not in template_item
and "file" in template_item
and "folder" in template_item
):
template_item["path"] = "/".join(
(template_item["folder"], template_item["file"])
)
def _get_default_template_name(templates):
default_template = None
for name, template in templates.items():
if name == "default":
return "default"
if default_template is None:
default_template = name
return default_template
def _fill_template_category(templates, cat_templates, cat_key):
default_template_name = _get_default_template_name(cat_templates)
for template_name, cat_template in cat_templates.items():
_convert_template_item(cat_template)
if template_name == default_template_name:
templates[cat_key] = cat_template
else:
new_name = "{}_{}".format(cat_key, template_name)
templates["others"][new_name] = cat_template
others_templates = templates.pop("others", None) or {}
new_others_templates = {}
templates["others"] = new_others_templates
for name, template in others_templates.items():
_convert_template_item(template)
new_others_templates[name] = template
for key in (
"work",
"publish",
"hero",
):
cat_templates = templates.pop(key)
_fill_template_category(templates, cat_templates, key)
delivery_templates = templates.pop("delivery", None) or {}
new_delivery_templates = {}
for name, delivery_template in delivery_templates.items():
new_delivery_templates[name] = "/".join(
(delivery_template["directory"], delivery_template["file"])
)
templates["delivery"] = new_delivery_templates
def set_templates(self, templates):
if not templates:
self.reset()
return
templates = copy.deepcopy(templates)
# TODO remove AYON convertion
self._ayon_template_conversion(templates)
self._raw_templates = copy.deepcopy(templates)
v_queue = collections.deque()
v_queue.append(templates)
while v_queue:
item = v_queue.popleft()
if not isinstance(item, dict):
continue
for key in tuple(item.keys()):
value = item[key]
if isinstance(value, dict):
v_queue.append(value)
elif (
isinstance(value, six.string_types)
and "{task}" in value
):
item[key] = value.replace("{task}", "{task[name]}")
solved_templates = self.solve_template_inner_links(templates)
self._templates = solved_templates
self._objected_templates = self.create_objected_templates(
solved_templates
)
def _create_template_object(self, template):
return AnatomyStringTemplate(self, template)
def default_templates(self):
"""Return default templates data with solved inner keys."""
return self.solve_template_inner_links(
self.anatomy["templates"]
)
def _discover(self):
""" Loads anatomy templates from yaml.
Default templates are loaded if project is not set or project does
not have set it's own.
TODO: create templates if not exist.
Returns:
TemplatesResultDict: Contain templates data for current project of
default templates.
"""
if self.project_name is None:
# QUESTION create project specific if not found?
raise AssertionError((
"Project \"{0}\" does not have his own templates."
" Trying to use default."
).format(self.project_name))
self.set_templates(self.anatomy["templates"])
@classmethod
def replace_inner_keys(cls, matches, value, key_values, key):
"""Replacement of inner keys in template values."""
for match in matches:
anatomy_sub_keys = (
cls.inner_key_name_pattern.findall(match)
)
if key in anatomy_sub_keys:
raise ValueError((
"Unsolvable recursion in inner keys, "
"key: \"{}\" is in his own value."
" Can't determine source, please check Anatomy templates."
).format(key))
for anatomy_sub_key in anatomy_sub_keys:
replace_value = key_values.get(anatomy_sub_key)
if replace_value is None:
raise KeyError((
"Anatomy templates can't be filled."
" Anatomy key `{0}` has"
" invalid inner key `{1}`."
).format(key, anatomy_sub_key))
if not (
isinstance(replace_value, numbers.Number)
or isinstance(replace_value, six.string_types)
):
raise ValueError((
"Anatomy templates can't be filled."
" Anatomy key `{0}` has"
" invalid inner key `{1}`"
" with value `{2}`."
).format(key, anatomy_sub_key, str(replace_value)))
value = value.replace(match, str(replace_value))
return value
@classmethod
def prepare_inner_keys(cls, key_values):
"""Check values of inner keys.
Check if inner key exist in template group and has valid value.
It is also required to avoid infinite loop with unsolvable recursion
when first inner key's value refers to second inner key's value where
first is used.
"""
keys_to_solve = set(key_values.keys())
while True:
found = False
for key in tuple(keys_to_solve):
value = key_values[key]
if isinstance(value, six.string_types):
matches = cls.inner_key_pattern.findall(value)
if not matches:
keys_to_solve.remove(key)
continue
found = True
key_values[key] = cls.replace_inner_keys(
matches, value, key_values, key
)
continue
elif not isinstance(value, dict):
keys_to_solve.remove(key)
continue
subdict_found = False
for _key, _value in tuple(value.items()):
matches = cls.inner_key_pattern.findall(_value)
if not matches:
continue
subdict_found = True
found = True
key_values[key][_key] = cls.replace_inner_keys(
matches, _value, key_values,
"{}.{}".format(key, _key)
)
if not subdict_found:
keys_to_solve.remove(key)
if not found:
break
return key_values
@classmethod
def solve_template_inner_links(cls, templates):
"""Solve templates inner keys identified by "{@*}".
Process is split into 2 parts.
First is collecting all global keys (keys in top hierarchy where value
is not dictionary). All global keys are set for all group keys (keys
in top hierarchy where value is dictionary). Value of a key is not
overridden in group if already contain value for the key.
In second part all keys with "at" symbol in value are replaced with
value of the key afterward "at" symbol from the group.
Args:
templates (dict): Raw templates data.
Example:
templates::
key_1: "value_1",
key_2: "{@key_1}/{filling_key}"
group_1:
key_3: "value_3/{@key_2}"
group_2:
key_2": "value_2"
key_4": "value_4/{@key_2}"
output::
key_1: "value_1"
key_2: "value_1/{filling_key}"
group_1: {
key_1: "value_1"
key_2: "value_1/{filling_key}"
key_3: "value_3/value_1/{filling_key}"
group_2: {
key_1: "value_1"
key_2: "value_2"
key_4: "value_3/value_2"
"""
default_key_values = templates.pop("common", {})
for key, value in tuple(templates.items()):
if isinstance(value, dict):
continue
default_key_values[key] = templates.pop(key)
# Pop "others" key before before expected keys are processed
other_templates = templates.pop("others") or {}
keys_by_subkey = {}
for sub_key, sub_value in templates.items():
key_values = {}
key_values.update(default_key_values)
key_values.update(sub_value)
keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values)
for sub_key, sub_value in other_templates.items():
if sub_key in keys_by_subkey:
self.log.warning((
"Key \"{}\" is duplicated in others. Skipping."
).format(sub_key))
continue
key_values = {}
key_values.update(default_key_values)
key_values.update(sub_value)
keys_by_subkey[sub_key] = cls.prepare_inner_keys(key_values)
default_keys_by_subkeys = cls.prepare_inner_keys(default_key_values)
for key, value in default_keys_by_subkeys.items():
keys_by_subkey[key] = value
return keys_by_subkey
@classmethod
def _dict_to_subkeys_list(cls, subdict, pre_keys=None):
if pre_keys is None:
pre_keys = []
output = []
for key in subdict:
value = subdict[key]
result = list(pre_keys)
result.append(key)
if isinstance(value, dict):
for item in cls._dict_to_subkeys_list(value, result):
output.append(item)
else:
output.append(result)
return output
def _keys_to_dicts(self, key_list, value):
if not key_list:
return None
if len(key_list) == 1:
return {key_list[0]: value}
return {key_list[0]: self._keys_to_dicts(key_list[1:], value)}
@classmethod
def rootless_path_from_result(cls, result):
"""Calculate rootless path from formatting result.
Args:
result (TemplateResult): Result of StringTemplate formatting.
Returns:
str: Rootless path if result contains one of anatomy roots.
"""
used_values = result.used_values
missing_keys = result.missing_keys
template = result.template
invalid_types = result.invalid_types
if (
"root" not in used_values
or "root" in missing_keys
or "{root" not in template
):
return
for invalid_type in invalid_types:
if "root" in invalid_type:
return
root_keys = cls._dict_to_subkeys_list({"root": used_values["root"]})
if not root_keys:
return
output = str(result)
for used_root_keys in root_keys:
if not used_root_keys:
continue
used_value = used_values
root_key = None
for key in used_root_keys:
used_value = used_value[key]
if root_key is None:
root_key = key
else:
root_key += "[{}]".format(key)
root_key = "{" + root_key + "}"
output = output.replace(str(used_value), root_key)
return output
def format(self, data, strict=True):
copy_data = copy.deepcopy(data)
roots = self.roots
if roots:
copy_data["root"] = roots
result = super(AnatomyTemplates, self).format(copy_data)
result.strict = strict
return result
def format_all(self, in_data, only_keys=True):
""" Solves templates based on entered data.
Args:
data (dict): Containing keys to be filled into template.
Returns:
TemplatesResultDict: Output `TemplateResult` have `strict`
attribute set to False so accessing unfilled keys in templates
won't raise any exceptions.
"""
return self.format(in_data, strict=False)