mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 21:04:40 +01:00
move version and path related functions to new lib
This commit is contained in:
parent
dca965b691
commit
63b32be427
4 changed files with 201 additions and 214 deletions
|
|
@ -43,7 +43,6 @@ from .lib import (
|
|||
get_subsets,
|
||||
get_version_from_path,
|
||||
get_last_version_from_path,
|
||||
add_tool_to_environment,
|
||||
source_hash,
|
||||
get_latest_version
|
||||
)
|
||||
|
|
|
|||
|
|
@ -16,20 +16,21 @@ from .applications import (
|
|||
|
||||
from .plugin_tools import filter_pyblish_plugins
|
||||
|
||||
from .lib_old import (
|
||||
_subprocess,
|
||||
get_paths_from_environ,
|
||||
get_ffmpeg_tool_path,
|
||||
get_hierarchy,
|
||||
add_tool_to_environment,
|
||||
is_latest,
|
||||
any_outdated,
|
||||
_rreplace,
|
||||
from .path_tools import (
|
||||
version_up,
|
||||
switch_item,
|
||||
get_asset,
|
||||
get_version_from_path,
|
||||
get_last_version_from_path,
|
||||
get_paths_from_environ,
|
||||
get_ffmpeg_tool_path
|
||||
)
|
||||
|
||||
from .lib_old import (
|
||||
_subprocess,
|
||||
get_hierarchy,
|
||||
is_latest,
|
||||
any_outdated,
|
||||
switch_item,
|
||||
get_asset,
|
||||
get_subsets,
|
||||
get_linked_assets,
|
||||
BuildWorkfile,
|
||||
|
|
@ -49,5 +50,11 @@ __all__ = [
|
|||
"launch_application",
|
||||
"ApplicationAction",
|
||||
|
||||
"filter_pyblish_plugins"
|
||||
"filter_pyblish_plugins",
|
||||
|
||||
"version_up",
|
||||
"get_version_from_path",
|
||||
"get_last_version_from_path",
|
||||
"get_paths_from_environ",
|
||||
"get_ffmpeg_tool_path"
|
||||
]
|
||||
|
|
|
|||
|
|
@ -14,62 +14,6 @@ from ..api import config, Anatomy, Logger
|
|||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_paths_from_environ(env_key, return_first=False):
|
||||
"""Return existing paths from specific envirnment variable.
|
||||
|
||||
:param env_key: Environment key where should look for paths.
|
||||
:type env_key: str
|
||||
:param return_first: Return first path on `True`, list of all on `False`.
|
||||
:type return_first: boolean
|
||||
|
||||
Difference when none of paths exists:
|
||||
- when `return_first` is set to `False` then function returns empty list.
|
||||
- when `return_first` is set to `True` then function returns `None`.
|
||||
"""
|
||||
|
||||
existing_paths = []
|
||||
paths = os.environ.get(env_key) or ""
|
||||
path_items = paths.split(os.pathsep)
|
||||
for path in path_items:
|
||||
# Skip empty string
|
||||
if not path:
|
||||
continue
|
||||
# Normalize path
|
||||
path = os.path.normpath(path)
|
||||
# Check if path exists
|
||||
if os.path.exists(path):
|
||||
# Return path if `return_first` is set to True
|
||||
if return_first:
|
||||
return path
|
||||
# Store path
|
||||
existing_paths.append(path)
|
||||
|
||||
# Return None if none of paths exists
|
||||
if return_first:
|
||||
return None
|
||||
# Return all existing paths from environment variable
|
||||
return existing_paths
|
||||
|
||||
|
||||
def get_ffmpeg_tool_path(tool="ffmpeg"):
|
||||
"""Find path to ffmpeg tool in FFMPEG_PATH paths.
|
||||
|
||||
Function looks for tool in paths set in FFMPEG_PATH environment. If tool
|
||||
exists then returns it's full path.
|
||||
|
||||
Returns tool name itself when tool path was not found. (FFmpeg path may be
|
||||
set in PATH environment variable)
|
||||
"""
|
||||
|
||||
dir_paths = get_paths_from_environ("FFMPEG_PATH")
|
||||
for dir_path in dir_paths:
|
||||
for file_name in os.listdir(dir_path):
|
||||
base, ext = os.path.splitext(file_name)
|
||||
if base.lower() == tool.lower():
|
||||
return os.path.join(dir_path, tool)
|
||||
return tool
|
||||
|
||||
|
||||
# Special naming case for subprocess since its a built-in method.
|
||||
def _subprocess(*args, **kwargs):
|
||||
"""Convenience method for getting output errors for subprocess.
|
||||
|
|
@ -135,9 +79,11 @@ def _subprocess(*args, **kwargs):
|
|||
return full_output
|
||||
|
||||
|
||||
# Avalon databse functions
|
||||
|
||||
|
||||
def get_hierarchy(asset_name=None):
|
||||
"""
|
||||
Obtain asset hierarchy path string from mongo db
|
||||
"""Obtain asset hierarchy path string from mongo db.
|
||||
|
||||
Returns:
|
||||
string: asset hierarchy path
|
||||
|
|
@ -179,26 +125,8 @@ def get_hierarchy(asset_name=None):
|
|||
return "/".join(hierarchy_items)
|
||||
|
||||
|
||||
def add_tool_to_environment(tools):
|
||||
"""
|
||||
It is adding dynamic environment to os environment.
|
||||
|
||||
Args:
|
||||
tool (list, tuple): list of tools, name should corespond to json/toml
|
||||
|
||||
Returns:
|
||||
os.environ[KEY]: adding to os.environ
|
||||
"""
|
||||
|
||||
import acre
|
||||
tools_env = acre.get_tools(tools)
|
||||
env = acre.compute(tools_env)
|
||||
env = acre.merge(env, current_env=dict(os.environ))
|
||||
os.environ.update(env)
|
||||
|
||||
|
||||
def is_latest(representation):
|
||||
"""Return whether the representation is from latest version
|
||||
"""Return whether the representation is from latest version.
|
||||
|
||||
Args:
|
||||
representation (dict): The representation document from the database.
|
||||
|
|
@ -207,7 +135,6 @@ def is_latest(representation):
|
|||
bool: Whether the representation is of latest version.
|
||||
|
||||
"""
|
||||
|
||||
version = io.find_one({"_id": representation['parent']})
|
||||
if version["type"] == "master_version":
|
||||
return True
|
||||
|
|
@ -225,8 +152,7 @@ def is_latest(representation):
|
|||
|
||||
|
||||
def any_outdated():
|
||||
"""Return whether the current scene has any outdated content"""
|
||||
|
||||
"""Return whether the current scene has any outdated content."""
|
||||
checked = set()
|
||||
host = avalon.api.registered_host()
|
||||
for container in host.ls():
|
||||
|
|
@ -243,73 +169,15 @@ def any_outdated():
|
|||
)
|
||||
if representation_doc and not is_latest(representation_doc):
|
||||
return True
|
||||
elif not representation_doc:
|
||||
log.debug("Container '{objectName}' has an invalid "
|
||||
"representation, it is missing in the "
|
||||
"database".format(**container))
|
||||
|
||||
log.debug("Container '{objectName}' has an invalid "
|
||||
"representation, it is missing in the "
|
||||
"database".format(**container))
|
||||
|
||||
checked.add(representation)
|
||||
return False
|
||||
|
||||
|
||||
def _rreplace(s, a, b, n=1):
|
||||
"""Replace a with b in string s from right side n times"""
|
||||
return b.join(s.rsplit(a, n))
|
||||
|
||||
|
||||
def version_up(filepath):
|
||||
"""Version up filepath to a new non-existing version.
|
||||
|
||||
Parses for a version identifier like `_v001` or `.v001`
|
||||
When no version present _v001 is appended as suffix.
|
||||
|
||||
Returns:
|
||||
str: filepath with increased version number
|
||||
|
||||
"""
|
||||
|
||||
dirname = os.path.dirname(filepath)
|
||||
basename, ext = os.path.splitext(os.path.basename(filepath))
|
||||
|
||||
regex = r"[._]v\d+"
|
||||
matches = re.findall(regex, str(basename), re.IGNORECASE)
|
||||
if not matches:
|
||||
log.info("Creating version...")
|
||||
new_label = "_v{version:03d}".format(version=1)
|
||||
new_basename = "{}{}".format(basename, new_label)
|
||||
else:
|
||||
label = matches[-1]
|
||||
version = re.search(r"\d+", label).group()
|
||||
padding = len(version)
|
||||
|
||||
new_version = int(version) + 1
|
||||
new_version = '{version:0{padding}d}'.format(version=new_version,
|
||||
padding=padding)
|
||||
new_label = label.replace(version, new_version, 1)
|
||||
new_basename = _rreplace(basename, label, new_label)
|
||||
|
||||
if not new_basename.endswith(new_label):
|
||||
index = (new_basename.find(new_label))
|
||||
index += len(new_label)
|
||||
new_basename = new_basename[:index]
|
||||
|
||||
new_filename = "{}{}".format(new_basename, ext)
|
||||
new_filename = os.path.join(dirname, new_filename)
|
||||
new_filename = os.path.normpath(new_filename)
|
||||
|
||||
if new_filename == filepath:
|
||||
raise RuntimeError("Created path is the same as current file,"
|
||||
"this is a bug")
|
||||
|
||||
for file in os.listdir(dirname):
|
||||
if file.endswith(ext) and file.startswith(new_basename):
|
||||
log.info("Skipping existing version %s" % new_label)
|
||||
return version_up(new_filename)
|
||||
|
||||
log.info("New version %s" % new_label)
|
||||
return new_filename
|
||||
|
||||
|
||||
def switch_item(container,
|
||||
asset_name=None,
|
||||
subset_name=None,
|
||||
|
|
@ -407,65 +275,6 @@ def get_asset(asset_name=None):
|
|||
return asset_document
|
||||
|
||||
|
||||
def get_version_from_path(file):
|
||||
"""
|
||||
Finds version number in file path string
|
||||
|
||||
Args:
|
||||
file (string): file path
|
||||
|
||||
Returns:
|
||||
v: version number in string ('001')
|
||||
|
||||
"""
|
||||
pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE)
|
||||
try:
|
||||
return pattern.findall(file)[0]
|
||||
except IndexError:
|
||||
log.error(
|
||||
"templates:get_version_from_workfile:"
|
||||
"`{}` missing version string."
|
||||
"Example `v004`".format(file)
|
||||
)
|
||||
|
||||
|
||||
def get_last_version_from_path(path_dir, filter):
|
||||
"""
|
||||
Finds last version of given directory content
|
||||
|
||||
Args:
|
||||
path_dir (string): directory path
|
||||
filter (list): list of strings used as file name filter
|
||||
|
||||
Returns:
|
||||
string: file name with last version
|
||||
|
||||
Example:
|
||||
last_version_file = get_last_version_from_path(
|
||||
"/project/shots/shot01/work", ["shot01", "compositing", "nk"])
|
||||
"""
|
||||
|
||||
assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory"
|
||||
assert isinstance(filter, list) and (
|
||||
len(filter) != 0), "`filter` argument needs to be list and not empty"
|
||||
|
||||
filtred_files = list()
|
||||
|
||||
# form regex for filtering
|
||||
patern = r".*".join(filter)
|
||||
|
||||
for f in os.listdir(path_dir):
|
||||
if not re.findall(patern, f):
|
||||
continue
|
||||
filtred_files.append(f)
|
||||
|
||||
if filtred_files:
|
||||
sorted(filtred_files)
|
||||
return filtred_files[-1]
|
||||
else:
|
||||
return None
|
||||
|
||||
|
||||
def get_subsets(asset_name,
|
||||
regex_filter=None,
|
||||
version=None,
|
||||
|
|
|
|||
172
pype/lib/path_tools.py
Normal file
172
pype/lib/path_tools.py
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
import os
|
||||
import re
|
||||
import logging
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_paths_from_environ(env_key, return_first=False):
|
||||
"""Return existing paths from specific envirnment variable.
|
||||
|
||||
:param env_key: Environment key where should look for paths.
|
||||
:type env_key: str
|
||||
:param return_first: Return first path on `True`, list of all on `False`.
|
||||
:type return_first: boolean
|
||||
|
||||
Difference when none of paths exists:
|
||||
- when `return_first` is set to `False` then function returns empty list.
|
||||
- when `return_first` is set to `True` then function returns `None`.
|
||||
"""
|
||||
existing_paths = []
|
||||
paths = os.environ.get(env_key) or ""
|
||||
path_items = paths.split(os.pathsep)
|
||||
for path in path_items:
|
||||
# Skip empty string
|
||||
if not path:
|
||||
continue
|
||||
# Normalize path
|
||||
path = os.path.normpath(path)
|
||||
# Check if path exists
|
||||
if os.path.exists(path):
|
||||
# Return path if `return_first` is set to True
|
||||
if return_first:
|
||||
return path
|
||||
# Store path
|
||||
existing_paths.append(path)
|
||||
|
||||
# Return None if none of paths exists
|
||||
if return_first:
|
||||
return None
|
||||
# Return all existing paths from environment variable
|
||||
return existing_paths
|
||||
|
||||
|
||||
def get_ffmpeg_tool_path(tool="ffmpeg"):
|
||||
"""Find path to ffmpeg tool in FFMPEG_PATH paths.
|
||||
|
||||
Function looks for tool in paths set in FFMPEG_PATH environment. If tool
|
||||
exists then returns it's full path.
|
||||
|
||||
Returns tool name itself when tool path was not found. (FFmpeg path may be
|
||||
set in PATH environment variable)
|
||||
"""
|
||||
dir_paths = get_paths_from_environ("FFMPEG_PATH")
|
||||
for dir_path in dir_paths:
|
||||
for file_name in os.listdir(dir_path):
|
||||
base, _ext = os.path.splitext(file_name)
|
||||
if base.lower() == tool.lower():
|
||||
return os.path.join(dir_path, tool)
|
||||
return tool
|
||||
|
||||
|
||||
def _rreplace(s, a, b, n=1):
|
||||
"""Replace a with b in string s from right side n times."""
|
||||
return b.join(s.rsplit(a, n))
|
||||
|
||||
|
||||
def version_up(filepath):
|
||||
"""Version up filepath to a new non-existing version.
|
||||
|
||||
Parses for a version identifier like `_v001` or `.v001`
|
||||
When no version present _v001 is appended as suffix.
|
||||
|
||||
Returns:
|
||||
str: filepath with increased version number
|
||||
|
||||
"""
|
||||
dirname = os.path.dirname(filepath)
|
||||
basename, ext = os.path.splitext(os.path.basename(filepath))
|
||||
|
||||
regex = r"[._]v\d+"
|
||||
matches = re.findall(regex, str(basename), re.IGNORECASE)
|
||||
if not matches:
|
||||
log.info("Creating version...")
|
||||
new_label = "_v{version:03d}".format(version=1)
|
||||
new_basename = "{}{}".format(basename, new_label)
|
||||
else:
|
||||
label = matches[-1]
|
||||
version = re.search(r"\d+", label).group()
|
||||
padding = len(version)
|
||||
|
||||
new_version = int(version) + 1
|
||||
new_version = '{version:0{padding}d}'.format(version=new_version,
|
||||
padding=padding)
|
||||
new_label = label.replace(version, new_version, 1)
|
||||
new_basename = _rreplace(basename, label, new_label)
|
||||
|
||||
if not new_basename.endswith(new_label):
|
||||
index = (new_basename.find(new_label))
|
||||
index += len(new_label)
|
||||
new_basename = new_basename[:index]
|
||||
|
||||
new_filename = "{}{}".format(new_basename, ext)
|
||||
new_filename = os.path.join(dirname, new_filename)
|
||||
new_filename = os.path.normpath(new_filename)
|
||||
|
||||
if new_filename == filepath:
|
||||
raise RuntimeError("Created path is the same as current file,"
|
||||
"this is a bug")
|
||||
|
||||
for file in os.listdir(dirname):
|
||||
if file.endswith(ext) and file.startswith(new_basename):
|
||||
log.info("Skipping existing version %s" % new_label)
|
||||
return version_up(new_filename)
|
||||
|
||||
log.info("New version %s" % new_label)
|
||||
return new_filename
|
||||
|
||||
|
||||
def get_version_from_path(file):
|
||||
"""Find version number in file path string.s
|
||||
|
||||
Args:
|
||||
file (string): file path
|
||||
|
||||
Returns:
|
||||
v: version number in string ('001')
|
||||
|
||||
"""
|
||||
pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE)
|
||||
try:
|
||||
return pattern.findall(file)[0]
|
||||
except IndexError:
|
||||
log.error(
|
||||
"templates:get_version_from_workfile:"
|
||||
"`{}` missing version string."
|
||||
"Example `v004`".format(file)
|
||||
)
|
||||
|
||||
|
||||
def get_last_version_from_path(path_dir, filter):
|
||||
"""Find last version of given directory content.
|
||||
|
||||
Args:
|
||||
path_dir (string): directory path
|
||||
filter (list): list of strings used as file name filter
|
||||
|
||||
Returns:
|
||||
string: file name with last version
|
||||
|
||||
Example:
|
||||
last_version_file = get_last_version_from_path(
|
||||
"/project/shots/shot01/work", ["shot01", "compositing", "nk"])
|
||||
"""
|
||||
assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory"
|
||||
assert isinstance(filter, list) and (
|
||||
len(filter) != 0), "`filter` argument needs to be list and not empty"
|
||||
|
||||
filtred_files = list()
|
||||
|
||||
# form regex for filtering
|
||||
patern = r".*".join(filter)
|
||||
|
||||
for file in os.listdir(path_dir):
|
||||
if not re.findall(patern, file):
|
||||
continue
|
||||
filtred_files.append(file)
|
||||
|
||||
if filtred_files:
|
||||
sorted(filtred_files)
|
||||
return filtred_files[-1]
|
||||
|
||||
return None
|
||||
Loading…
Add table
Add a link
Reference in a new issue