diff --git a/openpype/hosts/tvpaint/api/lib.py b/openpype/hosts/tvpaint/api/lib.py index 8e63c9e5b2..e41bbb5aad 100644 --- a/openpype/hosts/tvpaint/api/lib.py +++ b/openpype/hosts/tvpaint/api/lib.py @@ -1,66 +1,538 @@ -from PIL import Image +import os +import logging +import tempfile import avalon.io -from avalon.tvpaint.lib import execute_george + +from . import CommunicationWrapper +from .pipeline import ( + list_instances, + write_instances +) + +log = logging.getLogger(__name__) -def set_context_settings(asset_doc=None): - """Set workfile settings by asset document data. +def execute_george(george_script, communicator=None): + if not communicator: + communicator = CommunicationWrapper.communicator + return communicator.execute_george(george_script) - Change fps, resolution and frame start/end. + +def execute_george_through_file(george_script, communicator=None): + """Execute george script with temp file. + + Allows to execute multiline george script without stopping websocket + client. + + On windows make sure script does not contain paths with backwards + slashes in paths, TVPaint won't execute properly in that case. + + Args: + george_script (str): George script to execute. May be multilined. """ - if asset_doc is None: - # Use current session asset if not passed - asset_doc = avalon.io.find_one({ - "type": "asset", - "name": avalon.io.Session["AVALON_ASSET"] - }) + if not communicator: + communicator = CommunicationWrapper.communicator - project_doc = avalon.io.find_one({"type": "project"}) + return communicator.execute_george_through_file(george_script) - framerate = asset_doc["data"].get("fps") - if framerate is None: - framerate = project_doc["data"].get("fps") - if framerate is not None: - execute_george( - "tv_framerate {} \"timestretch\"".format(framerate) - ) +def parse_layers_data(data): + """Parse layers data loaded in 'get_layers_data'.""" + layers = [] + layers_raw = data.split("\n") + for layer_raw in layers_raw: + layer_raw = layer_raw.strip() + if not layer_raw: + continue + ( + layer_id, group_id, visible, position, opacity, name, + layer_type, + frame_start, frame_end, prelighttable, postlighttable, + selected, editable, sencil_state + ) = layer_raw.split("|") + layer = { + "layer_id": int(layer_id), + "group_id": int(group_id), + "visible": visible == "ON", + "position": int(position), + "opacity": int(opacity), + "name": name, + "type": layer_type, + "frame_start": int(frame_start), + "frame_end": int(frame_end), + "prelighttable": prelighttable == "1", + "postlighttable": postlighttable == "1", + "selected": selected == "1", + "editable": editable == "1", + "sencil_state": sencil_state + } + layers.append(layer) + return layers + + +def get_layers_data_george_script(output_filepath, layer_ids=None): + """Prepare george script which will collect all layers from workfile.""" + output_filepath = output_filepath.replace("\\", "/") + george_script_lines = [ + # Variable containing full path to output file + "output_path = \"{}\"".format(output_filepath), + # Get Current Layer ID + "tv_LayerCurrentID", + "current_layer_id = result" + ] + # Script part for getting and storing layer information to temp + layer_data_getter = ( + # Get information about layer's group + "tv_layercolor \"get\" layer_id", + "group_id = result", + "tv_LayerInfo layer_id", + ( + "PARSE result visible position opacity name" + " type startFrame endFrame prelighttable postlighttable" + " selected editable sencilState" + ), + # Check if layer ID match `tv_LayerCurrentID` + "IF CMP(current_layer_id, layer_id)==1", + # - mark layer as selected if layer id match to current layer id + "selected=1", + "END", + # Prepare line with data separated by "|" + ( + "line = layer_id'|'group_id'|'visible'|'position'|'opacity'|'" + "name'|'type'|'startFrame'|'endFrame'|'prelighttable'|'" + "postlighttable'|'selected'|'editable'|'sencilState" + ), + # Write data to output file + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line", + ) + + # Collect data for all layers if layers are not specified + if layer_ids is None: + george_script_lines.extend(( + # Layer loop variables + "loop = 1", + "idx = 0", + # Layers loop + "WHILE loop", + "tv_LayerGetID idx", + "layer_id = result", + "idx = idx + 1", + # Stop loop if layer_id is "NONE" + "IF CMP(layer_id, \"NONE\")==1", + "loop = 0", + "ELSE", + *layer_data_getter, + "END", + "END" + )) else: - print("Framerate was not found!") + for layer_id in layer_ids: + george_script_lines.append("layer_id = {}".format(layer_id)) + george_script_lines.extend(layer_data_getter) - width_key = "resolutionWidth" - height_key = "resolutionHeight" + return "\n".join(george_script_lines) - width = asset_doc["data"].get(width_key) - height = asset_doc["data"].get(height_key) - if width is None or height is None: - width = project_doc["data"].get(width_key) - height = project_doc["data"].get(height_key) - if width is None or height is None: - print("Resolution was not found!") - else: - execute_george("tv_resizepage {} {} 0".format(width, height)) +def layers_data(layer_ids=None, communicator=None): + """Backwards compatible function of 'get_layers_data'.""" + return get_layers_data(layer_ids, communicator) - frame_start = asset_doc["data"].get("frameStart") - frame_end = asset_doc["data"].get("frameEnd") - if frame_start is None or frame_end is None: - print("Frame range was not found!") - return +def get_layers_data(layer_ids=None, communicator=None): + """Collect all layers information from currently opened workfile.""" + output_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + output_file.close() + if layer_ids is not None and isinstance(layer_ids, int): + layer_ids = [layer_ids] - handles = asset_doc["data"].get("handles") or 0 - handle_start = asset_doc["data"].get("handleStart") - handle_end = asset_doc["data"].get("handleEnd") + output_filepath = output_file.name - if handle_start is None or handle_end is None: - handle_start = handles - handle_end = handles + george_script = get_layers_data_george_script(output_filepath, layer_ids) - # Always start from 0 Mark In and set only Mark Out - mark_in = 0 - mark_out = mark_in + (frame_end - frame_start) + handle_start + handle_end + execute_george_through_file(george_script, communicator) - execute_george("tv_markin {} set".format(mark_in)) - execute_george("tv_markout {} set".format(mark_out)) + with open(output_filepath, "r") as stream: + data = stream.read() + + output = parse_layers_data(data) + os.remove(output_filepath) + return output + + +def parse_group_data(data): + """Paser group data collected in 'get_groups_data'.""" + output = [] + groups_raw = data.split("\n") + for group_raw in groups_raw: + group_raw = group_raw.strip() + if not group_raw: + continue + + parts = group_raw.split(" ") + # Check for length and concatenate 2 last items until length match + # - this happens if name contain spaces + while len(parts) > 6: + last_item = parts.pop(-1) + parts[-1] = " ".join([parts[-1], last_item]) + clip_id, group_id, red, green, blue, name = parts + + group = { + "group_id": int(group_id), + "name": name, + "clip_id": int(clip_id), + "red": int(red), + "green": int(green), + "blue": int(blue), + } + output.append(group) + return output + + +def groups_data(communicator=None): + """Backwards compatible function of 'get_groups_data'.""" + return get_groups_data(communicator) + + +def get_groups_data(communicator=None): + """Information about groups from current workfile.""" + output_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + output_file.close() + + output_filepath = output_file.name.replace("\\", "/") + george_script_lines = ( + # Variable containing full path to output file + "output_path = \"{}\"".format(output_filepath), + "loop = 1", + "FOR idx = 1 TO 12", + "tv_layercolor \"getcolor\" 0 idx", + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' result", + "END" + ) + george_script = "\n".join(george_script_lines) + execute_george_through_file(george_script, communicator) + + with open(output_filepath, "r") as stream: + data = stream.read() + + output = parse_group_data(data) + os.remove(output_filepath) + return output + + +def get_layers_pre_post_behavior(layer_ids, communicator=None): + """Collect data about pre and post behavior of layer ids. + + Pre and Post behaviors is enumerator of possible values: + - "none" + - "repeat" / "loop" + - "pingpong" + - "hold" + + Example output: + ```json + { + 0: { + "pre": "none", + "post": "loop" + } + } + ``` + + Returns: + dict: Key is layer id value is dictionary with "pre" and "post" keys. + """ + # Skip if is empty + if not layer_ids: + return {} + + # Auto convert to list + if not isinstance(layer_ids, (list, set, tuple)): + layer_ids = [layer_ids] + + # Prepare temp file + output_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + output_file.close() + + output_filepath = output_file.name.replace("\\", "/") + george_script_lines = [ + # Variable containing full path to output file + "output_path = \"{}\"".format(output_filepath), + ] + for layer_id in layer_ids: + george_script_lines.extend([ + "layer_id = {}".format(layer_id), + "tv_layerprebehavior layer_id", + "pre_beh = result", + "tv_layerpostbehavior layer_id", + "post_beh = result", + "line = layer_id'|'pre_beh'|'post_beh", + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line" + ]) + + george_script = "\n".join(george_script_lines) + execute_george_through_file(george_script, communicator) + + # Read data + with open(output_filepath, "r") as stream: + data = stream.read() + + # Remove temp file + os.remove(output_filepath) + + # Parse data + output = {} + raw_lines = data.split("\n") + for raw_line in raw_lines: + line = raw_line.strip() + if not line: + continue + parts = line.split("|") + if len(parts) != 3: + continue + layer_id, pre_beh, post_beh = parts + output[int(layer_id)] = { + "pre": pre_beh.lower(), + "post": post_beh.lower() + } + return output + + +def get_layers_exposure_frames(layer_ids, layers_data=None, communicator=None): + """Get exposure frames. + + Easily said returns frames where keyframes are. Recognized with george + function `tv_exposureinfo` returning "Head". + + Args: + layer_ids (list): Ids of a layers for which exposure frames should + look for. + layers_data (list): Precollected layers data. If are not passed then + 'get_layers_data' is used. + communicator (BaseCommunicator): Communicator used for communication + with TVPaint. + + Returns: + dict: Frames where exposure is set to "Head" by layer id. + """ + + if layers_data is None: + layers_data = get_layers_data(layer_ids) + _layers_by_id = { + layer["layer_id"]: layer + for layer in layers_data + } + layers_by_id = { + layer_id: _layers_by_id.get(layer_id) + for layer_id in layer_ids + } + tmp_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + tmp_file.close() + tmp_output_path = tmp_file.name.replace("\\", "/") + george_script_lines = [ + "output_path = \"{}\"".format(tmp_output_path) + ] + + output = {} + layer_id_mapping = {} + for layer_id, layer_data in layers_by_id.items(): + layer_id_mapping[str(layer_id)] = layer_id + output[layer_id] = [] + if not layer_data: + continue + first_frame = layer_data["frame_start"] + last_frame = layer_data["frame_end"] + george_script_lines.extend([ + "line = \"\"", + "layer_id = {}".format(layer_id), + "line = line''layer_id", + "tv_layerset layer_id", + "frame = {}".format(first_frame), + "WHILE (frame <= {})".format(last_frame), + "tv_exposureinfo frame", + "exposure = result", + "IF (CMP(exposure, \"Head\") == 1)", + "line = line'|'frame", + "END", + "frame = frame + 1", + "END", + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' line" + ]) + + execute_george_through_file("\n".join(george_script_lines), communicator) + + with open(tmp_output_path, "r") as stream: + data = stream.read() + + os.remove(tmp_output_path) + + lines = [] + for line in data.split("\n"): + line = line.strip() + if line: + lines.append(line) + + for line in lines: + line_items = list(line.split("|")) + layer_id = line_items.pop(0) + _layer_id = layer_id_mapping[layer_id] + output[_layer_id] = [int(frame) for frame in line_items] + + return output + + +def get_exposure_frames( + layer_id, first_frame=None, last_frame=None, communicator=None +): + """Get exposure frames. + + Easily said returns frames where keyframes are. Recognized with george + function `tv_exposureinfo` returning "Head". + + Args: + layer_id (int): Id of a layer for which exposure frames should + look for. + first_frame (int): From which frame will look for exposure frames. + Used layers first frame if not entered. + last_frame (int): Last frame where will look for exposure frames. + Used layers last frame if not entered. + + Returns: + list: Frames where exposure is set to "Head". + """ + if first_frame is None or last_frame is None: + layer = layers_data(layer_id)[0] + if first_frame is None: + first_frame = layer["frame_start"] + if last_frame is None: + last_frame = layer["frame_end"] + + tmp_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + tmp_file.close() + tmp_output_path = tmp_file.name.replace("\\", "/") + george_script_lines = [ + "tv_layerset {}".format(layer_id), + "output_path = \"{}\"".format(tmp_output_path), + "output = \"\"", + "frame = {}".format(first_frame), + "WHILE (frame <= {})".format(last_frame), + "tv_exposureinfo frame", + "exposure = result", + "IF (CMP(exposure, \"Head\") == 1)", + "IF (CMP(output, \"\") == 1)", + "output = output''frame", + "ELSE", + "output = output'|'frame", + "END", + "END", + "frame = frame + 1", + "END", + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' output" + ] + + execute_george_through_file("\n".join(george_script_lines), communicator) + + with open(tmp_output_path, "r") as stream: + data = stream.read() + + os.remove(tmp_output_path) + + lines = [] + for line in data.split("\n"): + line = line.strip() + if line: + lines.append(line) + + exposure_frames = [] + for line in lines: + for frame in line.split("|"): + exposure_frames.append(int(frame)) + return exposure_frames + + +def get_scene_data(communicator=None): + """Scene data of currently opened scene. + + Result contains resolution, pixel aspect, fps mark in/out with states, + frame start and background color. + + Returns: + dict: Scene data collected in many ways. + """ + workfile_info = execute_george("tv_projectinfo", communicator) + workfile_info_parts = workfile_info.split(" ") + + # Project frame start - not used + workfile_info_parts.pop(-1) + field_order = workfile_info_parts.pop(-1) + frame_rate = float(workfile_info_parts.pop(-1)) + pixel_apsect = float(workfile_info_parts.pop(-1)) + height = int(workfile_info_parts.pop(-1)) + width = int(workfile_info_parts.pop(-1)) + + # Marks return as "{frame - 1} {state} ", example "0 set". + result = execute_george("tv_markin", communicator) + mark_in_frame, mark_in_state, _ = result.split(" ") + + result = execute_george("tv_markout", communicator) + mark_out_frame, mark_out_state, _ = result.split(" ") + + start_frame = execute_george("tv_startframe", communicator) + return { + "width": width, + "height": height, + "pixel_aspect": pixel_apsect, + "fps": frame_rate, + "field_order": field_order, + "mark_in": int(mark_in_frame), + "mark_in_state": mark_in_state, + "mark_in_set": mark_in_state == "set", + "mark_out": int(mark_out_frame), + "mark_out_state": mark_out_state, + "mark_out_set": mark_out_state == "set", + "start_frame": int(start_frame), + "bg_color": get_scene_bg_color(communicator) + } + + +def get_scene_bg_color(communicator=None): + """Background color set on scene. + + Is important for review exporting where scene bg color is used as + background. + """ + output_file = tempfile.NamedTemporaryFile( + mode="w", prefix="a_tvp_", suffix=".txt", delete=False + ) + output_file.close() + output_filepath = output_file.name.replace("\\", "/") + george_script_lines = [ + # Variable containing full path to output file + "output_path = \"{}\"".format(output_filepath), + "tv_background", + "bg_color = result", + # Write data to output file + "tv_writetextfile \"strict\" \"append\" '\"'output_path'\"' bg_color" + ] + + george_script = "\n".join(george_script_lines) + execute_george_through_file(george_script, communicator) + + with open(output_filepath, "r") as stream: + data = stream.read() + + os.remove(output_filepath) + data = data.strip() + if not data: + return None + return data.split(" ")