mirror of
https://github.com/ynput/ayon-core.git
synced 2026-01-01 16:34:53 +01:00
1341 lines
43 KiB
Python
import os
|
|
import sys
|
|
import re
|
|
import getpass
|
|
import json
|
|
import copy
|
|
import platform
|
|
import logging
|
|
import subprocess
|
|
|
|
import acre
|
|
|
|
import avalon.lib
|
|
import avalon.api
|
|
|
|
from ..api import (
|
|
Anatomy,
|
|
Logger,
|
|
config,
|
|
system_settings,
|
|
environments
|
|
)
|
|
from .hooks import execute_hook
|
|
from .deprecated import get_avalon_database
|
|
from .env_tools import env_value_to_bool
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
class ApplicationNotFound(Exception):
    """Raised when an application name is unknown to ApplicationManager."""

    def __init__(self, app_name):
        # Keep the requested name so callers can inspect what was missing.
        self.app_name = app_name
        message = "Application \"{}\" was not found.".format(app_name)
        super(ApplicationNotFound, self).__init__(message)
|
|
|
|
|
|
class ApplictionExecutableNotFound(Exception):
    """Defined executable paths are not available on the machine.

    Attributes:
        application: Application object whose executables were checked.
        msg (str): Short human readable message.
        details (str): Listing of defined paths or None when none defined.
        exc_msg (str): Combined message used as exception text.
    """

    def __init__(self, application):
        self.application = application

        details = None
        if application.executables:
            msg = (
                "Defined executable paths for application \"{}\"({})"
                " are not available on this machine."
            )
            detail_lines = ["Defined paths:"]
            for executable_path in application.executables:
                detail_lines.append("- " + executable_path)
            details = "\n".join(detail_lines)
        else:
            msg = (
                "Executable paths for application \"{}\"({}) are not set."
            )

        self.msg = msg.format(application.full_label, application.app_name)
        self.details = details

        exc_msg = str(self.msg)
        if details:
            # Append the path listing on a new line for readability.
            exc_msg += "\n" + details
        self.exc_msg = exc_msg
        super(ApplictionExecutableNotFound, self).__init__(exc_msg)
|
|
|
|
|
|
class ApplicationLaunchFailed(Exception):
    """Application launch failed for a known reason.

    The message should be self explanatory because no traceback is shown
    when this exception is raised.
    """
|
|
|
|
|
|
def compile_list_of_regexes(in_list):
    """Compile each string from `in_list` into a regex object.

    Falsy items are ignored. Items that are not string based raise
    TypeError in `re.compile` and are skipped with a printed warning.

    Returns:
        list: Compiled regex objects (empty when input is empty/None).
    """
    if not in_list:
        return []

    regexes = []
    for item in in_list:
        if not item:
            continue
        try:
            compiled = re.compile(item)
        except TypeError:
            print((
                "Invalid type \"{}\" value \"{}\"."
                " Expected string based object. Skipping."
            ).format(str(type(item)), str(item)))
        else:
            regexes.append(compiled)
    return regexes
