🎨 add nuke job creator

This commit is contained in:
Ondrej Samohel 2023-04-28 18:37:26 +02:00
parent 3a9109422d
commit 92bb47ed23
No known key found for this signature in database
GPG key ID: 02376E18990A97C6
5 changed files with 487 additions and 76 deletions

View file

@ -0,0 +1,307 @@
# -*- coding: utf-8 -*-
"""Submitting render job to RoyalRender."""
import os
import sys
import re
import platform
from datetime import datetime
from pyblish.api import InstancePlugin, IntegratorOrder, Context
from openpype.tests.lib import is_in_tests
from openpype.lib import is_running_from_build
from openpype.pipeline.publish.lib import get_published_workfile_instance
from openpype.pipeline.publish import KnownPublishError
from openpype.modules.royalrender.api import Api as rrApi
from openpype.modules.royalrender.rr_job import (
RRJob, CustomAttribute, get_rr_platform)
from openpype.lib import (
is_running_from_build,
BoolDef,
NumberDef
)
class CreateNukeRoyalRenderJob(InstancePlugin):
label = "Create Nuke Render job in RR"
order = IntegratorOrder + 0.1
hosts = ["nuke"]
families = ["render", "prerender"]
targets = ["local"]
optional = True
priority = 50
chunk_size = 1
concurrent_tasks = 1
@classmethod
def get_attribute_defs(cls):
return [
NumberDef(
"priority",
label="Priority",
default=cls.priority,
decimals=0
),
NumberDef(
"chunk",
label="Frames Per Task",
default=cls.chunk_size,
decimals=0,
minimum=1,
maximum=1000
),
NumberDef(
"concurrency",
label="Concurrency",
default=cls.concurrent_tasks,
decimals=0,
minimum=1,
maximum=10
),
BoolDef(
"use_gpu",
default=cls.use_gpu,
label="Use GPU"
),
BoolDef(
"suspend_publish",
default=False,
label="Suspend publish"
)
]
def __init__(self, *args, **kwargs):
self._instance = None
self._rr_root = None
self.scene_path = None
self.job = None
self.submission_parameters = None
self.rr_api = None
def process(self, instance):
if not instance.data.get("farm"):
self.log.info("Skipping local instance.")
return
instance.data["attributeValues"] = self.get_attr_values_from_data(
instance.data)
# add suspend_publish attributeValue to instance data
instance.data["suspend_publish"] = instance.data["attributeValues"][
"suspend_publish"]
context = instance.context
self._rr_root = self._resolve_rr_path(context, instance.data.get(
"rrPathName")) # noqa
self.log.debug(self._rr_root)
if not self._rr_root:
raise KnownPublishError(
("Missing RoyalRender root. "
"You need to configure RoyalRender module."))
self.rr_api = rrApi(self._rr_root)
self.scene_path = context.data["currentFile"]
if self.use_published:
file_path = get_published_workfile_instance(context)
# fallback if nothing was set
if not file_path:
self.log.warning("Falling back to workfile")
file_path = context.data["currentFile"]
self.scene_path = file_path
self.log.info(
"Using published scene for render {}".format(self.scene_path)
)
if not self._instance.data.get("expectedFiles"):
self._instance.data["expectedFiles"] = []
if not self._instance.data.get("rrJobs"):
self._instance.data["rrJobs"] = []
self._instance.data["rrJobs"] += self.create_jobs()
# redefinition of families
if "render" in self._instance.data["family"]:
self._instance.data["family"] = "write"
self._instance.data["families"].insert(0, "render2d")
elif "prerender" in self._instance.data["family"]:
self._instance.data["family"] = "write"
self._instance.data["families"].insert(0, "prerender")
self._instance.data["outputDir"] = os.path.dirname(
self._instance.data["path"]).replace("\\", "/")
def create_jobs(self):
submit_frame_start = int(self._instance.data["frameStartHandle"])
submit_frame_end = int(self._instance.data["frameEndHandle"])
# get output path
render_path = self._instance.data['path']
script_path = self.scene_path
node = self._instance.data["transientData"]["node"]
# main job
jobs = [
self.get_job(
script_path,
render_path,
node.name(),
submit_frame_start,
submit_frame_end,
)
]
for baking_script in self._instance.data.get("bakingNukeScripts", []):
render_path = baking_script["bakeRenderPath"]
script_path = baking_script["bakeScriptPath"]
exe_node_name = baking_script["bakeWriteNodeName"]
jobs.append(self.get_job(
script_path,
render_path,
exe_node_name,
submit_frame_start,
submit_frame_end
))
return jobs
def get_job(self, script_path, render_path,
node_name, start_frame, end_frame):
"""Get RR job based on current instance.
Args:
script_path (str): Path to Nuke script.
render_path (str): Output path.
node_name (str): Name of the render node.
start_frame (int): Start frame.
end_frame (int): End frame.
Returns:
RRJob: RoyalRender Job instance.
"""
render_dir = os.path.normpath(os.path.dirname(render_path))
batch_name = os.path.basename(script_path)
jobname = "%s - %s" % (batch_name, self._instance.name)
if is_in_tests():
batch_name += datetime.now().strftime("%d%m%Y%H%M%S")
output_filename_0 = self.preview_fname(render_path)
custom_attributes = []
if is_running_from_build():
custom_attributes = [
CustomAttribute(
name="OpenPypeVersion",
value=os.environ.get("OPENPYPE_VERSION"))
]
nuke_version = re.search(
r"\d+\.\d+", self._instance.context.data.get("hostVersion"))
# this will append expected files to instance as needed.
expected_files = self.expected_files(
render_path, start_frame, end_frame)
first_file = next(self._iter_expected_files(expected_files))
job = RRJob(
Software="Nuke",
Renderer="",
SeqStart=int(start_frame),
SeqEnd=int(end_frame),
SeqStep=int(self._instance.data.get("byFrameStep"), 1),
SeqFileOffset=0,
Version=nuke_version.group(),
SceneName=script_path,
IsActive=True,
ImageDir=render_dir.replace("\\", "/"),
ImageFilename="{}.".format(os.path.splitext(first_file)[0]),
ImageExtension=os.path.splitext(first_file)[1],
ImagePreNumberLetter=".",
ImageSingleOutputFile=False,
SceneOS=get_rr_platform(),
Layer=node_name,
SceneDatabaseDir=script_path,
CustomSHotName=self._instance.context.data["asset"],
CompanyProjectName=self._instance.context.data["projectName"],
ImageWidth=self._instance.data["resolutionWidth"],
ImageHeight=self._instance.data["resolutionHeight"],
CustomAttributes=custom_attributes
)
@staticmethod
def _resolve_rr_path(context, rr_path_name):
# type: (Context, str) -> str
rr_settings = (
context.data
["system_settings"]
["modules"]
["royalrender"]
)
try:
default_servers = rr_settings["rr_paths"]
project_servers = (
context.data
["project_settings"]
["royalrender"]
["rr_paths"]
)
rr_servers = {
k: default_servers[k]
for k in project_servers
if k in default_servers
}
except (AttributeError, KeyError):
# Handle situation were we had only one url for royal render.
return context.data["defaultRRPath"][platform.system().lower()]
return rr_servers[rr_path_name][platform.system().lower()]
def expected_files(self, path, start_frame, end_frame):
"""Get expected files.
This function generate expected files from provided
path and start/end frames.
It was taken from Deadline module, but this should be
probably handled better in collector to support more
flexible scenarios.
Args:
path (str): Output path.
start_frame (int): Start frame.
end_frame (int): End frame.
Returns:
list: List of expected files.
"""
dir_name = os.path.dirname(path)
file = os.path.basename(path)
expected_files = []
if "#" in file:
pparts = file.split("#")
padding = "%0{}d".format(len(pparts) - 1)
file = pparts[0] + padding + pparts[-1]
if "%" not in file:
expected_files.append(path)
return
if self._instance.data.get("slate"):
start_frame -= 1
expected_files.extend(
os.path.join(dir_name, (file % i)).replace("\\", "/")
for i in range(start_frame, (end_frame + 1))
)
return expected_files

