ayon-core/pype/ftrack/ftrack_utils.py

287 lines
7.7 KiB
Python

import os
import sys
import re
from pprint import *
import ftrack_api
from pype import lib
import avalon.io as io
import avalon.api
import avalon
from avalon.vendor import toml, jsonschema
from app.api import Logger
log = Logger.getLogger(__name__)
def get_data(parent, entity, session, custom_attributes):
entity_type = entity.entity_type
data = {}
data['ftrackId'] = entity['id']
data['entityType'] = entity_type
for cust_attr in custom_attributes:
key = cust_attr['key']
if cust_attr['entity_type'].lower() in ['asset']:
data[key] = entity['custom_attributes'][key]
elif cust_attr['entity_type'].lower() in ['show'] and entity_type.lower() == 'project':
data[key] = entity['custom_attributes'][key]
elif cust_attr['entity_type'].lower() in ['task'] and entity_type.lower() != 'project':
# Put space between capitals (e.g. 'AssetBuild' -> 'Asset Build')
entity_type_full = re.sub(r"(\w)([A-Z])", r"\1 \2", entity_type)
# Get object id of entity type
ent_obj_type_id = session.query('ObjectType where name is "{}"'.format(entity_type_full)).one()['id']
if cust_attr['object_type_id'] == ent_obj_type_id:
data[key] = entity['custom_attributes'][key]
if entity_type in ['Project']:
data['code'] = entity['name']
return data
# Get info for 'Data' in Avalon DB
tasks = []
for child in entity['children']:
if child.entity_type in ['Task']:
tasks.append(child['name'])
# Get list of parents without project
parents = []
folderStruct = []
for i in range(1, len(entity['link'])-1):
parEnt = session.get(entity['link'][i]['type'], entity['link'][i]['id'])
parName = parEnt['name']
folderStruct.append(parName)
if i > 1:
parents.append(parEnt)
parentId = None
for parent in parents:
parentId = io.find_one({'type': 'asset', 'name': parName})['_id']
if parent['parent'].entity_type != 'project' and parentId is None:
parent.importToAvalon(session, parent)
parentId = io.find_one({'type': 'asset', 'name': parName})['_id']
hierarchy = os.path.sep.join(folderStruct)
data['visualParent'] = parentId
data['parents'] = folderStruct
data['tasks'] = tasks
data['hierarchy'] = hierarchy
return data
def avalon_check_name(entity, inSchema = None):
ValidationError = jsonschema.ValidationError
alright = True
name = entity['name']
if " " in name:
alright = False
data = {}
data['data'] = {}
data['type'] = 'asset'
schema = "avalon-core:asset-2.0"
# TODO have project any REGEX check?
if entity.entity_type in ['Project']:
# data['type'] = 'project'
name = entity['full_name']
# schema = get_avalon_project_template_schema()['schema']
# elif entity.entity_type in ['AssetBuild','Library']:
# data['silo'] = 'Assets'
# else:
# data['silo'] = 'Film'
data['silo'] = 'Film'
if inSchema is not None:
schema = inSchema
data['schema'] = schema
data['name'] = name
try:
avalon.schema.validate(data)
except ValidationError:
alright = False
if alright is False:
raise ValueError("{} includes unsupported symbols like 'dash' or 'space'".format(name))
def get_apps(entity):
""" Get apps from project
Requirements:
'Entity' MUST be object of ftrack entity with entity_type 'Project'
Checking if app from ftrack is available in Templates/bin/{app_name}.toml
Returns:
Array with dictionaries with app Name and Label
"""
apps = []
for app in entity['custom_attributes']['applications']:
try:
app_config = {}
app_config['name'] = app
app_config['label'] = toml.load(avalon.lib.which_app(app))['label']
apps.append(app_config)
except Exception as e:
log.warning('Error with application {0} - {1}'.format(app, e))
return apps
def get_config(entity):
config = {}
config['schema'] = lib.get_avalon_project_config_schema()
config['tasks'] = [{'name': ''}]
config['apps'] = get_apps(entity)
config['template'] = lib.get_avalon_project_template()
return config
def checkRegex():
# _handle_result -> would be solution?
# """ TODO Check if name of entities match REGEX"""
for entity in importable:
for e in entity['link']:
item = {
"silo": "silo",
"parent": "parent",
"type": "asset",
"schema": "avalon-core:asset-2.0",
"name": e['name'],
"data": dict(),
}
try:
schema.validate(item)
except Exception as e:
print(e)
print(e['name'])
ftrack.EVENT_HUB.publishReply(
event,
data={
'success': False,
'message': 'Entity name contains invalid character!'
}
)
def get_context(entity):
parents = []
item = entity
while True:
item = item['parent']
if not item:
break
parents.append(item)
ctx = collections.OrderedDict()
folder_counter = 0
entityDic = {
'name': entity['name'],
'id': entity['id'],
}
try:
entityDic['type'] = entity['type']['name']
except:
pass
ctx[entity['object_type']['name']] = entityDic
# add all parents to the context
for parent in parents:
tempdic = {}
if not parent.get('project_schema'):
tempdic = {
'name': parent['name'],
'id': parent['id'],
}
object_type = parent['object_type']['name']
if object_type == 'Folder':
object_type = object_type + str(folder_counter)
folder_counter += 1
ctx[object_type] = tempdic
# add project to the context
project = entity['project']
ctx['Project'] = {
'name': project['full_name'],
'code': project['name'],
'id': project['id'],
'root': project['root']
}
return ctx
def get_status_by_name(name):
statuses = ftrack.getTaskStatuses()
result = None
for s in statuses:
if s.get('name').lower() == name.lower():
result = s
return result
def sort_types(types):
data = {}
for t in types:
data[t] = t.get('sort')
data = sorted(data.items(), key=operator.itemgetter(1))
results = []
for item in data:
results.append(item[0])
return results
def get_next_task(task):
shot = task.getParent()
tasks = shot.getTasks()
types_sorted = sort_types(ftrack.getTaskTypes())
next_types = None
for t in types_sorted:
if t.get('typeid') == task.get('typeid'):
try:
next_types = types_sorted[(types_sorted.index(t) + 1):]
except:
pass
for nt in next_types:
for t in tasks:
if nt.get('typeid') == t.get('typeid'):
return t
return None
def get_latest_version(versions):
latestVersion = None
if len(versions) > 0:
versionNumber = 0
for item in versions:
if item.get('version') > versionNumber:
versionNumber = item.getVersion()
latestVersion = item
return latestVersion
def get_thumbnail_recursive(task):
if task.get('thumbid'):
thumbid = task.get('thumbid')
return ftrack.Attachment(id=thumbid)
if not task.get('thumbid'):
parent = ftrack.Task(id=task.get('parent_id'))
return get_thumbnail_recursive(parent)