|
|
|
|
|
|
def launch_application(project_name, asset_name, task_name, app_name):
    """Launch host application with filling required environments.

    Queries project and asset documents, fills anatomy to resolve the
    workdir, prepares AVALON_* environment variables, optionally executes
    a launch hook and finally starts the platform specific launcher
    script found under `$PYPE_CONFIG/launchers/<platform>`.

    Args:
        project_name (str): Name of project the asset belongs to.
        asset_name (str): Name of asset document within the project.
        task_name (str): Name of task the application is launched for.
        app_name (str): Name of avalon application definition.

    Returns:
        Object returned by `avalon.lib.launch` (launched process).

    Raises:
        ApplicationLaunchFailed: On anatomy formatting error, failed
            launch hook or missing/broken launcher executable.

    TODO(iLLiCiT): This should be split into more parts.
    """
    # `get_avalon_database` is in Pype 3 replaced with using `AvalonMongoDB`
    # NOTE(review): these imports duplicate the module level imports.
    import acre

    import avalon.lib

    database = get_avalon_database()
    project_document = database[project_name].find_one({"type": "project"})
    asset_document = database[project_name].find_one({
        "type": "asset",
        "name": asset_name
    })

    # Hierarchy is a "/" joined chain of asset parents.
    asset_doc_parents = asset_document["data"].get("parents")
    hierarchy = "/".join(asset_doc_parents)

    app_def = avalon.lib.get_application(app_name)
    app_label = app_def.get("ftrack_label", app_def.get("label", app_name))

    host_name = app_def["application_dir"]
    # Workfile data collection may be special function?
    data = {
        "project": {
            "name": project_document["name"],
            "code": project_document["data"].get("code")
        },
        "task": task_name,
        "asset": asset_name,
        "app": host_name,
        "hierarchy": hierarchy
    }

    try:
        anatomy = Anatomy(project_name)
        anatomy_filled = anatomy.format(data)
        workdir = os.path.normpath(anatomy_filled["work"]["folder"])

    except Exception as exc:
        raise ApplicationLaunchFailed(
            "Error in anatomy.format: {}".format(str(exc))
        )

    # Make sure workdir exists; existing folder is fine.
    try:
        os.makedirs(workdir)
    except FileExistsError:
        pass

    last_workfile_path = None
    extensions = avalon.api.HOST_WORKFILE_EXTENSIONS.get(host_name)
    if extensions:
        # Find last workfile
        file_template = anatomy.templates["work"]["file"]
        data.update({
            "version": 1,
            "user": os.environ.get("PYPE_USERNAME") or getpass.getuser(),
            "ext": extensions[0]
        })

        last_workfile_path = avalon.api.last_workfile(
            workdir, file_template, data, extensions, True
        )

    # set environments for Avalon
    prep_env = copy.deepcopy(os.environ)
    prep_env.update({
        "AVALON_PROJECT": project_name,
        "AVALON_ASSET": asset_name,
        "AVALON_TASK": task_name,
        "AVALON_APP": host_name,
        "AVALON_APP_NAME": app_name,
        "AVALON_HIERARCHY": hierarchy,
        "AVALON_WORKDIR": workdir
    })

    start_last_workfile = avalon.api.should_start_last_workfile(
        project_name, host_name, task_name
    )
    # Store boolean as "0"(False) or "1"(True)
    prep_env["AVALON_OPEN_LAST_WORKFILE"] = (
        str(int(bool(start_last_workfile)))
    )

    if (
        start_last_workfile
        and last_workfile_path
        and os.path.exists(last_workfile_path)
    ):
        prep_env["AVALON_LAST_WORKFILE"] = last_workfile_path

    prep_env.update(anatomy.roots_obj.root_environments())

    # collect all the 'environment' attributes from parents
    tools_attr = [prep_env["AVALON_APP"], prep_env["AVALON_APP_NAME"]]
    tools_env = asset_document["data"].get("tools_env") or []
    tools_attr.extend(tools_env)

    tools_env = acre.get_tools(tools_attr)
    env = acre.compute(tools_env)
    env = acre.merge(env, current_env=dict(prep_env))

    # Get path to execute
    st_temp_path = os.environ["PYPE_CONFIG"]
    os_plat = platform.system().lower()

    # Path to folder with launchers
    path = os.path.join(st_temp_path, "launchers", os_plat)

    # Full path to executable launcher
    execfile = None

    launch_hook = app_def.get("launch_hook")
    if launch_hook:
        log.info("launching hook: {}".format(launch_hook))
        ret_val = execute_hook(launch_hook, env=env)
        if not ret_val:
            raise ApplicationLaunchFailed(
                "Hook didn't finish successfully {}".format(app_label)
            )

    if sys.platform == "win32":
        # Try each PATHEXT extension until an executable launcher is found.
        for ext in os.environ["PATHEXT"].split(os.pathsep):
            fpath = os.path.join(path.strip('"'), app_def["executable"] + ext)
            if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
                execfile = fpath
                break

        # Run SW if was found executable
        if execfile is None:
            raise ApplicationLaunchFailed(
                "We didn't find launcher for {}".format(app_label)
            )

        popen = avalon.lib.launch(
            executable=execfile, args=[], environment=env
        )

    elif (
        sys.platform.startswith("linux")
        or sys.platform.startswith("darwin")
    ):
        execfile = os.path.join(path.strip('"'), app_def["executable"])
        # Run SW if was found executable
        # NOTE(review): this check is dead code - `execfile` was just
        # assigned a join result and can never be None here.
        if execfile is None:
            raise ApplicationLaunchFailed(
                "We didn't find launcher for {}".format(app_label)
            )

        if not os.path.isfile(execfile):
            raise ApplicationLaunchFailed(
                "Launcher doesn't exist - {}".format(execfile)
            )

        # Open/close probe only verifies the file is readable.
        try:
            fp = open(execfile)
        except PermissionError as perm_exc:
            raise ApplicationLaunchFailed(
                "Access denied on launcher {} - {}".format(execfile, perm_exc)
            )

        fp.close()
        # check executable permission
        if not os.access(execfile, os.X_OK):
            raise ApplicationLaunchFailed(
                "No executable permission - {}".format(execfile)
            )

        popen = avalon.lib.launch(  # noqa: F841
            "/usr/bin/env", args=["bash", execfile], environment=env
        )
    return popen