View file

@ -2,14 +2,20 @@
"""Create publishing job on RoyalRender."""
import os
from copy import deepcopy
import json
from pyblish.api import InstancePlugin, IntegratorOrder
from pyblish.api import InstancePlugin, IntegratorOrder, Instance
from openpype.pipeline import legacy_io
from openpype.modules.royalrender.rr_job import RRJob, RREnvList
from openpype.pipeline.publish import KnownPublishError
from openpype.lib.openpype_version import (
get_OpenPypeVersion, get_openpype_version)
from openpype.pipeline.farm.pyblish import (
create_skeleton_instance,
create_instances_for_aov,
attach_instances_to_subset
)
class CreatePublishRoyalRenderJob(InstancePlugin):
@ -31,50 +37,84 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
self.context = context
self.anatomy = instance.context.data["anatomy"]
# asset = data.get("asset")
# subset = data.get("subset")
# source = self._remap_source(
# data.get("source") or context.data["source"])
if not instance.data.get("farm"):
self.log.info("Skipping local instance.")
return
instance_skeleton_data = create_skeleton_instance(
instance,
families_transfer=self.families_transfer,
instance_transfer=self.instance_transfer)
instances = None
if isinstance(instance.data.get("expectedFiles")[0], dict):
instances = create_instances_for_aov(
instance, instance_skeleton_data, self.aov_filter)
def _remap_source(self, source):
success, rootless_path = (
self.anatomy.find_root_template_from_path(source)
)
if success:
source = rootless_path
else:
# `rootless_path` is not set to `source` if none of roots match
self.log.warning((
"Could not find root path for remapping \"{}\"."
" This may cause issues."
).format(source))
return source
representations = self._get_representations(
instance_skeleton_data,
instance.data.get("expectedFiles")
)
def get_job(self, instance, job, instances):
"""Submit publish job to RoyalRender."""
if "representations" not in instance_skeleton_data.keys():
instance_skeleton_data["representations"] = []
# add representation
instance_skeleton_data["representations"] += representations
instances = [instance_skeleton_data]
# attach instances to subset
if instance.data.get("attachTo"):
instances = attach_instances_to_subset(
instance.data.get("attachTo"), instances
)
self.log.info("Creating RoyalRender Publish job ...")
if not instance.data.get("rrJobs"):
self.log.error(("There is no prior RoyalRender "
"job on the instance."))
raise KnownPublishError(
"Can't create publish job without prior ppducing jobs first")
publish_job = self.get_job(instance, instances)
instance.data["rrJobs"] += publish_job
metadata_path, rootless_metadata_path = self._create_metadata_path(
instance)
self.log.info("Writing json file: {}".format(metadata_path))
with open(metadata_path, "w") as f:
json.dump(publish_job, f, indent=4, sort_keys=True)
def get_job(self, instance, instances):
"""Create RR publishing job.
Based on provided original instance and additional instances,
create publishing job and return it to be submitted to farm.
Args:
instance (Instance): Original instance.
instances (list of Instance): List of instances to
be published on farm.
Returns:
RRJob: RoyalRender publish job.
"""
data = instance.data.copy()
subset = data["subset"]
job_name = "Publish - {subset}".format(subset=subset)
override_version = None
instance_version = instance.data.get("version") # take this if exists
if instance_version != 1:
override_version = instance_version
output_dir = self._get_publish_folder(
instance.context.data['anatomy'],
deepcopy(instance.data["anatomyData"]),
instance.data.get("asset"),
instances[0]["subset"],
# TODO: this shouldn't be hardcoded and is in fact settable by
# Settings.
'render',
override_version
)
override_version = instance_version if instance_version != 1 else None
# Transfer the environment from the original job to this dependent
# job, so they use the same environment
metadata_path, roothless_metadata_path = \
self._create_metadata_path(instance)
self._create_metadata_path(instance)
environment = RREnvList({
"AVALON_PROJECT": legacy_io.Session["AVALON_PROJECT"],
@ -96,7 +136,7 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
# and collect all pre_ids to wait for
job_environ = {}
jobs_pre_ids = []
for job in instance["rrJobs"]: # type: RRJob
for job in instance.data["rrJobs"]: # type: RRJob
if job.rrEnvList:
job_environ.update(
dict(RREnvList.parse(job.rrEnvList))
@ -159,11 +199,4 @@ class CreatePublishRoyalRenderJob(InstancePlugin):
else:
job.WaitForPreIDs += jobs_pre_ids
self.log.info("Creating RoyalRender Publish job ...")
if not instance.data.get("rrJobs"):
self.log.error("There is no RoyalRender job on the instance.")
raise KnownPublishError(
"Can't create publish job without producing jobs")
instance.data["rrJobs"] += job
return job

View file

@ -9,6 +9,17 @@ from collections import namedtuple, OrderedDict
CustomAttribute = namedtuple("CustomAttribute", ["name", "value"])
def get_rr_platform():
# type: () -> str
"""Returns name of platform used in rr jobs."""
if sys.platform.lower() in ["win32", "win64"]:
return "windows"
elif sys.platform.lower() == "darwin":
return "mac"
else:
return "linux"
class RREnvList(dict):
def serialize(self):
# <rrEnvList>VariableA=ValueA~~~VariableB=ValueB</rrEnvList>
@ -163,17 +174,6 @@ class RRJob:
# only used in RR 8.3 and newer
rrEnvList = attr.ib(default=None) # type: str
@staticmethod
def get_rr_platform():
# type: () -> str
"""Returns name of platform used in rr jobs."""
if sys.platform.lower() in ["win32", "win64"]:
return "windows"
elif sys.platform.lower() == "darwin":
return "mac"
else:
return "linux"
class SubmitterParameter:
"""Wrapper for Submitter Parameters."""

View file

@ -11,10 +11,12 @@ from openpype.lib import Logger
import attr
import pyblish.api
from openpype.pipeline.publish import KnownPublishError
from openpype.pipeline.farm.patterning import match_aov_pattern
import os
import clique
from copy import deepcopy
import re
import warnings
@attr.s
@ -263,6 +265,7 @@ def create_skeleton_instance(
return instance_skeleton_data
def _solve_families(families):
"""Solve families.
@ -277,7 +280,9 @@ def _solve_families(families):
if "review" not in families:
families.append("review")
return families
def create_instances_for_aov(instance, skeleton):
def create_instances_for_aov(instance, skeleton, aov_filter):
"""Create instances from AOVs.
This will create new pyblish.api.Instances by going over expected
@ -328,11 +333,12 @@ def create_instances_for_aov(instance, skeleton):
return _create_instances_for_aov(
instance,
skeleton,
aov_filter,
additional_color_data
)
def _create_instances_for_aov(instance, skeleton, additional_data):
def _create_instances_for_aov(instance, skeleton, aov_filter, additional_data):
"""Create instance for each AOV found.
This will create new instance for every AOV it can detect in expected
@ -491,35 +497,64 @@ def _create_instances_for_aov(instance, skeleton, additional_data):
log.debug("instances:{}".format(instances))
return instances
def get_resources(project_name, version, extension=None):
"""Get the files from the specific version."""
# TODO this functions seems to be weird
# - it's looking for representation with one extension or first (any)
# representation from a version?
# - not sure how this should work, maybe it does for specific use cases
# but probably can't be used for all resources from 2D workflows
extensions = None
def get_resources(project_name, version, extension=None):
"""Get the files from the specific version.
This will return all get all files from representation.
Todo:
This is really weird function, and it's use is
highly controversial. First, it will not probably work
ar all in final release of AYON, second, the logic isn't sound.
It should try to find representation matching the current one -
because it is used to pull out files from previous version to
be included in this one.
.. deprecated:: 3.15.5
This won't work in AYON and even the logic must be refactored.
Args:
project_name (str): Name of the project.
version (dict): Version document.
extension (str): extension used to filter
representations.
Returns:
list: of files
"""
warnings.warn((
"This won't work in AYON and even "
"the logic must be refactored."), DeprecationWarning)
extensions = []
if extension:
extensions = [extension]
repre_docs = list(get_representations(
project_name, version_ids=[version["_id"]], extensions=extensions
))
assert repre_docs, "This is a bug"
representation = repre_docs[0]
# there is a `context_filter` argument that won't probably work in
# final release of AYON. SO we'll rather not use it
repre_docs = list(get_representations(
project_name, version_ids=[version["_id"]]))
filtered = []
for doc in repre_docs:
if doc["context"]["ext"] in extensions:
filtered.append(doc)
representation = filtered[0]
directory = get_representation_path(representation)
print("Source: ", directory)
resources = sorted(
[
os.path.normpath(os.path.join(directory, fname))
for fname in os.listdir(directory)
os.path.normpath(os.path.join(directory, file_name))
for file_name in os.listdir(directory)
]
)
return resources
def copy_extend_frames(self, instance, representation):
def copy_extend_frames(instance, representation):
"""Copy existing frames from latest version.
This will copy all existing frames from subset's latest version back
@ -533,11 +568,15 @@ def copy_extend_frames(self, instance, representation):
"""
import speedcopy
R_FRAME_NUMBER = re.compile(
r".+\.(?P<frame>[0-9]+)\..+")
log = Logger.get_logger("farm_publishing")
log.info("Preparing to copy ...")
start = instance.data.get("frameStart")
end = instance.data.get("frameEnd")
project_name = instance.context.data["project"]
anatomy = instance.data["anatomy"] # type: Anatomy
# get latest version of subset
# this will stop if subset wasn't published yet
@ -554,7 +593,7 @@ def copy_extend_frames(self, instance, representation):
)
r_col, _ = clique.assemble(subset_resources)
# if override remove all frames we are expecting to be rendered
# if override remove all frames we are expecting to be rendered,
# so we'll copy only those missing from current render
if instance.data.get("overrideExistingFrame"):
for frame in range(start, end + 1):
@ -568,18 +607,18 @@ def copy_extend_frames(self, instance, representation):
resource_files = []
r_filename = os.path.basename(
representation.get("files")[0]) # first file
op = re.search(self.R_FRAME_NUMBER, r_filename)
op = re.search(R_FRAME_NUMBER, r_filename)
pre = r_filename[:op.start("frame")]
post = r_filename[op.end("frame"):]
assert op is not None, "padding string wasn't found"
for frame in list(r_col):
fn = re.search(self.R_FRAME_NUMBER, frame)
fn = re.search(R_FRAME_NUMBER, frame)
# silencing linter as we need to compare to True, not to
# type
assert fn is not None, "padding string wasn't found"
# list of tuples (source, destination)
staging = representation.get("stagingDir")
staging = self.anatomy.fill_root(staging)
staging = anatomy.fill_root(staging)
resource_files.append(
(frame, os.path.join(
staging, "{}{}{}".format(pre, fn["frame"], post)))
@ -596,3 +635,34 @@ def copy_extend_frames(self, instance, representation):
log.info(" > {}".format(source[1]))
log.info("Finished copying %i files" % len(resource_files))
def attach_instances_to_subset(attach_to, instances):
"""Attach instance to subset.
If we are attaching to other subsets, create copy of existing
instances, change data to match its subset and replace
existing instances with modified data.
Args:
attach_to (list): List of instances to attach to.
instances (list): List of instances to attach.
Returns:
list: List of attached instances.
"""
#
new_instances = []
for attach_instance in attach_to:
for i in instances:
new_inst = copy(i)
new_inst["version"] = attach_instance.get("version")
new_inst["subset"] = attach_instance.get("subset")
new_inst["family"] = attach_instance.get("family")
new_inst["append"] = True
# don't set subsetGroup if we are attaching
new_inst.pop("subsetGroup")
new_instances.append(new_inst)
return new_instances

View file

@ -20,4 +20,5 @@ def extend_frames(asset: str, subset: str, start: int, end: int) -> Tuple[int, i
def get_time_data_from_instance_or_context(instance: pyblish.api.Instance) -> TimeData: ...
def get_transferable_representations(instance: pyblish.api.Instance) -> list: ...
def create_skeleton_instance(instance: pyblish.api.Instance, families_transfer: list = ..., instance_transfer: dict = ...) -> dict: ...
def create_instances_for_aov(instance: pyblish.api.Instance, skeleton: dict) -> List[pyblish.api.Instance]: ...
def create_instances_for_aov(instance: pyblish.api.Instance, skeleton: dict, aov_filter: dict) -> List[pyblish.api.Instance]: ...
def attach_instances_to_subset(attach_to: list, instances: list) -> list: ...