ayon-core/pype/lib/lib_old.py
2020-11-10 11:54:30 +01:00

381 lines
11 KiB
Python

import os
import re
import json
import collections
import logging
import itertools
import contextlib
import subprocess
from avalon import io, pipeline
import avalon.api
from ..api import config
log = logging.getLogger(__name__)
def get_paths_from_environ(env_key, return_first=False):
"""Return existing paths from specific envirnment variable.
:param env_key: Environment key where should look for paths.
:type env_key: str
:param return_first: Return first path on `True`, list of all on `False`.
:type return_first: boolean
Difference when none of paths exists:
- when `return_first` is set to `False` then function returns empty list.
- when `return_first` is set to `True` then function returns `None`.
"""
existing_paths = []
paths = os.environ.get(env_key) or ""
path_items = paths.split(os.pathsep)
for path in path_items:
# Skip empty string
if not path:
continue
# Normalize path
path = os.path.normpath(path)
# Check if path exists
if os.path.exists(path):
# Return path if `return_first` is set to True
if return_first:
return path
# Store path
existing_paths.append(path)
# Return None if none of paths exists
if return_first:
return None
# Return all existing paths from environment variable
return existing_paths
def get_ffmpeg_tool_path(tool="ffmpeg"):
"""Find path to ffmpeg tool in FFMPEG_PATH paths.
Function looks for tool in paths set in FFMPEG_PATH environment. If tool
exists then returns it's full path.
Returns tool name itself when tool path was not found. (FFmpeg path may be
set in PATH environment variable)
"""
dir_paths = get_paths_from_environ("FFMPEG_PATH")
for dir_path in dir_paths:
for file_name in os.listdir(dir_path):
base, ext = os.path.splitext(file_name)
if base.lower() == tool.lower():
return os.path.join(dir_path, tool)
return tool
# Special naming case for subprocess since its a built-in method.
def _subprocess(*args, **kwargs):
"""Convenience method for getting output errors for subprocess.
Entered arguments and keyword arguments are passed to subprocess Popen.
Args:
*args: Variable length arument list passed to Popen.
**kwargs : Arbitary keyword arguments passed to Popen. Is possible to
pass `logging.Logger` object under "logger" if want to use
different than lib's logger.
Returns:
str: Full output of subprocess concatenated stdout and stderr.
Raises:
RuntimeError: Exception is raised if process finished with nonzero
return code.
"""
# Get environents from kwarg or use current process environments if were
# not passed.
env = kwargs.get("env") or os.environ
# Make sure environment contains only strings
filtered_env = {k: str(v) for k, v in env.items()}
# Use lib's logger if was not passed with kwargs.
logger = kwargs.pop("logger", log)
# set overrides
kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
kwargs['stdin'] = kwargs.get('stdin', subprocess.PIPE)
kwargs['env'] = filtered_env
proc = subprocess.Popen(*args, **kwargs)
full_output = ""
_stdout, _stderr = proc.communicate()
if _stdout:
_stdout = _stdout.decode("utf-8")
full_output += _stdout
logger.debug(_stdout)
if _stderr:
_stderr = _stderr.decode("utf-8")
# Add additional line break if output already containt stdout
if full_output:
full_output += "\n"
full_output += _stderr
logger.warning(_stderr)
if proc.returncode != 0:
exc_msg = "Executing arguments was not successful: \"{}\"".format(args)
if _stdout:
exc_msg += "\n\nOutput:\n{}".format(_stdout)
if _stderr:
exc_msg += "Error:\n{}".format(_stderr)
raise RuntimeError(exc_msg)
return full_output
def add_tool_to_environment(tools):
"""
It is adding dynamic environment to os environment.
Args:
tool (list, tuple): list of tools, name should corespond to json/toml
Returns:
os.environ[KEY]: adding to os.environ
"""
import acre
tools_env = acre.get_tools(tools)
env = acre.compute(tools_env)
env = acre.merge(env, current_env=dict(os.environ))
os.environ.update(env)
@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables
and values to add/update.
"""
env = os.environ
update = update or {}
remove = remove or []
# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)
try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
def pairwise(iterable):
"""s -> (s0,s1), (s2,s3), (s4, s5), ..."""
a = iter(iterable)
return itertools.izip(a, a)
def grouper(iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks
Examples:
grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
"""
args = [iter(iterable)] * n
return itertools.izip_longest(fillvalue=fillvalue, *args)
def _rreplace(s, a, b, n=1):
"""Replace a with b in string s from right side n times"""
return b.join(s.rsplit(a, n))
def version_up(filepath):
"""Version up filepath to a new non-existing version.
Parses for a version identifier like `_v001` or `.v001`
When no version present _v001 is appended as suffix.
Returns:
str: filepath with increased version number
"""
dirname = os.path.dirname(filepath)
basename, ext = os.path.splitext(os.path.basename(filepath))
regex = r"[._]v\d+"
matches = re.findall(regex, str(basename), re.IGNORECASE)
if not matches:
log.info("Creating version...")
new_label = "_v{version:03d}".format(version=1)
new_basename = "{}{}".format(basename, new_label)
else:
label = matches[-1]
version = re.search(r"\d+", label).group()
padding = len(version)
new_version = int(version) + 1
new_version = '{version:0{padding}d}'.format(version=new_version,
padding=padding)
new_label = label.replace(version, new_version, 1)
new_basename = _rreplace(basename, label, new_label)
if not new_basename.endswith(new_label):
index = (new_basename.find(new_label))
index += len(new_label)
new_basename = new_basename[:index]
new_filename = "{}{}".format(new_basename, ext)
new_filename = os.path.join(dirname, new_filename)
new_filename = os.path.normpath(new_filename)
if new_filename == filepath:
raise RuntimeError("Created path is the same as current file,"
"this is a bug")
for file in os.listdir(dirname):
if file.endswith(ext) and file.startswith(new_basename):
log.info("Skipping existing version %s" % new_label)
return version_up(new_filename)
log.info("New version %s" % new_label)
return new_filename
def _get_host_name():
_host = avalon.api.registered_host()
# This covers nested module name like avalon.maya
return _host.__name__.rsplit(".", 1)[-1]
def get_version_from_path(file):
"""
Finds version number in file path string
Args:
file (string): file path
Returns:
v: version number in string ('001')
"""
pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE)
try:
return pattern.findall(file)[0]
except IndexError:
log.error(
"templates:get_version_from_workfile:"
"`{}` missing version string."
"Example `v004`".format(file)
)
def get_last_version_from_path(path_dir, filter):
"""
Finds last version of given directory content
Args:
path_dir (string): directory path
filter (list): list of strings used as file name filter
Returns:
string: file name with last version
Example:
last_version_file = get_last_version_from_path(
"/project/shots/shot01/work", ["shot01", "compositing", "nk"])
"""
assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory"
assert isinstance(filter, list) and (
len(filter) != 0), "`filter` argument needs to be list and not empty"
filtred_files = list()
# form regex for filtering
patern = r".*".join(filter)
for f in os.listdir(path_dir):
if not re.findall(patern, f):
continue
filtred_files.append(f)
if filtred_files:
sorted(filtred_files)
return filtred_files[-1]
else:
return None
def ffprobe_streams(path_to_file, logger=None):
"""Load streams from entered filepath via ffprobe."""
if not logger:
logger = log
logger.info(
"Getting information about input \"{}\".".format(path_to_file)
)
args = [
"\"{}\"".format(get_ffmpeg_tool_path("ffprobe")),
"-v quiet",
"-print_format json",
"-show_format",
"-show_streams",
"\"{}\"".format(path_to_file)
]
command = " ".join(args)
logger.debug("FFprobe command: \"{}\"".format(command))
popen = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
popen_stdout, popen_stderr = popen.communicate()
if popen_stdout:
logger.debug("ffprobe stdout: {}".format(popen_stdout))
if popen_stderr:
logger.debug("ffprobe stderr: {}".format(popen_stderr))
return json.loads(popen_stdout)["streams"]
def source_hash(filepath, *args):
"""Generate simple identifier for a source file.
This is used to identify whether a source file has previously been
processe into the pipeline, e.g. a texture.
The hash is based on source filepath, modification time and file size.
This is only used to identify whether a specific source file was already
published before from the same location with the same modification date.
We opt to do it this way as opposed to Avalanch C4 hash as this is much
faster and predictable enough for all our production use cases.
Args:
filepath (str): The source file path.
You can specify additional arguments in the function
to allow for specific 'processing' values to be included.
"""
# We replace dots with comma because . cannot be a key in a pymongo dict.
file_name = os.path.basename(filepath)
time = str(os.path.getmtime(filepath))
size = str(os.path.getsize(filepath))
return "|".join([file_name, time, size] + list(args)).replace(".", ",")