|
|
|
|
|
|
class ApplicationAction:
    """Default application launcher

    This is a convenience application Action that when "config" refers to a
    parsed application `.toml` this can launch the application.

    """

    # Cached logger instance, created lazily by the `log` property.
    _log = None
    # Attributes expected to be filled by launcher/action registration.
    config = None
    group = None
    variant = None
    # Session keys that must be present for `is_compatible` to pass.
    required_session_keys = (
        "AVALON_PROJECT",
        "AVALON_ASSET",
        "AVALON_TASK"
    )

    @property
    def log(self):
        """Logger named by the concrete class, created on first access."""
        if self._log is None:
            self._log = logging.getLogger(self.__class__.__name__)
        return self._log

    def is_compatible(self, session):
        """Return True only when `session` contains all required keys."""
        for key in self.required_session_keys:
            if key not in session:
                return False
        return True

    def process(self, session, **kwargs):
        """Process the full Application action"""

        project_name = session["AVALON_PROJECT"]
        asset_name = session["AVALON_ASSET"]
        task_name = session["AVALON_TASK"]
        # NOTE(review): `self.name` is not defined on this class - it is
        # presumably set by the action registration machinery. Confirm.
        launch_application(
            project_name, asset_name, task_name, self.name
        )

        self._ftrack_after_launch_procedure(
            project_name, asset_name, task_name
        )

    def _ftrack_after_launch_procedure(
        self, project_name, asset_name, task_name
    ):
        """Best-effort Ftrack status change and timer start after launch.

        Silently returns when Ftrack credentials are missing or any step
        fails - launching must not break because of Ftrack.
        """
        # TODO move to launch hook
        required_keys = ("FTRACK_SERVER", "FTRACK_API_USER", "FTRACK_API_KEY")
        for key in required_keys:
            if not os.environ.get(key):
                self.log.debug((
                    "Missing required environment \"{}\""
                    " for Ftrack after launch procedure."
                ).format(key))
                return

        try:
            import ftrack_api
            session = ftrack_api.Session(auto_connect_event_hub=True)
            self.log.debug("Ftrack session created")
        except Exception:
            # Missing/broken Ftrack connection is not fatal for the launch.
            self.log.warning("Couldn't create Ftrack session")
            return

        try:
            entity = self._find_ftrack_task_entity(
                session, project_name, asset_name, task_name
            )
            # NOTE(review): `entity` may be None here; the resulting
            # exception is swallowed by the except below.
            self._ftrack_status_change(session, entity, project_name)
            self._start_timer(session, entity, ftrack_api)
        except Exception:
            self.log.warning(
                "Couldn't finish Ftrack procedure.", exc_info=True
            )
            return

        finally:
            # Always close the session, even on early return.
            session.close()

    def _find_ftrack_task_entity(
        self, session, project_name, asset_name, task_name
    ):
        """Return single matching Task entity or None (with warning)."""
        project_entity = session.query(
            "Project where full_name is \"{}\"".format(project_name)
        ).first()
        if not project_entity:
            self.log.warning(
                "Couldn't find project \"{}\" in Ftrack.".format(project_name)
            )
            return

        potential_task_entities = session.query((
            "TypedContext where parent.name is \"{}\" and project_id is \"{}\""
        ).format(asset_name, project_entity["id"])).all()
        # Keep only Task entities with exactly the requested name.
        filtered_entities = []
        for _entity in potential_task_entities:
            if (
                _entity.entity_type.lower() == "task"
                and _entity["name"] == task_name
            ):
                filtered_entities.append(_entity)

        if not filtered_entities:
            self.log.warning((
                "Couldn't find task \"{}\" under parent \"{}\" in Ftrack."
            ).format(task_name, asset_name))
            return

        # Ambiguous match is treated as "not found".
        if len(filtered_entities) > 1:
            self.log.warning((
                "Found more than one task \"{}\""
                " under parent \"{}\" in Ftrack."
            ).format(task_name, asset_name))
            return

        return filtered_entities[0]

    def _ftrack_status_change(self, session, entity, project_name):
        """Change entity status by preset mapping of new->old status names.

        `status_update` preset maps a new status name to a list of statuses
        it should replace ("_any_" matches any, "_ignore_" skips).
        """
        presets = config.get_presets(project_name)["ftrack"]["ftrack_config"]
        statuses = presets.get("status_update")
        if not statuses:
            return

        actual_status = entity["status"]["name"].lower()
        already_tested = set()
        ent_path = "/".join(
            [ent["name"] for ent in entity["link"]]
        )
        while True:
            next_status_name = None
            for key, value in statuses.items():
                if key in already_tested:
                    continue
                if actual_status in value or "_any_" in value:
                    if key != "_ignore_":
                        next_status_name = key
                        already_tested.add(key)
                    break
                already_tested.add(key)

            if next_status_name is None:
                break

            try:
                query = "Status where name is \"{}\"".format(
                    next_status_name
                )
                status = session.query(query).one()

                entity["status"] = status
                session.commit()
                self.log.debug("Changing status to \"{}\" <{}>".format(
                    next_status_name, ent_path
                ))
                break

            except Exception:
                # Status does not exist on this entity type - roll back and
                # try the next candidate in the preset mapping.
                session.rollback()
                msg = (
                    "Status \"{}\" in presets wasn't found"
                    " on Ftrack entity type \"{}\""
                ).format(next_status_name, entity.entity_type)
                self.log.warning(msg)

    def _start_timer(self, session, entity, _ftrack_api):
        """Publish a "start.timer" action event for the current user."""
        self.log.debug("Triggering timer start.")

        user_entity = session.query("User where username is \"{}\"".format(
            os.environ["FTRACK_API_USER"]
        )).first()
        if not user_entity:
            self.log.warning(
                "Couldn't find user with username \"{}\" in Ftrack".format(
                    os.environ["FTRACK_API_USER"]
                )
            )
            return

        source = {
            "user": {
                "id": user_entity["id"],
                "username": user_entity["username"]
            }
        }
        event_data = {
            "actionIdentifier": "start.timer",
            "selection": [{"entityId": entity["id"], "entityType": "task"}]
        }
        session.event_hub.publish(
            _ftrack_api.event.base.Event(
                topic="ftrack.action.launch",
                data=event_data,
                source=source
            ),
            on_error="ignore"
        )
        self.log.debug("Timer start triggered successfully.")
|
|
|
|
|
|
# Special naming case for subprocess since its a built-in method.
|
|
def _subprocess(*args, **kwargs):
    """Convenience wrapper around subprocess Popen collecting its output.

    All positional and keyword arguments are forwarded to
    `subprocess.Popen`. Stdout/stderr/stdin default to pipes and the
    environment is sanitized to string values.

    Args:
        *args: Variable length argument list passed to Popen.
        **kwargs: Arbitrary keyword arguments passed to Popen. A
            `logging.Logger` may be passed under "logger" to replace the
            module level logger.

    Returns:
        str: Full output of subprocess - concatenated stdout and stderr.

    Raises:
        RuntimeError: When the process finished with nonzero return code.
    """
    # Use environments from kwargs or current process environments.
    env = kwargs.get("env") or os.environ
    # Popen requires string values in the environment mapping.
    filtered_env = {key: str(value) for key, value in env.items()}

    # Fall back to module logger when none was passed.
    logger = kwargs.pop("logger", log)

    # Default all three standard streams to pipes; keep caller overrides.
    kwargs.setdefault("stdout", subprocess.PIPE)
    kwargs.setdefault("stderr", subprocess.PIPE)
    kwargs.setdefault("stdin", subprocess.PIPE)
    kwargs["env"] = filtered_env

    proc = subprocess.Popen(*args, **kwargs)
    _stdout, _stderr = proc.communicate()

    full_output = ""
    if _stdout:
        _stdout = _stdout.decode("utf-8")
        full_output += _stdout
        logger.debug(_stdout)

    if _stderr:
        _stderr = _stderr.decode("utf-8")
        # Separate stdout and stderr parts with a line break.
        if full_output:
            full_output += "\n"
        full_output += _stderr
        logger.warning(_stderr)

    if proc.returncode != 0:
        exc_msg = "Executing arguments was not successful: \"{}\"".format(args)
        if _stdout:
            exc_msg += "\n\nOutput:\n{}".format(_stdout)

        if _stderr:
            exc_msg += "Error:\n{}".format(_stderr)

        raise RuntimeError(exc_msg)

    return full_output
