mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 21:04:40 +01:00
561 lines
16 KiB
Python
561 lines
16 KiB
Python
import sys
|
|
import os
|
|
import re
|
|
import json
|
|
import pickle
|
|
import contextlib
|
|
from pprint import pformat
|
|
from .constants import (
|
|
MARKER_COLOR,
|
|
MARKER_DURATION,
|
|
MARKER_NAME,
|
|
COLOR_MAP,
|
|
MARKER_PUBLISH_DEFAULT
|
|
)
|
|
from openpype.api import Logger
|
|
|
|
# module-level logger shared by the functions and classes in this module
log = Logger.get_logger(__name__)
|
|
|
|
|
|
class CTX:
    """Singleton-like namespace for passing data between api modules.

    Only class attributes are declared; values are expected to be assigned
    on the class itself.
    """

    # framework object, unset until assigned
    app_framework = None

    # list shared through the class
    flame_apps = []

    # current selection, unset until assigned
    selection = None
|
|
|
|
|
|
@contextlib.contextmanager
def io_preferences_file(klass, filepath, write=False):
    """Open a preferences file for pickling and close it afterwards.

    The file is opened in binary mode because `pickle` reads and writes
    bytes.

    Attributes:
        klass (object): object with a `log` attribute providing `info()`;
            used to report I/O failures
        filepath (str): path to the preferences file
        write (bool)[optional]: open for writing if True, else for reading

    Yield:
        file: opened file object

    Raises:
        RuntimeError: raised by `contextlib` when entering the context if
            the file could not be opened, because nothing gets yielded
    """
    flag = "wb" if write else "rb"
    try:
        # the `with` statement guarantees the handle is always released
        with open(filepath, flag) as prefs_file:
            yield prefs_file

    except IOError as _error:
        # covers a failed `open` as well as an IOError raised by the caller
        # inside the context; in both cases it is only logged
        klass.log.info("Unable to work with preferences `{}`: {}".format(
            filepath, _error))
