mirror of
https://github.com/ynput/ayon-core.git
synced 2026-01-02 00:44:52 +01:00
action_create_fodler uses new anatomy
This commit is contained in:
parent
3713559915
commit
4a3b98c945
1 changed files with 109 additions and 189 deletions
|
|
@ -1,30 +1,16 @@
|
|||
import os
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
import re
|
||||
|
||||
import ftrack_api
|
||||
from pype.ftrack import BaseAction
|
||||
from avalon import lib as avalonlib
|
||||
from pype.ftrack.lib.io_nonsingleton import DbConnector
|
||||
from pypeapp import config, Anatomy
|
||||
|
||||
|
||||
class CreateFolders(BaseAction):
|
||||
#: Action identifier.
|
||||
identifier = 'create.folders'
|
||||
|
||||
#: Action label.
|
||||
label = 'Create Folders'
|
||||
|
||||
#: Action Icon.
|
||||
icon = '{}/ftrack/action_icons/CreateFolders.svg'.format(
|
||||
os.environ.get('PYPE_STATICS_SERVER', '')
|
||||
identifier = "create.folders"
|
||||
label = "Create Folders"
|
||||
icon = "{}/ftrack/action_icons/CreateFolders.svg".format(
|
||||
os.environ.get("PYPE_STATICS_SERVER", "")
|
||||
)
|
||||
|
||||
db = DbConnector()
|
||||
|
||||
def discover(self, session, entities, event):
|
||||
if len(entities) != 1:
|
||||
return False
|
||||
|
|
@ -90,49 +76,52 @@ class CreateFolders(BaseAction):
|
|||
with_childrens = event["data"]["values"]["children_included"]
|
||||
|
||||
entity = entities[0]
|
||||
if entity.entity_type.lower() == 'project':
|
||||
if entity.entity_type.lower() == "project":
|
||||
proj = entity
|
||||
else:
|
||||
proj = entity['project']
|
||||
project_name = proj['full_name']
|
||||
project_code = proj['name']
|
||||
proj = entity["project"]
|
||||
project_name = proj["full_name"]
|
||||
project_code = proj["name"]
|
||||
|
||||
if entity.entity_type.lower() == 'project' and with_childrens == False:
|
||||
if entity.entity_type.lower() == 'project' and with_childrens is False:
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'Nothing was created'
|
||||
}
|
||||
data = {
|
||||
"root": os.environ["AVALON_PROJECTS"],
|
||||
"project": {
|
||||
"name": project_name,
|
||||
"code": project_code
|
||||
}
|
||||
}
|
||||
|
||||
all_entities = []
|
||||
all_entities.append(entity)
|
||||
if with_childrens:
|
||||
all_entities = self.get_notask_children(entity)
|
||||
|
||||
av_project = None
|
||||
try:
|
||||
self.db.install()
|
||||
self.db.Session['AVALON_PROJECT'] = project_name
|
||||
av_project = self.db.find_one({'type': 'project'})
|
||||
template_work = av_project['config']['template']['work']
|
||||
template_publish = av_project['config']['template']['publish']
|
||||
self.db.uninstall()
|
||||
except Exception:
|
||||
templates = Anatomy().templates
|
||||
template_work = templates["avalon"]["work"]
|
||||
template_publish = templates["avalon"]["publish"]
|
||||
anatomy = Anatomy(project_name)
|
||||
|
||||
work_keys = ["work", "folder"]
|
||||
work_template = anatomy.templates
|
||||
for key in work_keys:
|
||||
work_template = work_template[key]
|
||||
work_has_apps = "{app" in work_template
|
||||
|
||||
publish_keys = ["publish", "folder"]
|
||||
publish_template = anatomy.templates
|
||||
for key in publish_keys:
|
||||
publish_template = publish_template[key]
|
||||
publish_has_apps = "{app" in publish_template
|
||||
|
||||
presets = config.get_presets()
|
||||
app_presets = presets.get("tools", {}).get("sw_folders")
|
||||
cached_apps = {}
|
||||
|
||||
collected_paths = []
|
||||
presets = config.get_presets()["tools"]["sw_folders"]
|
||||
for entity in all_entities:
|
||||
if entity.entity_type.lower() == "project":
|
||||
continue
|
||||
ent_data = data.copy()
|
||||
ent_data = {
|
||||
"project": {
|
||||
"name": project_name,
|
||||
"code": project_code
|
||||
}
|
||||
}
|
||||
|
||||
ent_data["asset"] = entity["name"]
|
||||
|
||||
|
|
@ -144,69 +133,72 @@ class CreateFolders(BaseAction):
|
|||
ent_data["hierarchy"] = hierarchy
|
||||
|
||||
tasks_created = False
|
||||
if entity['children']:
|
||||
for child in entity['children']:
|
||||
if child['object_type']['name'].lower() != 'task':
|
||||
continue
|
||||
tasks_created = True
|
||||
task_type_name = child['type']['name'].lower()
|
||||
task_data = ent_data.copy()
|
||||
task_data['task'] = child['name']
|
||||
possible_apps = presets.get(task_type_name, [])
|
||||
template_work_created = False
|
||||
template_publish_created = False
|
||||
apps = []
|
||||
for child in entity["children"]:
|
||||
if child["object_type"]["name"].lower() != "task":
|
||||
continue
|
||||
tasks_created = True
|
||||
task_type_name = child["type"]["name"].lower()
|
||||
task_data = ent_data.copy()
|
||||
task_data["task"] = child["name"]
|
||||
|
||||
apps = []
|
||||
if app_presets and (work_has_apps or publish_has_apps):
|
||||
possible_apps = app_presets.get(task_type_name, [])
|
||||
for app in possible_apps:
|
||||
try:
|
||||
app_data = avalonlib.get_application(app)
|
||||
app_dir = app_data['application_dir']
|
||||
except ValueError:
|
||||
app_dir = app
|
||||
if app in cached_apps:
|
||||
app_dir = cached_apps[app]
|
||||
else:
|
||||
try:
|
||||
app_data = avalonlib.get_application(app)
|
||||
app_dir = app_data["application_dir"]
|
||||
except ValueError:
|
||||
app_dir = app
|
||||
cached_apps[app] = app_dir
|
||||
apps.append(app_dir)
|
||||
|
||||
# Template wok
|
||||
if '{app}' in template_work:
|
||||
for app in apps:
|
||||
template_work_created = True
|
||||
app_data = task_data.copy()
|
||||
app_data['app'] = app
|
||||
collected_paths.append(
|
||||
self.compute_template(
|
||||
template_work, app_data
|
||||
)
|
||||
)
|
||||
if template_work_created is False:
|
||||
collected_paths.append(
|
||||
self.compute_template(template_work, task_data)
|
||||
)
|
||||
# Template publish
|
||||
if '{app}' in template_publish:
|
||||
for app in apps:
|
||||
template_publish_created = True
|
||||
app_data = task_data.copy()
|
||||
app_data['app'] = app
|
||||
collected_paths.append(
|
||||
self.compute_template(
|
||||
template_publish, app_data, True
|
||||
)
|
||||
)
|
||||
if template_publish_created is False:
|
||||
collected_paths.append(
|
||||
self.compute_template(
|
||||
template_publish, task_data, True
|
||||
)
|
||||
)
|
||||
# Template wok
|
||||
if work_has_apps:
|
||||
app_data = task_data.copy()
|
||||
for app in apps:
|
||||
app_data["app"] = app
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, app_data, work_keys
|
||||
))
|
||||
else:
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, task_data, work_keys
|
||||
))
|
||||
|
||||
# Template publish
|
||||
if publish_has_apps:
|
||||
app_data = task_data.copy()
|
||||
for app in apps:
|
||||
app_data["app"] = app
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, app_data, publish_keys
|
||||
))
|
||||
else:
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, task_data, publish_keys
|
||||
))
|
||||
|
||||
if not tasks_created:
|
||||
# create path for entity
|
||||
collected_paths.append(
|
||||
self.compute_template(template_work, ent_data)
|
||||
)
|
||||
collected_paths.append(
|
||||
self.compute_template(template_publish, ent_data)
|
||||
)
|
||||
if len(collected_paths) > 0:
|
||||
self.log.info('Creating folders:')
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, ent_data, work_keys
|
||||
))
|
||||
collected_paths.append(self.compute_template(
|
||||
anatomy, ent_data, publish_keys
|
||||
))
|
||||
|
||||
if len(collected_paths) == 0:
|
||||
return {
|
||||
"success": True,
|
||||
"message": "No project folders to create."
|
||||
}
|
||||
|
||||
self.log.info("Creating folders:")
|
||||
|
||||
for path in set(collected_paths):
|
||||
self.log.info(path)
|
||||
if not os.path.exists(path):
|
||||
|
|
@ -219,100 +211,28 @@ class CreateFolders(BaseAction):
|
|||
|
||||
def get_notask_children(self, entity):
|
||||
output = []
|
||||
if entity.get('object_type', {}).get(
|
||||
'name', entity.entity_type
|
||||
).lower() == 'task':
|
||||
if entity.entity_type.lower() == "task":
|
||||
return output
|
||||
else:
|
||||
output.append(entity)
|
||||
if entity['children']:
|
||||
for child in entity['children']:
|
||||
output.extend(self.get_notask_children(child))
|
||||
|
||||
output.append(entity)
|
||||
for child in entity["children"]:
|
||||
output.extend(self.get_notask_children(child))
|
||||
return output
|
||||
|
||||
def template_format(self, template, data):
|
||||
def compute_template(self, anatomy, data, anatomy_keys):
|
||||
filled_template = anatomy.format_all(data)
|
||||
for key in anatomy_keys:
|
||||
filled_template = filled_template[key]
|
||||
|
||||
partial_data = PartialDict(data)
|
||||
if filled_template.solved:
|
||||
return os.path.normpath(filled_template)
|
||||
|
||||
# remove subdict items from string (like 'project[name]')
|
||||
subdict = PartialDict()
|
||||
count = 1
|
||||
store_pattern = 5*'_'+'{:0>3}'
|
||||
regex_patern = "\{\w*\[[^\}]*\]\}"
|
||||
matches = re.findall(regex_patern, template)
|
||||
|
||||
for match in matches:
|
||||
key = store_pattern.format(count)
|
||||
subdict[key] = match
|
||||
template = template.replace(match, '{'+key+'}')
|
||||
count += 1
|
||||
# solve fillind keys with optional keys
|
||||
solved = self._solve_with_optional(template, partial_data)
|
||||
# try to solve subdict and replace them back to string
|
||||
for k, v in subdict.items():
|
||||
try:
|
||||
v = v.format_map(data)
|
||||
except (KeyError, TypeError):
|
||||
pass
|
||||
subdict[k] = v
|
||||
|
||||
return solved.format_map(subdict)
|
||||
|
||||
def _solve_with_optional(self, template, data):
|
||||
# Remove optional missing keys
|
||||
pattern = re.compile(r"(<.*?[^{0]*>)[^0-9]*?")
|
||||
invalid_optionals = []
|
||||
for group in pattern.findall(template):
|
||||
try:
|
||||
group.format(**data)
|
||||
except KeyError:
|
||||
invalid_optionals.append(group)
|
||||
for group in invalid_optionals:
|
||||
template = template.replace(group, "")
|
||||
|
||||
solved = template.format_map(data)
|
||||
|
||||
# solving after format optional in second round
|
||||
for catch in re.compile(r"(<.*?[^{0]*>)[^0-9]*?").findall(solved):
|
||||
if "{" in catch:
|
||||
# remove all optional
|
||||
solved = solved.replace(catch, "")
|
||||
else:
|
||||
# Remove optional symbols
|
||||
solved = solved.replace(catch, catch[1:-1])
|
||||
|
||||
return solved
|
||||
|
||||
def compute_template(self, str, data, task=False):
|
||||
first_result = self.template_format(str, data)
|
||||
if first_result == first_result.split('{')[0]:
|
||||
return os.path.normpath(first_result)
|
||||
if task:
|
||||
return os.path.normpath(first_result.split('{')[0])
|
||||
|
||||
index = first_result.index('{')
|
||||
|
||||
regex = '\{\w*[^\}]*\}'
|
||||
match = re.findall(regex, first_result[index:])[0]
|
||||
without_missing = str.split(match)[0].split('}')
|
||||
output_items = []
|
||||
for part in without_missing:
|
||||
if '{' in part:
|
||||
output_items.append(part + '}')
|
||||
return os.path.normpath(
|
||||
self.template_format(''.join(output_items), data)
|
||||
self.log.warning(
|
||||
"Template \"{}\" was not fully filled \"{}\"".format(
|
||||
filled_template.template, filled_template
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class PartialDict(dict):
|
||||
def __getitem__(self, item):
|
||||
out = super().__getitem__(item)
|
||||
if isinstance(out, dict):
|
||||
return '{'+item+'}'
|
||||
return out
|
||||
|
||||
def __missing__(self, key):
|
||||
return '{'+key+'}'
|
||||
return os.path.normpath(filled_template.split("{")[0])
|
||||
|
||||
|
||||
def register(session, plugins_presets={}):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue