diff --git a/openpype/modules/royalrender/plugins/publish/create_nuke_deadline_job.py b/openpype/modules/royalrender/plugins/publish/create_nuke_deadline_job.py new file mode 100644 index 0000000000..9f49294459 --- /dev/null +++ b/openpype/modules/royalrender/plugins/publish/create_nuke_deadline_job.py @@ -0,0 +1,307 @@ +# -*- coding: utf-8 -*- +"""Submitting render job to RoyalRender.""" +import os +import sys +import re +import platform +from datetime import datetime + +from pyblish.api import InstancePlugin, IntegratorOrder, Context +from openpype.tests.lib import is_in_tests +from openpype.lib import is_running_from_build +from openpype.pipeline.publish.lib import get_published_workfile_instance +from openpype.pipeline.publish import KnownPublishError +from openpype.modules.royalrender.api import Api as rrApi +from openpype.modules.royalrender.rr_job import ( + RRJob, CustomAttribute, get_rr_platform) +from openpype.lib import ( + is_running_from_build, + BoolDef, + NumberDef +) + + +class CreateNukeRoyalRenderJob(InstancePlugin): + label = "Create Nuke Render job in RR" + order = IntegratorOrder + 0.1 + hosts = ["nuke"] + families = ["render", "prerender"] + targets = ["local"] + optional = True + + priority = 50 + chunk_size = 1 + concurrent_tasks = 1 + + @classmethod + def get_attribute_defs(cls): + return [ + NumberDef( + "priority", + label="Priority", + default=cls.priority, + decimals=0 + ), + NumberDef( + "chunk", + label="Frames Per Task", + default=cls.chunk_size, + decimals=0, + minimum=1, + maximum=1000 + ), + NumberDef( + "concurrency", + label="Concurrency", + default=cls.concurrent_tasks, + decimals=0, + minimum=1, + maximum=10 + ), + BoolDef( + "use_gpu", + default=cls.use_gpu, + label="Use GPU" + ), + BoolDef( + "suspend_publish", + default=False, + label="Suspend publish" + ) + ] + + def __init__(self, *args, **kwargs): + self._instance = None + self._rr_root = None + self.scene_path = None + self.job = None + self.submission_parameters = None + self.rr_api = None + + def process(self, instance): + if not instance.data.get("farm"): + self.log.info("Skipping local instance.") + return + + instance.data["attributeValues"] = self.get_attr_values_from_data( + instance.data) + + # add suspend_publish attributeValue to instance data + instance.data["suspend_publish"] = instance.data["attributeValues"][ + "suspend_publish"] + + context = instance.context + + self._rr_root = self._resolve_rr_path(context, instance.data.get( + "rrPathName")) # noqa + self.log.debug(self._rr_root) + if not self._rr_root: + raise KnownPublishError( + ("Missing RoyalRender root. " + "You need to configure RoyalRender module.")) + + self.rr_api = rrApi(self._rr_root) + + self.scene_path = context.data["currentFile"] + if self.use_published: + file_path = get_published_workfile_instance(context) + + # fallback if nothing was set + if not file_path: + self.log.warning("Falling back to workfile") + file_path = context.data["currentFile"] + + self.scene_path = file_path + self.log.info( + "Using published scene for render {}".format(self.scene_path) + ) + + if not self._instance.data.get("expectedFiles"): + self._instance.data["expectedFiles"] = [] + + if not self._instance.data.get("rrJobs"): + self._instance.data["rrJobs"] = [] + + self._instance.data["rrJobs"] += self.create_jobs() + + # redefinition of families + if "render" in self._instance.data["family"]: + self._instance.data["family"] = "write" + self._instance.data["families"].insert(0, "render2d") + elif "prerender" in self._instance.data["family"]: + self._instance.data["family"] = "write" + self._instance.data["families"].insert(0, "prerender") + + self._instance.data["outputDir"] = os.path.dirname( + self._instance.data["path"]).replace("\\", "/") + + + def create_jobs(self): + submit_frame_start = int(self._instance.data["frameStartHandle"]) + submit_frame_end = int(self._instance.data["frameEndHandle"]) + + # get output path + render_path = self._instance.data['path'] + script_path = self.scene_path + node = self._instance.data["transientData"]["node"] + + # main job + jobs = [ + self.get_job( + script_path, + render_path, + node.name(), + submit_frame_start, + submit_frame_end, + ) + ] + + for baking_script in self._instance.data.get("bakingNukeScripts", []): + render_path = baking_script["bakeRenderPath"] + script_path = baking_script["bakeScriptPath"] + exe_node_name = baking_script["bakeWriteNodeName"] + + jobs.append(self.get_job( + script_path, + render_path, + exe_node_name, + submit_frame_start, + submit_frame_end + )) + + return jobs + + def get_job(self, script_path, render_path, + node_name, start_frame, end_frame): + """Get RR job based on current instance. + + Args: + script_path (str): Path to Nuke script. + render_path (str): Output path. + node_name (str): Name of the render node. + start_frame (int): Start frame. + end_frame (int): End frame. + + Returns: + RRJob: RoyalRender Job instance. + + """ + render_dir = os.path.normpath(os.path.dirname(render_path)) + batch_name = os.path.basename(script_path) + jobname = "%s - %s" % (batch_name, self._instance.name) + if is_in_tests(): + batch_name += datetime.now().strftime("%d%m%Y%H%M%S") + + output_filename_0 = self.preview_fname(render_path) + + custom_attributes = [] + if is_running_from_build(): + custom_attributes = [ + CustomAttribute( + name="OpenPypeVersion", + value=os.environ.get("OPENPYPE_VERSION")) + ] + + nuke_version = re.search( + r"\d+\.\d+", self._instance.context.data.get("hostVersion")) + + # this will append expected files to instance as needed. + expected_files = self.expected_files( + render_path, start_frame, end_frame) + first_file = next(self._iter_expected_files(expected_files)) + + job = RRJob( + Software="Nuke", + Renderer="", + SeqStart=int(start_frame), + SeqEnd=int(end_frame), + SeqStep=int(self._instance.data.get("byFrameStep"), 1), + SeqFileOffset=0, + Version=nuke_version.group(), + SceneName=script_path, + IsActive=True, + ImageDir=render_dir.replace("\\", "/"), + ImageFilename="{}.".format(os.path.splitext(first_file)[0]), + ImageExtension=os.path.splitext(first_file)[1], + ImagePreNumberLetter=".", + ImageSingleOutputFile=False, + SceneOS=get_rr_platform(), + Layer=node_name, + SceneDatabaseDir=script_path, + CustomSHotName=self._instance.context.data["asset"], + CompanyProjectName=self._instance.context.data["projectName"], + ImageWidth=self._instance.data["resolutionWidth"], + ImageHeight=self._instance.data["resolutionHeight"], + CustomAttributes=custom_attributes + ) + + @staticmethod + def _resolve_rr_path(context, rr_path_name): + # type: (Context, str) -> str + rr_settings = ( + context.data + ["system_settings"] + ["modules"] + ["royalrender"] + ) + try: + default_servers = rr_settings["rr_paths"] + project_servers = ( + context.data + ["project_settings"] + ["royalrender"] + ["rr_paths"] + ) + rr_servers = { + k: default_servers[k] + for k in project_servers + if k in default_servers + } + + except (AttributeError, KeyError): + # Handle situation were we had only one url for royal render. + return context.data["defaultRRPath"][platform.system().lower()] + + return rr_servers[rr_path_name][platform.system().lower()] + + def expected_files(self, path, start_frame, end_frame): + """Get expected files. + + This function generate expected files from provided + path and start/end frames. + + It was taken from Deadline module, but this should be + probably handled better in collector to support more + flexible scenarios. + + Args: + path (str): Output path. + start_frame (int): Start frame. + end_frame (int): End frame. + + Returns: + list: List of expected files. + + """ + dir_name = os.path.dirname(path) + file = os.path.basename(path) + + expected_files = [] + + if "#" in file: + pparts = file.split("#") + padding = "%0{}d".format(len(pparts) - 1) + file = pparts[0] + padding + pparts[-1] + + if "%" not in file: + expected_files.append(path) + return + + if self._instance.data.get("slate"): + start_frame -= 1 + + expected_files.extend( + os.path.join(dir_name, (file % i)).replace("\\", "/") + for i in range(start_frame, (end_frame + 1)) + ) + return expected_files diff --git a/openpype/modules/royalrender/plugins/publish/create_publish_royalrender_job.py b/openpype/modules/royalrender/plugins/publish/create_publish_royalrender_job.py index a5493dd061..b1c84c87b9 100644 --- a/openpype/modules/royalrender/plugins/publish/create_publish_royalrender_job.py +++ b/openpype/modules/royalrender/plugins/publish/create_publish_royalrender_job.py @@ -2,14 +2,20 @@ """Create publishing job on RoyalRender.""" import os from copy import deepcopy +import json -from pyblish.api import InstancePlugin, IntegratorOrder +from pyblish.api import InstancePlugin, IntegratorOrder, Instance from openpype.pipeline import legacy_io from openpype.modules.royalrender.rr_job import RRJob, RREnvList from openpype.pipeline.publish import KnownPublishError from openpype.lib.openpype_version import ( get_OpenPypeVersion, get_openpype_version) +from openpype.pipeline.farm.pyblish import ( + create_skeleton_instance, + create_instances_for_aov, + attach_instances_to_subset +) class CreatePublishRoyalRenderJob(InstancePlugin): @@ -31,50 +37,84 @@ class CreatePublishRoyalRenderJob(InstancePlugin): self.context = context self.anatomy = instance.context.data["anatomy"] - # asset = data.get("asset") - # subset = data.get("subset") - # source = self._remap_source( - # data.get("source") or context.data["source"]) + if not instance.data.get("farm"): + self.log.info("Skipping local instance.") + return + + instance_skeleton_data = create_skeleton_instance( + instance, + families_transfer=self.families_transfer, + instance_transfer=self.instance_transfer) + + instances = None + if isinstance(instance.data.get("expectedFiles")[0], dict): + instances = create_instances_for_aov( + instance, instance_skeleton_data, self.aov_filter) - def _remap_source(self, source): - success, rootless_path = ( - self.anatomy.find_root_template_from_path(source) - ) - if success: - source = rootless_path else: - # `rootless_path` is not set to `source` if none of roots match - self.log.warning(( - "Could not find root path for remapping \"{}\"." - " This may cause issues." - ).format(source)) - return source + representations = self._get_representations( + instance_skeleton_data, + instance.data.get("expectedFiles") + ) - def get_job(self, instance, job, instances): - """Submit publish job to RoyalRender.""" + if "representations" not in instance_skeleton_data.keys(): + instance_skeleton_data["representations"] = [] + + # add representation + instance_skeleton_data["representations"] += representations + instances = [instance_skeleton_data] + + # attach instances to subset + if instance.data.get("attachTo"): + instances = attach_instances_to_subset( + instance.data.get("attachTo"), instances + ) + + self.log.info("Creating RoyalRender Publish job ...") + + if not instance.data.get("rrJobs"): + self.log.error(("There is no prior RoyalRender " + "job on the instance.")) + raise KnownPublishError( + "Can't create publish job without prior ppducing jobs first") + + publish_job = self.get_job(instance, instances) + + instance.data["rrJobs"] += publish_job + + metadata_path, rootless_metadata_path = self._create_metadata_path( + instance) + + self.log.info("Writing json file: {}".format(metadata_path)) + with open(metadata_path, "w") as f: + json.dump(publish_job, f, indent=4, sort_keys=True) + + def get_job(self, instance, instances): + """Create RR publishing job. + + Based on provided original instance and additional instances, + create publishing job and return it to be submitted to farm. + + Args: + instance (Instance): Original instance. + instances (list of Instance): List of instances to + be published on farm. + + Returns: + RRJob: RoyalRender publish job. + + """ data = instance.data.copy() subset = data["subset"] job_name = "Publish - {subset}".format(subset=subset) - override_version = None instance_version = instance.data.get("version") # take this if exists - if instance_version != 1: - override_version = instance_version - output_dir = self._get_publish_folder( - instance.context.data['anatomy'], - deepcopy(instance.data["anatomyData"]), - instance.data.get("asset"), - instances[0]["subset"], - # TODO: this shouldn't be hardcoded and is in fact settable by - # Settings. - 'render', - override_version - ) + override_version = instance_version if instance_version != 1 else None # Transfer the environment from the original job to this dependent # job, so they use the same environment metadata_path, roothless_metadata_path = \ - self._create_metadata_path(instance) + self._create_metadata_path(instance) environment = RREnvList({ "AVALON_PROJECT": legacy_io.Session["AVALON_PROJECT"], @@ -96,7 +136,7 @@ class CreatePublishRoyalRenderJob(InstancePlugin): # and collect all pre_ids to wait for job_environ = {} jobs_pre_ids = [] - for job in instance["rrJobs"]: # type: RRJob + for job in instance.data["rrJobs"]: # type: RRJob if job.rrEnvList: job_environ.update( dict(RREnvList.parse(job.rrEnvList)) @@ -159,11 +199,4 @@ class CreatePublishRoyalRenderJob(InstancePlugin): else: job.WaitForPreIDs += jobs_pre_ids - self.log.info("Creating RoyalRender Publish job ...") - - if not instance.data.get("rrJobs"): - self.log.error("There is no RoyalRender job on the instance.") - raise KnownPublishError( - "Can't create publish job without producing jobs") - - instance.data["rrJobs"] += job + return job diff --git a/openpype/modules/royalrender/rr_job.py b/openpype/modules/royalrender/rr_job.py index 689a488a5c..5f034e74a1 100644 --- a/openpype/modules/royalrender/rr_job.py +++ b/openpype/modules/royalrender/rr_job.py @@ -9,6 +9,17 @@ from collections import namedtuple, OrderedDict CustomAttribute = namedtuple("CustomAttribute", ["name", "value"]) +def get_rr_platform(): + # type: () -> str + """Returns name of platform used in rr jobs.""" + if sys.platform.lower() in ["win32", "win64"]: + return "windows" + elif sys.platform.lower() == "darwin": + return "mac" + else: + return "linux" + + class RREnvList(dict): def serialize(self): # VariableA=ValueA~~~VariableB=ValueB @@ -163,17 +174,6 @@ class RRJob: # only used in RR 8.3 and newer rrEnvList = attr.ib(default=None) # type: str - @staticmethod - def get_rr_platform(): - # type: () -> str - """Returns name of platform used in rr jobs.""" - if sys.platform.lower() in ["win32", "win64"]: - return "windows" - elif sys.platform.lower() == "darwin": - return "mac" - else: - return "linux" - class SubmitterParameter: """Wrapper for Submitter Parameters.""" diff --git a/openpype/pipeline/farm/pyblish.py b/openpype/pipeline/farm/pyblish.py index 35a944444f..e5ebd3666c 100644 --- a/openpype/pipeline/farm/pyblish.py +++ b/openpype/pipeline/farm/pyblish.py @@ -11,10 +11,12 @@ from openpype.lib import Logger import attr import pyblish.api from openpype.pipeline.publish import KnownPublishError +from openpype.pipeline.farm.patterning import match_aov_pattern import os import clique from copy import deepcopy import re +import warnings @attr.s @@ -263,6 +265,7 @@ def create_skeleton_instance( return instance_skeleton_data + def _solve_families(families): """Solve families. @@ -277,7 +280,9 @@ def _solve_families(families): if "review" not in families: families.append("review") return families -def create_instances_for_aov(instance, skeleton): + + +def create_instances_for_aov(instance, skeleton, aov_filter): """Create instances from AOVs. This will create new pyblish.api.Instances by going over expected @@ -328,11 +333,12 @@ def create_instances_for_aov(instance, skeleton): return _create_instances_for_aov( instance, skeleton, + aov_filter, additional_color_data ) -def _create_instances_for_aov(instance, skeleton, additional_data): +def _create_instances_for_aov(instance, skeleton, aov_filter, additional_data): """Create instance for each AOV found. This will create new instance for every AOV it can detect in expected @@ -491,35 +497,64 @@ def _create_instances_for_aov(instance, skeleton, additional_data): log.debug("instances:{}".format(instances)) return instances -def get_resources(project_name, version, extension=None): - """Get the files from the specific version.""" - # TODO this functions seems to be weird - # - it's looking for representation with one extension or first (any) - # representation from a version? - # - not sure how this should work, maybe it does for specific use cases - # but probably can't be used for all resources from 2D workflows - extensions = None +def get_resources(project_name, version, extension=None): + """Get the files from the specific version. + + This will return all get all files from representation. + + Todo: + This is really weird function, and it's use is + highly controversial. First, it will not probably work + ar all in final release of AYON, second, the logic isn't sound. + It should try to find representation matching the current one - + because it is used to pull out files from previous version to + be included in this one. + + .. deprecated:: 3.15.5 + This won't work in AYON and even the logic must be refactored. + + Args: + project_name (str): Name of the project. + version (dict): Version document. + extension (str): extension used to filter + representations. + + Returns: + list: of files + + """ + warnings.warn(( + "This won't work in AYON and even " + "the logic must be refactored."), DeprecationWarning) + extensions = [] if extension: extensions = [extension] - repre_docs = list(get_representations( - project_name, version_ids=[version["_id"]], extensions=extensions - )) - assert repre_docs, "This is a bug" - representation = repre_docs[0] + # there is a `context_filter` argument that won't probably work in + # final release of AYON. SO we'll rather not use it + repre_docs = list(get_representations( + project_name, version_ids=[version["_id"]])) + + filtered = [] + for doc in repre_docs: + if doc["context"]["ext"] in extensions: + filtered.append(doc) + + representation = filtered[0] directory = get_representation_path(representation) print("Source: ", directory) resources = sorted( [ - os.path.normpath(os.path.join(directory, fname)) - for fname in os.listdir(directory) + os.path.normpath(os.path.join(directory, file_name)) + for file_name in os.listdir(directory) ] ) return resources -def copy_extend_frames(self, instance, representation): + +def copy_extend_frames(instance, representation): """Copy existing frames from latest version. This will copy all existing frames from subset's latest version back @@ -533,11 +568,15 @@ def copy_extend_frames(self, instance, representation): """ import speedcopy + R_FRAME_NUMBER = re.compile( + r".+\.(?P[0-9]+)\..+") + log = Logger.get_logger("farm_publishing") log.info("Preparing to copy ...") start = instance.data.get("frameStart") end = instance.data.get("frameEnd") project_name = instance.context.data["project"] + anatomy = instance.data["anatomy"] # type: Anatomy # get latest version of subset # this will stop if subset wasn't published yet @@ -554,7 +593,7 @@ def copy_extend_frames(self, instance, representation): ) r_col, _ = clique.assemble(subset_resources) - # if override remove all frames we are expecting to be rendered + # if override remove all frames we are expecting to be rendered, # so we'll copy only those missing from current render if instance.data.get("overrideExistingFrame"): for frame in range(start, end + 1): @@ -568,18 +607,18 @@ def copy_extend_frames(self, instance, representation): resource_files = [] r_filename = os.path.basename( representation.get("files")[0]) # first file - op = re.search(self.R_FRAME_NUMBER, r_filename) + op = re.search(R_FRAME_NUMBER, r_filename) pre = r_filename[:op.start("frame")] post = r_filename[op.end("frame"):] assert op is not None, "padding string wasn't found" for frame in list(r_col): - fn = re.search(self.R_FRAME_NUMBER, frame) + fn = re.search(R_FRAME_NUMBER, frame) # silencing linter as we need to compare to True, not to # type assert fn is not None, "padding string wasn't found" # list of tuples (source, destination) staging = representation.get("stagingDir") - staging = self.anatomy.fill_root(staging) + staging = anatomy.fill_root(staging) resource_files.append( (frame, os.path.join( staging, "{}{}{}".format(pre, fn["frame"], post))) @@ -596,3 +635,34 @@ def copy_extend_frames(self, instance, representation): log.info(" > {}".format(source[1])) log.info("Finished copying %i files" % len(resource_files)) + + +def attach_instances_to_subset(attach_to, instances): + """Attach instance to subset. + + If we are attaching to other subsets, create copy of existing + instances, change data to match its subset and replace + existing instances with modified data. + + Args: + attach_to (list): List of instances to attach to. + instances (list): List of instances to attach. + + Returns: + list: List of attached instances. + + """ + # + + new_instances = [] + for attach_instance in attach_to: + for i in instances: + new_inst = copy(i) + new_inst["version"] = attach_instance.get("version") + new_inst["subset"] = attach_instance.get("subset") + new_inst["family"] = attach_instance.get("family") + new_inst["append"] = True + # don't set subsetGroup if we are attaching + new_inst.pop("subsetGroup") + new_instances.append(new_inst) + return new_instances diff --git a/openpype/pipeline/farm/pyblish.pyi b/openpype/pipeline/farm/pyblish.pyi index 3667f2d8a5..76f7c34dcd 100644 --- a/openpype/pipeline/farm/pyblish.pyi +++ b/openpype/pipeline/farm/pyblish.pyi @@ -20,4 +20,5 @@ def extend_frames(asset: str, subset: str, start: int, end: int) -> Tuple[int, i def get_time_data_from_instance_or_context(instance: pyblish.api.Instance) -> TimeData: ... def get_transferable_representations(instance: pyblish.api.Instance) -> list: ... def create_skeleton_instance(instance: pyblish.api.Instance, families_transfer: list = ..., instance_transfer: dict = ...) -> dict: ... -def create_instances_for_aov(instance: pyblish.api.Instance, skeleton: dict) -> List[pyblish.api.Instance]: ... +def create_instances_for_aov(instance: pyblish.api.Instance, skeleton: dict, aov_filter: dict) -> List[pyblish.api.Instance]: ... +def attach_instances_to_subset(attach_to: list, instances: list) -> list: ...