🚧 work on publish job

This commit is contained in:
Ondrej Samohel 2023-02-07 19:22:54 +01:00
parent 5dda835a0d
commit cc8732aa9d
No known key found for this signature in database
GPG key ID: 02376E18990A97C6
4 changed files with 94 additions and 66 deletions

View file

@ -20,19 +20,19 @@ class Api:
self._rr_path = rr_path
os.environ["RR_ROOT"] = rr_path
def _get_rr_bin_path(self, tool_name=None, rr_root=None):
@staticmethod
def get_rr_bin_path(rr_root, tool_name=None):
# type: (str, str) -> str
"""Get path to RR bin folder.
Args:
tool_name (str): Name of RR executable you want.
rr_root (str, Optional): Custom RR root if needed.
rr_root (str): Custom RR root if needed.
Returns:
str: Path to the tool based on current platform.
"""
rr_root = rr_root or self._rr_path
is_64bit_python = sys.maxsize > 2 ** 32
rr_bin_parts = [rr_root, "bin"]
@ -65,7 +65,7 @@ class Api:
# type: () -> None
"""Set RR modules for Python."""
# default for linux
rr_bin = self._get_rr_bin_path()
rr_bin = self.get_rr_bin_path(self._rr_path)
rr_module_path = os.path.join(rr_bin, "lx64/lib")
if sys.platform.lower() == "win32":
@ -109,7 +109,8 @@ class Api:
def _submit_using_console(self, job_file):
# type: (SubmitFile) -> None
rr_start_local = self._get_rr_bin_path("rrStartLocal")
rr_start_local = self.get_rr_bin_path(
self._rr_path, "rrStartLocal")
self.log.info("rr_console: {}".format(rr_start_local))

View file