|
|
|
|
|
|
class ApplicationManager:
    """Load and hold applications and tools defined in system settings.

    Attributes:
        applications (dict): `Application` objects by application name.
        tools (dict): `ApplicationTool` objects by tool name.
    """

    def __init__(self):
        self.log = Logger().get_logger(self.__class__.__name__)

        self.applications = {}
        self.tools = {}

        self.refresh()

    def refresh(self):
        """Refresh applications from settings.

        Safe to call repeatedly - previously loaded applications and tools
        are dropped first.
        """
        # Bugfix: reset current state so a repeated refresh does not trip
        # the duplicate-application assertion on names loaded previously.
        # `clear()` keeps dict identity for anyone holding a reference.
        self.applications.clear()
        self.tools.clear()

        settings = system_settings()

        hosts_definitions = settings["global"]["applications"]
        for host_name, variant_definitions in hosts_definitions.items():
            enabled = variant_definitions["enabled"]
            label = variant_definitions.get("label") or host_name
            variants = variant_definitions.get("variants") or {}
            icon = variant_definitions.get("icon")
            is_host = variant_definitions.get("is_host", False)
            for app_name, app_data in variants.items():
                if app_name in self.applications:
                    raise AssertionError((
                        "BUG: Duplicated application name in settings \"{}\""
                    ).format(app_name))

                # If host is disabled then disable all variants
                if not enabled:
                    app_data["enabled"] = enabled

                # Pass label from host definition
                if not app_data.get("label"):
                    app_data["label"] = label

                if not app_data.get("icon"):
                    app_data["icon"] = icon

                # NOTE(review): reusing `is_host` here lets a variant's
                # value leak into following variants' default - kept for
                # behavioral compatibility.
                is_host = app_data.get("is_host", is_host)
                app_data["is_host"] = is_host

                self.applications[app_name] = Application(
                    host_name, app_name, app_data, self
                )

        tools_definitions = settings["global"]["tools"]
        for tool_group_name, tool_group_data in tools_definitions.items():
            enabled = tool_group_data.get("enabled", True)
            tool_variants = tool_group_data.get("variants") or {}
            for tool_name, tool_data in tool_variants.items():
                # Duplicates only warn (last definition wins).
                if tool_name in self.tools:
                    self.log.warning((
                        "Duplicated tool name in settings \"{}\""
                    ).format(tool_name))

                _enabled = tool_data.get("enabled", enabled)
                self.tools[tool_name] = ApplicationTool(
                    tool_name, tool_group_name, _enabled
                )

    def launch(self, app_name, **data):
        """Launch procedure.

        For host application it's expected to contain "project_name",
        "asset_name" and "task_name".

        Args:
            app_name (str): Name of application that should be launched.
            **data (dict): Any additional data. Data may be used during
                preparation to store objects usable in multiple places.

        Raises:
            ApplicationNotFound: Application was not found by entered
                argument `app_name`.
            ApplictionExecutableNotFound: Executables in application definition
                were not found on this machine.
            ApplicationLaunchFailed: Something important for application launch
                failed. Exception should contain explanation message,
                traceback should not be needed.
        """
        app = self.applications.get(app_name)
        if not app:
            raise ApplicationNotFound(app_name)

        executable = app.find_executable()
        if not executable:
            raise ApplictionExecutableNotFound(app)

        context = ApplicationLaunchContext(
            app, executable, **data
        )
        # TODO pass context through launch hooks
        return context.launch()
|
|
|
|
|
|
class ApplicationTool:
    """Representation of a single tool variant from studio settings.

    Args:
        tool_name (str): Name of the tool.
        group_name (str): Name of the group wrapping the tool.
        enabled (bool): Whether studio has the tool enabled.
    """

    def __init__(self, tool_name, group_name, enabled):
        # Expose constructor values under their public attribute names.
        self.name, self.group_name, self.enabled = (
            tool_name, group_name, enabled
        )

    def __bool__(self):
        # A tool object is truthy exactly when it is enabled.
        return self.enabled
|
|
|
|
|
|
class Application:
    """Hold information about a single application variant.

    The object itself does nothing special - it is a data holder created
    by `ApplicationManager`.

    Args:
        host_name (str): Name of host implementation
            (e.g. "maya", "nuke", "photoshop").
        app_name (str): Specific version or variant of the host
            (e.g. "maya2020", "nuke11.3").
        app_data (dict): Variant data with executables, label, variant
            label, icon and enabled state. Only `executables` is required.
        manager (ApplicationManager): Manager that created this object.
    """

    def __init__(self, host_name, app_name, app_data, manager):
        self.host_name = host_name
        self.app_name = app_name
        self.app_data = app_data
        self.manager = manager

        self.label = app_data.get("label") or app_name
        self.variant_label = app_data.get("variant_label") or None
        self.icon = app_data.get("icon") or None
        self.enabled = app_data.get("enabled", True)
        self.is_host = app_data.get("is_host", False)

        raw_executables = app_data["executables"]
        if isinstance(raw_executables, dict):
            # Executables may be defined per platform.
            platform_key = platform.system().lower()
            raw_executables = raw_executables.get(platform_key) or []

        # Normalize a single path into a list of paths.
        if isinstance(raw_executables, list):
            self.executables = raw_executables
        else:
            self.executables = [raw_executables]

    @property
    def full_label(self):
        """Label concatenated with variant label when variant label is set."""
        if not self.variant_label:
            return str(self.label)
        return "{} {}".format(self.label, self.variant_label)

    def find_executable(self):
        """Return first existing path from `executables` or None."""
        existing_paths = (
            executable_path
            for executable_path in self.executables
            if os.path.exists(executable_path)
        )
        return next(existing_paths, None)

    def launch(self, *args, **kwargs):
        """Launch the application via its manager.

        Arguments match the manager's `launch` method.

        Returns:
            subprocess.Popen: Executed process as Popen object.
        """
        return self.manager.launch(self.app_name, *args, **kwargs)
