Merge pull request #3225 from pypeclub/enhancement/OP-2787_Maya-ABC-farm-publishing

Deadline: publishing of animation and pointcache on a farm
This commit is contained in:
Petr Kalis 2022-06-07 10:39:56 +02:00 committed by GitHub
commit df12eec2c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 394 additions and 59 deletions

View file

@ -44,6 +44,7 @@ from . import resources
from .plugin import (
Extractor,
Integrator,
ValidatePipelineOrder,
ValidateContentsOrder,
@ -86,6 +87,7 @@ __all__ = [
# plugin classes
"Extractor",
"Integrator",
# ordering
"ValidatePipelineOrder",
"ValidateContentsOrder",

View file

@ -66,13 +66,23 @@ def install():
log.info("Installing callbacks ... ")
register_event_callback("init", on_init)
# Callbacks below are not required for headless mode, the `init` however
# is important to load referenced Alembics correctly at rendertime.
if os.environ.get("HEADLESS_PUBLISH"):
# Maya launched on farm, lib.IS_HEADLESS might be triggered locally too
# target "farm" == rendering on farm, expects OPENPYPE_PUBLISH_DATA
# target "remote" == remote execution
print("Registering pyblish target: remote")
pyblish.api.register_target("remote")
return
if lib.IS_HEADLESS:
log.info(("Running in headless mode, skipping Maya "
"save/open/new callback installation.."))
return
print("Registering pyblish target: local")
pyblish.api.register_target("local")
_set_project()
_register_callbacks()

View file

@ -38,3 +38,7 @@ class CreateAnimation(plugin.Creator):
# Default to exporting world-space
self.data["worldSpace"] = True
# Default to not send to farm.
self.data["farm"] = False
self.data["priority"] = 50

View file

@ -28,3 +28,7 @@ class CreatePointCache(plugin.Creator):
# Add options for custom attributes
self.data["attr"] = ""
self.data["attrPrefix"] = ""
# Default to not send to farm.
self.data["farm"] = False
self.data["priority"] = 50

View file

@ -55,3 +55,6 @@ class CollectAnimationOutputGeometry(pyblish.api.InstancePlugin):
# Store data in the instance for the validator
instance.data["out_hierarchy"] = hierarchy
if instance.data.get("farm"):
instance.data["families"].append("publish.farm")

View file

@ -0,0 +1,14 @@
import pyblish.api
class CollectPointcache(pyblish.api.InstancePlugin):
"""Collect pointcache data for instance."""
order = pyblish.api.CollectorOrder + 0.4
families = ["pointcache"]
label = "Collect Pointcache"
hosts = ["maya"]
def process(self, instance):
if instance.data.get("farm"):
instance.data["families"].append("publish.farm")

View file

@ -16,13 +16,19 @@ class ExtractAnimation(openpype.api.Extractor):
Positions and normals, uvs, creases are preserved, but nothing more,
for plain and predictable point caches.
Plugin can run locally or remotely (on a farm - if instance is marked with
"farm" it will be skipped in local processing, but processed on farm)
"""
label = "Extract Animation"
hosts = ["maya"]
families = ["animation"]
targets = ["local", "remote"]
def process(self, instance):
if instance.data.get("farm"):
self.log.debug("Should be processed on farm, skipping.")
return
# Collect the out set nodes
out_sets = [node for node in instance if node.endswith("out_SET")]
@ -89,4 +95,6 @@ class ExtractAnimation(openpype.api.Extractor):
}
instance.data["representations"].append(representation)
instance.context.data["cleanupFullPaths"].append(path)
self.log.info("Extracted {} to {}".format(instance, dirname))

View file

@ -16,6 +16,8 @@ class ExtractAlembic(openpype.api.Extractor):
Positions and normals, uvs, creases are preserved, but nothing more,
for plain and predictable point caches.
Plugin can run locally or remotely (on a farm - if instance is marked with
"farm" it will be skipped in local processing, but processed on farm)
"""
label = "Extract Pointcache (Alembic)"
@ -23,8 +25,12 @@ class ExtractAlembic(openpype.api.Extractor):
families = ["pointcache",
"model",
"vrayproxy"]
targets = ["local", "remote"]
def process(self, instance):
if instance.data.get("farm"):
self.log.debug("Should be processed on farm, skipping.")
return
nodes = instance[:]
@ -92,4 +98,6 @@ class ExtractAlembic(openpype.api.Extractor):
}
instance.data["representations"].append(representation)
instance.context.data["cleanupFullPaths"].append(path)
self.log.info("Extracted {} to {}".format(instance, dirname))

View file

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<root>
<error id="main">
<title>Errors found</title>
<description>
## Publish process has errors
At least one plugin failed before this plugin, job won't be sent to Deadline for processing before all issues are fixed.
### How to repair?
Check all failing plugins (should be highlighted in red) and fix issues if possible.
</description>
</error>
</root>

View file

@ -30,6 +30,10 @@ class ValidateAnimationContent(pyblish.api.InstancePlugin):
assert 'out_hierarchy' in instance.data, "Missing `out_hierarchy` data"
out_sets = [node for node in instance if node.endswith("out_SET")]
msg = "Couldn't find exactly one out_SET: {0}".format(out_sets)
assert len(out_sets) == 1, msg
# All nodes in the `out_hierarchy` must be among the nodes that are
# in the instance. The nodes in the instance are found from the top
# group, as such this tests whether all nodes are under that top group.

View file

@ -1282,7 +1282,13 @@ class EnvironmentPrepData(dict):
def get_app_environments_for_context(
project_name, asset_name, task_name, app_name, env_group=None, env=None
project_name,
asset_name,
task_name,
app_name,
env_group=None,
env=None,
modules_manager=None
):
"""Prepare environment variables by context.
Args:
@ -1293,10 +1299,12 @@ def get_app_environments_for_context(
by ApplicationManager.
env (dict): Initial environment variables. `os.environ` is used when
not passed.
modules_manager (ModulesManager): Initialized modules manager.
Returns:
dict: Environments for passed context and application.
"""
from openpype.pipeline import AvalonMongoDB
# Avalon database connection
@ -1311,6 +1319,11 @@ def get_app_environments_for_context(
"name": asset_name
})
if modules_manager is None:
from openpype.modules import ModulesManager
modules_manager = ModulesManager()
# Prepare app object which can be obtained only from ApplciationManager
app_manager = ApplicationManager()
app = app_manager.applications[app_name]
@ -1334,7 +1347,7 @@ def get_app_environments_for_context(
"env": env
})
prepare_app_environments(data, env_group)
prepare_app_environments(data, env_group, modules_manager)
prepare_context_environments(data, env_group)
# Discard avalon connection
@ -1355,9 +1368,12 @@ def _merge_env(env, current_env):
return result
def _add_python_version_paths(app, env, logger):
def _add_python_version_paths(app, env, logger, modules_manager):
"""Add vendor packages specific for a Python version."""
for module in modules_manager.get_enabled_modules():
module.modify_application_launch_arguments(app, env)
# Skip adding if host name is not set
if not app.host_name:
return
@ -1390,7 +1406,9 @@ def _add_python_version_paths(app, env, logger):
env["PYTHONPATH"] = os.pathsep.join(python_paths)
def prepare_app_environments(data, env_group=None, implementation_envs=True):
def prepare_app_environments(
data, env_group=None, implementation_envs=True, modules_manager=None
):
"""Modify launch environments based on launched app and context.
Args:
@ -1403,7 +1421,12 @@ def prepare_app_environments(data, env_group=None, implementation_envs=True):
log = data["log"]
source_env = data["env"].copy()
_add_python_version_paths(app, source_env, log)
if modules_manager is None:
from openpype.modules import ModulesManager
modules_manager = ModulesManager()
_add_python_version_paths(app, source_env, log, modules_manager)
# Use environments from local settings
filtered_local_envs = {}

View file

@ -60,7 +60,7 @@ def start_webpublish_log(dbcon, batch_id, user):
}).inserted_id
def publish(log, close_plugin_name=None):
def publish(log, close_plugin_name=None, raise_error=False):
"""Loops through all plugins, logs to console. Used for tests.
Args:
@ -79,10 +79,15 @@ def publish(log, close_plugin_name=None):
result["plugin"].label, record.msg))
if result["error"]:
log.error(error_format.format(**result))
error_message = error_format.format(**result)
log.error(error_message)
if close_plugin: # close host app explicitly after error
context = pyblish.api.Context()
close_plugin().process(context)
if raise_error:
# Fatal Error is because of Deadline
error_message = "Fatal Error: " + error_format.format(**result)
raise RuntimeError(error_message)
def publish_and_log(dbcon, _id, log, close_plugin_name=None, batch_id=None):
@ -228,7 +233,7 @@ def _get_close_plugin(close_plugin_name, log):
if plugin.__name__ == close_plugin_name:
return plugin
log.warning("Close plugin not found, app might not close.")
log.debug("Close plugin not found, app might not close.")
def get_task_data(batch_dir):

