import os
import re
import logging
import itertools
import contextlib
import subprocess

import avalon.api

log = logging.getLogger(__name__)


def get_paths_from_environ(env_key, return_first=False):
    """Return existing paths from a specific environment variable.

    :param env_key: Environment variable name to look for paths in.
    :type env_key: str
    :param return_first: Return first path on `True`, list of all on `False`.
    :type return_first: boolean

    Difference when none of the paths exists:
    - when `return_first` is set to `False` the function returns an empty
      list.
    - when `return_first` is set to `True` the function returns `None`.
    """
    existing_paths = []
    paths = os.environ.get(env_key) or ""
    path_items = paths.split(os.pathsep)
    for path in path_items:
        # Skip empty string
        if not path:
            continue
        # Normalize path
        path = os.path.normpath(path)
        # Check if path exists
        if os.path.exists(path):
            # Return path if `return_first` is set to True
            if return_first:
                return path
            # Store path
            existing_paths.append(path)

    # Return None if none of the paths exists
    if return_first:
        return None
    # Return all existing paths from the environment variable
    return existing_paths

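# Illustrative usage of get_paths_from_environ. The FFMPEG_PATH value below is
# only an assumed example; only directories that actually exist are returned:
#
#   os.environ["FFMPEG_PATH"] = os.pathsep.join(
#       ["/opt/ffmpeg/bin", "/usr/bin"])
#   get_paths_from_environ("FFMPEG_PATH")
#   # -> list of the existing directories
#   get_paths_from_environ("FFMPEG_PATH", return_first=True)
#   # -> first existing directory, or None when none exists

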
def get_ffmpeg_tool_path(tool="ffmpeg"):
    """Find path to an FFmpeg tool in FFMPEG_PATH paths.

    The function looks for the tool in the directories set in the FFMPEG_PATH
    environment variable. If the tool exists there, its full path is
    returned.

    Returns the tool name itself when the tool path was not found. (The
    FFmpeg path may be set in the PATH environment variable instead.)
    """
    dir_paths = get_paths_from_environ("FFMPEG_PATH")
    for dir_path in dir_paths:
        for file_name in os.listdir(dir_path):
            base, ext = os.path.splitext(file_name)
            if base.lower() == tool.lower():
                # Return the file that was actually found so a platform
                # specific extension (e.g. ".exe") is preserved.
                return os.path.join(dir_path, file_name)
    return tool

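# Illustrative usage of get_ffmpeg_tool_path; the resolved path depends on
# what FFMPEG_PATH points to and is hypothetical here:
#
#   ffprobe_path = get_ffmpeg_tool_path("ffprobe")
#   # e.g. "/opt/ffmpeg/bin/ffprobe" when found, otherwise just "ffprobe"

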
# Special naming case for subprocess since it's a built-in module.
def _subprocess(*args, **kwargs):
    """Convenience method for getting output errors for subprocess.

    Entered arguments and keyword arguments are passed to subprocess.Popen.

    Args:
        *args: Variable length argument list passed to Popen.
        **kwargs: Arbitrary keyword arguments passed to Popen. It is possible
            to pass a `logging.Logger` object under "logger" to use a
            different logger than this module's.

    Returns:
        str: Full output of the subprocess, concatenated stdout and stderr.

    Raises:
        RuntimeError: Raised if the process finished with a nonzero
            return code.
    """

    # Get environment from kwargs or use the current process environment if
    # none was passed.
    env = kwargs.get("env") or os.environ
    # Make sure the environment contains only strings
    filtered_env = {k: str(v) for k, v in env.items()}

    # Use this module's logger if one was not passed with kwargs.
    logger = kwargs.pop("logger", log)

    # Set overrides
    kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
    kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
    kwargs['stdin'] = kwargs.get('stdin', subprocess.PIPE)
    kwargs['env'] = filtered_env

    proc = subprocess.Popen(*args, **kwargs)

    full_output = ""
    _stdout, _stderr = proc.communicate()
    if _stdout:
        _stdout = _stdout.decode("utf-8")
        full_output += _stdout
        logger.debug(_stdout)

    if _stderr:
        _stderr = _stderr.decode("utf-8")
        # Add an additional line break if the output already contains stdout
        if full_output:
            full_output += "\n"
        full_output += _stderr
        logger.warning(_stderr)

    if proc.returncode != 0:
        exc_msg = "Executing arguments was not successful: \"{}\"".format(args)
        if _stdout:
            exc_msg += "\n\nOutput:\n{}".format(_stdout)

        if _stderr:
            exc_msg += "\n\nError:\n{}".format(_stderr)

        raise RuntimeError(exc_msg)

    return full_output

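# Illustrative use of _subprocess. The command below is only an example and
# assumes ffmpeg is resolvable through get_ffmpeg_tool_path or PATH:
#
#   output = _subprocess([get_ffmpeg_tool_path("ffmpeg"), "-version"])
#   # `output` holds combined stdout/stderr; a RuntimeError is raised when
#   # the process exits with a nonzero return code.

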
def add_tool_to_environment(tools):
    """Add the dynamic environment of the given tools to ``os.environ``.

    Args:
        tools (list, tuple): List of tool names; each name should correspond
            to a json/toml tool definition.

    Returns:
        None: ``os.environ`` is updated in place.
    """
    import acre

    tools_env = acre.get_tools(tools)
    env = acre.compute(tools_env)
    env = acre.merge(env, current_env=dict(os.environ))
    os.environ.update(env)

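# Illustrative call; the tool name is hypothetical and must match an existing
# json/toml tool definition consumed by `acre`:
#
#   add_tool_to_environment(["maya_2020"])

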
@contextlib.contextmanager
def modified_environ(*remove, **update):
    """
    Temporarily updates the ``os.environ`` dictionary in-place.

    The ``os.environ`` dictionary is updated in-place so that the
    modification is sure to work in all situations.

    :param remove: Environment variables to remove.
    :param update: Dictionary of environment variables
        and values to add/update.
    """
    env = os.environ
    update = update or {}
    remove = remove or []

    # List of environment variables being updated or removed.
    stomped = (set(update.keys()) | set(remove)) & set(env.keys())
    # Environment variables and values to restore on exit.
    update_after = {k: env[k] for k in stomped}
    # Environment variables and values to remove on exit.
    remove_after = frozenset(k for k in update if k not in env)

    try:
        env.update(update)
        [env.pop(k, None) for k in remove]
        yield
    finally:
        env.update(update_after)
        [env.pop(k) for k in remove_after]

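# Illustrative use of modified_environ; the variable names and values are
# made up for the example:
#
#   with modified_environ("TEMP_FLAG", MY_VAR="1"):
#       ...  # "TEMP_FLAG" is removed, "MY_VAR" is "1" inside the block
#   # The original os.environ state is restored on exit.

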
def pairwise(iterable):
    """s -> (s0, s1), (s2, s3), (s4, s5), ..."""
    a = iter(iterable)
    return zip(a, a)


def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length chunks or blocks.

    Examples:
        grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx

    """
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)


def _rreplace(s, a, b, n=1):
    """Replace `a` with `b` in string `s` from the right side, `n` times."""
    return b.join(s.rsplit(a, n))


def version_up(filepath):
    """Version up filepath to a new non-existing version.

    Parses for a version identifier like `_v001` or `.v001`.
    When no version is present, `_v001` is appended as a suffix.

    Returns:
        str: filepath with increased version number

    """
    dirname = os.path.dirname(filepath)
    basename, ext = os.path.splitext(os.path.basename(filepath))

    regex = r"[._]v\d+"
    matches = re.findall(regex, str(basename), re.IGNORECASE)
    if not matches:
        log.info("Creating version...")
        new_label = "_v{version:03d}".format(version=1)
        new_basename = "{}{}".format(basename, new_label)
    else:
        label = matches[-1]
        version = re.search(r"\d+", label).group()
        padding = len(version)

        new_version = int(version) + 1
        new_version = '{version:0{padding}d}'.format(version=new_version,
                                                     padding=padding)
        new_label = label.replace(version, new_version, 1)
        new_basename = _rreplace(basename, label, new_label)

    if not new_basename.endswith(new_label):
        index = (new_basename.find(new_label))
        index += len(new_label)
        new_basename = new_basename[:index]

    new_filename = "{}{}".format(new_basename, ext)
    new_filename = os.path.join(dirname, new_filename)
    new_filename = os.path.normpath(new_filename)

    if new_filename == filepath:
        raise RuntimeError("Created path is the same as current file, "
                           "this is a bug")

    for file in os.listdir(dirname):
        if file.endswith(ext) and file.startswith(new_basename):
            log.info("Skipping existing version %s" % new_label)
            return version_up(new_filename)

    log.info("New version %s" % new_label)
    return new_filename

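# Illustrative behaviour of version_up; the file paths are hypothetical and
# the result also skips any versions already present in the directory:
#
#   version_up("/work/shot01_v001.nk")   # -> "/work/shot01_v002.nk"
#   version_up("/work/shot01.nk")        # -> "/work/shot01_v001.nk"

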
def _get_host_name():
    _host = avalon.api.registered_host()
    # This covers a nested module name like avalon.maya
    return _host.__name__.rsplit(".", 1)[-1]


def get_version_from_path(file):
    """Find the version number in a file path string.

    Args:
        file (string): file path

    Returns:
        str: version number as a string (e.g. '001'), or None when no
            version string was found.

    """
    pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE)
    try:
        return pattern.findall(file)[0]
    except IndexError:
        log.error(
            "templates:get_version_from_workfile:"
            "`{}` missing version string."
            "Example `v004`".format(file)
        )

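# Illustrative behaviour of get_version_from_path (hypothetical file name):
#
#   get_version_from_path("shot01_compositing_v012.nk")  # -> "012"

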
def get_last_version_from_path(path_dir, filter):
    """Find the last version in the content of a given directory.

    Args:
        path_dir (string): directory path
        filter (list): list of strings used as file name filter

    Returns:
        string: file name with the last version, or None when nothing matched

    Example:
        last_version_file = get_last_version_from_path(
            "/project/shots/shot01/work", ["shot01", "compositing", "nk"])
    """
    assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory"
    assert isinstance(filter, list) and (
        len(filter) != 0), "`filter` argument needs to be list and not empty"

    filtered_files = list()

    # Form regex pattern for filtering
    pattern = r".*".join(filter)

    for f in os.listdir(path_dir):
        if not re.findall(pattern, f):
            continue
        filtered_files.append(f)

    if filtered_files:
        # Sort in place; `sorted()` alone would not change the list.
        filtered_files.sort()
        return filtered_files[-1]
    else:
        return None


def source_hash(filepath, *args):
    """Generate a simple identifier for a source file.

    This is used to identify whether a source file has previously been
    processed into the pipeline, e.g. a texture.

    The identifier is based on the source filepath, modification time and
    file size. It is only used to identify whether a specific source file was
    already published before from the same location with the same
    modification date. We opt to do it this way as opposed to an Avalanche C4
    hash because this is much faster and predictable enough for all our
    production use cases.

    Args:
        filepath (str): The source file path.

    You can specify additional arguments in the function to allow for
    specific 'processing' values to be included.
    """
    # We replace dots with commas because "." cannot be in a key in a pymongo
    # dict.
    file_name = os.path.basename(filepath)
    time = str(os.path.getmtime(filepath))
    size = str(os.path.getsize(filepath))
    return "|".join([file_name, time, size] + list(args)).replace(".", ",")
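

# Illustrative result of source_hash; the file name, modification time and
# size below are made up:
#
#   source_hash("/textures/wood_diffuse.exr")
#   # -> "wood_diffuse,exr|1614695305,0|102400"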