|
|
|
|
|
|
class ApplicationLaunchContext:
|
|
"""Context of launching application.
|
|
|
|
Main purpose of context is to prepare launch arguments and keword arguments
|
|
for new process. Most important part of keyword arguments preparations
|
|
are environment variables.
|
|
|
|
During the whole process is possible to use `data` attribute to store
|
|
object usable in multiple places.
|
|
|
|
Launch arguments are strings in list. It is possible to "chain" argument
|
|
when order of them matters. That is possible to do with adding list where
|
|
order is right and should not change.
|
|
NOTE: This is recommendation, not requirement.
|
|
e.g.: `["nuke.exe", "--NukeX"]` -> In this case any part of process may
|
|
insert argument between `nuke.exe` and `--NukeX`. To keep them together
|
|
it is better to wrap them in another list: `[["nuke.exe", "--NukeX"]]`.
|
|
|
|
Args:
|
|
application (Application): Application definition.
|
|
executable (str): Path to executable.
|
|
**data (dict): Any additional data. Data may be used during
|
|
preparation to store objects usable in multiple places.
|
|
"""
|
|
|
|
    def __init__(self, application, executable, **data):
        """Prepare launch context for `application` with `executable`.

        Stores launch data, deep-copies launch environments, loads
        settings environments and runs preparation steps that fill
        `launch_args`, `kwargs` and `env`.
        """
        # Application object
        self.application = application

        # Logger
        logger_name = "{}-{}".format(self.__class__.__name__, self.app_name)
        self.log = Logger().get_logger(logger_name)

        self.executable = executable

        self.data = dict(data)

        # Handle launch environments - use passed "env" or current process'.
        passed_env = self.data.pop("env", None)
        if passed_env is None:
            env = os.environ
        else:
            env = passed_env
        # Deep copy so launch preparation never mutates the source mapping.
        self.env = copy.deepcopy(env)

        # Load settings if were not passed in data
        settings_env = self.data.get("settings_env")
        if settings_env is None:
            settings_env = environments()
        self.data["settings_env"] = settings_env

        # subprocess.Popen launch arguments (first argument in constructor)
        self.launch_args = [executable]
        # subprocess.Popen keyword arguments
        self.kwargs = {
            "env": self.env
        }

        if platform.system().lower() == "windows":
            # Detach new process from currently running process on Windows
            flags = (
                subprocess.CREATE_NEW_PROCESS_GROUP
                | subprocess.DETACHED_PROCESS
            )
            self.kwargs["creationflags"] = flags

        # Filled by `launch`; also guards against double launch.
        self.process = None

        # TODO move these to pre-launch hook
        self.prepare_global_data()
        self.prepare_host_environments()
        self.prepare_context_environments()
|
|
|
|
    @property
    def app_name(self):
        """Application name taken from the wrapped Application object."""
        return self.application.app_name
|
|
|
|
    @property
    def host_name(self):
        """Host name taken from the wrapped Application object."""
        return self.application.host_name
|
|
|
|
    @property
    def manager(self):
        """ApplicationManager that created the wrapped Application."""
        return self.application.manager
|
|
|
|
    def launch(self):
        """Collect data for new process and then create it.

        This method must not be executed more than once.

        Returns:
            subprocess.Popen: Created process as Popen object, or None
            when the application was already launched.
        """
        # Guard against double launch - `process` is set below.
        if self.process is not None:
            self.log.warning("Application was already launched.")
            return

        args = self.clear_launch_args(self.launch_args)
        self.log.debug(
            "Launching \"{}\" with args: {}".format(self.app_name, args)
        )
        self.process = subprocess.Popen(args, **self.kwargs)

        # TODO do this with after-launch hooks
        # NOTE(review): `after_launch_procedures` is not defined in the
        # visible part of this file - presumably defined elsewhere; confirm.
        try:
            self.after_launch_procedures()
        except Exception:
            # After-launch failures must not break the launch itself.
            self.log.warning(
                "After launch procedures were not successful.",
                exc_info=True
            )

        return self.process
