diff --git a/pype/hosts/resolve/lib_hiero.py b/pype/hosts/resolve/lib_hiero.py deleted file mode 100644 index 891ca3905c..0000000000 --- a/pype/hosts/resolve/lib_hiero.py +++ /dev/null @@ -1,838 +0,0 @@ -""" -Host specific functions where host api is connected -""" -import os -import re -import sys -import ast -import hiero -import avalon.api as avalon -import avalon.io -from avalon.vendor.Qt import QtWidgets -from pype.api import (Logger, Anatomy, config) -from . import tags -import shutil -from compiler.ast import flatten - -try: - from PySide.QtCore import QFile, QTextStream - from PySide.QtXml import QDomDocument -except ImportError: - from PySide2.QtCore import QFile, QTextStream - from PySide2.QtXml import QDomDocument - -# from opentimelineio import opentime -# from pprint import pformat - -log = Logger().get_logger(__name__, "hiero") - -self = sys.modules[__name__] -self._has_been_setup = False -self._has_menu = False -self._registered_gui = None -self.pype_tag_name = "Pype Data" -self.default_sequence_name = "PypeSequence" -self.default_bin_name = "PypeBin" - -AVALON_CONFIG = os.getenv("AVALON_CONFIG", "pype") - - -def get_current_project(remove_untitled=False): - projects = flatten(hiero.core.projects()) - if not remove_untitled: - return next(iter(projects)) - - # if remove_untitled - for proj in projects: - if "Untitled" in proj.name(): - proj.close() - else: - return proj - - -def get_current_sequence(name=None, new=False): - """ - Get current sequence in context of active project. - - Args: - name (str)[optional]: name of sequence we want to return - new (bool)[optional]: if we want to create new one - - Returns: - hiero.core.Sequence: the sequence object - """ - sequence = None - project = get_current_project() - root_bin = project.clipsBin() - - if new: - # create new - name = name or self.default_sequence_name - sequence = hiero.core.Sequence(name) - root_bin.addItem(hiero.core.BinItem(sequence)) - elif name: - # look for sequence by name - sequences = project.sequences() - for _sequence in sequences: - if _sequence.name() == name: - sequence = _sequence - if not sequence: - # if nothing found create new with input name - sequence = get_current_sequence(name, True) - elif not name and not new: - # if name is none and new is False then return current open sequence - sequence = hiero.ui.activeSequence() - - return sequence - - -def get_current_track(sequence, name, audio=False): - """ - Get current track in context of active project. - - Creates new if none is found. - - Args: - sequence (hiero.core.Sequence): hiero sequene object - name (str): name of track we want to return - audio (bool)[optional]: switch to AudioTrack - - Returns: - hiero.core.Track: the track object - """ - tracks = sequence.videoTracks() - - if audio: - tracks = sequence.audioTracks() - - # get track by name - track = None - for _track in tracks: - if _track.name() in name: - track = _track - - if not track: - if not audio: - track = hiero.core.VideoTrack(name) - else: - track = hiero.core.AudioTrack(name) - sequence.addTrack(track) - - return track - - -def get_track_items( - selected=False, - sequence_name=None, - track_item_name=None, - track_name=None, - track_type=None, - check_enabled=True, - check_locked=True, - check_tagged=False): - """Get all available current timeline track items. - - Attribute: - selected (bool)[optional]: return only selected items on timeline - sequence_name (str)[optional]: return only clips from input sequence - track_item_name (str)[optional]: return only item with input name - track_name (str)[optional]: return only items from track name - track_type (str)[optional]: return only items of given type - (`audio` or `video`) default is `video` - check_enabled (bool)[optional]: ignore disabled if True - check_locked (bool)[optional]: ignore locked if True - - Return: - list or hiero.core.TrackItem: list of track items or single track item - """ - return_list = list() - track_items = list() - - # get selected track items or all in active sequence - if selected: - selected_items = list(hiero.selection) - for item in selected_items: - if track_name and track_name in item.parent().name(): - # filter only items fitting input track name - track_items.append(item) - elif not track_name: - # or add all if no track_name was defined - track_items.append(item) - else: - sequence = get_current_sequence(name=sequence_name) - # get all available tracks from sequence - tracks = list(sequence.audioTracks()) + list(sequence.videoTracks()) - # loop all tracks - for track in tracks: - if check_locked and track.isLocked(): - continue - if check_enabled and not track.isEnabled(): - continue - # and all items in track - for item in track.items(): - if check_tagged and not item.tags(): - continue - - # check if track item is enabled - if check_enabled: - if not item.isEnabled(): - continue - if track_item_name: - if item.name() in track_item_name: - return item - # make sure only track items with correct track names are added - if track_name and track_name in track.name(): - # filter out only defined track_name items - track_items.append(item) - elif not track_name: - # or add all if no track_name is defined - track_items.append(item) - - # filter out only track items with defined track_type - for track_item in track_items: - if track_type and track_type == "video" and isinstance( - track_item.parent(), hiero.core.VideoTrack): - # only video track items are allowed - return_list.append(track_item) - elif track_type and track_type == "audio" and isinstance( - track_item.parent(), hiero.core.AudioTrack): - # only audio track items are allowed - return_list.append(track_item) - elif not track_type: - # add all if no track_type is defined - return_list.append(track_item) - - return return_list - - -def get_track_item_pype_tag(track_item): - """ - Get pype track item tag created by creator or loader plugin. - - Attributes: - trackItem (hiero.core.TrackItem): hiero object - - Returns: - hiero.core.Tag: hierarchy, orig clip attributes - """ - # get all tags from track item - _tags = track_item.tags() - if not _tags: - return None - for tag in _tags: - # return only correct tag defined by global name - if tag.name() in self.pype_tag_name: - return tag - - -def set_track_item_pype_tag(track_item, data=None): - """ - Set pype track item tag to input track_item. - - Attributes: - trackItem (hiero.core.TrackItem): hiero object - - Returns: - hiero.core.Tag - """ - data = data or dict() - - # basic Tag's attribute - tag_data = { - "editable": "0", - "note": "Pype data holder", - "icon": "pype_icon.png", - "metadata": {k: v for k, v in data.items()} - } - # get available pype tag if any - _tag = get_track_item_pype_tag(track_item) - - if _tag: - # it not tag then create one - tag = tags.update_tag(_tag, tag_data) - else: - # if pype tag available then update with input data - tag = tags.create_tag(self.pype_tag_name, tag_data) - # add it to the input track item - track_item.addTag(tag) - - return tag - - -def get_track_item_pype_data(track_item): - """ - Get track item's pype tag data. - - Attributes: - trackItem (hiero.core.TrackItem): hiero object - - Returns: - dict: data found on pype tag - """ - data = dict() - # get pype data tag from track item - tag = get_track_item_pype_tag(track_item) - - if not tag: - return None - - # get tag metadata attribut - tag_data = tag.metadata() - # convert tag metadata to normal keys names and values to correct types - for k, v in dict(tag_data).items(): - key = k.replace("tag.", "") - - try: - # capture exceptions which are related to strings only - value = ast.literal_eval(v) - except (ValueError, SyntaxError): - value = v - - data.update({key: value}) - - return data - - -def imprint(track_item, data=None): - """ - Adding `Avalon data` into a hiero track item tag. - - Also including publish attribute into tag. - - Arguments: - track_item (hiero.core.TrackItem): hiero track item object - data (dict): Any data which needst to be imprinted - - Examples: - data = { - 'asset': 'sq020sh0280', - 'family': 'render', - 'subset': 'subsetMain' - } - """ - data = data or {} - - tag = set_track_item_pype_tag(track_item, data) - - # add publish attribute - set_publish_attribute(tag, True) - - -def set_publish_attribute(tag, value): - """ Set Publish attribute in input Tag object - - Attribute: - tag (hiero.core.Tag): a tag object - value (bool): True or False - """ - tag_data = tag.metadata() - # set data to the publish attribute - tag_data.setValue("tag.publish", str(value)) - - -def get_publish_attribute(tag): - """ Get Publish attribute from input Tag object - - Attribute: - tag (hiero.core.Tag): a tag object - value (bool): True or False - """ - tag_data = tag.metadata() - # get data to the publish attribute - value = tag_data.value("tag.publish") - # return value converted to bool value. Atring is stored in tag. - return ast.literal_eval(value) - - -def sync_avalon_data_to_workfile(): - # import session to get project dir - project_name = avalon.Session["AVALON_PROJECT"] - - anatomy = Anatomy(project_name) - work_template = anatomy.templates["work"]["path"] - work_root = anatomy.root_value_for_template(work_template) - active_project_root = ( - os.path.join(work_root, project_name) - ).replace("\\", "/") - # getting project - project = get_current_project() - - if "Tag Presets" in project.name(): - return - - log.debug("Synchronizing Pype metadata to project: {}".format( - project.name())) - - # set project root with backward compatibility - try: - project.setProjectDirectory(active_project_root) - except Exception: - # old way of seting it - project.setProjectRoot(active_project_root) - - # get project data from avalon db - project_doc = avalon.io.find_one({"type": "project"}) - project_data = project_doc["data"] - - log.debug("project_data: {}".format(project_data)) - - # get format and fps property from avalon db on project - width = project_data["resolutionWidth"] - height = project_data["resolutionHeight"] - pixel_aspect = project_data["pixelAspect"] - fps = project_data['fps'] - format_name = project_data['code'] - - # create new format in hiero project - format = hiero.core.Format(width, height, pixel_aspect, format_name) - project.setOutputFormat(format) - - # set fps to hiero project - project.setFramerate(fps) - - # TODO: add auto colorspace set from project drop - log.info("Project property has been synchronised with Avalon db") - - -def launch_workfiles_app(event): - """ - Event for launching workfiles after hiero start - - Args: - event (obj): required but unused - """ - from . import launch_workfiles_app - launch_workfiles_app() - - -def setup(console=False, port=None, menu=True): - """Setup integration - - Registers Pyblish for Hiero plug-ins and appends an item to the File-menu - - Arguments: - console (bool): Display console with GUI - port (int, optional): Port from which to start looking for an - available port to connect with Pyblish QML, default - provided by Pyblish Integration. - menu (bool, optional): Display file menu in Hiero. - """ - - if self._has_been_setup: - teardown() - - add_submission() - - if menu: - add_to_filemenu() - self._has_menu = True - - self._has_been_setup = True - log.debug("pyblish: Loaded successfully.") - - -def teardown(): - """Remove integration""" - if not self._has_been_setup: - return - - if self._has_menu: - remove_from_filemenu() - self._has_menu = False - - self._has_been_setup = False - log.debug("pyblish: Integration torn down successfully") - - -def remove_from_filemenu(): - raise NotImplementedError("Implement me please.") - - -def add_to_filemenu(): - PublishAction() - - -class PyblishSubmission(hiero.exporters.FnSubmission.Submission): - - def __init__(self): - hiero.exporters.FnSubmission.Submission.__init__(self) - - def addToQueue(self): - from . import publish - # Add submission to Hiero module for retrieval in plugins. - hiero.submission = self - publish() - - -def add_submission(): - registry = hiero.core.taskRegistry - registry.addSubmission("Pyblish", PyblishSubmission) - - -class PublishAction(QtWidgets.QAction): - """ - Action with is showing as menu item - """ - - def __init__(self): - QtWidgets.QAction.__init__(self, "Publish", None) - self.triggered.connect(self.publish) - - for interest in ["kShowContextMenu/kTimeline", - "kShowContextMenukBin", - "kShowContextMenu/kSpreadsheet"]: - hiero.core.events.registerInterest(interest, self.eventHandler) - - self.setShortcut("Ctrl+Alt+P") - - def publish(self): - from . import publish - # Removing "submission" attribute from hiero module, to prevent tasks - # from getting picked up when not using the "Export" dialog. - if hasattr(hiero, "submission"): - del hiero.submission - publish() - - def eventHandler(self, event): - # Add the Menu to the right-click menu - event.menu.addAction(self) - - -# def CreateNukeWorkfile(nodes=None, -# nodes_effects=None, -# to_timeline=False, -# **kwargs): -# ''' Creating nuke workfile with particular version with given nodes -# Also it is creating timeline track items as precomps. -# -# Arguments: -# nodes(list of dict): each key in dict is knob order is important -# to_timeline(type): will build trackItem with metadata -# -# Returns: -# bool: True if done -# -# Raises: -# Exception: with traceback -# -# ''' -# import hiero.core -# from avalon.nuke import imprint -# from pype.hosts.nuke import ( -# lib as nklib -# ) -# -# # check if the file exists if does then Raise "File exists!" -# if os.path.exists(filepath): -# raise FileExistsError("File already exists: `{}`".format(filepath)) -# -# # if no representations matching then -# # Raise "no representations to be build" -# if len(representations) == 0: -# raise AttributeError("Missing list of `representations`") -# -# # check nodes input -# if len(nodes) == 0: -# log.warning("Missing list of `nodes`") -# -# # create temp nk file -# nuke_script = hiero.core.nuke.ScriptWriter() -# -# # create root node and save all metadata -# root_node = hiero.core.nuke.RootNode() -# -# anatomy = Anatomy(os.environ["AVALON_PROJECT"]) -# work_template = anatomy.templates["work"]["path"] -# root_path = anatomy.root_value_for_template(work_template) -# -# nuke_script.addNode(root_node) -# -# # here to call pype.hosts.nuke.lib.BuildWorkfile -# script_builder = nklib.BuildWorkfile( -# root_node=root_node, -# root_path=root_path, -# nodes=nuke_script.getNodes(), -# **kwargs -# ) - - -def create_nuke_workfile_clips(nuke_workfiles, seq=None): - ''' - nuke_workfiles is list of dictionaries like: - [{ - 'path': 'P:/Jakub_testy_pipeline/test_v01.nk', - 'name': 'test', - 'handleStart': 15, # added asymetrically to handles - 'handleEnd': 10, # added asymetrically to handles - "clipIn": 16, - "frameStart": 991, - "frameEnd": 1023, - 'task': 'Comp-tracking', - 'work_dir': 'VFX_PR', - 'shot': '00010' - }] - ''' - - proj = hiero.core.projects()[-1] - root = proj.clipsBin() - - if not seq: - seq = hiero.core.Sequence('NewSequences') - root.addItem(hiero.core.BinItem(seq)) - # todo will ned to define this better - # track = seq[1] # lazy example to get a destination# track - clips_lst = [] - for nk in nuke_workfiles: - task_path = '/'.join([nk['work_dir'], nk['shot'], nk['task']]) - bin = create_bin(task_path, proj) - - if nk['task'] not in seq.videoTracks(): - track = hiero.core.VideoTrack(nk['task']) - seq.addTrack(track) - else: - track = seq.tracks(nk['task']) - - # create clip media - media = hiero.core.MediaSource(nk['path']) - media_in = int(media.startTime() or 0) - media_duration = int(media.duration() or 0) - - handle_start = nk.get("handleStart") - handle_end = nk.get("handleEnd") - - if media_in: - source_in = media_in + handle_start - else: - source_in = nk["frameStart"] + handle_start - - if media_duration: - source_out = (media_in + media_duration - 1) - handle_end - else: - source_out = nk["frameEnd"] - handle_end - - source = hiero.core.Clip(media) - - name = os.path.basename(os.path.splitext(nk['path'])[0]) - split_name = split_by_client_version(name)[0] or name - - # add to bin as clip item - items_in_bin = [b.name() for b in bin.items()] - if split_name not in items_in_bin: - binItem = hiero.core.BinItem(source) - bin.addItem(binItem) - - new_source = [ - item for item in bin.items() if split_name in item.name() - ][0].items()[0].item() - - # add to track as clip item - trackItem = hiero.core.TrackItem( - split_name, hiero.core.TrackItem.kVideo) - trackItem.setSource(new_source) - trackItem.setSourceIn(source_in) - trackItem.setSourceOut(source_out) - trackItem.setTimelineIn(nk["clipIn"]) - trackItem.setTimelineOut(nk["clipIn"] + (source_out - source_in)) - track.addTrackItem(trackItem) - clips_lst.append(trackItem) - - return clips_lst - - -def create_bin(path=None, project=None): - ''' - Create bin in project. - If the path is "bin1/bin2/bin3" it will create whole depth - and return `bin3` - - ''' - # get the first loaded project - project = project or get_current_project() - - path = path or self.default_bin_name - - path = path.replace("\\", "/").split("/") - - root_bin = project.clipsBin() - - done_bin_lst = [] - for i, b in enumerate(path): - if i == 0 and len(path) > 1: - if b in [bin.name() for bin in root_bin.bins()]: - bin = [bin for bin in root_bin.bins() if b in bin.name()][0] - done_bin_lst.append(bin) - else: - create_bin = hiero.core.Bin(b) - root_bin.addItem(create_bin) - done_bin_lst.append(create_bin) - - elif i >= 1 and i < len(path) - 1: - if b in [bin.name() for bin in done_bin_lst[i - 1].bins()]: - bin = [ - bin for bin in done_bin_lst[i - 1].bins() - if b in bin.name() - ][0] - done_bin_lst.append(bin) - else: - create_bin = hiero.core.Bin(b) - done_bin_lst[i - 1].addItem(create_bin) - done_bin_lst.append(create_bin) - - elif i == len(path) - 1: - if b in [bin.name() for bin in done_bin_lst[i - 1].bins()]: - bin = [ - bin for bin in done_bin_lst[i - 1].bins() - if b in bin.name() - ][0] - done_bin_lst.append(bin) - else: - create_bin = hiero.core.Bin(b) - done_bin_lst[i - 1].addItem(create_bin) - done_bin_lst.append(create_bin) - - return done_bin_lst[-1] - - -def split_by_client_version(string): - regex = r"[/_.]v\d+" - try: - matches = re.findall(regex, string, re.IGNORECASE) - return string.split(matches[0]) - except Exception as error: - log.error(error) - return None - - -def get_selected_track_items(sequence=None): - _sequence = sequence or get_current_sequence() - - # Getting selection - timeline_editor = hiero.ui.getTimelineEditor(_sequence) - return timeline_editor.selection() - - -def set_selected_track_items(track_items_list, sequence=None): - _sequence = sequence or get_current_sequence() - - # Getting selection - timeline_editor = hiero.ui.getTimelineEditor(_sequence) - return timeline_editor.setSelection(track_items_list) - - -def _read_doc_from_path(path): - # reading QDomDocument from HROX path - hrox_file = QFile(path) - if not hrox_file.open(QFile.ReadOnly): - raise RuntimeError("Failed to open file for reading") - doc = QDomDocument() - doc.setContent(hrox_file) - hrox_file.close() - return doc - - -def _write_doc_to_path(doc, path): - # write QDomDocument to path as HROX - hrox_file = QFile(path) - if not hrox_file.open(QFile.WriteOnly): - raise RuntimeError("Failed to open file for writing") - stream = QTextStream(hrox_file) - doc.save(stream, 1) - hrox_file.close() - - -def _set_hrox_project_knobs(doc, **knobs): - # set attributes to Project Tag - proj_elem = doc.documentElement().firstChildElement("Project") - for k, v in knobs.items(): - proj_elem.setAttribute(k, v) - - -def apply_colorspace_project(): - # get path the the active projects - project = get_current_project(remove_untitled=True) - current_file = project.path() - - # close the active project - project.close() - - # get presets for hiero - presets = config.get_init_presets() - colorspace = presets["colorspace"] - hiero_project_clrs = colorspace.get("hiero", {}).get("project", {}) - - # save the workfile as subversion "comment:_colorspaceChange" - split_current_file = os.path.splitext(current_file) - copy_current_file = current_file - - if "_colorspaceChange" not in current_file: - copy_current_file = ( - split_current_file[0] - + "_colorspaceChange" - + split_current_file[1] - ) - - try: - # duplicate the file so the changes are applied only to the copy - shutil.copyfile(current_file, copy_current_file) - except shutil.Error: - # in case the file already exists and it want to copy to the - # same filewe need to do this trick - # TEMP file name change - copy_current_file_tmp = copy_current_file + "_tmp" - # create TEMP file - shutil.copyfile(current_file, copy_current_file_tmp) - # remove original file - os.remove(current_file) - # copy TEMP back to original name - shutil.copyfile(copy_current_file_tmp, copy_current_file) - # remove the TEMP file as we dont need it - os.remove(copy_current_file_tmp) - - # use the code from bellow for changing xml hrox Attributes - hiero_project_clrs.update({"name": os.path.basename(copy_current_file)}) - - # read HROX in as QDomSocument - doc = _read_doc_from_path(copy_current_file) - - # apply project colorspace properties - _set_hrox_project_knobs(doc, **hiero_project_clrs) - - # write QDomSocument back as HROX - _write_doc_to_path(doc, copy_current_file) - - # open the file as current project - hiero.core.openProject(copy_current_file) - - -def apply_colorspace_clips(): - project = get_current_project(remove_untitled=True) - clips = project.clips() - - # get presets for hiero - presets = config.get_init_presets() - colorspace = presets["colorspace"] - hiero_clips_clrs = colorspace.get("hiero", {}).get("clips", {}) - - for clip in clips: - clip_media_source_path = clip.mediaSource().firstpath() - clip_name = clip.name() - clip_colorspace = clip.sourceMediaColourTransform() - - if "default" in clip_colorspace: - continue - - # check if any colorspace presets for read is mathing - preset_clrsp = next((hiero_clips_clrs[k] - for k in hiero_clips_clrs - if bool(re.search(k, clip_media_source_path))), - None) - - if preset_clrsp: - log.debug("Changing clip.path: {}".format(clip_media_source_path)) - log.info("Changing clip `{}` colorspace {} to {}".format( - clip_name, clip_colorspace, preset_clrsp)) - # set the found preset to the clip - clip.setSourceMediaColourTransform(preset_clrsp) - - # save project after all is changed - project.save() diff --git a/pype/hosts/resolve/pipeline_hiero.py b/pype/hosts/resolve/pipeline_hiero.py deleted file mode 100644 index 73025e790f..0000000000 --- a/pype/hosts/resolve/pipeline_hiero.py +++ /dev/null @@ -1,302 +0,0 @@ -""" -Basic avalon integration -""" -import os -import contextlib -from collections import OrderedDict -from avalon.tools import ( - workfiles, - publish as _publish -) -from avalon.pipeline import AVALON_CONTAINER_ID -from avalon import api as avalon -from avalon import schema -from pyblish import api as pyblish -import pype -from pype.api import Logger - -from . import lib, menu, events - -log = Logger().get_logger(__name__, "hiero") - -AVALON_CONFIG = os.getenv("AVALON_CONFIG", "pype") - -# plugin paths -LOAD_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "load") -CREATE_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "create") -INVENTORY_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "inventory") - -PUBLISH_PATH = os.path.join( - pype.PLUGINS_DIR, "hiero", "publish" -).replace("\\", "/") - -AVALON_CONTAINERS = ":AVALON_CONTAINERS" - - -def install(): - """ - Installing Hiero integration for avalon - - Args: - config (obj): avalon config module `pype` in our case, it is not - used but required by avalon.api.install() - - """ - - # adding all events - events.register_events() - - log.info("Registering Hiero plug-ins..") - pyblish.register_host("hiero") - pyblish.register_plugin_path(PUBLISH_PATH) - avalon.register_plugin_path(avalon.Loader, LOAD_PATH) - avalon.register_plugin_path(avalon.Creator, CREATE_PATH) - avalon.register_plugin_path(avalon.InventoryAction, INVENTORY_PATH) - - # register callback for switching publishable - pyblish.register_callback("instanceToggled", on_pyblish_instance_toggled) - - # Disable all families except for the ones we explicitly want to see - family_states = [ - "write", - "review", - "plate" - ] - - avalon.data["familiesStateDefault"] = False - avalon.data["familiesStateToggled"] = family_states - - # install menu - menu.menu_install() - - # register hiero events - events.register_hiero_events() - - -def uninstall(): - """ - Uninstalling Hiero integration for avalon - - """ - log.info("Deregistering Hiero plug-ins..") - pyblish.deregister_host("hiero") - pyblish.deregister_plugin_path(PUBLISH_PATH) - avalon.deregister_plugin_path(avalon.Loader, LOAD_PATH) - avalon.deregister_plugin_path(avalon.Creator, CREATE_PATH) - - # register callback for switching publishable - pyblish.deregister_callback("instanceToggled", on_pyblish_instance_toggled) - - -def containerise(track_item, - name, - namespace, - context, - loader=None, - data=None): - """Bundle Hiero's object into an assembly and imprint it with metadata - - Containerisation enables a tracking of version, author and origin - for loaded assets. - - Arguments: - track_item (hiero.core.TrackItem): object to imprint as container - name (str): Name of resulting assembly - namespace (str): Namespace under which to host container - context (dict): Asset information - loader (str, optional): Name of node used to produce this container. - - Returns: - track_item (hiero.core.TrackItem): containerised object - - """ - - data_imprint = OrderedDict({ - "schema": "avalon-core:container-2.0", - "id": AVALON_CONTAINER_ID, - "name": str(name), - "namespace": str(namespace), - "loader": str(loader), - "representation": str(context["representation"]["_id"]), - }) - - if data: - for k, v in data.items(): - data_imprint.update({k: v}) - - log.debug("_ data_imprint: {}".format(data_imprint)) - lib.set_track_item_pype_tag(track_item, data_imprint) - - return track_item - - -def ls(): - """List available containers. - - This function is used by the Container Manager in Nuke. You'll - need to implement a for-loop that then *yields* one Container at - a time. - - See the `container.json` schema for details on how it should look, - and the Maya equivalent, which is in `avalon.maya.pipeline` - """ - - # get all track items from current timeline - all_track_items = lib.get_track_items() - - for track_item in all_track_items: - container = parse_container(track_item) - if container: - yield container - - -def parse_container(track_item, validate=True): - """Return container data from track_item's pype tag. - - Args: - track_item (hiero.core.TrackItem): A containerised track item. - validate (bool)[optional]: validating with avalon scheme - - Returns: - dict: The container schema data for input containerized track item. - - """ - # convert tag metadata to normal keys names - data = lib.get_track_item_pype_data(track_item) - - if validate and data and data.get("schema"): - schema.validate(data) - - if not isinstance(data, dict): - return - - # If not all required data return the empty container - required = ['schema', 'id', 'name', - 'namespace', 'loader', 'representation'] - - if not all(key in data for key in required): - return - - container = {key: data[key] for key in required} - - container["objectName"] = track_item.name() - - # Store reference to the node object - container["_track_item"] = track_item - - return container - - -def update_container(track_item, data=None): - """Update container data to input track_item's pype tag. - - Args: - track_item (hiero.core.TrackItem): A containerised track item. - data (dict)[optional]: dictionery with data to be updated - - Returns: - bool: True if container was updated correctly - - """ - data = data or dict() - - container = lib.get_track_item_pype_data(track_item) - - for _key, _value in container.items(): - try: - container[_key] = data[_key] - except KeyError: - pass - - log.info("Updating container: `{}`".format(track_item.name())) - return bool(lib.set_track_item_pype_tag(track_item, container)) - - -def launch_workfiles_app(*args): - ''' Wrapping function for workfiles launcher ''' - - workdir = os.environ["AVALON_WORKDIR"] - - # show workfile gui - workfiles.show(workdir) - - -def publish(parent): - """Shorthand to publish from within host""" - return _publish.show(parent) - - -@contextlib.contextmanager -def maintained_selection(): - """Maintain selection during context - - Example: - >>> with maintained_selection(): - ... for track_item in track_items: - ... < do some stuff > - """ - from .lib import ( - set_selected_track_items, - get_selected_track_items - ) - previous_selection = get_selected_track_items() - reset_selection() - try: - # do the operation - yield - finally: - reset_selection() - set_selected_track_items(previous_selection) - - -def reset_selection(): - """Deselect all selected nodes - """ - from .lib import set_selected_track_items - set_selected_track_items([]) - - -def reload_config(): - """Attempt to reload pipeline at run-time. - - CAUTION: This is primarily for development and debugging purposes. - - """ - import importlib - - for module in ( - "avalon", - "avalon.lib", - "avalon.pipeline", - "pyblish", - "pypeapp", - "{}.api".format(AVALON_CONFIG), - "{}.hosts.hiero.lib".format(AVALON_CONFIG), - "{}.hosts.hiero.menu".format(AVALON_CONFIG), - "{}.hosts.hiero.tags".format(AVALON_CONFIG) - ): - log.info("Reloading module: {}...".format(module)) - try: - module = importlib.import_module(module) - import imp - imp.reload(module) - except Exception as e: - log.warning("Cannot reload module: {}".format(e)) - importlib.reload(module) - - -def on_pyblish_instance_toggled(instance, old_value, new_value): - """Toggle node passthrough states on instance toggles.""" - - log.info("instance toggle: {}, old_value: {}, new_value:{} ".format( - instance, old_value, new_value)) - - from pype.hosts.hiero import ( - get_track_item_pype_tag, - set_publish_attribute - ) - - # Whether instances should be passthrough based on new value - track_item = instance.data["item"] - tag = get_track_item_pype_tag(track_item) - set_publish_attribute(tag, new_value) diff --git a/pype/hosts/resolve/rendering.py b/pype/hosts/resolve/rendering.py deleted file mode 100644 index e38466e5d4..0000000000 --- a/pype/hosts/resolve/rendering.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python - -""" -Example DaVinci Resolve script: -Load a still from DRX file, apply the still to all clips in all timelines. Set render format and codec, add render jobs for all timelines, render to specified path and wait for rendering completion. -Once render is complete, delete all jobs -""" - -from python_get_resolve import GetResolve -import sys -import time - -def AddTimelineToRender( project, timeline, presetName, targetDirectory, renderFormat, renderCodec ): - project.SetCurrentTimeline(timeline) - project.LoadRenderPreset(presetName) - - if not project.SetCurrentRenderFormatAndCodec(renderFormat, renderCodec): - return False - - project.SetRenderSettings({"SelectAllFrames" : 1, "TargetDir" : targetDirectory}) - return project.AddRenderJob() - -def RenderAllTimelines( resolve, presetName, targetDirectory, renderFormat, renderCodec ): - projectManager = resolve.GetProjectManager() - project = projectManager.GetCurrentProject() - if not project: - return False - - resolve.OpenPage("Deliver") - timelineCount = project.GetTimelineCount() - - for index in range (0, int(timelineCount)): - if not AddTimelineToRender(project, project.GetTimelineByIndex(index + 1), presetName, targetDirectory, renderFormat, renderCodec): - return False - return project.StartRendering() - -def IsRenderingInProgress( resolve ): - projectManager = resolve.GetProjectManager() - project = projectManager.GetCurrentProject() - if not project: - return False - - return project.IsRenderingInProgress() - -def WaitForRenderingCompletion( resolve ): - while IsRenderingInProgress(resolve): - time.sleep(1) - return - -def ApplyDRXToAllTimelineClips( timeline, path, gradeMode = 0 ): - trackCount = timeline.GetTrackCount("video") - - clips = {} - for index in range (1, int(trackCount) + 1): - clips.update( timeline.GetItemsInTrack("video", index) ) - return timeline.ApplyGradeFromDRX(path, int(gradeMode), clips) - -def ApplyDRXToAllTimelines( resolve, path, gradeMode = 0 ): - projectManager = resolve.GetProjectManager() - project = projectManager.GetCurrentProject() - if not project: - return False - timelineCount = project.GetTimelineCount() - - for index in range (0, int(timelineCount)): - timeline = project.GetTimelineByIndex(index + 1) - project.SetCurrentTimeline( timeline ) - if not ApplyDRXToAllTimelineClips(timeline, path, gradeMode): - return False - return True - -def DeleteAllRenderJobs( resolve ): - projectManager = resolve.GetProjectManager() - project = projectManager.GetCurrentProject() - project.DeleteAllRenderJobs() - return - -# Inputs: -# - DRX file to import grade still and apply it for clips -# - grade mode (0, 1 or 2) -# - preset name for rendering -# - render path -# - render format -# - render codec -if len(sys.argv) < 7: - print("input parameters for scripts are [drx file path] [grade mode] [render preset name] [render path] [render format] [render codec]") - sys.exit() - -drxPath = sys.argv[1] -gradeMode = sys.argv[2] -renderPresetName = sys.argv[3] -renderPath = sys.argv[4] -renderFormat = sys.argv[5] -renderCodec = sys.argv[6] - -# Get currently open project -resolve = GetResolve() - -if not ApplyDRXToAllTimelines(resolve, drxPath, gradeMode): - print("Unable to apply a still from drx file to all timelines") - sys.exit() - -if not RenderAllTimelines(resolve, renderPresetName, renderPath, renderFormat, renderCodec): - print("Unable to set all timelines for rendering") - sys.exit() - -WaitForRenderingCompletion(resolve) - -DeleteAllRenderJobs(resolve) - -print("Rendering is completed.") diff --git a/pype/hosts/resolve/todo-rendering.py b/pype/hosts/resolve/todo-rendering.py new file mode 100644 index 0000000000..cff9eebead --- /dev/null +++ b/pype/hosts/resolve/todo-rendering.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +# TODO: convert this script to be usable with PYPE +""" +Example DaVinci Resolve script: +Load a still from DRX file, apply the still to all clips in all timelines. +Set render format and codec, add render jobs for all timelines, render +to specified path and wait for rendering completion. +Once render is complete, delete all jobs + +clonned from: https://github.com/survos/transcribe/blob/fe3cf51eb95b82dabcf21fbe5f89bfb3d8bb6ce2/python/3_grade_and_render_all_timelines.py +""" + +from python_get_resolve import GetResolve +import sys +import time + + +def AddTimelineToRender(project, timeline, presetName, + targetDirectory, renderFormat, renderCodec): + project.SetCurrentTimeline(timeline) + project.LoadRenderPreset(presetName) + + if not project.SetCurrentRenderFormatAndCodec(renderFormat, renderCodec): + return False + + project.SetRenderSettings( + {"SelectAllFrames": 1, "TargetDir": targetDirectory}) + return project.AddRenderJob() + + +def RenderAllTimelines(resolve, presetName, targetDirectory, + renderFormat, renderCodec): + projectManager = resolve.GetProjectManager() + project = projectManager.GetCurrentProject() + if not project: + return False + + resolve.OpenPage("Deliver") + timelineCount = project.GetTimelineCount() + + for index in range(0, int(timelineCount)): + if not AddTimelineToRender( + project, + project.GetTimelineByIndex(index + 1), + presetName, + targetDirectory, + renderFormat, + renderCodec): + return False + return project.StartRendering() + + +def IsRenderingInProgress(resolve): + projectManager = resolve.GetProjectManager() + project = projectManager.GetCurrentProject() + if not project: + return False + + return project.IsRenderingInProgress() + + +def WaitForRenderingCompletion(resolve): + while IsRenderingInProgress(resolve): + time.sleep(1) + return + + +def ApplyDRXToAllTimelineClips(timeline, path, gradeMode=0): + trackCount = timeline.GetTrackCount("video") + + clips = {} + for index in range(1, int(trackCount) + 1): + clips.update(timeline.GetItemsInTrack("video", index)) + return timeline.ApplyGradeFromDRX(path, int(gradeMode), clips) + + +def ApplyDRXToAllTimelines(resolve, path, gradeMode=0): + projectManager = resolve.GetProjectManager() + project = projectManager.GetCurrentProject() + if not project: + return False + timelineCount = project.GetTimelineCount() + + for index in range(0, int(timelineCount)): + timeline = project.GetTimelineByIndex(index + 1) + project.SetCurrentTimeline(timeline) + if not ApplyDRXToAllTimelineClips(timeline, path, gradeMode): + return False + return True + + +def DeleteAllRenderJobs(resolve): + projectManager = resolve.GetProjectManager() + project = projectManager.GetCurrentProject() + project.DeleteAllRenderJobs() + return + + +# Inputs: +# - DRX file to import grade still and apply it for clips +# - grade mode (0, 1 or 2) +# - preset name for rendering +# - render path +# - render format +# - render codec +if len(sys.argv) < 7: + print( + "input parameters for scripts are [drx file path] [grade mode] " + "[render preset name] [render path] [render format] [render codec]") + sys.exit() + +drxPath = sys.argv[1] +gradeMode = sys.argv[2] +renderPresetName = sys.argv[3] +renderPath = sys.argv[4] +renderFormat = sys.argv[5] +renderCodec = sys.argv[6] + +# Get currently open project +resolve = GetResolve() + +if not ApplyDRXToAllTimelines(resolve, drxPath, gradeMode): + print("Unable to apply a still from drx file to all timelines") + sys.exit() + +if not RenderAllTimelines(resolve, renderPresetName, renderPath, + renderFormat, renderCodec): + print("Unable to set all timelines for rendering") + sys.exit() + +WaitForRenderingCompletion(resolve) + +DeleteAllRenderJobs(resolve) + +print("Rendering is completed.")