View file

@ -370,6 +370,7 @@ def _load_modules():
class _OpenPypeInterfaceMeta(ABCMeta):
"""OpenPypeInterface meta class to print proper string."""
def __str__(self):
return "<'OpenPypeInterface.{}'>".format(self.__name__)
@ -388,6 +389,7 @@ class OpenPypeInterface:
OpenPype modules which means they have to have implemented methods defined
in the interface. By default interface does not have any abstract parts.
"""
pass
@ -432,10 +434,12 @@ class OpenPypeModule:
It is not recommended to override __init__ that's why specific method
was implemented.
"""
pass
def connect_with_modules(self, enabled_modules):
"""Connect with other enabled modules."""
pass
def get_global_environments(self):
@ -443,8 +447,22 @@ class OpenPypeModule:
Environment variables that can be get only from system settings.
"""
return {}
def modify_application_launch_arguments(self, application, env):
"""Give option to modify launch environments before application launch.
Implementation is optional. To change environments modify passed
dictionary of environments.
Args:
application (Application): Application that is launched.
env (dict): Current environemnt variables.
"""
pass
def cli(self, module_click_group):
"""Add commands to click group.
@ -465,6 +483,7 @@ class OpenPypeModule:
def mycommand():
print("my_command")
"""
pass

View file

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""Collect instances that should be processed and published on DL.
"""
import os
import pyblish.api
from openpype.pipeline import PublishValidationError
class CollectDeadlinePublishableInstances(pyblish.api.InstancePlugin):
"""Collect instances that should be processed and published on DL.
Some long running publishes (not just renders) could be offloaded to DL,
this plugin compares theirs name against env variable, marks only
publishable by farm.
Triggered only when running only in headless mode, eg on a farm.
"""
order = pyblish.api.CollectorOrder + 0.499
label = "Collect Deadline Publishable Instance"
targets = ["remote"]
def process(self, instance):
self.log.debug("CollectDeadlinePublishableInstances")
publish_inst = os.environ.get("OPENPYPE_PUBLISH_SUBSET", '')
if not publish_inst:
raise PublishValidationError("OPENPYPE_PUBLISH_SUBSET env var "
"required for remote publishing")
subset_name = instance.data["subset"]
if subset_name == publish_inst:
self.log.debug("Publish {}".format(subset_name))
instance.data["publish"] = True
instance.data["farm"] = False
else:
self.log.debug("Skipping {}".format(subset_name))
instance.data["publish"] = False

