import os
import random
import string

import nuke

import avalon.api

from openpype.api import get_current_project_settings
from openpype.pipeline import LegacyCreator
from .lib import (
    Knobby,
    check_subsetname_exists,
    reset_selection,
    maintained_selection,
    set_avalon_knob_data,
    add_publish_knob
)


class OpenPypeCreator(LegacyCreator):
    """Pype Nuke Creator class wrapper"""
    node_color = "0xdfea5dff"

    def __init__(self, *args, **kwargs):
        super(OpenPypeCreator, self).__init__(*args, **kwargs)
        self.presets = get_current_project_settings()["nuke"]["create"].get(
            self.__class__.__name__, {}
        )
        if check_subsetname_exists(
                nuke.allNodes(),
                self.data["subset"]):
            msg = ("The subset name `{0}` is already used on a node in"
                   " this workfile.".format(self.data["subset"]))
            self.log.error(msg + "\n\nPlease use another subset name!")
            raise NameError("`{0}`: {1}".format(__name__, msg))
        return

    def process(self):
        from nukescripts import autoBackdrop

        instance = None

        if (self.options or {}).get("useSelection"):

            nodes = nuke.selectedNodes()
            if not nodes:
                nuke.message("Please select nodes that you "
                             "wish to add to a container")
                return

            elif len(nodes) == 1:
                # only one node is selected
                instance = nodes[0]

        if not instance:
            # Not using selection or multiple nodes selected
            bckd_node = autoBackdrop()
            bckd_node["tile_color"].setValue(int(self.node_color, 16))
            bckd_node["note_font_size"].setValue(24)
            bckd_node["label"].setValue("[{}]".format(self.name))

            instance = bckd_node

        # add avalon knobs
        set_avalon_knob_data(instance, self.data)
        add_publish_knob(instance)

        return instance


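# Illustrative only: concrete creators in this host subclass OpenPypeCreator
# and mostly just declare their identity; a minimal hypothetical example
# (not an actual plugin of this module):
#
#     class CreateHypotheticalBackdrop(OpenPypeCreator):
#         name = "hypothetical"
#         label = "Create Hypothetical Backdrop"
#         family = "hypothetical"
#
#         def process(self):
#             instance = super(CreateHypotheticalBackdrop, self).process()
#             # per-family node setup would go here
#             return instance

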
def get_review_presets_config():
    settings = get_current_project_settings()
    review_profiles = (
        settings["global"]
        ["publish"]
        ["ExtractReview"]
        ["profiles"]
    )

    outputs = {}
    for profile in review_profiles:
        outputs.update(profile.get("outputs", {}))

    return [str(name) for name, _prop in outputs.items()]


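# Illustrative only: `get_review_presets_config` flattens the output names of
# all ExtractReview profiles. Assuming project settings shaped roughly like
# (the exact profiles depend on the project):
#
#     {"global": {"publish": {"ExtractReview": {"profiles": [
#         {"outputs": {"h264": {...}, "thumbnail": {...}}},
#         {"outputs": {"png": {...}}},
#     ]}}}}
#
# the call would return ["h264", "thumbnail", "png"]; duplicate output names
# from later profiles override earlier ones via `dict.update`.

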
class NukeLoader(avalon.api.Loader):
    container_id_knob = "containerId"
    container_id = None

    def reset_container_id(self):
        self.container_id = "".join(random.choice(
            string.ascii_uppercase + string.digits) for _ in range(10))

    def get_container_id(self, node):
        id_knob = node.knobs().get(self.container_id_knob)
        return id_knob.value() if id_knob else None

    def get_members(self, source):
        """Return nodes that have the same "containerId" as `source`"""
        source_id = self.get_container_id(source)
        return [node for node in nuke.allNodes(recurseGroups=True)
                if self.get_container_id(node) == source_id
                and node is not source] if source_id else []

    def set_as_member(self, node):
        source_id = self.get_container_id(node)

        if source_id:
            node[self.container_id_knob].setValue(source_id)
        else:
            HIDDEN_FLAG = 0x00040000
            _knob = Knobby(
                "String_Knob",
                self.container_id,
                flags=[
                    nuke.READ_ONLY,
                    HIDDEN_FLAG
                ])
            knob = _knob.create(self.container_id_knob)
            node.addKnob(knob)

    def clear_members(self, parent_node):
        members = self.get_members(parent_node)

        dependent_nodes = None
        for node in members:
            _depndc = [n for n in node.dependent() if n not in members]
            if not _depndc:
                continue

            dependent_nodes = _depndc
            break

        for member in members:
            self.log.info("removing node: `{}`".format(member.name()))
            nuke.delete(member)

        return dependent_nodes


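# Illustrative only: a loader subclass would typically tag the nodes it
# creates so they can be located and removed later. A rough sketch of the
# intended flow (node variables are hypothetical):
#
#     loader.reset_container_id()            # new random 10-character id
#     for node in created_nodes:
#         loader.set_as_member(node)         # stamps the hidden "containerId" knob
#     members = loader.get_members(created_nodes[0])     # other tagged nodes
#     leftovers = loader.clear_members(created_nodes[0])
#     # `leftovers` are downstream nodes that depended on the deleted members

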
class ExporterReview(object):
    """
    Base class object for generating review data from Nuke

    Args:
        klass (pyblish.plugin): pyblish plugin parent
        instance (pyblish.instance): instance of pyblish context

    """
    data = None
    publish_on_farm = False

    def __init__(self,
                 klass,
                 instance,
                 multiple_presets=True
                 ):

        self.log = klass.log
        self.instance = instance
        self.multiple_presets = multiple_presets
        self.path_in = self.instance.data.get("path", None)
        self.staging_dir = self.instance.data["stagingDir"]
        self.collection = self.instance.data.get("collection", None)
        self.data = dict({
            "representations": list()
        })

    def get_file_info(self):
        if self.collection:
            self.log.debug("Collection: `{}`".format(self.collection))
            # get path
            self.fname = os.path.basename(self.collection.format(
                "{head}{padding}{tail}"))
            self.fhead = self.collection.format("{head}")

            # get first and last frame
            self.first_frame = min(self.collection.indexes)
            self.last_frame = max(self.collection.indexes)
            if "slate" in self.instance.data["families"]:
                self.first_frame += 1
        else:
            self.fname = os.path.basename(self.path_in)
            self.fhead = os.path.splitext(self.fname)[0] + "."
            self.first_frame = self.instance.data.get("frameStartHandle", None)
            self.last_frame = self.instance.data.get("frameEndHandle", None)

        if "#" in self.fhead:
            self.fhead = self.fhead.replace("#", "")[:-1]

    def get_representation_data(self, tags=None, range=False):
        add_tags = tags or []
        repre = {
            "name": self.name,
            "ext": self.ext,
            "files": self.file,
            "stagingDir": self.staging_dir,
            "tags": [self.name.replace("_", "-")] + add_tags
        }

        if range:
            repre.update({
                "frameStart": self.first_frame,
                "frameEnd": self.last_frame,
            })

        if self.multiple_presets:
            repre["outputName"] = self.name

        if self.publish_on_farm:
            repre["tags"].append("publish_on_farm")

        self.data["representations"].append(repre)

    def get_view_input_process_node(self):
        """
        Will get any active view process.

        Arguments:
            self (class): in object definition

        Returns:
            nuke.Node: copy of the Input Process node
        """
        reset_selection()
        ipn_orig = None
        for v in nuke.allNodes(filter="Viewer"):
            ip = v["input_process"].getValue()
            ipn = v["input_process_node"].getValue()
            if "VIEWER_INPUT" not in ipn and ip:
                ipn_orig = nuke.toNode(ipn)
                ipn_orig.setSelected(True)

        if ipn_orig:
            # copy selected to clipboard
            nuke.nodeCopy("%clipboard%")
            # reset selection
            reset_selection()
            # paste node and selection is on it only
            nuke.nodePaste("%clipboard%")
            # assign to variable
            ipn = nuke.selectedNode()

            return ipn

    def get_imageio_baking_profile(self):
        from . import lib as opnlib
        nuke_imageio = opnlib.get_nuke_imageio_settings()

        # TODO: this is only securing backward compatibility, let's remove
        # this once all projects' anatomy is updated to the newer config
        if "baking" in nuke_imageio.keys():
            return nuke_imageio["baking"]["viewerProcess"]
        else:
            return nuke_imageio["viewer"]["viewerProcess"]


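# Illustrative only: the exporter subclasses below drive this base class
# roughly as follows (names are hypothetical):
#
#     exporter = SomeExporterSubclass(publish_plugin, pyblish_instance)
#     # __init__ calls get_file_info(), resolving fhead / first_frame /
#     # last_frame from the instance's collection or path.
#     exporter.get_representation_data(tags=["review"], range=True)
#     # -> appends a representation dict to exporter.data["representations"]

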
class ExporterReviewLut(ExporterReview):
    """
    Generator object for review lut from Nuke

    Args:
        klass (pyblish.plugin): pyblish plugin parent
        instance (pyblish.instance): instance of pyblish context

    """
    _temp_nodes = []

    def __init__(self,
                 klass,
                 instance,
                 name=None,
                 ext=None,
                 cube_size=None,
                 lut_size=None,
                 lut_style=None,
                 multiple_presets=True):
        # initialize parent class
        super(ExporterReviewLut, self).__init__(
            klass, instance, multiple_presets)

        # deal with lut defined in viewer (raw or not)
        if hasattr(klass, "viewer_lut_raw"):
            self.viewer_lut_raw = klass.viewer_lut_raw
        else:
            self.viewer_lut_raw = False

        self.name = name or "baked_lut"
        self.ext = ext or "cube"
        self.cube_size = cube_size or 32
        self.lut_size = lut_size or 1024
        self.lut_style = lut_style or "linear"

        # set frame start / end and file name to self
        self.get_file_info()

        self.log.info("File info was set...")

        self.file = self.fhead + self.name + ".{}".format(self.ext)
        self.path = os.path.join(
            self.staging_dir, self.file).replace("\\", "/")

    def clean_nodes(self):
        for node in self._temp_nodes:
            nuke.delete(node)
        self._temp_nodes = []
        self.log.info("Deleted nodes...")

    def generate_lut(self, **kwargs):
        bake_viewer_process = kwargs["bake_viewer_process"]
        bake_viewer_input_process_node = kwargs[
            "bake_viewer_input_process"]

        # ---------- start nodes creation

        # CMSTestPattern
        cms_node = nuke.createNode("CMSTestPattern")
        cms_node["cube_size"].setValue(self.cube_size)
        # connect
        self._temp_nodes.append(cms_node)
        self.previous_node = cms_node
        self.log.debug("CMSTestPattern... `{}`".format(self._temp_nodes))

        if bake_viewer_process:
            # Node View Process
            if bake_viewer_input_process_node:
                ipn = self.get_view_input_process_node()
                if ipn is not None:
                    # connect
                    ipn.setInput(0, self.previous_node)
                    self._temp_nodes.append(ipn)
                    self.previous_node = ipn
                    self.log.debug(
                        "ViewProcess... `{}`".format(self._temp_nodes))

            if not self.viewer_lut_raw:
                # OCIODisplay
                dag_node = nuke.createNode("OCIODisplay")
                # connect
                dag_node.setInput(0, self.previous_node)
                self._temp_nodes.append(dag_node)
                self.previous_node = dag_node
                self.log.debug("OCIODisplay... `{}`".format(self._temp_nodes))

        # GenerateLUT
        gen_lut_node = nuke.createNode("GenerateLUT")
        gen_lut_node["file"].setValue(self.path)
        gen_lut_node["file_type"].setValue(".{}".format(self.ext))
        gen_lut_node["lut1d"].setValue(self.lut_size)
        gen_lut_node["style1d"].setValue(self.lut_style)
        # connect
        gen_lut_node.setInput(0, self.previous_node)
        self._temp_nodes.append(gen_lut_node)
        self.log.debug("GenerateLUT... `{}`".format(self._temp_nodes))

        # ---------- end nodes creation

        # Export lut file
        nuke.execute(
            gen_lut_node.name(),
            int(self.first_frame),
            int(self.first_frame))

        self.log.info("Exported...")

        # ---------- generate representation data
        self.get_representation_data()

        self.log.debug("Representation... `{}`".format(self.data))

        # ---------- Clean up
        self.clean_nodes()

        return self.data


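# Illustrative only: `generate_lut` reads its options from keyword arguments;
# a call from an extractor might look like this (the values shown are
# assumptions, not settings defaults):
#
#     exporter = ExporterReviewLut(publish_plugin, pyblish_instance)
#     data = exporter.generate_lut(
#         bake_viewer_process=True,
#         bake_viewer_input_process=True)
#     # data["representations"][-1] -> the exported ".cube" LUT

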
class ExporterReviewMov(ExporterReview):
    """
    Class for generating review mov files

    Args:
        klass (pyblish.plugin): pyblish plugin parent
        instance (pyblish.instance): instance of pyblish context

    """
    _temp_nodes = {}

    def __init__(self,
                 klass,
                 instance,
                 name=None,
                 ext=None,
                 multiple_presets=True
                 ):
        # initialize parent class
        super(ExporterReviewMov, self).__init__(
            klass, instance, multiple_presets)
        # passing presets for nodes to self
        self.nodes = klass.nodes if hasattr(klass, "nodes") else {}

        # deal with lut defined in viewer (raw or not)
        self.viewer_lut_raw = klass.viewer_lut_raw
        self.write_colorspace = instance.data["colorspace"]

        self.name = name or "baked"
        self.ext = ext or "mov"

        # set frame start / end and file name to self
        self.get_file_info()

        self.log.info("File info was set...")

        self.file = self.fhead + self.name + ".{}".format(self.ext)
        self.path = os.path.join(
            self.staging_dir, self.file).replace("\\", "/")

    def clean_nodes(self, node_name):
        for node in self._temp_nodes[node_name]:
            nuke.delete(node)
        self._temp_nodes[node_name] = []
        self.log.info("Deleted nodes...")

    def render(self, render_node_name):
        self.log.info("Rendering... ")
        # Render Write node
        nuke.execute(
            render_node_name,
            int(self.first_frame),
            int(self.last_frame))

        self.log.info("Rendered...")

    def save_file(self):
        import shutil
        with maintained_selection():
            self.log.info("Saving nodes as file... ")
            # create nk path
            path = os.path.splitext(self.path)[0] + ".nk"
            # save file to the path
            shutil.copyfile(self.instance.context.data["currentFile"], path)

        self.log.info("Nodes exported...")
        return path

    def generate_mov(self, farm=False, **kwargs):
        self.publish_on_farm = farm
        reformat_node_add = kwargs["reformat_node_add"]
        reformat_node_config = kwargs["reformat_node_config"]
        bake_viewer_process = kwargs["bake_viewer_process"]
        bake_viewer_input_process_node = kwargs[
            "bake_viewer_input_process"]
        viewer_process_override = kwargs[
            "viewer_process_override"]

        baking_view_profile = (
            viewer_process_override or self.get_imageio_baking_profile())

        fps = self.instance.context.data["fps"]

        self.log.debug(">> baking_view_profile `{}`".format(
            baking_view_profile))

        add_tags = kwargs.get("add_tags", [])

        self.log.info(
            "__ add_tags: `{0}`".format(add_tags))

        subset = self.instance.data["subset"]
        self._temp_nodes[subset] = []
        # ---------- start nodes creation

        # Read node
        r_node = nuke.createNode("Read")
        r_node["file"].setValue(self.path_in)
        r_node["first"].setValue(self.first_frame)
        r_node["origfirst"].setValue(self.first_frame)
        r_node["last"].setValue(self.last_frame)
        r_node["origlast"].setValue(self.last_frame)
        r_node["colorspace"].setValue(self.write_colorspace)

        # connect
        self._temp_nodes[subset].append(r_node)
        self.previous_node = r_node
        self.log.debug("Read... `{}`".format(self._temp_nodes[subset]))

        # add reformat node
        if reformat_node_add:
            # append "reformated" tag
            add_tags.append("reformated")

            rf_node = nuke.createNode("Reformat")
            for kn_conf in reformat_node_config:
                _type = kn_conf["type"]
                k_name = str(kn_conf["name"])
                k_value = kn_conf["value"]

                # to remove unicode as nuke doesn't like it
                if _type == "string":
                    k_value = str(kn_conf["value"])

                rf_node[k_name].setValue(k_value)

            # connect
            rf_node.setInput(0, self.previous_node)
            self._temp_nodes[subset].append(rf_node)
            self.previous_node = rf_node
            self.log.debug(
                "Reformat... `{}`".format(self._temp_nodes[subset]))

        # only create colorspace baking if toggled on
        if bake_viewer_process:
            if bake_viewer_input_process_node:
                # View Process node
                ipn = self.get_view_input_process_node()
                if ipn is not None:
                    # connect
                    ipn.setInput(0, self.previous_node)
                    self._temp_nodes[subset].append(ipn)
                    self.previous_node = ipn
                    self.log.debug(
                        "ViewProcess... `{}`".format(
                            self._temp_nodes[subset]))

            if not self.viewer_lut_raw:
                # OCIODisplay
                dag_node = nuke.createNode("OCIODisplay")
                dag_node["view"].setValue(str(baking_view_profile))

                # connect
                dag_node.setInput(0, self.previous_node)
                self._temp_nodes[subset].append(dag_node)
                self.previous_node = dag_node
                self.log.debug("OCIODisplay... `{}`".format(
                    self._temp_nodes[subset]))

        # Write node
        write_node = nuke.createNode("Write")
        self.log.debug("Path: {}".format(self.path))
        write_node["file"].setValue(str(self.path))
        write_node["file_type"].setValue(str(self.ext))

        # Knobs `meta_codec` and `mov64_codec` are not available on CentOS.
        # TODO shouldn't this come from settings on outputs?
        try:
            write_node["meta_codec"].setValue("ap4h")
        except Exception:
            self.log.info("`meta_codec` knob was not found")

        try:
            write_node["mov64_codec"].setValue("ap4h")
            write_node["mov64_fps"].setValue(float(fps))
        except Exception:
            self.log.info("`mov64_codec` knob was not found")

        write_node["mov64_write_timecode"].setValue(1)
        write_node["raw"].setValue(1)
        # connect
        write_node.setInput(0, self.previous_node)
        self._temp_nodes[subset].append(write_node)
        self.log.debug("Write... `{}`".format(self._temp_nodes[subset]))
        # ---------- end nodes creation

        # ---------- render or save to nk
        if self.publish_on_farm:
            nuke.scriptSave()
            path_nk = self.save_file()
            self.data.update({
                "bakeScriptPath": path_nk,
                "bakeWriteNodeName": write_node.name(),
                "bakeRenderPath": self.path
            })
        else:
            self.render(write_node.name())

        # ---------- generate representation data
        self.get_representation_data(
            tags=["review", "delete"] + add_tags,
            range=True
        )

        self.log.debug("Representation... `{}`".format(self.data))

        self.clean_nodes(subset)
        nuke.scriptSave()

        return self.data


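# Illustrative only: `generate_mov` expects its options as keyword arguments;
# the values below are assumptions standing in for what the publish plugin
# would pass from settings:
#
#     exporter = ExporterReviewMov(publish_plugin, pyblish_instance)
#     data = exporter.generate_mov(
#         farm=False,
#         add_tags=[],
#         reformat_node_add=False,
#         reformat_node_config=[],
#         bake_viewer_process=True,
#         bake_viewer_input_process=True,
#         viewer_process_override=None)
#     # data["representations"][-1] -> the baked ".mov" review; with
#     # farm=True the script is saved instead and "bakeScriptPath",
#     # "bakeWriteNodeName" and "bakeRenderPath" are added to `data`.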