Apps Action runs also timer in FT. New actions Thumb to child/parent, sort review and *open_folder

This commit is contained in:
Jakub Trllo 2018-11-01 17:46:09 +01:00
parent 06f1dd9057
commit ab6f07e28c
12 changed files with 1111 additions and 87 deletions

View file

@ -0,0 +1,117 @@
import sys
import argparse
import logging
import os
import getpass
import ftrack_api
from ftrack_action_handler import BaseAction
class ClientReviewSort(BaseAction):
'''Custom action.'''
#: Action identifier.
identifier = 'client.review.sort'
#: Action label.
label = 'Sort Review'
def validateSelection(self, entities):
'''Return true if the selection is valid. '''
if len(entities) == 0:
return False
return True
def discover(self, session, entities, event):
'''Return action config if triggered on a single selection.'''
selection = event['data']['selection']
# this action will only handle a single version.
if (not self.validateSelection(entities) or
selection[0]['entityType'] != 'reviewsession'):
return False
return True
def launch(self, session, entities, event):
entity_type, entity_id = entities[0]
entity = session.get(entity_type, entity_id)
# Get all objects from Review Session and all 'sort order' possibilities
obj_list = []
sort_order_list = []
for obj in entity['review_session_objects']:
obj_list.append(obj)
sort_order_list.append(obj['sort_order'])
# Sort criteria
obj_list = sorted(obj_list, key=lambda k: k['asset_version']['task']['name'])
obj_list = sorted(obj_list, key=lambda k: k['version'])
obj_list = sorted(obj_list, key=lambda k: k['name'])
# Set 'sort order' to sorted list, so they are sorted in Ftrack also
for i in range(len(obj_list)):
obj_list[i]['sort_order'] = sort_order_list[i]
session.commit()
return {
'success': True,
'message': 'Client Review sorted!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ClientReviewSort(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -215,7 +215,6 @@ def register(session, **kw):
action_handler = AvalonIdAttribute(session)
action_handler.register()
print("----- action - <" + action_handler.__class__.__name__ + "> - Has been registered -----")
def main(arguments=None):

View file

@ -0,0 +1,170 @@
import sys
import argparse
import logging
import getpass
import subprocess
import os
import ftrack_api
from ftrack_action_handler import BaseAction
import ft_utils
class openFolder(BaseAction):
'''Open folders action'''
#: Action identifier.
identifier = 'open.folders'
#: Action label.
label = 'Open Folders'
#: Action Icon.
icon = "https://cdn3.iconfinder.com/data/icons/stroke/53/Open-Folder-256.png"
def validateSelection(self, selection):
'''Return true if the selection is valid. '''
if len(selection) == 0 or selection[0]['entityType'] in ['assetversion', 'Component']:
return False
return True
def discover(self, session, entities, event):
selection = event['data']['selection']
# validate selection, and only return action if it is valid.
return self.validateSelection(selection)
def get_paths(self, entity):
'''Prepare all the paths for the entity.
This function uses custom module to deal with paths.
You will need to replace it with your logic.
'''
root = entity['project']['root']
entity_type = entity.entity_type.lower()
if entity_type == 'task':
if entity['parent'].entity_type == 'Asset Build':
templates = ['asset.task']
else:
templates = ['shot.task']
elif entity_type in ['shot', 'folder', 'sequence', 'episode']:
templates = ['shot']
elif entity_type in ['asset build', 'library']:
templates = ['asset']
paths = ft_utils.getPathsYaml(entity,
templateList=templates,
root=root)
return paths
def launch(self, session, entities, event):
'''Callback method for action.'''
selection = event['data'].get('selection', [])
self.logger.info(u'Launching action with selection \
{0}'.format(selection))
# Prepare lists to keep track of failures and successes
fails = []
hits = set([])
for entity in entities:
entity_type, entity_id = entity
entity = session.get(entity_type, entity_id)
# Get paths base on the entity.
# This function needs to be chagned to fit your path logic
paths = self.get_paths(entity)
# For each path, check if it exists on the disk and try opening it
for path in paths:
if os.path.isdir(path):
self.logger.info('Opening: ' + path)
# open the folder
if sys.platform == 'darwin':
subprocess.Popen(['open', '--', path])
elif sys.platform == 'linux2':
subprocess.Popen(['gnome-open', '--', path])
elif sys.platform == 'win32':
subprocess.Popen(['explorer', path])
# add path to list of hits
hits.add(entity['name'])
# Add entity to fails list if no folder could be openned for it
if entity['name'] not in hits:
fails.append(entity['name'])
# Inform user of the result
if len(hits) == 0:
return {
'success': False,
'message': 'No folders found for: {}'.format(', '.join(fails))
}
if len(fails) > 0:
return {
'success': True,
'message': 'No folders found for: {}'.format(', '.join(fails))
}
return {
'success': True,
'message': 'Opening folders'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
# Validate that session is an instance of ftrack_api.Session. If not,
# assume that register is being called from an old or incompatible API and
# return without doing anything.
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = openFolder(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -238,7 +238,6 @@ def register(session, **kw):
action_handler = SyncToAvalon(session)
action_handler.register()
print("----- action - <" + action_handler.__class__.__name__ + "> - Has been registered -----")
def main(arguments=None):

View file

@ -27,7 +27,7 @@ class TestAction(BaseAction):
def validate_selection(self, session, entities):
'''Return if *entities* is a valid selection.'''
pass
return True
def discover(self, session, entities, event):
@ -38,6 +38,13 @@ class TestAction(BaseAction):
def launch(self, session, entities, event):
for entity in entities:
entity_type, entity_id = entity
entity = session.get(entity_type, entity_id)
import ft_utils
print(ft_utils.getNewContext(entity))
return True

View file

@ -0,0 +1,125 @@
# :coding: utf-8
# :copyright: Copyright (c) 2015 Milan Kolar
import sys
import argparse
import logging
import getpass
import json
import ftrack_api
from ftrack_action_handler import BaseAction
class ThumbToChildren(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'thumb.to.children'
# Action label
label = 'Thumbnail to Children'
# Action icon
icon = "https://cdn3.iconfinder.com/data/icons/transfers/100/239322-download_transfer-128.png"
def validateSelection(self, selection):
'''Return true if the selection is valid.
Legacy plugins can only be started from a single Task. '''
if len(selection) > 0:
if selection[0]['entityType'] in ['assetversion', 'task']:
return True
return False
def discover(self, session, entities, event):
'''Return action config if triggered on asset versions.'''
selection = event['data']['selection']
# validate selection, and only return action if it is valid.
return self.validateSelection(selection)
def launch(self, session, entities, event):
'''Callback method for action.'''
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
job = session.create('Job', {
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Push thumbnails to Childrens'
})
})
try:
for entity in entities:
entity_type, entity_id = entity
entity = session.get(entity_type, entity_id)
thumbid = entity['thumbnail_id']
if thumbid:
for child in entity['children']:
child['thumbnail_id'] = thumbid
# inform the user that the job is done
job['status'] = 'done'
session.commit()
except:
# fail the job if something goes wrong
job['status'] = 'failed'
raise
return {
'success': True,
'message': 'Created job for updating thumbnails!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ThumbToChildren(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,134 @@
# :coding: utf-8
# :copyright: Copyright (c) 2015 Milan Kolar
import sys
import argparse
import logging
import getpass
import json
import ftrack_api
from ftrack_action_handler import BaseAction
class ThumbToParent(BaseAction):
'''Custom action.'''
# Action identifier
identifier = 'thumb.to.parent'
# Action label
label = 'Thumbnail to Parent'
# Action icon
icon = "https://cdn3.iconfinder.com/data/icons/transfers/100/239419-upload_transfer-512.png"
def validateSelection(self, selection):
'''Return true if the selection is valid.
Legacy plugins can only be started from a single Task.
'''
if len(selection) > 0:
if selection[0]['entityType'] in ['assetversion', 'task']:
return True
return False
def discover(self, session, entities, event):
'''Return action config if triggered on asset versions.'''
selection = event['data']['selection']
# validate selection, and only return action if it is valid.
return self.validateSelection(selection)
def launch(self, session, entities, event):
'''Callback method for action.'''
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
job = session.create('Job', {
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Push thumbnails to parents'
})
})
try:
for entity in entities:
entity_type, entity_id = entity
entity = session.get(entity_type, entity_id)
if entity.entity_type.lower() == 'assetversion':
try:
parent = entity['task']
except:
par_ent = entity['link'][-2]
parent = session.get(par_ent['type'], par_ent['id'])
elif entity.entity_type.lower() == 'task':
parent = entity['parent']
thumbid = entity['thumbnail_id']
if parent and thumbid:
parent['thumbnail_id'] = thumbid
# inform the user that the job is done
job['status'] = 'done'
session.commit()
except:
# fail the job if something goes wrong
job['status'] = 'failed'
raise
return {
'success': True,
'message': 'Created job for updating thumbnails!'
}
def register(session, **kw):
'''Register action. Called when used as an event plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
action_handler = ThumbToParent(session)
action_handler.register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -0,0 +1,428 @@
import os
import operator
import ftrack_api
import collections
import sys
import json
import base64
# sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# from ftrack_kredenc.lucidity.vendor import yaml
# from ftrack_kredenc import lucidity
#
#
# def get_ftrack_connect_path():
#
# ftrack_connect_root = os.path.abspath(os.getenv('FTRACK_CONNECT_PACKAGE'))
#
# return ftrack_connect_root
#
#
# def from_yaml(filepath):
# ''' Parse a Schema from a YAML file at the given *filepath*.
# '''
# with open(filepath, 'r') as f:
# data = yaml.safe_load(f)
# return data
#
#
# def get_task_enviro(entity, environment=None):
#
# context = get_context(entity)
#
# if not environment:
# environment = {}
#
# for key in context:
# os.environ[key.upper()] = context[key]['name']
# environment[key.upper()] = context[key]['name']
#
# if key == 'Project':
# os.putenv('PROJECT_ROOT', context[key]['root'])
# os.environ['PROJECT_ROOT'] = context[key]['root']
# environment['PROJECT_ROOT'] = context[key]['root']
# print('PROJECT_ROOT: ' + context[key]['root'])
# print(key + ': ' + context[key]['name'])
#
# return environment
#
#
# def get_entity():
# decodedEventData = json.loads(
# base64.b64decode(
# os.environ.get('FTRACK_CONNECT_EVENT')
# )
# )
#
# entity = decodedEventData.get('selection')[0]
#
# if entity['entityType'] == 'task':
# return ftrack_api.Task(entity['entityId'])
# else:
# return None
#
#
# def set_env_vars():
#
# entity = get_entity()
#
# if entity:
# if not os.environ.get('project_root'):
# enviro = get_task_enviro(entity)
#
# print(enviro)
#
#
def get_context(entity):
entityName = entity['name']
entityId = entity['id']
entityType = entity.entity_type
entityDescription = entity['description']
print(100*"*")
for k in entity['ancestors']:
print(k['name'])
print(100*"*")
hierarchy = entity.getParents()
ctx = collections.OrderedDict()
if entity.get('entityType') == 'task' and entityType == 'Task':
taskType = entity.getType().getName()
entityDic = {
'type': taskType,
'name': entityName,
'id': entityId,
'description': entityDescription
}
elif entity.get('entityType') == 'task':
entityDic = {
'name': entityName,
'id': entityId,
'description': entityDescription
}
ctx[entityType] = entityDic
folder_counter = 0
for ancestor in hierarchy:
tempdic = {}
if isinstance(ancestor, ftrack_api.Component):
# Ignore intermediate components.
continue
tempdic['name'] = ancestor.getName()
tempdic['id'] = ancestor.getId()
try:
objectType = ancestor.getObjectType()
tempdic['description'] = ancestor.getDescription()
except AttributeError:
objectType = 'Project'
tempdic['description'] = ''
if objectType == 'Asset Build':
tempdic['type'] = ancestor.getType().get('name')
objectType = objectType.replace(' ', '_')
elif objectType == 'Project':
tempdic['code'] = tempdic['name']
tempdic['name'] = ancestor.get('fullname')
tempdic['root'] = ancestor.getRoot()
if objectType == 'Folder':
objectType = objectType + str(folder_counter)
folder_counter += 1
ctx[objectType] = tempdic
return ctx
def getNewContext(entity):
parents = []
item = entity
while True:
item = item['parent']
if not item:
break
parents.append(item)
ctx = collections.OrderedDict()
entityDic = {
'name': entity['name'],
'id': entity['id'],
}
try:
entityDic['type'] = entity['type']['name']
except:
pass
ctx[entity['object_type']['name']] = entityDic
print(100*"-")
for p in parents:
print(p)
# add all parents to the context
for parent in parents:
tempdic = {}
if not parent.get('project_schema'):
tempdic = {
'name': parent['full_name'],
'code': parent['name'],
'id': parent['id'],
}
tempdic = {
'name': parent['name'],
'id': parent['id'],
}
object_type = parent['object_type']['name']
ctx[object_type] = tempdic
# add project to the context
project = entity['project']
ctx['Project'] = {
'name': project['full_name'],
'code': project['name'],
'id': project['id'],
'root': project['root'],
},
return ctx
#
#
# def get_frame_range():
#
# entity = get_entity()
# entityType = entity.getObjectType()
# environment = {}
#
# if entityType == 'Task':
# try:
# environment['FS'] = str(int(entity.getFrameStart()))
# except Exception:
# environment['FS'] = '1'
# try:
# environment['FE'] = str(int(entity.getFrameEnd()))
# except Exception:
# environment['FE'] = '1'
# else:
# try:
# environment['FS'] = str(int(entity.getFrameStart()))
# except Exception:
# environment['FS'] = '1'
# try:
# environment['FE'] = str(int(entity.getFrameEnd()))
# except Exception:
# environment['FE'] = '1'
#
#
# def get_asset_name_by_id(id):
# for t in ftrack_api.getAssetTypes():
# try:
# if t.get('typeid') == id:
# return t.get('name')
# except:
# return None
#
#
# def get_status_by_name(name):
# statuses = ftrack_api.getTaskStatuses()
#
# result = None
# for s in statuses:
# if s.get('name').lower() == name.lower():
# result = s
#
# return result
#
#
# def sort_types(types):
# data = {}
# for t in types:
# data[t] = t.get('sort')
#
# data = sorted(data.items(), key=operator.itemgetter(1))
# results = []
# for item in data:
# results.append(item[0])
#
# return results
#
#
# def get_next_task(task):
# shot = task.getParent()
# tasks = shot.getTasks()
#
# types_sorted = sort_types(ftrack_api.getTaskTypes())
#
# next_types = None
# for t in types_sorted:
# if t.get('typeid') == task.get('typeid'):
# try:
# next_types = types_sorted[(types_sorted.index(t) + 1):]
# except:
# pass
#
# for nt in next_types:
# for t in tasks:
# if nt.get('typeid') == t.get('typeid'):
# return t
#
# return None
#
#
# def get_latest_version(versions):
# latestVersion = None
# if len(versions) > 0:
# versionNumber = 0
# for item in versions:
# if item.get('version') > versionNumber:
# versionNumber = item.getVersion()
# latestVersion = item
# return latestVersion
#
#
# def get_thumbnail_recursive(task):
# if task.get('thumbid'):
# thumbid = task.get('thumbid')
# return ftrack_api.Attachment(id=thumbid)
# if not task.get('thumbid'):
# parent = ftrack_api.Task(id=task.get('parent_id'))
# return get_thumbnail_recursive(parent)
#
#
# # paths_collected
#
# def getFolderHierarchy(context):
# '''Return structure for *hierarchy*.
# '''
#
# hierarchy = []
# for key in reversed(context):
# hierarchy.append(context[key]['name'])
# print(hierarchy)
#
# return os.path.join(*hierarchy[1:-1])
#
#
def tweakContext(context, include=False):
for key in context:
if key == 'Asset Build':
context['Asset_Build'] = context.pop(key)
key = 'Asset_Build'
description = context[key].get('description')
if description:
context[key]['description'] = '_' + description
hierarchy = []
for key in reversed(context):
hierarchy.append(context[key]['name'])
if include:
hierarchy = os.path.join(*hierarchy[1:])
else:
hierarchy = os.path.join(*hierarchy[1:-1])
context['ft_hierarchy'] = hierarchy
def getSchema(entity):
project = entity['project']
schema = project['project_schema']['name']
tools = os.path.abspath(os.environ.get('studio_tools'))
schema_path = os.path.join(tools, 'studio', 'templates', (schema + '_' + project['name'] + '.yml'))
if not os.path.exists(schema_path):
schema_path = os.path.join(tools, 'studio', 'templates', (schema + '.yml'))
if not os.path.exists(schema_path):
schema_path = os.path.join(tools, 'studio', 'templates', 'default.yml')
schema = lucidity.Schema.from_yaml(schema_path)
print(schema_path)
return schema
# def getAllPathsYaml(entity, root=''):
#
# if isinstance(entity, str) or isinstance(entity, unicode):
# entity = ftrack_api.Task(entity)
#
# context = get_context(entity)
#
# tweakContext(context)
#
# schema = getSchema(entity)
#
# paths = schema.format_all(context)
# paths_collected = []
#
# for path in paths:
# tweak_path = path[0].replace(" ", '_').replace('\'', '').replace('\\', '/')
#
# tempPath = os.path.join(root, tweak_path)
# path = list(path)
# path[0] = tempPath
# paths_collected.append(path)
#
# return paths_collected
#
def getPathsYaml(entity, templateList=None, root=None, **kwargs):
'''
version=None
ext=None
item=None
family=None
subset=None
'''
context = get_context(entity)
if entity.entity_type != 'Task':
tweakContext(context, include=True)
else:
tweakContext(context)
context.update(kwargs)
host = sys.executable.lower()
ext = None
if not context.get('ext'):
if "nuke" in host:
ext = 'nk'
elif "maya" in host:
ext = 'ma'
elif "houdini" in host:
ext = 'hip'
if ext:
context['ext'] = ext
if not context.get('subset'):
context['subset'] = ''
else:
context['subset'] = '_' + context['subset']
schema = getSchema(entity)
paths = schema.format_all(context)
paths_collected = set([])
for temp_mask in templateList:
for path in paths:
if temp_mask in path[1].name:
path = path[0].lower().replace(" ", '_').replace('\'', '').replace('\\', '/')
path_list = path.split('/')
if path_list[0].endswith(':'):
path_list[0] = path_list[0] + os.path.sep
path = os.path.join(*path_list)
temppath = os.path.join(root, path)
paths_collected.add(temppath)
return list(paths_collected)

View file

@ -72,6 +72,7 @@ class AppAction(object):
self._launch
)
def _discover(self, event):
args = self._translate_event(
self.session, event
@ -218,6 +219,7 @@ class AppAction(object):
*event* the unmodified original event
'''
# TODO Delete this line
print("Action - {0} ({1}) - just started".format(self.label, self.identifier))
@ -289,6 +291,13 @@ class AppAction(object):
'message': "We didn't found launcher for {0}".format(self.label)
}
# RUN TIMER IN FTRACK
username = event['source']['user']['username']
user = session.query('User where username is "{}"'.format(username)).one()
task = session.query('Task where id is {}'.format(entity['id'])).one()
print('Starting timer for task: ' + task['name'])
user.start_timer(task, force=True)
return {
'success': True,
'message': "Launching {0}".format(self.label)
@ -404,6 +413,7 @@ class BaseAction(object):
),
self._launch
)
print("----- action - <" + self.__class__.__name__ + "> - Has been registered -----")
def _discover(self, event):
args = self._translate_event(

View file

@ -6,58 +6,6 @@ import os
from pprint import *
def deleteAssetsForTask(taskId):
#taskId = os.environ['FTRACK_TASKID']
task = ftrack.Task(taskId)
taskAssets = task.getAssets()
print(taskAssets)
for a in taskAssets:
print(a.getName())
a.delete()
#shot = task.getParent()
#shotAssets = shot.getAssets()
def deleteAssetsFromShotByName(shotId, assNm=None):
if not assNm:
return
shot = ftrack.Task(shotId)
shotAssets = shot.getAssets()
for a in shotAssets:
nm = a.getName()
if nm == assNm:
a.delete()
# Created as action
def killRunningTasks(tm=None):
import datetime
import ftrack_api
session = ftrack_api.Session()
# Query all jobs created prior to yesterday which has not yet been completed.
yesterday = datetime.date.today() - datetime.timedelta(days=1)
if tm:
yesterday = tm
print(yesterday)
jobs = session.query(
'select id, status from Job '
'where status in ("queued", "running") and created_at > {0}'.format(yesterday)
)
# Update all the queried jobs, setting the status to failed.
for job in jobs:
print(job['created_at'])
print('Changing Job ({}) status: {} -> failed'.format(job['id'], job['status']))
job['status'] = 'failed'
session.commit()
print('Complete')
def checkRegex():
# _handle_result -> would be solution?
# """ TODO Check if name of entities match REGEX"""
@ -83,3 +31,122 @@ def checkRegex():
'message': 'Entity name contains invalid character!'
}
)
def get_context(entity):
parents = []
item = entity
while True:
item = item['parent']
if not item:
break
parents.append(item)
ctx = collections.OrderedDict()
folder_counter = 0
entityDic = {
'name': entity['name'],
'id': entity['id'],
}
try:
entityDic['type'] = entity['type']['name']
except:
pass
ctx[entity['object_type']['name']] = entityDic
# add all parents to the context
for parent in parents:
tempdic = {}
if not parent.get('project_schema'):
tempdic = {
'name': parent['name'],
'id': parent['id'],
}
object_type = parent['object_type']['name']
if object_type == 'Folder':
object_type = object_type + str(folder_counter)
folder_counter += 1
ctx[object_type] = tempdic
# add project to the context
project = entity['project']
ctx['Project'] = {
'name': project['full_name'],
'code': project['name'],
'id': project['id'],
'root': project['root']
}
return ctx
def get_status_by_name(name):
statuses = ftrack.getTaskStatuses()
result = None
for s in statuses:
if s.get('name').lower() == name.lower():
result = s
return result
def sort_types(types):
data = {}
for t in types:
data[t] = t.get('sort')
data = sorted(data.items(), key=operator.itemgetter(1))
results = []
for item in data:
results.append(item[0])
return results
def get_next_task(task):
shot = task.getParent()
tasks = shot.getTasks()
types_sorted = sort_types(ftrack.getTaskTypes())
next_types = None
for t in types_sorted:
if t.get('typeid') == task.get('typeid'):
try:
next_types = types_sorted[(types_sorted.index(t) + 1):]
except:
pass
for nt in next_types:
for t in tasks:
if nt.get('typeid') == t.get('typeid'):
return t
return None
def get_latest_version(versions):
latestVersion = None
if len(versions) > 0:
versionNumber = 0
for item in versions:
if item.get('version') > versionNumber:
versionNumber = item.getVersion()
latestVersion = item
return latestVersion
def get_thumbnail_recursive(task):
if task.get('thumbid'):
thumbid = task.get('thumbid')
return ftrack.Attachment(id=thumbid)
if not task.get('thumbid'):
parent = ftrack.Task(id=task.get('parent_id'))
return get_thumbnail_recursive(parent)

View file

@ -1,16 +0,0 @@
# import ftrack_api as local session
import ftrack_api
#
session = ftrack_api.Session()
# ----------------------------------
def test_event(event):
'''just a testing event'''
# start of event procedure ----------------------------------
for entity in event['data'].get('entities', []):
print(100*"_")
print(entity['changes'])
# end of event procedure ----------------------------------

View file

@ -1,16 +0,0 @@
# import ftrack_api as local session
import ftrack_api
#
session = ftrack_api.Session()
# ----------------------------------
def test_event(event):
'''just a testing event'''
# start of event procedure ----------------------------------
for entity in event['data'].get('entities', []):
print(100*"_")
print(entity['keys'])
# end of event procedure ----------------------------------