Merge fixes for abstract classes

Petr Kalis 2020-11-11 16:34:32 +01:00
commit d146ba6c86
15 changed files with 650 additions and 577 deletions

View file

@@ -39,7 +39,6 @@ from .action import (
from .lib import (
version_up,
get_asset,
get_project,
get_hierarchy,
get_subsets,
get_version_from_path,
@@ -88,7 +87,6 @@ __all__ = [
# get contextual data
"version_up",
"get_project",
"get_hierarchy",
"get_asset",
"get_subsets",

View file

@@ -4,6 +4,7 @@ import sys
import hiero
import pyblish.api
import avalon.api as avalon
import avalon.io
from avalon.vendor.Qt import (QtWidgets, QtGui)
import pype.api as pype
from pype.api import Logger, Anatomy
@@ -58,7 +59,8 @@ def sync_avalon_data_to_workfile():
project.setProjectRoot(active_project_root)
# get project data from avalon db
project_data = pype.get_project()["data"]
project_doc = avalon.io.find_one({"type": "project"})
project_data = project_doc["data"]
log.debug("project_data: {}".format(project_data))

View file

@@ -1857,8 +1857,8 @@ def set_context_settings():
"""
# Todo (Wijnand): apply renderer and resolution of project
project_data = lib.get_project()["data"]
project_doc = io.find_one({"type": "project"})
project_data = project_doc["data"]
asset_data = lib.get_asset()["data"]
# Set project fps

View file

@@ -195,7 +195,7 @@ def format_anatomy(data):
if not version:
file = script_name()
data["version"] = pype.get_version_from_path(file)
project_document = pype.get_project()
project_document = io.find_one({"type": "project"})
data.update({
"subset": data["avalon"]["subset"],
"asset": data["avalon"]["asset"],

View file

@@ -2,10 +2,63 @@
"""Pype lib module."""
from .abstract_submit_deadline import DeadlineJobInfo, AbstractSubmitDeadline
from .abstract_collect_render import RenderInstance, AbstractCollectRender
from .deprecated import (
get_avalon_database,
set_io_database
)
from .hooks import PypeHook, execute_hook
from .applications import (
ApplicationLaunchFailed,
launch_application,
ApplicationAction
)
from .plugin_tools import filter_pyblish_plugins
from .lib_old import (
_subprocess,
get_paths_from_environ,
get_ffmpeg_tool_path,
get_hierarchy,
add_tool_to_environment,
modified_environ,
pairwise,
grouper,
is_latest,
any_outdated,
_rreplace,
version_up,
switch_item,
_get_host_name,
get_asset,
get_version_from_path,
get_last_version_from_path,
get_subsets,
get_linked_assets,
BuildWorkfile,
ffprobe_streams,
source_hash,
get_latest_version
)
__all__ = [
"get_avalon_database",
"set_io_database",
"PypeHook",
"execute_hook",
"ApplicationLaunchFailed",
"launch_application",
"ApplicationAction",
"filter_pyblish_plugins",
"AbstractSubmitDeadline",
"DeadlineJobInfo",
"RenderInstance",

pype/lib/applications.py (new file, 391 lines added)
View file

@@ -0,0 +1,391 @@
import os
import sys
import getpass
import copy
import platform
import logging
import acre
import avalon.lib
import avalon.api
from ..api import Anatomy, Logger, config
from .hooks import execute_hook
from .deprecated import get_avalon_database
log = logging.getLogger(__name__)
class ApplicationLaunchFailed(Exception):
pass
def launch_application(project_name, asset_name, task_name, app_name):
"""Launch host application with filling required environments.
TODO(iLLiCiT): This should be split into more parts.
"""
# `get_avalon_database` is replaced by `AvalonMongoDB` in Pype 3
database = get_avalon_database()
project_document = database[project_name].find_one({"type": "project"})
asset_document = database[project_name].find_one({
"type": "asset",
"name": asset_name
})
asset_doc_parents = asset_document["data"].get("parents")
hierarchy = "/".join(asset_doc_parents)
app_def = avalon.lib.get_application(app_name)
app_label = app_def.get("ftrack_label", app_def.get("label", app_name))
host_name = app_def["application_dir"]
# Workfile data collection could be extracted into a separate function
data = {
"project": {
"name": project_document["name"],
"code": project_document["data"].get("code")
},
"task": task_name,
"asset": asset_name,
"app": host_name,
"hierarchy": hierarchy
}
try:
anatomy = Anatomy(project_name)
anatomy_filled = anatomy.format(data)
workdir = os.path.normpath(anatomy_filled["work"]["folder"])
except Exception as exc:
raise ApplicationLaunchFailed(
"Error in anatomy.format: {}".format(str(exc))
)
try:
os.makedirs(workdir)
except FileExistsError:
pass
last_workfile_path = None
extensions = avalon.api.HOST_WORKFILE_EXTENSIONS.get(host_name)
if extensions:
# Find last workfile
file_template = anatomy.templates["work"]["file"]
data.update({
"version": 1,
"user": os.environ.get("PYPE_USERNAME") or getpass.getuser(),
"ext": extensions[0]
})
last_workfile_path = avalon.api.last_workfile(
workdir, file_template, data, extensions, True
)
# set environments for Avalon
prep_env = copy.deepcopy(os.environ)
prep_env.update({
"AVALON_PROJECT": project_name,
"AVALON_ASSET": asset_name,
"AVALON_TASK": task_name,
"AVALON_APP": host_name,
"AVALON_APP_NAME": app_name,
"AVALON_HIERARCHY": hierarchy,
"AVALON_WORKDIR": workdir
})
start_last_workfile = avalon.api.should_start_last_workfile(
project_name, host_name, task_name
)
# Store boolean as "0"(False) or "1"(True)
prep_env["AVALON_OPEN_LAST_WORKFILE"] = (
str(int(bool(start_last_workfile)))
)
if (
start_last_workfile
and last_workfile_path
and os.path.exists(last_workfile_path)
):
prep_env["AVALON_LAST_WORKFILE"] = last_workfile_path
prep_env.update(anatomy.roots_obj.root_environments())
# collect all the 'environment' attributes from parents
tools_attr = [prep_env["AVALON_APP"], prep_env["AVALON_APP_NAME"]]
tools_env = asset_document["data"].get("tools_env") or []
tools_attr.extend(tools_env)
tools_env = acre.get_tools(tools_attr)
env = acre.compute(tools_env)
env = acre.merge(env, current_env=dict(prep_env))
# Get path to execute
st_temp_path = os.environ["PYPE_CONFIG"]
os_plat = platform.system().lower()
# Path to folder with launchers
path = os.path.join(st_temp_path, "launchers", os_plat)
# Full path to executable launcher
execfile = None
launch_hook = app_def.get("launch_hook")
if launch_hook:
log.info("launching hook: {}".format(launch_hook))
ret_val = execute_hook(launch_hook, env=env)
if not ret_val:
raise ApplicationLaunchFailed(
"Hook didn't finish successfully {}".format(app_label)
)
if sys.platform == "win32":
for ext in os.environ["PATHEXT"].split(os.pathsep):
fpath = os.path.join(path.strip('"'), app_def["executable"] + ext)
if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
execfile = fpath
break
# Run the software if an executable was found
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
popen = avalon.lib.launch(
executable=execfile, args=[], environment=env
)
elif (
sys.platform.startswith("linux")
or sys.platform.startswith("darwin")
):
execfile = os.path.join(path.strip('"'), app_def["executable"])
# Run the software if an executable was found
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
if not os.path.isfile(execfile):
raise ApplicationLaunchFailed(
"Launcher doesn't exist - {}".format(execfile)
)
try:
fp = open(execfile)
except PermissionError as perm_exc:
raise ApplicationLaunchFailed(
"Access denied on launcher {} - {}".format(execfile, perm_exc)
)
fp.close()
# check executable permission
if not os.access(execfile, os.X_OK):
raise ApplicationLaunchFailed(
"No executable permission - {}".format(execfile)
)
popen = avalon.lib.launch( # noqa: F841
"/usr/bin/env", args=["bash", execfile], environment=env
)
return popen
class ApplicationAction(avalon.api.Action):
"""Default application launcher
This is a convenience application Action that can launch the application
when "config" refers to a parsed application `.toml`.
"""
_log = None
config = None
group = None
variant = None
required_session_keys = (
"AVALON_PROJECT",
"AVALON_ASSET",
"AVALON_TASK"
)
@property
def log(self):
if self._log is None:
self._log = Logger().get_logger(self.__class__.__name__)
return self._log
def is_compatible(self, session):
for key in self.required_session_keys:
if key not in session:
return False
return True
def process(self, session, **kwargs):
"""Process the full Application action"""
project_name = session["AVALON_PROJECT"]
asset_name = session["AVALON_ASSET"]
task_name = session["AVALON_TASK"]
launch_application(
project_name, asset_name, task_name, self.name
)
self._ftrack_after_launch_procedure(
project_name, asset_name, task_name
)
def _ftrack_after_launch_procedure(
self, project_name, asset_name, task_name
):
# TODO move to launch hook
required_keys = ("FTRACK_SERVER", "FTRACK_API_USER", "FTRACK_API_KEY")
for key in required_keys:
if not os.environ.get(key):
self.log.debug((
"Missing required environment \"{}\""
" for Ftrack after launch procedure."
).format(key))
return
try:
import ftrack_api
session = ftrack_api.Session(auto_connect_event_hub=True)
self.log.debug("Ftrack session created")
except Exception:
self.log.warning("Couldn't create Ftrack session")
return
try:
entity = self._find_ftrack_task_entity(
session, project_name, asset_name, task_name
)
self._ftrack_status_change(session, entity, project_name)
self._start_timer(session, entity, ftrack_api)
except Exception:
self.log.warning(
"Couldn't finish Ftrack procedure.", exc_info=True
)
return
finally:
session.close()
def _find_ftrack_task_entity(
self, session, project_name, asset_name, task_name
):
project_entity = session.query(
"Project where full_name is \"{}\"".format(project_name)
).first()
if not project_entity:
self.log.warning(
"Couldn't find project \"{}\" in Ftrack.".format(project_name)
)
return
potential_task_entities = session.query((
"TypedContext where parent.name is \"{}\" and project_id is \"{}\""
).format(asset_name, project_entity["id"])).all()
filtered_entities = []
for _entity in potential_task_entities:
if (
_entity.entity_type.lower() == "task"
and _entity["name"] == task_name
):
filtered_entities.append(_entity)
if not filtered_entities:
self.log.warning((
"Couldn't find task \"{}\" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
if len(filtered_entities) > 1:
self.log.warning((
"Found more than one task \"{}\""
" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
return filtered_entities[0]
def _ftrack_status_change(self, session, entity, project_name):
presets = config.get_presets(project_name)["ftrack"]["ftrack_config"]
statuses = presets.get("status_update")
if not statuses:
return
actual_status = entity["status"]["name"].lower()
already_tested = set()
ent_path = "/".join(
[ent["name"] for ent in entity["link"]]
)
while True:
next_status_name = None
for key, value in statuses.items():
if key in already_tested:
continue
if actual_status in value or "_any_" in value:
if key != "_ignore_":
next_status_name = key
already_tested.add(key)
break
already_tested.add(key)
if next_status_name is None:
break
try:
query = "Status where name is \"{}\"".format(
next_status_name
)
status = session.query(query).one()
entity["status"] = status
session.commit()
self.log.debug("Changing status to \"{}\" <{}>".format(
next_status_name, ent_path
))
break
except Exception:
session.rollback()
msg = (
"Status \"{}\" in presets wasn't found"
" on Ftrack entity type \"{}\""
).format(next_status_name, entity.entity_type)
self.log.warning(msg)
def _start_timer(self, session, entity, _ftrack_api):
self.log.debug("Triggering timer start.")
user_entity = session.query("User where username is \"{}\"".format(
os.environ["FTRACK_API_USER"]
)).first()
if not user_entity:
self.log.warning(
"Couldn't find user with username \"{}\" in Ftrack".format(
os.environ["FTRACK_API_USER"]
)
)
return
source = {
"user": {
"id": user_entity["id"],
"username": user_entity["username"]
}
}
event_data = {
"actionIdentifier": "start.timer",
"selection": [{"entityId": entity["id"], "entityType": "task"}]
}
session.event_hub.publish(
_ftrack_api.event.base.Event(
topic="ftrack.action.launch",
data=event_data,
source=source
),
on_error="ignore"
)
self.log.debug("Timer start triggered successfully.")

pype/lib/deprecated.py (new file, 26 lines added)
View file

@@ -0,0 +1,26 @@
import os
from avalon import io
def get_avalon_database():
"""Mongo database used in avalon's io.
* Function is not used in Pype 3.0, where it was replaced by
AvalonMongoDB.
"""
if io._database is None:
set_io_database()
return io._database
def set_io_database():
"""Set avalon's io context with environemnts.
* Function is not used in pype 3.0 where was replaced with usage of
AvalonMongoDB.
"""
required_keys = ["AVALON_PROJECT", "AVALON_ASSET", "AVALON_SILO"]
for key in required_keys:
os.environ[key] = os.environ.get(key, "")
io.install()
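A brief sketch of how these deprecated helpers are still consumed; it mirrors the call in applications.py above, with a placeholder project name:

from pype.lib.deprecated import get_avalon_database

database = get_avalon_database()  # installs avalon.io on first use
# Collections are keyed by project name, as in launch_application above.
project_doc = database["MyProject"].find_one({"type": "project"})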

pype/lib/hooks.py (new file, 71 lines added)
View file

@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-
"""Package containing code for handling hooks."""
import os
import sys
import types
import logging
from abc import ABCMeta, abstractmethod
import six
log = logging.getLogger(__name__)
@six.add_metaclass(ABCMeta)
class PypeHook:
"""Abstract class from all hooks should inherit."""
def __init__(self):
"""Constructor."""
pass
@abstractmethod
def execute(self, *args, **kwargs):
"""Abstract execute method."""
pass
def execute_hook(hook, *args, **kwargs):
"""Execute hook with arguments.
This will load the hook file, instantiate the class and call its
:meth:`PypeHook.execute` method. Hook must be in the form::
$PYPE_SETUP_PATH/repos/pype/path/to/hook.py/HookClass
This will load `hook.py`, instantiate `HookClass` and then call
`execute(*args, **kwargs)` on it.
Args:
hook (str): path to hook class.
"""
class_name = hook.split("/")[-1]
abspath = os.path.join(os.getenv('PYPE_SETUP_PATH'),
'repos', 'pype', *hook.split("/")[:-1])
mod_name, mod_ext = os.path.splitext(os.path.basename(abspath))
if not mod_ext == ".py":
return False
module = types.ModuleType(mod_name)
module.__file__ = abspath
try:
with open(abspath) as f:
six.exec_(f.read(), module.__dict__)
sys.modules[abspath] = module
except Exception as exp:
log.exception("loading hook failed: {}".format(exp),
exc_info=True)
return False
obj = getattr(module, class_name)
hook_obj = obj()
ret_val = hook_obj.execute(*args, **kwargs)
return ret_val
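A minimal, hypothetical hook to illustrate the contract enforced by the abstract class and the path convention execute_hook expects; the file path and class name below are made up:

# Hypothetical file: $PYPE_SETUP_PATH/repos/pype/hooks/example/pre_launch_hook.py
from pype.lib import PypeHook


class ExamplePreLaunchHook(PypeHook):
    """Return a truthy value so the caller treats the hook as successful."""

    def execute(self, *args, **kwargs):
        env = kwargs.get("env", {})
        # A real hook would validate or mutate `env` here.
        return True


# Invocation, e.g. from an application definition's "launch_hook" entry:
# execute_hook("hooks/example/pre_launch_hook.py/ExamplePreLaunchHook", env=env)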

View file

@@ -1,25 +1,15 @@
import os
import sys
import types
import re
import uuid
import json
import collections
import logging
import itertools
import copy
import contextlib
import subprocess
import getpass
import inspect
import acre
import platform
from abc import ABCMeta, abstractmethod
from avalon import io, pipeline
import six
import avalon.api
from .api import config, Anatomy, Logger
from ..api import config, Anatomy, Logger
log = logging.getLogger(__name__)
@@ -474,11 +464,6 @@ def get_asset(asset_name=None):
return asset_document
def get_project():
io.install()
return io.find_one({"type": "project"})
def get_version_from_path(file):
"""
Finds version number in file path string
@@ -538,66 +523,6 @@ def get_last_version_from_path(path_dir, filter):
return None
def get_avalon_database():
if io._database is None:
set_io_database()
return io._database
def set_io_database():
required_keys = ["AVALON_PROJECT", "AVALON_ASSET", "AVALON_SILO"]
for key in required_keys:
os.environ[key] = os.environ.get(key, "")
io.install()
def filter_pyblish_plugins(plugins):
"""
This serves as a plugin filter / modifier for pyblish. It will load plugin
definitions from presets and filter out those that need to be excluded.
:param plugins: Dictionary of plugins produced by :mod:`pyblish-base`
`discover()` method.
:type plugins: Dict
"""
from pyblish import api
host = api.current_host()
presets = config.get_presets().get('plugins', {})
# iterate over plugins
for plugin in plugins[:]:
# skip if there are no presets to process
if not presets:
continue
file = os.path.normpath(inspect.getsourcefile(plugin))
file = os.path.normpath(file)
# host determined from path
host_from_file = file.split(os.path.sep)[-3:-2][0]
plugin_kind = file.split(os.path.sep)[-2:-1][0]
try:
config_data = presets[host]["publish"][plugin.__name__]
except KeyError:
try:
config_data = presets[host_from_file][plugin_kind][plugin.__name__] # noqa: E501
except KeyError:
continue
for option, value in config_data.items():
if option == "enabled" and value is False:
log.info('removing plugin {}'.format(plugin.__name__))
plugins.remove(plugin)
else:
log.info('setting {}:{} on plugin {}'.format(
option, value, plugin.__name__))
setattr(plugin, option, value)
def get_subsets(asset_name,
regex_filter=None,
version=None,
@@ -676,100 +601,6 @@ def get_subsets(asset_name,
return output_dict
class CustomNone:
"""Created object can be used as custom None (not equal to None).
WARNING: Multiple created objects are not equal either.
Example:
>>> a = CustomNone()
>>> a == None
False
>>> b = CustomNone()
>>> a == b
False
>>> a == a
True
"""
def __init__(self):
"""Create uuid as identifier for custom None."""
self.identifier = str(uuid.uuid4())
def __bool__(self):
"""Return False (like default None)."""
return False
def __eq__(self, other):
"""Equality is compared by identifier value."""
if type(other) == type(self):
if other.identifier == self.identifier:
return True
return False
def __str__(self):
"""Return value of identifier when converted to string."""
return self.identifier
def __repr__(self):
"""Representation of custom None."""
return "<CustomNone-{}>".format(str(self.identifier))
def execute_hook(hook, *args, **kwargs):
"""
This will load hook file, instantiate class and call `execute` method
on it. Hook must be in a form:
`$PYPE_SETUP_PATH/repos/pype/path/to/hook.py/HookClass`
This will load `hook.py`, instantiate `HookClass` and then call
`execute(*args, **kwargs)` on it
:param hook: path to hook class
:type hook: str
"""
class_name = hook.split("/")[-1]
abspath = os.path.join(os.getenv('PYPE_SETUP_PATH'),
'repos', 'pype', *hook.split("/")[:-1])
mod_name, mod_ext = os.path.splitext(os.path.basename(abspath))
if not mod_ext == ".py":
return False
module = types.ModuleType(mod_name)
module.__file__ = abspath
try:
with open(abspath) as f:
six.exec_(f.read(), module.__dict__)
sys.modules[abspath] = module
except Exception as exp:
log.exception("loading hook failed: {}".format(exp),
exc_info=True)
return False
obj = getattr(module, class_name)
hook_obj = obj()
ret_val = hook_obj.execute(*args, **kwargs)
return ret_val
@six.add_metaclass(ABCMeta)
class PypeHook:
def __init__(self):
pass
@abstractmethod
def execute(self, *args, **kwargs):
pass
def get_linked_assets(asset_entity):
"""Return linked assets for `asset_entity`."""
inputs = asset_entity["data"].get("inputs", [])
@@ -777,20 +608,6 @@ def get_linked_assets(asset_entity):
return inputs
def map_subsets_by_family(subsets):
subsets_by_family = collections.defaultdict(list)
for subset in subsets:
family = subset["data"].get("family")
if not family:
families = subset["data"].get("families")
if not families:
continue
family = families[0]
subsets_by_family[family].append(subset)
return subsets_by_family
class BuildWorkfile:
"""Wrapper for build workfile process.
@@ -798,6 +615,20 @@ class BuildWorkfile:
are host related, since each host has it's loaders.
"""
@staticmethod
def map_subsets_by_family(subsets):
subsets_by_family = collections.defaultdict(list)
for subset in subsets:
family = subset["data"].get("family")
if not family:
families = subset["data"].get("families")
if not families:
continue
family = families[0]
subsets_by_family[family].append(subset)
return subsets_by_family
def process(self):
"""Main method of this wrapper.
@@ -1070,7 +901,7 @@
:rtype: dict
"""
# Prepare subsets
subsets_by_family = map_subsets_by_family(subsets)
subsets_by_family = self.map_subsets_by_family(subsets)
profiles_per_subset_id = {}
for family, subsets in subsets_by_family.items():
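The grouping helper moved onto BuildWorkfile as a staticmethod, so the call above becomes self.map_subsets_by_family(subsets). A small sketch of what it returns for subset documents (the documents below are illustrative):

from pype.lib import BuildWorkfile

subsets = [
    {"name": "modelMain", "data": {"family": "model"}},
    {"name": "renderBeauty", "data": {"families": ["render", "review"]}},
    {"name": "incomplete", "data": {}},  # no family information, skipped
]

by_family = BuildWorkfile.map_subsets_by_family(subsets)
# defaultdict(list, {"model": [modelMain doc], "render": [renderBeauty doc]})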
@@ -1544,372 +1375,3 @@ def get_latest_version(asset_name, subset_name, dbcon=None, project_name=None):
)
return None
return version_doc
class ApplicationLaunchFailed(Exception):
pass
def launch_application(project_name, asset_name, task_name, app_name):
database = get_avalon_database()
project_document = database[project_name].find_one({"type": "project"})
asset_document = database[project_name].find_one({
"type": "asset",
"name": asset_name
})
asset_doc_parents = asset_document["data"].get("parents")
hierarchy = "/".join(asset_doc_parents)
app_def = avalon.lib.get_application(app_name)
app_label = app_def.get("ftrack_label", app_def.get("label", app_name))
host_name = app_def["application_dir"]
data = {
"project": {
"name": project_document["name"],
"code": project_document["data"].get("code")
},
"task": task_name,
"asset": asset_name,
"app": host_name,
"hierarchy": hierarchy
}
try:
anatomy = Anatomy(project_name)
anatomy_filled = anatomy.format(data)
workdir = os.path.normpath(anatomy_filled["work"]["folder"])
except Exception as exc:
raise ApplicationLaunchFailed(
"Error in anatomy.format: {}".format(str(exc))
)
try:
os.makedirs(workdir)
except FileExistsError:
pass
last_workfile_path = None
extensions = avalon.api.HOST_WORKFILE_EXTENSIONS.get(host_name)
if extensions:
# Find last workfile
file_template = anatomy.templates["work"]["file"]
data.update({
"version": 1,
"user": os.environ.get("PYPE_USERNAME") or getpass.getuser(),
"ext": extensions[0]
})
last_workfile_path = avalon.api.last_workfile(
workdir, file_template, data, extensions, True
)
# set environments for Avalon
prep_env = copy.deepcopy(os.environ)
prep_env.update({
"AVALON_PROJECT": project_name,
"AVALON_ASSET": asset_name,
"AVALON_TASK": task_name,
"AVALON_APP": host_name,
"AVALON_APP_NAME": app_name,
"AVALON_HIERARCHY": hierarchy,
"AVALON_WORKDIR": workdir
})
start_last_workfile = avalon.api.should_start_last_workfile(
project_name, host_name, task_name
)
# Store boolean as "0"(False) or "1"(True)
prep_env["AVALON_OPEN_LAST_WORKFILE"] = (
str(int(bool(start_last_workfile)))
)
if (
start_last_workfile
and last_workfile_path
and os.path.exists(last_workfile_path)
):
prep_env["AVALON_LAST_WORKFILE"] = last_workfile_path
prep_env.update(anatomy.roots_obj.root_environments())
# collect all the 'environment' attributes from parents
tools_attr = [prep_env["AVALON_APP"], prep_env["AVALON_APP_NAME"]]
tools_env = asset_document["data"].get("tools_env") or []
tools_attr.extend(tools_env)
tools_env = acre.get_tools(tools_attr)
env = acre.compute(tools_env)
env = acre.merge(env, current_env=dict(prep_env))
# Get path to execute
st_temp_path = os.environ["PYPE_CONFIG"]
os_plat = platform.system().lower()
# Path to folder with launchers
path = os.path.join(st_temp_path, "launchers", os_plat)
# Full path to executable launcher
execfile = None
launch_hook = app_def.get("launch_hook")
if launch_hook:
log.info("launching hook: {}".format(launch_hook))
ret_val = execute_hook(launch_hook, env=env)
if not ret_val:
raise ApplicationLaunchFailed(
"Hook didn't finish successfully {}".format(app_label)
)
if sys.platform == "win32":
for ext in os.environ["PATHEXT"].split(os.pathsep):
fpath = os.path.join(path.strip('"'), app_def["executable"] + ext)
if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
execfile = fpath
break
# Run SW if was found executable
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
popen = avalon.lib.launch(
executable=execfile, args=[], environment=env
)
elif (
sys.platform.startswith("linux")
or sys.platform.startswith("darwin")
):
execfile = os.path.join(path.strip('"'), app_def["executable"])
# Run SW if was found executable
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
if not os.path.isfile(execfile):
raise ApplicationLaunchFailed(
"Launcher doesn't exist - {}".format(execfile)
)
try:
fp = open(execfile)
except PermissionError as perm_exc:
raise ApplicationLaunchFailed(
"Access denied on launcher {} - {}".format(execfile, perm_exc)
)
fp.close()
# check executable permission
if not os.access(execfile, os.X_OK):
raise ApplicationLaunchFailed(
"No executable permission - {}".format(execfile)
)
popen = avalon.lib.launch( # noqa: F841
"/usr/bin/env", args=["bash", execfile], environment=env
)
return popen
class ApplicationAction(avalon.api.Action):
"""Default application launcher
This is a convenience application Action that when "config" refers to a
parsed application `.toml` this can launch the application.
"""
_log = None
config = None
group = None
variant = None
required_session_keys = (
"AVALON_PROJECT",
"AVALON_ASSET",
"AVALON_TASK"
)
@property
def log(self):
if self._log is None:
self._log = Logger().get_logger(self.__class__.__name__)
return self._log
def is_compatible(self, session):
for key in self.required_session_keys:
if key not in session:
return False
return True
def process(self, session, **kwargs):
"""Process the full Application action"""
project_name = session["AVALON_PROJECT"]
asset_name = session["AVALON_ASSET"]
task_name = session["AVALON_TASK"]
launch_application(
project_name, asset_name, task_name, self.name
)
self._ftrack_after_launch_procedure(
project_name, asset_name, task_name
)
def _ftrack_after_launch_procedure(
self, project_name, asset_name, task_name
):
# TODO move to launch hook
required_keys = ("FTRACK_SERVER", "FTRACK_API_USER", "FTRACK_API_KEY")
for key in required_keys:
if not os.environ.get(key):
self.log.debug((
"Missing required environment \"{}\""
" for Ftrack after launch procedure."
).format(key))
return
try:
import ftrack_api
session = ftrack_api.Session(auto_connect_event_hub=True)
self.log.debug("Ftrack session created")
except Exception:
self.log.warning("Couldn't create Ftrack session")
return
try:
entity = self._find_ftrack_task_entity(
session, project_name, asset_name, task_name
)
self._ftrack_status_change(session, entity, project_name)
self._start_timer(session, entity, ftrack_api)
except Exception:
self.log.warning(
"Couldn't finish Ftrack procedure.", exc_info=True
)
return
finally:
session.close()
def _find_ftrack_task_entity(
self, session, project_name, asset_name, task_name
):
project_entity = session.query(
"Project where full_name is \"{}\"".format(project_name)
).first()
if not project_entity:
self.log.warning(
"Couldn't find project \"{}\" in Ftrack.".format(project_name)
)
return
potential_task_entities = session.query((
"TypedContext where parent.name is \"{}\" and project_id is \"{}\""
).format(asset_name, project_entity["id"])).all()
filtered_entities = []
for _entity in potential_task_entities:
if (
_entity.entity_type.lower() == "task"
and _entity["name"] == task_name
):
filtered_entities.append(_entity)
if not filtered_entities:
self.log.warning((
"Couldn't find task \"{}\" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
if len(filtered_entities) > 1:
self.log.warning((
"Found more than one task \"{}\""
" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
return filtered_entities[0]
def _ftrack_status_change(self, session, entity, project_name):
presets = config.get_presets(project_name)["ftrack"]["ftrack_config"]
statuses = presets.get("status_update")
if not statuses:
return
actual_status = entity["status"]["name"].lower()
already_tested = set()
ent_path = "/".join(
[ent["name"] for ent in entity["link"]]
)
while True:
next_status_name = None
for key, value in statuses.items():
if key in already_tested:
continue
if actual_status in value or "_any_" in value:
if key != "_ignore_":
next_status_name = key
already_tested.add(key)
break
already_tested.add(key)
if next_status_name is None:
break
try:
query = "Status where name is \"{}\"".format(
next_status_name
)
status = session.query(query).one()
entity["status"] = status
session.commit()
self.log.debug("Changing status to \"{}\" <{}>".format(
next_status_name, ent_path
))
break
except Exception:
session.rollback()
msg = (
"Status \"{}\" in presets wasn't found"
" on Ftrack entity type \"{}\""
).format(next_status_name, entity.entity_type)
self.log.warning(msg)
def _start_timer(self, session, entity, _ftrack_api):
self.log.debug("Triggering timer start.")
user_entity = session.query("User where username is \"{}\"".format(
os.environ["FTRACK_API_USER"]
)).first()
if not user_entity:
self.log.warning(
"Couldn't find user with username \"{}\" in Ftrack".format(
os.environ["FTRACK_API_USER"]
)
)
return
source = {
"user": {
"id": user_entity["id"],
"username": user_entity["username"]
}
}
event_data = {
"actionIdentifier": "start.timer",
"selection": [{"entityId": entity["id"], "entityType": "task"}]
}
session.event_hub.publish(
_ftrack_api.event.base.Event(
topic="ftrack.action.launch",
data=event_data,
source=source
),
on_error="ignore"
)
self.log.debug("Timer start triggered successfully.")

pype/lib/plugin_tools.py (new file, 59 lines added)
View file

@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
"""Avalon/Pyblish plugin tools."""
import os
import inspect
import logging
from ..api import config
log = logging.getLogger(__name__)
def filter_pyblish_plugins(plugins):
"""Filter pyblish plugins by presets.
This serves as a plugin filter / modifier for pyblish. It will load plugin
definitions from presets and filter out those that need to be excluded.
Args:
plugins (dict): Dictionary of plugins produced by :mod:`pyblish-base`
`discover()` method.
"""
from pyblish import api
host = api.current_host()
presets = config.get_presets().get('plugins', {})
# iterate over plugins
for plugin in plugins[:]:
# skip if there are no presets to process
if not presets:
continue
file = os.path.normpath(inspect.getsourcefile(plugin))
file = os.path.normpath(file)
# host determined from path
host_from_file = file.split(os.path.sep)[-3:-2][0]
plugin_kind = file.split(os.path.sep)[-2:-1][0]
try:
config_data = presets[host]["publish"][plugin.__name__]
except KeyError:
try:
config_data = presets[host_from_file][plugin_kind][plugin.__name__] # noqa: E501
except KeyError:
continue
for option, value in config_data.items():
if option == "enabled" and value is False:
log.info('removing plugin {}'.format(plugin.__name__))
plugins.remove(plugin)
else:
log.info('setting {}:{} on plugin {}'.format(
option, value, plugin.__name__))
setattr(plugin, option, value)
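A hedged sketch of the presets structure filter_pyblish_plugins reads. The keys follow the lookup order in the function (host, then plugin kind such as "publish", then plugin class name); the concrete host and plugin names below are hypothetical:

# Expected rough shape of config.get_presets()["plugins"]:
presets = {
    "maya": {                         # host (or host derived from plugin path)
        "publish": {                  # plugin kind, taken from the folder name
            "ValidateMeshUVs": {
                "enabled": False,     # plugin is removed from the discovered list
            },
            "ExtractPlayblast": {
                "quality": 90,        # any other key is set as a class attribute
            },
        }
    }
}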

View file

@@ -16,3 +16,5 @@ class CollectCurrentFile(pyblish.api.ContextPlugin):
context.data["currentFile"] = os.path.normpath(
aftereffects.stub().get_active_document_full_name()
).replace("\\", "/")
#print(debug_pyblish_plugins_loading.discover())

View file

@@ -1,15 +0,0 @@
from pype.lib import RenderInstance, AbstractCollectRender
import pyblish.api
from avalon import aftereffects
class CollectRender(AbstractCollectRender):
order = pyblish.api.CollectorOrder + 0.01
hosts = ["maya"]
label = "Collect Render Layers"
sync_workfile_version = False
def get_instances(self):
print("hello")
return aftereffects.stub().get_metadata()

View file

@@ -1,4 +1,4 @@
from pype.lib import AbstractSubmitDeadline, DeadlineJobInfo
from pype.lib import abstract_submit_deadline, DeadlineJobInfo
from abc import ABCMeta, abstractmethod
import pyblish.api
import os
@@ -17,7 +17,7 @@ class DeadlinePluginInfo():
@six.add_metaclass(ABCMeta)
class AfterEffectsSubmitDeadline(AbstractSubmitDeadline):
class AfterEffectsSubmitDeadline(abstract_submit_deadline.AbstractSubmitDeadline):
label = "Submit to Deadline"
order = pyblish.api.IntegratorOrder + 0.1
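The hunk above switches this plugin from importing the class directly to importing the abstract_submit_deadline module object and referencing the class through it. Both forms resolve after the pype.lib split, as a quick sketch (hypothetical session) shows:

from pype.lib import AbstractSubmitDeadline, abstract_submit_deadline

# The re-exported class and the module attribute are the same object.
assert AbstractSubmitDeadline is abstract_submit_deadline.AbstractSubmitDeadline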

pype/tests/README.md (new file, 4 lines added)
View file

@@ -0,0 +1,4 @@
Tests for Pype
--------------
Trigger them with:
`pype test --pype`

View file

@@ -0,0 +1,20 @@
# Test backward compatibility of the restructuring of lib.py into the lib package
# Contains simple imports that should still work
def test_backward_compatibility(printer):
printer("Test if imports still work")
try:
from pype.lib import filter_pyblish_plugins
from pype.lib import execute_hook
from pype.lib import PypeHook
from pype.lib import get_latest_version
from pype.lib import ApplicationLaunchFailed
from pype.lib import launch_application
from pype.lib import ApplicationAction
from pype.lib import get_avalon_database
from pype.lib import set_io_database
except ImportError as e:
raise