|
|
|
|
|
|
class FlameAppFramework(object):
    """Take care of preferences for the Flame integration.

    Three preference dictionaries are kept: `prefs` (stored per flame user
    and project), `prefs_user` (per flame user) and `prefs_global`.
    They are pickled into files inside `prefs_folder`.
    """

    class prefs_dict(dict):
        """Dictionary-like view on one named section of a master dict.

        All implemented operations are delegated to `master[name]`, so
        changes made through this object are stored in `master`.
        """

        def __init__(self, master, name, **kwargs):
            self.name = name
            self.master = master
            # make sure the section exists in master
            if not self.master.get(self.name):
                self.master[self.name] = {}
            self.master[self.name].__init__()

        def __getitem__(self, k):
            return self.master[self.name].__getitem__(k)

        def __setitem__(self, k, v):
            return self.master[self.name].__setitem__(k, v)

        def __delitem__(self, k):
            return self.master[self.name].__delitem__(k)

        def get(self, k, default=None):
            return self.master[self.name].get(k, default)

        def setdefault(self, k, default=None):
            return self.master[self.name].setdefault(k, default)

        def pop(self, *args, **kwargs):
            return self.master[self.name].pop(*args, **kwargs)

        def update(self, mapping=(), **kwargs):
            self.master[self.name].update(mapping, **kwargs)

        def __contains__(self, k):
            return self.master[self.name].__contains__(k)

        def copy(self):
            """Return a detached shallow copy of this section.

            `dict.copy()` can't be used as the data lives in `master`
            and the constructor requires both `master` and `name`.
            """
            return type(self)(
                {self.name: dict(self.master[self.name])}, self.name)

        def keys(self):
            return self.master[self.name].keys()

        @classmethod
        def fromkeys(cls, keys, v=None):
            # there is no `master` on class level so a plain dict is
            # the only thing which can be built here
            return dict.fromkeys(keys, v)

        def __repr__(self):
            return "{0}({1})".format(
                type(self).__name__, self.master[self.name].__repr__())

        def master_keys(self):
            """Return keys of all sections available in master."""
            return self.master.keys()

    def __init__(self):
        self.name = self.__class__.__name__
        self.bundle_name = "OpenPypeFlame"
        # self.prefs scope is limited to flame project and user
        self.prefs = {}
        self.prefs_user = {}
        self.prefs_global = {}
        self.log = log

        try:
            import flame
            self.flame = flame
            self.flame_project_name = self.flame.project.current_project.name
            self.flame_user_name = flame.users.current_user.name
        except Exception:
            # flame api is not available or has no current project/user
            self.flame = None
            self.flame_project_name = None
            self.flame_user_name = None

        import socket
        self.hostname = socket.gethostname()

        if sys.platform == "darwin":
            self.prefs_folder = os.path.join(
                os.path.expanduser("~"),
                "Library",
                "Caches",
                "OpenPype",
                self.bundle_name
            )
        else:
            # linux layout is used also as a fallback for any other
            # platform so `prefs_folder` is always defined
            self.prefs_folder = os.path.join(
                os.path.expanduser("~"),
                ".OpenPype",
                self.bundle_name)

        # preferences are stored separately for each host machine
        self.prefs_folder = os.path.join(
            self.prefs_folder,
            self.hostname,
        )

        self.log.info("[{}] waking up".format(self.__class__.__name__))

        try:
            self.load_prefs()
        except RuntimeError:
            # raised when a preference file can't be opened for reading;
            # write the current preferences instead
            self.save_prefs()

        # menu auto-refresh defaults
        if not self.prefs_global.get("menu_auto_refresh"):
            self.prefs_global["menu_auto_refresh"] = {
                "media_panel": True,
                "batch": True,
                "main_menu": True,
                "timeline_menu": True
            }

        self.apps = []

    def get_pref_file_paths(self):
        """Compose paths of all preference files.

        NOTE: `flame_user_name` and `flame_project_name` are None when the
        flame api could not be loaded; joining them raises TypeError.

        Returns:
            tuple: project, user and global preference file path
        """
        prefix = self.prefs_folder + os.path.sep + self.bundle_name
        prefs_file_path = "_".join([
            prefix, self.flame_user_name,
            self.flame_project_name]) + ".prefs"
        prefs_user_file_path = "_".join([
            prefix, self.flame_user_name]) + ".prefs"
        prefs_global_file_path = prefix + ".prefs"

        return (prefs_file_path, prefs_user_file_path, prefs_global_file_path)

    def load_prefs(self):
        """Load project, user and global preferences from their files.

        NOTE: files are read with `pickle`, which can execute arbitrary
        code; the preference files have to be trusted.

        Returns:
            bool: True when all files were processed

        Raises:
            RuntimeError: when a preference file can't be opened
        """
        (proj_pref_path, user_pref_path,
         glob_pref_path) = self.get_pref_file_paths()

        with io_preferences_file(self, proj_pref_path) as prefs_file:
            self.prefs = pickle.load(prefs_file)
            self.log.info(
                "Project - preferences contents:\n{}".format(
                    pformat(self.prefs)
                ))

        with io_preferences_file(self, user_pref_path) as prefs_file:
            self.prefs_user = pickle.load(prefs_file)
            self.log.info(
                "User - preferences contents:\n{}".format(
                    pformat(self.prefs_user)
                ))

        with io_preferences_file(self, glob_pref_path) as prefs_file:
            self.prefs_global = pickle.load(prefs_file)
            self.log.info(
                "Global - preferences contents:\n{}".format(
                    pformat(self.prefs_global)
                ))

        return True

    def save_prefs(self):
        """Save project, user and global preferences to their files.

        Returns:
            bool: False when the preference folder can't be created,
                True otherwise
        """
        # make sure the preference folder is available
        if not os.path.isdir(self.prefs_folder):
            try:
                os.makedirs(self.prefs_folder)
            except Exception:
                self.log.info("Unable to create folder {}".format(
                    self.prefs_folder))
                return False

        # get all pref file paths
        (proj_pref_path, user_pref_path,
         glob_pref_path) = self.get_pref_file_paths()

        with io_preferences_file(self, proj_pref_path, True) as prefs_file:
            pickle.dump(self.prefs, prefs_file)
            self.log.info(
                "Project - preferences contents:\n{}".format(
                    pformat(self.prefs)
                ))

        with io_preferences_file(self, user_pref_path, True) as prefs_file:
            pickle.dump(self.prefs_user, prefs_file)
            self.log.info(
                "User - preferences contents:\n{}".format(
                    pformat(self.prefs_user)
                ))

        with io_preferences_file(self, glob_pref_path, True) as prefs_file:
            pickle.dump(self.prefs_global, prefs_file)
            self.log.info(
                "Global - preferences contents:\n{}".format(
                    pformat(self.prefs_global)
                ))

        return True
