Merged in feature/PYPE-140_software_folders (pull request #115)

Feature/PYPE-140 software folders

Approved-by: Milan Kolar <milan@orbi.tools>
This commit is contained in:
Jakub Trllo 2019-04-12 14:49:35 +00:00 committed by Milan Kolar
commit 09d0a14633
3 changed files with 326 additions and 260 deletions

View file

@ -1,13 +1,15 @@
import logging
import os
import argparse
import sys
import errno
import logging
import argparse
import re
import json
import ftrack_api
from pype.ftrack import BaseAction
import json
from pype import api as pype
from pype import api as pype, lib as pypelib
from avalon import lib as avalonlib
from avalon.tools.libraryloader.io_nonsingleton import DbConnector
class CreateFolders(BaseAction):
@ -25,111 +27,312 @@ class CreateFolders(BaseAction):
'https://cdn1.iconfinder.com/data/icons/hawcons/32/'
'698620-icon-105-folder-add-512.png'
)
db = DbConnector()
def discover(self, session, entities, event):
''' Validation '''
not_allowed = ['assetversion']
if len(entities) != 1:
return False
if entities[0].entity_type.lower() in not_allowed:
return False
return True
def getShotAsset(self, entity):
if entity not in self.importable:
if entity['object_type']['name'] != 'Task':
self.importable.add(entity)
def interface(self, session, entities, event):
if event['data'].get('values', {}):
return
entity = entities[0]
without_interface = True
for child in entity['children']:
if child['object_type']['name'].lower() != 'task':
without_interface = False
break
self.without_interface = without_interface
if without_interface:
return
title = 'Create folders'
if entity['children']:
children = entity['children']
for child in children:
self.getShotAsset(child)
entity_name = entity['name']
msg = (
'<h2>Do you want create folders also'
' for all children of "{}"?</h2>'
)
if entity.entity_type.lower() == 'project':
entity_name = entity['full_name']
msg = msg.replace(' also', '')
msg += '<h3>(Project root won\'t be created if not checked)</h3>'
items = []
item_msg = {
'type': 'label',
'value': msg.format(entity_name)
}
item_label = {
'type': 'label',
'value': 'With all chilren entities'
}
item = {
'name': 'children_included',
'type': 'boolean',
'value': False
}
items.append(item_msg)
items.append(item_label)
items.append(item)
if len(items) == 0:
return {
'success': False,
'message': 'Didn\'t found any running jobs'
}
else:
return {
'items': items,
'title': title
}
def launch(self, session, entities, event):
'''Callback method for custom action.'''
with_childrens = True
if self.without_interface is False:
if 'values' not in event['data']:
return
with_childrens = event['data']['values']['children_included']
entity = entities[0]
if entity.entity_type.lower() == 'project':
proj = entity
else:
proj = entity['project']
project_name = proj['full_name']
project_code = proj['name']
if entity.entity_type.lower() == 'project' and with_childrens == False:
return {
'success': True,
'message': 'Nothing was created'
}
data = {
"root": os.environ["AVALON_PROJECTS"],
"project": {
"name": project_name,
"code": project_code
}
}
all_entities = []
all_entities.append(entity)
if with_childrens:
all_entities = self.get_notask_children(entity)
#######################################################################
# JOB SETTINGS
userId = event['source']['user']['id']
user = session.query('User where id is ' + userId).one()
job = session.create('Job', {
'user': user,
'status': 'running',
'data': json.dumps({
'description': 'Creating Folders.'
})
})
av_project = None
try:
self.importable = set([])
# self.importable = []
self.Anatomy = pype.Anatomy
project = entities[0]['project']
paths_collected = set([])
# get all child entities separately/unique
for entity in entities:
self.getShotAsset(entity)
for ent in self.importable:
self.log.info("{}".format(ent['name']))
for entity in self.importable:
print(entity['name'])
anatomy = pype.Anatomy
parents = entity['link']
hierarchy_names = []
for p in parents[1:-1]:
hierarchy_names.append(p['name'])
if hierarchy_names:
# hierarchy = os.path.sep.join(hierarchy)
hierarchy = os.path.join(*hierarchy_names)
template_data = {"project": {"name": project['full_name'],
"code": project['name']},
"asset": entity['name'],
"hierarchy": hierarchy}
for task in entity['children']:
if task['object_type']['name'] == 'Task':
self.log.info('child: {}'.format(task['name']))
template_data['task'] = task['name']
anatomy_filled = anatomy.format(template_data)
paths_collected.add(anatomy_filled.work.folder)
paths_collected.add(anatomy_filled.publish.folder)
for path in paths_collected:
self.log.info(path)
try:
os.makedirs(path)
except OSError as error:
if error.errno != errno.EEXIST:
raise
job['status'] = 'done'
session.commit()
except ValueError as ve:
job['status'] = 'failed'
session.commit()
message = str(ve)
self.log.error('Error during syncToAvalon: {}'.format(message))
self.db.install()
self.db.Session['AVALON_PROJECT'] = project_name
av_project = self.db.find_one({'type': 'project'})
template_work = av_project['config']['template']['work']
template_publish = av_project['config']['template']['publish']
self.db.uninstall()
except Exception:
job['status'] = 'failed'
session.commit()
anatomy = pype.Anatomy
template_work = anatomy.avalon.work
template_publish = anatomy.avalon.publish
#######################################################################
collected_paths = []
presets = self.get_presets()
for entity in all_entities:
if entity.entity_type.lower() == 'project':
continue
ent_data = data.copy()
asset_name = entity['name']
ent_data['asset'] = asset_name
parents = entity['link']
hierarchy_names = [p['name'] for p in parents[1:-1]]
hierarchy = ''
if hierarchy_names:
hierarchy = os.path.sep.join(hierarchy_names)
ent_data['hierarchy'] = hierarchy
tasks_created = False
if entity['children']:
for child in entity['children']:
if child['object_type']['name'].lower() != 'task':
continue
tasks_created = True
task_type_name = child['type']['name'].lower()
task_data = ent_data.copy()
task_data['task'] = child['name']
possible_apps = presets.get(task_type_name, [])
template_work_created = False
template_publish_created = False
apps = []
for app in possible_apps:
try:
app_data = avalonlib.get_application(app)
app_dir = app_data['application_dir']
except ValueError:
app_dir = app
apps.append(app_dir)
# Template wok
if '{app}' in template_work:
for app in apps:
template_work_created = True
app_data = task_data.copy()
app_data['app'] = app
collected_paths.append(
self.compute_template(
template_work, app_data
)
)
if template_work_created is False:
collected_paths.append(
self.compute_template(template_work, task_data)
)
# Template publish
if '{app}' in template_publish:
for app in apps:
template_publish_created = True
app_data = task_data.copy()
app_data['app'] = app
collected_paths.append(
self.compute_template(
template_publish, app_data, True
)
)
if template_publish_created is False:
collected_paths.append(
self.compute_template(
template_publish, task_data, True
)
)
if not tasks_created:
# create path for entity
collected_paths.append(
self.compute_template(template_work, ent_data)
)
collected_paths.append(
self.compute_template(template_publish, ent_data)
)
if len(collected_paths) > 0:
self.log.info('Creating folders:')
for path in set(collected_paths):
self.log.info(path)
if not os.path.exists(path):
os.makedirs(path)
return {
'success': True,
'message': 'Created Folders Successfully!'
}
def get_notask_children(self, entity):
output = []
if entity.get('object_type', {}).get(
'name', entity.entity_type
).lower() == 'task':
return output
else:
output.append(entity)
if entity['children']:
for child in entity['children']:
output.extend(self.get_notask_children(child))
return output
def get_presets(self):
fpath_items = [pypelib.get_presets_path(), 'tools', 'sw_folders.json']
filepath = os.path.normpath(os.path.sep.join(fpath_items))
presets = dict()
try:
with open(filepath) as data_file:
presets = json.load(data_file)
except Exception as e:
self.log.warning('Wasn\'t able to load presets')
return dict(presets)
def template_format(self, template, data):
partial_data = PartialDict(data)
# remove subdict items from string (like 'project[name]')
subdict = PartialDict()
count = 1
store_pattern = 5*'_'+'{:0>3}'
regex_patern = "\{\w*\[[^\}]*\]\}"
matches = re.findall(regex_patern, template)
for match in matches:
key = store_pattern.format(count)
subdict[key] = match
template = template.replace(match, '{'+key+'}')
count += 1
# solve fillind keys with optional keys
solved = self._solve_with_optional(template, partial_data)
# try to solve subdict and replace them back to string
for k, v in subdict.items():
try:
v = v.format_map(data)
except (KeyError, TypeError):
pass
subdict[k] = v
return solved.format_map(subdict)
def _solve_with_optional(self, template, data):
# Remove optional missing keys
pattern = re.compile(r"(<.*?[^{0]*>)[^0-9]*?")
invalid_optionals = []
for group in pattern.findall(template):
try:
group.format(**data)
except KeyError:
invalid_optionals.append(group)
for group in invalid_optionals:
template = template.replace(group, "")
solved = template.format_map(data)
# solving after format optional in second round
for catch in re.compile(r"(<.*?[^{0]*>)[^0-9]*?").findall(solved):
if "{" in catch:
# remove all optional
solved = solved.replace(catch, "")
else:
# Remove optional symbols
solved = solved.replace(catch, catch[1:-1])
return solved
def compute_template(self, str, data, task=False):
first_result = self.template_format(str, data)
if first_result == first_result.split('{')[0]:
return os.path.normpath(first_result)
if task:
return os.path.normpath(first_result.split('{')[0])
index = first_result.index('{')
regex = '\{\w*[^\}]*\}'
match = re.findall(regex, first_result[index:])[0]
without_missing = str.split(match)[0].split('}')
output_items = []
for part in without_missing:
if '{' in part:
output_items.append(part + '}')
return os.path.normpath(
self.template_format(''.join(output_items), data)
)
class PartialDict(dict):
def __getitem__(self, item):
out = super().__getitem__(item)
if isinstance(out, dict):
return '{'+item+'}'
return out
def __missing__(self, key):
return '{'+key+'}'
def register(session, **kw):
'''Register plugin. Called when used as an plugin.'''