View file

@ -0,0 +1,136 @@
import os
import requests
from maya import cmds
from openpype.pipeline import legacy_io, PublishXmlValidationError
from openpype.settings import get_project_settings
import openpype.api
import pyblish.api
class MayaSubmitRemotePublishDeadline(openpype.api.Integrator):
"""Submit Maya scene to perform a local publish in Deadline.
Publishing in Deadline can be helpful for scenes that publish very slow.
This way it can process in the background on another machine without the
Artist having to wait for the publish to finish on their local machine.
Submission is done through the Deadline Web Service. DL then triggers
`openpype/scripts/remote_publish.py`.
Each publishable instance creates its own full publish job.
Different from `ProcessSubmittedJobOnFarm` which creates publish job
depending on metadata json containing context and instance data of
rendered files.
"""
label = "Submit Scene to Deadline"
order = pyblish.api.IntegratorOrder
hosts = ["maya"]
families = ["publish.farm"]
def process(self, instance):
settings = get_project_settings(os.getenv("AVALON_PROJECT"))
# use setting for publish job on farm, no reason to have it separately
deadline_publish_job_sett = (settings["deadline"]
["publish"]
["ProcessSubmittedJobOnFarm"])
# Ensure no errors so far
if not (all(result["success"]
for result in instance.context.data["results"])):
raise PublishXmlValidationError("Publish process has errors")
if not instance.data["publish"]:
self.log.warning("No active instances found. "
"Skipping submission..")
return
scene = instance.context.data["currentFile"]
scenename = os.path.basename(scene)
# Get project code
project_name = legacy_io.Session["AVALON_PROJECT"]
job_name = "{scene} [PUBLISH]".format(scene=scenename)
batch_name = "{code} - {scene}".format(code=project_name,
scene=scenename)
# Generate the payload for Deadline submission
payload = {
"JobInfo": {
"Plugin": "MayaBatch",
"BatchName": batch_name,
"Name": job_name,
"UserName": instance.context.data["user"],
"Comment": instance.context.data.get("comment", ""),
# "InitialStatus": state
"Department": deadline_publish_job_sett["deadline_department"],
"ChunkSize": deadline_publish_job_sett["deadline_chunk_size"],
"Priority": deadline_publish_job_sett["deadline_priority"],
"Group": deadline_publish_job_sett["deadline_group"],
"Pool": deadline_publish_job_sett["deadline_pool"],
},
"PluginInfo": {
"Build": None, # Don't force build
"StrictErrorChecking": True,
"ScriptJob": True,
# Inputs
"SceneFile": scene,
"ScriptFilename": "{OPENPYPE_REPOS_ROOT}/openpype/scripts/remote_publish.py", # noqa
# Mandatory for Deadline
"Version": cmds.about(version=True),
# Resolve relative references
"ProjectPath": cmds.workspace(query=True,
rootDirectory=True),
},
# Mandatory for Deadline, may be empty
"AuxFiles": []
}
# Include critical environment variables with submission + api.Session
keys = [
"FTRACK_API_USER",
"FTRACK_API_KEY",
"FTRACK_SERVER"
]
environment = dict({key: os.environ[key] for key in keys
if key in os.environ}, **legacy_io.Session)
# TODO replace legacy_io with context.data ?
environment["AVALON_PROJECT"] = legacy_io.Session["AVALON_PROJECT"]
environment["AVALON_ASSET"] = legacy_io.Session["AVALON_ASSET"]
environment["AVALON_TASK"] = legacy_io.Session["AVALON_TASK"]
environment["AVALON_APP_NAME"] = os.environ.get("AVALON_APP_NAME")
environment["OPENPYPE_LOG_NO_COLORS"] = "1"
environment["OPENPYPE_REMOTE_JOB"] = "1"
environment["OPENPYPE_USERNAME"] = instance.context.data["user"]
environment["OPENPYPE_PUBLISH_SUBSET"] = instance.data["subset"]
environment["HEADLESS_PUBLISH"] = "1"
payload["JobInfo"].update({
"EnvironmentKeyValue%d" % index: "{key}={value}".format(
key=key,
value=environment[key]
) for index, key in enumerate(environment)
})
self.log.info("Submitting Deadline job ...")
deadline_url = instance.context.data["defaultDeadline"]
# if custom one is set in instance, use that
if instance.data.get("deadlineUrl"):
deadline_url = instance.data.get("deadlineUrl")
assert deadline_url, "Requires Deadline Webservice URL"
url = "{}/api/jobs".format(deadline_url)
response = requests.post(url, json=payload, timeout=10)
if not response.ok:
raise Exception(response.text)

