hound fixes and other

This commit is contained in:
Jakub Jezek 2020-12-18 17:11:00 +01:00
parent 5edb912a8d
commit abc60f3aae
No known key found for this signature in database
GPG key ID: C4B96E101D2A47F3
4 changed files with 135 additions and 1251 deletions

View file

@ -1,838 +0,0 @@
"""
Host specific functions where host api is connected
"""
import os
import re
import sys
import ast
import hiero
import avalon.api as avalon
import avalon.io
from avalon.vendor.Qt import QtWidgets
from pype.api import (Logger, Anatomy, config)
from . import tags
import shutil
from compiler.ast import flatten
try:
from PySide.QtCore import QFile, QTextStream
from PySide.QtXml import QDomDocument
except ImportError:
from PySide2.QtCore import QFile, QTextStream
from PySide2.QtXml import QDomDocument
# from opentimelineio import opentime
# from pprint import pformat
log = Logger().get_logger(__name__, "hiero")
self = sys.modules[__name__]
self._has_been_setup = False
self._has_menu = False
self._registered_gui = None
self.pype_tag_name = "Pype Data"
self.default_sequence_name = "PypeSequence"
self.default_bin_name = "PypeBin"
AVALON_CONFIG = os.getenv("AVALON_CONFIG", "pype")
def get_current_project(remove_untitled=False):
projects = flatten(hiero.core.projects())
if not remove_untitled:
return next(iter(projects))
# if remove_untitled
for proj in projects:
if "Untitled" in proj.name():
proj.close()
else:
return proj
def get_current_sequence(name=None, new=False):
"""
Get current sequence in context of active project.
Args:
name (str)[optional]: name of sequence we want to return
new (bool)[optional]: if we want to create new one
Returns:
hiero.core.Sequence: the sequence object
"""
sequence = None
project = get_current_project()
root_bin = project.clipsBin()
if new:
# create new
name = name or self.default_sequence_name
sequence = hiero.core.Sequence(name)
root_bin.addItem(hiero.core.BinItem(sequence))
elif name:
# look for sequence by name
sequences = project.sequences()
for _sequence in sequences:
if _sequence.name() == name:
sequence = _sequence
if not sequence:
# if nothing found create new with input name
sequence = get_current_sequence(name, True)
elif not name and not new:
# if name is none and new is False then return current open sequence
sequence = hiero.ui.activeSequence()
return sequence
def get_current_track(sequence, name, audio=False):
"""
Get current track in context of active project.
Creates new if none is found.
Args:
sequence (hiero.core.Sequence): hiero sequene object
name (str): name of track we want to return
audio (bool)[optional]: switch to AudioTrack
Returns:
hiero.core.Track: the track object
"""
tracks = sequence.videoTracks()
if audio:
tracks = sequence.audioTracks()
# get track by name
track = None
for _track in tracks:
if _track.name() in name:
track = _track
if not track:
if not audio:
track = hiero.core.VideoTrack(name)
else:
track = hiero.core.AudioTrack(name)
sequence.addTrack(track)
return track
def get_track_items(
selected=False,
sequence_name=None,
track_item_name=None,
track_name=None,
track_type=None,
check_enabled=True,
check_locked=True,
check_tagged=False):
"""Get all available current timeline track items.
Attribute:
selected (bool)[optional]: return only selected items on timeline
sequence_name (str)[optional]: return only clips from input sequence
track_item_name (str)[optional]: return only item with input name
track_name (str)[optional]: return only items from track name
track_type (str)[optional]: return only items of given type
(`audio` or `video`) default is `video`
check_enabled (bool)[optional]: ignore disabled if True
check_locked (bool)[optional]: ignore locked if True
Return:
list or hiero.core.TrackItem: list of track items or single track item
"""
return_list = list()
track_items = list()
# get selected track items or all in active sequence
if selected:
selected_items = list(hiero.selection)
for item in selected_items:
if track_name and track_name in item.parent().name():
# filter only items fitting input track name
track_items.append(item)
elif not track_name:
# or add all if no track_name was defined
track_items.append(item)
else:
sequence = get_current_sequence(name=sequence_name)
# get all available tracks from sequence
tracks = list(sequence.audioTracks()) + list(sequence.videoTracks())
# loop all tracks
for track in tracks:
if check_locked and track.isLocked():
continue
if check_enabled and not track.isEnabled():
continue
# and all items in track
for item in track.items():
if check_tagged and not item.tags():
continue
# check if track item is enabled
if check_enabled:
if not item.isEnabled():
continue
if track_item_name:
if item.name() in track_item_name:
return item
# make sure only track items with correct track names are added
if track_name and track_name in track.name():
# filter out only defined track_name items
track_items.append(item)
elif not track_name:
# or add all if no track_name is defined
track_items.append(item)
# filter out only track items with defined track_type
for track_item in track_items:
if track_type and track_type == "video" and isinstance(
track_item.parent(), hiero.core.VideoTrack):
# only video track items are allowed
return_list.append(track_item)
elif track_type and track_type == "audio" and isinstance(
track_item.parent(), hiero.core.AudioTrack):
# only audio track items are allowed
return_list.append(track_item)
elif not track_type:
# add all if no track_type is defined
return_list.append(track_item)
return return_list
def get_track_item_pype_tag(track_item):
"""
Get pype track item tag created by creator or loader plugin.
Attributes:
trackItem (hiero.core.TrackItem): hiero object
Returns:
hiero.core.Tag: hierarchy, orig clip attributes
"""
# get all tags from track item
_tags = track_item.tags()
if not _tags:
return None
for tag in _tags:
# return only correct tag defined by global name
if tag.name() in self.pype_tag_name:
return tag
def set_track_item_pype_tag(track_item, data=None):
"""
Set pype track item tag to input track_item.
Attributes:
trackItem (hiero.core.TrackItem): hiero object
Returns:
hiero.core.Tag
"""
data = data or dict()
# basic Tag's attribute
tag_data = {
"editable": "0",
"note": "Pype data holder",
"icon": "pype_icon.png",
"metadata": {k: v for k, v in data.items()}
}
# get available pype tag if any
_tag = get_track_item_pype_tag(track_item)
if _tag:
# it not tag then create one
tag = tags.update_tag(_tag, tag_data)
else:
# if pype tag available then update with input data
tag = tags.create_tag(self.pype_tag_name, tag_data)
# add it to the input track item
track_item.addTag(tag)
return tag
def get_track_item_pype_data(track_item):
"""
Get track item's pype tag data.
Attributes:
trackItem (hiero.core.TrackItem): hiero object
Returns:
dict: data found on pype tag
"""
data = dict()
# get pype data tag from track item
tag = get_track_item_pype_tag(track_item)
if not tag:
return None
# get tag metadata attribut
tag_data = tag.metadata()
# convert tag metadata to normal keys names and values to correct types
for k, v in dict(tag_data).items():
key = k.replace("tag.", "")
try:
# capture exceptions which are related to strings only
value = ast.literal_eval(v)
except (ValueError, SyntaxError):
value = v
data.update({key: value})
return data
def imprint(track_item, data=None):
"""
Adding `Avalon data` into a hiero track item tag.
Also including publish attribute into tag.
Arguments:
track_item (hiero.core.TrackItem): hiero track item object
data (dict): Any data which needst to be imprinted
Examples:
data = {
'asset': 'sq020sh0280',
'family': 'render',
'subset': 'subsetMain'
}
"""
data = data or {}
tag = set_track_item_pype_tag(track_item, data)
# add publish attribute
set_publish_attribute(tag, True)
def set_publish_attribute(tag, value):
""" Set Publish attribute in input Tag object
Attribute:
tag (hiero.core.Tag): a tag object
value (bool): True or False
"""
tag_data = tag.metadata()
# set data to the publish attribute
tag_data.setValue("tag.publish", str(value))
def get_publish_attribute(tag):
""" Get Publish attribute from input Tag object
Attribute:
tag (hiero.core.Tag): a tag object
value (bool): True or False
"""
tag_data = tag.metadata()
# get data to the publish attribute
value = tag_data.value("tag.publish")
# return value converted to bool value. Atring is stored in tag.
return ast.literal_eval(value)
def sync_avalon_data_to_workfile():
# import session to get project dir
project_name = avalon.Session["AVALON_PROJECT"]
anatomy = Anatomy(project_name)
work_template = anatomy.templates["work"]["path"]
work_root = anatomy.root_value_for_template(work_template)
active_project_root = (
os.path.join(work_root, project_name)
).replace("\\", "/")
# getting project
project = get_current_project()
if "Tag Presets" in project.name():
return
log.debug("Synchronizing Pype metadata to project: {}".format(
project.name()))
# set project root with backward compatibility
try:
project.setProjectDirectory(active_project_root)
except Exception:
# old way of seting it
project.setProjectRoot(active_project_root)
# get project data from avalon db
project_doc = avalon.io.find_one({"type": "project"})
project_data = project_doc["data"]
log.debug("project_data: {}".format(project_data))
# get format and fps property from avalon db on project
width = project_data["resolutionWidth"]
height = project_data["resolutionHeight"]
pixel_aspect = project_data["pixelAspect"]
fps = project_data['fps']
format_name = project_data['code']
# create new format in hiero project
format = hiero.core.Format(width, height, pixel_aspect, format_name)
project.setOutputFormat(format)
# set fps to hiero project
project.setFramerate(fps)
# TODO: add auto colorspace set from project drop
log.info("Project property has been synchronised with Avalon db")
def launch_workfiles_app(event):
"""
Event for launching workfiles after hiero start
Args:
event (obj): required but unused
"""
from . import launch_workfiles_app
launch_workfiles_app()
def setup(console=False, port=None, menu=True):
"""Setup integration
Registers Pyblish for Hiero plug-ins and appends an item to the File-menu
Arguments:
console (bool): Display console with GUI
port (int, optional): Port from which to start looking for an
available port to connect with Pyblish QML, default
provided by Pyblish Integration.
menu (bool, optional): Display file menu in Hiero.
"""
if self._has_been_setup:
teardown()
add_submission()
if menu:
add_to_filemenu()
self._has_menu = True
self._has_been_setup = True
log.debug("pyblish: Loaded successfully.")
def teardown():
"""Remove integration"""
if not self._has_been_setup:
return
if self._has_menu:
remove_from_filemenu()
self._has_menu = False
self._has_been_setup = False
log.debug("pyblish: Integration torn down successfully")
def remove_from_filemenu():
raise NotImplementedError("Implement me please.")
def add_to_filemenu():
PublishAction()
class PyblishSubmission(hiero.exporters.FnSubmission.Submission):
def __init__(self):
hiero.exporters.FnSubmission.Submission.__init__(self)
def addToQueue(self):
from . import publish
# Add submission to Hiero module for retrieval in plugins.
hiero.submission = self
publish()
def add_submission():
registry = hiero.core.taskRegistry
registry.addSubmission("Pyblish", PyblishSubmission)
class PublishAction(QtWidgets.QAction):
"""
Action with is showing as menu item
"""
def __init__(self):
QtWidgets.QAction.__init__(self, "Publish", None)
self.triggered.connect(self.publish)
for interest in ["kShowContextMenu/kTimeline",
"kShowContextMenukBin",
"kShowContextMenu/kSpreadsheet"]:
hiero.core.events.registerInterest(interest, self.eventHandler)
self.setShortcut("Ctrl+Alt+P")
def publish(self):
from . import publish
# Removing "submission" attribute from hiero module, to prevent tasks
# from getting picked up when not using the "Export" dialog.
if hasattr(hiero, "submission"):
del hiero.submission
publish()
def eventHandler(self, event):
# Add the Menu to the right-click menu
event.menu.addAction(self)
# def CreateNukeWorkfile(nodes=None,
# nodes_effects=None,
# to_timeline=False,
# **kwargs):
# ''' Creating nuke workfile with particular version with given nodes
# Also it is creating timeline track items as precomps.
#
# Arguments:
# nodes(list of dict): each key in dict is knob order is important
# to_timeline(type): will build trackItem with metadata
#
# Returns:
# bool: True if done
#
# Raises:
# Exception: with traceback
#
# '''
# import hiero.core
# from avalon.nuke import imprint
# from pype.hosts.nuke import (
# lib as nklib
# )
#
# # check if the file exists if does then Raise "File exists!"
# if os.path.exists(filepath):
# raise FileExistsError("File already exists: `{}`".format(filepath))
#
# # if no representations matching then
# # Raise "no representations to be build"
# if len(representations) == 0:
# raise AttributeError("Missing list of `representations`")
#
# # check nodes input
# if len(nodes) == 0:
# log.warning("Missing list of `nodes`")
#
# # create temp nk file
# nuke_script = hiero.core.nuke.ScriptWriter()
#
# # create root node and save all metadata
# root_node = hiero.core.nuke.RootNode()
#
# anatomy = Anatomy(os.environ["AVALON_PROJECT"])
# work_template = anatomy.templates["work"]["path"]
# root_path = anatomy.root_value_for_template(work_template)
#
# nuke_script.addNode(root_node)
#
# # here to call pype.hosts.nuke.lib.BuildWorkfile
# script_builder = nklib.BuildWorkfile(
# root_node=root_node,
# root_path=root_path,
# nodes=nuke_script.getNodes(),
# **kwargs
# )
def create_nuke_workfile_clips(nuke_workfiles, seq=None):
'''
nuke_workfiles is list of dictionaries like:
[{
'path': 'P:/Jakub_testy_pipeline/test_v01.nk',
'name': 'test',
'handleStart': 15, # added asymetrically to handles
'handleEnd': 10, # added asymetrically to handles
"clipIn": 16,
"frameStart": 991,
"frameEnd": 1023,
'task': 'Comp-tracking',
'work_dir': 'VFX_PR',
'shot': '00010'
}]
'''
proj = hiero.core.projects()[-1]
root = proj.clipsBin()
if not seq:
seq = hiero.core.Sequence('NewSequences')
root.addItem(hiero.core.BinItem(seq))
# todo will ned to define this better
# track = seq[1] # lazy example to get a destination# track
clips_lst = []
for nk in nuke_workfiles:
task_path = '/'.join([nk['work_dir'], nk['shot'], nk['task']])
bin = create_bin(task_path, proj)
if nk['task'] not in seq.videoTracks():
track = hiero.core.VideoTrack(nk['task'])
seq.addTrack(track)
else:
track = seq.tracks(nk['task'])
# create clip media
media = hiero.core.MediaSource(nk['path'])
media_in = int(media.startTime() or 0)
media_duration = int(media.duration() or 0)
handle_start = nk.get("handleStart")
handle_end = nk.get("handleEnd")
if media_in:
source_in = media_in + handle_start
else:
source_in = nk["frameStart"] + handle_start
if media_duration:
source_out = (media_in + media_duration - 1) - handle_end
else:
source_out = nk["frameEnd"] - handle_end
source = hiero.core.Clip(media)
name = os.path.basename(os.path.splitext(nk['path'])[0])
split_name = split_by_client_version(name)[0] or name
# add to bin as clip item
items_in_bin = [b.name() for b in bin.items()]
if split_name not in items_in_bin:
binItem = hiero.core.BinItem(source)
bin.addItem(binItem)
new_source = [
item for item in bin.items() if split_name in item.name()
][0].items()[0].item()
# add to track as clip item
trackItem = hiero.core.TrackItem(
split_name, hiero.core.TrackItem.kVideo)
trackItem.setSource(new_source)
trackItem.setSourceIn(source_in)
trackItem.setSourceOut(source_out)
trackItem.setTimelineIn(nk["clipIn"])
trackItem.setTimelineOut(nk["clipIn"] + (source_out - source_in))
track.addTrackItem(trackItem)
clips_lst.append(trackItem)
return clips_lst
def create_bin(path=None, project=None):
'''
Create bin in project.
If the path is "bin1/bin2/bin3" it will create whole depth
and return `bin3`
'''
# get the first loaded project
project = project or get_current_project()
path = path or self.default_bin_name
path = path.replace("\\", "/").split("/")
root_bin = project.clipsBin()
done_bin_lst = []
for i, b in enumerate(path):
if i == 0 and len(path) > 1:
if b in [bin.name() for bin in root_bin.bins()]:
bin = [bin for bin in root_bin.bins() if b in bin.name()][0]
done_bin_lst.append(bin)
else:
create_bin = hiero.core.Bin(b)
root_bin.addItem(create_bin)
done_bin_lst.append(create_bin)
elif i >= 1 and i < len(path) - 1:
if b in [bin.name() for bin in done_bin_lst[i - 1].bins()]:
bin = [
bin for bin in done_bin_lst[i - 1].bins()
if b in bin.name()
][0]
done_bin_lst.append(bin)
else:
create_bin = hiero.core.Bin(b)
done_bin_lst[i - 1].addItem(create_bin)
done_bin_lst.append(create_bin)
elif i == len(path) - 1:
if b in [bin.name() for bin in done_bin_lst[i - 1].bins()]:
bin = [
bin for bin in done_bin_lst[i - 1].bins()
if b in bin.name()
][0]
done_bin_lst.append(bin)
else:
create_bin = hiero.core.Bin(b)
done_bin_lst[i - 1].addItem(create_bin)
done_bin_lst.append(create_bin)
return done_bin_lst[-1]
def split_by_client_version(string):
regex = r"[/_.]v\d+"
try:
matches = re.findall(regex, string, re.IGNORECASE)
return string.split(matches[0])
except Exception as error:
log.error(error)
return None
def get_selected_track_items(sequence=None):
_sequence = sequence or get_current_sequence()
# Getting selection
timeline_editor = hiero.ui.getTimelineEditor(_sequence)
return timeline_editor.selection()
def set_selected_track_items(track_items_list, sequence=None):
_sequence = sequence or get_current_sequence()
# Getting selection
timeline_editor = hiero.ui.getTimelineEditor(_sequence)
return timeline_editor.setSelection(track_items_list)
def _read_doc_from_path(path):
# reading QDomDocument from HROX path
hrox_file = QFile(path)
if not hrox_file.open(QFile.ReadOnly):
raise RuntimeError("Failed to open file for reading")
doc = QDomDocument()
doc.setContent(hrox_file)
hrox_file.close()
return doc
def _write_doc_to_path(doc, path):
# write QDomDocument to path as HROX
hrox_file = QFile(path)
if not hrox_file.open(QFile.WriteOnly):
raise RuntimeError("Failed to open file for writing")
stream = QTextStream(hrox_file)
doc.save(stream, 1)
hrox_file.close()
def _set_hrox_project_knobs(doc, **knobs):
# set attributes to Project Tag
proj_elem = doc.documentElement().firstChildElement("Project")
for k, v in knobs.items():
proj_elem.setAttribute(k, v)
def apply_colorspace_project():
# get path the the active projects
project = get_current_project(remove_untitled=True)
current_file = project.path()
# close the active project
project.close()
# get presets for hiero
presets = config.get_init_presets()
colorspace = presets["colorspace"]
hiero_project_clrs = colorspace.get("hiero", {}).get("project", {})
# save the workfile as subversion "comment:_colorspaceChange"
split_current_file = os.path.splitext(current_file)
copy_current_file = current_file
if "_colorspaceChange" not in current_file:
copy_current_file = (
split_current_file[0]
+ "_colorspaceChange"
+ split_current_file[1]
)
try:
# duplicate the file so the changes are applied only to the copy
shutil.copyfile(current_file, copy_current_file)
except shutil.Error:
# in case the file already exists and it want to copy to the
# same filewe need to do this trick
# TEMP file name change
copy_current_file_tmp = copy_current_file + "_tmp"
# create TEMP file
shutil.copyfile(current_file, copy_current_file_tmp)
# remove original file
os.remove(current_file)
# copy TEMP back to original name
shutil.copyfile(copy_current_file_tmp, copy_current_file)
# remove the TEMP file as we dont need it
os.remove(copy_current_file_tmp)
# use the code from bellow for changing xml hrox Attributes
hiero_project_clrs.update({"name": os.path.basename(copy_current_file)})
# read HROX in as QDomSocument
doc = _read_doc_from_path(copy_current_file)
# apply project colorspace properties
_set_hrox_project_knobs(doc, **hiero_project_clrs)
# write QDomSocument back as HROX
_write_doc_to_path(doc, copy_current_file)
# open the file as current project
hiero.core.openProject(copy_current_file)
def apply_colorspace_clips():
project = get_current_project(remove_untitled=True)
clips = project.clips()
# get presets for hiero
presets = config.get_init_presets()
colorspace = presets["colorspace"]
hiero_clips_clrs = colorspace.get("hiero", {}).get("clips", {})
for clip in clips:
clip_media_source_path = clip.mediaSource().firstpath()
clip_name = clip.name()
clip_colorspace = clip.sourceMediaColourTransform()
if "default" in clip_colorspace:
continue
# check if any colorspace presets for read is mathing
preset_clrsp = next((hiero_clips_clrs[k]
for k in hiero_clips_clrs
if bool(re.search(k, clip_media_source_path))),
None)
if preset_clrsp:
log.debug("Changing clip.path: {}".format(clip_media_source_path))
log.info("Changing clip `{}` colorspace {} to {}".format(
clip_name, clip_colorspace, preset_clrsp))
# set the found preset to the clip
clip.setSourceMediaColourTransform(preset_clrsp)
# save project after all is changed
project.save()