|
|
|
|
@staticmethod
|
|
def clear_launch_args(args):
|
|
"""Collect launch arguments to final order.
|
|
|
|
Launch argument should be list that may contain another lists this
|
|
function will upack inner lists and keep ordering.
|
|
|
|
```
|
|
# source
|
|
[ [ arg1, [ arg2, arg3 ] ], arg4, [arg5, arg6]]
|
|
# result
|
|
[ arg1, arg2, arg3, arg4, arg5, arg6]
|
|
|
|
Args:
|
|
args (list): Source arguments in list may contain inner lists.
|
|
|
|
Return:
|
|
list: Unpacked arguments.
|
|
"""
|
|
while True:
|
|
all_cleared = True
|
|
new_args = []
|
|
for arg in args:
|
|
if isinstance(arg, (list, tuple, set)):
|
|
all_cleared = False
|
|
for _arg in arg:
|
|
new_args.append(_arg)
|
|
else:
|
|
new_args.append(args)
|
|
args = new_args
|
|
|
|
if all_cleared:
|
|
break
|
|
return args
|
|
|
|
    def prepare_global_data(self):
        """Prepare global objects to `data` that will be used for sure.

        Fills `anatomy`, `dbcon`, `project_doc` and (when asset name is
        known) `asset_doc` into `self.data`. Skips silently when
        `project_name` is not part of launch context.
        """
        # Mongo documents
        project_name = self.data.get("project_name")
        if not project_name:
            self.log.info(
                "Skipping global data preparation."
                " Key `project_name` was not found in launch context."
            )
            return

        self.log.debug("Project name is set to \"{}\"".format(project_name))
        # Anatomy
        self.data["anatomy"] = Anatomy(project_name)

        # Mongo connection
        dbcon = avalon.api.AvalonMongoDB()
        dbcon.Session["AVALON_PROJECT"] = project_name
        dbcon.install()

        self.data["dbcon"] = dbcon

        # Project document
        project_doc = dbcon.find_one({"type": "project"})
        self.data["project_doc"] = project_doc

        asset_name = self.data.get("asset_name")
        if not asset_name:
            self.log.warning(
                "Asset name was not set. Skipping asset document query."
            )
            return

        asset_doc = dbcon.find_one({
            "type": "asset",
            "name": asset_name
        })
        self.data["asset_doc"] = asset_doc
|
|
|
|
def _merge_env(self, env, current_env):
|
|
"""Modified function(merge) from acre module."""
|
|
result = current_env.copy()
|
|
for key, value in env.items():
|
|
# Keep missing keys by not filling `missing` kwarg
|
|
value = acre.lib.partial_format(value, data=current_env)
|
|
result[key] = value
|
|
return result
|
|
|
|
    def prepare_host_environments(self):
        """Modify launch environments based on launched app and context.

        Collects environment groups for host, app variant and asset tools
        from settings environments, computes them with acre and merges the
        result into `self.env`.
        """
        # Keys for getting environments
        env_keys = [self.host_name, self.app_name]

        asset_doc = self.data.get("asset_doc")
        if asset_doc:
            # Add tools environments (group first, then the tool itself).
            for key in asset_doc["data"].get("tools_env") or []:
                tool = self.manager.tools.get(key)
                if tool:
                    if tool.group_name not in env_keys:
                        env_keys.append(tool.group_name)

                    if tool.name not in env_keys:
                        env_keys.append(tool.name)

        self.log.debug(
            "Finding environment groups for keys: {}".format(env_keys)
        )

        settings_env = self.data["settings_env"]
        env_values = {}
        for env_key in env_keys:
            _env_values = settings_env.get(env_key)
            if not _env_values:
                continue

            # Choose right platform
            tool_env = acre.parse(_env_values)
            # Merge dictionaries
            env_values = self._merge_env(tool_env, env_values)

        final_env = self._merge_env(acre.compute(env_values), self.env)

        # Update env
        self.env.update(final_env)
|
|
|
|
    def prepare_context_environments(self):
        """Modify launch environments with context data for launched host.

        Requires `project_doc`, `asset_doc` and `task_name` in data;
        resolves and creates the workdir, sets AVALON_* context variables
        and triggers last-workfile preparation.

        Raises:
            ApplicationLaunchFailed: When anatomy formatting or workdir
                creation fails.
        """
        # Context environments
        project_doc = self.data.get("project_doc")
        asset_doc = self.data.get("asset_doc")
        task_name = self.data.get("task_name")
        if (
            not project_doc
            or not asset_doc
            or not task_name
        ):
            self.log.info(
                "Skipping context environments preparation."
                " Launch context does not contain required data."
            )
            return

        workdir_data = self._prepare_workdir_data(
            project_doc, asset_doc, task_name
        )
        self.data["workdir_data"] = workdir_data

        hierarchy = workdir_data["hierarchy"]
        anatomy = self.data["anatomy"]

        try:
            anatomy_filled = anatomy.format(workdir_data)
            workdir = os.path.normpath(anatomy_filled["work"]["folder"])
            if not os.path.exists(workdir):
                self.log.debug(
                    "Creating workdir folder: \"{}\"".format(workdir)
                )
                os.makedirs(workdir)

        except Exception as exc:
            raise ApplicationLaunchFailed(
                "Error in anatomy.format: {}".format(str(exc))
            )

        context_env = {
            "AVALON_PROJECT": project_doc["name"],
            "AVALON_ASSET": asset_doc["name"],
            "AVALON_TASK": task_name,
            "AVALON_APP": self.host_name,
            "AVALON_APP_NAME": self.app_name,
            "AVALON_HIERARCHY": hierarchy,
            "AVALON_WORKDIR": workdir
        }
        self.log.debug(
            "Context environemnts set:\n{}".format(
                json.dumps(context_env, indent=4)
            )
        )
        self.env.update(context_env)

        self.prepare_last_workfile(workdir)