View file

@ -87,6 +87,13 @@ def inject_openpype_environment(deadlinePlugin):
for key, value in contents.items():
deadlinePlugin.SetProcessEnvironmentVariable(key, value)
script_url = job.GetJobPluginInfoKeyValue("ScriptFilename")
if script_url:
script_url = script_url.format(**contents).replace("\\", "/")
print(">>> Setting script path {}".format(script_url))
job.SetJobPluginInfoKeyValue("ScriptFilename", script_url)
print(">>> Removing temporary file")
os.remove(export_url)
@ -196,16 +203,19 @@ def __main__(deadlinePlugin):
job.GetJobEnvironmentKeyValue('OPENPYPE_RENDER_JOB') or '0'
openpype_publish_job = \
job.GetJobEnvironmentKeyValue('OPENPYPE_PUBLISH_JOB') or '0'
openpype_remote_job = \
job.GetJobEnvironmentKeyValue('OPENPYPE_REMOTE_JOB') or '0'
print("--- Job type - render {}".format(openpype_render_job))
print("--- Job type - publish {}".format(openpype_publish_job))
print("--- Job type - remote {}".format(openpype_remote_job))
if openpype_publish_job == '1' and openpype_render_job == '1':
raise RuntimeError("Misconfiguration. Job couldn't be both " +
"render and publish.")
if openpype_publish_job == '1':
inject_render_job_id(deadlinePlugin)
elif openpype_render_job == '1':
elif openpype_render_job == '1' or openpype_remote_job == '1':
inject_openpype_environment(deadlinePlugin)
else:
pype(deadlinePlugin) # backward compatibility with Pype2

