import re
import collections

import nuke

from openpype.client import get_representations
from openpype.pipeline import legacy_io
from openpype.pipeline.workfile.abstract_template_loader import (
    AbstractPlaceholder,
    AbstractTemplateLoader,
)

from .lib import (
    find_free_space_to_paste_nodes,
    get_extreme_positions,
    get_group_io_nodes,
    imprint,
    refresh_node,
    refresh_nodes,
    reset_selection,
    get_names_from_nodes,
    get_nodes_by_names,
    select_nodes,
    duplicate_node,
    node_tempfile,
)

from .lib_template_builder import (
    delete_placeholder_attributes,
    get_placeholder_attributes,
    hide_placeholder_attributes
)

PLACEHOLDER_SET = "PLACEHOLDERS_SET"
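
# Placeholder metadata is stored directly on Nuke nodes as user knobs written
# with ``imprint`` from .lib. Knob names used throughout this module:
#   - "builder_type", "is_placeholder", "empty": identify placeholder nodes
#   - "repre_id": representation id written on every node loaded for it
#   - "siblings", "nb_children", "last_loaded": bookkeeping for repopulation
#   - "x_init"/"y_init" and "w_init"/"h_init": initial positions/dimensions
#     used when duplicating siblings (see imprint_inits / create_sib_copies)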


class NukeTemplateLoader(AbstractTemplateLoader):
    """Concrete implementation of AbstractTemplateLoader for Nuke"""

    def import_template(self, path):
        """Import template into current scene.
        Block if a template is already loaded.

        Args:
            path (str): A path to current template (usually given by
                get_template_path implementation)

        Returns:
            bool: Whether the template was successfully imported or not
        """

        # TODO check if the template is already imported

        nuke.nodePaste(path)
        reset_selection()

        return True

    def preload(self, placeholder, loaders_by_name, last_representation):
        placeholder.data["nodes_init"] = nuke.allNodes()
        placeholder.data["last_repre_id"] = str(last_representation["_id"])

    def populate_template(self, ignored_ids=None):
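        # Placeholders are processed in passes: population may leave new
        # (or failed) placeholder nodes behind, so the loop keeps collecting
        # unprocessed placeholders until none remain. A temporary knob marks
        # nodes that were already handled and is removed once done.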
        processed_key = "_node_processed"

        processed_nodes = []
        nodes = self.get_template_nodes()
        while nodes:
            # Mark nodes as processed so they're not re-executed
            # - that can happen if processing of placeholder node fails
            for node in nodes:
                imprint(node, {processed_key: True})
                processed_nodes.append(node)

            super(NukeTemplateLoader, self).populate_template(ignored_ids)

            # Recollect nodes to repopulate
            nodes = []
            for node in self.get_template_nodes():
                # Skip already processed nodes
                if (
                    processed_key in node.knobs()
                    and node.knob(processed_key).value()
                ):
                    continue
                nodes.append(node)

        for node in processed_nodes:
            knob = node.knob(processed_key)
            if knob is not None:
                node.removeKnob(knob)

    @staticmethod
    def get_template_nodes():
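        # Breadth-first walk through the current group and every nested Group
        # node, collecting nodes that carry the placeholder knobs
        # ("builder_type" + "is_placeholder") and are not flagged as "empty".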
        placeholders = []
        all_groups = collections.deque()
        all_groups.append(nuke.thisGroup())
        while all_groups:
            group = all_groups.popleft()
            for node in group.nodes():
                if isinstance(node, nuke.Group):
                    all_groups.append(node)

                node_knobs = node.knobs()
                if (
                    "builder_type" not in node_knobs
                    or "is_placeholder" not in node_knobs
                    or not node.knob("is_placeholder").value()
                ):
                    continue

                if "empty" in node_knobs and node.knob("empty").value():
                    continue

                placeholders.append(node)

        return placeholders

    def update_missing_containers(self):
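        # Group loaded nodes by the representation id imprinted on them and,
        # for each group that still carries placeholder attributes, recreate
        # a visible NoOp placeholder at the stored position so the template
        # can be repopulated (already loaded representation ids are passed to
        # populate_template as ignored ids).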
        nodes_by_id = collections.defaultdict(list)

        for node in nuke.allNodes():
            node_knobs = node.knobs().keys()
            if "repre_id" in node_knobs:
                repre_id = node.knob("repre_id").getValue()
                nodes_by_id[repre_id].append(node.name())

            if "empty" in node_knobs:
                node.removeKnob(node.knob("empty"))
                imprint(node, {"empty": False})

        for node_names in nodes_by_id.values():
            node = None
            for node_name in node_names:
                node_by_name = nuke.toNode(node_name)
                if "builder_type" in node_by_name.knobs().keys():
                    node = node_by_name
                    break

            if node is None:
                continue

            placeholder = nuke.nodes.NoOp()
            placeholder.setName("PLACEHOLDER")
            placeholder.knob("tile_color").setValue(4278190335)
            attributes = get_placeholder_attributes(node, enumerate=True)
            imprint(placeholder, attributes)
            pos_x = int(node.knob("x").getValue())
            pos_y = int(node.knob("y").getValue())
            placeholder.setXYpos(pos_x, pos_y)
            imprint(placeholder, {"nb_children": 1})
            refresh_node(placeholder)

        self.populate_template(self.get_loaded_containers_by_id())

    def get_loaded_containers_by_id(self):
        repre_ids = set()
        for node in nuke.allNodes():
            if "repre_id" in node.knobs():
                repre_ids.add(node.knob("repre_id").getValue())

        # The set already removed duplicates; return as a list
        return list(repre_ids)

    def delete_placeholder(self, placeholder):
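        # Three outcomes: keep the placeholder and mark it "empty" when it is
        # not flagged for deletion, delete it outright when nothing was
        # loaded, or record the loaded node names and copy the placeholder
        # attributes (hidden) onto loaded nodes that are not placeholders
        # themselves before deleting it.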
        placeholder_node = placeholder.data["node"]
        last_loaded = placeholder.data["last_loaded"]
        if not placeholder.data["delete"]:
            if "empty" in placeholder_node.knobs().keys():
                placeholder_node.removeKnob(placeholder_node.knob("empty"))
            imprint(placeholder_node, {"empty": True})
            return

        if not last_loaded:
            nuke.delete(placeholder_node)
            return

        if "last_loaded" in placeholder_node.knobs().keys():
            for node_name in placeholder_node.knob("last_loaded").values():
                node = nuke.toNode(node_name)
                try:
                    delete_placeholder_attributes(node)
                except Exception:
                    pass

        last_loaded_names = [
            loaded_node.name()
            for loaded_node in last_loaded
        ]
        imprint(placeholder_node, {"last_loaded": last_loaded_names})

        for node in last_loaded:
            refresh_node(node)
            refresh_node(placeholder_node)
            if "builder_type" not in node.knobs().keys():
                attributes = get_placeholder_attributes(placeholder_node, True)
                imprint(node, attributes)
                imprint(node, {"is_placeholder": False})
                hide_placeholder_attributes(node)
                node.knob("is_placeholder").setVisible(False)
                imprint(
                    node,
                    {
                        "x": placeholder_node.xpos(),
                        "y": placeholder_node.ypos()
                    }
                )
                node.knob("x").setVisible(False)
                node.knob("y").setVisible(False)
        nuke.delete(placeholder_node)


class NukePlaceholder(AbstractPlaceholder):
    """Concrete implementation of AbstractPlaceholder for Nuke"""

    optional_keys = {"asset", "subset", "hierarchy"}

    def get_data(self, node):
        user_data = dict()
        node_knobs = node.knobs()
        for attr in self.required_keys.union(self.optional_keys):
            if attr in node_knobs:
                user_data[attr] = node_knobs[attr].getValue()
        user_data["node"] = node

        nb_children = 0
        if "nb_children" in node_knobs:
            nb_children = int(node_knobs["nb_children"].getValue())
        user_data["nb_children"] = nb_children

        siblings = []
        if "siblings" in node_knobs:
            siblings = node_knobs["siblings"].values()
        user_data["siblings"] = siblings

        node_full_name = node.fullName()
        user_data["group_name"] = node_full_name.rpartition(".")[0]
        user_data["last_loaded"] = []
        user_data["delete"] = False
        self.data = user_data

    def parent_in_hierarchy(self, containers):
        return

    def create_sib_copies(self):
        """Create copies of the placeholder siblings (the nodes that were
        loaded with it) for the newly added nodes.

        Returns:
            copies (dict): original node names mapped to their copies
        """

        copies = {}
        siblings = get_nodes_by_names(self.data["siblings"])
        for node in siblings:
            new_node = duplicate_node(node)

            x_init = int(new_node.knob("x_init").getValue())
            y_init = int(new_node.knob("y_init").getValue())
            new_node.setXYpos(x_init, y_init)
            if isinstance(new_node, nuke.BackdropNode):
                w_init = new_node.knob("w_init").getValue()
                h_init = new_node.knob("h_init").getValue()
                new_node.knob("bdwidth").setValue(w_init)
                new_node.knob("bdheight").setValue(h_init)
                refresh_node(node)

            if "repre_id" in node.knobs().keys():
                node.removeKnob(node.knob("repre_id"))
            copies[node.name()] = new_node
        return copies

    def fix_z_order(self):
        """Fix the problem of z_order when a backdrop is loaded."""

        nodes_loaded = self.data["last_loaded"]
        loaded_backdrops = []
        bd_orders = set()
        for node in nodes_loaded:
            if isinstance(node, nuke.BackdropNode):
                loaded_backdrops.append(node)
                bd_orders.add(node.knob("z_order").getValue())

        if not bd_orders:
            return

        sib_orders = set()
        for node_name in self.data["siblings"]:
            node = nuke.toNode(node_name)
            if isinstance(node, nuke.BackdropNode):
                sib_orders.add(node.knob("z_order").getValue())

        if not sib_orders:
            return

        min_order = min(bd_orders)
        max_order = max(sib_orders)
        for backdrop_node in loaded_backdrops:
            z_order = backdrop_node.knob("z_order").getValue()
            backdrop_node.knob("z_order").setValue(
                z_order + max_order - min_order + 1)

    def update_nodes(self, nodes, considered_nodes, offset_y=None):
        """Adjust backdrop nodes dimensions and positions.

        Takes the sizes of the considered nodes into account.

        Args:
            nodes (list): list of nodes to update
            considered_nodes (list): list of nodes to consider while updating
                positions and dimensions
            offset_y (int, optional): vertical distance between copies
        """

        placeholder_node = self.data["node"]

        min_x, min_y, max_x, max_y = get_extreme_positions(considered_nodes)

        diff_x = diff_y = 0
        contained_nodes = []  # for backdrops

        if offset_y is None:
            width_ph = placeholder_node.screenWidth()
            height_ph = placeholder_node.screenHeight()
            diff_y = max_y - min_y - height_ph
            diff_x = max_x - min_x - width_ph
            contained_nodes = [placeholder_node]
            min_x = placeholder_node.xpos()
            min_y = placeholder_node.ypos()
        else:
            siblings = get_nodes_by_names(self.data["siblings"])
            minX, _, maxX, _ = get_extreme_positions(siblings)
            diff_y = max_y - min_y + 20
            diff_x = abs(max_x - min_x - maxX + minX)
            contained_nodes = considered_nodes

        if diff_y <= 0 and diff_x <= 0:
            return

        for node in nodes:
            refresh_node(node)

            if (
                node == placeholder_node
                or node in considered_nodes
            ):
                continue

            if (
                not isinstance(node, nuke.BackdropNode)
                or (
                    isinstance(node, nuke.BackdropNode)
                    and not set(contained_nodes) <= set(node.getNodes())
                )
            ):
                if offset_y is None and node.xpos() >= min_x:
                    node.setXpos(node.xpos() + diff_x)

                if node.ypos() >= min_y:
                    node.setYpos(node.ypos() + diff_y)

            else:
                width = node.screenWidth()
                height = node.screenHeight()
                node.knob("bdwidth").setValue(width + diff_x)
                node.knob("bdheight").setValue(height + diff_y)

            refresh_node(node)

    def imprint_inits(self):
        """Add initial positions and dimensions to the attributes"""

        for node in nuke.allNodes():
            refresh_node(node)
            imprint(node, {"x_init": node.xpos(), "y_init": node.ypos()})
            node.knob("x_init").setVisible(False)
            node.knob("y_init").setVisible(False)
            width = node.screenWidth()
            height = node.screenHeight()
            if "bdwidth" in node.knobs():
                imprint(node, {"w_init": width, "h_init": height})
                node.knob("w_init").setVisible(False)
                node.knob("h_init").setVisible(False)
            refresh_node(node)

    def imprint_siblings(self):
        """Imprint sibling and representation info on the loaded nodes.

        - add sibling names to placeholder attributes (nodes loaded with it)
        - add the representation id to the attributes of all the other nodes
        """

        loaded_nodes = self.data["last_loaded"]
        loaded_nodes_set = set(loaded_nodes)
        data = {"repre_id": str(self.data["last_repre_id"])}

        for node in loaded_nodes:
            node_knobs = node.knobs()
            if "builder_type" not in node_knobs:
                # save the id of representation for all imported nodes
                imprint(node, data)
                node.knob("repre_id").setVisible(False)
                refresh_node(node)
                continue

            if (
                "is_placeholder" not in node_knobs
                or (
                    "is_placeholder" in node_knobs
                    and node.knob("is_placeholder").value()
                )
            ):
                siblings = list(loaded_nodes_set - {node})
                siblings_name = get_names_from_nodes(siblings)
                siblings = {"siblings": siblings_name}
                imprint(node, siblings)

    def set_loaded_connections(self):
        """Set inputs and outputs of the loaded nodes."""
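
        # Rewire the graph around the placeholder: every node that used the
        # placeholder as an input is reconnected to the loaded group's output
        # node, and the loaded group's input node is connected to whatever
        # fed the placeholder.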

        placeholder_node = self.data["node"]
        input_node, output_node = get_group_io_nodes(self.data["last_loaded"])
        for node in placeholder_node.dependent():
            for idx in range(node.inputs()):
                if node.input(idx) == placeholder_node:
                    node.setInput(idx, output_node)

        for node in placeholder_node.dependencies():
            for idx in range(placeholder_node.inputs()):
                if placeholder_node.input(idx) == node:
                    input_node.setInput(0, node)

    def set_copies_connections(self, copies):
        """Set inputs and outputs of the copies.

        Args:
            copies (dict): Copied nodes by their names.
        """

        last_input, last_output = get_group_io_nodes(self.data["last_loaded"])
        siblings = get_nodes_by_names(self.data["siblings"])
        siblings_input, siblings_output = get_group_io_nodes(siblings)
        copy_input = copies[siblings_input.name()]
        copy_output = copies[siblings_output.name()]

        for node_init in siblings:
            if node_init == siblings_output:
                continue

            node_copy = copies[node_init.name()]
            for node in node_init.dependent():
                for idx in range(node.inputs()):
                    if node.input(idx) != node_init:
                        continue

                    if node in siblings:
                        copies[node.name()].setInput(idx, node_copy)
                    else:
                        last_input.setInput(0, node_copy)

            for node in node_init.dependencies():
                for idx in range(node_init.inputs()):
                    if node_init.input(idx) != node:
                        continue

                    if node_init == siblings_input:
                        copy_input.setInput(idx, node)
                    elif node in siblings:
                        node_copy.setInput(idx, copies[node.name()])
                    else:
                        node_copy.setInput(idx, last_output)

        siblings_input.setInput(0, copy_output)

    def move_to_placeholder_group(self, nodes_loaded):
        """Open the placeholder's group and paste the loaded nodes into it.

        Returns:
            nodes_loaded (list): the new list of pasted nodes
        """

        groups_name = self.data["group_name"]
        reset_selection()
        select_nodes(nodes_loaded)
        if groups_name:
            with node_tempfile() as filepath:
                nuke.nodeCopy(filepath)
                for node in nuke.selectedNodes():
                    nuke.delete(node)
                group = nuke.toNode(groups_name)
                group.begin()
                nuke.nodePaste(filepath)
                nodes_loaded = nuke.selectedNodes()
        return nodes_loaded

    def clean(self):
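        # Post-load step for this placeholder: everything created since
        # preload() is treated as the loaded block. It is moved into the
        # placeholder's group, positioned over the placeholder, z_ordered and
        # imprinted, then connected depending on whether this is the first
        # load (nb_children == 0), a repeated load with known siblings, or a
        # load without siblings (pasted into free space).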
        placeholder_node = self.data["node"]

        # get the nodes added since preload
        nodes_init = self.data["nodes_init"]
        nodes_loaded = list(set(nuke.allNodes()) - set(nodes_init))
        self.log.debug("Loaded nodes: {}".format(nodes_loaded))
        if not nodes_loaded:
            return

        self.data["delete"] = True

        nodes_loaded = self.move_to_placeholder_group(nodes_loaded)
        self.data["last_loaded"] = nodes_loaded
        refresh_nodes(nodes_loaded)

        # positioning of the loaded nodes
        min_x, min_y, _, _ = get_extreme_positions(nodes_loaded)
        for node in nodes_loaded:
            xpos = (node.xpos() - min_x) + placeholder_node.xpos()
            ypos = (node.ypos() - min_y) + placeholder_node.ypos()
            node.setXYpos(xpos, ypos)
        refresh_nodes(nodes_loaded)

        self.fix_z_order()  # fix the problem of z_order for backdrops
        self.imprint_siblings()

        if self.data["nb_children"] == 0:
            # save initial nodes positions and dimensions, update them
            # and set inputs and outputs of loaded nodes

            self.imprint_inits()
            self.update_nodes(nuke.allNodes(), nodes_loaded)
            self.set_loaded_connections()

        elif self.data["siblings"]:
            # create copies of placeholder siblings for the new loaded nodes,
            # set their inputs and outputs and update all nodes positions and
            # dimensions and siblings names

            siblings = get_nodes_by_names(self.data["siblings"])
            refresh_nodes(siblings)
            copies = self.create_sib_copies()
            new_nodes = list(copies.values())  # copied nodes
            self.update_nodes(new_nodes, nodes_loaded)
            placeholder_node.removeKnob(placeholder_node.knob("siblings"))
            new_nodes_name = get_names_from_nodes(new_nodes)
            imprint(placeholder_node, {"siblings": new_nodes_name})
            self.set_copies_connections(copies)

            self.update_nodes(
                nuke.allNodes(),
                new_nodes + nodes_loaded,
                20
            )

            new_siblings = get_names_from_nodes(new_nodes)
            self.data["siblings"] = new_siblings

        else:
            # if the placeholder doesn't have siblings, the loaded
            # nodes will be placed in a free space

            xpointer, ypointer = find_free_space_to_paste_nodes(
                nodes_loaded, direction="bottom", offset=200
            )
            node = nuke.createNode("NoOp")
            reset_selection()
            nuke.delete(node)
            for node in nodes_loaded:
                xpos = (node.xpos() - min_x) + xpointer
                ypos = (node.ypos() - min_y) + ypointer
                node.setXYpos(xpos, ypos)

        self.data["nb_children"] += 1
        reset_selection()
        # go back to root group
        nuke.root().begin()

    def get_representations(self, current_asset_doc, linked_asset_docs):
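        # Build context filters from the placeholder attributes; the asset
        # filter depends on "builder_type": a regex on the asset name for
        # "context_asset", the current asset name plus the regex for other
        # types, or the names of linked assets matching the regex for
        # "linked_asset". Subset and hierarchy are always matched as regexes.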
        project_name = legacy_io.active_project()

        builder_type = self.data["builder_type"]
        if builder_type == "context_asset":
            context_filters = {
                "asset": [re.compile(self.data["asset"])],
                "subset": [re.compile(self.data["subset"])],
                "hierarchy": [re.compile(self.data["hierarchy"])],
                "representations": [self.data["representation"]],
                "family": [self.data["family"]]
            }

        elif builder_type != "linked_asset":
            context_filters = {
                "asset": [
                    current_asset_doc["name"],
                    re.compile(self.data["asset"])
                ],
                "subset": [re.compile(self.data["subset"])],
                "hierarchy": [re.compile(self.data["hierarchy"])],
                "representation": [self.data["representation"]],
                "family": [self.data["family"]]
            }

        else:
            asset_regex = re.compile(self.data["asset"])
            linked_asset_names = []
            for asset_doc in linked_asset_docs:
                asset_name = asset_doc["name"]
                if asset_regex.match(asset_name):
                    linked_asset_names.append(asset_name)

            if not linked_asset_names:
                return []

            context_filters = {
                "asset": linked_asset_names,
                "subset": [re.compile(self.data["subset"])],
                "hierarchy": [re.compile(self.data["hierarchy"])],
                "representation": [self.data["representation"]],
                "family": [self.data["family"]],
            }

        return list(get_representations(
            project_name,
            context_filters=context_filters
        ))

    def err_message(self):
        return (
            "Error while trying to load a representation.\n"
            "Either the subset wasn't published or the template is malformed."
            "\n\n"
            "Builder was looking for:\n{attributes}".format(
                attributes="\n".join([
                    "{}: {}".format(key.title(), value)
                    for key, value in self.data.items()]
                )
            )
        )