|
|
|
|
|
|
def get_project_manager():
    """Placeholder, not implemented yet.

    Returns:
        None
    """
    # TODO: get_project_manager
    return None
|
|
|
|
|
|
def get_media_storage():
    """Placeholder, not implemented yet.

    Returns:
        None
    """
    # TODO: get_media_storage
    return None
|
|
|
|
|
|
def get_current_project():
    """Return `flame.project.current_project`.

    The flame module is imported lazily inside of the function.
    """
    import flame

    project_root = flame.project
    return project_root.current_project
|
|
|
|
|
|
def get_current_sequence(selection):
    """Find the sequence related to the input selection.

    Attributes:
        selection (list): selected flame api objects

    Returns:
        object: the selected `flame.PySequence`, or the third-level parent
            (segment > track > version > sequence) of a selected
            `flame.PySegment`; None if nothing suitable is selected
    """
    import flame

    def _sequence_of(_segment):
        # walk up: segment -> track -> version -> sequence
        return _segment.parent.parent.parent

    if len(selection) == 1:
        item = selection[0]
        timeline = None
        if isinstance(item, flame.PySequence):
            timeline = item
        if isinstance(item, flame.PySegment):
            timeline = _sequence_of(item)
        return timeline

    # zero or multiple items: the first segment found decides
    for item in selection:
        if isinstance(item, flame.PySegment):
            return _sequence_of(item)

    return None
|
|
|
|
|
|
def create_bin(name, root=None):
    """Placeholder, not implemented yet.

    Attributes:
        name (object): currently unused
        root (object)[optional]: currently unused

    Returns:
        None
    """
    # TODO: create_bin
    return None
|
|
|
|
|
|
def rescan_hooks():
    """Execute the flame shortcut 'Rescan Python Hooks'.

    This is a best-effort operation: a failure of the shortcut is logged
    and never raised to the caller.
    """
    import flame
    try:
        flame.execute_shortcut('Rescan Python Hooks')
    except Exception as _error:
        # keep it non-fatal but do not hide the reason
        log.warning("Unable to rescan python hooks: {}".format(_error))
|
|
|
|
|
|
def get_metadata(project_name, _log=None):
    """Read the node type string of a project's syncolor policy node.

    The node `/projects/<project_name>/syncolor/policy` is queried via the
    Wiretap python API on the `localhost:IFFFS` server.

    Attributes:
        project_name (str): name used to build the node path
        _log (object)[optional]: logger, the module logger is used if unset

    Returns:
        value of `WireTapStr.c_str()` filled by the query

    Raises:
        Exception: when the Wiretap client can't be initialized
    """
    from adsk.libwiretapPythonClientAPI import (
        WireTapClient,
        WireTapServerHandle,
        WireTapNodeHandle,
        WireTapStr
    )

    class GetProjectColorPolicy(object):
        def __init__(self, host_name=None, _log=None):
            # initialize the Wiretap client and prepare a server handle
            self.log = _log or log
            self.host_name = host_name or "localhost"
            self._wiretap_client = WireTapClient()
            if not self._wiretap_client.init():
                raise Exception("Could not initialize Wiretap Client")
            server_id = "{}:IFFFS".format(self.host_name)
            self._server = WireTapServerHandle(server_id)

        def process(self, project_name):
            node_path = "/projects/{}/syncolor/policy".format(project_name)
            policy_node_handle = WireTapNodeHandle(self._server, node_path)
            self.log.info(policy_node_handle)

            policy = WireTapStr()
            succeeded = policy_node_handle.getNodeTypeStr(policy)
            if not succeeded:
                # failure is only reported, the string is returned anyway
                node_id = policy_node_handle.getNodeId().id()
                last_error = policy_node_handle.lastError()
                self.log.warning(
                    "Could not retrieve policy of '%s': %s" % (
                        node_id, last_error
                    )
                )

            return policy.c_str()

    return GetProjectColorPolicy(_log=_log).process(project_name)
