diff --git a/openpype/lib/__init__.py b/openpype/lib/__init__.py index 65974ba479..6ed974bfb0 100644 --- a/openpype/lib/__init__.py +++ b/openpype/lib/__init__.py @@ -81,6 +81,8 @@ from .avalon_context import ( get_creator_by_name, + get_custom_workfile_template, + change_timer_to_current_context ) diff --git a/openpype/lib/applications.py b/openpype/lib/applications.py index 0abfc1bd45..d82b7cd847 100644 --- a/openpype/lib/applications.py +++ b/openpype/lib/applications.py @@ -1396,95 +1396,3 @@ def compile_list_of_regexes(in_list): " Expected string based object. Skipping." ).format(str(type(item)), str(item))) return regexes - - -def _get_basic_context_data_for_anatomy(env=None): - """ Prepare Task context for anatomy data. - Args: - env (dict): Initial environment variables. `os.environ` is used when - not passed. - - Returns: - dict: With Anatomy context data. - """ - from ..settings.lib import get_default_anatomy_settings - from avalon.api import AvalonMongoDB - env = env or dict(os.environ) - - default_anatomy_tasks = get_default_anatomy_settings()["tasks"] - project_name = env["AVALON_PROJECT"] - asset_name = env["AVALON_ASSET"] - task_name = env["AVALON_TASK"] - - # Avalon database connection - dbcon = AvalonMongoDB() - dbcon.Session["AVALON_PROJECT"] = project_name - dbcon.install() - - # Project document - project_doc = dbcon.find_one({"type": "project"}) - asset_doc = dbcon.find_one({ - "type": "asset", - "name": asset_name - }) - - # Discard avalon connection - dbcon.uninstall() - - # get relevant task type from asset doc - task_type = None - for task_n, task_t in asset_doc["data"]["tasks"].items(): - if task_name == task_n: - task_type = task_t["type"] - break - - assert task_type, ( - "Task type cannot be found in on asset `{}` " - "with given task name `{}`").format(asset_name, task_name) - - # get short name for task type defined in default anatomy settings - default_anatomy_task = default_anatomy_tasks[task_type] - assert default_anatomy_task, ( - "Something went wrong. Default anatomy tasks are not holding" - "requested task type: `{}`".format(task_type) - ) - - return { - "project": { - "name": project_doc["name"], - "code": project_doc["data"].get("code") - }, - "asset": asset_name, - "task": { - "name": task_name, - "type": task_type, - "sort_name": default_anatomy_task["short_name"] - } - } - - -def get_custom_workfile_template(custom_templates): - anatomy = Anatomy() - - # get project, asset, task anatomy context data - anatomy_context_data = _get_basic_context_data_for_anatomy() - # add root dict - anatomy_context_data.update({"root": anatomy.roots}) - - # get task type for the task in context - test_task_type = anatomy_context_data["task"]["type"] - - # get path from matching profile - path = None - for templ in custom_templates: - for task_t in templ["task_types"]: - if task_t.lower() != test_task_type.lower(): - continue - path = templ["path"][platform.system().lower()] - - # when path is available try to format it in case - # there are some anatomy template strings - if path: - return path.format(**anatomy_context_data) - - return False diff --git a/openpype/lib/avalon_context.py b/openpype/lib/avalon_context.py index 80744cf1f5..cb7ed41840 100644 --- a/openpype/lib/avalon_context.py +++ b/openpype/lib/avalon_context.py @@ -1270,3 +1270,95 @@ def change_timer_to_current_context(): } requests.post(rest_api_url, json=data) + + +def _get_basic_context_data_for_anatomy(env=None): + """ Prepare Task context for anatomy data. + Args: + env (dict): Initial environment variables. `os.environ` is used when + not passed. + + Returns: + dict: With Anatomy context data. + """ + from ..settings.lib import get_default_anatomy_settings + from avalon.api import AvalonMongoDB + env = env or dict(os.environ) + + default_anatomy_tasks = get_default_anatomy_settings()["tasks"] + project_name = env["AVALON_PROJECT"] + asset_name = env["AVALON_ASSET"] + task_name = env["AVALON_TASK"] + + # Avalon database connection + dbcon = AvalonMongoDB() + dbcon.Session["AVALON_PROJECT"] = project_name + dbcon.install() + + # Project document + project_doc = dbcon.find_one({"type": "project"}) + asset_doc = dbcon.find_one({ + "type": "asset", + "name": asset_name + }) + + # Discard avalon connection + dbcon.uninstall() + + # get relevant task type from asset doc + task_type = None + for task_n, task_t in asset_doc["data"]["tasks"].items(): + if task_name == task_n: + task_type = task_t["type"] + break + + assert task_type, ( + "Task type cannot be found in on asset `{}` " + "with given task name `{}`").format(asset_name, task_name) + + # get short name for task type defined in default anatomy settings + default_anatomy_task = default_anatomy_tasks[task_type] + assert default_anatomy_task, ( + "Something went wrong. Default anatomy tasks are not holding" + "requested task type: `{}`".format(task_type) + ) + + return { + "project": { + "name": project_doc["name"], + "code": project_doc["data"].get("code") + }, + "asset": asset_name, + "task": { + "name": task_name, + "type": task_type, + "sort_name": default_anatomy_task["short_name"] + } + } + + +def get_custom_workfile_template(custom_templates): + anatomy = Anatomy() + + # get project, asset, task anatomy context data + anatomy_context_data = _get_basic_context_data_for_anatomy() + # add root dict + anatomy_context_data.update({"root": anatomy.roots}) + + # get task type for the task in context + test_task_type = anatomy_context_data["task"]["type"] + + # get path from matching profile + path = None + for templ in custom_templates: + for task_t in templ["task_types"]: + if task_t.lower() != test_task_type.lower(): + continue + path = templ["path"][platform.system().lower()] + + # when path is available try to format it in case + # there are some anatomy template strings + if path: + return path.format(**anatomy_context_data) + + return False