application is used everywhere so removed a lot of code

This commit is contained in:
iLLiCiTiT 2020-12-03 12:58:20 +01:00
parent 26a60111a6
commit 6f3fe95405
8 changed files with 18 additions and 877 deletions

View file

@ -1,6 +1,4 @@
import os
import sys
import getpass
import copy
import platform
import inspect
@ -10,25 +8,14 @@ import distutils.spawn
from abc import ABCMeta, abstractmethod
import six
import acre
import avalon.lib
import avalon.api
from pype.settings import system_settings, environemtns
from ..api import Logger
from ..api import (
Anatomy,
Logger,
config,
system_settings,
environments
)
from .python_module_tools import (
modules_from_path,
classes_from_module
)
from .hooks import execute_hook
from .deprecated import get_avalon_database
log = logging.getLogger(__name__)
@ -79,377 +66,6 @@ class ApplicationLaunchFailed(Exception):
pass
def launch_application(project_name, asset_name, task_name, app_name):
"""Launch host application with filling required environments.
TODO(iLLiCiT): This should be split into more parts.
"""
# `get_avalon_database` is in Pype 3 replaced with using `AvalonMongoDB`
database = get_avalon_database()
project_document = database[project_name].find_one({"type": "project"})
asset_document = database[project_name].find_one({
"type": "asset",
"name": asset_name
})
asset_doc_parents = asset_document["data"].get("parents")
hierarchy = "/".join(asset_doc_parents)
app_def = avalon.lib.get_application(app_name)
app_label = app_def.get("ftrack_label", app_def.get("label", app_name))
host_name = app_def["application_dir"]
# Workfile data collection may be special function?
data = {
"project": {
"name": project_document["name"],
"code": project_document["data"].get("code")
},
"task": task_name,
"asset": asset_name,
"app": host_name,
"hierarchy": hierarchy
}
try:
anatomy = Anatomy(project_name)
anatomy_filled = anatomy.format(data)
workdir = os.path.normpath(anatomy_filled["work"]["folder"])
except Exception as exc:
raise ApplicationLaunchFailed(
"Error in anatomy.format: {}".format(str(exc))
)
try:
os.makedirs(workdir)
except FileExistsError:
pass
last_workfile_path = None
extensions = avalon.api.HOST_WORKFILE_EXTENSIONS.get(host_name)
if extensions:
# Find last workfile
file_template = anatomy.templates["work"]["file"]
data.update({
"version": 1,
"user": os.environ.get("PYPE_USERNAME") or getpass.getuser(),
"ext": extensions[0]
})
last_workfile_path = avalon.api.last_workfile(
workdir, file_template, data, extensions, True
)
# set environments for Avalon
prep_env = copy.deepcopy(os.environ)
prep_env.update({
"AVALON_PROJECT": project_name,
"AVALON_ASSET": asset_name,
"AVALON_TASK": task_name,
"AVALON_APP": host_name,
"AVALON_APP_NAME": app_name,
"AVALON_HIERARCHY": hierarchy,
"AVALON_WORKDIR": workdir
})
start_last_workfile = avalon.api.should_start_last_workfile(
project_name, host_name, task_name
)
# Store boolean as "0"(False) or "1"(True)
prep_env["AVALON_OPEN_LAST_WORKFILE"] = (
str(int(bool(start_last_workfile)))
)
if (
start_last_workfile
and last_workfile_path
and os.path.exists(last_workfile_path)
):
prep_env["AVALON_LAST_WORKFILE"] = last_workfile_path
prep_env.update(anatomy.roots_obj.root_environments())
# collect all the 'environment' attributes from parents
tools_attr = [prep_env["AVALON_APP"], prep_env["AVALON_APP_NAME"]]
tools_env = asset_document["data"].get("tools_env") or []
tools_attr.extend(tools_env)
tools_env = acre.get_tools(tools_attr)
env = acre.compute(tools_env)
env = acre.merge(env, current_env=dict(prep_env))
# Get path to execute
st_temp_path = os.environ["PYPE_CONFIG"]
os_plat = platform.system().lower()
# Path to folder with launchers
path = os.path.join(st_temp_path, "launchers", os_plat)
# Full path to executable launcher
execfile = None
launch_hook = app_def.get("launch_hook")
if launch_hook:
log.info("launching hook: {}".format(launch_hook))
ret_val = execute_hook(launch_hook, env=env)
if not ret_val:
raise ApplicationLaunchFailed(
"Hook didn't finish successfully {}".format(app_label)
)
if sys.platform == "win32":
for ext in os.environ["PATHEXT"].split(os.pathsep):
fpath = os.path.join(path.strip('"'), app_def["executable"] + ext)
if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
execfile = fpath
break
# Run SW if was found executable
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
popen = avalon.lib.launch(
executable=execfile, args=[], environment=env
)
elif (
sys.platform.startswith("linux")
or sys.platform.startswith("darwin")
):
execfile = os.path.join(path.strip('"'), app_def["executable"])
# Run SW if was found executable
if execfile is None:
raise ApplicationLaunchFailed(
"We didn't find launcher for {}".format(app_label)
)
if not os.path.isfile(execfile):
raise ApplicationLaunchFailed(
"Launcher doesn't exist - {}".format(execfile)
)
try:
fp = open(execfile)
except PermissionError as perm_exc:
raise ApplicationLaunchFailed(
"Access denied on launcher {} - {}".format(execfile, perm_exc)
)
fp.close()
# check executable permission
if not os.access(execfile, os.X_OK):
raise ApplicationLaunchFailed(
"No executable permission - {}".format(execfile)
)
popen = avalon.lib.launch( # noqa: F841
"/usr/bin/env", args=["bash", execfile], environment=env
)
return popen
class ApplicationAction:
"""Default application launcher
This is a convenience application Action that when "config" refers to a
parsed application `.toml` this can launch the application.
"""
_log = None
config = None
group = None
variant = None
required_session_keys = (
"AVALON_PROJECT",
"AVALON_ASSET",
"AVALON_TASK"
)
@property
def log(self):
if self._log is None:
self._log = Logger().get_logger(self.__class__.__name__)
return self._log
def is_compatible(self, session):
for key in self.required_session_keys:
if key not in session:
return False
return True
def process(self, session, **kwargs):
"""Process the full Application action"""
project_name = session["AVALON_PROJECT"]
asset_name = session["AVALON_ASSET"]
task_name = session["AVALON_TASK"]
launch_application(
project_name, asset_name, task_name, self.name
)
self._ftrack_after_launch_procedure(
project_name, asset_name, task_name
)
def _ftrack_after_launch_procedure(
self, project_name, asset_name, task_name
):
# TODO move to launch hook
required_keys = ("FTRACK_SERVER", "FTRACK_API_USER", "FTRACK_API_KEY")
for key in required_keys:
if not os.environ.get(key):
self.log.debug((
"Missing required environment \"{}\""
" for Ftrack after launch procedure."
).format(key))
return
try:
import ftrack_api
session = ftrack_api.Session(auto_connect_event_hub=True)
self.log.debug("Ftrack session created")
except Exception:
self.log.warning("Couldn't create Ftrack session")
return
try:
entity = self._find_ftrack_task_entity(
session, project_name, asset_name, task_name
)
self._ftrack_status_change(session, entity, project_name)
self._start_timer(session, entity, ftrack_api)
except Exception:
self.log.warning(
"Couldn't finish Ftrack procedure.", exc_info=True
)
return
finally:
session.close()
def _find_ftrack_task_entity(
self, session, project_name, asset_name, task_name
):
project_entity = session.query(
"Project where full_name is \"{}\"".format(project_name)
).first()
if not project_entity:
self.log.warning(
"Couldn't find project \"{}\" in Ftrack.".format(project_name)
)
return
potential_task_entities = session.query((
"TypedContext where parent.name is \"{}\" and project_id is \"{}\""
).format(asset_name, project_entity["id"])).all()
filtered_entities = []
for _entity in potential_task_entities:
if (
_entity.entity_type.lower() == "task"
and _entity["name"] == task_name
):
filtered_entities.append(_entity)
if not filtered_entities:
self.log.warning((
"Couldn't find task \"{}\" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
if len(filtered_entities) > 1:
self.log.warning((
"Found more than one task \"{}\""
" under parent \"{}\" in Ftrack."
).format(task_name, asset_name))
return
return filtered_entities[0]
def _ftrack_status_change(self, session, entity, project_name):
presets = config.get_presets(project_name)["ftrack"]["ftrack_config"]
statuses = presets.get("status_update")
if not statuses:
return
actual_status = entity["status"]["name"].lower()
already_tested = set()
ent_path = "/".join(
[ent["name"] for ent in entity["link"]]
)
while True:
next_status_name = None
for key, value in statuses.items():
if key in already_tested:
continue
if actual_status in value or "_any_" in value:
if key != "_ignore_":
next_status_name = key
already_tested.add(key)
break
already_tested.add(key)
if next_status_name is None:
break
try:
query = "Status where name is \"{}\"".format(
next_status_name
)
status = session.query(query).one()
entity["status"] = status
session.commit()
self.log.debug("Changing status to \"{}\" <{}>".format(
next_status_name, ent_path
))
break
except Exception:
session.rollback()
msg = (
"Status \"{}\" in presets wasn't found"
" on Ftrack entity type \"{}\""
).format(next_status_name, entity.entity_type)
self.log.warning(msg)
def _start_timer(self, session, entity, _ftrack_api):
self.log.debug("Triggering timer start.")
user_entity = session.query("User where username is \"{}\"".format(
os.environ["FTRACK_API_USER"]
)).first()
if not user_entity:
self.log.warning(
"Couldn't find user with username \"{}\" in Ftrack".format(
os.environ["FTRACK_API_USER"]
)
)
return
source = {
"user": {
"id": user_entity["id"],
"username": user_entity["username"]
}
}
event_data = {
"actionIdentifier": "start.timer",
"selection": [{"entityId": entity["id"], "entityType": "task"}]
}
session.event_hub.publish(
_ftrack_api.event.base.Event(
topic="ftrack.action.launch",
data=event_data,
source=source
),
on_error="ignore"
)
self.log.debug("Timer start triggered successfully.")
# Special naming case for subprocess since its a built-in method.
def _subprocess(*args, **kwargs):
"""Convenience method for getting output errors for subprocess.

View file

@ -1,104 +0,0 @@
import os
import toml
import time
from pype.modules.ftrack.lib import AppAction
from avalon import lib
from pype.api import Logger, config
log = Logger().get_logger(__name__)
def registerApp(app, session, plugins_presets):
name = app['name']
variant = ""
try:
variant = app['name'].split("_")[1]
except Exception:
pass
abspath = lib.which_app(app['name'])
if abspath is None:
log.error(
"'{0}' - App don't have config toml file".format(app['name'])
)
return
apptoml = toml.load(abspath)
''' REQUIRED '''
executable = apptoml['executable']
''' OPTIONAL '''
label = apptoml.get('ftrack_label', app.get('label', name))
icon = apptoml.get('ftrack_icon', None)
description = apptoml.get('description', None)
preactions = apptoml.get('preactions', [])
if icon:
icon = icon.format(os.environ.get('PYPE_STATICS_SERVER', ''))
# register action
AppAction(
session, label, name, executable, variant,
icon, description, preactions, plugins_presets
).register()
if not variant:
log.info('- Variant is not set')
def register(session, plugins_presets={}):
from pype.lib import env_value_to_bool
if env_value_to_bool("PYPE_USE_APP_MANAGER", default=False):
return
app_usages = (
config.get_presets()
.get("global", {})
.get("applications")
) or {}
apps = []
missing_app_names = []
launchers_path = os.path.join(os.environ["PYPE_CONFIG"], "launchers")
for file in os.listdir(launchers_path):
filename, ext = os.path.splitext(file)
if ext.lower() != ".toml":
continue
app_usage = app_usages.get(filename)
if not app_usage:
if app_usage is None:
missing_app_names.append(filename)
continue
loaded_data = toml.load(os.path.join(launchers_path, file))
app_data = {
"name": filename,
"label": loaded_data.get("label", filename)
}
apps.append(app_data)
if missing_app_names:
log.debug(
"Apps not defined in applications usage. ({})".format(
", ".join((
"\"{}\"".format(app_name)
for app_name in missing_app_names
))
)
)
apps = sorted(apps, key=lambda app: app["name"])
app_counter = 0
for app in apps:
try:
registerApp(app, session, plugins_presets)
if app_counter % 5 == 0:
time.sleep(0.1)
app_counter += 1
except Exception as exc:
log.warning(
"\"{}\" - not a proper App ({})".format(app['name'], str(exc)),
exc_info=True
)

View file

@ -211,7 +211,5 @@ class AppplicationsAction(BaseAction):
def register(session, plugins_presets=None):
'''Register action. Called when used as an event plugin.'''
from pype.lib import env_value_to_bool
if env_value_to_bool("PYPE_USE_APP_MANAGER", default=False):
AppplicationsAction(session, plugins_presets).register()
"""Register action. Called when used as an event plugin."""
AppplicationsAction(session, plugins_presets).register()

View file

@ -146,9 +146,6 @@ class CustomAttributes(BaseAction):
"text", "boolean", "date", "enumerator",
"dynamic enumerator", "number"
)
# Pype 3 features
use_app_manager = env_value_to_bool("PYPE_USE_APP_MANAGER", default=False)
app_manager = None
def discover(self, session, entities, event):
'''
@ -171,8 +168,7 @@ class CustomAttributes(BaseAction):
})
session.commit()
if self.use_app_manager:
self.app_manager = ApplicationManager()
self.app_manager = ApplicationManager()
try:
self.prepare_global_data(session)
@ -391,54 +387,8 @@ class CustomAttributes(BaseAction):
app_definitions.append({"empty": "< Empty >"})
return app_definitions
def application_definitions(self):
app_usages = self.presets.get("global", {}).get("applications") or {}
app_definitions = []
launchers_path = os.path.join(os.environ["PYPE_CONFIG"], "launchers")
missing_app_names = []
for file in os.listdir(launchers_path):
app_name, ext = os.path.splitext(file)
if ext.lower() != ".toml":
continue
if not app_usages.get(app_name):
missing_app_names.append(app_name)
continue
loaded_data = toml.load(os.path.join(launchers_path, file))
ftrack_label = loaded_data.get("ftrack_label")
if ftrack_label:
parts = app_name.split("_")
if len(parts) > 1:
ftrack_label = " ".join((ftrack_label, parts[-1]))
else:
ftrack_label = loaded_data.get("label", app_name)
app_definitions.append({app_name: ftrack_label})
if missing_app_names:
self.log.warning(
"Apps not defined in applications usage. ({})".format(
", ".join((
"\"{}\"".format(app_name)
for app_name in missing_app_names
))
)
)
# Make sure there is at least one item
if not app_definitions:
app_definitions.append({"empty": "< Empty >"})
return app_definitions
def applications_attribute(self, event):
if self.use_app_manager:
apps_data = self.app_defs_from_app_manager()
else:
apps_data = self.application_definitions()
apps_data = self.app_defs_from_app_manager()
applications_custom_attr_data = {
"label": "Applications",
@ -453,28 +403,13 @@ class CustomAttributes(BaseAction):
}
self.process_attr_data(applications_custom_attr_data, event)
def tools_from_app_manager(self):
def tools_attribute(self, event):
tools_data = []
for tool_name, tool in self.app_manager.tools.items():
if tool.enabled:
tools_data.append({
tool_name: tool_name
})
return tools_data
def tools_data(self):
tool_usages = self.presets.get("global", {}).get("tools") or {}
tools_data = []
for tool_name, usage in tool_usages.items():
if usage:
tools_data.append({tool_name: tool_name})
return tools_data
def tools_attribute(self, event):
if self.use_app_manager:
tools_data = self.tools_from_app_manager()
else:
tools_data = self.tools_data()
# Make sure there is at least one item
if not tools_data:

View file

@ -191,43 +191,17 @@ def get_project_apps(in_app_list):
apps = []
warnings = collections.defaultdict(list)
if env_value_to_bool("PYPE_USE_APP_MANAGER", default=False):
missing_app_msg = "Missing definition of application"
application_manager = ApplicationManager()
for app_name in in_app_list:
app = application_manager.applications.get(app_name)
if app:
apps.append({
"name": app_name,
"label": app.full_label
})
else:
warnings[missing_app_msg].append(app_name)
return apps, warnings
# TODO report
missing_toml_msg = "Missing config file for application"
error_msg = (
"Unexpected error happend during preparation of application"
)
for app in in_app_list:
try:
toml_path = avalon.lib.which_app(app)
if not toml_path:
log.warning(missing_toml_msg + ' "{}"'.format(app))
warnings[missing_toml_msg].append(app)
continue
missing_app_msg = "Missing definition of application"
application_manager = ApplicationManager()
for app_name in in_app_list:
app = application_manager.applications.get(app_name)
if app:
apps.append({
"name": app,
"label": toml.load(toml_path)["label"]
"name": app_name,
"label": app.full_label
})
except Exception:
warnings[error_msg].append(app)
log.warning((
"Error has happened during preparing application \"{}\""
).format(app), exc_info=True)
else:
warnings[missing_app_msg].append(app_name)
return apps, warnings

View file

@ -1,223 +0,0 @@
from pype import lib as pypelib
from pype.api import config
from .ftrack_action_handler import BaseAction
class AppAction(BaseAction):
"""Application Action class.
Args:
session (ftrack_api.Session): Session where action will be registered.
label (str): A descriptive string identifing your action.
varaint (str, optional): To group actions together, give them the same
label and specify a unique variant per action.
identifier (str): An unique identifier for app.
description (str): A verbose descriptive text for you action.
icon (str): Url path to icon which will be shown in Ftrack web.
"""
type = "Application"
preactions = ["start.timer"]
def __init__(
self, session, label, name, executable, variant=None,
icon=None, description=None, preactions=[], plugins_presets={}
):
self.label = label
self.identifier = name
self.executable = executable
self.variant = variant
self.icon = icon
self.description = description
self.preactions.extend(preactions)
super().__init__(session, plugins_presets)
if label is None:
raise ValueError("Action missing label.")
if name is None:
raise ValueError("Action missing identifier.")
if executable is None:
raise ValueError("Action missing executable.")
def register(self):
"""Registers the action, subscribing the discover and launch topics."""
discovery_subscription = (
"topic=ftrack.action.discover and source.user.username={0}"
).format(self.session.api_user)
self.session.event_hub.subscribe(
discovery_subscription,
self._discover,
priority=self.priority
)
launch_subscription = (
"topic=ftrack.action.launch"
" and data.actionIdentifier={0}"
" and source.user.username={1}"
).format(
self.identifier,
self.session.api_user
)
self.session.event_hub.subscribe(
launch_subscription,
self._launch
)
def discover(self, session, entities, event):
"""Return true if we can handle the selected entities.
Args:
session (ftrack_api.Session): Helps to query necessary data.
entities (list): Object of selected entities.
event (ftrack_api.Event): Ftrack event causing discover callback.
"""
if (
len(entities) != 1
or entities[0].entity_type.lower() != "task"
):
return False
entity = entities[0]
if entity["parent"].entity_type.lower() == "project":
return False
avalon_project_apps = event["data"].get("avalon_project_apps", None)
avalon_project_doc = event["data"].get("avalon_project_doc", None)
if avalon_project_apps is None:
if avalon_project_doc is None:
ft_project = self.get_project_from_entity(entity)
database = pypelib.get_avalon_database()
project_name = ft_project["full_name"]
avalon_project_doc = database[project_name].find_one({
"type": "project"
}) or False
event["data"]["avalon_project_doc"] = avalon_project_doc
if not avalon_project_doc:
return False
project_apps_config = avalon_project_doc["config"].get("apps", [])
avalon_project_apps = [
app["name"] for app in project_apps_config
] or False
event["data"]["avalon_project_apps"] = avalon_project_apps
if not avalon_project_apps:
return False
return self.identifier in avalon_project_apps
def _launch(self, event):
entities = self._translate_event(event)
preactions_launched = self._handle_preactions(
self.session, event
)
if preactions_launched is False:
return
response = self.launch(self.session, entities, event)
return self._handle_result(response)
def launch(self, session, entities, event):
"""Callback method for the custom action.
return either a bool (True if successful or False if the action failed)
or a dictionary with they keys `message` and `success`, the message
should be a string and will be displayed as feedback to the user,
success should be a bool, True if successful or False if the action
failed.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and
the entity id. If the entity is a hierarchical you will always get
the entity type TypedContext, once retrieved through a get operation
you will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
"""
entity = entities[0]
task_name = entity["name"]
asset_name = entity["parent"]["name"]
project_name = entity["project"]["full_name"]
try:
pypelib.launch_application(
project_name, asset_name, task_name, self.identifier
)
except pypelib.ApplicationLaunchFailed as exc:
self.log.error(str(exc))
return {
"success": False,
"message": str(exc)
}
except Exception:
msg = "Unexpected failure of application launch {}".format(
self.label
)
self.log.error(msg, exc_info=True)
return {
"success": False,
"message": msg
}
# Change status of task to In progress
presets = config.get_presets()["ftrack"]["ftrack_config"]
if "status_update" in presets:
statuses = presets["status_update"]
actual_status = entity["status"]["name"].lower()
already_tested = []
ent_path = "/".join(
[ent["name"] for ent in entity["link"]]
)
while True:
next_status_name = None
for key, value in statuses.items():
if key in already_tested:
continue
if actual_status in value or "_any_" in value:
if key != "_ignore_":
next_status_name = key
already_tested.append(key)
break
already_tested.append(key)
if next_status_name is None:
break
try:
query = "Status where name is \"{}\"".format(
next_status_name
)
status = session.query(query).one()
entity["status"] = status
session.commit()
self.log.debug("Changing status to \"{}\" <{}>".format(
next_status_name, ent_path
))
break
except Exception:
session.rollback()
msg = (
"Status \"{}\" in presets wasn't found"
" on Ftrack entity type \"{}\""
).format(next_status_name, entity.entity_type)
self.log.warning(msg)
return {
"success": True,
"message": "Launching {0}".format(self.label)
}

View file

@ -16,60 +16,13 @@ provides a bridge between the file-based project inventory and configuration.
import os
from Qt import QtGui
from avalon import lib
from avalon.vendor import qtawesome
from pype.api import resources
from pype.lib import ApplicationAction
ICON_CACHE = {}
NOT_FOUND = type("NotFound", (object, ), {})
def get_application_actions(project):
"""Define dynamic Application classes for project using `.toml` files
Args:
project (dict): project document from the database
Returns:
list: list of dictionaries
"""
apps = []
for app in project["config"]["apps"]:
try:
app_name = app["name"]
app_definition = lib.get_application(app_name)
except Exception as exc:
print("Unable to load application: %s - %s" % (app['name'], exc))
continue
# Get from app definition, if not there from app in project
icon = app_definition.get("icon", app.get("icon", "folder-o"))
color = app_definition.get("color", app.get("color", None))
order = app_definition.get("order", app.get("order", 0))
label = app_definition.get("label") or app.get("label") or app_name
label_variant = app_definition.get("label_variant")
group = app_definition.get("group") or app.get("group")
action = type(
"app_{}".format(app_name),
(ApplicationAction,),
{
"name": app_name,
"label": label,
"label_variant": label_variant,
"group": group,
"icon": icon,
"color": color,
"order": order,
"config": app_definition.copy()
}
)
apps.append(action)
return apps
def get_action_icon(action):
icon_name = action.icon
if not icon_name:

View file

@ -117,11 +117,7 @@ class ActionModel(QtGui.QStandardItemModel):
super(ActionModel, self).__init__(parent=parent)
self.dbcon = dbcon
self.use_manager = env_value_to_bool(
"PYPE_USE_APP_MANAGER", default=False
)
if self.use_manager:
self.application_manager = ApplicationManager()
self.application_manager = ApplicationManager()
self._session = {}
self._groups = {}
@ -141,11 +137,7 @@ class ActionModel(QtGui.QStandardItemModel):
actions = api.discover(api.Action)
# Get available project actions and the application actions
if self.use_manager:
app_actions = self.get_application_actions()
else:
project_doc = self.dbcon.find_one({"type": "project"})
app_actions = lib.get_application_actions(project_doc)
app_actions = self.get_application_actions()
actions.extend(app_actions)
self._registered_actions = actions