|
|
|
|
|
|
def get_segment_data_marker(segment, with_marker=None):
    """
    Find the openpype marker on a segment and return its json data.

    A marker matches when its name equals MARKER_NAME and its colour
    equals COLOR_MAP[MARKER_COLOR]. The first matching marker wins.

    Attributes:
        segment (flame.PySegment): flame api object
        with_marker (bool)[optional]: if true it will return also marker object

    Returns:
        dict: data loaded from the marker comment,
            None if there is no matching marker

    Returns(with_marker=True):
        flame.PyMarker, dict
    """
    for marker in segment.markers:
        comment = marker.comment.get_value()
        color = marker.colour.get_value()
        name = marker.name.get_value()

        # skip everything which is not the openpype marker
        if name != MARKER_NAME or color != COLOR_MAP[MARKER_COLOR]:
            continue

        tag_data = json.loads(comment)
        if with_marker:
            return marker, tag_data
        return tag_data

    return None
|
|
|
|
|
|
def set_segment_data_marker(segment, data=None):
    """
    Set openpype track item tag to input segment.

    Data of an already existing openpype marker are updated with the input
    data. A new marker is created if the segment has none.

    Attributes:
        segment (flame.PySegment): flame api object
        data (dict)[optional]: data to be stored, empty dict if unset

    Returns:
        dict: data stored in the marker comment
    """
    data = data or dict()

    marker_data = get_segment_data_marker(segment, True)

    if marker_data:
        # get available openpype tag if any
        marker, tag_data = marker_data
        # update tag data with new data
        tag_data.update(data)
    else:
        # no openpype marker yet so create a new one
        marker = create_segment_data_marker(segment)
        tag_data = data

    # store tag data as json in marker's comment
    marker.comment = json.dumps(tag_data)

    return tag_data
|
|
|
|
|
|
def set_publish_attribute(segment, value):
    """ Set Publish attribute in input Tag object

    Attribute:
        segment (flame.PySegment)): flame api object
        value (bool): True or False
    """
    # segment without openpype marker returns None so start with empty data
    tag_data = get_segment_data_marker(segment) or {}
    tag_data["publish"] = value

    # set data to the publish attribute
    set_segment_data_marker(segment, tag_data)
|
|
|
|
|
|
def get_publish_attribute(segment):
    """ Get Publish attribute from input Tag object

    MARKER_PUBLISH_DEFAULT is stored on the segment and returned when
    the segment has no tag data or the data have no "publish" key.

    Attribute:
        segment (flame.PySegment)): flame api object

    Returns:
        bool: True or False
    """
    tag_data = get_segment_data_marker(segment) or {}

    if "publish" not in tag_data:
        # data are merged into existing tag data or a marker is created
        set_segment_data_marker(
            segment, {"publish": MARKER_PUBLISH_DEFAULT})
        return MARKER_PUBLISH_DEFAULT

    return tag_data["publish"]
|
|
|
|
|
|
def create_segment_data_marker(segment):
    """ Create openpype marker in the middle of a segment.

    Attributes:
        segment (flame.PySegment): flame api object

    Returns:
        flame.PyMarker: flame api object
    """
    # marker is placed half of the record duration after record in
    record_length = segment.record_duration.relative_frame
    first_frame = int(segment.record_in.relative_frame)
    marker_frame = first_frame + int(record_length / 2)

    marker = segment.create_marker(marker_frame)

    # identify the marker by name, duration and colour constants
    marker.name = MARKER_NAME
    marker.duration = MARKER_DURATION
    marker.colour = COLOR_MAP[MARKER_COLOR]

    return marker
