#664 - moved last 2 functions

removed obsolete lib_old.py
This commit is contained in:
Petr Kalis 2020-11-11 20:25:44 +01:00
parent 0353a833bf
commit 5d07af223c
5 changed files with 97 additions and 101 deletions

View file

@ -21,10 +21,11 @@ from .hooks import PypeHook, execute_hook
from .applications import (
ApplicationLaunchFailed,
launch_application,
ApplicationAction
ApplicationAction,
_subprocess
)
from .plugin_tools import filter_pyblish_plugins
from .plugin_tools import filter_pyblish_plugins, source_hash
from .path_tools import (
version_up,
@ -34,10 +35,6 @@ from .path_tools import (
get_ffmpeg_tool_path
)
from .lib_old import (
_subprocess,
source_hash
)
from .ffmpeg_utils import ffprobe_streams
__all__ = [
@ -67,5 +64,8 @@ __all__ = [
"get_paths_from_environ",
"get_ffmpeg_tool_path",
"ffprobe_streams"
"ffprobe_streams",
"source_hash",
"_subprocess"
]

View file

@ -4,6 +4,7 @@ import getpass
import copy
import platform
import logging
import subprocess
import acre
@ -389,3 +390,68 @@ class ApplicationAction(avalon.api.Action):
on_error="ignore"
)
self.log.debug("Timer start triggered successfully.")
# Special naming case for subprocess since its a built-in method.
def _subprocess(*args, **kwargs):
"""Convenience method for getting output errors for subprocess.
Entered arguments and keyword arguments are passed to subprocess Popen.
Args:
*args: Variable length arument list passed to Popen.
**kwargs : Arbitary keyword arguments passed to Popen. Is possible to
pass `logging.Logger` object under "logger" if want to use
different than lib's logger.
Returns:
str: Full output of subprocess concatenated stdout and stderr.
Raises:
RuntimeError: Exception is raised if process finished with nonzero
return code.
"""
# Get environents from kwarg or use current process environments if were
# not passed.
env = kwargs.get("env") or os.environ
# Make sure environment contains only strings
filtered_env = {k: str(v) for k, v in env.items()}
# Use lib's logger if was not passed with kwargs.
logger = kwargs.pop("logger", log)
# set overrides
kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
kwargs['stdin'] = kwargs.get('stdin', subprocess.PIPE)
kwargs['env'] = filtered_env
proc = subprocess.Popen(*args, **kwargs)
full_output = ""
_stdout, _stderr = proc.communicate()
if _stdout:
_stdout = _stdout.decode("utf-8")
full_output += _stdout
logger.debug(_stdout)
if _stderr:
_stderr = _stderr.decode("utf-8")
# Add additional line break if output already containt stdout
if full_output:
full_output += "\n"
full_output += _stderr
logger.warning(_stderr)
if proc.returncode != 0:
exc_msg = "Executing arguments was not successful: \"{}\"".format(args)
if _stdout:
exc_msg += "\n\nOutput:\n{}".format(_stdout)
if _stderr:
exc_msg += "Error:\n{}".format(_stderr)
raise RuntimeError(exc_msg)
return full_output

View file

@ -1,94 +0,0 @@
import os
import logging
import contextlib
import subprocess
import avalon.api
log = logging.getLogger(__name__)
# Special naming case for subprocess since its a built-in method.
def _subprocess(*args, **kwargs):
"""Convenience method for getting output errors for subprocess.
Entered arguments and keyword arguments are passed to subprocess Popen.
Args:
*args: Variable length arument list passed to Popen.
**kwargs : Arbitary keyword arguments passed to Popen. Is possible to
pass `logging.Logger` object under "logger" if want to use
different than lib's logger.
Returns:
str: Full output of subprocess concatenated stdout and stderr.
Raises:
RuntimeError: Exception is raised if process finished with nonzero
return code.
"""
# Get environents from kwarg or use current process environments if were
# not passed.
env = kwargs.get("env") or os.environ
# Make sure environment contains only strings
filtered_env = {k: str(v) for k, v in env.items()}
# Use lib's logger if was not passed with kwargs.
logger = kwargs.pop("logger", log)
# set overrides
kwargs['stdout'] = kwargs.get('stdout', subprocess.PIPE)
kwargs['stderr'] = kwargs.get('stderr', subprocess.PIPE)
kwargs['stdin'] = kwargs.get('stdin', subprocess.PIPE)
kwargs['env'] = filtered_env
proc = subprocess.Popen(*args, **kwargs)
full_output = ""
_stdout, _stderr = proc.communicate()
if _stdout:
_stdout = _stdout.decode("utf-8")
full_output += _stdout
logger.debug(_stdout)
if _stderr:
_stderr = _stderr.decode("utf-8")
# Add additional line break if output already containt stdout
if full_output:
full_output += "\n"
full_output += _stderr
logger.warning(_stderr)
if proc.returncode != 0:
exc_msg = "Executing arguments was not successful: \"{}\"".format(args)
if _stdout:
exc_msg += "\n\nOutput:\n{}".format(_stdout)
if _stderr:
exc_msg += "Error:\n{}".format(_stderr)
raise RuntimeError(exc_msg)
return full_output
def source_hash(filepath, *args):
"""Generate simple identifier for a source file.
This is used to identify whether a source file has previously been
processe into the pipeline, e.g. a texture.
The hash is based on source filepath, modification time and file size.
This is only used to identify whether a specific source file was already
published before from the same location with the same modification date.
We opt to do it this way as opposed to Avalanch C4 hash as this is much
faster and predictable enough for all our production use cases.
Args:
filepath (str): The source file path.
You can specify additional arguments in the function
to allow for specific 'processing' values to be included.
"""
# We replace dots with comma because . cannot be a key in a pymongo dict.
file_name = os.path.basename(filepath)
time = str(os.path.getmtime(filepath))
size = str(os.path.getsize(filepath))
return "|".join([file_name, time, size] + list(args)).replace(".", ",")

View file

@ -57,3 +57,24 @@ def filter_pyblish_plugins(plugins):
option, value, plugin.__name__))
setattr(plugin, option, value)
def source_hash(filepath, *args):
"""Generate simple identifier for a source file.
This is used to identify whether a source file has previously been
processe into the pipeline, e.g. a texture.
The hash is based on source filepath, modification time and file size.
This is only used to identify whether a specific source file was already
published before from the same location with the same modification date.
We opt to do it this way as opposed to Avalanch C4 hash as this is much
faster and predictable enough for all our production use cases.
Args:
filepath (str): The source file path.
You can specify additional arguments in the function
to allow for specific 'processing' values to be included.
"""
# We replace dots with comma because . cannot be a key in a pymongo dict.
file_name = os.path.basename(filepath)
time = str(os.path.getmtime(filepath))
size = str(os.path.getsize(filepath))
return "|".join([file_name, time, size] + list(args)).replace(".", ",")

View file

@ -32,5 +32,8 @@ def test_backward_compatibility(printer):
from pype.hosts.fusion.lib import switch_item
from pype.lib import source_hash
from pype.lib import _subprocess
except ImportError as e:
raise