View file

@ -1,302 +0,0 @@
"""
Basic avalon integration
"""
import os
import contextlib
from collections import OrderedDict
from avalon.tools import (
workfiles,
publish as _publish
)
from avalon.pipeline import AVALON_CONTAINER_ID
from avalon import api as avalon
from avalon import schema
from pyblish import api as pyblish
import pype
from pype.api import Logger
from . import lib, menu, events
log = Logger().get_logger(__name__, "hiero")
AVALON_CONFIG = os.getenv("AVALON_CONFIG", "pype")
# plugin paths
LOAD_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "load")
CREATE_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "create")
INVENTORY_PATH = os.path.join(pype.PLUGINS_DIR, "hiero", "inventory")
PUBLISH_PATH = os.path.join(
pype.PLUGINS_DIR, "hiero", "publish"
).replace("\\", "/")
AVALON_CONTAINERS = ":AVALON_CONTAINERS"
def install():
"""
Installing Hiero integration for avalon
Args:
config (obj): avalon config module `pype` in our case, it is not
used but required by avalon.api.install()
"""
# adding all events
events.register_events()
log.info("Registering Hiero plug-ins..")
pyblish.register_host("hiero")
pyblish.register_plugin_path(PUBLISH_PATH)
avalon.register_plugin_path(avalon.Loader, LOAD_PATH)
avalon.register_plugin_path(avalon.Creator, CREATE_PATH)
avalon.register_plugin_path(avalon.InventoryAction, INVENTORY_PATH)
# register callback for switching publishable
pyblish.register_callback("instanceToggled", on_pyblish_instance_toggled)
# Disable all families except for the ones we explicitly want to see
family_states = [
"write",
"review",
"plate"
]
avalon.data["familiesStateDefault"] = False
avalon.data["familiesStateToggled"] = family_states
# install menu
menu.menu_install()
# register hiero events
events.register_hiero_events()
def uninstall():
"""
Uninstalling Hiero integration for avalon
"""
log.info("Deregistering Hiero plug-ins..")
pyblish.deregister_host("hiero")
pyblish.deregister_plugin_path(PUBLISH_PATH)
avalon.deregister_plugin_path(avalon.Loader, LOAD_PATH)
avalon.deregister_plugin_path(avalon.Creator, CREATE_PATH)
# register callback for switching publishable
pyblish.deregister_callback("instanceToggled", on_pyblish_instance_toggled)
def containerise(track_item,
name,
namespace,
context,
loader=None,
data=None):
"""Bundle Hiero's object into an assembly and imprint it with metadata
Containerisation enables a tracking of version, author and origin
for loaded assets.
Arguments:
track_item (hiero.core.TrackItem): object to imprint as container
name (str): Name of resulting assembly
namespace (str): Namespace under which to host container
context (dict): Asset information
loader (str, optional): Name of node used to produce this container.
Returns:
track_item (hiero.core.TrackItem): containerised object
"""
data_imprint = OrderedDict({
"schema": "avalon-core:container-2.0",
"id": AVALON_CONTAINER_ID,
"name": str(name),
"namespace": str(namespace),
"loader": str(loader),
"representation": str(context["representation"]["_id"]),
})
if data:
for k, v in data.items():
data_imprint.update({k: v})
log.debug("_ data_imprint: {}".format(data_imprint))
lib.set_track_item_pype_tag(track_item, data_imprint)
return track_item
def ls():
"""List available containers.
This function is used by the Container Manager in Nuke. You'll
need to implement a for-loop that then *yields* one Container at
a time.
See the `container.json` schema for details on how it should look,
and the Maya equivalent, which is in `avalon.maya.pipeline`
"""
# get all track items from current timeline
all_track_items = lib.get_track_items()
for track_item in all_track_items:
container = parse_container(track_item)
if container:
yield container
def parse_container(track_item, validate=True):
"""Return container data from track_item's pype tag.
Args:
track_item (hiero.core.TrackItem): A containerised track item.
validate (bool)[optional]: validating with avalon scheme
Returns:
dict: The container schema data for input containerized track item.
"""
# convert tag metadata to normal keys names
data = lib.get_track_item_pype_data(track_item)
if validate and data and data.get("schema"):
schema.validate(data)
if not isinstance(data, dict):
return
# If not all required data return the empty container
required = ['schema', 'id', 'name',
'namespace', 'loader', 'representation']
if not all(key in data for key in required):
return
container = {key: data[key] for key in required}
container["objectName"] = track_item.name()
# Store reference to the node object
container["_track_item"] = track_item
return container
def update_container(track_item, data=None):
"""Update container data to input track_item's pype tag.
Args:
track_item (hiero.core.TrackItem): A containerised track item.
data (dict)[optional]: dictionery with data to be updated
Returns:
bool: True if container was updated correctly
"""
data = data or dict()
container = lib.get_track_item_pype_data(track_item)
for _key, _value in container.items():
try:
container[_key] = data[_key]
except KeyError:
pass
log.info("Updating container: `{}`".format(track_item.name()))
return bool(lib.set_track_item_pype_tag(track_item, container))
def launch_workfiles_app(*args):
''' Wrapping function for workfiles launcher '''
workdir = os.environ["AVALON_WORKDIR"]
# show workfile gui
workfiles.show(workdir)
def publish(parent):
"""Shorthand to publish from within host"""
return _publish.show(parent)
@contextlib.contextmanager
def maintained_selection():
"""Maintain selection during context
Example:
>>> with maintained_selection():
... for track_item in track_items:
... < do some stuff >
"""
from .lib import (
set_selected_track_items,
get_selected_track_items
)
previous_selection = get_selected_track_items()
reset_selection()
try:
# do the operation
yield
finally:
reset_selection()
set_selected_track_items(previous_selection)
def reset_selection():
"""Deselect all selected nodes
"""
from .lib import set_selected_track_items
set_selected_track_items([])
def reload_config():
"""Attempt to reload pipeline at run-time.
CAUTION: This is primarily for development and debugging purposes.
"""
import importlib
for module in (
"avalon",
"avalon.lib",
"avalon.pipeline",
"pyblish",
"pypeapp",
"{}.api".format(AVALON_CONFIG),
"{}.hosts.hiero.lib".format(AVALON_CONFIG),
"{}.hosts.hiero.menu".format(AVALON_CONFIG),
"{}.hosts.hiero.tags".format(AVALON_CONFIG)
):
log.info("Reloading module: {}...".format(module))
try:
module = importlib.import_module(module)
import imp
imp.reload(module)
except Exception as e:
log.warning("Cannot reload module: {}".format(e))
importlib.reload(module)
def on_pyblish_instance_toggled(instance, old_value, new_value):
"""Toggle node passthrough states on instance toggles."""
log.info("instance toggle: {}, old_value: {}, new_value:{} ".format(
instance, old_value, new_value))
from pype.hosts.hiero import (
get_track_item_pype_tag,
set_publish_attribute
)
# Whether instances should be passthrough based on new value
track_item = instance.data["item"]
tag = get_track_item_pype_tag(track_item)
set_publish_attribute(tag, new_value)