|
|
|
|
|
|
def get_sequence_segments(sequence, selected=False):
    """Collect named segments from all versions and tracks of a sequence.

    Attributes:
        sequence (flame.PySequence): flame api object
        selected (bool)[optional]: if True only segments with `selected`
            value being True are returned

    Returns:
        list: segments with non-empty name
    """
    collected = []
    for version in sequence.versions:
        for track in version.tracks:
            # tracks which are empty and hidden are skipped
            if len(track.segments) == 0 and track.hidden:
                continue

            for segment in track.segments:
                # segment without a name is ignored
                if segment.name.get_value() == "":
                    continue

                # selection state is checked only if it was requested
                wanted = (
                    selected is not True
                    or segment.selected.get_value() is True
                )
                if wanted:
                    collected.append(segment)

    return collected
|
|
|
|
|
|
@contextlib.contextmanager
def maintained_segment_selection(sequence):
    """Restore segment selection when the context is left

    Attributes:
        sequence (flame.PySequence): python api object

    Yield:
        list of flame.PySegment

    Example:
        >>> with maintained_segment_selection(sequence) as selected_segments:
        ...     for segment in selected_segments:
        ...         segment.selected = False
        >>> print(segment.selected)
        True
    """
    # remember what was selected before entering the context
    original_selection = get_sequence_segments(sequence, True)
    try:
        yield original_selection
    finally:
        # deselect everything first, then select the remembered segments
        reset_segment_selection(sequence)
        for segment in original_selection:
            segment.selected = True
|
|
|
|
|
|
def reset_segment_selection(sequence):
    """Set `selected` to False on segments of all versions and tracks

    Attributes:
        sequence (flame.PySequence): python api object
    """
    for version in sequence.versions:
        for track in version.tracks:
            # tracks which are empty and hidden are skipped
            is_empty = len(track.segments) == 0
            if is_empty and track.hidden:
                continue

            for segment in track.segments:
                segment.selected = False
|
|
|
|
|
|
def _get_shot_tokens_values(clip, tokens):
|
|
old_value = None
|
|
output = {}
|
|
|
|
if not clip.shot_name:
|
|
return output
|
|
|
|
old_value = clip.shot_name.get_value()
|
|
|
|
for token in tokens:
|
|
clip.shot_name.set_value(token)
|
|
_key = str(re.sub("[<>]", "", token)).replace(" ", "_")
|
|
|
|
try:
|
|
output[_key] = int(clip.shot_name.get_value())
|
|
except ValueError:
|
|
output[_key] = clip.shot_name.get_value()
|
|
|
|
clip.shot_name.set_value(old_value)
|
|
|
|
return output
|
|
|
|
|
|
def get_segment_attributes(segment):
    """Collect attributes of a segment into a dictionary.

    Attributes:
        segment (flame.PySegment): flame api object

    Returns:
        dict: segment data including shot token values, frame values and
            "segment_timecodes"; None if the string form of segment name
            without its first and last character is empty
    """
    if str(segment.name)[1:-1] == "":
        return None

    # basic data of the timeline segment
    clip_data = {
        "segment_name": segment.name.get_value(),
        "segment_comment": segment.comment.get_value(),
        "tape_name": segment.tape_name,
        "source_name": segment.source_name,
        "fpath": segment.file_path,
        "PySegment": segment
    }

    # add all available shot tokens
    token_names = [
        "<colour space>", "<width>", "<height>", "<depth>", "<segment>",
        "<track>", "<track name>"
    ]
    clip_data.update(_get_shot_tokens_values(segment, token_names))

    # record attributes are stored as relative frames, the rest as frames
    relative_attrs = ("record_in", "record_out")
    timecodes = {}
    for attr in (
        "record_duration", "record_in", "record_out",
        "source_duration", "source_in", "source_out"
    ):
        if hasattr(segment, attr):
            attr_value = getattr(segment, attr)
            timecodes[attr] = str(attr_value).replace("+", ":")

            if attr in relative_attrs:
                clip_data[attr] = attr_value.relative_frame
            else:
                clip_data[attr] = attr_value.frame

    clip_data["segment_timecodes"] = timecodes

    return clip_data
|