@ -32,7 +32,6 @@ class CreateMayaRoyalRenderJob(InstancePlugin):
Returns:
RRJob: RoyalRender job payload.
"""
def get_rr_platform():
if sys.platform.lower() in ["win32", "win64"]:

View file

@ -6,6 +6,10 @@ from openpype.pipeline import legacy_io
import requests
import os
from openpype.modules.royalrender.rr_job import RRJob, RREnvList
from openpype.pipeline.publish import KnownPublishError
from openpype.modules.royalrender.api import Api as rrApi
class CreatePublishRoyalRenderJob(InstancePlugin):
label = "Create publish job in RR"
@ -45,14 +49,12 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
).format(source))
return source
def _submit_post_job(self, instance, job, instances):
def get_job(self, instance, job, instances):
"""Submit publish job to RoyalRender."""
data = instance.data.copy()
subset = data["subset"]
job_name = "Publish - {subset}".format(subset=subset)
# instance.data.get("subset") != instances[0]["subset"]
# 'Main' vs 'renderMain'
override_version = None
instance_version = instance.data.get("version") # take this if exists
if instance_version != 1:
@ -62,6 +64,8 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
deepcopy(instance.data["anatomyData"]),
instance.data.get("asset"),
instances[0]["subset"],
# TODO: this shouldn't be hardcoded and is in fact settable by
# Settings.
'render',
override_version
)
@ -71,7 +75,7 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
metadata_path, roothless_metadata_path = \
self._create_metadata_path(instance)
environment = {
environment = RREnvList({
"AVALON_PROJECT": legacy_io.Session["AVALON_PROJECT"],
"AVALON_ASSET": legacy_io.Session["AVALON_ASSET"],
"AVALON_TASK": legacy_io.Session["AVALON_TASK"],
@ -80,7 +84,7 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
"OPENPYPE_RENDER_JOB": "0",
"OPENPYPE_REMOTE_JOB": "0",
"OPENPYPE_LOG_NO_COLORS": "1"
}
})
# add environments from self.environ_keys
for env_key in self.environ_keys:
@ -88,7 +92,16 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
environment[env_key] = os.environ[env_key]
# pass environment keys from self.environ_job_filter
job_environ = job["Props"].get("Env", {})
# and collect all pre_ids to wait for
job_environ = {}
jobs_pre_ids = []
for job in instance["rrJobs"]: # type: RRJob
if job.rrEnvList:
job_environ.update(
dict(RREnvList.parse(job.rrEnvList))
)
jobs_pre_ids.append(job.PreID)
for env_j_key in self.environ_job_filter:
if job_environ.get(env_j_key):
environment[env_j_key] = job_environ[env_j_key]
@ -99,7 +112,7 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
if mongo_url:
environment["OPENPYPE_MONGO"] = mongo_url
priority = self.deadline_priority or instance.data.get("priority", 50)
priority = self.priority or instance.data.get("priority", 50)
args = [
"--headless",
@ -109,66 +122,37 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
"--targets", "farm"
]
# Generate the payload for Deadline submission
payload = {
"JobInfo": {
"Plugin": self.deadline_plugin,
"BatchName": job["Props"]["Batch"],
"Name": job_name,
"UserName": job["Props"]["User"],
"Comment": instance.context.data.get("comment", ""),
"Department": self.deadline_department,
"ChunkSize": self.deadline_chunk_size,
"Priority": priority,
"Group": self.deadline_group,
"Pool": instance.data.get("primaryPool"),
"SecondaryPool": instance.data.get("secondaryPool"),
"OutputDirectory0": output_dir
},
"PluginInfo": {
"Version": self.plugin_pype_version,
"Arguments": " ".join(args),
"SingleFrameOnly": "True",
},
# Mandatory for Deadline, may be empty
"AuxFiles": [],
}
job = RRJob(
Software="Execute",
Renderer="Once",
# path to OpenPype
SeqStart=1,
SeqEnd=1,
SeqStep=1,
SeqFileOffset=0,
Version="1.0",
SceneName="",
IsActive=True,
ImageFilename="execOnce.file",
ImageDir="<SceneFolder>",
ImageExtension="",
ImagePreNumberLetter="",
SceneOS=RRJob.get_rr_platform(),
rrEnvList=environment.serialize(),
Priority=priority
)
# add assembly jobs as dependencies
if instance.data.get("tileRendering"):
self.log.info("Adding tile assembly jobs as dependencies...")
job_index = 0
for assembly_id in instance.data.get("assemblySubmissionJobs"):
payload["JobInfo"]["JobDependency{}".format(job_index)] = assembly_id # noqa: E501
job_index += 1
job.WaitForPreIDs += instance.data.get("assemblySubmissionJobs")
elif instance.data.get("bakingSubmissionJobs"):
self.log.info("Adding baking submission jobs as dependencies...")
job_index = 0
for assembly_id in instance.data["bakingSubmissionJobs"]:
payload["JobInfo"]["JobDependency{}".format(job_index)] = assembly_id # noqa: E501
job_index += 1
job.WaitForPreIDs += instance.data["bakingSubmissionJobs"]
else:
payload["JobInfo"]["JobDependency0"] = job["_id"]
job.WaitForPreIDs += jobs_pre_ids
if instance.data.get("suspend_publish"):
payload["JobInfo"]["InitialStatus"] = "Suspended"
for index, (key_, value_) in enumerate(environment.items()):
payload["JobInfo"].update(
{
"EnvironmentKeyValue%d"
% index: "{key}={value}".format(
key=key_, value=value_
)
}
)
# remove secondary pool
payload["JobInfo"].pop("SecondaryPool", None)
self.log.info("Submitting Deadline job ...")
self.log.info("Creating RoyalRender Publish job ...")
url = "{}/api/jobs".format(self.deadline_url)
response = requests.post(url, json=payload, timeout=10)

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""Python wrapper for RoyalRender XML job file."""
import sys
from xml.dom import minidom as md
import attr
from collections import namedtuple, OrderedDict
@ -8,6 +9,23 @@ from collections import namedtuple, OrderedDict
CustomAttribute = namedtuple("CustomAttribute", ["name", "value"])
class RREnvList(dict):
def serialize(self):
# <rrEnvList>VariableA=ValueA~~~VariableB=ValueB</rrEnvList>
return "~~~".join(
["{}={}".format(k, v) for k, v in sorted(self.items())])
@staticmethod
def parse(data):
# type: (str) -> RREnvList
"""Parse rrEnvList string and return it as RREnvList object."""
out = RREnvList()
for var in data.split("~~~"):
k, v = data.split("=")
out[k] = v
return out
@attr.s
class RRJob:
"""Mapping of Royal Render job file to a data class."""
@ -108,7 +126,7 @@ class RRJob:
# jobs send from this machine. If a job with the PreID was found, then
# this jobs waits for the other job. Note: This flag can be used multiple
# times to wait for multiple jobs.
WaitForPreID = attr.ib(default=None) # type: int
WaitForPreIDs = attr.ib(factory=list) # type: list
# List of submitter options per job
# list item must be of `SubmitterParameter` type
@ -138,6 +156,21 @@ class RRJob:
TotalFrames = attr.ib(default=None) # type: int
Tiled = attr.ib(default=None) # type: str
# Environment
# only used in RR 8.3 and newer
rrEnvList = attr.ib(default=None) # type: str
@staticmethod
def get_rr_platform():
# type: () -> str
"""Returns name of platform used in rr jobs."""
if sys.platform.lower() in ["win32", "win64"]:
return "windows"
elif sys.platform.lower() == "darwin":
return "mac"
else:
return "linux"
class SubmitterParameter:
"""Wrapper for Submitter Parameters."""
@ -242,6 +275,8 @@ class SubmitFile:
job, dict_factory=OrderedDict, filter=filter_data)
serialized_job.pop("CustomAttributes")
serialized_job.pop("SubmitterParameters")
# we are handling `WaitForPreIDs` separately.
wait_pre_ids = serialized_job.pop("WaitForPreIDs", [])
for custom_attr in job_custom_attributes: # type: CustomAttribute
serialized_job["Custom{}".format(
@ -253,6 +288,15 @@ class SubmitFile:
root.createTextNode(str(value))
)
xml_job.appendChild(xml_attr)
# WaitForPreID - can be used multiple times
for pre_id in wait_pre_ids:
xml_attr = root.createElement("WaitForPreID")
xml_attr.appendChild(
root.createTextNode(str(pre_id))
)
xml_job.appendChild(xml_attr)
job_file.appendChild(xml_job)
return root.toprettyxml(indent="\t")