mirror of
https://github.com/ynput/ayon-core.git
synced 2026-01-02 00:44:52 +01:00
copied lib functions
This commit is contained in:
parent
7f34b6ee43
commit
b745f86cf3
1 changed files with 519 additions and 47 deletions
|
|
@ -1,66 +1,538 @@
|
|||
from PIL import Image
|
||||
import os
|
||||
import logging
|
||||
import tempfile
|
||||
|
||||
import avalon.io
|
||||
from avalon.tvpaint.lib import execute_george
|
||||
|
||||
from . import CommunicationWrapper
|
||||
from .pipeline import (
|
||||
list_instances,
|
||||
write_instances
|
||||
)
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def set_context_settings(asset_doc=None):
|
||||
"""Set workfile settings by asset document data.
|
||||
def execute_george(george_script, communicator=None):
|
||||
if not communicator:
|
||||
communicator = CommunicationWrapper.communicator
|
||||
return communicator.execute_george(george_script)
|
||||
|
||||
Change fps, resolution and frame start/end.
|
||||
|
||||
def execute_george_through_file(george_script, communicator=None):
|
||||
"""Execute george script with temp file.
|
||||
|
||||
Allows to execute multiline george script without stopping websocket
|
||||
client.
|
||||
|
||||
On windows make sure script does not contain paths with backwards
|
||||
slashes in paths, TVPaint won't execute properly in that case.
|
||||
|
||||
Args:
|
||||
george_script (str): George script to execute. May be multilined.
|
||||
"""
|
||||
if asset_doc is None:
|
||||
# Use current session asset if not passed
|
||||
asset_doc = avalon.io.find_one({
|
||||
"type": "asset",
|
||||
"name": avalon.io.Session["AVALON_ASSET"]
|
||||
})
|
||||
if not communicator:
|
||||
communicator = CommunicationWrapper.communicator
|
||||
|
||||
project_doc = avalon.io.find_one({"type": "project"})
|
||||
return communicator.execute_george_through_file(george_script)
|
||||
|
||||
framerate = asset_doc["data"].get("fps")
|
||||
if framerate is None:
|
||||
framerate = project_doc["data"].get("fps")
|
||||
|
||||
if framerate is not None:
|
||||
execute_george(
|
||||
"tv_framerate {} \"timestretch\"".format(framerate)
|
||||
)
|
||||
def parse_layers_data(data):
|
||||
"""Parse layers data loaded in 'get_layers_data'."""
|
||||
layers = []
|
||||
layers_raw = data.split("\n")
|
||||
for layer_raw in layers_raw:
|
||||
layer_raw = layer_raw.strip()
|
||||
if not layer_raw:
|
||||
continue
|
||||
(
|
||||
layer_id, group_id, visible, position, opacity, name,
|
||||
layer_type,
|
||||
frame_start, frame_end, prelighttable, postlighttable,
|
||||
selected, editable, sencil_state
|
||||
) = layer_raw.split("|")
|
||||
layer = {
|
||||
"layer_id": int(layer_id),
|
||||
"group_id": int(group_id),
|
||||
"visible": visible == "ON",
|
||||
"position": int(position),
|
||||
"opacity": int(opacity),
|
||||
"name": name,
|
||||
"type": layer_type,
|
||||
"frame_start": int(frame_start),
|
||||
"frame_end": int(frame_end),
|
||||
"prelighttable": prelighttable == "1",
|
||||
"postlighttable": postlighttable == "1",
|
||||
"selected": selected == "1",
|
||||
"editable": editable == "1",
|
||||
"sencil_state": sencil_state
|
||||
}
|
||||
layers.append(layer)
|
||||
return layers
|
||||
|
||||
|
||||
def get_layers_data_george_script(output_filepath, layer_ids=None):
|
||||
"""Prepare george script which will collect all layers from workfile."""
|
||||
output_filepath = output_filepath.replace("\\", "/")
|
||||
george_script_lines = [
|
||||
# Variable containing full path to output file
|
||||
"output_path = \"{}\"".format(output_filepath),
|
||||
# Get Current Layer ID
|
||||
"tv_LayerCurrentID",
|
||||
"current_layer_id = result"
|
||||
]
|
||||
# Script part for getting and storing layer information to temp
|
||||
layer_data_getter = (
|
||||
# Get information about layer's group
|
||||
"tv_layercolor \"get\" layer_id",
|
||||
"group_id = result",
|
||||
"tv_LayerInfo layer_id",
|
||||
(
|
||||
"PARSE result visible position opacity name"
|
||||
" type startFrame endFrame prelighttable postlighttable"
|
||||
" selected editable sencilState"
|
||||
),
|
||||
# Check if layer ID match `tv_LayerCurrentID`
|
||||
"IF CMP(current_layer_id, layer_id)==1",
|
||||
# - mark layer as selected if layer id match to current layer id
|
||||
"selected=1",
|
||||
"END",
|
||||
# Prepare line with data separated by "|"
|
||||
(
|
||||
"line = layer_id'|'group_id'|'visible'|'position'|'opacity'|'"
|
||||
"name'|'type'|'startFrame'|'endFrame'|'prelighttable'|'"
|
||||
"postlighttable'|'selected'|'editable'|'sencilState"
|
||||
),
|
||||
# Write data to output file
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line",
|
||||
)
|
||||
|
||||
# Collect data for all layers if layers are not specified
|
||||
if layer_ids is None:
|
||||
george_script_lines.extend((
|
||||
# Layer loop variables
|
||||
"loop = 1",
|
||||
"idx = 0",
|
||||
# Layers loop
|
||||
"WHILE loop",
|
||||
"tv_LayerGetID idx",
|
||||
"layer_id = result",
|
||||
"idx = idx + 1",
|
||||
# Stop loop if layer_id is "NONE"
|
||||
"IF CMP(layer_id, \"NONE\")==1",
|
||||
"loop = 0",
|
||||
"ELSE",
|
||||
*layer_data_getter,
|
||||
"END",
|
||||
"END"
|
||||
))
|
||||
else:
|
||||
print("Framerate was not found!")
|
||||
for layer_id in layer_ids:
|
||||
george_script_lines.append("layer_id = {}".format(layer_id))
|
||||
george_script_lines.extend(layer_data_getter)
|
||||
|
||||
width_key = "resolutionWidth"
|
||||
height_key = "resolutionHeight"
|
||||
return "\n".join(george_script_lines)
|
||||
|
||||
width = asset_doc["data"].get(width_key)
|
||||
height = asset_doc["data"].get(height_key)
|
||||
if width is None or height is None:
|
||||
width = project_doc["data"].get(width_key)
|
||||
height = project_doc["data"].get(height_key)
|
||||
|
||||
if width is None or height is None:
|
||||
print("Resolution was not found!")
|
||||
else:
|
||||
execute_george("tv_resizepage {} {} 0".format(width, height))
|
||||
def layers_data(layer_ids=None, communicator=None):
|
||||
"""Backwards compatible function of 'get_layers_data'."""
|
||||
return get_layers_data(layer_ids, communicator)
|
||||
|
||||
frame_start = asset_doc["data"].get("frameStart")
|
||||
frame_end = asset_doc["data"].get("frameEnd")
|
||||
|
||||
if frame_start is None or frame_end is None:
|
||||
print("Frame range was not found!")
|
||||
return
|
||||
def get_layers_data(layer_ids=None, communicator=None):
|
||||
"""Collect all layers information from currently opened workfile."""
|
||||
output_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
output_file.close()
|
||||
if layer_ids is not None and isinstance(layer_ids, int):
|
||||
layer_ids = [layer_ids]
|
||||
|
||||
handles = asset_doc["data"].get("handles") or 0
|
||||
handle_start = asset_doc["data"].get("handleStart")
|
||||
handle_end = asset_doc["data"].get("handleEnd")
|
||||
output_filepath = output_file.name
|
||||
|
||||
if handle_start is None or handle_end is None:
|
||||
handle_start = handles
|
||||
handle_end = handles
|
||||
george_script = get_layers_data_george_script(output_filepath, layer_ids)
|
||||
|
||||
# Always start from 0 Mark In and set only Mark Out
|
||||
mark_in = 0
|
||||
mark_out = mark_in + (frame_end - frame_start) + handle_start + handle_end
|
||||
execute_george_through_file(george_script, communicator)
|
||||
|
||||
execute_george("tv_markin {} set".format(mark_in))
|
||||
execute_george("tv_markout {} set".format(mark_out))
|
||||
with open(output_filepath, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
output = parse_layers_data(data)
|
||||
os.remove(output_filepath)
|
||||
return output
|
||||
|
||||
|
||||
def parse_group_data(data):
|
||||
"""Paser group data collected in 'get_groups_data'."""
|
||||
output = []
|
||||
groups_raw = data.split("\n")
|
||||
for group_raw in groups_raw:
|
||||
group_raw = group_raw.strip()
|
||||
if not group_raw:
|
||||
continue
|
||||
|
||||
parts = group_raw.split(" ")
|
||||
# Check for length and concatenate 2 last items until length match
|
||||
# - this happens if name contain spaces
|
||||
while len(parts) > 6:
|
||||
last_item = parts.pop(-1)
|
||||
parts[-1] = " ".join([parts[-1], last_item])
|
||||
clip_id, group_id, red, green, blue, name = parts
|
||||
|
||||
group = {
|
||||
"group_id": int(group_id),
|
||||
"name": name,
|
||||
"clip_id": int(clip_id),
|
||||
"red": int(red),
|
||||
"green": int(green),
|
||||
"blue": int(blue),
|
||||
}
|
||||
output.append(group)
|
||||
return output
|
||||
|
||||
|
||||
def groups_data(communicator=None):
|
||||
"""Backwards compatible function of 'get_groups_data'."""
|
||||
return get_groups_data(communicator)
|
||||
|
||||
|
||||
def get_groups_data(communicator=None):
|
||||
"""Information about groups from current workfile."""
|
||||
output_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
output_file.close()
|
||||
|
||||
output_filepath = output_file.name.replace("\\", "/")
|
||||
george_script_lines = (
|
||||
# Variable containing full path to output file
|
||||
"output_path = \"{}\"".format(output_filepath),
|
||||
"loop = 1",
|
||||
"FOR idx = 1 TO 12",
|
||||
"tv_layercolor \"getcolor\" 0 idx",
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' result",
|
||||
"END"
|
||||
)
|
||||
george_script = "\n".join(george_script_lines)
|
||||
execute_george_through_file(george_script, communicator)
|
||||
|
||||
with open(output_filepath, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
output = parse_group_data(data)
|
||||
os.remove(output_filepath)
|
||||
return output
|
||||
|
||||
|
||||
def get_layers_pre_post_behavior(layer_ids, communicator=None):
|
||||
"""Collect data about pre and post behavior of layer ids.
|
||||
|
||||
Pre and Post behaviors is enumerator of possible values:
|
||||
- "none"
|
||||
- "repeat" / "loop"
|
||||
- "pingpong"
|
||||
- "hold"
|
||||
|
||||
Example output:
|
||||
```json
|
||||
{
|
||||
0: {
|
||||
"pre": "none",
|
||||
"post": "loop"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Returns:
|
||||
dict: Key is layer id value is dictionary with "pre" and "post" keys.
|
||||
"""
|
||||
# Skip if is empty
|
||||
if not layer_ids:
|
||||
return {}
|
||||
|
||||
# Auto convert to list
|
||||
if not isinstance(layer_ids, (list, set, tuple)):
|
||||
layer_ids = [layer_ids]
|
||||
|
||||
# Prepare temp file
|
||||
output_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
output_file.close()
|
||||
|
||||
output_filepath = output_file.name.replace("\\", "/")
|
||||
george_script_lines = [
|
||||
# Variable containing full path to output file
|
||||
"output_path = \"{}\"".format(output_filepath),
|
||||
]
|
||||
for layer_id in layer_ids:
|
||||
george_script_lines.extend([
|
||||
"layer_id = {}".format(layer_id),
|
||||
"tv_layerprebehavior layer_id",
|
||||
"pre_beh = result",
|
||||
"tv_layerpostbehavior layer_id",
|
||||
"post_beh = result",
|
||||
"line = layer_id'|'pre_beh'|'post_beh",
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line"
|
||||
])
|
||||
|
||||
george_script = "\n".join(george_script_lines)
|
||||
execute_george_through_file(george_script, communicator)
|
||||
|
||||
# Read data
|
||||
with open(output_filepath, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
# Remove temp file
|
||||
os.remove(output_filepath)
|
||||
|
||||
# Parse data
|
||||
output = {}
|
||||
raw_lines = data.split("\n")
|
||||
for raw_line in raw_lines:
|
||||
line = raw_line.strip()
|
||||
if not line:
|
||||
continue
|
||||
parts = line.split("|")
|
||||
if len(parts) != 3:
|
||||
continue
|
||||
layer_id, pre_beh, post_beh = parts
|
||||
output[int(layer_id)] = {
|
||||
"pre": pre_beh.lower(),
|
||||
"post": post_beh.lower()
|
||||
}
|
||||
return output
|
||||
|
||||
|
||||
def get_layers_exposure_frames(layer_ids, layers_data=None, communicator=None):
|
||||
"""Get exposure frames.
|
||||
|
||||
Easily said returns frames where keyframes are. Recognized with george
|
||||
function `tv_exposureinfo` returning "Head".
|
||||
|
||||
Args:
|
||||
layer_ids (list): Ids of a layers for which exposure frames should
|
||||
look for.
|
||||
layers_data (list): Precollected layers data. If are not passed then
|
||||
'get_layers_data' is used.
|
||||
communicator (BaseCommunicator): Communicator used for communication
|
||||
with TVPaint.
|
||||
|
||||
Returns:
|
||||
dict: Frames where exposure is set to "Head" by layer id.
|
||||
"""
|
||||
|
||||
if layers_data is None:
|
||||
layers_data = get_layers_data(layer_ids)
|
||||
_layers_by_id = {
|
||||
layer["layer_id"]: layer
|
||||
for layer in layers_data
|
||||
}
|
||||
layers_by_id = {
|
||||
layer_id: _layers_by_id.get(layer_id)
|
||||
for layer_id in layer_ids
|
||||
}
|
||||
tmp_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
tmp_file.close()
|
||||
tmp_output_path = tmp_file.name.replace("\\", "/")
|
||||
george_script_lines = [
|
||||
"output_path = \"{}\"".format(tmp_output_path)
|
||||
]
|
||||
|
||||
output = {}
|
||||
layer_id_mapping = {}
|
||||
for layer_id, layer_data in layers_by_id.items():
|
||||
layer_id_mapping[str(layer_id)] = layer_id
|
||||
output[layer_id] = []
|
||||
if not layer_data:
|
||||
continue
|
||||
first_frame = layer_data["frame_start"]
|
||||
last_frame = layer_data["frame_end"]
|
||||
george_script_lines.extend([
|
||||
"line = \"\"",
|
||||
"layer_id = {}".format(layer_id),
|
||||
"line = line''layer_id",
|
||||
"tv_layerset layer_id",
|
||||
"frame = {}".format(first_frame),
|
||||
"WHILE (frame <= {})".format(last_frame),
|
||||
"tv_exposureinfo frame",
|
||||
"exposure = result",
|
||||
"IF (CMP(exposure, \"Head\") == 1)",
|
||||
"line = line'|'frame",
|
||||
"END",
|
||||
"frame = frame + 1",
|
||||
"END",
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line"
|
||||
])
|
||||
|
||||
execute_george_through_file("\n".join(george_script_lines), communicator)
|
||||
|
||||
with open(tmp_output_path, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
os.remove(tmp_output_path)
|
||||
|
||||
lines = []
|
||||
for line in data.split("\n"):
|
||||
line = line.strip()
|
||||
if line:
|
||||
lines.append(line)
|
||||
|
||||
for line in lines:
|
||||
line_items = list(line.split("|"))
|
||||
layer_id = line_items.pop(0)
|
||||
_layer_id = layer_id_mapping[layer_id]
|
||||
output[_layer_id] = [int(frame) for frame in line_items]
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def get_exposure_frames(
|
||||
layer_id, first_frame=None, last_frame=None, communicator=None
|
||||
):
|
||||
"""Get exposure frames.
|
||||
|
||||
Easily said returns frames where keyframes are. Recognized with george
|
||||
function `tv_exposureinfo` returning "Head".
|
||||
|
||||
Args:
|
||||
layer_id (int): Id of a layer for which exposure frames should
|
||||
look for.
|
||||
first_frame (int): From which frame will look for exposure frames.
|
||||
Used layers first frame if not entered.
|
||||
last_frame (int): Last frame where will look for exposure frames.
|
||||
Used layers last frame if not entered.
|
||||
|
||||
Returns:
|
||||
list: Frames where exposure is set to "Head".
|
||||
"""
|
||||
if first_frame is None or last_frame is None:
|
||||
layer = layers_data(layer_id)[0]
|
||||
if first_frame is None:
|
||||
first_frame = layer["frame_start"]
|
||||
if last_frame is None:
|
||||
last_frame = layer["frame_end"]
|
||||
|
||||
tmp_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
tmp_file.close()
|
||||
tmp_output_path = tmp_file.name.replace("\\", "/")
|
||||
george_script_lines = [
|
||||
"tv_layerset {}".format(layer_id),
|
||||
"output_path = \"{}\"".format(tmp_output_path),
|
||||
"output = \"\"",
|
||||
"frame = {}".format(first_frame),
|
||||
"WHILE (frame <= {})".format(last_frame),
|
||||
"tv_exposureinfo frame",
|
||||
"exposure = result",
|
||||
"IF (CMP(exposure, \"Head\") == 1)",
|
||||
"IF (CMP(output, \"\") == 1)",
|
||||
"output = output''frame",
|
||||
"ELSE",
|
||||
"output = output'|'frame",
|
||||
"END",
|
||||
"END",
|
||||
"frame = frame + 1",
|
||||
"END",
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' output"
|
||||
]
|
||||
|
||||
execute_george_through_file("\n".join(george_script_lines), communicator)
|
||||
|
||||
with open(tmp_output_path, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
os.remove(tmp_output_path)
|
||||
|
||||
lines = []
|
||||
for line in data.split("\n"):
|
||||
line = line.strip()
|
||||
if line:
|
||||
lines.append(line)
|
||||
|
||||
exposure_frames = []
|
||||
for line in lines:
|
||||
for frame in line.split("|"):
|
||||
exposure_frames.append(int(frame))
|
||||
return exposure_frames
|
||||
|
||||
|
||||
def get_scene_data(communicator=None):
|
||||
"""Scene data of currently opened scene.
|
||||
|
||||
Result contains resolution, pixel aspect, fps mark in/out with states,
|
||||
frame start and background color.
|
||||
|
||||
Returns:
|
||||
dict: Scene data collected in many ways.
|
||||
"""
|
||||
workfile_info = execute_george("tv_projectinfo", communicator)
|
||||
workfile_info_parts = workfile_info.split(" ")
|
||||
|
||||
# Project frame start - not used
|
||||
workfile_info_parts.pop(-1)
|
||||
field_order = workfile_info_parts.pop(-1)
|
||||
frame_rate = float(workfile_info_parts.pop(-1))
|
||||
pixel_apsect = float(workfile_info_parts.pop(-1))
|
||||
height = int(workfile_info_parts.pop(-1))
|
||||
width = int(workfile_info_parts.pop(-1))
|
||||
|
||||
# Marks return as "{frame - 1} {state} ", example "0 set".
|
||||
result = execute_george("tv_markin", communicator)
|
||||
mark_in_frame, mark_in_state, _ = result.split(" ")
|
||||
|
||||
result = execute_george("tv_markout", communicator)
|
||||
mark_out_frame, mark_out_state, _ = result.split(" ")
|
||||
|
||||
start_frame = execute_george("tv_startframe", communicator)
|
||||
return {
|
||||
"width": width,
|
||||
"height": height,
|
||||
"pixel_aspect": pixel_apsect,
|
||||
"fps": frame_rate,
|
||||
"field_order": field_order,
|
||||
"mark_in": int(mark_in_frame),
|
||||
"mark_in_state": mark_in_state,
|
||||
"mark_in_set": mark_in_state == "set",
|
||||
"mark_out": int(mark_out_frame),
|
||||
"mark_out_state": mark_out_state,
|
||||
"mark_out_set": mark_out_state == "set",
|
||||
"start_frame": int(start_frame),
|
||||
"bg_color": get_scene_bg_color(communicator)
|
||||
}
|
||||
|
||||
|
||||
def get_scene_bg_color(communicator=None):
|
||||
"""Background color set on scene.
|
||||
|
||||
Is important for review exporting where scene bg color is used as
|
||||
background.
|
||||
"""
|
||||
output_file = tempfile.NamedTemporaryFile(
|
||||
mode="w", prefix="a_tvp_", suffix=".txt", delete=False
|
||||
)
|
||||
output_file.close()
|
||||
output_filepath = output_file.name.replace("\\", "/")
|
||||
george_script_lines = [
|
||||
# Variable containing full path to output file
|
||||
"output_path = \"{}\"".format(output_filepath),
|
||||
"tv_background",
|
||||
"bg_color = result",
|
||||
# Write data to output file
|
||||
"tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' bg_color"
|
||||
]
|
||||
|
||||
george_script = "\n".join(george_script_lines)
|
||||
execute_george_through_file(george_script, communicator)
|
||||
|
||||
with open(output_filepath, "r") as stream:
|
||||
data = stream.read()
|
||||
|
||||
os.remove(output_filepath)
|
||||
data = data.strip()
|
||||
if not data:
|
||||
return None
|
||||
return data.split(" ")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue