diff --git a/openpype/lib/__init__.py b/openpype/lib/__init__.py index c97545fdf4..65974ba479 100644 --- a/openpype/lib/__init__.py +++ b/openpype/lib/__init__.py @@ -109,7 +109,8 @@ from .applications import ( get_app_environments_for_context, apply_project_environments_value, - compile_list_of_regexes + compile_list_of_regexes, + get_custom_workfile_template ) from .profiles_filtering import filter_profiles @@ -213,6 +214,7 @@ __all__ = [ "apply_project_environments_value", "compile_list_of_regexes", + "get_custom_workfile_template", "filter_profiles", diff --git a/openpype/lib/applications.py b/openpype/lib/applications.py index d82b7cd847..16af4c1b4c 100644 --- a/openpype/lib/applications.py +++ b/openpype/lib/applications.py @@ -1396,3 +1396,95 @@ def compile_list_of_regexes(in_list): " Expected string based object. Skipping." ).format(str(type(item)), str(item))) return regexes + + +def _get_basic_context_data_for_anatomy(env=None): + """ Prepare Task context for anatomy data. + Args: + env (dict): Initial environment variables. `os.environ` is used when + not passed. + + Returns: + dict: With Anatomy context data. + """ + from ..settings.lib import get_default_anatomy_settings + from avalon.api import AvalonMongoDB + env = env or dict(os.environ) + + default_anatomy_tasks = get_default_anatomy_settings()["tasks"] + project_name = env["AVALON_PROJECT"] + asset_name = env["AVALON_ASSET"] + task_name = env["AVALON_TASK"] + + # Avalon database connection + dbcon = AvalonMongoDB() + dbcon.Session["AVALON_PROJECT"] = project_name + dbcon.install() + + # Project document + project_doc = dbcon.find_one({"type": "project"}) + asset_doc = dbcon.find_one({ + "type": "asset", + "name": asset_name + }) + + # Discard avalon connection + dbcon.uninstall() + + # get relevant task type from asset doc + task_type = None + for task_t, task_n in asset_doc["data"]["tasks"].items(): + if task_name == task_n: + task_type = task_t + break + + assert task_type, ( + "Task type cannot be found in on asset `{}` " + "with given task name `{}`").format(asset_name, task_name) + + # get short name for task type defined in default anatomy settings + default_anatomy_task = default_anatomy_tasks[task_type] + assert default_anatomy_task, ( + "Something went wrong. Default anatomy tasks are not holding" + "requested task type: `{}`".format(task_type) + ) + + return { + "project": { + "name": project_doc["name"], + "code": project_doc["data"].get("code") + }, + "asset": asset_name, + "task": { + "name": task_name, + "type": task_type, + "sort_name": default_anatomy_task["short_name"] + } + } + + +def get_custom_workfile_template(custom_templates: list): + anatomy = Anatomy() + + # get project, asset, task anatomy context data + anatomy_context_data = _get_basic_context_data_for_anatomy() + # add root dict + anatomy_context_data.update(anatomy.roots()) + + # get task type for the task in context + test_task_type = anatomy_context_data["task"]["type"] + + # get path from matching profile + path = None + for templ in custom_templates: + for task_t in templ["task_types"]: + if task_t.lower() != test_task_type.lower(): + continue + path = templ["path"][platform.system().lower()] + + # when path is available try to format it in case + # there are some anatomy template strings + if path: + return path.format(**anatomy_context_data) + + return False