View file

@ -1,155 +0,0 @@
import os
import sys
import json
import argparse
import logging
import ftrack_api
from avalon import lib as avalonlib
from avalon.tools.libraryloader.io_nonsingleton import DbConnector
from pype import lib as pypelib
from pype.ftrack import BaseAction
class CreateSWFolders(BaseAction):
'''Edit meta data action.'''
#: Action identifier.
identifier = 'create.sw.folders'
#: Action label.
label = 'Create SW Folders'
#: Action description.
description = 'Creates folders for all SW in project'
def __init__(self, session):
super().__init__(session)
self.avalon_db = DbConnector()
self.avalon_db.install()
def discover(self, session, entities, event):
''' Validation '''
return True
def launch(self, session, entities, event):
if len(entities) != 1:
self.log.warning(
'There are more entities in selection!'
)
return False
entity = entities[0]
if entity.entity_type.lower() != 'task':
self.log.warning(
'Selected entity is not Task!'
)
return False
asset = entity['parent']
project = asset['project']
project_name = project["full_name"]
self.avalon_db.Session['AVALON_PROJECT'] = project_name
av_project = self.avalon_db.find_one({'type': 'project'})
av_asset = self.avalon_db.find_one({
'type': 'asset',
'name': asset['name']
})
templates = av_project["config"]["template"]
template = templates.get("work", None)
if template is None:
return False
data = {
"root": os.environ["AVALON_PROJECTS"],
"project": {
"name": project_name,
"code": project["name"]
},
"hierarchy": av_asset['data']['hierarchy'],
"asset": asset['name'],
"task": entity['name'],
}
apps = []
if '{app}' in template:
# Apps in project
for app in av_project['data']['applications']:
app_data = avalonlib.get_application(app)
app_dir = app_data['application_dir']
if app_dir not in apps:
apps.append(app_dir)
# Apps in presets
path_items = [pypelib.get_presets_path(), 'tools', 'sw_folders.json']
filepath = os.path.sep.join(path_items)
presets = dict()
try:
with open(filepath) as data_file:
presets = json.load(data_file)
except Exception as e:
self.log.warning('Wasn\'t able to load presets')
preset_apps = presets.get(project_name, presets.get('__default__', []))
for app in preset_apps:
if app not in apps:
apps.append(app)
# Create folders for apps
for app in apps:
data['app'] = app
self.log.info('Created folder for app {}'.format(app))
path = os.path.normpath(template.format(**data))
if os.path.exists(path):
continue
os.makedirs(path)
return True
def register(session, **kw):
'''Register plugin. Called when used as an plugin.'''
if not isinstance(session, ftrack_api.session.Session):
return
CreateSWFolders(session).register()
def main(arguments=None):
'''Set up logging and register action.'''
if arguments is None:
arguments = []
parser = argparse.ArgumentParser()
# Allow setting of logging level from arguments.
loggingLevels = {}
for level in (
logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL
):
loggingLevels[logging.getLevelName(level).lower()] = level
parser.add_argument(
'-v', '--verbosity',
help='Set the logging output verbosity.',
choices=loggingLevels.keys(),
default='info'
)
namespace = parser.parse_args(arguments)
# Set up basic logging
logging.basicConfig(level=loggingLevels[namespace.verbosity])
session = ftrack_api.Session()
register(session)
# Wait for events
logging.info(
'Registered actions and listening for events. Use Ctrl-C to abort.'
)
session.event_hub.wait()
if __name__ == '__main__':
raise SystemExit(main(sys.argv[1:]))

View file

@ -200,21 +200,39 @@ class AppAction(BaseHandler):
application = avalonlib.get_application(os.environ["AVALON_APP_NAME"])
data = {"project": {"name": entity['project']['full_name'],
"code": entity['project']['name']},
"task": entity['name'],
"asset": entity['parent']['name'],
"app": application["application_dir"],
"hierarchy": hierarchy}
try:
anatomy = anatomy.format(data)
except Exception as e:
self.log.error(
"{0} Error in anatomy.format: {1}".format(__name__, e)
data = {
"root": os.environ["AVALON_PROJECTS"],
"project": {
"name": entity['project']['full_name'],
"code": entity['project']['name']
},
"task": entity['name'],
"asset": entity['parent']['name'],
"app": application["application_dir"],
"hierarchy": hierarchy,
}
av_project = database[project_name].find_one({"type": 'project'})
templates = None
if av_project:
work_template = av_project.get('config', {}).get('template', {}).get(
'work', None
)
os.environ["AVALON_WORKDIR"] = os.path.join(
anatomy.work.root, anatomy.work.folder
)
work_template = None
try:
work_template = work_template.format(**data)
except Exception:
try:
anatomy = anatomy.format(data)
work_template = os.path.join(
anatomy.work.root,
anatomy.work.folder
)
except Exception as e:
self.log.error(
"{0} Error in anatomy.format: {1}".format(__name__, e)
)
os.environ["AVALON_WORKDIR"] = os.path.normpath(work_template)
# collect all parents from the task
parents = []