cleanup ftrack action server

This commit is contained in:
Milan Kolar 2018-10-30 15:31:50 +01:00
parent c593d6798a
commit d40d504489
5 changed files with 0 additions and 516 deletions

View file

@ -1,234 +0,0 @@
# :coding: utf-8
# :copyright: Copyright (c) 2017 ftrack
import logging
import ftrack_api
class BaseAction(object):
'''Custom Action base class
`label` a descriptive string identifing your action.
`varaint` To group actions together, give them the same
label and specify a unique variant per action.
`identifier` a unique identifier for your action.
`description` a verbose descriptive text for you action
'''
label = None
variant = None
identifier = None
description = None
icon = None
def __init__(self, session):
'''Expects a ftrack_api.Session instance'''
self.logger = logging.getLogger(
'{0}.{1}'.format(__name__, self.__class__.__name__)
)
if self.label is None:
raise ValueError(
'Action missing label.'
)
elif self.identifier is None:
raise ValueError(
'Action missing identifier.'
)
self._session = session
@property
def session(self):
'''Return current session.'''
return self._session
def register(self):
'''Registers the action, subscribing the the discover and launch topics.'''
self.session.event_hub.subscribe(
'topic=ftrack.action.discover', self._discover
)
self.session.event_hub.subscribe(
'topic=ftrack.action.launch and data.actionIdentifier={0}'.format(
self.identifier
),
self._launch
)
def _discover(self, event):
args = self._translate_event(
self.session, event
)
accepts = self.discover(
self.session, *args
)
if accepts:
return {
'items': [{
'label': self.label,
'variant': self.variant,
'description': self.description,
'actionIdentifier': self.identifier,
'icon': self.icon,
}]
}
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return False
def _translate_event(self, session, event):
'''Return *event* translated structure to be used with the API.'''
_selection = event['data'].get('selection', [])
_entities = list()
for entity in _selection:
_entities.append(
(
self._get_entity_type(entity), entity.get('entityId')
)
)
return [
_entities,
event
]
def _get_entity_type(self, entity):
'''Return translated entity type tht can be used with API.'''
# Get entity type and make sure it is lower cased. Most places except
# the component tab in the Sidebar will use lower case notation.
entity_type = entity.get('entityType').replace('_', '').lower()
for schema in self.session.schemas:
alias_for = schema.get('alias_for')
if (
alias_for and isinstance(alias_for, str) and
alias_for.lower() == entity_type
):
return schema['id']
for schema in self.session.schemas:
if schema['id'].lower() == entity_type:
return schema['id']
raise ValueError(
'Unable to translate entity type: {0}.'.format(entity_type)
)
def _launch(self, event):
args = self._translate_event(
self.session, event
)
interface = self._interface(
self.session, *args
)
if interface:
return interface
response = self.launch(
self.session, *args
)
return self._handle_result(
self.session, response, *args
)
def launch(self, session, entities, event):
'''Callback method for the custom action.
return either a bool ( True if successful or False if the action failed )
or a dictionary with they keys `message` and `success`, the message should be a
string and will be displayed as feedback to the user, success should be a bool,
True if successful or False if the action failed.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
raise NotImplementedError()
def _interface(self, *args):
interface = self.interface(*args)
if interface:
return {
'items': interface
}
def interface(self, session, entities, event):
'''Return a interface if applicable or None
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return None
def _handle_result(self, session, result, entities, event):
'''Validate the returned result from the action callback'''
if isinstance(result, bool):
result = {
'success': result,
'message': (
'{0} launched successfully.'.format(
self.label
)
)
}
elif isinstance(result, dict):
for key in ('success', 'message'):
if key in result:
continue
raise KeyError(
'Missing required key: {0}.'.format(key)
)
else:
self.logger.error(
'Invalid result type must be bool or dictionary!'
)
return result

View file

@ -1,44 +0,0 @@
import model
# model
name = model.data(index, "name")
# Get the action
Action = next((a for a in self._registered_actions if a.name == name),
None)
assert Action, "No action found"
action = Action()
# Run the action within current session
self.log("Running action: %s" % name, level=INFO)
popen = action.process(api.Session.copy())
# Action might return popen that pipes stdout
# in which case we listen for it.
process = {}
if popen and hasattr(popen, "stdout") and popen.stdout is not None:
class Thread(QtCore.QThread):
messaged = Signal(str)
def run(self):
for line in lib.stream(process["popen"].stdout):
self.messaged.emit(line.rstrip())
self.messaged.emit("%s killed." % process["name"])
thread = Thread()
thread.messaged.connect(
lambda line: terminal.log(line, terminal.INFO)
)
process.update({
"name": name,
"action": action,
"thread": thread,
"popen": popen
})
self._processes.append(process)
thread.start()
# return process