[Automated] Merged develop into main

This commit is contained in:
pypebot 2022-08-13 05:38:14 +02:00 committed by GitHub
commit fc959f8039
33 changed files with 1638 additions and 982 deletions

View file

@ -793,7 +793,7 @@ def get_output_link_versions(project_name, version_id, fields=None):
# Does it make sense to look for hero versions?
query_filter = {
"type": "version",
"data.inputLinks.input": version_id
"data.inputLinks.id": version_id
}
return conn.find(query_filter, _prepare_fields(fields))
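The corrected filter looks the linked version id up under the `id` key of `data.inputLinks` entries. A minimal sketch of a version document the fixed query would match; the entry layout here is an assumption, not taken from this diff:

```
# Minimal sketch (assumed schema): the linked version id lives under "id"
# inside each "data.inputLinks" entry, which is what the corrected filter hits.
from bson.objectid import ObjectId

version_id = ObjectId("5f4c6e0d9e2b4c3a1a000001")  # hypothetical id
version_doc = {
    "type": "version",
    "data": {
        "inputLinks": [
            {"id": version_id}
        ]
    }
}
# {"data.inputLinks.id": version_id} matches this document,
# while the old {"data.inputLinks.input": version_id} would not.
```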

View file

@ -17,6 +17,7 @@ CURRENT_ASSET_DOC_SCHEMA = "openpype:asset-3.0"
CURRENT_SUBSET_SCHEMA = "openpype:subset-3.0"
CURRENT_VERSION_SCHEMA = "openpype:version-3.0"
CURRENT_REPRESENTATION_SCHEMA = "openpype:representation-2.0"
CURRENT_WORKFILE_INFO_SCHEMA = "openpype:workfile-1.0"
def _create_or_convert_to_mongo_id(mongo_id):
@ -188,6 +189,38 @@ def new_representation_doc(
}
def new_workfile_info_doc(
filename, asset_id, task_name, files, data=None, entity_id=None
):
"""Create skeleton data of workfile info document.
Workfile document is at this moment used primarily for artist notes.
Args:
filename (str): Filename of workfile.
asset_id (Union[str, ObjectId]): Id of asset under which workfile live.
task_name (str): Task under which was workfile created.
files (List[str]): List of rootless filepaths related to workfile.
data (Dict[str, Any]): Additional metadata.
Returns:
Dict[str, Any]: Skeleton of workfile info document.
"""
if not data:
data = {}
return {
"_id": _create_or_convert_to_mongo_id(entity_id),
"type": "workfile",
"parent": ObjectId(asset_id),
"task_name": task_name,
"filename": filename,
"data": data,
"files": files
}
def _prepare_update_data(old_doc, new_doc, replace):
changes = {}
for key, value in new_doc.items():
@ -243,6 +276,20 @@ def prepare_representation_update_data(old_doc, new_doc, replace=True):
return _prepare_update_data(old_doc, new_doc, replace)
def prepare_workfile_info_update_data(old_doc, new_doc, replace=True):
"""Compare two workfile info documents and prepare update data.
Based on the compared values, update data for 'UpdateOperation' are created.
Empty output means that the documents are identical.
Returns:
Dict[str, Any]: Changes between old and new document.
"""
return _prepare_update_data(old_doc, new_doc, replace)
@six.add_metaclass(ABCMeta)
class AbstractOperation(object):
"""Base operation class.

View file

@ -1,11 +1,11 @@
import os
import shutil
from openpype.lib import (
PreLaunchHook,
get_custom_workfile_template_by_context,
from openpype.lib import PreLaunchHook
from openpype.settings import get_project_settings
from openpype.pipeline.workfile import (
get_custom_workfile_template,
get_custom_workfile_template_by_string_context
)
from openpype.settings import get_project_settings
class CopyTemplateWorkfile(PreLaunchHook):
@ -54,41 +54,22 @@ class CopyTemplateWorkfile(PreLaunchHook):
project_name = self.data["project_name"]
asset_name = self.data["asset_name"]
task_name = self.data["task_name"]
host_name = self.application.host_name
project_settings = get_project_settings(project_name)
host_settings = project_settings[self.application.host_name]
workfile_builder_settings = host_settings.get("workfile_builder")
if not workfile_builder_settings:
# TODO remove warning when deprecated
self.log.warning((
"Seems like old version of settings is used."
" Can't access custom templates in host \"{}\"."
).format(self.application.full_label))
return
if not workfile_builder_settings["create_first_version"]:
self.log.info((
"Project \"{}\" has turned off to create first workfile for"
" application \"{}\""
).format(project_name, self.application.full_label))
return
# Backwards compatibility
template_profiles = workfile_builder_settings.get("custom_templates")
if not template_profiles:
self.log.info(
"Custom templates are not filled. Skipping template copy."
)
return
project_doc = self.data.get("project_doc")
asset_doc = self.data.get("asset_doc")
anatomy = self.data.get("anatomy")
if project_doc and asset_doc:
self.log.debug("Started filtering of custom template paths.")
template_path = get_custom_workfile_template_by_context(
template_profiles, project_doc, asset_doc, task_name, anatomy
template_path = get_custom_workfile_template(
project_doc,
asset_doc,
task_name,
host_name,
anatomy,
project_settings
)
else:
@ -96,10 +77,13 @@ class CopyTemplateWorkfile(PreLaunchHook):
"Global data collection probably did not execute."
" Using backup solution."
))
dbcon = self.data.get("dbcon")
template_path = get_custom_workfile_template_by_string_context(
template_profiles, project_name, asset_name, task_name,
dbcon, anatomy
project_name,
asset_name,
task_name,
host_name,
anatomy,
project_settings
)
if not template_path:
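Outside of the hook, the same fallback resolution can be done directly with the new string-context helper; a hedged sketch with hypothetical project, asset, task and host names:

```
from openpype.settings import get_project_settings
from openpype.pipeline.workfile import (
    get_custom_workfile_template_by_string_context,
)

project_name = "my_project"
project_settings = get_project_settings(project_name)
template_path = get_custom_workfile_template_by_string_context(
    project_name,
    "sh010",
    "compositing",
    "nuke",
    project_settings=project_settings,
)
if template_path:
    print("First workfile would be copied from:", template_path)
```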

View file

@ -3,9 +3,9 @@ import copy
from collections import OrderedDict
from pprint import pformat
import pyblish
from openpype.lib import get_workdir
import openpype.hosts.flame.api as opfapi
import openpype.pipeline as op_pipeline
from openpype.pipeline.workfile import get_workdir
class IntegrateBatchGroup(pyblish.api.InstancePlugin):
@ -324,7 +324,13 @@ class IntegrateBatchGroup(pyblish.api.InstancePlugin):
project_doc = instance.data["projectEntity"]
asset_entity = instance.data["assetEntity"]
anatomy = instance.context.data["anatomy"]
project_settings = instance.context.data["project_settings"]
return get_workdir(
project_doc, asset_entity, task_data["name"], "flame", anatomy
project_doc,
asset_entity,
task_data["name"],
"flame",
anatomy,
project_settings=project_settings
)

View file

@ -15,7 +15,7 @@ from openpype.pipeline import (
from openpype.lib import version_up
from openpype.hosts.fusion import api
from openpype.hosts.fusion.api import lib
from openpype.lib.avalon_context import get_workdir_from_session
from openpype.pipeline.context_tools import get_workdir_from_session
log = logging.getLogger("Update Slap Comp")

View file

@ -14,7 +14,7 @@ from openpype.pipeline import (
legacy_io,
)
from openpype.hosts.fusion import api
from openpype.lib.avalon_context import get_workdir_from_session
from openpype.pipeline.context_tools import get_workdir_from_session
log = logging.getLogger("Fusion Switch Shot")

View file

@ -6,9 +6,9 @@ from Qt import QtWidgets, QtGui
import maya.utils
import maya.cmds as cmds
from openpype.api import BuildWorkfile
from openpype.settings import get_project_settings
from openpype.pipeline import legacy_io
from openpype.pipeline.workfile import BuildWorkfile
from openpype.tools.utils import host_tools
from openpype.hosts.maya.api import lib, lib_rendersettings
from .lib import get_main_window, IS_HEADLESS

View file

@ -208,7 +208,8 @@ class ReferenceLoader(Loader):
file_type = {
"ma": "mayaAscii",
"mb": "mayaBinary",
"abc": "Alembic"
"abc": "Alembic",
"fbx": "FBX"
}.get(representation["name"])
assert file_type, "Unsupported representation: %s" % representation
@ -234,7 +235,7 @@ class ReferenceLoader(Loader):
path = self.prepare_root_value(path,
representation["context"]
["project"]
["code"])
["name"])
content = cmds.file(path,
loadReference=reference_node,
type=file_type,

View file

@ -199,7 +199,6 @@ class CollectMayaRender(pyblish.api.ContextPlugin):
)
# append full path
full_exp_files = []
aov_dict = {}
default_render_file = context.data.get('project_settings')\
.get('maya')\
@ -219,6 +218,7 @@ class CollectMayaRender(pyblish.api.ContextPlugin):
full_paths.append(full_path)
publish_meta_path = os.path.dirname(full_path)
aov_dict[aov_first_key] = full_paths
full_exp_files = [aov_dict]
frame_start_render = int(self.get_render_attribute(
"startFrame", layer=layer_name))

View file

@ -21,7 +21,6 @@ from openpype.client import (
)
from openpype.api import (
Logger,
BuildWorkfile,
get_version_from_path,
get_current_project_settings,
)
@ -39,7 +38,11 @@ from openpype.pipeline import (
legacy_io,
Anatomy,
)
from openpype.pipeline.context_tools import get_current_project_asset
from openpype.pipeline.context_tools import (
get_current_project_asset,
get_custom_workfile_template_from_session
)
from openpype.pipeline.workfile import BuildWorkfile
from . import gizmo_menu
@ -2444,15 +2447,12 @@ def _launch_workfile_app():
def process_workfile_builder():
from openpype.lib import (
env_value_to_bool,
get_custom_workfile_template
)
# to avoid looping of the callback, remove it!
nuke.removeOnCreate(process_workfile_builder, nodeClass="Root")
# get state from settings
workfile_builder = get_current_project_settings()["nuke"].get(
project_settings = get_current_project_settings()
workfile_builder = project_settings["nuke"].get(
"workfile_builder", {})
# get all important settings
@ -2462,7 +2462,6 @@ def process_workfile_builder():
# get settings
createfv_on = workfile_builder.get("create_first_version") or None
custom_templates = workfile_builder.get("custom_templates") or None
builder_on = workfile_builder.get("builder_on_start") or None
last_workfile_path = os.environ.get("AVALON_LAST_WORKFILE")
@ -2470,8 +2469,8 @@ def process_workfile_builder():
# generate first version in file not existing and feature is enabled
if createfv_on and not os.path.exists(last_workfile_path):
# get custom template path if any
custom_template_path = get_custom_workfile_template(
custom_templates
custom_template_path = get_custom_workfile_template_from_session(
project_settings=project_settings
)
# if custom template is defined

View file

@ -9,7 +9,6 @@ import pyblish.api
import openpype
from openpype.api import (
Logger,
BuildWorkfile,
get_current_project_settings
)
from openpype.lib import register_event_callback
@ -22,6 +21,7 @@ from openpype.pipeline import (
deregister_inventory_action_path,
AVALON_CONTAINER_ID,
)
from openpype.pipeline.workfile import BuildWorkfile
from openpype.tools.utils import host_tools
from .command import viewer_update_and_undo_stop

View file

@ -1,15 +1,15 @@
import os
from openpype.lib import (
StringTemplate,
get_workfile_template_key_from_context,
get_last_workfile_with_version,
)
from openpype.lib import StringTemplate
from openpype.pipeline import (
registered_host,
legacy_io,
Anatomy,
)
from openpype.pipeline.workfile import (
get_workfile_template_key_from_context,
get_last_workfile_with_version,
)
from openpype.pipeline.template_data import get_template_data_with_names
from openpype.hosts.tvpaint.api import lib, pipeline, plugin
@ -57,8 +57,7 @@ class LoadWorkfile(plugin.Loader):
asset_name,
task_name,
host_name,
project_name=project_name,
dbcon=legacy_io
project_name=project_name
)
anatomy = Anatomy(project_name)

View file

@ -27,11 +27,6 @@ from openpype.settings.constants import (
from . import PypeLogger
from .profiles_filtering import filter_profiles
from .local_settings import get_openpype_username
from .avalon_context import (
get_workdir_with_workdir_data,
get_workfile_template_key,
get_last_workfile
)
from .python_module_tools import (
modules_from_path,
@ -1635,7 +1630,14 @@ def prepare_context_environments(data, env_group=None):
data["task_type"] = task_type
try:
workdir = get_workdir_with_workdir_data(workdir_data, anatomy)
from openpype.pipeline.workfile import get_workdir_with_workdir_data
workdir = get_workdir_with_workdir_data(
workdir_data,
anatomy.project_name,
anatomy,
project_settings=project_settings
)
except Exception as exc:
raise ApplicationLaunchFailed(
@ -1725,11 +1727,19 @@ def _prepare_last_workfile(data, workdir):
if not last_workfile_path:
extensions = HOST_WORKFILE_EXTENSIONS.get(app.host_name)
if extensions:
from openpype.pipeline.workfile import (
get_workfile_template_key,
get_last_workfile
)
anatomy = data["anatomy"]
project_settings = data["project_settings"]
task_type = workdir_data["task"]["type"]
template_key = get_workfile_template_key(
task_type, app.host_name, project_settings=project_settings
task_type,
app.host_name,
project_name,
project_settings=project_settings
)
# Find last workfile
file_template = str(anatomy.templates[template_key]["file"])

File diff suppressed because it is too large

View file

@ -12,12 +12,10 @@ from openpype.client import (
get_assets,
)
from openpype.settings import get_project_settings, get_system_settings
from openpype.lib import (
get_workfile_template_key,
StringTemplate,
)
from openpype.lib import StringTemplate
from openpype.pipeline import Anatomy
from openpype.pipeline.template_data import get_template_data
from openpype.pipeline.workfile import get_workfile_template_key
from openpype_modules.ftrack.lib import BaseAction, statics_icon
from openpype_modules.ftrack.lib.avalon_sync import create_chunks
@ -299,7 +297,10 @@ class FillWorkfileAttributeAction(BaseAction):
task_type = workfile_data["task"]["type"]
template_key = get_workfile_template_key(
task_type, host_name, project_settings=project_settings
task_type,
host_name,
project_name,
project_settings=project_settings
)
if template_key in templates_by_key:
template = templates_by_key[template_key]

View file

@ -219,18 +219,23 @@ def update_op_assets(
# Add parents for hierarchy
item_data["parents"] = []
while parent_zou_id is not None:
parent_doc = asset_doc_ids[parent_zou_id]
ancestor_id = parent_zou_id
while ancestor_id is not None:
parent_doc = asset_doc_ids[ancestor_id]
item_data["parents"].insert(0, parent_doc["name"])
# Get parent entity
parent_entity = parent_doc["data"]["zou"]
parent_zou_id = parent_entity.get("parent_id")
ancestor_id = parent_entity.get("parent_id")
if item_type in ["Shot", "Sequence"]:
# Build OpenPype compatible name
if item_type in ["Shot", "Sequence"] and parent_zou_id is not None:
# Name with parents hierarchy "({episode}_){sequence}_{shot}"
# to avoid duplicate name issue
item_name = f"{item_data['parents'][-1]}_{item['name']}"
# Update doc name
asset_doc_ids[item["id"]]["name"] = item_name
else:
item_name = item["name"]

View file

@ -0,0 +1,39 @@
"""
Requires:
context -> system_settings
context -> openPypeModules
"""
import pyblish.api
from openpype.pipeline import legacy_io
class StartTimer(pyblish.api.ContextPlugin):
label = "Start Timer"
order = pyblish.api.IntegratorOrder + 1
hosts = ["*"]
def process(self, context):
timers_manager = context.data["openPypeModules"]["timers_manager"]
if not timers_manager.enabled:
self.log.debug("TimersManager is disabled")
return
modules_settings = context.data["system_settings"]["modules"]
if not modules_settings["timers_manager"]["disregard_publishing"]:
self.log.debug("Publish is not affecting running timers.")
return
project_name = legacy_io.active_project()
asset_name = legacy_io.Session.get("AVALON_ASSET")
task_name = legacy_io.Session.get("AVALON_TASK")
if not project_name or not asset_name or not task_name:
self.log.info((
"Current context does not contain all"
" required information to start a timer."
))
return
timers_manager.start_timer_with_webserver(
project_name, asset_name, task_name, self.log
)

View file

@ -0,0 +1,27 @@
"""
Requires:
context -> system_settings
context -> openPypeModules
"""
import pyblish.api
class StopTimer(pyblish.api.ContextPlugin):
label = "Stop Timer"
order = pyblish.api.ExtractorOrder - 0.49
hosts = ["*"]
def process(self, context):
timers_manager = context.data["openPypeModules"]["timers_manager"]
if not timers_manager.enabled:
self.log.debug("TimersManager is disabled")
return
modules_settings = context.data["system_settings"]["modules"]
if not modules_settings["timers_manager"]["disregard_publishing"]:
self.log.debug("Publish is not affecting running timers.")
return
timers_manager.stop_timer_with_webserver(self.log)

View file

@ -6,12 +6,15 @@ from openpype.client import get_asset_by_name
from openpype.modules import OpenPypeModule
from openpype_interfaces import (
ITrayService,
ILaunchHookPaths
ILaunchHookPaths,
IPluginPaths
)
from openpype.lib.events import register_event_callback
from .exceptions import InvalidContextError
TIMER_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
class ExampleTimersManagerConnector:
"""Timers manager can handle timers of multiple modules/addons.
@ -33,6 +36,7 @@ class ExampleTimersManagerConnector:
}
```
"""
# Not needed at all
def __init__(self, module):
# Store timer manager module to be able call it's methods when needed
@ -72,7 +76,12 @@ class ExampleTimersManagerConnector:
self._timers_manager_module.timer_stopped(self._module.id)
class TimersManager(OpenPypeModule, ITrayService, ILaunchHookPaths):
class TimersManager(
OpenPypeModule,
ITrayService,
ILaunchHookPaths,
IPluginPaths
):
""" Handles about Timers.
Should be able to start/stop all timers at once.
@ -177,11 +186,19 @@ class TimersManager(OpenPypeModule, ITrayService, ILaunchHookPaths):
def get_launch_hook_paths(self):
"""Implementation of `ILaunchHookPaths`."""
return os.path.join(
os.path.dirname(os.path.abspath(__file__)),
TIMER_MODULE_DIR,
"launch_hooks"
)
def get_plugin_paths(self):
"""Implementation of `IPluginPaths`."""
return {
"publish": [os.path.join(TIMER_MODULE_DIR, "plugins", "publish")]
}
@staticmethod
def get_timer_data_for_context(
project_name, asset_name, task_name, logger=None
@ -388,6 +405,7 @@ class TimersManager(OpenPypeModule, ITrayService, ILaunchHookPaths):
logger (logging.Logger): Logger object. Using 'print' if not
passed.
"""
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
if not webserver_url:
msg = "Couldn't find webserver url"
@ -415,6 +433,36 @@ class TimersManager(OpenPypeModule, ITrayService, ILaunchHookPaths):
return requests.post(rest_api_url, json=data)
@staticmethod
def stop_timer_with_webserver(logger=None):
"""Prepared method for calling stop timers on REST api.
Args:
logger (logging.Logger): Logger used for logging messages.
"""
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
if not webserver_url:
msg = "Couldn't find webserver url"
if logger is not None:
logger.warning(msg)
else:
print(msg)
return
rest_api_url = "{}/timers_manager/stop_timer".format(webserver_url)
try:
import requests
except Exception:
msg = "Couldn't start timer ('requests' is not available)"
if logger is not None:
logger.warning(msg)
else:
print(msg)
return
return requests.post(rest_api_url)
def on_host_install(self, host, host_name, project_name):
self.log.debug("Installing task changed callback")
register_event_callback("taskChanged", self._on_host_task_change)
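For reference, a hedged sketch of calling the new `stop_timer_with_webserver` helper directly, e.g. from a standalone script. The import path is an assumption; `OPENPYPE_WEBSERVER_URL` must be set by a running OpenPype webserver, otherwise the helper only logs a warning and returns:

```
import logging

# Import path is an assumption; the class is the TimersManager module shown above.
from openpype.modules.timers_manager.timers_manager import TimersManager

log = logging.getLogger("stop_timer_example")
response = TimersManager.stop_timer_with_webserver(logger=log)
if response is not None:
    print("Stop timer response:", response.status_code)
```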

View file

@ -22,6 +22,10 @@ from openpype.settings import get_project_settings
from .publish.lib import filter_pyblish_plugins
from .anatomy import Anatomy
from .template_data import get_template_data_with_names
from .workfile import (
get_workfile_template_key,
get_custom_workfile_template_by_string_context,
)
from . import (
legacy_io,
register_loader_plugin_path,
@ -377,3 +381,67 @@ def get_template_data_from_session(session=None, system_settings=None):
return get_template_data_with_names(
project_name, asset_name, task_name, host_name, system_settings
)
def get_workdir_from_session(session=None, template_key=None):
"""Template data for template fill from session keys.
Args:
session (Union[Dict[str, str], None]): The Session to use. If not
provided use the currently active global Session.
template_key (str): Prepared template key from which workdir is
calculated.
Returns:
str: Workdir path.
"""
if session is None:
session = legacy_io.Session
project_name = session["AVALON_PROJECT"]
host_name = session["AVALON_APP"]
anatomy = Anatomy(project_name)
template_data = get_template_data_from_session(session)
anatomy_filled = anatomy.format(template_data)
if not template_key:
task_type = template_data["task"]["type"]
template_key = get_workfile_template_key(
task_type,
host_name,
project_name=project_name
)
path = anatomy_filled[template_key]["folder"]
if path:
path = os.path.normpath(path)
return path
def get_custom_workfile_template_from_session(
session=None, project_settings=None
):
"""Filter and fill workfile template profiles by current context.
Current context is defined by `legacy_io.Session`. That's why this
function should be used only inside host where context is set and stable.
Args:
session (Union[None, Dict[str, str]]): Session from which data are
taken.
project_settings(Dict[str, Any]): Preloaded project settings.
Returns:
str: Path to template or None if none of profiles match current
context. (Existence of formatted path is not validated.)
"""
if session is None:
session = legacy_io.Session
return get_custom_workfile_template_by_string_context(
session["AVALON_PROJECT"],
session["AVALON_ASSET"],
session["AVALON_TASK"],
session["AVALON_APP"],
project_settings=project_settings
)
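Both new helpers read the active `legacy_io.Session`, so they are only meaningful inside a host where the session keys (`AVALON_PROJECT`, `AVALON_ASSET`, `AVALON_TASK`, `AVALON_APP`) are set; a minimal sketch under that assumption:

```
from openpype.pipeline.context_tools import (
    get_workdir_from_session,
    get_custom_workfile_template_from_session,
)

# Assumes a host has installed and set legacy_io.Session.
workdir = get_workdir_from_session()
print("Current workdir:", workdir)

template_path = get_custom_workfile_template_from_session()
if template_path:
    print("Custom first-version template:", template_path)
```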

View file

@ -0,0 +1,30 @@
from .path_resolving import (
get_workfile_template_key_from_context,
get_workfile_template_key,
get_workdir_with_workdir_data,
get_workdir,
get_last_workfile_with_version,
get_last_workfile,
get_custom_workfile_template,
get_custom_workfile_template_by_string_context,
)
from .build_workfile import BuildWorkfile
__all__ = (
"get_workfile_template_key_from_context",
"get_workfile_template_key",
"get_workdir_with_workdir_data",
"get_workdir",
"get_last_workfile_with_version",
"get_last_workfile",
"get_custom_workfile_template",
"get_custom_workfile_template_by_string_context",
"BuildWorkfile",
)

View file

@ -0,0 +1,693 @@
import os
import re
import collections
import json
from openpype.client import (
get_asset_by_name,
get_subsets,
get_last_versions,
get_representations,
)
from openpype.settings import get_project_settings
from openpype.lib import (
get_linked_assets,
filter_profiles,
Logger,
)
from openpype.pipeline import legacy_io
from openpype.pipeline.load import (
discover_loader_plugins,
IncompatibleLoaderError,
load_container,
)
class BuildWorkfile:
"""Wrapper for build workfile process.
Load representations for current context by build presets. Build presets
are host related, since each host has its loaders.
"""
_log = None
@property
def log(self):
if self._log is None:
self._log = Logger.get_logger(self.__class__.__name__)
return self._log
@staticmethod
def map_subsets_by_family(subsets):
subsets_by_family = collections.defaultdict(list)
for subset in subsets:
family = subset["data"].get("family")
if not family:
families = subset["data"].get("families")
if not families:
continue
family = families[0]
subsets_by_family[family].append(subset)
return subsets_by_family
def process(self):
"""Main method of this wrapper.
Building of workfile is triggered and is possible to implement
post processing of loaded containers if necessary.
Returns:
List[Dict[str, Any]]: Loaded containers during build.
"""
return self.build_workfile()
def build_workfile(self):
"""Prepares and load containers into workfile.
Loads latest versions of current and linked assets to workfile by logic
stored in Workfile profiles from presets. Profiles are set by host,
filtered by current task name and used by families.
Each family can specify representation names and loaders for
representations and first available and successful loaded
representation is returned as container.
At the end you'll get list of loaded containers per each asset.
loaded_containers [{
"asset_entity": <AssetEntity1>,
"containers": [<Container1>, <Container2>, ...]
}, {
"asset_entity": <AssetEntity2>,
"containers": [<Container3>, ...]
}, {
...
}]
Returns:
List[Dict[str, Any]]: Loaded containers during build.
"""
loaded_containers = []
# Get current asset name and entity
project_name = legacy_io.active_project()
current_asset_name = legacy_io.Session["AVALON_ASSET"]
current_asset_entity = get_asset_by_name(
project_name, current_asset_name
)
# Skip if asset was not found
if not current_asset_entity:
print("Asset entity with name `{}` was not found".format(
current_asset_name
))
return loaded_containers
# Prepare available loaders
loaders_by_name = {}
for loader in discover_loader_plugins():
loader_name = loader.__name__
if loader_name in loaders_by_name:
raise KeyError(
"Duplicated loader name {0}!".format(loader_name)
)
loaders_by_name[loader_name] = loader
# Skip if there are no loaders
if not loaders_by_name:
self.log.warning("There are no registered loaders.")
return loaded_containers
# Get current task name
current_task_name = legacy_io.Session["AVALON_TASK"]
# Load workfile presets for task
self.build_presets = self.get_build_presets(
current_task_name, current_asset_entity
)
# Skip if there are no presets for the task
if not self.build_presets:
self.log.warning(
"Current task `{}` does not have any loading preset.".format(
current_task_name
)
)
return loaded_containers
# Get presets for loading current asset
current_context_profiles = self.build_presets.get("current_context")
# Get presets for loading linked assets
link_context_profiles = self.build_presets.get("linked_assets")
# Skip if both are missing
if not current_context_profiles and not link_context_profiles:
self.log.warning(
"Current task `{}` has empty loading preset.".format(
current_task_name
)
)
return loaded_containers
elif not current_context_profiles:
self.log.warning((
"Current task `{}` doesn't have any loading"
" preset for it's context."
).format(current_task_name))
elif not link_context_profiles:
self.log.warning((
"Current task `{}` doesn't have any"
"loading preset for it's linked assets."
).format(current_task_name))
# Prepare assets to process by workfile presets
assets = []
current_asset_id = None
if current_context_profiles:
# Add current asset entity if preset has current context set
assets.append(current_asset_entity)
current_asset_id = current_asset_entity["_id"]
if link_context_profiles:
# Find and append linked assets if preset has set linked mapping
link_assets = get_linked_assets(current_asset_entity)
if link_assets:
assets.extend(link_assets)
# Skip if there are no assets. This can happen if only linked mapping
# is set and there are no links for this asset.
if not assets:
self.log.warning(
"Asset does not have linked assets. Nothing to process."
)
return loaded_containers
# Prepare entities from database for assets
prepared_entities = self._collect_last_version_repres(assets)
# Load containers by prepared entities and presets
# - Current asset containers
if current_asset_id and current_asset_id in prepared_entities:
current_context_data = prepared_entities.pop(current_asset_id)
loaded_data = self.load_containers_by_asset_data(
current_context_data, current_context_profiles, loaders_by_name
)
if loaded_data:
loaded_containers.append(loaded_data)
# - Linked assets container
for linked_asset_data in prepared_entities.values():
loaded_data = self.load_containers_by_asset_data(
linked_asset_data, link_context_profiles, loaders_by_name
)
if loaded_data:
loaded_containers.append(loaded_data)
# Return list of loaded containers
return loaded_containers
def get_build_presets(self, task_name, asset_doc):
""" Returns presets to build workfile for task name.
Presets are loaded for current project set in
io.Session["AVALON_PROJECT"], filtered by registered host
and entered task name.
Args:
task_name (str): Task name used for filtering build presets.
Returns:
Dict[str, Any]: preset per entered task name
"""
host_name = os.environ["AVALON_APP"]
project_settings = get_project_settings(
legacy_io.Session["AVALON_PROJECT"]
)
host_settings = project_settings.get(host_name) or {}
# Get presets for host
wb_settings = host_settings.get("workfile_builder")
if not wb_settings:
# backward compatibility
wb_settings = host_settings.get("workfile_build") or {}
builder_profiles = wb_settings.get("profiles")
if not builder_profiles:
return None
task_type = (
asset_doc
.get("data", {})
.get("tasks", {})
.get(task_name, {})
.get("type")
)
filter_data = {
"task_types": task_type,
"tasks": task_name
}
return filter_profiles(builder_profiles, filter_data)
def _filter_build_profiles(self, build_profiles, loaders_by_name):
""" Filter build profiles by loaders and prepare process data.
Valid profile must have "loaders", "families" and "repre_names" keys
with valid values.
- "loaders" expects list of strings representing possible loaders.
- "families" expects list of strings for filtering
by main subset family.
- "repre_names" expects list of strings for filtering by
representation name.
Lowered "families" and "repre_names" are prepared for each profile with
all required keys.
Args:
build_profiles (Dict[str, Any]): Profiles for building workfile.
loaders_by_name (Dict[str, LoaderPlugin]): Available loaders
per name.
Returns:
List[Dict[str, Any]]: Filtered and prepared profiles.
"""
valid_profiles = []
for profile in build_profiles:
# Check loaders
profile_loaders = profile.get("loaders")
if not profile_loaders:
self.log.warning((
"Build profile has missing loaders configuration: {0}"
).format(json.dumps(profile, indent=4)))
continue
# Check if any loader is available
loaders_match = False
for loader_name in profile_loaders:
if loader_name in loaders_by_name:
loaders_match = True
break
if not loaders_match:
self.log.warning((
"All loaders from Build profile are not available: {0}"
).format(json.dumps(profile, indent=4)))
continue
# Check families
profile_families = profile.get("families")
if not profile_families:
self.log.warning((
"Build profile is missing families configuration: {0}"
).format(json.dumps(profile, indent=4)))
continue
# Check representation names
profile_repre_names = profile.get("repre_names")
if not profile_repre_names:
self.log.warning((
"Build profile is missing"
" representation names filtering: {0}"
).format(json.dumps(profile, indent=4)))
continue
# Prepare lowered families and representation names
profile["families_lowered"] = [
fam.lower() for fam in profile_families
]
profile["repre_names_lowered"] = [
name.lower() for name in profile_repre_names
]
valid_profiles.append(profile)
return valid_profiles
def _prepare_profile_for_subsets(self, subsets, profiles):
"""Select profile for each subset by it's data.
Profiles are filtered for each subset individually.
Profile is filtered by subset's family, optionally by name regex and
representation names set in profile.
It is possible to not find matching profile for subset, in that case
subset is skipped and it is possible that none of subsets have
matching profile.
Args:
subsets (List[Dict[str, Any]]): Subset documents.
profiles (List[Dict[str, Any]]): Build profiles.
Returns:
Dict[str, Any]: Profile by subset's id.
"""
# Prepare subsets
subsets_by_family = self.map_subsets_by_family(subsets)
profiles_per_subset_id = {}
for family, subsets in subsets_by_family.items():
family_low = family.lower()
for profile in profiles:
# Skip profile if does not contain family
if family_low not in profile["families_lowered"]:
continue
# Precompile name filters as regexes
profile_regexes = profile.get("subset_name_filters")
if profile_regexes:
_profile_regexes = []
for regex in profile_regexes:
_profile_regexes.append(re.compile(regex))
profile_regexes = _profile_regexes
# TODO prepare regex compilation
for subset in subsets:
# Verify regex filtering (optional)
if profile_regexes:
valid = False
for pattern in profile_regexes:
if re.match(pattern, subset["name"]):
valid = True
break
if not valid:
continue
profiles_per_subset_id[subset["_id"]] = profile
# break profiles loop on finding the first matching profile
break
return profiles_per_subset_id
def load_containers_by_asset_data(
self, asset_entity_data, build_profiles, loaders_by_name
):
"""Load containers for entered asset entity by Build profiles.
Args:
asset_entity_data (Dict[str, Any]): Prepared data with subsets,
last versions and representations for specific asset.
build_profiles (Dict[str, Any]): Build profiles.
loaders_by_name (Dict[str, LoaderPlugin]): Available loaders
per name.
Returns:
Dict[str, Any]: Output contains asset document
and loaded containers.
"""
# Make sure all data are not empty
if not asset_entity_data or not build_profiles or not loaders_by_name:
return
asset_entity = asset_entity_data["asset_entity"]
valid_profiles = self._filter_build_profiles(
build_profiles, loaders_by_name
)
if not valid_profiles:
self.log.warning(
"There are not valid Workfile profiles. Skipping process."
)
return
self.log.debug("Valid Workfile profiles: {}".format(valid_profiles))
subsets_by_id = {}
version_by_subset_id = {}
repres_by_version_id = {}
for subset_id, in_data in asset_entity_data["subsets"].items():
subset_entity = in_data["subset_entity"]
subsets_by_id[subset_entity["_id"]] = subset_entity
version_data = in_data["version"]
version_entity = version_data["version_entity"]
version_by_subset_id[subset_id] = version_entity
repres_by_version_id[version_entity["_id"]] = (
version_data["repres"]
)
if not subsets_by_id:
self.log.warning("There are not subsets for asset {0}".format(
asset_entity["name"]
))
return
profiles_per_subset_id = self._prepare_profile_for_subsets(
subsets_by_id.values(), valid_profiles
)
if not profiles_per_subset_id:
self.log.warning("There are not valid subsets.")
return
valid_repres_by_subset_id = collections.defaultdict(list)
for subset_id, profile in profiles_per_subset_id.items():
profile_repre_names = profile["repre_names_lowered"]
version_entity = version_by_subset_id[subset_id]
version_id = version_entity["_id"]
repres = repres_by_version_id[version_id]
for repre in repres:
repre_name_low = repre["name"].lower()
if repre_name_low in profile_repre_names:
valid_repres_by_subset_id[subset_id].append(repre)
# DEBUG message
msg = "Valid representations for Asset: `{}`".format(
asset_entity["name"]
)
for subset_id, repres in valid_repres_by_subset_id.items():
subset = subsets_by_id[subset_id]
msg += "\n# Subset Name/ID: `{}`/{}".format(
subset["name"], subset_id
)
for repre in repres:
msg += "\n## Repre name: `{}`".format(repre["name"])
self.log.debug(msg)
containers = self._load_containers(
valid_repres_by_subset_id, subsets_by_id,
profiles_per_subset_id, loaders_by_name
)
return {
"asset_entity": asset_entity,
"containers": containers
}
def _load_containers(
self, repres_by_subset_id, subsets_by_id,
profiles_per_subset_id, loaders_by_name
):
"""Real load by collected data happens here.
Loading of representations per subset happens here. Each subset can
load one representation. Loading is tried in a specific order.
Representations are tried to load by names defined in configuration.
If a subset has a representation matching a representation name, each loader
is tried until one loads it successfully. If none of them succeeds,
the next representation name is tried.
The subset processing loop ends when any representation is loaded or
all matching representations were already tried.
Args:
repres_by_subset_id (Dict[str, Dict[str, Any]]): Available
representations mapped by their parent (subset) id.
subsets_by_id (Dict[str, Dict[str, Any]]): Subset documents
mapped by their id.
profiles_per_subset_id (Dict[str, Dict[str, Any]]): Build profiles
mapped by subset id.
loaders_by_name (Dict[str, LoaderPlugin]): Available loaders
per name.
Returns:
List[Dict[str, Any]]: Objects of loaded containers.
"""
loaded_containers = []
# Get subset id order from build presets.
build_presets = self.build_presets.get("current_context", [])
build_presets += self.build_presets.get("linked_assets", [])
subset_ids_ordered = []
for preset in build_presets:
for preset_family in preset["families"]:
for id, subset in subsets_by_id.items():
if preset_family not in subset["data"].get("families", []):
continue
subset_ids_ordered.append(id)
# Order representations from subsets.
print("repres_by_subset_id", repres_by_subset_id)
representations_ordered = []
representations = []
for id in subset_ids_ordered:
for subset_id, repres in repres_by_subset_id.items():
if repres in representations:
continue
if id == subset_id:
representations_ordered.append((subset_id, repres))
representations.append(repres)
print("representations", representations)
# Load ordered representations.
for subset_id, repres in representations_ordered:
subset_name = subsets_by_id[subset_id]["name"]
profile = profiles_per_subset_id[subset_id]
loaders_last_idx = len(profile["loaders"]) - 1
repre_names_last_idx = len(profile["repre_names_lowered"]) - 1
repre_by_low_name = {
repre["name"].lower(): repre for repre in repres
}
is_loaded = False
for repre_name_idx, profile_repre_name in enumerate(
profile["repre_names_lowered"]
):
# Break iteration if representation was already loaded
if is_loaded:
break
repre = repre_by_low_name.get(profile_repre_name)
if not repre:
continue
for loader_idx, loader_name in enumerate(profile["loaders"]):
if is_loaded:
break
loader = loaders_by_name.get(loader_name)
if not loader:
continue
try:
container = load_container(
loader,
repre["_id"],
name=subset_name
)
loaded_containers.append(container)
is_loaded = True
except Exception as exc:
if isinstance(exc, IncompatibleLoaderError):
self.log.info((
"Loader `{}` is not compatible with"
" representation `{}`"
).format(loader_name, repre["name"]))
else:
self.log.error(
"Unexpected error happened during loading",
exc_info=True
)
msg = "Loading failed."
if loader_idx < loaders_last_idx:
msg += " Trying next loader."
elif repre_name_idx < repre_names_last_idx:
msg += " Trying next representation."
else:
msg += (
" Loading of subset `{}` was not successful."
).format(subset_name)
self.log.info(msg)
return loaded_containers
def _collect_last_version_repres(self, asset_docs):
"""Collect subsets, versions and representations for asset_entities.
Args:
asset_docs (List[Dict[str, Any]]): Asset entities for which
want to find data.
Returns:
Dict[str, Any]: collected entities
Example output:
```
{
{Asset ID}: {
"asset_entity": <AssetEntity>,
"subsets": {
{Subset ID}: {
"subset_entity": <SubsetEntity>,
"version": {
"version_entity": <VersionEntity>,
"repres": [
<RepreEntity1>, <RepreEntity2>, ...
]
}
},
...
}
},
...
}
output[asset_id]["subsets"][subset_id]["version"]["repres"]
```
"""
output = {}
if not asset_docs:
return output
asset_docs_by_ids = {asset["_id"]: asset for asset in asset_docs}
project_name = legacy_io.active_project()
subsets = list(get_subsets(
project_name, asset_ids=asset_docs_by_ids.keys()
))
subset_entity_by_ids = {subset["_id"]: subset for subset in subsets}
last_version_by_subset_id = get_last_versions(
project_name, subset_entity_by_ids.keys()
)
last_version_docs_by_id = {
version["_id"]: version
for version in last_version_by_subset_id.values()
}
repre_docs = get_representations(
project_name, version_ids=last_version_docs_by_id.keys()
)
for repre_doc in repre_docs:
version_id = repre_doc["parent"]
version_doc = last_version_docs_by_id[version_id]
subset_id = version_doc["parent"]
subset_doc = subset_entity_by_ids[subset_id]
asset_id = subset_doc["parent"]
asset_doc = asset_docs_by_ids[asset_id]
if asset_id not in output:
output[asset_id] = {
"asset_entity": asset_doc,
"subsets": {}
}
if subset_id not in output[asset_id]["subsets"]:
output[asset_id]["subsets"][subset_id] = {
"subset_entity": subset_doc,
"version": {
"version_entity": version_doc,
"repres": []
}
}
output[asset_id]["subsets"][subset_id]["version"]["repres"].append(
repre_doc
)
return output
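A hedged usage sketch of the relocated builder: it reads the current context from `legacy_io.Session` and expects a host with registered loaders, so this is meant to run inside a host integration:

```
from openpype.pipeline.workfile import BuildWorkfile

# Assumes a host is installed (loaders registered) and legacy_io.Session
# carries AVALON_PROJECT / AVALON_ASSET / AVALON_TASK.
builder = BuildWorkfile()
loaded = builder.process()
for asset_containers in loaded:
    asset_name = asset_containers["asset_entity"]["name"]
    container_count = len(asset_containers["containers"])
    print(asset_name, "->", container_count, "containers loaded")
```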

View file

@ -0,0 +1,464 @@
import os
import re
import copy
import platform
from openpype.client import get_project, get_asset_by_name
from openpype.settings import get_project_settings
from openpype.lib import (
filter_profiles,
Logger,
StringTemplate,
)
from openpype.pipeline import Anatomy
from openpype.pipeline.template_data import get_template_data
def get_workfile_template_key_from_context(
asset_name, task_name, host_name, project_name, project_settings=None
):
"""Helper function to get template key for workfile template.
Do the same as `get_workfile_template_key` but the task type is resolved
from the passed asset name and task name.
Args:
asset_name(str): Name of asset document.
task_name(str): Task name for which is template key retrieved.
Must be available on asset document under `data.tasks`.
host_name(str): Name of host implementation for which the workfile
is used.
project_name(str): Project name where asset and task is.
project_settings(Dict[str, Any]): Project settings for passed
'project_name'. Not required at all but makes function faster.
"""
asset_doc = get_asset_by_name(
project_name, asset_name, fields=["data.tasks"]
)
asset_tasks = asset_doc.get("data", {}).get("tasks") or {}
task_info = asset_tasks.get(task_name) or {}
task_type = task_info.get("type")
return get_workfile_template_key(
task_type, host_name, project_name, project_settings
)
def get_workfile_template_key(
task_type, host_name, project_name, project_settings=None
):
"""Workfile template key which should be used to get workfile template.
Function is using profiles from project settings to return right template
for the passed task type and host name.
Args:
task_type(str): Name of task type.
host_name(str): Name of host implementation (e.g. "maya", "nuke", ...)
project_name(str): Name of project in which context should look for
settings.
project_settings(Dict[str, Any]): Prepared project settings for
project name. Optional to make processing faster.
"""
default = "work"
if not task_type or not host_name:
return default
if not project_settings:
project_settings = get_project_settings(project_name)
try:
profiles = (
project_settings
["global"]
["tools"]
["Workfiles"]
["workfile_template_profiles"]
)
except Exception:
profiles = []
if not profiles:
return default
profile_filter = {
"task_types": task_type,
"hosts": host_name
}
profile = filter_profiles(profiles, profile_filter)
if profile:
return profile["workfile_template"] or default
return default
def get_workdir_with_workdir_data(
workdir_data,
project_name,
anatomy=None,
template_key=None,
project_settings=None
):
"""Fill workdir path from entered data and project's anatomy.
It is possible to pass only the project's name instead of the project's anatomy,
but one of them **must** be entered. It is preferred to pass anatomy if it is
available, as initialization of a new Anatomy object may be time consuming.
Args:
workdir_data (Dict[str, Any]): Data to fill workdir template.
project_name (str): Project's name.
anatomy (Anatomy): Anatomy object for specific project. Faster
processing if is passed.
template_key (str): Key of work templates in anatomy templates. If not
passed `get_workfile_template_key_from_context` is used to get it.
project_settings(Dict[str, Any]): Prepared project settings for
project name. Optional to make processing faster. It is used only
if 'template_key' is not passed.
Returns:
TemplateResult: Workdir path.
"""
if not anatomy:
anatomy = Anatomy(project_name)
if not template_key:
template_key = get_workfile_template_key(
workdir_data["task"]["type"],
workdir_data["app"],
workdir_data["project"]["name"],
project_settings
)
anatomy_filled = anatomy.format(workdir_data)
# Output is TemplateResult object which contain useful data
output = anatomy_filled[template_key]["folder"]
if output:
return output.normalized()
return output
def get_workdir(
project_doc,
asset_doc,
task_name,
host_name,
anatomy=None,
template_key=None,
project_settings=None
):
"""Fill workdir path from entered data and project's anatomy.
Args:
project_doc (Dict[str, Any]): Mongo document of project from MongoDB.
asset_doc (Dict[str, Any]): Mongo document of asset from MongoDB.
task_name (str): Task name for which workdir data are prepared.
host_name (str): Host for which the workdir is resolved. This is required
because the workdir template may contain the `{app}` key. In `Session`
it is stored under the `AVALON_APP` key.
anatomy (Anatomy): Optional argument. Anatomy object is created using
project name from `project_doc`. It is preferred to pass this
argument as initialization of a new Anatomy object may be time
consuming.
template_key (str): Key of work templates in anatomy templates. Default
value is defined in `get_workdir_with_workdir_data`.
project_settings(Dict[str, Any]): Prepared project settings for
project name. Optional to make processing faster. It is used only
if 'template_key' is not passed.
Returns:
TemplateResult: Workdir path.
"""
if not anatomy:
anatomy = Anatomy(project_doc["name"])
workdir_data = get_template_data(
project_doc, asset_doc, task_name, host_name
)
# Output is TemplateResult object which contain useful data
return get_workdir_with_workdir_data(
workdir_data,
anatomy.project_name,
anatomy,
template_key,
project_settings
)
def get_last_workfile_with_version(
workdir, file_template, fill_data, extensions
):
"""Return last workfile version.
Using the workfile template and its fill data, find the most probable last
version of the workfile which was created for the context.
Functionality is fully based on knowing which keys are optional and what
values are expected.
The last modified file is used if more files can be considered as
last workfile.
Args:
workdir (str): Path to dir where workfiles are stored.
file_template (str): Template of file name.
fill_data (Dict[str, Any]): Data for filling template.
extensions (Iterable[str]): All allowed file extensions of workfile.
Returns:
Tuple[Union[str, None], Union[int, None]]: Last workfile with version
if there is any workfile otherwise None for both.
"""
if not os.path.exists(workdir):
return None, None
dotted_extensions = set()
for ext in extensions:
if not ext.startswith("."):
ext = ".{}".format(ext)
dotted_extensions.add(ext)
# Fast match on extension
filenames = [
filename
for filename in os.listdir(workdir)
if os.path.splitext(filename)[-1] in dotted_extensions
]
# Build template without optionals, version to digits only regex
# and comment to any definable value.
# Escape extensions dot for regex
regex_exts = [
"\\" + ext
for ext in dotted_extensions
]
ext_expression = "(?:" + "|".join(regex_exts) + ")"
# Replace `.{ext}` with `{ext}` so we are sure there is no dot at the end
file_template = re.sub(r"\.?{ext}", ext_expression, file_template)
# Replace optional keys with optional content regex
file_template = re.sub(r"<.*?>", r".*?", file_template)
# Replace `{version}` with group regex
file_template = re.sub(r"{version.*?}", r"([0-9]+)", file_template)
file_template = re.sub(r"{comment.*?}", r".+?", file_template)
file_template = StringTemplate.format_strict_template(
file_template, fill_data
)
# Match with ignore case on Windows due to the Windows
# OS not being case-sensitive. This avoids later running
# into the error that the file did exist if it existed
# with a different upper/lower-case.
kwargs = {}
if platform.system().lower() == "windows":
kwargs["flags"] = re.IGNORECASE
# Get highest version among existing matching files
version = None
output_filenames = []
for filename in sorted(filenames):
match = re.match(file_template, filename, **kwargs)
if not match:
continue
file_version = int(match.group(1))
if version is None or file_version > version:
output_filenames[:] = []
version = file_version
if file_version == version:
output_filenames.append(filename)
output_filename = None
if output_filenames:
if len(output_filenames) == 1:
output_filename = output_filenames[0]
else:
last_time = None
for _output_filename in output_filenames:
full_path = os.path.join(workdir, _output_filename)
mod_time = os.path.getmtime(full_path)
if last_time is None or last_time < mod_time:
output_filename = _output_filename
last_time = mod_time
return output_filename, version
def get_last_workfile(
workdir, file_template, fill_data, extensions, full_path=False
):
"""Return last workfile filename.
Returns file with version 1 if there is no workfile yet.
Args:
workdir(str): Path to dir where workfiles are stored.
file_template(str): Template of file name.
fill_data(Dict[str, Any]): Data for filling template.
extensions(Iterable[str]): All allowed file extensions of workfile.
full_path(bool): Full path to file is returned if set to True.
Returns:
str: Last or first workfile as filename or full path to filename.
"""
filename, version = get_last_workfile_with_version(
workdir, file_template, fill_data, extensions
)
if filename is None:
data = copy.deepcopy(fill_data)
data["version"] = 1
data.pop("comment", None)
if not data.get("ext"):
data["ext"] = extensions[0]
data["ext"] = data["ext"].replace('.', '')
filename = StringTemplate.format_strict_template(file_template, data)
if full_path:
return os.path.normpath(os.path.join(workdir, filename))
return filename
def get_custom_workfile_template(
project_doc,
asset_doc,
task_name,
host_name,
anatomy=None,
project_settings=None
):
"""Filter and fill workfile template profiles by passed context.
Custom workfile template can be used as first version of workfiles.
Template is a file on a disk which is set in settings. Expected settings
structure to have this feature enabled is:
project settings
|- <host name>
|- workfile_builder
|- create_first_version - a bool which must be set to 'True'
|- custom_templates - profiles based on task name/type which
points to a file which is copied as
first workfile
It is expected that the passed arguments are already queried documents of
the project and asset that are parents of the processed task.
Args:
project_doc (Dict[str, Any]): Project document from MongoDB.
asset_doc (Dict[str, Any]): Asset document from MongoDB.
task_name (str): Name of task for which templates are filtered.
host_name (str): Name of host.
anatomy (Anatomy): Optionally passed anatomy object for passed project
name.
project_settings(Dict[str, Any]): Preloaded project settings.
Returns:
str: Path to template or None if none of profiles match current
context. Existence of formatted path is not validated.
None: If no profile is matching context.
"""
log = Logger.get_logger("CustomWorkfileResolve")
project_name = project_doc["name"]
if project_settings is None:
project_settings = get_project_settings(project_name)
host_settings = project_settings.get(host_name)
if not host_settings:
log.info("Host \"{}\" doesn't have settings".format(host_name))
return None
workfile_builder_settings = host_settings.get("workfile_builder")
if not workfile_builder_settings:
log.info((
"Seems like old version of settings is used."
" Can't access custom templates in host \"{}\"."
).format(host_name))
return
if not workfile_builder_settings["create_first_version"]:
log.info((
"Project \"{}\" has turned off to create first workfile for"
" host \"{}\""
).format(project_name, host_name))
return
# Backwards compatibility
template_profiles = workfile_builder_settings.get("custom_templates")
if not template_profiles:
log.info(
"Custom templates are not filled. Skipping template copy."
)
return
if anatomy is None:
anatomy = Anatomy(project_name)
# get project, asset, task anatomy context data
anatomy_context_data = get_template_data(
project_doc, asset_doc, task_name, host_name
)
# add root dict
anatomy_context_data["root"] = anatomy.roots
# get task type for the task in context
current_task_type = anatomy_context_data["task"]["type"]
# get path from matching profile
matching_item = filter_profiles(
template_profiles,
{"task_types": current_task_type}
)
# when path is available try to format it in case
# there are some anatomy template strings
if matching_item:
template = matching_item["path"][platform.system().lower()]
return StringTemplate.format_strict_template(
template, anatomy_context_data
).normalized()
return None
def get_custom_workfile_template_by_string_context(
project_name,
asset_name,
task_name,
host_name,
anatomy=None,
project_settings=None
):
"""Filter and fill workfile template profiles by passed context.
Passed context are string representations of project, asset and task.
Function will query documents of project and asset to be able use
`get_custom_workfile_template` for rest of logic.
Args:
project_name(str): Project name.
asset_name(str): Asset name.
task_name(str): Task name.
host_name (str): Name of host.
anatomy(Anatomy): Optionally prepared anatomy object for passed
project.
project_settings(Dict[str, Any]): Preloaded project settings.
Returns:
str: Path to template or None if none of profiles match current
context. (Existence of formatted path is not validated.)
None: If no profile is matching context.
"""
project_doc = get_project(project_name)
asset_doc = get_asset_by_name(project_name, asset_name)
return get_custom_workfile_template(
project_doc, asset_doc, task_name, host_name, anatomy, project_settings
)
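A hedged sketch combining the new path-resolving helpers: resolve a workdir for a context and find the last workfile in it. Project, asset, task and host names and the extension list are hypothetical:

```
from openpype.client import get_project, get_asset_by_name
from openpype.pipeline import Anatomy
from openpype.pipeline.template_data import get_template_data
from openpype.pipeline.workfile import (
    get_workdir,
    get_workfile_template_key_from_context,
    get_last_workfile_with_version,
)

project_name = "my_project"
asset_name = "sh010"
task_name = "compositing"
host_name = "nuke"

project_doc = get_project(project_name)
asset_doc = get_asset_by_name(project_name, asset_name)
anatomy = Anatomy(project_name)

# Workdir for the context (TemplateResult behaves like a string)
workdir = get_workdir(project_doc, asset_doc, task_name, host_name, anatomy)

# Last existing workfile in that workdir, if any
template_key = get_workfile_template_key_from_context(
    asset_name, task_name, host_name, project_name
)
file_template = str(anatomy.templates[template_key]["file"])
fill_data = get_template_data(project_doc, asset_doc, task_name, host_name)
filename, version = get_last_workfile_with_version(
    str(workdir), file_template, fill_data, [".nk"]
)
print(workdir, filename, version)
```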

View file

@ -7,7 +7,7 @@ import pyblish.api
class CollectModules(pyblish.api.ContextPlugin):
"""Collect OpenPype modules."""
order = pyblish.api.CollectorOrder - 0.45
order = pyblish.api.CollectorOrder - 0.5
label = "OpenPype Modules"
def process(self, context):

View file

@ -3,9 +3,9 @@
<error id="main">
<title>Not up-to-date assets</title>
<description>
## Obsolete containers found
## Outdated containers found
Scene contains one or more obsolete loaded containers, eg. items loaded into scene by Loader.
Scene contains one or more outdated loaded containers, eg. versions loaded into scene by Loader are not latest.
### How to repair?
@ -17,8 +17,7 @@ Use 'Scene Inventory' and update all highlighted old container to latest OR
<detail>
### __Detailed Info__ (optional)
This validator protects you from rendering obsolete content, someone modified some referenced asset in this scene, eg.
by skipping this you would ignore changes to that asset.
This validates whether you're working with the latest versions of published content loaded into your scene. This protects you from using outdated versions of an asset.
</detail>
</error>
</root>

View file

@ -313,13 +313,9 @@ class IntegrateHeroVersion(pyblish.api.InstancePlugin):
}
repre_context = template_filled.used_values
for key in self.db_representation_context_keys:
if (
key in repre_context or
key not in anatomy_data
):
continue
repre_context[key] = anatomy_data[key]
value = anatomy_data.get(key)
if value is not None:
repre_context[key] = value
# Prepare new repre
repre = copy.deepcopy(repre_info["representation"])

View file

@ -1,14 +0,0 @@
import pyblish.api
from openpype.lib import change_timer_to_current_context
class StartTimer(pyblish.api.ContextPlugin):
label = "Start Timer"
order = pyblish.api.IntegratorOrder + 1
hosts = ["*"]
def process(self, context):
modules_settings = context.data["system_settings"]["modules"]
if modules_settings["timers_manager"]["disregard_publishing"]:
change_timer_to_current_context()

View file

@ -1,17 +0,0 @@
import os
import requests
import pyblish.api
class StopTimer(pyblish.api.ContextPlugin):
label = "Stop Timer"
order = pyblish.api.ExtractorOrder - 0.49
hosts = ["*"]
def process(self, context):
modules_settings = context.data["system_settings"]["modules"]
if modules_settings["timers_manager"]["disregard_publishing"]:
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
rest_api_url = "{}/timers_manager/stop_timer".format(webserver_url)
requests.post(rest_api_url)

View file

@ -17,7 +17,7 @@ from openpype.pipeline import (
legacy_io,
)
from openpype.lib.avalon_context import get_workdir_from_session
from openpype.pipeline.context_tools import get_workdir_from_session
log = logging.getLogger("Update Slap Comp")

View file

@ -551,16 +551,16 @@ class SceneInventoryView(QtWidgets.QTreeView):
"toggle": selection_model.Toggle,
}[options.get("mode", "select")]
for item in iter_model_rows(model, 0):
item = item.data(InventoryModel.ItemRole)
for index in iter_model_rows(model, 0):
item = index.data(InventoryModel.ItemRole)
if item.get("isGroupNode"):
continue
name = item.get("objectName")
if name in object_names:
self.scrollTo(item) # Ensure item is visible
self.scrollTo(index) # Ensure item is visible
flags = select_mode | selection_model.Rows
selection_model.select(item, flags)
selection_model.select(index, flags)
object_names.remove(name)

View file

@ -12,7 +12,6 @@ from openpype.tools.utils import PlaceholderLineEdit
from openpype.tools.utils.delegates import PrettyTimeDelegate
from openpype.lib import (
emit_event,
get_workfile_template_key,
create_workdir_extra_folders,
)
from openpype.lib.avalon_context import (
@ -24,6 +23,8 @@ from openpype.pipeline import (
legacy_io,
Anatomy,
)
from openpype.pipeline.workfile import get_workfile_template_key
from .model import (
WorkAreaFilesModel,
PublishFilesModel,

View file

@ -5,11 +5,11 @@ import logging
from Qt import QtWidgets, QtCore
from openpype.lib import get_last_workfile_with_version
from openpype.pipeline import (
registered_host,
legacy_io,
)
from openpype.pipeline.workfile import get_last_workfile_with_version
from openpype.pipeline.template_data import get_template_data_with_names
from openpype.tools.utils import PlaceholderLineEdit

View file

@ -1,18 +1,20 @@
import os
import datetime
import copy
from Qt import QtCore, QtWidgets, QtGui
from openpype.client import (
get_asset_by_id,
get_asset_by_name,
get_workfile_info,
)
from openpype.client.operations import (
OperationsSession,
new_workfile_info_doc,
prepare_workfile_info_update_data,
)
from openpype import style
from openpype import resources
from openpype.lib import (
create_workfile_doc,
save_workfile_data_to_doc,
)
from openpype.pipeline import Anatomy
from openpype.pipeline import legacy_io
from openpype.tools.utils.assets_widget import SingleSelectAssetsWidget
from openpype.tools.utils.tasks_widget import TasksWidget
@ -324,10 +326,23 @@ class Window(QtWidgets.QWidget):
workfile_doc, data = self.side_panel.get_workfile_data()
if not workfile_doc:
filepath = self.files_widget._get_selected_filepath()
self._create_workfile_doc(filepath, force=True)
workfile_doc = self._get_current_workfile_doc()
workfile_doc = self._create_workfile_doc(filepath)
save_workfile_data_to_doc(workfile_doc, data, legacy_io)
new_workfile_doc = copy.deepcopy(workfile_doc)
new_workfile_doc["data"] = data
update_data = prepare_workfile_info_update_data(
workfile_doc, new_workfile_doc
)
if not update_data:
return
project_name = legacy_io.active_project()
session = OperationsSession()
session.update_entity(
project_name, "workfile", workfile_doc["_id"], update_data
)
session.commit()
def _get_current_workfile_doc(self, filepath=None):
if filepath is None:
@ -343,20 +358,32 @@ class Window(QtWidgets.QWidget):
project_name, asset_id, task_name, filename
)
def _create_workfile_doc(self, filepath, force=False):
workfile_doc = None
if not force:
workfile_doc = self._get_current_workfile_doc(filepath)
def _create_workfile_doc(self, filepath):
workfile_doc = self._get_current_workfile_doc(filepath)
if workfile_doc:
return workfile_doc
if not workfile_doc:
workdir, filename = os.path.split(filepath)
asset_id = self.assets_widget.get_selected_asset_id()
project_name = legacy_io.active_project()
asset_doc = get_asset_by_id(project_name, asset_id)
task_name = self.tasks_widget.get_selected_task_name()
create_workfile_doc(
asset_doc, task_name, filename, workdir, legacy_io
)
workdir, filename = os.path.split(filepath)
project_name = legacy_io.active_project()
asset_id = self.assets_widget.get_selected_asset_id()
task_name = self.tasks_widget.get_selected_task_name()
anatomy = Anatomy(project_name)
success, rootless_dir = anatomy.find_root_template_from_path(workdir)
filepath = "/".join([
os.path.normpath(rootless_dir).replace("\\", "/"),
filename
])
workfile_doc = new_workfile_info_doc(
filename, asset_id, task_name, [filepath]
)
session = OperationsSession()
session.create_entity(project_name, "workfile", workfile_doc)
session.commit()
return workfile_doc
def refresh(self):
# Refresh asset widget