import os from pprint import pformat import re import json import six import functools import warnings import platform import tempfile import contextlib from collections import OrderedDict import nuke from qtpy import QtCore, QtWidgets from openpype.client import ( get_project, get_asset_by_name, get_versions, get_last_versions, get_representations, ) from openpype.host import HostDirmap from openpype.tools.utils import host_tools from openpype.lib import ( env_value_to_bool, Logger, get_version_from_path, ) from openpype.settings import ( get_project_settings, get_current_project_settings, ) from openpype.modules import ModulesManager from openpype.pipeline.template_data import get_template_data_with_names from openpype.pipeline import ( discover_legacy_creator_plugins, legacy_io, Anatomy, ) from openpype.pipeline.context_tools import ( get_current_project_asset, get_custom_workfile_template_from_session ) from openpype.pipeline.colorspace import ( get_imageio_config ) from openpype.pipeline.workfile import BuildWorkfile from . import gizmo_menu from .constants import ASSIST from .workio import ( save_file, open_file ) log = Logger.get_logger(__name__) _NODE_TAB_NAME = "{}".format(os.getenv("AVALON_LABEL") or "Avalon") AVALON_LABEL = os.getenv("AVALON_LABEL") or "Avalon" AVALON_TAB = "{}".format(AVALON_LABEL) AVALON_DATA_GROUP = "{}DataGroup".format(AVALON_LABEL.capitalize()) EXCLUDED_KNOB_TYPE_ON_READ = ( 20, # Tab Knob 26, # Text Knob (But for backward compatibility, still be read # if value is not an empty string.) ) JSON_PREFIX = "JSON:::" ROOT_DATA_KNOB = "publish_context" INSTANCE_DATA_KNOB = "publish_instance" class DeprecatedWarning(DeprecationWarning): pass def deprecated(new_destination): """Mark functions as deprecated. It will result in a warning being emitted when the function is used. """ func = None if callable(new_destination): func = new_destination new_destination = None def _decorator(decorated_func): if new_destination is None: warning_message = ( " Please check content of deprecated function to figure out" " possible replacement." ) else: warning_message = " Please replace your usage with '{}'.".format( new_destination ) @functools.wraps(decorated_func) def wrapper(*args, **kwargs): warnings.simplefilter("always", DeprecatedWarning) warnings.warn( ( "Call to deprecated function '{}'" "\nFunction was moved or removed.{}" ).format(decorated_func.__name__, warning_message), category=DeprecatedWarning, stacklevel=4 ) return decorated_func(*args, **kwargs) return wrapper if func is None: return _decorator return _decorator(func) class Context: main_window = None context_label = None project_name = os.getenv("AVALON_PROJECT") # Workfile related code workfiles_launched = False workfiles_tool_timer = None # Seems unused _project_doc = None def get_main_window(): """Acquire Nuke's main window""" if Context.main_window is None: top_widgets = QtWidgets.QApplication.topLevelWidgets() name = "Foundry::UI::DockMainWindow" for widget in top_widgets: if ( widget.inherits("QMainWindow") and widget.metaObject().className() == name ): Context.main_window = widget break return Context.main_window def set_node_data(node, knobname, data): """Write data to node invisible knob Will create new in case it doesnt exists or update the one already created. Args: node (nuke.Node): node object knobname (str): knob name data (dict): data to be stored in knob """ # if exists then update data if knobname in node.knobs(): log.debug("Updating knobname `{}` on node `{}`".format( knobname, node.name() )) update_node_data(node, knobname, data) return log.debug("Creating knobname `{}` on node `{}`".format( knobname, node.name() )) # else create new knob_value = JSON_PREFIX + json.dumps(data) knob = nuke.String_Knob(knobname) knob.setValue(knob_value) knob.setFlag(nuke.INVISIBLE) node.addKnob(knob) def get_node_data(node, knobname): """Read data from node. Args: node (nuke.Node): node object knobname (str): knob name Returns: dict: data stored in knob """ if knobname not in node.knobs(): return rawdata = node[knobname].getValue() if ( isinstance(rawdata, six.string_types) and rawdata.startswith(JSON_PREFIX) ): try: return json.loads(rawdata[len(JSON_PREFIX):]) except json.JSONDecodeError: return def update_node_data(node, knobname, data): """Update already present data. Args: node (nuke.Node): node object knobname (str): knob name data (dict): data to update knob value """ knob = node[knobname] node_data = get_node_data(node, knobname) or {} node_data.update(data) knob_value = JSON_PREFIX + json.dumps(node_data) knob.setValue(knob_value) class Knobby(object): """[DEPRECATED] For creating knob which it's type isn't mapped in `create_knobs` Args: type (string): Nuke knob type name value: Value to be set with `Knob.setValue`, put `None` if not required flags (list, optional): Knob flags to be set with `Knob.setFlag` *args: Args other than knob name for initializing knob class """ def __init__(self, type, value, flags=None, *args): self.type = type self.value = value self.flags = flags or [] self.args = args def create(self, name, nice=None): knob_cls = getattr(nuke, self.type) knob = knob_cls(name, nice, *self.args) if self.value is not None: knob.setValue(self.value) for flag in self.flags: knob.setFlag(flag) return knob @staticmethod def nice_naming(key): """Convert camelCase name into UI Display Name""" words = re.findall('[A-Z][^A-Z]*', key[0].upper() + key[1:]) return " ".join(words) def create_knobs(data, tab=None): """Create knobs by data Depending on the type of each dict value and creates the correct Knob. Mapped types: bool: nuke.Boolean_Knob int: nuke.Int_Knob float: nuke.Double_Knob list: nuke.Enumeration_Knob six.string_types: nuke.String_Knob dict: If it's a nested dict (all values are dict), will turn into A tabs group. Or just a knobs group. Args: data (dict): collection of attributes and their value tab (string, optional): Knobs' tab name Returns: list: A list of `nuke.Knob` objects """ def nice_naming(key): """Convert camelCase name into UI Display Name""" words = re.findall('[A-Z][^A-Z]*', key[0].upper() + key[1:]) return " ".join(words) # Turn key-value pairs into knobs knobs = list() if tab: knobs.append(nuke.Tab_Knob(tab)) for key, value in data.items(): # Knob name if isinstance(key, tuple): name, nice = key else: name, nice = key, nice_naming(key) # Create knob by value type if isinstance(value, Knobby): knobby = value knob = knobby.create(name, nice) elif isinstance(value, float): knob = nuke.Double_Knob(name, nice) knob.setValue(value) elif isinstance(value, bool): knob = nuke.Boolean_Knob(name, nice) knob.setValue(value) knob.setFlag(nuke.STARTLINE) elif isinstance(value, int): knob = nuke.Int_Knob(name, nice) knob.setValue(value) elif isinstance(value, six.string_types): knob = nuke.String_Knob(name, nice) knob.setValue(value) elif isinstance(value, list): knob = nuke.Enumeration_Knob(name, nice, value) elif isinstance(value, dict): if all(isinstance(v, dict) for v in value.values()): # Create a group of tabs begain = nuke.BeginTabGroup_Knob() end = nuke.EndTabGroup_Knob() begain.setName(name) end.setName(name + "_End") knobs.append(begain) for k, v in value.items(): knobs += create_knobs(v, tab=k) knobs.append(end) else: # Create a group of knobs knobs.append(nuke.Tab_Knob( name, nice, nuke.TABBEGINCLOSEDGROUP)) knobs += create_knobs(value) knobs.append( nuke.Tab_Knob(name + "_End", nice, nuke.TABENDGROUP)) continue else: raise TypeError("Unsupported type: %r" % type(value)) knobs.append(knob) return knobs def imprint(node, data, tab=None): """Store attributes with value on node Parse user data into Node knobs. Use `collections.OrderedDict` to ensure knob order. Args: node(nuke.Node): node object from Nuke data(dict): collection of attributes and their value Returns: None Examples: ``` import nuke from openpype.hosts.nuke.api import lib node = nuke.createNode("NoOp") data = { # Regular type of attributes "myList": ["x", "y", "z"], "myBool": True, "myFloat": 0.1, "myInt": 5, # Creating non-default imprint type of knob "MyFilePath": lib.Knobby("File_Knob", "/file/path"), "divider": lib.Knobby("Text_Knob", ""), # Manual nice knob naming ("my_knob", "Nice Knob Name"): "some text", # dict type will be created as knob group "KnobGroup": { "knob1": 5, "knob2": "hello", "knob3": ["a", "b"], }, # Nested dict will be created as tab group "TabGroup": { "tab1": {"count": 5}, "tab2": {"isGood": True}, "tab3": {"direction": ["Left", "Right"]}, }, } lib.imprint(node, data, tab="Demo") ``` """ for knob in create_knobs(data, tab): node.addKnob(knob) @deprecated def add_publish_knob(node): """[DEPRECATED] Add Publish knob to node Arguments: node (nuke.Node): nuke node to be processed Returns: node (nuke.Node): processed nuke node """ if "publish" not in node.knobs(): body = OrderedDict() body[("divd", "Publishing")] = Knobby("Text_Knob", '') body["publish"] = True imprint(node, body) return node @deprecated def set_avalon_knob_data(node, data=None, prefix="avalon:"): """[DEPRECATED] Sets data into nodes's avalon knob Arguments: node (nuke.Node): Nuke node to imprint with data, data (dict, optional): Data to be imprinted into AvalonTab prefix (str, optional): filtering prefix Returns: node (nuke.Node) Examples: data = { 'asset': 'sq020sh0280', 'family': 'render', 'subset': 'subsetMain' } """ data = data or dict() create = OrderedDict() tab_name = AVALON_TAB editable = ["asset", "subset", "name", "namespace"] existed_knobs = node.knobs() for key, value in data.items(): knob_name = prefix + key gui_name = key if knob_name in existed_knobs: # Set value try: node[knob_name].setValue(value) except TypeError: node[knob_name].setValue(str(value)) else: # New knob name = (knob_name, gui_name) # Hide prefix on GUI if key in editable: create[name] = value else: create[name] = Knobby("String_Knob", str(value), flags=[nuke.READ_ONLY]) if tab_name in existed_knobs: tab_name = None else: tab = OrderedDict() warn = Knobby("Text_Knob", "Warning! Do not change following data!") divd = Knobby("Text_Knob", "") head = [ (("warn", ""), warn), (("divd", ""), divd), ] tab[AVALON_DATA_GROUP] = OrderedDict(head + list(create.items())) create = tab imprint(node, create, tab=tab_name) return node @deprecated def get_avalon_knob_data(node, prefix="avalon:", create=True): """[DEPRECATED] Gets a data from nodes's avalon knob Arguments: node (obj): Nuke node to search for data, prefix (str, optional): filtering prefix Returns: data (dict) """ # check if lists if not isinstance(prefix, list): prefix = list([prefix]) data = dict() # loop prefix for p in prefix: # check if the node is avalon tracked if AVALON_TAB not in node.knobs(): continue try: # check if data available on the node test = node[AVALON_DATA_GROUP].value() log.debug("Only testing if data avalable: `{}`".format(test)) except NameError as e: # if it doesn't then create it log.debug("Creating avalon knob: `{}`".format(e)) if create: node = set_avalon_knob_data(node) return get_avalon_knob_data(node) else: return {} # get data from filtered knobs data.update({k.replace(p, ''): node[k].value() for k in node.knobs().keys() if p in k}) return data @deprecated def fix_data_for_node_create(data): """[DEPRECATED] Fixing data to be used for nuke knobs """ for k, v in data.items(): if isinstance(v, six.text_type): data[k] = str(v) if str(v).startswith("0x"): data[k] = int(v, 16) return data @deprecated def add_write_node_legacy(name, **kwarg): """[DEPRECATED] Adding nuke write node Arguments: name (str): nuke node name kwarg (attrs): data for nuke knobs Returns: node (obj): nuke write node """ use_range_limit = kwarg.get("use_range_limit", None) w = nuke.createNode( "Write", "name {}".format(name)) w["file"].setValue(kwarg["file"]) for k, v in kwarg.items(): if "frame_range" in k: continue log.info([k, v]) try: w[k].setValue(v) except KeyError as e: log.debug(e) continue if use_range_limit: w["use_limit"].setValue(True) w["first"].setValue(kwarg["frame_range"][0]) w["last"].setValue(kwarg["frame_range"][1]) return w def add_write_node(name, file_path, knobs, **kwarg): """Adding nuke write node Arguments: name (str): nuke node name kwarg (attrs): data for nuke knobs Returns: node (obj): nuke write node """ use_range_limit = kwarg.get("use_range_limit", None) w = nuke.createNode( "Write", "name {}".format(name)) w["file"].setValue(file_path) # finally add knob overrides set_node_knobs_from_settings(w, knobs, **kwarg) if use_range_limit: w["use_limit"].setValue(True) w["first"].setValue(kwarg["frame_range"][0]) w["last"].setValue(kwarg["frame_range"][1]) return w def read_avalon_data(node): """Return user-defined knobs from given `node` Args: node (nuke.Node): Nuke node object Returns: list: A list of nuke.Knob object """ def compat_prefixed(knob_name): if knob_name.startswith("avalon:"): return knob_name[len("avalon:"):] elif knob_name.startswith("ak:"): return knob_name[len("ak:"):] data = dict() pattern = ("(?<=addUserKnob {)" "([0-9]*) (\\S*)" # Matching knob type and knob name "(?=[ |}])") tcl_script = node.writeKnobs(nuke.WRITE_USER_KNOB_DEFS) result = re.search(pattern, tcl_script) if result: first_user_knob = result.group(2) # Collect user knobs from the end of the knob list for knob in reversed(node.allKnobs()): knob_name = knob.name() if not knob_name: # Ignore unnamed knob continue knob_type = nuke.knob(knob.fullyQualifiedName(), type=True) value = knob.value() if ( knob_type not in EXCLUDED_KNOB_TYPE_ON_READ or # For compating read-only string data that imprinted # by `nuke.Text_Knob`. (knob_type == 26 and value) ): key = compat_prefixed(knob_name) if key is not None: data[key] = value if knob_name == first_user_knob: break return data def get_node_path(path, padding=4): """Get filename for the Nuke write with padded number as '#' Arguments: path (str): The path to render to. Returns: tuple: head, padding, tail (extension) Examples: >>> get_frame_path("test.exr") ('test', 4, '.exr') >>> get_frame_path("filename.#####.tif") ('filename.', 5, '.tif') >>> get_frame_path("foobar##.tif") ('foobar', 2, '.tif') >>> get_frame_path("foobar_%08d.tif") ('foobar_', 8, '.tif') """ filename, ext = os.path.splitext(path) # Find a final number group if '%' in filename: match = re.match('.*?(%[0-9]+d)$', filename) if match: padding = int(match.group(1).replace('%', '').replace('d', '')) # remove number from end since fusion # will swap it with the frame number filename = filename.replace(match.group(1), '') elif '#' in filename: match = re.match('.*?(#+)$', filename) if match: padding = len(match.group(1)) # remove number from end since fusion # will swap it with the frame number filename = filename.replace(match.group(1), '') return filename, padding, ext def get_nuke_imageio_settings(): return get_project_settings(Context.project_name)["nuke"]["imageio"] @deprecated("openpype.hosts.nuke.api.lib.get_nuke_imageio_settings") def get_created_node_imageio_setting_legacy(nodeclass, creator, subset): '''[DEPRECATED] Get preset data for dataflow (fileType, compression, bitDepth) ''' assert any([creator, nodeclass]), nuke.message( "`{}`: Missing mandatory kwargs `host`, `cls`".format(__file__)) imageio_nodes = get_nuke_imageio_settings()["nodes"] required_nodes = imageio_nodes["requiredNodes"] # HACK: for backward compatibility this needs to be optional override_nodes = imageio_nodes.get("overrideNodes", []) imageio_node = None for node in required_nodes: log.info(node) if ( nodeclass in node["nukeNodeClass"] and creator in node["plugins"] ): imageio_node = node break log.debug("__ imageio_node: {}".format(imageio_node)) # find matching override node override_imageio_node = None for onode in override_nodes: log.info(onode) if nodeclass not in node["nukeNodeClass"]: continue if creator not in node["plugins"]: continue if ( onode["subsets"] and not any( re.search(s.lower(), subset.lower()) for s in onode["subsets"] ) ): continue override_imageio_node = onode break log.debug("__ override_imageio_node: {}".format(override_imageio_node)) # add overrides to imageio_node if override_imageio_node: # get all knob names in imageio_node knob_names = [k["name"] for k in imageio_node["knobs"]] for oknob in override_imageio_node["knobs"]: for knob in imageio_node["knobs"]: # override matching knob name if oknob["name"] == knob["name"]: log.debug( "_ overriding knob: `{}` > `{}`".format( knob, oknob )) if not oknob["value"]: # remove original knob if no value found in oknob imageio_node["knobs"].remove(knob) else: # override knob value with oknob's knob["value"] = oknob["value"] # add missing knobs into imageio_node if oknob["name"] not in knob_names: log.debug( "_ adding knob: `{}`".format(oknob)) imageio_node["knobs"].append(oknob) knob_names.append(oknob["name"]) log.info("ImageIO node: {}".format(imageio_node)) return imageio_node def get_imageio_node_setting(node_class, plugin_name, subset): ''' Get preset data for dataflow (fileType, compression, bitDepth) ''' imageio_nodes = get_nuke_imageio_settings()["nodes"] required_nodes = imageio_nodes["requiredNodes"] imageio_node = None for node in required_nodes: log.info(node) if ( node_class in node["nukeNodeClass"] and plugin_name in node["plugins"] ): imageio_node = node break log.debug("__ imageio_node: {}".format(imageio_node)) if not imageio_node: return # find overrides and update knobs with them get_imageio_node_override_setting( node_class, plugin_name, subset, imageio_node["knobs"] ) log.info("ImageIO node: {}".format(imageio_node)) return imageio_node def get_imageio_node_override_setting( node_class, plugin_name, subset, knobs_settings ): ''' Get imageio node overrides from settings ''' imageio_nodes = get_nuke_imageio_settings()["nodes"] override_nodes = imageio_nodes["overrideNodes"] # find matching override node override_imageio_node = None for onode in override_nodes: log.debug("__ onode: {}".format(onode)) log.debug("__ subset: {}".format(subset)) if node_class not in onode["nukeNodeClass"]: continue if plugin_name not in onode["plugins"]: continue if ( onode["subsets"] and not any( re.search(s.lower(), subset.lower()) for s in onode["subsets"] ) ): continue override_imageio_node = onode break log.debug("__ override_imageio_node: {}".format(override_imageio_node)) # add overrides to imageio_node if override_imageio_node: # get all knob names in imageio_node knob_names = [k["name"] for k in knobs_settings] for oknob in override_imageio_node["knobs"]: for knob in knobs_settings: # override matching knob name if oknob["name"] == knob["name"]: log.debug( "_ overriding knob: `{}` > `{}`".format( knob, oknob )) if not oknob["value"]: # remove original knob if no value found in oknob knobs_settings.remove(knob) else: # override knob value with oknob's knob["value"] = oknob["value"] # add missing knobs into imageio_node if oknob["name"] not in knob_names: log.debug( "_ adding knob: `{}`".format(oknob)) knobs_settings.append(oknob) knob_names.append(oknob["name"]) return knobs_settings def get_imageio_input_colorspace(filename): ''' Get input file colorspace based on regex in settings. ''' imageio_regex_inputs = ( get_nuke_imageio_settings()["regexInputs"]["inputs"]) preset_clrsp = None for regexInput in imageio_regex_inputs: if bool(re.search(regexInput["regex"], filename)): preset_clrsp = str(regexInput["colorspace"]) return preset_clrsp def get_view_process_node(): reset_selection() ipn_node = None for v_ in nuke.allNodes(filter="Viewer"): ipn = v_['input_process_node'].getValue() ipn_node = nuke.toNode(ipn) # skip if no input node is set if not ipn: continue if ipn == "VIEWER_INPUT" and not ipn_node: # since it is set by default we can ignore it # nobody usually use this but use it if # it exists in nodes continue if not ipn_node: # in case a Viewer node is transfered from # different workfile with old values raise NameError(( "Input process node name '{}' set in " "Viewer '{}' is does't exists in nodes" ).format(ipn, v_.name())) ipn_node.setSelected(True) if ipn_node: return duplicate_node(ipn_node) def on_script_load(): ''' Callback for ffmpeg support ''' if nuke.env["LINUX"]: nuke.tcl('load ffmpegReader') nuke.tcl('load ffmpegWriter') else: nuke.tcl('load movReader') nuke.tcl('load movWriter') def check_inventory_versions(): """ Actual version idetifier of Loaded containers Any time this function is run it will check all nodes and filter only Loader nodes for its version. It will get all versions from database and check if the node is having actual version. If not then it will color it to red. """ from .pipeline import parse_container # get all Loader nodes by avalon attribute metadata node_with_repre_id = [] repre_ids = set() # Find all containers and collect it's node and representation ids for node in nuke.allNodes(): container = parse_container(node) if container: node = nuke.toNode(container["objectName"]) avalon_knob_data = read_avalon_data(node) repre_id = avalon_knob_data["representation"] repre_ids.add(repre_id) node_with_repre_id.append((node, repre_id)) # Skip if nothing was found if not repre_ids: return project_name = legacy_io.active_project() # Find representations based on found containers repre_docs = get_representations( project_name, representation_ids=repre_ids, fields=["_id", "parent"] ) # Store representations by id and collect version ids repre_docs_by_id = {} version_ids = set() for repre_doc in repre_docs: # Use stringed representation id to match value in containers repre_id = str(repre_doc["_id"]) repre_docs_by_id[repre_id] = repre_doc version_ids.add(repre_doc["parent"]) version_docs = get_versions( project_name, version_ids, fields=["_id", "name", "parent"] ) # Store versions by id and collect subset ids version_docs_by_id = {} subset_ids = set() for version_doc in version_docs: version_docs_by_id[version_doc["_id"]] = version_doc subset_ids.add(version_doc["parent"]) # Query last versions based on subset ids last_versions_by_subset_id = get_last_versions( project_name, subset_ids=subset_ids, fields=["_id", "parent"] ) # Loop through collected container nodes and their representation ids for item in node_with_repre_id: # Some python versions of nuke can't unfold tuple in for loop node, repre_id = item repre_doc = repre_docs_by_id.get(repre_id) # Failsafe for not finding the representation. if not repre_doc: log.warning(( "Could not find the representation on node \"{}\"" ).format(node.name())) continue version_id = repre_doc["parent"] version_doc = version_docs_by_id.get(version_id) if not version_doc: log.warning(( "Could not find the version on node \"{}\"" ).format(node.name())) continue # Get last version based on subset id subset_id = version_doc["parent"] last_version = last_versions_by_subset_id[subset_id] # Check if last version is same as current version if last_version["_id"] == version_doc["_id"]: color_value = "0x4ecd25ff" else: color_value = "0xd84f20ff" node["tile_color"].setValue(int(color_value, 16)) def writes_version_sync(): ''' Callback synchronizing version of publishable write nodes ''' try: rootVersion = get_version_from_path(nuke.root().name()) padding = len(rootVersion) new_version = "v" + str("{" + ":0>{}".format(padding) + "}").format( int(rootVersion) ) log.debug("new_version: {}".format(new_version)) except Exception: return for each in nuke.allNodes(filter="Write"): # check if the node is avalon tracked if _NODE_TAB_NAME not in each.knobs(): continue avalon_knob_data = read_avalon_data(each) try: if avalon_knob_data["families"] not in ["render"]: log.debug(avalon_knob_data["families"]) continue node_file = each["file"].value() node_version = "v" + get_version_from_path(node_file) log.debug("node_version: {}".format(node_version)) node_new_file = node_file.replace(node_version, new_version) each["file"].setValue(node_new_file) if not os.path.isdir(os.path.dirname(node_new_file)): log.warning("Path does not exist! I am creating it.") os.makedirs(os.path.dirname(node_new_file)) except Exception as e: log.warning( "Write node: `{}` has no version in path: {}".format( each.name(), e)) def version_up_script(): ''' Raising working script's version ''' import nukescripts nukescripts.script_and_write_nodes_version_up() def check_subsetname_exists(nodes, subset_name): """ Checking if node is not already created to secure there is no duplicity Arguments: nodes (list): list of nuke.Node objects subset_name (str): name we try to find Returns: bool: True of False """ return next((True for n in nodes if subset_name in read_avalon_data(n).get("subset", "")), False) def get_render_path(node): ''' Generate Render path from presets regarding avalon knob data ''' avalon_knob_data = read_avalon_data(node) nuke_imageio_writes = get_imageio_node_setting( node_class=avalon_knob_data["families"], plugin_name=avalon_knob_data["creator"], subset=avalon_knob_data["subset"] ) data = { "avalon": avalon_knob_data, "nuke_imageio_writes": nuke_imageio_writes } anatomy_filled = format_anatomy(data) return anatomy_filled["render"]["path"].replace("\\", "/") def format_anatomy(data): ''' Helping function for formatting of anatomy paths Arguments: data (dict): dictionary with attributes used for formatting Return: path (str) ''' anatomy = Anatomy() log.debug("__ anatomy.templates: {}".format(anatomy.templates)) padding = int( anatomy.templates["render"].get( "frame_padding" ) ) version = data.get("version", None) if not version: file = script_name() data["version"] = get_version_from_path(file) project_name = anatomy.project_name asset_name = data["asset"] task_name = data["task"] host_name = os.environ["AVALON_APP"] context_data = get_template_data_with_names( project_name, asset_name, task_name, host_name ) data.update(context_data) data.update({ "subset": data["subset"], "family": data["family"], "frame": "#" * padding, }) return anatomy.format(data) def script_name(): ''' Returns nuke script path ''' return nuke.root().knob("name").value() def add_button_write_to_read(node): name = "createReadNode" label = "Read From Rendered" value = "import write_to_read;\ write_to_read.write_to_read(nuke.thisNode(), allow_relative=False)" knob = nuke.PyScript_Knob(name, label, value) knob.clearFlag(nuke.STARTLINE) node.addKnob(knob) def add_button_clear_rendered(node, path): name = "clearRendered" label = "Clear Rendered" value = "import clear_rendered;\ clear_rendered.clear_rendered(\"{}\")".format(path) knob = nuke.PyScript_Knob(name, label, value) node.addKnob(knob) def create_prenodes( prev_node, nodes_setting, plugin_name=None, subset=None, **kwargs ): last_node = None for_dependency = {} for name, node in nodes_setting.items(): # get attributes nodeclass = node["nodeclass"] knobs = node["knobs"] # create node now_node = nuke.createNode( nodeclass, "name {}".format(name)) now_node.hideControlPanel() # add for dependency linking for_dependency[name] = { "node": now_node, "dependent": node["dependent"] } if all([plugin_name, subset]): # find imageio overrides get_imageio_node_override_setting( now_node.Class(), plugin_name, subset, knobs ) # add data to knob set_node_knobs_from_settings(now_node, knobs, **kwargs) # switch actual node to previous last_node = now_node for _node_name, node_prop in for_dependency.items(): if not node_prop["dependent"]: node_prop["node"].setInput( 0, prev_node) elif node_prop["dependent"] in for_dependency: _prev_node = for_dependency[node_prop["dependent"]]["node"] node_prop["node"].setInput( 0, _prev_node) else: log.warning("Dependency has wrong name of node: {}".format( node_prop )) return last_node def create_write_node( name, data, input=None, prenodes=None, linked_knobs=None, **kwargs ): ''' Creating write node which is group node Arguments: name (str): name of node data (dict): creator write instance data input (node)[optional]: selected node to connect to prenodes (dict)[optional]: nodes to be created before write with dependency review (bool)[optional]: adding review knob farm (bool)[optional]: rendering workflow target kwargs (dict)[optional]: additional key arguments for formatting Example: prenodes = { "nodeName": { "nodeclass": "Reformat", "dependent": [ following_node_01, ... ], "knobs": [ { "type": "text", "name": "knobname", "value": "knob value" }, ... ] }, ... } Return: node (obj): group node with avalon data as Knobs ''' prenodes = prenodes or {} # filtering variables plugin_name = data["creator"] subset = data["subset"] # get knob settings for write node imageio_writes = get_imageio_node_setting( node_class="Write", plugin_name=plugin_name, subset=subset ) for knob in imageio_writes["knobs"]: if knob["name"] == "file_type": ext = knob["value"] data.update({ "imageio_writes": imageio_writes, "ext": ext }) anatomy_filled = format_anatomy(data) # build file path to workfiles fdir = str(anatomy_filled["work"]["folder"]).replace("\\", "/") fpath = data["fpath_template"].format( work=fdir, version=data["version"], subset=data["subset"], frame=data["frame"], ext=ext ) # create directory if not os.path.isdir(os.path.dirname(fpath)): log.warning("Path does not exist! I am creating it.") os.makedirs(os.path.dirname(fpath)) GN = nuke.createNode("Group", "name {}".format(name)) prev_node = None with GN: if input: input_name = str(input.name()).replace(" ", "") # if connected input node was defined prev_node = nuke.createNode( "Input", "name {}".format(input_name)) else: # generic input node connected to nothing prev_node = nuke.createNode( "Input", "name {}".format("rgba")) prev_node.hideControlPanel() # creating pre-write nodes `prenodes` last_prenode = create_prenodes( prev_node, prenodes, plugin_name, subset, **kwargs ) if last_prenode: prev_node = last_prenode # creating write node write_node = now_node = add_write_node( "inside_{}".format(name), fpath, imageio_writes["knobs"], **data ) write_node.hideControlPanel() # connect to previous node now_node.setInput(0, prev_node) # switch actual node to previous prev_node = now_node now_node = nuke.createNode("Output", "name Output1") now_node.hideControlPanel() # connect to previous node now_node.setInput(0, prev_node) # add divider GN.addKnob(nuke.Text_Knob('', 'Rendering')) # Add linked knobs. linked_knob_names = [] # add input linked knobs and create group only if any input if linked_knobs: linked_knob_names.append("_grp-start_") linked_knob_names.extend(linked_knobs) linked_knob_names.append("_grp-end_") linked_knob_names.append("Render") for _k_name in linked_knob_names: if "_grp-start_" in _k_name: knob = nuke.Tab_Knob( "rnd_attr", "Rendering attributes", nuke.TABBEGINCLOSEDGROUP) GN.addKnob(knob) elif "_grp-end_" in _k_name: knob = nuke.Tab_Knob( "rnd_attr_end", "Rendering attributes", nuke.TABENDGROUP) GN.addKnob(knob) else: if "___" in _k_name: # add divider GN.addKnob(nuke.Text_Knob("")) else: # add linked knob by _k_name link = nuke.Link_Knob("") link.makeLink(write_node.name(), _k_name) link.setName(_k_name) # make render if "Render" in _k_name: link.setLabel("Render Local") link.setFlag(0x1000) GN.addKnob(link) # adding write to read button add_button_write_to_read(GN) # adding write to read button add_button_clear_rendered(GN, os.path.dirname(fpath)) GN.addKnob(nuke.Text_Knob('', '')) # set tile color tile_color = next( iter( k["value"] for k in imageio_writes["knobs"] if "tile_color" in k["name"] ), [255, 0, 0, 255] ) GN["tile_color"].setValue( color_gui_to_int(tile_color)) return GN @deprecated("openpype.hosts.nuke.api.lib.create_write_node") def create_write_node_legacy( name, data, input=None, prenodes=None, review=True, linked_knobs=None, farm=True ): ''' Creating write node which is group node Arguments: name (str): name of node data (dict): data to be imprinted input (node): selected node to connect to prenodes (list, optional): list of lists, definitions for nodes to be created before write review (bool): adding review knob Example: prenodes = [ { "nodeName": { "class": "" # string "knobs": [ ("knobName": value), ... ], "dependent": [ following_node_01, ... ] } }, ... ] Return: node (obj): group node with avalon data as Knobs ''' knob_overrides = data.get("knobs", []) nodeclass = data["nodeclass"] creator = data["creator"] subset = data["subset"] imageio_writes = get_created_node_imageio_setting_legacy( nodeclass, creator, subset ) for knob in imageio_writes["knobs"]: if knob["name"] == "file_type": representation = knob["value"] host_name = os.environ.get("AVALON_APP") try: data.update({ "app": host_name, "imageio_writes": imageio_writes, "representation": representation, }) anatomy_filled = format_anatomy(data) except Exception as e: msg = "problem with resolving anatomy template: {}".format(e) log.error(msg) nuke.message(msg) # build file path to workfiles fdir = str(anatomy_filled["work"]["folder"]).replace("\\", "/") fpath = data["fpath_template"].format( work=fdir, version=data["version"], subset=data["subset"], frame=data["frame"], ext=representation ) # create directory if not os.path.isdir(os.path.dirname(fpath)): log.warning("Path does not exist! I am creating it.") os.makedirs(os.path.dirname(fpath)) _data = OrderedDict({ "file": fpath }) # adding dataflow template log.debug("imageio_writes: `{}`".format(imageio_writes)) for knob in imageio_writes["knobs"]: _data[knob["name"]] = knob["value"] _data = fix_data_for_node_create(_data) log.debug("_data: `{}`".format(_data)) if "frame_range" in data.keys(): _data["frame_range"] = data.get("frame_range", None) log.debug("_data[frame_range]: `{}`".format(_data["frame_range"])) GN = nuke.createNode("Group", "name {}".format(name)) prev_node = None with GN: if input: input_name = str(input.name()).replace(" ", "") # if connected input node was defined prev_node = nuke.createNode( "Input", "name {}".format(input_name)) else: # generic input node connected to nothing prev_node = nuke.createNode( "Input", "name {}".format("rgba")) prev_node.hideControlPanel() # creating pre-write nodes `prenodes` if prenodes: for node in prenodes: # get attributes pre_node_name = node["name"] klass = node["class"] knobs = node["knobs"] dependent = node["dependent"] # create node now_node = nuke.createNode( klass, "name {}".format(pre_node_name)) now_node.hideControlPanel() # add data to knob for _knob in knobs: knob, value = _knob try: now_node[knob].value() except NameError: log.warning( "knob `{}` does not exist on node `{}`".format( knob, now_node["name"].value() )) continue if not knob and not value: continue log.info((knob, value)) if isinstance(value, str): if "[" in value: now_node[knob].setExpression(value) else: now_node[knob].setValue(value) # connect to previous node if dependent: if isinstance(dependent, (tuple or list)): for i, node_name in enumerate(dependent): input_node = nuke.createNode( "Input", "name {}".format(node_name)) input_node.hideControlPanel() now_node.setInput(1, input_node) elif isinstance(dependent, str): input_node = nuke.createNode( "Input", "name {}".format(node_name)) input_node.hideControlPanel() now_node.setInput(0, input_node) else: now_node.setInput(0, prev_node) # switch actual node to previous prev_node = now_node # creating write node write_node = now_node = add_write_node_legacy( "inside_{}".format(name), **_data ) write_node.hideControlPanel() # connect to previous node now_node.setInput(0, prev_node) # switch actual node to previous prev_node = now_node now_node = nuke.createNode("Output", "name Output1") now_node.hideControlPanel() # connect to previous node now_node.setInput(0, prev_node) # imprinting group node set_avalon_knob_data(GN, data["avalon"]) add_publish_knob(GN) add_rendering_knobs(GN, farm) if review: add_review_knob(GN) # add divider GN.addKnob(nuke.Text_Knob('', 'Rendering')) # Add linked knobs. linked_knob_names = [] # add input linked knobs and create group only if any input if linked_knobs: linked_knob_names.append("_grp-start_") linked_knob_names.extend(linked_knobs) linked_knob_names.append("_grp-end_") linked_knob_names.append("Render") for _k_name in linked_knob_names: if "_grp-start_" in _k_name: knob = nuke.Tab_Knob( "rnd_attr", "Rendering attributes", nuke.TABBEGINCLOSEDGROUP) GN.addKnob(knob) elif "_grp-end_" in _k_name: knob = nuke.Tab_Knob( "rnd_attr_end", "Rendering attributes", nuke.TABENDGROUP) GN.addKnob(knob) else: if "___" in _k_name: # add divider GN.addKnob(nuke.Text_Knob("")) else: # add linked knob by _k_name link = nuke.Link_Knob("") link.makeLink(write_node.name(), _k_name) link.setName(_k_name) # make render if "Render" in _k_name: link.setLabel("Render Local") link.setFlag(0x1000) GN.addKnob(link) # adding write to read button add_button_write_to_read(GN) # adding write to read button add_button_clear_rendered(GN, os.path.dirname(fpath)) # Deadline tab. add_deadline_tab(GN) # open the our Tab as default GN[_NODE_TAB_NAME].setFlag(0) # set tile color tile_color = _data.get("tile_color", "0xff0000ff") GN["tile_color"].setValue(tile_color) # overrie knob values from settings for knob in knob_overrides: knob_type = knob["type"] knob_name = knob["name"] knob_value = knob["value"] if knob_name not in GN.knobs(): continue if not knob_value: continue # set correctly knob types if knob_type == "string": knob_value = str(knob_value) if knob_type == "number": knob_value = int(knob_value) if knob_type == "decimal_number": knob_value = float(knob_value) if knob_type == "bool": knob_value = bool(knob_value) if knob_type in ["2d_vector", "3d_vector"]: knob_value = list(knob_value) GN[knob_name].setValue(knob_value) return GN def set_node_knobs_from_settings(node, knob_settings, **kwargs): """ Overriding knob values from settings Using `schema_nuke_knob_inputs` for knob type definitions. Args: node (nuke.Node): nuke node knob_settings (list): list of dict. Keys are `type`, `name`, `value` kwargs (dict)[optional]: keys for formatable knob settings """ for knob in knob_settings: log.debug("__ knob: {}".format(pformat(knob))) knob_type = knob["type"] knob_name = knob["name"] if knob_name not in node.knobs(): continue if knob_type == "expression": knob_expression = knob["expression"] node[knob_name].setExpression( knob_expression ) continue # first deal with formatable knob settings if knob_type == "formatable": template = knob["template"] to_type = knob["to_type"] try: _knob_value = template.format( **kwargs ) except KeyError as msg: log.warning("__ msg: {}".format(msg)) raise KeyError(msg) # convert value to correct type if to_type == "2d_vector": knob_value = _knob_value.split(";").split(",") else: knob_value = _knob_value knob_type = to_type else: knob_value = knob["value"] if not knob_value: continue knob_value = convert_knob_value_to_correct_type( knob_type, knob_value) node[knob_name].setValue(knob_value) def convert_knob_value_to_correct_type(knob_type, knob_value): # first convert string types to string # just to ditch unicode if isinstance(knob_value, six.text_type): knob_value = str(knob_value) # set correctly knob types if knob_type == "bool": knob_value = bool(knob_value) elif knob_type == "decimal_number": knob_value = float(knob_value) elif knob_type == "number": knob_value = int(knob_value) elif knob_type == "text": knob_value = knob_value elif knob_type == "color_gui": knob_value = color_gui_to_int(knob_value) elif knob_type in ["2d_vector", "3d_vector", "color"]: knob_value = [float(v) for v in knob_value] return knob_value def color_gui_to_int(color_gui): hex_value = ( "0x{0:0>2x}{1:0>2x}{2:0>2x}{3:0>2x}").format(*color_gui) return int(hex_value, 16) @deprecated def add_rendering_knobs(node, farm=True): ''' Adds additional rendering knobs to given node Arguments: node (obj): nuke node object to be fixed Return: node (obj): with added knobs ''' knob_options = ["Use existing frames", "Local"] if farm: knob_options.append("On farm") if "render" not in node.knobs(): knob = nuke.Enumeration_Knob("render", "", knob_options) knob.clearFlag(nuke.STARTLINE) node.addKnob(knob) return node @deprecated def add_review_knob(node): ''' Adds additional review knob to given node Arguments: node (obj): nuke node object to be fixed Return: node (obj): with added knob ''' if "review" not in node.knobs(): knob = nuke.Boolean_Knob("review", "Review") knob.setValue(True) node.addKnob(knob) return node @deprecated def add_deadline_tab(node): # TODO: remove this as it is only linked to legacy create node.addKnob(nuke.Tab_Knob("Deadline")) knob = nuke.Int_Knob("deadlinePriority", "Priority") knob.setValue(50) node.addKnob(knob) knob = nuke.Int_Knob("deadlineChunkSize", "Chunk Size") knob.setValue(0) node.addKnob(knob) knob = nuke.Int_Knob("deadlineConcurrentTasks", "Concurrent tasks") # zero as default will get value from Settings during collection # instead of being an explicit user override, see precollect_write.py knob.setValue(0) node.addKnob(knob) knob = nuke.Text_Knob("divd", '') knob.setValue('') node.addKnob(knob) knob = nuke.Boolean_Knob("suspend_publish", "Suspend publish") knob.setValue(False) node.addKnob(knob) @deprecated def get_deadline_knob_names(): # TODO: remove this as it is only linked to legacy # validate_write_deadline_tab return [ "Deadline", "deadlineChunkSize", "deadlinePriority", "deadlineConcurrentTasks" ] def create_backdrop(label="", color=None, layer=0, nodes=None): """ Create Backdrop node Arguments: color (str): nuke compatible string with color code layer (int): layer of node usually used (self.pos_layer - 1) label (str): the message nodes (list): list of nodes to be wrapped into backdrop """ assert isinstance(nodes, list), "`nodes` should be a list of nodes" # Calculate bounds for the backdrop node. bdX = min([node.xpos() for node in nodes]) bdY = min([node.ypos() for node in nodes]) bdW = max([node.xpos() + node.screenWidth() for node in nodes]) - bdX bdH = max([node.ypos() + node.screenHeight() for node in nodes]) - bdY # Expand the bounds to leave a little border. Elements are offsets # for left, top, right and bottom edges respectively left, top, right, bottom = (-20, -65, 20, 60) bdX += left bdY += top bdW += (right - left) bdH += (bottom - top) bdn = nuke.createNode("BackdropNode") bdn["z_order"].setValue(layer) if color: bdn["tile_color"].setValue(int(color, 16)) bdn["xpos"].setValue(bdX) bdn["ypos"].setValue(bdY) bdn["bdwidth"].setValue(bdW) bdn["bdheight"].setValue(bdH) if label: bdn["label"].setValue(label) bdn["note_font_size"].setValue(20) return bdn class WorkfileSettings(object): """ All settings for workfile will be set This object is setting all possible root settings to the workfile. Including Colorspace, Frame ranges, Resolution format. It can set it to Root node or to any given node. Arguments: root (node): nuke's root node nodes (list): list of nuke's nodes nodes_filter (list): filtering classes for nodes """ def __init__(self, root_node=None, nodes=None, **kwargs): project_doc = kwargs.get("project") if project_doc is None: project_name = legacy_io.active_project() project_doc = get_project(project_name) Context._project_doc = project_doc self._asset = ( kwargs.get("asset_name") or legacy_io.Session["AVALON_ASSET"] ) self._asset_entity = get_current_project_asset(self._asset) self._root_node = root_node or nuke.root() self._nodes = self.get_nodes(nodes=nodes) self.data = kwargs def get_nodes(self, nodes=None, nodes_filter=None): if not isinstance(nodes, list) and not isinstance(nodes_filter, list): return [n for n in nuke.allNodes()] elif not isinstance(nodes, list) and isinstance(nodes_filter, list): nodes = list() for filter in nodes_filter: [nodes.append(n) for n in nuke.allNodes(filter=filter)] return nodes elif isinstance(nodes, list) and not isinstance(nodes_filter, list): return [n for n in self._nodes] elif isinstance(nodes, list) and isinstance(nodes_filter, list): for filter in nodes_filter: return [n for n in self._nodes if filter in n.Class()] def set_viewers_colorspace(self, viewer_dict): ''' Adds correct colorspace to viewer Arguments: viewer_dict (dict): adjustments from presets ''' if not isinstance(viewer_dict, dict): msg = "set_viewers_colorspace(): argument should be dictionary" log.error(msg) nuke.message(msg) return filter_knobs = [ "viewerProcess", "wipe_position" ] erased_viewers = [] for v in nuke.allNodes(filter="Viewer"): # set viewProcess to preset from settings v["viewerProcess"].setValue( str(viewer_dict["viewerProcess"]) ) if str(viewer_dict["viewerProcess"]) \ not in v["viewerProcess"].value(): copy_inputs = v.dependencies() copy_knobs = {k: v[k].value() for k in v.knobs() if k not in filter_knobs} # delete viewer with wrong settings erased_viewers.append(v["name"].value()) nuke.delete(v) # create new viewer nv = nuke.createNode("Viewer") # connect to original inputs for i, n in enumerate(copy_inputs): nv.setInput(i, n) # set copied knobs for k, v in copy_knobs.items(): print(k, v) nv[k].setValue(v) # set viewerProcess nv["viewerProcess"].setValue(str(viewer_dict["viewerProcess"])) if erased_viewers: log.warning( "Attention! Viewer nodes {} were erased." "It had wrong color profile".format(erased_viewers)) def set_root_colorspace(self, nuke_colorspace): ''' Adds correct colorspace to root Arguments: nuke_colorspace (dict): adjustmensts from presets ''' workfile_settings = nuke_colorspace["workfile"] # resolve config data if they are enabled in host config_data = None if nuke_colorspace.get("ocio_config", {}).get("enabled"): # switch ocio config to custom config workfile_settings["OCIO_config"] = "custom" workfile_settings["colorManagement"] = "OCIO" # get resolved ocio config path config_data = get_imageio_config( legacy_io.active_project(), "nuke" ) # first set OCIO if self._root_node["colorManagement"].value() \ not in str(workfile_settings["colorManagement"]): self._root_node["colorManagement"].setValue( str(workfile_settings["colorManagement"])) # we dont need the key anymore workfile_settings.pop("colorManagement") # second set ocio version if self._root_node["OCIO_config"].value() \ not in str(workfile_settings["OCIO_config"]): self._root_node["OCIO_config"].setValue( str(workfile_settings["OCIO_config"])) # we dont need the key anymore workfile_settings.pop("OCIO_config") # third set ocio custom path if config_data: self._root_node["customOCIOConfigPath"].setValue( str(config_data["path"]).replace("\\", "/") ) # backward compatibility, remove in case it exists workfile_settings.pop("customOCIOConfigPath") # then set the rest for knob, value in workfile_settings.items(): # skip unfilled ocio config path # it will be dict in value if isinstance(value, dict): continue if self._root_node[knob].value() not in value: self._root_node[knob].setValue(str(value)) log.debug("nuke.root()['{}'] changed to: {}".format( knob, value)) def set_writes_colorspace(self): ''' Adds correct colorspace to write node dict ''' for node in nuke.allNodes(filter="Group"): # get data from avalon knob avalon_knob_data = read_avalon_data(node) if avalon_knob_data.get("id") != "pyblish.avalon.instance": continue if "creator" not in avalon_knob_data: continue # establish families families = [avalon_knob_data["family"]] if avalon_knob_data.get("families"): families.append(avalon_knob_data.get("families")) nuke_imageio_writes = get_imageio_node_setting( node_class=avalon_knob_data["families"], plugin_name=avalon_knob_data["creator"], subset=avalon_knob_data["subset"] ) log.debug("nuke_imageio_writes: `{}`".format(nuke_imageio_writes)) if not nuke_imageio_writes: return write_node = None # get into the group node node.begin() for x in nuke.allNodes(): if x.Class() == "Write": write_node = x node.end() if not write_node: return try: # write all knobs to node for knob in nuke_imageio_writes["knobs"]: value = knob["value"] if isinstance(value, six.text_type): value = str(value) if str(value).startswith("0x"): value = int(value, 16) log.debug("knob: {}| value: {}".format( knob["name"], value )) write_node[knob["name"]].setValue(value) except TypeError: log.warning( "Legacy workflow didnt work, switching to current") set_node_knobs_from_settings( write_node, nuke_imageio_writes["knobs"]) def set_reads_colorspace(self, read_clrs_inputs): """ Setting colorspace to Read nodes Looping through all read nodes and tries to set colorspace based on regex rules in presets """ changes = {} for n in nuke.allNodes(): file = nuke.filename(n) if n.Class() != "Read": continue # check if any colorspace presets for read is matching preset_clrsp = None for input in read_clrs_inputs: if not bool(re.search(input["regex"], file)): continue preset_clrsp = input["colorspace"] log.debug(preset_clrsp) if preset_clrsp is not None: current = n["colorspace"].value() future = str(preset_clrsp) if current != future: changes[n.name()] = { "from": current, "to": future } log.debug(changes) if changes: msg = "Read nodes are not set to correct colospace:\n\n" for nname, knobs in changes.items(): msg += ( " - node: '{0}' is now '{1}' but should be '{2}'\n" ).format(nname, knobs["from"], knobs["to"]) msg += "\nWould you like to change it?" if nuke.ask(msg): for nname, knobs in changes.items(): n = nuke.toNode(nname) n["colorspace"].setValue(knobs["to"]) log.info( "Setting `{0}` to `{1}`".format( nname, knobs["to"])) def set_colorspace(self): ''' Setting colorpace following presets ''' # get imageio nuke_colorspace = get_nuke_imageio_settings() log.info("Setting colorspace to workfile...") try: self.set_root_colorspace(nuke_colorspace) except AttributeError: msg = "set_colorspace(): missing `workfile` settings in template" nuke.message(msg) log.info("Setting colorspace to viewers...") try: self.set_viewers_colorspace(nuke_colorspace["viewer"]) except AttributeError: msg = "set_colorspace(): missing `viewer` settings in template" nuke.message(msg) log.error(msg) log.info("Setting colorspace to write nodes...") try: self.set_writes_colorspace() except AttributeError as _error: nuke.message(_error) log.error(_error) log.info("Setting colorspace to read nodes...") read_clrs_inputs = nuke_colorspace["regexInputs"].get("inputs", []) if read_clrs_inputs: self.set_reads_colorspace(read_clrs_inputs) def reset_frame_range_handles(self): """Set frame range to current asset""" if "data" not in self._asset_entity: msg = "Asset {} don't have set any 'data'".format(self._asset) log.warning(msg) nuke.message(msg) return data = self._asset_entity["data"] log.debug("__ asset data: `{}`".format(data)) missing_cols = [] check_cols = ["fps", "frameStart", "frameEnd", "handleStart", "handleEnd"] for col in check_cols: if col not in data: missing_cols.append(col) if len(missing_cols) > 0: missing = ", ".join(missing_cols) msg = "'{}' are not set for asset '{}'!".format( missing, self._asset) log.warning(msg) nuke.message(msg) return # get handles values handle_start = data["handleStart"] handle_end = data["handleEnd"] fps = float(data["fps"]) frame_start = int(data["frameStart"]) - handle_start frame_end = int(data["frameEnd"]) + handle_end self._root_node["lock_range"].setValue(False) self._root_node["fps"].setValue(fps) self._root_node["first_frame"].setValue(frame_start) self._root_node["last_frame"].setValue(frame_end) self._root_node["lock_range"].setValue(True) # setting active viewers try: nuke.frame(int(data["frameStart"])) except Exception as e: log.warning("no viewer in scene: `{}`".format(e)) range = '{0}-{1}'.format( int(data["frameStart"]), int(data["frameEnd"]) ) for node in nuke.allNodes(filter="Viewer"): node['frame_range'].setValue(range) node['frame_range_lock'].setValue(True) node['frame_range'].setValue(range) node['frame_range_lock'].setValue(True) if not ASSIST: set_node_data( self._root_node, INSTANCE_DATA_KNOB, { "handleStart": int(handle_start), "handleEnd": int(handle_end) } ) else: log.warning( "NukeAssist mode is not allowing " "updating custom knobs..." ) def reset_resolution(self): """Set resolution to project resolution.""" log.info("Resetting resolution") project_name = legacy_io.active_project() project = get_project(project_name) asset_name = legacy_io.Session["AVALON_ASSET"] asset = get_asset_by_name(project_name, asset_name) asset_data = asset.get('data', {}) data = { "width": int(asset_data.get( 'resolutionWidth', asset_data.get('resolution_width'))), "height": int(asset_data.get( 'resolutionHeight', asset_data.get('resolution_height'))), "pixel_aspect": asset_data.get( 'pixelAspect', asset_data.get('pixel_aspect', 1)), "name": project["name"] } if any(x for x in data.values() if x is None): msg = ("Missing set shot attributes in DB." "\nContact your supervisor!." "\n\nWidth: `{width}`" "\nHeight: `{height}`" "\nPixel Asspect: `{pixel_aspect}`").format(**data) log.error(msg) nuke.message(msg) existing_format = None for format in nuke.formats(): if data["name"] == format.name(): existing_format = format break if existing_format: # Enforce existing format to be correct. existing_format.setWidth(data["width"]) existing_format.setHeight(data["height"]) existing_format.setPixelAspect(data["pixel_aspect"]) else: format_string = self.make_format_string(**data) log.info("Creating new format: {}".format(format_string)) nuke.addFormat(format_string) nuke.root()["format"].setValue(data["name"]) log.info("Format is set.") def make_format_string(self, **kwargs): if kwargs.get("r"): return ( "{width} " "{height} " "{x} " "{y} " "{r} " "{t} " "{pixel_aspect:.2f} " "{name}".format(**kwargs) ) else: return ( "{width} " "{height} " "{pixel_aspect:.2f} " "{name}".format(**kwargs) ) def set_context_settings(self): # replace reset resolution from avalon core to pype's self.reset_resolution() # replace reset resolution from avalon core to pype's self.reset_frame_range_handles() # add colorspace menu item self.set_colorspace() def set_favorites(self): from .utils import set_context_favorites work_dir = os.getenv("AVALON_WORKDIR") asset = os.getenv("AVALON_ASSET") favorite_items = OrderedDict() # project # get project's root and split to parts projects_root = os.path.normpath(work_dir.split( Context.project_name)[0]) # add project name project_dir = os.path.join(projects_root, Context.project_name) + "/" # add to favorites favorite_items.update({"Project dir": project_dir.replace("\\", "/")}) # asset asset_root = os.path.normpath(work_dir.split( asset)[0]) # add asset name asset_dir = os.path.join(asset_root, asset) + "/" # add to favorites favorite_items.update({"Shot dir": asset_dir.replace("\\", "/")}) # workdir favorite_items.update({"Work dir": work_dir.replace("\\", "/")}) set_context_favorites(favorite_items) def get_write_node_template_attr(node): ''' Gets all defined data from presets ''' # TODO: add identifiers to settings and rename settings key plugin_names_mapping = { "create_write_image": "CreateWriteImage", "create_write_prerender": "CreateWritePrerender", "create_write_render": "CreateWriteRender" } # get avalon data from node node_data = get_node_data(node, INSTANCE_DATA_KNOB) identifier = node_data["creator_identifier"] # return template data return get_imageio_node_setting( node_class="Write", plugin_name=plugin_names_mapping[identifier], subset=node_data["subset"] ) def get_dependent_nodes(nodes): """Get all dependent nodes connected to the list of nodes. Looking for connections outside of the nodes in incoming argument. Arguments: nodes (list): list of nuke.Node objects Returns: connections_in: dictionary of nodes and its dependencies connections_out: dictionary of nodes and its dependency """ connections_in = dict() connections_out = dict() node_names = [n.name() for n in nodes] for node in nodes: inputs = node.dependencies() outputs = node.dependent() # collect all inputs outside test_in = [(i, n) for i, n in enumerate(inputs) if n.name() not in node_names] if test_in: connections_in.update({ node: test_in }) # collect all outputs outside test_out = [i for i in outputs if i.name() not in node_names] if test_out: # only one dependent node is allowed connections_out.update({ node: test_out[-1] }) return connections_in, connections_out def find_free_space_to_paste_nodes( nodes, group=nuke.root(), direction="right", offset=300 ): """ For getting coordinates in DAG (node graph) for placing new nodes Arguments: nodes (list): list of nuke.Node objects group (nuke.Node) [optional]: object in which context it is direction (str) [optional]: where we want it to be placed [left, right, top, bottom] offset (int) [optional]: what offset it is from rest of nodes Returns: xpos (int): x coordinace in DAG ypos (int): y coordinace in DAG """ if len(nodes) == 0: return 0, 0 group_xpos = list() group_ypos = list() # get local coordinates of all nodes nodes_xpos = [n.xpos() for n in nodes] + \ [n.xpos() + n.screenWidth() for n in nodes] nodes_ypos = [n.ypos() for n in nodes] + \ [n.ypos() + n.screenHeight() for n in nodes] # get complete screen size of all nodes to be placed in nodes_screen_width = max(nodes_xpos) - min(nodes_xpos) nodes_screen_heigth = max(nodes_ypos) - min(nodes_ypos) # get screen size (r,l,t,b) of all nodes in `group` with group: group_xpos = [n.xpos() for n in nuke.allNodes() if n not in nodes] + \ [n.xpos() + n.screenWidth() for n in nuke.allNodes() if n not in nodes] group_ypos = [n.ypos() for n in nuke.allNodes() if n not in nodes] + \ [n.ypos() + n.screenHeight() for n in nuke.allNodes() if n not in nodes] # calc output left if direction in "left": xpos = min(group_xpos) - abs(nodes_screen_width) - abs(offset) ypos = min(group_ypos) return xpos, ypos # calc output right if direction in "right": xpos = max(group_xpos) + abs(offset) ypos = min(group_ypos) return xpos, ypos # calc output top if direction in "top": xpos = min(group_xpos) ypos = min(group_ypos) - abs(nodes_screen_heigth) - abs(offset) return xpos, ypos # calc output bottom if direction in "bottom": xpos = min(group_xpos) ypos = max(group_ypos) + abs(offset) return xpos, ypos @contextlib.contextmanager def maintained_selection(): """Maintain selection during context Example: >>> with maintained_selection(): ... node["selected"].setValue(True) >>> print(node["selected"].value()) False """ previous_selection = nuke.selectedNodes() try: yield finally: # unselect all selection in case there is some reset_selection() # and select all previously selected nodes if previous_selection: select_nodes(previous_selection) def reset_selection(): """Deselect all selected nodes""" for node in nuke.selectedNodes(): node["selected"].setValue(False) def select_nodes(nodes): """Selects all inputed nodes Arguments: nodes (list): nuke nodes to be selected """ assert isinstance(nodes, (list, tuple)), "nodes has to be list or tuple" for node in nodes: node["selected"].setValue(True) def launch_workfiles_app(): """Show workfiles tool on nuke launch. Trigger to show workfiles tool on application launch. Can be executed only once all other calls are ignored. Workfiles tool show is deffered after application initialization using QTimer. """ if Context.workfiles_launched: return Context.workfiles_launched = True # get all imortant settings open_at_start = env_value_to_bool( env_key="OPENPYPE_WORKFILE_TOOL_ON_START", default=None) # return if none is defined if not open_at_start: return # Show workfiles tool using timer # - this will be probably triggered during initialization in that case # the application is not be able to show uis so it must be # deffered using timer # - timer should be processed when initialization ends # When applications starts to process events. timer = QtCore.QTimer() timer.timeout.connect(_launch_workfile_app) timer.setInterval(100) Context.workfiles_tool_timer = timer timer.start() def _launch_workfile_app(): # Safeguard to not show window when application is still starting up # or is already closing down. closing_down = QtWidgets.QApplication.closingDown() starting_up = QtWidgets.QApplication.startingUp() # Stop the timer if application finished start up of is closing down if closing_down or not starting_up: Context.workfiles_tool_timer.stop() Context.workfiles_tool_timer = None # Skip if application is starting up or closing down if starting_up or closing_down: return # Make sure on top is enabled on first show so the window is not hidden # under main nuke window # - this happened on Centos 7 and it is because the focus of nuke # changes to the main window after showing because of initialization # which moves workfiles tool under it host_tools.show_workfiles(parent=None, on_top=True) def process_workfile_builder(): # to avoid looping of the callback, remove it! nuke.removeOnCreate(process_workfile_builder, nodeClass="Root") # get state from settings project_settings = get_current_project_settings() workfile_builder = project_settings["nuke"].get( "workfile_builder", {}) # get all imortant settings openlv_on = env_value_to_bool( env_key="AVALON_OPEN_LAST_WORKFILE", default=None) # get settings createfv_on = workfile_builder.get("create_first_version") or None builder_on = workfile_builder.get("builder_on_start") or None last_workfile_path = os.environ.get("AVALON_LAST_WORKFILE") # generate first version in file not existing and feature is enabled if createfv_on and not os.path.exists(last_workfile_path): # get custom template path if any custom_template_path = get_custom_workfile_template_from_session( project_settings=project_settings ) # if custom template is defined if custom_template_path: log.info("Adding nodes from `{}`...".format( custom_template_path )) try: # import nodes into current script nuke.nodePaste(custom_template_path) except RuntimeError: raise RuntimeError(( "Template defined for project: {} is not working. " "Talk to your manager for an advise").format( custom_template_path)) # if builder at start is defined if builder_on: log.info("Building nodes from presets...") # build nodes by defined presets BuildWorkfile().process() log.info("Saving script as version `{}`...".format( last_workfile_path )) # safe file as version save_file(last_workfile_path) return # skip opening of last version if it is not enabled if not openlv_on or not os.path.exists(last_workfile_path): return log.info("Opening last workfile...") # open workfile open_file(last_workfile_path) def start_workfile_template_builder(): from .workfile_template_builder import ( build_workfile_template ) # to avoid looping of the callback, remove it! log.info("Starting workfile template builder...") build_workfile_template(workfile_creation_enabled=True) # remove callback since it would be duplicating the workfile nuke.removeOnCreate(start_workfile_template_builder, nodeClass="Root") @deprecated def recreate_instance(origin_node, avalon_data=None): """Recreate input instance to different data Args: origin_node (nuke.Node): Nuke node to be recreating from avalon_data (dict, optional): data to be used in new node avalon_data Returns: nuke.Node: newly created node """ knobs_wl = ["render", "publish", "review", "ypos", "use_limit", "first", "last"] # get data from avalon knobs data = get_avalon_knob_data( origin_node) # add input data to avalon data if avalon_data: data.update(avalon_data) # capture all node knobs allowed in op_knobs knobs_data = {k: origin_node[k].value() for k in origin_node.knobs() for key in knobs_wl if key in k} # get node dependencies inputs = origin_node.dependencies() outputs = origin_node.dependent() # remove the node nuke.delete(origin_node) # create new node # get appropriate plugin class creator_plugin = None for Creator in discover_legacy_creator_plugins(): if Creator.__name__ == data["creator"]: creator_plugin = Creator break # create write node with creator new_node_name = data["subset"] new_node = creator_plugin(new_node_name, data["asset"]).process() # white listed knobs to the new node for _k, _v in knobs_data.items(): try: print(_k, _v) new_node[_k].setValue(_v) except Exception as e: print(e) # connect to original inputs for i, n in enumerate(inputs): new_node.setInput(i, n) # connect to outputs if len(outputs) > 0: for dn in outputs: dn.setInput(0, new_node) return new_node def add_scripts_menu(): try: from scriptsmenu import launchfornuke except ImportError: log.warning( "Skipping studio.menu install, because " "'scriptsmenu' module seems unavailable." ) return # load configuration of custom menu project_settings = get_project_settings(os.getenv("AVALON_PROJECT")) config = project_settings["nuke"]["scriptsmenu"]["definition"] _menu = project_settings["nuke"]["scriptsmenu"]["name"] if not config: log.warning("Skipping studio menu, no definition found.") return # run the launcher for Maya menu studio_menu = launchfornuke.main(title=_menu.title()) # apply configuration studio_menu.build_from_configuration(studio_menu, config) def add_scripts_gizmo(): # load configuration of custom menu project_settings = get_project_settings(os.getenv("AVALON_PROJECT")) platform_name = platform.system().lower() for gizmo_settings in project_settings["nuke"]["gizmo"]: gizmo_list_definition = gizmo_settings["gizmo_definition"] toolbar_name = gizmo_settings["toolbar_menu_name"] # gizmo_toolbar_path = gizmo_settings["gizmo_toolbar_path"] gizmo_source_dir = gizmo_settings.get( "gizmo_source_dir", {}).get(platform_name) toolbar_icon_path = gizmo_settings.get( "toolbar_icon_path", {}).get(platform_name) if not gizmo_source_dir: log.debug("Skipping studio gizmo `{}`, " "no gizmo path found.".format(toolbar_name) ) return if not gizmo_list_definition: log.debug("Skipping studio gizmo `{}`, " "no definition found.".format(toolbar_name) ) return if toolbar_icon_path: try: toolbar_icon_path = toolbar_icon_path.format(**os.environ) except KeyError as e: log.error( "This environment variable doesn't exist: {}".format(e) ) existing_gizmo_path = [] for source_dir in gizmo_source_dir: try: resolve_source_dir = source_dir.format(**os.environ) except KeyError as e: log.error( "This environment variable doesn't exist: {}".format(e) ) continue if not os.path.exists(resolve_source_dir): log.warning( "The source of gizmo `{}` does not exists".format( resolve_source_dir ) ) continue existing_gizmo_path.append(resolve_source_dir) # run the launcher for Nuke toolbar toolbar_menu = gizmo_menu.GizmoMenu( title=toolbar_name, icon=toolbar_icon_path ) # apply configuration toolbar_menu.add_gizmo_path(existing_gizmo_path) toolbar_menu.build_from_configuration(gizmo_list_definition) class NukeDirmap(HostDirmap): def __init__(self, file_name, *args, **kwargs): """ Args: file_name (str): full path of referenced file from workfiles *args (tuple): Positional arguments for 'HostDirmap' class **kwargs (dict): Keyword arguments for 'HostDirmap' class """ self.file_name = file_name super(NukeDirmap, self).__init__(*args, **kwargs) def on_enable_dirmap(self): pass def dirmap_routine(self, source_path, destination_path): source_path = source_path.lower().replace(os.sep, '/') destination_path = destination_path.lower().replace(os.sep, '/') log.debug("Map: {} with: {}->{}".format(self.file_name, source_path, destination_path)) if platform.system().lower() == "windows": self.file_name = self.file_name.lower().replace( source_path, destination_path) else: self.file_name = self.file_name.replace( source_path, destination_path) class DirmapCache: """Caching class to get settings and sync_module easily and only once.""" _project_name = None _project_settings = None _sync_module = None _mapping = None @classmethod def project_name(cls): if cls._project_name is None: cls._project_name = os.getenv("AVALON_PROJECT") return cls._project_name @classmethod def project_settings(cls): if cls._project_settings is None: cls._project_settings = get_project_settings(cls.project_name()) return cls._project_settings @classmethod def sync_module(cls): if cls._sync_module is None: cls._sync_module = ModulesManager().modules_by_name["sync_server"] return cls._sync_module @classmethod def mapping(cls): return cls._mapping @classmethod def set_mapping(cls, mapping): cls._mapping = mapping def dirmap_file_name_filter(file_name): """Nuke callback function with single full path argument. Checks project settings for potential mapping from source to dest. """ dirmap_processor = NukeDirmap( file_name, "nuke", DirmapCache.project_name(), DirmapCache.project_settings(), DirmapCache.sync_module(), ) if not DirmapCache.mapping(): DirmapCache.set_mapping(dirmap_processor.get_mappings()) dirmap_processor.process_dirmap(DirmapCache.mapping()) if os.path.exists(dirmap_processor.file_name): return dirmap_processor.file_name return file_name @contextlib.contextmanager def node_tempfile(): """Create a temp file where node is pasted during duplication. This is to avoid using clipboard for node duplication. """ tmp_file = tempfile.NamedTemporaryFile( mode="w", prefix="openpype_nuke_temp_", suffix=".nk", delete=False ) tmp_file.close() node_tempfile_path = tmp_file.name try: # Yield the path where node can be copied yield node_tempfile_path finally: # Remove the file at the end os.remove(node_tempfile_path) def duplicate_node(node): reset_selection() # select required node for duplication node.setSelected(True) with node_tempfile() as filepath: # copy selected to temp filepath nuke.nodeCopy(filepath) # reset selection reset_selection() # paste node and selection is on it only dupli_node = nuke.nodePaste(filepath) # reset selection reset_selection() return dupli_node def get_group_io_nodes(nodes): """Get the input and the output of a group of nodes.""" if not nodes: raise ValueError("there is no nodes in the list") input_node = None output_node = None if len(nodes) == 1: input_node = output_node = nodes[0] else: for node in nodes: if "Input" in node.name(): input_node = node if "Output" in node.name(): output_node = node if input_node is not None and output_node is not None: break if input_node is None: log.warning("No Input found") if output_node is None: log.warning("No Output found") return input_node, output_node def get_extreme_positions(nodes): """Get the 4 numbers that represent the box of a group of nodes.""" if not nodes: raise ValueError("there is no nodes in the list") nodes_xpos = [n.xpos() for n in nodes] + \ [n.xpos() + n.screenWidth() for n in nodes] nodes_ypos = [n.ypos() for n in nodes] + \ [n.ypos() + n.screenHeight() for n in nodes] min_x, min_y = (min(nodes_xpos), min(nodes_ypos)) max_x, max_y = (max(nodes_xpos), max(nodes_ypos)) return min_x, min_y, max_x, max_y def refresh_node(node): """Correct a bug caused by the multi-threading of nuke. Refresh the node to make sure that it takes the desired attributes. """ x = node.xpos() y = node.ypos() nuke.autoplaceSnap(node) node.setXYpos(x, y) def refresh_nodes(nodes): for node in nodes: refresh_node(node) def get_names_from_nodes(nodes): """Get list of nodes names. Args: nodes(List[nuke.Node]): List of nodes to convert into names. Returns: List[str]: Name of passed nodes. """ return [ node.name() for node in nodes ] def get_nodes_by_names(names): """Get list of nuke nodes based on their names. Args: names (List[str]): List of node names to be found. Returns: List[nuke.Node]: List of nodes found by name. """ return [ nuke.toNode(name) for name in names ] def get_viewer_config_from_string(input_string): """Convert string to display and viewer string Args: input_string (str): string with viewer Raises: IndexError: if more then one slash in input string IndexError: if missing closing bracket Returns: tuple[str]: display, viewer """ display = None viewer = input_string # check if () or / or \ in name if "/" in viewer: split = viewer.split("/") # rise if more then one column if len(split) > 2: raise IndexError(( "Viewer Input string is not correct. " "more then two `/` slashes! {}" ).format(input_string)) viewer = split[1] display = split[0] elif "(" in viewer: pattern = r"([\w\d\s\.\-]+).*[(](.*)[)]" result = re.findall(pattern, viewer) try: result = result.pop() display = str(result[1]).rstrip() viewer = str(result[0]).rstrip() except IndexError: raise IndexError(( "Viewer Input string is not correct. " "Missing bracket! {}" ).format(input_string)) return (display, viewer)