|
|
|
|
def _prepare_workdir_data(self, project_doc, asset_doc, task_name):
|
|
hierarchy = "/".join(asset_doc["data"]["parents"])
|
|
|
|
data = {
|
|
"project": {
|
|
"name": project_doc["name"],
|
|
"code": project_doc["data"].get("code")
|
|
},
|
|
"task": task_name,
|
|
"asset": asset_doc["name"],
|
|
"app": self.host_name,
|
|
"hierarchy": hierarchy
|
|
}
|
|
return data
|
|
|
|
def prepare_last_workfile(self, workdir):
|
|
"""last workfile workflow preparation.
|
|
|
|
Function check if should care about last workfile workflow and tries
|
|
to find the last workfile. Both information are stored to `data` and
|
|
environments.
|
|
|
|
Last workfile is filled always (with version 1) even if any workfile
|
|
exists yet.
|
|
|
|
Args:
|
|
workdir (str): Path to folder where workfiles should be stored.
|
|
"""
|
|
_workdir_data = self.data.get("workdir_data")
|
|
if not _workdir_data:
|
|
self.log.info(
|
|
"Skipping last workfile preparation."
|
|
" Key `workdir_data` not filled."
|
|
)
|
|
return
|
|
|
|
workdir_data = copy.deepcopy(_workdir_data)
|
|
project_name = self.data["project_name"]
|
|
task_name = self.data["task_name"]
|
|
start_last_workfile = self.should_start_last_workfile(
|
|
project_name, self.host_name, task_name
|
|
)
|
|
self.data["start_last_workfile"] = start_last_workfile
|
|
|
|
# Store boolean as "0"(False) or "1"(True)
|
|
self.env["AVALON_OPEN_LAST_WORKFILE"] = (
|
|
str(int(bool(start_last_workfile)))
|
|
)
|
|
|
|
_sub_msg = "" if start_last_workfile else " not"
|
|
self.log.debug(
|
|
"Last workfile should{} be opened on start.".format(_sub_msg)
|
|
)
|
|
|
|
# Last workfile path
|
|
last_workfile_path = ""
|
|
extensions = avalon.api.HOST_WORKFILE_EXTENSIONS.get(self.host_name)
|
|
if extensions:
|
|
anatomy = self.data["anatomy"]
|
|
# Find last workfile
|
|
file_template = anatomy.templates["work"]["file"]
|
|
workdir_data.update({
|
|
"version": 1,
|
|
"user": os.environ.get("PYPE_USERNAME") or getpass.getuser(),
|
|
"ext": extensions[0]
|
|
})
|
|
|
|
last_workfile_path = avalon.api.last_workfile(
|
|
workdir, file_template, workdir_data, extensions, True
|
|
)
|
|
|
|
if os.path.exists(last_workfile_path):
|
|
self.log.debug((
|
|
"Workfiles for launch context does not exists"
|
|
" yet but path will be set."
|
|
))
|
|
self.log.debug(
|
|
"Setting last workfile path: {}".format(last_workfile_path)
|
|
)
|
|
|
|
self.env["AVALON_LAST_WORKFILE"] = last_workfile_path
|
|
self.data["last_workfile_path"] = last_workfile_path
|
|
|
|
def should_start_last_workfile(self, project_name, host_name, task_name):
|
|
"""Define if host should start last version workfile if possible.
|
|
|
|
Default output is `False`. Can be overriden with environment variable
|
|
`AVALON_OPEN_LAST_WORKFILE`, valid values without case sensitivity are
|
|
`"0", "1", "true", "false", "yes", "no"`.
|
|
|
|
Args:
|
|
project_name (str): Name of project.
|
|
host_name (str): Name of host which is launched. In avalon's
|
|
application context it's value stored in app definition under
|
|
key `"application_dir"`. Is not case sensitive.
|
|
task_name (str): Name of task which is used for launching the host.
|
|
Task name is not case sensitive.
|
|
|
|
Returns:
|
|
bool: True if host should start workfile.
|
|
|
|
"""
|
|
default_output = env_value_to_bool(
|
|
"AVALON_OPEN_LAST_WORKFILE", default=False
|
|
)
|
|
# TODO convert to settings
|
|
try:
|
|
startup_presets = (
|
|
config.get_presets(project_name)
|
|
.get("tools", {})
|
|
.get("workfiles", {})
|
|
.get("last_workfile_on_startup")
|
|
)
|
|
except Exception:
|
|
startup_presets = None
|
|
self.log.warning("Couldn't load pype's presets", exc_info=True)
|
|
|
|
if not startup_presets:
|
|
return default_output
|
|
|
|
host_name_lowered = host_name.lower()
|
|
task_name_lowered = task_name.lower()
|
|
|
|
max_points = 2
|
|
matching_points = -1
|
|
matching_item = None
|
|
for item in startup_presets:
|
|
hosts = item.get("hosts") or tuple()
|
|
tasks = item.get("tasks") or tuple()
|
|
|
|
hosts_lowered = tuple(_host_name.lower() for _host_name in hosts)
|
|
# Skip item if has set hosts and current host is not in
|
|
if hosts_lowered and host_name_lowered not in hosts_lowered:
|
|
continue
|
|
|
|
tasks_lowered = tuple(_task_name.lower() for _task_name in tasks)
|
|
# Skip item if has set tasks and current task is not in
|
|
if tasks_lowered:
|
|
task_match = False
|
|
for task_regex in compile_list_of_regexes(tasks_lowered):
|
|
if re.match(task_regex, task_name_lowered):
|
|
task_match = True
|
|
break
|
|
|
|
if not task_match:
|
|
continue
|
|
|
|
points = int(bool(hosts_lowered)) + int(bool(tasks_lowered))
|
|
if points > matching_points:
|
|
matching_item = item
|
|
matching_points = points
|
|
|
|
if matching_points == max_points:
|
|
break
|
|
|
|
if matching_item is not None:
|
|
output = matching_item.get("enabled")
|
|
if output is None:
|
|
output = default_output
|
|
return output
|
|
return default_output
|
|
|
|
    def after_launch_procedures(self):
        """Run post-launch procedures after the application was started.

        Currently only triggers the Ftrack procedure (status change and
        timer start); more procedures may be added here later.
        """
        self._ftrack_after_launch_procedure()