View file

@ -88,6 +88,40 @@ class FtrackModule(
"""Implementation of `ILaunchHookPaths`."""
return os.path.join(FTRACK_MODULE_DIR, "launch_hooks")
def modify_application_launch_arguments(self, application, env):
if not application.use_python_2:
return
self.log.info("Adding Ftrack Python 2 packages to PYTHONPATH.")
# Prepare vendor dir path
python_2_vendor = os.path.join(FTRACK_MODULE_DIR, "python2_vendor")
# Add Python 2 modules
python_paths = [
# `python-ftrack-api`
os.path.join(python_2_vendor, "ftrack-python-api", "source"),
# `arrow`
os.path.join(python_2_vendor, "arrow"),
# `builtins` from `python-future`
# - `python-future` is strict Python 2 module that cause crashes
# of Python 3 scripts executed through OpenPype
# (burnin script etc.)
os.path.join(python_2_vendor, "builtins"),
# `backports.functools_lru_cache`
os.path.join(
python_2_vendor, "backports.functools_lru_cache"
)
]
# Load PYTHONPATH from current launch context
python_path = env.get("PYTHONPATH")
if python_path:
python_paths.append(python_path)
# Set new PYTHONPATH to launch context environments
env["PYTHONPATH"] = os.pathsep.join(python_paths)
def connect_with_modules(self, enabled_modules):
for module in enabled_modules:
if not hasattr(module, "get_ftrack_event_handler_paths"):

View file

@ -1,43 +0,0 @@
import os
from openpype.lib import PreLaunchHook
from openpype_modules.ftrack import FTRACK_MODULE_DIR
class PrePython2Support(PreLaunchHook):
"""Add python ftrack api module for Python 2 to PYTHONPATH.
Path to vendor modules is added to the beggining of PYTHONPATH.
"""
def execute(self):
if not self.application.use_python_2:
return
self.log.info("Adding Ftrack Python 2 packages to PYTHONPATH.")
# Prepare vendor dir path
python_2_vendor = os.path.join(FTRACK_MODULE_DIR, "python2_vendor")
# Add Python 2 modules
python_paths = [
# `python-ftrack-api`
os.path.join(python_2_vendor, "ftrack-python-api", "source"),
# `arrow`
os.path.join(python_2_vendor, "arrow"),
# `builtins` from `python-future`
# - `python-future` is strict Python 2 module that cause crashes
# of Python 3 scripts executed through OpenPype (burnin script etc.)
os.path.join(python_2_vendor, "builtins"),
# `backports.functools_lru_cache`
os.path.join(
python_2_vendor, "backports.functools_lru_cache"
)
]
# Load PYTHONPATH from current launch context
python_path = self.launch_context.env.get("PYTHONPATH")
if python_path:
python_paths.append(python_path)
# Set new PYTHONPATH to launch context environments
self.launch_context.env["PYTHONPATH"] = os.pathsep.join(python_paths)

View file

@ -18,6 +18,16 @@ class InstancePlugin(pyblish.api.InstancePlugin):
super(InstancePlugin, cls).process(cls, *args, **kwargs)
class Integrator(InstancePlugin):
"""Integrator base class.
Wraps pyblish instance plugin. Targets set to "local" which means all
integrators should run on "local" publishes, by default.
"farm" targets could be used for integrators that should run on a farm.
"""
targets = ["local"]
class Extractor(InstancePlugin):
"""Extractor base class.
@ -28,6 +38,8 @@ class Extractor(InstancePlugin):
"""
targets = ["local"]
order = 2.0
def staging_dir(self, instance):

View file

@ -139,6 +139,10 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
ef, instance.data["family"], instance.data["families"]))
return
# instance should be published on a farm
if instance.data.get("farm"):
return
self.integrated_file_sizes = {}
try:
self.register(instance)

