From 74443c92e71c3e79cbe7dcc61528703b0ed8faad Mon Sep 17 00:00:00 2001 From: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Date: Tue, 11 Feb 2025 15:37:00 +0100 Subject: [PATCH] small tweaks --- client/ayon_core/cli.py | 11 +-- client/ayon_core/lib/env_tools.py | 151 +++++++++++++++--------------- 2 files changed, 81 insertions(+), 81 deletions(-) diff --git a/client/ayon_core/cli.py b/client/ayon_core/cli.py index 1287534bbf..d7cd3ba7f5 100644 --- a/client/ayon_core/cli.py +++ b/client/ayon_core/cli.py @@ -239,15 +239,12 @@ def version(build): def _set_global_environments() -> None: """Set global AYON environments.""" - general_env = get_general_environments() + # First resolve general environment + general_env = parse_env_variables_structure(get_general_environments()) - # first resolve general environment because merge doesn't expect - # values to be list. - # TODO: switch to AYON environment functions + # Merge environments with current environments and update values merged_env = merge_env_variables( - compute_env_variables_structure( - parse_env_variables_structure(general_env) - ), + compute_env_variables_structure(general_env), dict(os.environ) ) env = compute_env_variables_structure(merged_env) diff --git a/client/ayon_core/lib/env_tools.py b/client/ayon_core/lib/env_tools.py index 6ed67d7270..c71350869e 100644 --- a/client/ayon_core/lib/env_tools.py +++ b/client/ayon_core/lib/env_tools.py @@ -13,11 +13,6 @@ if typing.TYPE_CHECKING: PlatformName = Literal["windows", "linux", "darwin"] EnvValue = Union[str, list[str], dict[str, str], dict[str, list[str]]] -Results = collections.namedtuple( - "Results", - ["sorted", "cyclic"] -) - class CycleError(ValueError): """Raised when a cycle is detected in dynamic env variables compute.""" @@ -124,7 +119,8 @@ def parse_env_variables_structure( dict: The flattened environment for a platform. """ - platform_name = platform_name or platform.system().lower() + if platform_name is None: + platform_name = platform.system().lower() result = {} for variable, value in env.items(): @@ -147,72 +143,94 @@ def parse_env_variables_structure( return result -def _topological_sort(dependency_pairs): - """Sort values subject to dependency constraints""" +def _topological_sort( + dependencies: dict[str, set[str]] +) -> tuple[list[str], list[str]]: + """Sort values subject to dependency constraints. + + Args: + dependencies (dict[str, set[str]): Mapping of environment variable + keys to a set of keys they depend on. + + Returns: + tuple[list[str], list[str]]: A tuple of two lists. The first list + contains the ordered keys in which order should be environment + keys filled, the second list contains the keys that would cause + cyclic fill of values. + + """ num_heads = collections.defaultdict(int) # num arrows pointing in tails = collections.defaultdict(list) # list of arrows going out heads = [] # unique list of heads in order first seen - for h, t in dependency_pairs: - num_heads[t] += 1 - if h in tails: - tails[h].append(t) - else: - tails[h] = [t] - heads.append(h) + for head, tail_values in dependencies.items(): + for tail_value in tail_values: + num_heads[tail_value] += 1 + if head not in tails: + heads.append(head) + tails[head].append(tail_value) - ordered = [h for h in heads if h not in num_heads] - for h in ordered: - for t in tails[h]: - num_heads[t] -= 1 - if not num_heads[t]: - ordered.append(t) - cyclic = [n for n, heads in num_heads.items() if heads] - return Results(ordered, cyclic) + ordered = [head for head in heads if head not in num_heads] + for head in ordered: + for tail in tails[head]: + num_heads[tail] -= 1 + if not num_heads[tail]: + ordered.append(tail) + cyclic = [tail for tail, heads in num_heads.items() if heads] + return ordered, cyclic + + +class _PartialFormatDict(dict): + """This supports partial formatting. + + Missing keys are replaced with the return value of __missing__. + + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._missing_template: str = "{{{key}}}" + + def set_missing_template(self, template: str): + self._missing_template = template + + def __missing__(self, key: str) -> str: + return self._missing_template.format(key=key) def _partial_format( - s: str, + value: str, data: dict[str, str], - missing: Optional[str] = None, + missing_template: Optional[str] = None, ) -> str: """Return string `s` formatted by `data` allowing a partial format Arguments: - s (str): The string that will be formatted + value (str): The string that will be formatted data (dict): The dictionary used to format with. + missing_template (Optional[str]): The template to use when a key is + missing from the data. If `None`, the key will remain unformatted. Example: >>> _partial_format("{d} {a} {b} {c} {d}", {'b': "and", 'd': "left"}) 'left {a} and {c} left' + """ - if missing is None: - missing = "{{{key}}}" - - class FormatDict(dict): - """This supports partial formatting. - - Missing keys are replaced with the return value of __missing__. - - """ - - def __missing__(self, key): - return missing.format(key=key) + mapping = _PartialFormatDict(**data) + if missing_template is not None: + mapping.set_missing_template(missing_template) formatter = Formatter() - mapping = FormatDict(**data) try: - f = formatter.vformat(s, (), mapping) + output = formatter.vformat(value, (), mapping) except Exception: r_token = re.compile(r"({.*?})") - matches = re.findall(r_token, s) - f = s - for m in matches: + output = value + for match in re.findall(r_token, value): try: - f = re.sub(m, m.format(**data), f) - except (KeyError, ValueError): + output = re.sub(match, match.format(**data), output) + except (KeyError, ValueError, IndexError): continue - return f + return output def compute_env_variables_structure( @@ -230,28 +248,22 @@ def compute_env_variables_structure( env = env.copy() # Collect dependencies - dependencies = [] + dependencies = collections.defaultdict(set) for key, value in env.items(): - try: - dependent_keys = re.findall("{(.+?)}", value) - for dependency in dependent_keys: - # Ignore direct references to itself because - # we don't format with itself anyway - if dependency == key: - continue + dependent_keys = re.findall("{(.+?)}", value) + for dependent_key in dependent_keys: + # Ignore reference to itself or key is not in env + if dependent_key != key and dependent_key in env: + dependencies[key].add(dependent_key) - dependencies.append((key, dependency)) - except Exception: - dependencies.append((key, value)) - - result = _topological_sort(dependencies) + ordered, cyclic = _topological_sort(dependencies) # Check cycle - if result.cyclic: - raise CycleError(f"A cycle is detected on: {result.cyclic}") + if cyclic: + raise CycleError(f"A cycle is detected on: {cyclic}") # Format dynamic values - for key in reversed(result.sorted): + for key in reversed(ordered): if key in env: if not isinstance(env[key], str): continue @@ -259,15 +271,6 @@ def compute_env_variables_structure( data.pop(key) # format without itself env[key] = _partial_format(env[key], data=data) - # Format cyclic values - for key in result.cyclic: - if key in env: - if not isinstance(env[key], str): - continue - data = env.copy() - data.pop(key) # format without itself - env[key] = _partial_format(env[key], data=data) - # Format dynamic keys if fill_dynamic_keys: formatted = {} @@ -291,7 +294,7 @@ def compute_env_variables_structure( def merge_env_variables( src_env: dict[str, str], dst_env: dict[str, str], - missing: Optional[str] = None, + missing_template: Optional[str] = None, ): """Merge the tools environment with the 'current_env'. @@ -304,7 +307,7 @@ def merge_env_variables( src_env (dict): The dynamic environment dst_env (dict): The target environment variables mapping to merge the dynamic environment into. - missing (str): Argument passed to '_partial_format' during merging. + missing_template (str): Argument passed to '_partial_format' during merging. `None` should keep missing keys unchanged. Returns: @@ -314,7 +317,7 @@ def merge_env_variables( result = dst_env.copy() for key, value in src_env.items(): result[key] = _partial_format( - str(value), data=dst_env, missing=missing + str(value), dst_env, missing_template ) return result