diff --git a/pype/api.py b/pype/api.py index a6762beca3..d5f0ced1a8 100644 --- a/pype/api.py +++ b/pype/api.py @@ -43,7 +43,6 @@ from .lib import ( get_subsets, get_version_from_path, get_last_version_from_path, - add_tool_to_environment, source_hash, get_latest_version ) diff --git a/pype/lib/__init__.py b/pype/lib/__init__.py index 7c9c7b0d41..05aee8cf7b 100644 --- a/pype/lib/__init__.py +++ b/pype/lib/__init__.py @@ -16,20 +16,21 @@ from .applications import ( from .plugin_tools import filter_pyblish_plugins -from .lib_old import ( - _subprocess, - get_paths_from_environ, - get_ffmpeg_tool_path, - get_hierarchy, - add_tool_to_environment, - is_latest, - any_outdated, - _rreplace, +from .path_tools import ( version_up, - switch_item, - get_asset, get_version_from_path, get_last_version_from_path, + get_paths_from_environ, + get_ffmpeg_tool_path +) + +from .lib_old import ( + _subprocess, + get_hierarchy, + is_latest, + any_outdated, + switch_item, + get_asset, get_subsets, get_linked_assets, BuildWorkfile, @@ -49,5 +50,11 @@ __all__ = [ "launch_application", "ApplicationAction", - "filter_pyblish_plugins" + "filter_pyblish_plugins", + + "version_up", + "get_version_from_path", + "get_last_version_from_path", + "get_paths_from_environ", + "get_ffmpeg_tool_path" ] diff --git a/pype/lib/lib_old.py b/pype/lib/lib_old.py index cbece671f5..eafd34264c 100644 --- a/pype/lib/lib_old.py +++ b/pype/lib/lib_old.py @@ -14,62 +14,6 @@ from ..api import config, Anatomy, Logger log = logging.getLogger(__name__) -def get_paths_from_environ(env_key, return_first=False): - """Return existing paths from specific envirnment variable. - - :param env_key: Environment key where should look for paths. - :type env_key: str - :param return_first: Return first path on `True`, list of all on `False`. - :type return_first: boolean - - Difference when none of paths exists: - - when `return_first` is set to `False` then function returns empty list. - - when `return_first` is set to `True` then function returns `None`. - """ - - existing_paths = [] - paths = os.environ.get(env_key) or "" - path_items = paths.split(os.pathsep) - for path in path_items: - # Skip empty string - if not path: - continue - # Normalize path - path = os.path.normpath(path) - # Check if path exists - if os.path.exists(path): - # Return path if `return_first` is set to True - if return_first: - return path - # Store path - existing_paths.append(path) - - # Return None if none of paths exists - if return_first: - return None - # Return all existing paths from environment variable - return existing_paths - - -def get_ffmpeg_tool_path(tool="ffmpeg"): - """Find path to ffmpeg tool in FFMPEG_PATH paths. - - Function looks for tool in paths set in FFMPEG_PATH environment. If tool - exists then returns it's full path. - - Returns tool name itself when tool path was not found. (FFmpeg path may be - set in PATH environment variable) - """ - - dir_paths = get_paths_from_environ("FFMPEG_PATH") - for dir_path in dir_paths: - for file_name in os.listdir(dir_path): - base, ext = os.path.splitext(file_name) - if base.lower() == tool.lower(): - return os.path.join(dir_path, tool) - return tool - - # Special naming case for subprocess since its a built-in method. def _subprocess(*args, **kwargs): """Convenience method for getting output errors for subprocess. @@ -135,9 +79,11 @@ def _subprocess(*args, **kwargs): return full_output +# Avalon databse functions + + def get_hierarchy(asset_name=None): - """ - Obtain asset hierarchy path string from mongo db + """Obtain asset hierarchy path string from mongo db. Returns: string: asset hierarchy path @@ -179,26 +125,8 @@ def get_hierarchy(asset_name=None): return "/".join(hierarchy_items) -def add_tool_to_environment(tools): - """ - It is adding dynamic environment to os environment. - - Args: - tool (list, tuple): list of tools, name should corespond to json/toml - - Returns: - os.environ[KEY]: adding to os.environ - """ - - import acre - tools_env = acre.get_tools(tools) - env = acre.compute(tools_env) - env = acre.merge(env, current_env=dict(os.environ)) - os.environ.update(env) - - def is_latest(representation): - """Return whether the representation is from latest version + """Return whether the representation is from latest version. Args: representation (dict): The representation document from the database. @@ -207,7 +135,6 @@ def is_latest(representation): bool: Whether the representation is of latest version. """ - version = io.find_one({"_id": representation['parent']}) if version["type"] == "master_version": return True @@ -225,8 +152,7 @@ def is_latest(representation): def any_outdated(): - """Return whether the current scene has any outdated content""" - + """Return whether the current scene has any outdated content.""" checked = set() host = avalon.api.registered_host() for container in host.ls(): @@ -243,73 +169,15 @@ def any_outdated(): ) if representation_doc and not is_latest(representation_doc): return True - elif not representation_doc: - log.debug("Container '{objectName}' has an invalid " - "representation, it is missing in the " - "database".format(**container)) + + log.debug("Container '{objectName}' has an invalid " + "representation, it is missing in the " + "database".format(**container)) checked.add(representation) return False -def _rreplace(s, a, b, n=1): - """Replace a with b in string s from right side n times""" - return b.join(s.rsplit(a, n)) - - -def version_up(filepath): - """Version up filepath to a new non-existing version. - - Parses for a version identifier like `_v001` or `.v001` - When no version present _v001 is appended as suffix. - - Returns: - str: filepath with increased version number - - """ - - dirname = os.path.dirname(filepath) - basename, ext = os.path.splitext(os.path.basename(filepath)) - - regex = r"[._]v\d+" - matches = re.findall(regex, str(basename), re.IGNORECASE) - if not matches: - log.info("Creating version...") - new_label = "_v{version:03d}".format(version=1) - new_basename = "{}{}".format(basename, new_label) - else: - label = matches[-1] - version = re.search(r"\d+", label).group() - padding = len(version) - - new_version = int(version) + 1 - new_version = '{version:0{padding}d}'.format(version=new_version, - padding=padding) - new_label = label.replace(version, new_version, 1) - new_basename = _rreplace(basename, label, new_label) - - if not new_basename.endswith(new_label): - index = (new_basename.find(new_label)) - index += len(new_label) - new_basename = new_basename[:index] - - new_filename = "{}{}".format(new_basename, ext) - new_filename = os.path.join(dirname, new_filename) - new_filename = os.path.normpath(new_filename) - - if new_filename == filepath: - raise RuntimeError("Created path is the same as current file," - "this is a bug") - - for file in os.listdir(dirname): - if file.endswith(ext) and file.startswith(new_basename): - log.info("Skipping existing version %s" % new_label) - return version_up(new_filename) - - log.info("New version %s" % new_label) - return new_filename - - def switch_item(container, asset_name=None, subset_name=None, @@ -407,65 +275,6 @@ def get_asset(asset_name=None): return asset_document -def get_version_from_path(file): - """ - Finds version number in file path string - - Args: - file (string): file path - - Returns: - v: version number in string ('001') - - """ - pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE) - try: - return pattern.findall(file)[0] - except IndexError: - log.error( - "templates:get_version_from_workfile:" - "`{}` missing version string." - "Example `v004`".format(file) - ) - - -def get_last_version_from_path(path_dir, filter): - """ - Finds last version of given directory content - - Args: - path_dir (string): directory path - filter (list): list of strings used as file name filter - - Returns: - string: file name with last version - - Example: - last_version_file = get_last_version_from_path( - "/project/shots/shot01/work", ["shot01", "compositing", "nk"]) - """ - - assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory" - assert isinstance(filter, list) and ( - len(filter) != 0), "`filter` argument needs to be list and not empty" - - filtred_files = list() - - # form regex for filtering - patern = r".*".join(filter) - - for f in os.listdir(path_dir): - if not re.findall(patern, f): - continue - filtred_files.append(f) - - if filtred_files: - sorted(filtred_files) - return filtred_files[-1] - else: - return None - - def get_subsets(asset_name, regex_filter=None, version=None, diff --git a/pype/lib/path_tools.py b/pype/lib/path_tools.py new file mode 100644 index 0000000000..c56f4c8974 --- /dev/null +++ b/pype/lib/path_tools.py @@ -0,0 +1,172 @@ +import os +import re +import logging + +log = logging.getLogger(__name__) + + +def get_paths_from_environ(env_key, return_first=False): + """Return existing paths from specific envirnment variable. + + :param env_key: Environment key where should look for paths. + :type env_key: str + :param return_first: Return first path on `True`, list of all on `False`. + :type return_first: boolean + + Difference when none of paths exists: + - when `return_first` is set to `False` then function returns empty list. + - when `return_first` is set to `True` then function returns `None`. + """ + existing_paths = [] + paths = os.environ.get(env_key) or "" + path_items = paths.split(os.pathsep) + for path in path_items: + # Skip empty string + if not path: + continue + # Normalize path + path = os.path.normpath(path) + # Check if path exists + if os.path.exists(path): + # Return path if `return_first` is set to True + if return_first: + return path + # Store path + existing_paths.append(path) + + # Return None if none of paths exists + if return_first: + return None + # Return all existing paths from environment variable + return existing_paths + + +def get_ffmpeg_tool_path(tool="ffmpeg"): + """Find path to ffmpeg tool in FFMPEG_PATH paths. + + Function looks for tool in paths set in FFMPEG_PATH environment. If tool + exists then returns it's full path. + + Returns tool name itself when tool path was not found. (FFmpeg path may be + set in PATH environment variable) + """ + dir_paths = get_paths_from_environ("FFMPEG_PATH") + for dir_path in dir_paths: + for file_name in os.listdir(dir_path): + base, _ext = os.path.splitext(file_name) + if base.lower() == tool.lower(): + return os.path.join(dir_path, tool) + return tool + + +def _rreplace(s, a, b, n=1): + """Replace a with b in string s from right side n times.""" + return b.join(s.rsplit(a, n)) + + +def version_up(filepath): + """Version up filepath to a new non-existing version. + + Parses for a version identifier like `_v001` or `.v001` + When no version present _v001 is appended as suffix. + + Returns: + str: filepath with increased version number + + """ + dirname = os.path.dirname(filepath) + basename, ext = os.path.splitext(os.path.basename(filepath)) + + regex = r"[._]v\d+" + matches = re.findall(regex, str(basename), re.IGNORECASE) + if not matches: + log.info("Creating version...") + new_label = "_v{version:03d}".format(version=1) + new_basename = "{}{}".format(basename, new_label) + else: + label = matches[-1] + version = re.search(r"\d+", label).group() + padding = len(version) + + new_version = int(version) + 1 + new_version = '{version:0{padding}d}'.format(version=new_version, + padding=padding) + new_label = label.replace(version, new_version, 1) + new_basename = _rreplace(basename, label, new_label) + + if not new_basename.endswith(new_label): + index = (new_basename.find(new_label)) + index += len(new_label) + new_basename = new_basename[:index] + + new_filename = "{}{}".format(new_basename, ext) + new_filename = os.path.join(dirname, new_filename) + new_filename = os.path.normpath(new_filename) + + if new_filename == filepath: + raise RuntimeError("Created path is the same as current file," + "this is a bug") + + for file in os.listdir(dirname): + if file.endswith(ext) and file.startswith(new_basename): + log.info("Skipping existing version %s" % new_label) + return version_up(new_filename) + + log.info("New version %s" % new_label) + return new_filename + + +def get_version_from_path(file): + """Find version number in file path string.s + + Args: + file (string): file path + + Returns: + v: version number in string ('001') + + """ + pattern = re.compile(r"[\._]v([0-9]+)", re.IGNORECASE) + try: + return pattern.findall(file)[0] + except IndexError: + log.error( + "templates:get_version_from_workfile:" + "`{}` missing version string." + "Example `v004`".format(file) + ) + + +def get_last_version_from_path(path_dir, filter): + """Find last version of given directory content. + + Args: + path_dir (string): directory path + filter (list): list of strings used as file name filter + + Returns: + string: file name with last version + + Example: + last_version_file = get_last_version_from_path( + "/project/shots/shot01/work", ["shot01", "compositing", "nk"]) + """ + assert os.path.isdir(path_dir), "`path_dir` argument needs to be directory" + assert isinstance(filter, list) and ( + len(filter) != 0), "`filter` argument needs to be list and not empty" + + filtred_files = list() + + # form regex for filtering + patern = r".*".join(filter) + + for file in os.listdir(path_dir): + if not re.findall(patern, file): + continue + filtred_files.append(file) + + if filtred_files: + sorted(filtred_files) + return filtred_files[-1] + + return None