View file

@ -144,6 +144,7 @@ class PypeCommands:
pyblish.api.register_target("farm")
os.environ["OPENPYPE_PUBLISH_DATA"] = os.pathsep.join(paths)
os.environ["HEADLESS_PUBLISH"] = 'true' # to use in app lib
log.info("Running publish ...")
@ -173,9 +174,11 @@ class PypeCommands:
user_email, targets=None):
"""Opens installed variant of 'host' and run remote publish there.
Eventually should be yanked out to Webpublisher cli.
Currently implemented and tested for Photoshop where customer
wants to process uploaded .psd file and publish collected layers
from there.
from there. Triggered by Webpublisher.
Checks if no other batches are running (status =='in_progress). If
so, it sleeps for SLEEP (this is separate process),
@ -273,7 +276,8 @@ class PypeCommands:
def remotepublish(project, batch_path, user_email, targets=None):
"""Start headless publishing.
Used to publish rendered assets, workfiles etc.
Used to publish rendered assets, workfiles etc via Webpublisher.
Eventually should be yanked out to Webpublisher cli.
Publish use json from passed paths argument.
@ -309,6 +313,7 @@ class PypeCommands:
os.environ["AVALON_PROJECT"] = project
os.environ["AVALON_APP"] = host_name
os.environ["USER_EMAIL"] = user_email
os.environ["HEADLESS_PUBLISH"] = 'true' # to use in app lib
pyblish.api.register_host(host_name)
@ -331,9 +336,12 @@ class PypeCommands:
log.info("Publish finished.")
@staticmethod
def extractenvironments(
output_json_path, project, asset, task, app, env_group
):
def extractenvironments(output_json_path, project, asset, task, app,
env_group):
"""Produces json file with environment based on project and app.
Called by Deadline plugin to propagate environment into render jobs.
"""
if all((project, asset, task, app)):
from openpype.api import get_app_environments_for_context
env = get_app_environments_for_context(

View file

@ -0,0 +1,11 @@
try:
from openpype.api import Logger
import openpype.lib.remote_publish
except ImportError as exc:
# Ensure Deadline fails by output an error that contains "Fatal Error:"
raise ImportError("Fatal Error: %s" % exc)
if __name__ == "__main__":
# Perform remote publish with thorough error checking
log = Logger.get_logger(__name__)
openpype.lib.remote_publish.publish(log, raise_error=True)

View file

@ -312,6 +312,10 @@ Example setup:
![Maya - Point Cache Example](assets/maya-pointcache_setup.png)
:::note Publish on farm
If your studio has Deadline configured, artists could choose to offload potentially long running export of pointache and publish it to the farm.
Only thing that is necessary is to toggle `Farm` property in created pointcache instance to True.
### Loading Point Caches
Loading point cache means creating reference to **abc** file with Go **OpenPype → Load...**.