Merge pull request #94 from pypeclub/feature/ftrack_first_step_cleanup

Feature/ftrack first step cleanup
This commit is contained in:
Milan Kolar 2020-05-04 17:58:43 +02:00 committed by GitHub
commit 6d3d5610b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 413 additions and 466 deletions

View file

@ -49,27 +49,23 @@ class DeleteAssetSubset(BaseAction):
def _launch(self, event):
try:
args = self._translate_event(
self.session, event
)
entities = self._translate_event(event)
if "values" not in event["data"]:
self.dbcon.install()
return self._interface(self.session, *args)
return self._interface(self.session, entities, event)
confirmation = self.confirm_delete(*args)
confirmation = self.confirm_delete(entities, event)
if confirmation:
return confirmation
self.dbcon.install()
response = self.launch(
self.session, *args
self.session, entities, event
)
finally:
self.dbcon.uninstall()
return self._handle_result(
self.session, response, *args
)
return self._handle_result(response)
def interface(self, session, entities, event):
self.show_message(event, "Preparing data...", True)

View file

@ -5,13 +5,11 @@ import json
from bson.objectid import ObjectId
from pype.ftrack import BaseAction
from pype.ftrack.lib import (
get_project_from_entity,
get_avalon_entities_for_assetversion
)
from pypeapp import Anatomy
from pype.ftrack.lib.io_nonsingleton import DbConnector
from pype.ftrack.lib.avalon_sync import CustAttrIdKey
class StoreThumbnailsToAvalon(BaseAction):
# Action identifier
@ -89,7 +87,7 @@ class StoreThumbnailsToAvalon(BaseAction):
"message": msg
}
project = get_project_from_entity(entities[0])
project = self.get_project_from_entity(entities[0])
project_name = project["full_name"]
anatomy = Anatomy(project_name)
@ -186,7 +184,7 @@ class StoreThumbnailsToAvalon(BaseAction):
).format(entity["id"]))
continue
avalon_ents_result = get_avalon_entities_for_assetversion(
avalon_ents_result = self.get_avalon_entities_for_assetversion(
entity, self.db_con
)
version_full_path = (
@ -345,6 +343,119 @@ class StoreThumbnailsToAvalon(BaseAction):
file_open.close()
return True
def get_avalon_entities_for_assetversion(self, asset_version, db_con):
output = {
"success": True,
"message": None,
"project": None,
"project_name": None,
"asset": None,
"asset_name": None,
"asset_path": None,
"subset": None,
"subset_name": None,
"version": None,
"version_name": None,
"representations": None
}
db_con.install()
ft_asset = asset_version["asset"]
subset_name = ft_asset["name"]
version = asset_version["version"]
parent = ft_asset["parent"]
ent_path = "/".join(
[ent["name"] for ent in parent["link"]]
)
project = self.get_project_from_entity(asset_version)
project_name = project["full_name"]
output["project_name"] = project_name
output["asset_name"] = parent["name"]
output["asset_path"] = ent_path
output["subset_name"] = subset_name
output["version_name"] = version
db_con.Session["AVALON_PROJECT"] = project_name
avalon_project = db_con.find_one({"type": "project"})
output["project"] = avalon_project
if not avalon_project:
output["success"] = False
output["message"] = (
"Project not synchronized to avalon `{}`".format(project_name)
)
return output
asset_ent = None
asset_mongo_id = parent["custom_attributes"].get(CustAttrIdKey)
if asset_mongo_id:
try:
asset_mongo_id = ObjectId(asset_mongo_id)
asset_ent = db_con.find_one({
"type": "asset",
"_id": asset_mongo_id
})
except Exception:
pass
if not asset_ent:
asset_ent = db_con.find_one({
"type": "asset",
"data.ftrackId": parent["id"]
})
output["asset"] = asset_ent
if not asset_ent:
output["success"] = False
output["message"] = (
"Not synchronized entity to avalon `{}`".format(ent_path)
)
return output
asset_mongo_id = asset_ent["_id"]
subset_ent = db_con.find_one({
"type": "subset",
"parent": asset_mongo_id,
"name": subset_name
})
output["subset"] = subset_ent
if not subset_ent:
output["success"] = False
output["message"] = (
"Subset `{}` does not exist under Asset `{}`"
).format(subset_name, ent_path)
return output
version_ent = db_con.find_one({
"type": "version",
"name": version,
"parent": subset_ent["_id"]
})
output["version"] = version_ent
if not version_ent:
output["success"] = False
output["message"] = (
"Version `{}` does not exist under Subset `{}` | Asset `{}`"
).format(version, subset_name, ent_path)
return output
repre_ents = list(db_con.find({
"type": "representation",
"parent": version_ent["_id"]
}))
output["representations"] = repre_ents
return output
def register(session, plugins_presets={}):
StoreThumbnailsToAvalon(session, plugins_presets).register()

View file

@ -1,11 +1,15 @@
from . import avalon_sync
from . import credentials
from .ftrack_app_handler import *
from .ftrack_event_handler import *
from .ftrack_action_handler import *
from .ftrack_base_handler import *
from .ftrack_base_handler import BaseHandler
from .ftrack_event_handler import BaseEvent
from .ftrack_action_handler import BaseAction
from .ftrack_app_handler import AppAction
from .lib import (
get_project_from_entity,
get_avalon_entities_for_assetversion
)
__all__ = [
"avalon_sync",
"credentials",
"BaseHandler",
"BaseEvent",
"BaseAction",
"AppAction"
]

View file

@ -23,17 +23,13 @@ class BaseAction(BaseHandler):
def __init__(self, session, plugins_presets={}):
'''Expects a ftrack_api.Session instance'''
super().__init__(session, plugins_presets)
if self.label is None:
raise ValueError(
'Action missing label.'
)
raise ValueError('Action missing label.')
elif self.identifier is None:
raise ValueError(
'Action missing identifier.'
)
if self.identifier is None:
raise ValueError('Action missing identifier.')
super().__init__(session, plugins_presets)
def register(self):
'''
@ -61,66 +57,131 @@ class BaseAction(BaseHandler):
self._launch
)
def _launch(self, event):
args = self._translate_event(
self.session, event
def _discover(self, event):
entities = self._translate_event(event)
accepts = self.discover(self.session, entities, event)
if not accepts:
return
self.log.debug(u'Discovering action with selection: {0}'.format(
event['data'].get('selection', [])
))
return {
'items': [{
'label': self.label,
'variant': self.variant,
'description': self.description,
'actionIdentifier': self.identifier,
'icon': self.icon,
}]
}
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the
entity id. If the entity is a hierarchical you will always get the
entity type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return False
def _interface(self, session, entities, event):
interface = self.interface(session, entities, event)
if not interface:
return
if isinstance(interface, (tuple, list)):
return {"items": interface}
if isinstance(interface, dict):
if (
"items" in interface
or ("success" in interface and "message" in interface)
):
return interface
raise ValueError((
"Invalid interface output expected key: \"items\" or keys:"
" \"success\" and \"message\". Got: \"{}\""
).format(str(interface)))
raise ValueError(
"Invalid interface output type \"{}\"".format(
str(type(interface))
)
)
def interface(self, session, entities, event):
'''Return a interface if applicable or None
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and
the entity id. If the entity is a hierarchical you will always get the
entity type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return None
def _launch(self, event):
entities = self._translate_event(event)
preactions_launched = self._handle_preactions(self.session, event)
if preactions_launched is False:
return
interface = self._interface(
self.session, *args
self.session, entities, event
)
if interface:
return interface
response = self.launch(
self.session, *args
self.session, entities, event
)
return self._handle_result(
self.session, response, *args
)
return self._handle_result(response)
def _handle_result(self, session, result, entities, event):
def _handle_result(self, result):
'''Validate the returned result from the action callback'''
if isinstance(result, bool):
if result is True:
result = {
'success': result,
'message': (
'{0} launched successfully.'.format(self.label)
)
}
msg = 'Action {0} finished.'
else:
result = {
'success': result,
'message': (
'{0} launch failed.'.format(self.label)
)
}
msg = 'Action {0} failed.'
elif isinstance(result, dict):
return {
'success': result,
'message': msg.format(self.label)
}
if isinstance(result, dict):
if 'items' in result:
items = result['items']
if not isinstance(items, list):
if not isinstance(result['items'], list):
raise ValueError('Invalid items format, must be list!')
else:
for key in ('success', 'message'):
if key in result:
continue
if key not in result:
raise KeyError('Missing required key: {0}.'.format(key))
return result
raise KeyError(
'Missing required key: {0}.'.format(key)
)
else:
self.log.error(
'Invalid result type must be bool or dictionary!'
)
self.log.warning((
'Invalid result type \"{}\" must be bool or dictionary!'
).format(str(type(result))))
return result

View file

@ -1,44 +1,35 @@
import os
import sys
import platform
from avalon import lib as avalonlib
import avalon.lib
import acre
from pype import api as pype
from pype import lib as pypelib
from pypeapp import config
from .ftrack_base_handler import BaseHandler
from .ftrack_action_handler import BaseAction
from pypeapp import Anatomy
class AppAction(BaseHandler):
'''Custom Action base class
class AppAction(BaseAction):
"""Application Action class.
<label> - a descriptive string identifing your action.
<varaint> - To group actions together, give them the same
label and specify a unique variant per action.
<identifier> - a unique identifier for app.
<description> - a verbose descriptive text for you action
<icon> - icon in ftrack
'''
Args:
session (ftrack_api.Session): Session where action will be registered.
label (str): A descriptive string identifing your action.
varaint (str, optional): To group actions together, give them the same
label and specify a unique variant per action.
identifier (str): An unique identifier for app.
description (str): A verbose descriptive text for you action.
icon (str): Url path to icon which will be shown in Ftrack web.
"""
type = 'Application'
preactions = ['start.timer']
type = "Application"
preactions = ["start.timer"]
def __init__(
self, session, label, name, executable, variant=None,
icon=None, description=None, preactions=[], plugins_presets={}
):
super().__init__(session, plugins_presets)
'''Expects a ftrack_api.Session instance'''
if label is None:
raise ValueError('Action missing label.')
elif name is None:
raise ValueError('Action missing identifier.')
elif executable is None:
raise ValueError('Action missing executable.')
self.label = label
self.identifier = name
self.executable = executable
@ -47,11 +38,19 @@ class AppAction(BaseHandler):
self.description = description
self.preactions.extend(preactions)
super().__init__(session, plugins_presets)
if label is None:
raise ValueError("Action missing label.")
if name is None:
raise ValueError("Action missing identifier.")
if executable is None:
raise ValueError("Action missing executable.")
def register(self):
'''Registers the action, subscribing the discover and launch topics.'''
"""Registers the action, subscribing the discover and launch topics."""
discovery_subscription = (
'topic=ftrack.action.discover and source.user.username={0}'
"topic=ftrack.action.discover and source.user.username={0}"
).format(self.session.api_user)
self.session.event_hub.subscribe(
@ -61,9 +60,9 @@ class AppAction(BaseHandler):
)
launch_subscription = (
'topic=ftrack.action.launch'
' and data.actionIdentifier={0}'
' and source.user.username={1}'
"topic=ftrack.action.launch"
" and data.actionIdentifier={0}"
" and source.user.username={1}"
).format(
self.identifier,
self.session.api_user
@ -74,7 +73,61 @@ class AppAction(BaseHandler):
)
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
"""Return true if we can handle the selected entities.
Args:
session (ftrack_api.Session): Helps to query necessary data.
entities (list): Object of selected entities.
event (ftrack_api.Event): Ftrack event causing discover callback.
"""
if (
len(entities) != 1 or
entities[0].entity_type.lower() != "task"
):
return False
entity = entities[0]
if entity["parent"].entity_type.lower() == "project":
return False
ft_project = self.get_project_from_entity(entity)
database = pypelib.get_avalon_database()
project_name = ft_project["full_name"]
avalon_project = database[project_name].find_one({
"type": "project"
})
if not avalon_project:
return False
project_apps = avalon_project["config"].get("apps", [])
apps = [app["name"] for app in project_apps]
if self.identifier in apps:
return True
return False
def _launch(self, event):
entities = self._translate_event(event)
preactions_launched = self._handle_preactions(
self.session, event
)
if preactions_launched is False:
return
response = self.launch(self.session, entities, event)
return self._handle_result(response)
def launch(self, session, entities, event):
"""Callback method for the custom action.
return either a bool (True if successful or False if the action failed)
or a dictionary with they keys `message` and `success`, the message
should be a string and will be displayed as feedback to the user,
success should be a bool, True if successful or False if the action
failed.
*session* is a `ftrack_api.Session` instance
@ -85,90 +138,22 @@ class AppAction(BaseHandler):
or Asset Build.
*event* the unmodified original event
'''
if (
len(entities) != 1 or
entities[0].entity_type.lower() != 'task'
):
return False
if entities[0]['parent'].entity_type.lower() == 'project':
return False
ft_project = entities[0]['project']
database = pypelib.get_avalon_database()
project_name = ft_project['full_name']
avalon_project = database[project_name].find_one({
"type": "project"
})
if avalon_project is None:
return False
else:
apps = [app['name'] for app in avalon_project['config'].get(
'apps', []
)]
if self.identifier not in apps:
return False
return True
def _launch(self, event):
args = self._translate_event(
self.session, event
)
preactions_launched = self._handle_preactions(
self.session, event
)
if preactions_launched is False:
return
response = self.launch(
self.session, *args
)
return self._handle_result(
self.session, response, *args
)
def launch(self, session, entities, event):
'''Callback method for the custom action.
return either a bool ( True if successful or False if the action failed )
or a dictionary with they keys `message` and `success`, the message should be a
string and will be displayed as feedback to the user, success should be a bool,
True if successful or False if the action failed.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
"""
entity = entities[0]
project_name = entity['project']['full_name']
ft_project = self.get_project_from_entity(entity)
project_name = ft_project["full_name"]
database = pypelib.get_avalon_database()
# Get current environments
env_list = [
'AVALON_PROJECT',
'AVALON_SILO',
'AVALON_ASSET',
'AVALON_TASK',
'AVALON_APP',
'AVALON_APP_NAME'
"AVALON_PROJECT",
"AVALON_SILO",
"AVALON_ASSET",
"AVALON_TASK",
"AVALON_APP",
"AVALON_APP_NAME"
]
env_origin = {}
for env in env_list:
@ -176,37 +161,38 @@ class AppAction(BaseHandler):
# set environments for Avalon
os.environ["AVALON_PROJECT"] = project_name
os.environ["AVALON_SILO"] = entity['ancestors'][0]['name']
os.environ["AVALON_ASSET"] = entity['parent']['name']
os.environ["AVALON_TASK"] = entity['name']
os.environ["AVALON_SILO"] = entity["ancestors"][0]["name"]
os.environ["AVALON_ASSET"] = entity["parent"]["name"]
os.environ["AVALON_TASK"] = entity["name"]
os.environ["AVALON_APP"] = self.identifier.split("_")[0]
os.environ["AVALON_APP_NAME"] = self.identifier
anatomy = Anatomy()
anatomy = Anatomy(project_name)
asset_doc = database[project_name].find_one({
"type": "asset",
"name": entity["parent"]["name"]
})
parents = asset_doc["data"]["parents"]
hierarchy = ""
parents = database[project_name].find_one({
"type": 'asset',
"name": entity['parent']['name']
})['data']['parents']
if parents:
hierarchy = os.path.join(*parents)
os.environ["AVALON_HIERARCHY"] = hierarchy
application = avalonlib.get_application(os.environ["AVALON_APP_NAME"])
application = avalon.lib.get_application(os.environ["AVALON_APP_NAME"])
data = {
"root": os.environ.get("PYPE_STUDIO_PROJECTS_MOUNT"),
"project": {
"name": entity['project']['full_name'],
"code": entity['project']['name']
"name": ft_project["full_name"],
"code": ft_project["name"]
},
"task": entity['name'],
"asset": entity['parent']['name'],
"task": entity["name"],
"asset": entity["parent"]["name"],
"app": application["application_dir"],
"hierarchy": hierarchy,
"hierarchy": hierarchy
}
av_project = database[project_name].find_one({"type": 'project'})
@ -293,7 +279,7 @@ class AppAction(BaseHandler):
# Store subprocess to varaible. This is due to Blender launch
# bug. Please make sure Blender >=2.81 can be launched before
# remove `_popen` variable.
_popen = avalonlib.launch(
_popen = avalon.lib.launch(
executable=execfile, args=[], environment=env
)
else:
@ -340,7 +326,7 @@ class AppAction(BaseHandler):
# Store subprocess to varaible. This is due to Blender launch
# bug. Please make sure Blender >=2.81 can be launched before
# remove `_popen` variable.
_popen = avalonlib.launch(
_popen = avalon.lib.launch(
'/usr/bin/env', args=['bash', execfile], environment=env
)
else:

View file

@ -192,50 +192,10 @@ class BaseHandler(object):
raise NotImplementedError()
def _discover(self, event):
items = {
'items': [{
'label': self.label,
'variant': self.variant,
'description': self.description,
'actionIdentifier': self.identifier,
'icon': self.icon,
}]
}
args = self._translate_event(
self.session, event
)
accepts = self.discover(
self.session, *args
)
if accepts is True:
self.log.debug(u'Discovering action with selection: {0}'.format(
event['data'].get('selection', [])))
return items
def discover(self, session, entities, event):
'''Return true if we can handle the selected entities.
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return False
def _translate_event(self, session, event):
def _translate_event(self, event, session=None):
'''Return *event* translated structure to be used with the API.'''
if session is None:
session = self.session
_entities = event['data'].get('entities_object', None)
if (
@ -245,25 +205,40 @@ class BaseHandler(object):
) == ftrack_api.symbol.NOT_SET
):
_entities = self._get_entities(event)
event['data']['entities_object'] = _entities
return [
_entities,
event
]
return _entities
def _get_entities(self, event, session=None, ignore=None):
entities = []
selection = event['data'].get('selection')
if not selection:
return entities
if ignore is None:
ignore = []
elif isinstance(ignore, str):
ignore = [ignore]
filtered_selection = []
for entity in selection:
if entity['entityType'] not in ignore:
filtered_selection.append(entity)
if not filtered_selection:
return entities
def _get_entities(self, event, session=None):
if session is None:
session = self.session
session._local_cache.clear()
selection = event['data'].get('selection') or []
_entities = []
for entity in selection:
_entities.append(session.get(
for entity in filtered_selection:
entities.append(session.get(
self._get_entity_type(entity, session),
entity.get('entityId')
))
event['data']['entities_object'] = _entities
return _entities
return entities
def _get_entity_type(self, entity, session=None):
'''Return translated entity type tht can be used with API.'''
@ -292,30 +267,12 @@ class BaseHandler(object):
)
def _launch(self, event):
args = self._translate_event(
self.session, event
)
self.session.rollback()
self.session._local_cache.clear()
preactions_launched = self._handle_preactions(self.session, event)
if preactions_launched is False:
return
self.launch(self.session, event)
interface = self._interface(
self.session, *args
)
if interface:
return interface
response = self.launch(
self.session, *args
)
return self._handle_result(
self.session, response, *args
)
def launch(self, session, entities, event):
def launch(self, session, event):
'''Callback method for the custom action.
return either a bool ( True if successful or False if the action failed )
@ -360,35 +317,7 @@ class BaseHandler(object):
return False
def _interface(self, *args):
interface = self.interface(*args)
if interface:
if (
'items' in interface or
('success' in interface and 'message' in interface)
):
return interface
return {
'items': interface
}
def interface(self, session, entities, event):
'''Return a interface if applicable or None
*session* is a `ftrack_api.Session` instance
*entities* is a list of tuples each containing the entity type and the entity id.
If the entity is a hierarchical you will always get the entity
type TypedContext, once retrieved through a get operation you
will have the "real" entity type ie. example Shot, Sequence
or Asset Build.
*event* the unmodified original event
'''
return None
def _handle_result(self, session, result, entities, event):
def _handle_result(self, result):
'''Validate the returned result from the action callback'''
if isinstance(result, bool):
if result is True:
@ -417,11 +346,6 @@ class BaseHandler(object):
'Missing required key: {0}.'.format(key)
)
else:
self.log.error(
'Invalid result type must be bool or dictionary!'
)
return result
def show_message(self, event, input_message, result=False):
@ -623,3 +547,28 @@ class BaseHandler(object):
self.log.debug((
"Publishing event: {}"
).format(str(event.__dict__)))
def get_project_from_entity(self, entity):
low_entity_type = entity.entity_type.lower()
if low_entity_type == "project":
return entity
if "project" in entity:
# reviewsession, task(Task, Shot, Sequence,...)
return entity["project"]
if low_entity_type == "filecomponent":
entity = entity["version"]
low_entity_type = entity.entity_type.lower()
if low_entity_type == "assetversion":
asset = entity["asset"]
if asset:
parent = asset["parent"]
if parent:
return parent["project"]
project_data = entity["link"][0]
return self.session.query(
"Project where id is {}".format(project_data["id"])
).one()

View file

@ -43,35 +43,10 @@ class BaseEvent(BaseHandler):
priority=self.priority
)
def _launch(self, event):
self.session.rollback()
self.session._local_cache.clear()
self.launch(self.session, event)
def _translate_event(self, session, event):
def _translate_event(self, event, session=None):
'''Return *event* translated structure to be used with the API.'''
return [
self._get_entities(session, event),
event
]
def _get_entities(
self, session, event, ignore=['socialfeed', 'socialnotification']
):
_selection = event['data'].get('entities', [])
_entities = list()
if isinstance(ignore, str):
ignore = list(ignore)
for entity in _selection:
if entity['entityType'] in ignore:
continue
_entities.append(
(
session.get(
self._get_entity_type(entity),
entity.get('entityId')
)
)
)
return _entities
return self._get_entities(
event,
session,
ignore=['socialfeed', 'socialnotification']
)

View file

@ -1,135 +0,0 @@
from bson.objectid import ObjectId
from .avalon_sync import CustAttrIdKey
import avalon.io
def get_project_from_entity(entity):
# TODO add more entities
ent_type_lowered = entity.entity_type.lower()
if ent_type_lowered == "project":
return entity
elif ent_type_lowered == "assetversion":
return entity["asset"]["parent"]["project"]
elif "project" in entity:
return entity["project"]
return None
def get_avalon_entities_for_assetversion(asset_version, db_con=None):
output = {
"success": True,
"message": None,
"project": None,
"project_name": None,
"asset": None,
"asset_name": None,
"asset_path": None,
"subset": None,
"subset_name": None,
"version": None,
"version_name": None,
"representations": None
}
if db_con is None:
db_con = avalon.io
db_con.install()
ft_asset = asset_version["asset"]
subset_name = ft_asset["name"]
version = asset_version["version"]
parent = ft_asset["parent"]
ent_path = "/".join(
[ent["name"] for ent in parent["link"]]
)
project = get_project_from_entity(asset_version)
project_name = project["full_name"]
output["project_name"] = project_name
output["asset_name"] = parent["name"]
output["asset_path"] = ent_path
output["subset_name"] = subset_name
output["version_name"] = version
db_con.Session["AVALON_PROJECT"] = project_name
avalon_project = db_con.find_one({"type": "project"})
output["project"] = avalon_project
if not avalon_project:
output["success"] = False
output["message"] = "Project not synchronized to avalon `{}`".format(
project_name
)
return output
asset_ent = None
asset_mongo_id = parent["custom_attributes"].get(CustAttrIdKey)
if asset_mongo_id:
try:
asset_mongo_id = ObjectId(asset_mongo_id)
asset_ent = db_con.find_one({
"type": "asset",
"_id": asset_mongo_id
})
except Exception:
pass
if not asset_ent:
asset_ent = db_con.find_one({
"type": "asset",
"data.ftrackId": parent["id"]
})
output["asset"] = asset_ent
if not asset_ent:
output["success"] = False
output["message"] = "Not synchronized entity to avalon `{}`".format(
ent_path
)
return output
asset_mongo_id = asset_ent["_id"]
subset_ent = db_con.find_one({
"type": "subset",
"parent": asset_mongo_id,
"name": subset_name
})
output["subset"] = subset_ent
if not subset_ent:
output["success"] = False
output["message"] = (
"Subset `{}` does not exist under Asset `{}`"
).format(subset_name, ent_path)
return output
version_ent = db_con.find_one({
"type": "version",
"name": version,
"parent": subset_ent["_id"]
})
output["version"] = version_ent
if not version_ent:
output["success"] = False
output["message"] = (
"Version `{}` does not exist under Subset `{}` | Asset `{}`"
).format(version, subset_name, ent_path)
return output
repre_ents = list(db_con.find({
"type": "representation",
"parent": version_ent["_id"]
}))
output["representations"] = repre_ents
return output