View file

@ -1,111 +0,0 @@
#!/usr/bin/env python
"""
Example DaVinci Resolve script:
Load a still from DRX file, apply the still to all clips in all timelines. Set render format and codec, add render jobs for all timelines, render to specified path and wait for rendering completion.
Once render is complete, delete all jobs
"""
from python_get_resolve import GetResolve
import sys
import time
def AddTimelineToRender( project, timeline, presetName, targetDirectory, renderFormat, renderCodec ):
project.SetCurrentTimeline(timeline)
project.LoadRenderPreset(presetName)
if not project.SetCurrentRenderFormatAndCodec(renderFormat, renderCodec):
return False
project.SetRenderSettings({"SelectAllFrames" : 1, "TargetDir" : targetDirectory})
return project.AddRenderJob()
def RenderAllTimelines( resolve, presetName, targetDirectory, renderFormat, renderCodec ):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
resolve.OpenPage("Deliver")
timelineCount = project.GetTimelineCount()
for index in range (0, int(timelineCount)):
if not AddTimelineToRender(project, project.GetTimelineByIndex(index + 1), presetName, targetDirectory, renderFormat, renderCodec):
return False
return project.StartRendering()
def IsRenderingInProgress( resolve ):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
return project.IsRenderingInProgress()
def WaitForRenderingCompletion( resolve ):
while IsRenderingInProgress(resolve):
time.sleep(1)
return
def ApplyDRXToAllTimelineClips( timeline, path, gradeMode = 0 ):
trackCount = timeline.GetTrackCount("video")
clips = {}
for index in range (1, int(trackCount) + 1):
clips.update( timeline.GetItemsInTrack("video", index) )
return timeline.ApplyGradeFromDRX(path, int(gradeMode), clips)
def ApplyDRXToAllTimelines( resolve, path, gradeMode = 0 ):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
timelineCount = project.GetTimelineCount()
for index in range (0, int(timelineCount)):
timeline = project.GetTimelineByIndex(index + 1)
project.SetCurrentTimeline( timeline )
if not ApplyDRXToAllTimelineClips(timeline, path, gradeMode):
return False
return True
def DeleteAllRenderJobs( resolve ):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
project.DeleteAllRenderJobs()
return
# Inputs:
# - DRX file to import grade still and apply it for clips
# - grade mode (0, 1 or 2)
# - preset name for rendering
# - render path
# - render format
# - render codec
if len(sys.argv) < 7:
print("input parameters for scripts are [drx file path] [grade mode] [render preset name] [render path] [render format] [render codec]")
sys.exit()
drxPath = sys.argv[1]
gradeMode = sys.argv[2]
renderPresetName = sys.argv[3]
renderPath = sys.argv[4]
renderFormat = sys.argv[5]
renderCodec = sys.argv[6]
# Get currently open project
resolve = GetResolve()
if not ApplyDRXToAllTimelines(resolve, drxPath, gradeMode):
print("Unable to apply a still from drx file to all timelines")
sys.exit()
if not RenderAllTimelines(resolve, renderPresetName, renderPath, renderFormat, renderCodec):
print("Unable to set all timelines for rendering")
sys.exit()
WaitForRenderingCompletion(resolve)
DeleteAllRenderJobs(resolve)
print("Rendering is completed.")

