ayon-core/pype/lib.py
Ondřej Samohel 2a25a9ce44 Merged in feature/PYPE-425-global-context-plugin-with-loaded-presets (pull request #293)
Feature/PYPE-425 global context plugin with loaded presets

Approved-by: Milan Kolar <milan@orbi.tools>
2019-09-06 16:47:16 +00:00

574 lines
17 KiB
Python

import os
import re
import logging
import importlib
import itertools
import contextlib
import subprocess
import inspect
from .vendor import pather
from .vendor.pather.error import ParseError
import avalon.io as io
import avalon.api
import avalon
log = logging.getLogger(__name__)
# Special naming case for subprocess since its a built-in method.
def _subprocess(args):
"""Convenience method for getting output errors for subprocess."""
proc = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
env=os.environ
)
output = proc.communicate()[0]
if proc.returncode != 0:
log.error(output)
raise ValueError("\"{}\" was not successful: {}".format(args, output))
return output
def get_hierarchy(asset_name=None):
"""
Obtain asset hierarchy path string from mongo db
Returns:
string: asset hierarchy path
"""
if not asset_name:
asset_name = io.Session.get("AVALON_ASSET", os.environ["AVALON_ASSET"])
asset_entity = io.find_one({
"type": 'asset',
"name": asset_name
})
not_set = "PARENTS_NOT_SET"
entity_parents = asset_entity.get("data", {}).get("parents", not_set)
# If entity already have parents then just return joined
if entity_parents != not_set:
return "/".join(entity_parents)
# Else query parents through visualParents and store result to entity
hierarchy_items = []
entity = asset_entity
while True:
parent_id = entity.get("data", {}).get("visualParent")
if not parent_id:
break
entity = io.find_one({"_id": parent_id})
hierarchy_items.append(entity["name"])
# Add parents to entity data for next query
entity_data = asset_entity.get("data", {})
entity_data["parents"] = hierarchy_items
io.update_many(
{"_id": asset_entity["_id"]},
{"$set": {"data": entity_data}}
)
return "/".join(hierarchy_items)
def add_tool_to_environment(tools):
"""
It is adding dynamic environment to os environment.
Args:
tool (list, tuple): list of tools, name should corespond to json/toml
Returns:
os.environ[KEY]: adding to os.environ
"""
import acre
tools_env = acre.get_tools(tools)
env = acre.compute(tools_env)
env = acre.merge(env, current_env=dict(os.environ))
os.environ.update(env)
@contextlib.contextmanager
def modified_environ(*remove, **update):
"""
Temporarily updates the ``os.environ`` dictionary in-place.
The ``os.environ`` dictionary is updated in-place so that the modification
is sure to work in all situations.
:param remove: Environment variables to remove.
:param update: Dictionary of environment variables and values to add/update.
"""
env = os.environ
update = update or {}
remove = remove or []
# List of environment variables being updated or removed.
stomped = (set(update.keys()) | set(remove)) & set(env.keys())
# Environment variables and values to restore on exit.
update_after = {k: env[k] for k in stomped}
# Environment variables and values to remove on exit.
remove_after = frozenset(k for k in update if k not in env)
try:
env.update(update)
[env.pop(k, None) for k in remove]
yield
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]
def pairwise(iterable):
"""s -> (s0,s1), (s2,s3), (s4, s5), ..."""
a = iter(iterable)
return itertools.izip(a, a)
def grouper(iterable, n, fillvalue=None):
"""Collect data into fixed-length chunks or blocks
Examples:
grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
"""
args = [iter(iterable)] * n
return itertools.izip_longest(fillvalue=fillvalue, *args)
def is_latest(representation):
"""Return whether the representation is from latest version
Args:
representation (dict): The representation document from the database.
Returns:
bool: Whether the representation is of latest version.
"""
version = io.find_one({"_id": representation['parent']})
# Get highest version under the parent
highest_version = io.find_one({
"type": "version",
"parent": version["parent"]
}, sort=[("name", -1)], projection={"name": True})
if version['name'] == highest_version['name']:
return True
else:
return False
def any_outdated():
"""Return whether the current scene has any outdated content"""
checked = set()
host = avalon.api.registered_host()
for container in host.ls():
representation = container['representation']
if representation in checked:
continue
representation_doc = io.find_one({"_id": io.ObjectId(representation),
"type": "representation"},
projection={"parent": True})
if representation_doc and not is_latest(representation_doc):
return True
elif not representation_doc:
log.debug("Container '{objectName}' has an invalid "
"representation, it is missing in the "
"database".format(**container))
checked.add(representation)
return False
def _rreplace(s, a, b, n=1):
"""Replace a with b in string s from right side n times"""
return b.join(s.rsplit(a, n))
def version_up(filepath):
"""Version up filepath to a new non-existing version.
Parses for a version identifier like `_v001` or `.v001`
When no version present _v001 is appended as suffix.
Returns:
str: filepath with increased version number
"""
dirname = os.path.dirname(filepath)
basename, ext = os.path.splitext(os.path.basename(filepath))
regex = r"[._]v\d+"
matches = re.findall(regex, str(basename), re.IGNORECASE)
if not matches:
log.info("Creating version...")
new_label = "_v{version:03d}".format(version=1)
new_basename = "{}{}".format(basename, new_label)
else:
label = matches[-1]
version = re.search(r"\d+", label).group()
padding = len(version)
new_version = int(version) + 1
new_version = '{version:0{padding}d}'.format(version=new_version,
padding=padding)
new_label = label.replace(version, new_version, 1)
new_basename = _rreplace(basename, label, new_label)
if not new_basename.endswith(new_label):
index = (new_basename.find(new_label))
index += len(new_label)
new_basename = new_basename[:index]
new_filename = "{}{}".format(new_basename, ext)
new_filename = os.path.join(dirname, new_filename)
new_filename = os.path.normpath(new_filename)
if new_filename == filepath:
raise RuntimeError("Created path is the same as current file,"
"this is a bug")
for file in os.listdir(dirname):
if file.endswith(ext) and file.startswith(new_basename):
log.info("Skipping existing version %s" % new_label)
return version_up(new_filename)
log.info("New version %s" % new_label)
return new_filename
def switch_item(container,
asset_name=None,
subset_name=None,
representation_name=None):
"""Switch container asset, subset or representation of a container by name.
It'll always switch to the latest version - of course a different
approach could be implemented.
Args:
container (dict): data of the item to switch with
asset_name (str): name of the asset
subset_name (str): name of the subset
representation_name (str): name of the representation
Returns:
dict
"""
if all(not x for x in [asset_name, subset_name, representation_name]):
raise ValueError("Must have at least one change provided to switch.")
# Collect any of current asset, subset and representation if not provided
# so we can use the original name from those.
if any(not x for x in [asset_name, subset_name, representation_name]):
_id = io.ObjectId(container["representation"])
representation = io.find_one({"type": "representation", "_id": _id})
version, subset, asset, project = io.parenthood(representation)
if asset_name is None:
asset_name = asset["name"]
if subset_name is None:
subset_name = subset["name"]
if representation_name is None:
representation_name = representation["name"]
# Find the new one
asset = io.find_one({"name": asset_name, "type": "asset"})
assert asset, ("Could not find asset in the database with the name "
"'%s'" % asset_name)
subset = io.find_one({"name": subset_name,
"type": "subset",
"parent": asset["_id"]})
assert subset, ("Could not find subset in the database with the name "
"'%s'" % subset_name)
version = io.find_one({"type": "version",
"parent": subset["_id"]},
sort=[('name', -1)])
assert version, "Could not find a version for {}.{}".format(
asset_name, subset_name
)
representation = io.find_one({"name": representation_name,
"type": "representation",
"parent": version["_id"]})
assert representation, ("Could not find representation in the database with"
" the name '%s'" % representation_name)
avalon.api.switch(container, representation)
return representation
def _get_host_name():
_host = avalon.api.registered_host()
# This covers nested module name like avalon.maya
return _host.__name__.rsplit(".", 1)[-1]
def get_asset(asset_name=None):
entity_data_keys_from_project_when_miss = [
"frameStart", "frameEnd", "handleStart", "handleEnd", "fps",
"resolutionWidth", "resolutionHeight"
]
entity_keys_from_project_when_miss = []
alternatives = {
"handleStart": "handles",
"handleEnd": "handles"
}
defaults = {
"handleStart": 0,
"handleEnd": 0
}
if not asset_name:
asset_name = avalon.api.Session["AVALON_ASSET"]
asset_document = io.find_one({"name": asset_name, "type": "asset"})
if not asset_document:
raise TypeError("Entity \"{}\" was not found in DB".format(asset_name))
project_document = io.find_one({"type": "project"})
for key in entity_data_keys_from_project_when_miss:
if asset_document["data"].get(key):
continue
value = project_document["data"].get(key)
if value is not None or key not in alternatives:
asset_document["data"][key] = value
continue
alt_key = alternatives[key]
value = asset_document["data"].get(alt_key)
if value is not None:
asset_document["data"][key] = value
continue
value = project_document["data"].get(alt_key)
if value:
asset_document["data"][key] = value
continue
if key in defaults:
asset_document["data"][key] = defaults[key]
for key in entity_keys_from_project_when_miss:
if asset_document.get(key):
continue
value = project_document.get(key)
if value is not None or key not in alternatives:
asset_document[key] = value
continue
alt_key = alternatives[key]
value = asset_document.get(alt_key)
if value:
asset_document[key] = value
continue
value = project_document.get(alt_key)
if value:
asset_document[key] = value
continue
if key in defaults:
asset_document[key] = defaults[key]
return asset_document
def get_project():
io.install()
return io.find_one({"type": "project"})
def get_version_from_path(file):
"""
Finds version number in file path string
Args:
file (string): file path
Returns:
v: version number in string ('001')
"""
pattern = re.compile(r"[\._]v([0-9]+)")
try:
return pattern.findall(file)[0]
except IndexError:
log.error(
"templates:get_version_from_workfile:"
"`{}` missing version string."
"Example `v004`".format(file)
)
def get_avalon_database():
if io._database is None:
set_io_database()
return io._database
def set_io_database():
required_keys = ["AVALON_PROJECT", "AVALON_ASSET", "AVALON_SILO"]
for key in required_keys:
os.environ[key] = os.environ.get(key, "")
io.install()
def get_all_avalon_projects():
db = get_avalon_database()
projects = []
for name in db.collection_names():
projects.append(db[name].find_one({'type': 'project'}))
return projects
def filter_pyblish_plugins(plugins):
"""
This servers as plugin filter / modifier for pyblish. It will load plugin
definitions from presets and filter those needed to be excluded.
:param plugins: Dictionary of plugins produced by :mod:`pyblish-base`
`discover()` method.
:type plugins: Dict
"""
from pypeapp import config
from pyblish import api
host = api.current_host()
presets = config.get_presets().get('plugins', {})
# iterate over plugins
for plugin in plugins[:]:
# skip if there are no presets to process
if not presets:
continue
file = os.path.normpath(inspect.getsourcefile(plugin))
file = os.path.normpath(file)
# host determined from path
host_from_file = file.split(os.path.sep)[-3:-2][0]
plugin_kind = file.split(os.path.sep)[-2:-1][0]
try:
config_data = presets[host]["publish"][plugin.__name__]
except KeyError:
try:
config_data = presets[host_from_file][plugin_kind][plugin.__name__] # noqa: E501
except KeyError:
continue
for option, value in config_data.items():
if option == "enabled" and value is False:
log.info('removing plugin {}'.format(plugin.__name__))
plugins.remove(plugin)
else:
log.info('setting {}:{} on plugin {}'.format(
option, value, plugin.__name__))
setattr(plugin, option, value)
def get_subsets(asset_name,
regex_filter=None,
version=None,
representations=["exr", "dpx"]):
"""
Query subsets with filter on name.
The method will return all found subsets and its defined version and subsets. Version could be specified with number. Representation can be filtered.
Arguments:
asset_name (str): asset (shot) name
regex_filter (raw): raw string with filter pattern
version (str or int): `last` or number of version
representations (list): list for all representations
Returns:
dict: subsets with version and representaions in keys
"""
from avalon import io
# query asset from db
asset_io = io.find_one({"type": "asset",
"name": asset_name})
# check if anything returned
assert asset_io, "Asset not existing. \
Check correct name: `{}`".format(asset_name)
# create subsets query filter
filter_query = {"type": "subset", "parent": asset_io["_id"]}
# add reggex filter string into query filter
if regex_filter:
filter_query.update({"name": {"$regex": r"{}".format(regex_filter)}})
else:
filter_query.update({"name": {"$regex": r'.*'}})
# query all assets
subsets = [s for s in io.find(filter_query)]
assert subsets, "No subsets found. Check correct filter. Try this for start `r'.*'`: asset: `{}`".format(asset_name)
output_dict = {}
# Process subsets
for subset in subsets:
if not version:
version_sel = io.find_one({"type": "version",
"parent": subset["_id"]},
sort=[("name", -1)])
else:
assert isinstance(version, int), "version needs to be `int` type"
version_sel = io.find_one({"type": "version",
"parent": subset["_id"],
"name": int(version)})
find_dict = {"type": "representation",
"parent": version_sel["_id"]}
filter_repr = {"$or": [{"name": repr} for repr in representations]}
find_dict.update(filter_repr)
repres_out = [i for i in io.find(find_dict)]
if len(repres_out) > 0:
output_dict[subset["name"]] = {"version": version_sel,
"representaions": repres_out}
return output_dict