|
|
|
|
def _ftrack_after_launch_procedure(self):
|
|
# TODO move to launch hook
|
|
project_name = self.data.get("project_name")
|
|
asset_name = self.data.get("asset_name")
|
|
task_name = self.data.get("task_name")
|
|
if (
|
|
not project_name
|
|
or not asset_name
|
|
or not task_name
|
|
):
|
|
return
|
|
|
|
required_keys = ("FTRACK_SERVER", "FTRACK_API_USER", "FTRACK_API_KEY")
|
|
for key in required_keys:
|
|
if not os.environ.get(key):
|
|
self.log.debug((
|
|
"Missing required environment \"{}\""
|
|
" for Ftrack after launch procedure."
|
|
).format(key))
|
|
return
|
|
|
|
try:
|
|
import ftrack_api
|
|
session = ftrack_api.Session(auto_connect_event_hub=True)
|
|
self.log.debug("Ftrack session created")
|
|
except Exception:
|
|
self.log.warning("Couldn't create Ftrack session")
|
|
return
|
|
|
|
try:
|
|
entity = self._find_ftrack_task_entity(
|
|
session, project_name, asset_name, task_name
|
|
)
|
|
self._ftrack_status_change(session, entity, project_name)
|
|
self._start_timer(session, entity, ftrack_api)
|
|
except Exception:
|
|
self.log.warning(
|
|
"Couldn't finish Ftrack procedure.", exc_info=True
|
|
)
|
|
return
|
|
|
|
finally:
|
|
session.close()
|
|
|
|
def _find_ftrack_task_entity(
|
|
self, session, project_name, asset_name, task_name
|
|
):
|
|
project_entity = session.query(
|
|
"Project where full_name is \"{}\"".format(project_name)
|
|
).first()
|
|
if not project_entity:
|
|
self.log.warning(
|
|
"Couldn't find project \"{}\" in Ftrack.".format(project_name)
|
|
)
|
|
return
|
|
|
|
potential_task_entities = session.query((
|
|
"TypedContext where parent.name is \"{}\" and project_id is \"{}\""
|
|
).format(asset_name, project_entity["id"])).all()
|
|
filtered_entities = []
|
|
for _entity in potential_task_entities:
|
|
if (
|
|
_entity.entity_type.lower() == "task"
|
|
and _entity["name"] == task_name
|
|
):
|
|
filtered_entities.append(_entity)
|
|
|
|
if not filtered_entities:
|
|
self.log.warning((
|
|
"Couldn't find task \"{}\" under parent \"{}\" in Ftrack."
|
|
).format(task_name, asset_name))
|
|
return
|
|
|
|
if len(filtered_entities) > 1:
|
|
self.log.warning((
|
|
"Found more than one task \"{}\""
|
|
" under parent \"{}\" in Ftrack."
|
|
).format(task_name, asset_name))
|
|
return
|
|
|
|
return filtered_entities[0]
|
|
|
|
    def _ftrack_status_change(self, session, entity, project_name):
        """Change entity's Ftrack status based on project presets.

        Presets map a target status name to a list of source statuses
        (or "_any_"). Special key "_ignore_" stops processing without a
        change. When a target status does not exist for the entity type,
        the next matching preset key is tried.
        """
        from pype.api import config
        presets = config.get_presets(project_name)["ftrack"]["ftrack_config"]
        statuses = presets.get("status_update")
        if not statuses:
            return

        actual_status = entity["status"]["name"].lower()
        # Keys already tried (either not matching or rejected by Ftrack).
        already_tested = set()
        # Human readable entity path for log messages.
        ent_path = "/".join(
            [ent["name"] for ent in entity["link"]]
        )
        while True:
            next_status_name = None
            for key, value in statuses.items():
                if key in already_tested:
                    continue
                if actual_status in value or "_any_" in value:
                    # "_ignore_" matches but intentionally sets no target,
                    # which ends the outer loop below.
                    if key != "_ignore_":
                        next_status_name = key
                        already_tested.add(key)
                    break
                already_tested.add(key)

            if next_status_name is None:
                break

            try:
                query = "Status where name is \"{}\"".format(
                    next_status_name
                )
                status = session.query(query).one()

                entity["status"] = status
                session.commit()
                self.log.debug("Changing status to \"{}\" <{}>".format(
                    next_status_name, ent_path
                ))
                break

            except Exception:
                # Status not available for this entity type (or commit
                # failed) - roll back and try the next matching preset key.
                session.rollback()
                msg = (
                    "Status \"{}\" in presets wasn't found"
                    " on Ftrack entity type \"{}\""
                ).format(next_status_name, entity.entity_type)
                self.log.warning(msg)
|
|
|
|
def _start_timer(self, session, entity, _ftrack_api):
|
|
self.log.debug("Triggering timer start.")
|
|
|
|
user_entity = session.query("User where username is \"{}\"".format(
|
|
os.environ["FTRACK_API_USER"]
|
|
)).first()
|
|
if not user_entity:
|
|
self.log.warning(
|
|
"Couldn't find user with username \"{}\" in Ftrack".format(
|
|
os.environ["FTRACK_API_USER"]
|
|
)
|
|
)
|
|
return
|
|
|
|
source = {
|
|
"user": {
|
|
"id": user_entity["id"],
|
|
"username": user_entity["username"]
|
|
}
|
|
}
|
|
event_data = {
|
|
"actionIdentifier": "start.timer",
|
|
"selection": [{"entityId": entity["id"], "entityType": "task"}]
|
|
}
|
|
session.event_hub.publish(
|
|
_ftrack_api.event.base.Event(
|
|
topic="ftrack.action.launch",
|
|
data=event_data,
|
|
source=source
|
|
),
|
|
on_error="ignore"
|
|
)
|
|
self.log.debug("Timer start triggered successfully.")
|