View file

@ -0,0 +1,135 @@
#!/usr/bin/env python
# TODO: convert this script to be usable with PYPE
"""
Example DaVinci Resolve script:
Load a still from DRX file, apply the still to all clips in all timelines.
Set render format and codec, add render jobs for all timelines, render
to specified path and wait for rendering completion.
Once render is complete, delete all jobs
clonned from: https://github.com/survos/transcribe/blob/fe3cf51eb95b82dabcf21fbe5f89bfb3d8bb6ce2/python/3_grade_and_render_all_timelines.py
"""
from python_get_resolve import GetResolve
import sys
import time
def AddTimelineToRender(project, timeline, presetName,
targetDirectory, renderFormat, renderCodec):
project.SetCurrentTimeline(timeline)
project.LoadRenderPreset(presetName)
if not project.SetCurrentRenderFormatAndCodec(renderFormat, renderCodec):
return False
project.SetRenderSettings(
{"SelectAllFrames": 1, "TargetDir": targetDirectory})
return project.AddRenderJob()
def RenderAllTimelines(resolve, presetName, targetDirectory,
renderFormat, renderCodec):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
resolve.OpenPage("Deliver")
timelineCount = project.GetTimelineCount()
for index in range(0, int(timelineCount)):
if not AddTimelineToRender(
project,
project.GetTimelineByIndex(index + 1),
presetName,
targetDirectory,
renderFormat,
renderCodec):
return False
return project.StartRendering()
def IsRenderingInProgress(resolve):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
return project.IsRenderingInProgress()
def WaitForRenderingCompletion(resolve):
while IsRenderingInProgress(resolve):
time.sleep(1)
return
def ApplyDRXToAllTimelineClips(timeline, path, gradeMode=0):
trackCount = timeline.GetTrackCount("video")
clips = {}
for index in range(1, int(trackCount) + 1):
clips.update(timeline.GetItemsInTrack("video", index))
return timeline.ApplyGradeFromDRX(path, int(gradeMode), clips)
def ApplyDRXToAllTimelines(resolve, path, gradeMode=0):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
if not project:
return False
timelineCount = project.GetTimelineCount()
for index in range(0, int(timelineCount)):
timeline = project.GetTimelineByIndex(index + 1)
project.SetCurrentTimeline(timeline)
if not ApplyDRXToAllTimelineClips(timeline, path, gradeMode):
return False
return True
def DeleteAllRenderJobs(resolve):
projectManager = resolve.GetProjectManager()
project = projectManager.GetCurrentProject()
project.DeleteAllRenderJobs()
return
# Inputs:
# - DRX file to import grade still and apply it for clips
# - grade mode (0, 1 or 2)
# - preset name for rendering
# - render path
# - render format
# - render codec
if len(sys.argv) < 7:
print(
"input parameters for scripts are [drx file path] [grade mode] "
"[render preset name] [render path] [render format] [render codec]")
sys.exit()
drxPath = sys.argv[1]
gradeMode = sys.argv[2]
renderPresetName = sys.argv[3]
renderPath = sys.argv[4]
renderFormat = sys.argv[5]
renderCodec = sys.argv[6]
# Get currently open project
resolve = GetResolve()
if not ApplyDRXToAllTimelines(resolve, drxPath, gradeMode):
print("Unable to apply a still from drx file to all timelines")
sys.exit()
if not RenderAllTimelines(resolve, renderPresetName, renderPath,
renderFormat, renderCodec):
print("Unable to set all timelines for rendering")
sys.exit()
WaitForRenderingCompletion(resolve)
DeleteAllRenderJobs(resolve)
print("Rendering is completed.")