move traypublisher next to server codebase

This commit is contained in:
Jakub Trllo 2024-05-24 10:29:09 +02:00
parent a25bda81a1
commit 7fd8ca81e4
43 changed files with 0 additions and 0 deletions

@@ -0,0 +1,6 @@
from .addon import TrayPublishAddon
__all__ = (
"TrayPublishAddon",
)

@@ -0,0 +1,119 @@
import os
from pathlib import Path
from ayon_core.lib import get_ayon_launcher_args
from ayon_core.lib.execute import run_detached_process
from ayon_core.addon import (
click_wrap,
AYONAddon,
ITrayAction,
IHostAddon,
)
TRAYPUBLISH_ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
class TrayPublishAddon(AYONAddon, IHostAddon, ITrayAction):
label = "Publisher"
name = "traypublisher"
host_name = "traypublisher"
def initialize(self, settings):
self.publish_paths = [
os.path.join(TRAYPUBLISH_ROOT_DIR, "plugins", "publish")
]
def tray_init(self):
return
def on_action_trigger(self):
self.run_traypublisher()
def connect_with_addons(self, enabled_modules):
"""Collect publish paths from other modules."""
publish_paths = self.manager.collect_plugin_paths()["publish"]
self.publish_paths.extend(publish_paths)
def run_traypublisher(self):
args = get_ayon_launcher_args(
"addon", self.name, "launch"
)
run_detached_process(args)
def cli(self, click_group):
click_group.add_command(cli_main.to_click_obj())
@click_wrap.group(
TrayPublishAddon.name,
help="TrayPublisher related commands.")
def cli_main():
pass
@cli_main.command()
def launch():
"""Launch TrayPublish tool UI."""
from ayon_core.tools import traypublisher
traypublisher.main()
@cli_main.command()
@click_wrap.option(
"--filepath",
help="Full path to CSV file with data",
type=str,
required=True
)
@click_wrap.option(
"--project",
help="Project name in which the context will be used",
type=str,
required=True
)
@click_wrap.option(
"--folder-path",
help="Asset name in which the context will be used",
type=str,
required=True
)
@click_wrap.option(
"--task",
help="Task name under Asset in which the context will be used",
type=str,
required=False
)
@click_wrap.option(
"--ignore-validators",
help="Option to ignore validators",
type=bool,
is_flag=True,
required=False
)
def ingestcsv(
filepath,
project,
folder_path,
task,
ignore_validators
):
"""Ingest CSV file into project.
This command ingests a CSV file into a project. The CSV file must be in
a specific format. See the documentation for more information.
"""
from .csv_publish import csvpublish
# use Path to check if csv_filepath exists
if not Path(filepath).exists():
raise FileNotFoundError(f"File {filepath} does not exist.")
csvpublish(
filepath,
project,
folder_path,
task,
ignore_validators
)
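
A minimal, hedged usage sketch: invoking the same "ingestcsv" command through the launcher helpers that "run_traypublisher" uses above. It assumes "get_ayon_launcher_args" forwards extra CLI arguments unchanged; the CSV path and context names are placeholders, not real data.

# Sketch only: launch the CSV ingest in a detached process.
from ayon_core.lib import get_ayon_launcher_args
from ayon_core.lib.execute import run_detached_process

args = get_ayon_launcher_args(
    "addon", "traypublisher", "ingestcsv",
    "--filepath", "/path/to/ingest.csv",
    "--project", "demo_project",
    "--folder-path", "/shots/sh010",
    "--task", "compositing",
)
run_detached_process(args)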

@@ -0,0 +1,8 @@
from .pipeline import (
TrayPublisherHost,
)
__all__ = (
"TrayPublisherHost",
)

@@ -0,0 +1,363 @@
import re
from copy import deepcopy
import ayon_api
from ayon_core.pipeline.create import CreatorError
class ShotMetadataSolver:
""" Solving hierarchical metadata
Used during editorial publishing. Works with input
clip name and settings defining python formatable
template. Settings also define searching patterns
and its token keys used for formatting in templates.
"""
NO_DECOR_PATTERN = re.compile(r"\{([a-z]*?)\}")
def __init__(self, logger):
self.clip_name_tokenizer = []
self.shot_rename = {
"enabled": False,
"shot_rename_template": "",
}
self.shot_hierarchy = {
"enabled": False,
"parents": [],
"parents_path": "",
}
self.shot_add_tasks = []
self.log = logger
def update_data(
self,
clip_name_tokenizer,
shot_rename,
shot_hierarchy,
shot_add_tasks
):
self.clip_name_tokenizer = clip_name_tokenizer
self.shot_rename = shot_rename
self.shot_hierarchy = shot_hierarchy
self.shot_add_tasks = shot_add_tasks
def _rename_template(self, data):
"""Shot renaming function
Args:
data (dict): formatting data
Raises:
CreatorError: If missing keys
Returns:
str: formatted new name
"""
shot_rename_template = self.shot_rename[
"shot_rename_template"]
try:
# format to new shot name
return shot_rename_template.format(**data)
except KeyError as _error:
raise CreatorError((
"Make sure all keys in settings are correct:: \n\n"
f"From template string {shot_rename_template} > "
f"`{_error}` has no equivalent in \n"
f"{list(data.keys())} input formatting keys!"
))
def _generate_tokens(self, clip_name, source_data):
"""Token generator
Settings defines token pairs key and regex expression.
Args:
clip_name (str): name of clip in editorial
source_data (dict): data for formatting
Raises:
CreatorError: if missing key
Returns:
dict: updated source_data
"""
output_data = deepcopy(source_data["anatomy_data"])
output_data["clip_name"] = clip_name
if not self.clip_name_tokenizer:
return output_data
parent_name = source_data["selected_folder_entity"]["name"]
search_text = parent_name + clip_name
for clip_name_item in self.clip_name_tokenizer:
token_key = clip_name_item["name"]
pattern = clip_name_item["regex"]
p = re.compile(pattern)
match = p.findall(search_text)
if not match:
raise CreatorError((
"Make sure regex expression works with your data: \n\n"
f"'{token_key}' with regex '{pattern}' in your settings\n"
"can't find any match in your clip name "
f"'{search_text}'!\n\nLook to: "
"'project_settings/traypublisher/editorial_creators"
"/editorial_simple/clip_name_tokenizer'\n"
"at your project settings..."
))
# QUESTION: how to refactor `match[-1]` to some better way?
output_data[token_key] = match[-1]
return output_data
def _create_parents_from_settings(self, parents, data):
"""formatting parent components.
Args:
parents (list): list of dict parent components
data (dict): formatting data
Raises:
CreatorError: missing formatting key
CreatorError: missing token key
KeyError: missing parent token
Returns:
list: list of dict of parent components
"""
# fill the parents parts from presets
shot_hierarchy = deepcopy(self.shot_hierarchy)
hierarchy_parents = shot_hierarchy["parents"]
# fill parent keys data template from anatomy data
try:
_parent_tokens_formatting_data = {
parent_token["name"]: parent_token["value"].format(**data)
for parent_token in hierarchy_parents
}
except KeyError as _error:
raise CreatorError((
"Make sure all keys in settings are correct : \n"
f"`{_error}` has no equivalent in \n{list(data.keys())}"
))
_parent_tokens_type = {
parent_token["name"]: parent_token["parent_type"]
for parent_token in hierarchy_parents
}
for _index, _parent in enumerate(
shot_hierarchy["parents_path"].split("/")
):
# format parent token with value which is formatted
try:
parent_name = _parent.format(
**_parent_tokens_formatting_data)
except KeyError as _error:
raise CreatorError((
"Make sure all keys in settings are correct:\n\n"
f"`{_error}` from template string"
f" {shot_hierarchy['parents_path']},"
f" has no equivalent in"
f"\n{list(_parent_tokens_formatting_data.keys())} parents"
))
parent_token_name = (
self.NO_DECOR_PATTERN.findall(_parent).pop())
if not parent_token_name:
raise KeyError(
f"Parent token is not found in: `{_parent}`")
# find parent type
parent_token_type = _parent_tokens_type[parent_token_name]
# in case selected context is set to the same folder
# TODO keep index with 'parents' - name check is not enough
if (
_index == 0
and parents[-1]["entity_name"] == parent_name
):
continue
# in case first parent is project then start parents from start
if (
_index == 0
and parent_token_type.lower() == "project"
):
project_parent = parents[0]
parents = [project_parent]
continue
parents.append({
"entity_type": "folder",
"folder_type": parent_token_type.lower(),
"entity_name": parent_name
})
return parents
def _create_hierarchy_path(self, parents):
"""Converting hierarchy path from parents
Args:
parents (list): list of dict parent components
Returns:
str: hierarchy path
"""
return "/".join(
[
p["entity_name"] for p in parents
if p["entity_type"] != "project"
]
) if parents else ""
def _get_parents_from_selected_folder(
self,
project_entity,
folder_entity,
):
"""Returning parents from context on selected folder.
Context defined in Traypublisher project tree.
Args:
project_entity (dict[str, Any]): Project entity.
folder_entity (dict[str, Any]): Selected folder entity.
Returns:
list: list of dict parent components
"""
project_name = project_entity["name"]
path_entries = folder_entity["path"].split("/")
subpaths = []
subpath_items = []
for name in path_entries:
subpath_items.append(name)
if name:
subpaths.append("/".join(subpath_items))
# Remove last name because we already have folder entity
subpaths.pop(-1)
folder_entity_by_path = {}
if subpaths:
folder_entity_by_path = {
parent_folder["path"]: parent_folder
for parent_folder in ayon_api.get_folders(
project_name, folder_paths=subpaths
)
}
folders_hierarchy = [
folder_entity_by_path[folder_path]
for folder_path in subpaths
]
folders_hierarchy.append(folder_entity)
# add current selection context hierarchy
output = [{
"entity_type": "project",
"entity_name": project_name,
}]
for entity in folders_hierarchy:
output.append({
"entity_type": "folder",
"folder_type": entity["folderType"],
"entity_name": entity["name"]
})
return output
def _generate_tasks_from_settings(self, project_entity):
"""Convert settings inputs to task data.
Args:
project_entity (dict): Project entity.
Raises:
KeyError: Missing task type in project doc
Returns:
dict: tasks data
"""
tasks_to_add = {}
project_task_types = project_entity["taskTypes"]
task_type_names = {
task_type["name"]
for task_type in project_task_types
}
for task_item in self.shot_add_tasks:
task_name = task_item["name"]
task_type = task_item["task_type"]
# check if task type in project task types
if task_type not in task_type_names:
raise KeyError(
"Missing task type `{}` for `{}` is not"
" existing in `{}``".format(
task_type,
task_name,
list(task_type_names)
)
)
tasks_to_add[task_name] = {"type": task_type}
return tasks_to_add
def generate_data(self, clip_name, source_data):
"""Metadata generator.
Converts input data to hierarchy metadata.
Args:
clip_name (str): clip name
source_data (dict): formatting data
Returns:
(str, dict): shot name and hierarchy data
"""
tasks = {}
folder_entity = source_data["selected_folder_entity"]
project_entity = source_data["project_entity"]
# match clip to shot name at start
shot_name = clip_name
# parse all tokens and generate formatting data
formatting_data = self._generate_tokens(shot_name, source_data)
# generate parents from selected folder
parents = self._get_parents_from_selected_folder(
project_entity, folder_entity
)
if self.shot_rename["enabled"]:
shot_name = self._rename_template(formatting_data)
self.log.info(f"Renamed shot name: {shot_name}")
if self.shot_hierarchy["enabled"]:
parents = self._create_parents_from_settings(
parents, formatting_data)
if self.shot_add_tasks:
tasks = self._generate_tasks_from_settings(
project_entity)
# generate hierarchy path from parents
hierarchy_path = self._create_hierarchy_path(parents)
if hierarchy_path:
folder_path = f"/{hierarchy_path}/{shot_name}"
else:
folder_path = f"/{shot_name}"
return shot_name, {
"hierarchy": hierarchy_path,
"folderPath": folder_path,
"parents": parents,
"tasks": tasks
}
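
A minimal, hedged sketch of driving the solver above without a server round trip: the entity dicts are trimmed to only the keys this class reads, and the tokenizer/template values are illustrative, not real project settings.

# Sketch only: rename a clip via tokenizer + template.
import logging

solver = ShotMetadataSolver(logging.getLogger("editorial"))
solver.update_data(
    clip_name_tokenizer=[{"name": "shot_id", "regex": r"(sh\d{3})"}],
    shot_rename={
        "enabled": True,
        "shot_rename_template": "{shot_id}",
    },
    shot_hierarchy={"enabled": False, "parents": [], "parents_path": ""},
    shot_add_tasks=[],
)
shot_name, hierarchy_data = solver.generate_data(
    "sc010_sh010",
    {
        "anatomy_data": {},
        "selected_folder_entity": {
            "name": "editorial",
            "path": "/editorial",
            "folderType": "Folder",
        },
        "project_entity": {"name": "demo_project"},
    },
)
# shot_name == "sh010"; hierarchy_data["folderPath"] == "/editorial/sh010"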

@@ -0,0 +1,179 @@
import os
import json
import tempfile
import atexit
import pyblish.api
from ayon_core.pipeline import (
register_creator_plugin_path,
)
from ayon_core.host import HostBase, IPublishHost
ROOT_DIR = os.path.dirname(os.path.dirname(
os.path.abspath(__file__)
))
PUBLISH_PATH = os.path.join(ROOT_DIR, "plugins", "publish")
CREATE_PATH = os.path.join(ROOT_DIR, "plugins", "create")
class TrayPublisherHost(HostBase, IPublishHost):
name = "traypublisher"
def install(self):
os.environ["AYON_HOST_NAME"] = self.name
pyblish.api.register_host("traypublisher")
pyblish.api.register_plugin_path(PUBLISH_PATH)
register_creator_plugin_path(CREATE_PATH)
def get_context_title(self):
return HostContext.get_project_name()
def get_context_data(self):
return HostContext.get_context_data()
def update_context_data(self, data, changes):
HostContext.save_context_data(data)
def set_project_name(self, project_name):
# TODO Deregister project specific plugins and register new project
# plugins
os.environ["AYON_PROJECT_NAME"] = project_name
HostContext.set_project_name(project_name)
class HostContext:
_context_json_path = None
@staticmethod
def _on_exit():
if (
HostContext._context_json_path
and os.path.exists(HostContext._context_json_path)
):
os.remove(HostContext._context_json_path)
@classmethod
def get_context_json_path(cls):
if cls._context_json_path is None:
output_file = tempfile.NamedTemporaryFile(
mode="w", prefix="traypub_", suffix=".json"
)
output_file.close()
cls._context_json_path = output_file.name
atexit.register(HostContext._on_exit)
print(cls._context_json_path)
return cls._context_json_path
@classmethod
def _get_data(cls, group=None):
json_path = cls.get_context_json_path()
data = {}
if not os.path.exists(json_path):
with open(json_path, "w") as json_stream:
json.dump(data, json_stream)
else:
with open(json_path, "r") as json_stream:
content = json_stream.read()
if content:
data = json.loads(content)
if group is None:
return data
return data.get(group)
@classmethod
def _save_data(cls, group, new_data):
json_path = cls.get_context_json_path()
data = cls._get_data()
data[group] = new_data
with open(json_path, "w") as json_stream:
json.dump(data, json_stream)
@classmethod
def add_instance(cls, instance):
instances = cls.get_instances()
instances.append(instance)
cls.save_instances(instances)
@classmethod
def get_instances(cls):
return cls._get_data("instances") or []
@classmethod
def save_instances(cls, instances):
cls._save_data("instances", instances)
@classmethod
def get_context_data(cls):
return cls._get_data("context") or {}
@classmethod
def save_context_data(cls, data):
cls._save_data("context", data)
@classmethod
def get_project_name(cls):
return cls._get_data("project_name")
@classmethod
def set_project_name(cls, project_name):
cls._save_data("project_name", project_name)
@classmethod
def get_data_to_store(cls):
return {
"project_name": cls.get_project_name(),
"instances": cls.get_instances(),
"context": cls.get_context_data(),
}
def list_instances():
return HostContext.get_instances()
def update_instances(update_list):
updated_instances = {}
for instance, _changes in update_list:
updated_instances[instance.id] = instance.data_to_store()
instances = HostContext.get_instances()
for instance_data in instances:
instance_id = instance_data["instance_id"]
if instance_id in updated_instances:
new_instance_data = updated_instances[instance_id]
old_keys = set(instance_data.keys())
new_keys = set(new_instance_data.keys())
instance_data.update(new_instance_data)
for key in (old_keys - new_keys):
instance_data.pop(key)
HostContext.save_instances(instances)
def remove_instances(instances):
if not isinstance(instances, (tuple, list)):
instances = [instances]
current_instances = HostContext.get_instances()
for instance in instances:
instance_id = instance.data["instance_id"]
found_idx = None
for idx, _instance in enumerate(current_instances):
if instance_id == _instance["instance_id"]:
found_idx = idx
break
if found_idx is not None:
current_instances.pop(found_idx)
HostContext.save_instances(current_instances)
def get_context_data():
return HostContext.get_context_data()
def update_context_data(data, changes):
HostContext.save_context_data(data)
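
A hedged round-trip sketch of the JSON-backed store above: groups ("context", "instances", "project_name") live in a single temp file that is removed at exit.

# Sketch only: write and read grouped host context data.
HostContext.set_project_name("demo_project")
HostContext.save_context_data({"comment": "test publish"})
HostContext.add_instance({"instance_id": "abc123", "productName": "reviewMain"})

assert HostContext.get_project_name() == "demo_project"
assert get_context_data() == {"comment": "test publish"}
assert list_instances()[0]["instance_id"] == "abc123"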

@@ -0,0 +1,337 @@
import ayon_api
from ayon_core.lib.attribute_definitions import (
FileDef,
BoolDef,
NumberDef,
UISeparatorDef,
)
from ayon_core.lib.transcoding import IMAGE_EXTENSIONS, VIDEO_EXTENSIONS
from ayon_core.pipeline.create import (
Creator,
HiddenCreator,
CreatedInstance,
cache_and_get_instances,
PRE_CREATE_THUMBNAIL_KEY,
)
from .pipeline import (
list_instances,
update_instances,
remove_instances,
HostContext,
)
REVIEW_EXTENSIONS = set(IMAGE_EXTENSIONS) | set(VIDEO_EXTENSIONS)
SHARED_DATA_KEY = "openpype.traypublisher.instances"
class HiddenTrayPublishCreator(HiddenCreator):
host_name = "traypublisher"
settings_category = "traypublisher"
def collect_instances(self):
instances_by_identifier = cache_and_get_instances(
self, SHARED_DATA_KEY, list_instances
)
for instance_data in instances_by_identifier[self.identifier]:
instance = CreatedInstance.from_existing(instance_data, self)
self._add_instance_to_context(instance)
def update_instances(self, update_list):
update_instances(update_list)
def remove_instances(self, instances):
remove_instances(instances)
for instance in instances:
self._remove_instance_from_context(instance)
def _store_new_instance(self, new_instance):
"""Tray publisher specific method to store instance.
Instance is stored into "workfile" of traypublisher and also add it
to CreateContext.
Args:
new_instance (CreatedInstance): Instance that should be stored.
"""
# Host implementation of storing metadata about instance
HostContext.add_instance(new_instance.data_to_store())
# Add instance to current context
self._add_instance_to_context(new_instance)
class TrayPublishCreator(Creator):
create_allow_context_change = True
host_name = "traypublisher"
settings_category = "traypublisher"
def collect_instances(self):
instances_by_identifier = cache_and_get_instances(
self, SHARED_DATA_KEY, list_instances
)
for instance_data in instances_by_identifier[self.identifier]:
instance = CreatedInstance.from_existing(instance_data, self)
self._add_instance_to_context(instance)
def update_instances(self, update_list):
update_instances(update_list)
def remove_instances(self, instances):
remove_instances(instances)
for instance in instances:
self._remove_instance_from_context(instance)
def _store_new_instance(self, new_instance):
"""Tray publisher specific method to store instance.
Instance is stored into "workfile" of traypublisher and also add it
to CreateContext.
Args:
new_instance (CreatedInstance): Instance that should be stored.
"""
# Host implementation of storing metadata about instance
HostContext.add_instance(new_instance.data_to_store())
new_instance.mark_as_stored()
# Add instance to current context
self._add_instance_to_context(new_instance)
class SettingsCreator(TrayPublishCreator):
create_allow_context_change = True
create_allow_thumbnail = True
allow_version_control = False
extensions = []
def create(self, product_name, data, pre_create_data):
# Pass precreate data to creator attributes
thumbnail_path = pre_create_data.pop(PRE_CREATE_THUMBNAIL_KEY, None)
# Fill 'version_to_use' if version control is enabled
if self.allow_version_control:
folder_path = data["folderPath"]
product_entities_by_folder_path = self._prepare_next_versions(
[folder_path], [product_name])
version = product_entities_by_folder_path[folder_path].get(
product_name
)
pre_create_data["version_to_use"] = version
data["_previous_last_version"] = version
data["creator_attributes"] = pre_create_data
data["settings_creator"] = True
# Create new instance
new_instance = CreatedInstance(
self.product_type, product_name, data, self
)
self._store_new_instance(new_instance)
if thumbnail_path:
self.set_instance_thumbnail_path(new_instance.id, thumbnail_path)
def _prepare_next_versions(self, folder_paths, product_names):
"""Prepare next versions for given folder and product names.
Todos:
Expect combination of product names by folder path to avoid
unnecessary server calls for unused products.
Args:
folder_paths (Iterable[str]): Folder paths.
product_names (Iterable[str]): Product names.
Returns:
dict[str, dict[str, int]]: Last versions by folder path
and product names.
"""
# Prepare all versions for all combinations to '1'
# TODO use 'ayon_core.pipeline.version_start' logic
product_entities_by_folder_path = {
folder_path: {
product_name: 1
for product_name in product_names
}
for folder_path in folder_paths
}
if not folder_paths or not product_names:
return product_entities_by_folder_path
folder_entities = ayon_api.get_folders(
self.project_name,
folder_paths=folder_paths,
fields={"id", "path"}
)
folder_paths_by_id = {
folder_entity["id"]: folder_entity["path"]
for folder_entity in folder_entities
}
product_entities = list(ayon_api.get_products(
self.project_name,
folder_ids=folder_paths_by_id.keys(),
product_names=product_names,
fields={"id", "name", "folderId"}
))
product_ids = {p["id"] for p in product_entities}
last_versions = ayon_api.get_last_versions(
self.project_name,
product_ids,
fields={"version", "productId"})
for product_entity in product_entities:
product_id = product_entity["id"]
product_name = product_entity["name"]
folder_id = product_entity["folderId"]
folder_path = folder_paths_by_id[folder_id]
last_version = last_versions.get(product_id)
version = 0
if last_version is not None:
version = last_version["version"]
product_entities_by_folder_path[folder_path][product_name] += (
version
)
return product_entities_by_folder_path
def _fill_next_versions(self, instances_data):
"""Fill next version for instances.
Instances also store the previously prepared next version so it is
possible to recognize whether the user entered a different version.
If the version was not changed by the user, or was set to '0', the
next version is updated from the current database state.
"""
filtered_instance_data = []
for instance in instances_data:
previous_last_version = instance.get("_previous_last_version")
creator_attributes = instance["creator_attributes"]
use_next_version = creator_attributes.get(
"use_next_version", True)
version = creator_attributes.get("version_to_use", 0)
if (
use_next_version
or version == 0
or version == previous_last_version
):
filtered_instance_data.append(instance)
folder_paths = {
instance["folderPath"]
for instance in filtered_instance_data
}
product_names = {
instance["productName"]
for instance in filtered_instance_data}
product_entities_by_folder_path = self._prepare_next_versions(
folder_paths, product_names
)
for instance in filtered_instance_data:
folder_path = instance["folderPath"]
product_name = instance["productName"]
version = product_entities_by_folder_path[folder_path][product_name]
instance["creator_attributes"]["version_to_use"] = version
instance["_previous_last_version"] = version
def collect_instances(self):
"""Collect instances from host.
Overridden to manage version control attributes. If version control
is disabled, the attributes are removed from instances; if it is
enabled, next versions are filled in.
"""
instances_by_identifier = cache_and_get_instances(
self, SHARED_DATA_KEY, list_instances
)
instances = instances_by_identifier[self.identifier]
if not instances:
return
if self.allow_version_control:
self._fill_next_versions(instances)
for instance_data in instances:
# Make sure that there are not data related to version control
# if plugin does not support it
if not self.allow_version_control:
instance_data.pop("_previous_last_version", None)
creator_attributes = instance_data["creator_attributes"]
creator_attributes.pop("version_to_use", None)
creator_attributes.pop("use_next_version", None)
instance = CreatedInstance.from_existing(instance_data, self)
self._add_instance_to_context(instance)
def get_instance_attr_defs(self):
defs = self.get_pre_create_attr_defs()
if self.allow_version_control:
defs += [
UISeparatorDef(),
BoolDef(
"use_next_version",
default=True,
label="Use next version",
),
NumberDef(
"version_to_use",
default=1,
minimum=0,
maximum=999,
label="Version to use",
)
]
return defs
def get_pre_create_attr_defs(self):
# Use same attributes as for instance attributes
return [
FileDef(
"representation_files",
folders=False,
extensions=self.extensions,
allow_sequences=self.allow_sequences,
single_item=not self.allow_multiple_items,
label="Representations",
),
FileDef(
"reviewable",
folders=False,
extensions=REVIEW_EXTENSIONS,
allow_sequences=True,
single_item=True,
label="Reviewable representations",
extensions_label="Single reviewable item"
)
]
@classmethod
def from_settings(cls, item_data):
identifier = item_data["identifier"]
product_type = item_data["product_type"]
if not identifier:
identifier = "settings_{}".format(product_type)
return type(
"{}{}".format(cls.__name__, identifier),
(cls, ),
{
"product_type": product_type,
"identifier": identifier,
"label": item_data["label"].strip(),
"icon": item_data["icon"],
"description": item_data["description"],
"detailed_description": item_data["detailed_description"],
"extensions": item_data["extensions"],
"allow_sequences": item_data["allow_sequences"],
"allow_multiple_items": item_data["allow_multiple_items"],
"allow_version_control": item_data.get(
"allow_version_control", False),
"default_variants": item_data["default_variants"],
}
)
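
A hedged sketch of how "from_settings" above stamps out one creator class per settings item; the item values below are illustrative, not real project settings.

# Sketch only: build a creator class from a settings item.
item_data = {
    "identifier": "",
    "product_type": "render",
    "label": "Render",
    "icon": "fa.image",
    "description": "Publish rendered frames",
    "detailed_description": "",
    "extensions": [".exr", ".png"],
    "allow_sequences": True,
    "allow_multiple_items": False,
    "default_variants": ["Main"],
}
RenderCreator = SettingsCreator.from_settings(item_data)
# An empty identifier falls back to "settings_<product_type>".
assert RenderCreator.identifier == "settings_render"
assert RenderCreator.__name__ == "SettingsCreatorsettings_render"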

@@ -0,0 +1,122 @@
"""Functions to parse asset names, versions from file names"""
import os
import re
import ayon_api
from ayon_core.lib import Logger
def get_folder_entity_from_filename(
project_name,
source_filename,
version_regex,
all_selected_folder_ids=None
):
"""Try to parse out folder name from file name provided.
Artists might provide various file name formats.
Currently handled:
- chair.mov
- chair_v001.mov
- my_chair_to_upload.mov
"""
version = None
folder_name = os.path.splitext(source_filename)[0]
# Always first check if source filename is directly folder
# (eg. 'chair.mov')
matching_folder_entity = get_folder_by_name_case_not_sensitive(
project_name, folder_name, all_selected_folder_ids)
if matching_folder_entity is None:
# name contains also a version
matching_folder_entity, version = (
parse_with_version(
project_name,
folder_name,
version_regex,
all_selected_folder_ids
)
)
if matching_folder_entity is None:
matching_folder_entity = parse_containing(
project_name,
folder_name,
all_selected_folder_ids
)
return matching_folder_entity, version
def parse_with_version(
project_name,
folder_name,
version_regex,
all_selected_folder_ids=None,
log=None
):
"""Try to parse folder name from a file name containing version too
Eg. 'chair_v001.mov' >> 'chair', 1
"""
if not log:
log = Logger.get_logger(__name__)
log.debug(
("Folder entity by \"{}\" was not found, trying version regex.".
format(folder_name)))
matching_folder_entity = version_number = None
regex_result = version_regex.findall(folder_name)
if regex_result:
_folder_name, _version_number = regex_result[0]
matching_folder_entity = get_folder_by_name_case_not_sensitive(
project_name,
_folder_name,
all_selected_folder_ids=all_selected_folder_ids
)
if matching_folder_entity:
version_number = int(_version_number)
return matching_folder_entity, version_number
def parse_containing(project_name, folder_name, all_selected_folder_ids=None):
"""Look if file name contains any existing folder name"""
for folder_entity in ayon_api.get_folders(
project_name,
folder_ids=all_selected_folder_ids,
fields={"id", "name"}
):
if folder_entity["name"].lower() in folder_name.lower():
return ayon_api.get_folder_by_id(
project_name,
folder_entity["id"]
)
def get_folder_by_name_case_not_sensitive(
project_name,
folder_name,
all_selected_folder_ids=None,
log=None
):
"""Handle more cases in file names"""
if not log:
log = Logger.get_logger(__name__)
folder_name = re.compile(folder_name, re.IGNORECASE)
folder_entities = list(ayon_api.get_folders(
project_name,
folder_ids=all_selected_folder_ids,
folder_names=[folder_name]
))
if len(folder_entities) > 1:
log.warning("Too many records found for {}".format(
folder_name))
return None
if folder_entities:
return folder_entities.pop()
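
A hedged sketch of the "version_regex" shape these helpers expect: "findall" must yield (base name, version digits) pairs, as consumed by "parse_with_version" above. The pattern itself is an assumption; in practice it comes from settings.

# Sketch only: a regex that splits "chair_v001" into ("chair", "001").
import re

version_regex = re.compile(r"^(.+)_v(\d+)$")
assert version_regex.findall("chair_v001") == [("chair", "001")]
assert version_regex.findall("my_chair_to_upload") == []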

@@ -0,0 +1,84 @@
import pyblish.api
import pyblish.util
from ayon_api import get_folder_by_path, get_task_by_name
from ayon_core.lib.attribute_definitions import FileDefItem
from ayon_core.pipeline import install_host
from ayon_core.pipeline.create import CreateContext
from ayon_core.hosts.traypublisher.api import TrayPublisherHost
def csvpublish(
filepath,
project_name,
folder_path,
task_name=None,
ignore_validators=False
):
"""Publish CSV file.
Args:
filepath (str): Path to CSV file.
project_name (str): Project name.
folder_path (str): Folder path.
task_name (Optional[str]): Task name.
ignore_validators (Optional[bool]): Option to ignore validators.
"""
# initialization of host
host = TrayPublisherHost()
install_host(host)
# setting host context into project
host.set_project_name(project_name)
# form precreate data with field values
file_field = FileDefItem.from_paths([filepath], False).pop().to_dict()
precreate_data = {
"csv_filepath_data": file_field,
}
# create context initialization
create_context = CreateContext(host, headless=True)
folder_entity = get_folder_by_path(
project_name,
folder_path=folder_path,
)
if not folder_entity:
raise ValueError(
f"Folder path '{folder_path}' does not "
f"exist in project '{project_name}'."
)
task_entity = get_task_by_name(
project_name,
folder_entity["id"],
task_name,
)
if not task_entity:
raise ValueError(
f"Task name '{task_name}' does not "
f"exist in folder '{folder_path}'."
)
create_context.create(
"io.ayon.creators.traypublisher.csv_ingest",
"Main",
folder_entity=folder_entity,
task_entity=task_entity,
pre_create_data=precreate_data,
)
# publishing context initialization
pyblish_context = pyblish.api.Context()
pyblish_context.data["create_context"] = create_context
# redefine targets (skip 'local' to disable validators);
# 'targets' stays None otherwise so pyblish uses its defaults
targets = None
if ignore_validators:
targets = ["default", "ingest"]
# publishing
pyblish.util.publish(context=pyblish_context, targets=targets)
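
A hedged usage sketch for the function above; the file path and context names are placeholders. With "ignore_validators" left False, publishing runs with pyblish's default targets.

# Sketch only: headless CSV publish into a project.
csvpublish(
    "/path/to/ingest.csv",
    project_name="demo_project",
    folder_path="/shots/sh010",
    task_name="compositing",
    ignore_validators=True,
)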

@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
"""Creator of colorspace look files.
This creator publishes colorspace look files under the
product type `ociolook`. All files are published as representations.
"""
from pathlib import Path
import ayon_api
from ayon_core.lib.attribute_definitions import (
FileDef, EnumDef, TextDef, UISeparatorDef
)
from ayon_core.pipeline import (
CreatedInstance,
CreatorError
)
from ayon_core.pipeline import colorspace
from ayon_core.hosts.traypublisher.api.plugin import TrayPublishCreator
class CreateColorspaceLook(TrayPublishCreator):
"""Creates colorspace look files."""
identifier = "io.openpype.creators.traypublisher.colorspace_look"
label = "Colorspace Look"
product_type = "ociolook"
description = "Publishes color space look file."
extensions = [".cc", ".cube", ".3dl", ".spi1d", ".spi3d", ".csp", ".lut"]
enabled = False
colorspace_items = [
(None, "Not set")
]
colorspace_attr_show = False
config_items = None
config_data = None
def get_detail_description(self):
return """# Colorspace Look
This creator publishes color space look file (LUT).
"""
def get_icon(self):
return "mdi.format-color-fill"
def create(self, product_name, instance_data, pre_create_data):
repr_file = pre_create_data.get("luts_file")
if not repr_file:
raise CreatorError("No files specified")
files = repr_file.get("filenames")
if not files:
# this should never happen
raise CreatorError("Missing files from representation")
folder_path = instance_data["folderPath"]
task_name = instance_data["task"]
folder_entity = ayon_api.get_folder_by_path(
self.project_name, folder_path)
task_entity = None
if task_name:
task_entity = ayon_api.get_task_by_name(
self.project_name, folder_entity["id"], task_name
)
product_name = self.get_product_name(
project_name=self.project_name,
folder_entity=folder_entity,
task_entity=task_entity,
variant=instance_data["variant"],
)
instance_data["creator_attributes"] = {
"abs_lut_path": (
Path(repr_file["directory"]) / files[0]).as_posix()
}
# Create new instance
new_instance = CreatedInstance(self.product_type, product_name,
instance_data, self)
new_instance.transient_data["config_items"] = self.config_items
new_instance.transient_data["config_data"] = self.config_data
self._store_new_instance(new_instance)
def collect_instances(self):
super().collect_instances()
for instance in self.create_context.instances:
if instance.creator_identifier == self.identifier:
instance.transient_data["config_items"] = self.config_items
instance.transient_data["config_data"] = self.config_data
def get_instance_attr_defs(self):
return [
EnumDef(
"working_colorspace",
self.colorspace_items,
default="Not set",
label="Working Colorspace",
),
UISeparatorDef(
label="Advanced1"
),
TextDef(
"abs_lut_path",
label="LUT Path",
),
EnumDef(
"input_colorspace",
self.colorspace_items,
default="Not set",
label="Input Colorspace",
),
EnumDef(
"direction",
[
(None, "Not set"),
("forward", "Forward"),
("inverse", "Inverse")
],
default="Not set",
label="Direction"
),
EnumDef(
"interpolation",
[
(None, "Not set"),
("linear", "Linear"),
("tetrahedral", "Tetrahedral"),
("best", "Best"),
("nearest", "Nearest")
],
default="Not set",
label="Interpolation"
),
EnumDef(
"output_colorspace",
self.colorspace_items,
default="Not set",
label="Output Colorspace",
),
]
def get_pre_create_attr_defs(self):
return [
FileDef(
"luts_file",
folders=False,
extensions=self.extensions,
allow_sequences=False,
single_item=True,
label="Look Files",
)
]
def apply_settings(self, project_settings):
config_data = colorspace.get_current_context_imageio_config_preset(
project_settings=project_settings
)
if not config_data:
self.enabled = False
return
filepath = config_data["path"]
config_items = colorspace.get_ocio_config_colorspaces(filepath)
labeled_colorspaces = colorspace.get_colorspaces_enumerator_items(
config_items,
include_aliases=True,
include_roles=True
)
self.config_items = config_items
self.config_data = config_data
# copy to avoid mutating the shared class attribute
self.colorspace_items = (
list(self.colorspace_items) + list(labeled_colorspaces)
)
self.enabled = True
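
An illustrative, hedged shape of "colorspace_items" after "apply_settings" runs: the "Not set" placeholder stays first, followed by (value, label) pairs built from "get_colorspaces_enumerator_items". The colorspace names below are examples, not a real OCIO config.

# Sketch only: expected enumerator item shape.
colorspace_items = [
    (None, "Not set"),
    ("ACES - ACEScg", "ACES - ACEScg"),
    ("Output - sRGB", "Output - sRGB"),
]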

@@ -0,0 +1,741 @@
import os
import re
import csv
import clique
from io import StringIO
from copy import deepcopy, copy
from ayon_api import get_folder_by_path, get_task_by_name
from ayon_core.pipeline.create import get_product_name
from ayon_core.pipeline import CreatedInstance
from ayon_core.lib import FileDef, BoolDef
from ayon_core.lib.transcoding import (
VIDEO_EXTENSIONS, IMAGE_EXTENSIONS
)
from ayon_core.pipeline.create import CreatorError
from ayon_core.hosts.traypublisher.api.plugin import (
TrayPublishCreator
)
class IngestCSV(TrayPublishCreator):
"""CSV ingest creator class"""
icon = "fa.file"
label = "CSV Ingest"
product_type = "csv_ingest_file"
identifier = "io.ayon.creators.traypublisher.csv_ingest"
default_variants = ["Main"]
description = "Ingest products' data from CSV file"
detailed_description = """
Ingest products' data from CSV file following column and representation
configuration in project settings.
"""
# Position in the list of creators.
order = 10
# settings for this creator
columns_config = {}
representations_config = {}
def create(self, subset_name, instance_data, pre_create_data):
"""Create an product from each row found in the CSV.
Args:
subset_name (str): The subset name.
instance_data (dict): The instance data.
pre_create_data (dict): The pre-create data.
"""
csv_filepath_data = pre_create_data.get("csv_filepath_data", {})
folder = csv_filepath_data.get("directory", "")
if not os.path.exists(folder):
raise CreatorError(
f"Directory '{folder}' does not exist."
)
filename = csv_filepath_data.get("filenames", [])
self._process_csv_file(subset_name, instance_data, folder, filename[0])
def _process_csv_file(
self, subset_name, instance_data, staging_dir, filename):
"""Process CSV file.
Args:
subset_name (str): The subset name.
instance_data (dict): The instance data.
staging_dir (str): The staging directory.
filename (str): The filename.
"""
# create a new instance for the csv file itself
self._pass_data_to_csv_instance(
instance_data,
staging_dir,
filename
)
csv_instance = CreatedInstance(
self.product_type, subset_name, instance_data, self
)
self._store_new_instance(csv_instance)
csv_instance["csvFileData"] = {
"filename": filename,
"staging_dir": staging_dir,
}
# get all data from the csv file and convert it
# to new instances
csv_data_for_instances = self._get_data_from_csv(
staging_dir, filename)
# create instances from csv data
self._create_instances_from_csv_data(
csv_data_for_instances, staging_dir
)
def _create_instances_from_csv_data(
self,
csv_data_for_instances,
staging_dir
):
"""Create instances from csv data"""
for folder_path, prepared_data in csv_data_for_instances.items():
project_name = self.create_context.get_current_project_name()
products = prepared_data["products"]
for instance_name, product_data in products.items():
# get important instance variables
task_name = product_data["task_name"]
task_type = product_data["task_type"]
variant = product_data["variant"]
product_type = product_data["product_type"]
version = product_data["version"]
# create subset/product name
product_name = get_product_name(
project_name,
task_name,
task_type,
self.host_name,
product_type,
variant
)
# make sure frame start/end is inherited from csv columns
# expected frame range data excludes handles
for _, repre_data in product_data["representations"].items(): # noqa: E501
frame_start = repre_data["frameStart"]
frame_end = repre_data["frameEnd"]
handle_start = repre_data["handleStart"]
handle_end = repre_data["handleEnd"]
fps = repre_data["fps"]
break
# try to find any version comment in representation data
version_comment = next(
iter(
repre_data["comment"]
for repre_data in product_data["representations"].values() # noqa: E501
if repre_data["comment"]
),
None
)
# try to find any slate switch in representation data
slate_exists = any(
repre_data["slate"]
for _, repre_data in product_data["representations"].items() # noqa: E501
)
# get representations from product data
representations = product_data["representations"]
label = f"{folder_path}_{product_name}_v{version:>03}"
families = ["csv_ingest"]
if slate_exists:
# adding slate to families mainly for loaders to be able
# to filter out slates
families.append("slate")
# make product data
product_data = {
"name": instance_name,
"folderPath": folder_path,
"families": families,
"label": label,
"task": task_name,
"variant": variant,
"source": "csv",
"frameStart": frame_start,
"frameEnd": frame_end,
"handleStart": handle_start,
"handleEnd": handle_end,
"fps": fps,
"version": version,
"comment": version_comment,
}
# create new instance
new_instance = CreatedInstance(
product_type, product_name, product_data, self
)
self._store_new_instance(new_instance)
if not new_instance.get("prepared_data_for_repres"):
new_instance["prepared_data_for_repres"] = []
base_thumbnail_repre_data = {
"name": "thumbnail",
"ext": None,
"files": None,
"stagingDir": None,
"stagingDir_persistent": True,
"tags": ["thumbnail", "delete"],
}
# need to populate all thumbnails for all representations
# so we can check if unique thumbnail per representation
# is needed
thumbnails = [
repre_data["thumbnailPath"]
for repre_data in representations.values()
if repre_data["thumbnailPath"]
]
multiple_thumbnails = len(set(thumbnails)) > 1
explicit_output_name = None
thumbnails_processed = False
for filepath, repre_data in representations.items():
# check if any review derivate tag is present
reviewable = any(
tag for tag in repre_data.get("tags", [])
# tag can be `ftrackreview` or `review`
if "review" in tag
)
# multiple thumbnails have to be populated as representations
# with 'outputName' so the Ftrack instance integrator can pair
# them with reviewable video representations
if (
thumbnails
and multiple_thumbnails
and reviewable
):
# multiple unique thumbnails per representation needs
# grouping by outputName
# mainly used in Ftrack instance integrator
explicit_output_name = repre_data["representationName"]
relative_thumbnail_path = repre_data["thumbnailPath"]
# representation might not have thumbnail path
# so ignore this one
if not relative_thumbnail_path:
continue
thumb_dir, thumb_file = \
self._get_refactor_thumbnail_path(
staging_dir, relative_thumbnail_path)
filename, ext = os.path.splitext(thumb_file)
thumbnail_repr_data = deepcopy(
base_thumbnail_repre_data)
thumbnail_repr_data.update({
"name": "thumbnail_{}".format(filename),
"ext": ext[1:],
"files": thumb_file,
"stagingDir": thumb_dir,
"outputName": explicit_output_name,
})
new_instance["prepared_data_for_repres"].append({
"type": "thumbnail",
"colorspace": None,
"representation": thumbnail_repr_data,
})
# also add thumbnailPath for ayon to integrate
if not new_instance.get("thumbnailPath"):
new_instance["thumbnailPath"] = (
os.path.join(thumb_dir, thumb_file)
)
elif (
thumbnails
and not multiple_thumbnails
and not thumbnails_processed
or not reviewable
):
"""
For case where we have only one thumbnail
and not reviewable medias. This needs to be processed
only once per instance.
"""
if not thumbnails:
continue
# here we will use only one thumbnail for
# all representations
relative_thumbnail_path = repre_data["thumbnailPath"]
# popping last thumbnail from list since it is only one
# and we do not need to iterate again over it
if not relative_thumbnail_path:
relative_thumbnail_path = thumbnails.pop()
thumb_dir, thumb_file = \
self._get_refactor_thumbnail_path(
staging_dir, relative_thumbnail_path)
_, ext = os.path.splitext(thumb_file)
thumbnail_repr_data = deepcopy(
base_thumbnail_repre_data)
thumbnail_repr_data.update({
"ext": ext[1:],
"files": thumb_file,
"stagingDir": thumb_dir
})
new_instance["prepared_data_for_repres"].append({
"type": "thumbnail",
"colorspace": None,
"representation": thumbnail_repr_data,
})
# also add thumbnailPath for ayon to integrate
if not new_instance.get("thumbnailPath"):
new_instance["thumbnailPath"] = (
os.path.join(thumb_dir, thumb_file)
)
thumbnails_processed = True
# get representation data
representation_data = self._get_representation_data(
filepath, repre_data, staging_dir,
explicit_output_name
)
new_instance["prepared_data_for_repres"].append({
"type": "media",
"colorspace": repre_data["colorspace"],
"representation": representation_data,
})
def _get_refactor_thumbnail_path(
self, staging_dir, relative_thumbnail_path):
thumbnail_abs_path = os.path.join(
staging_dir, relative_thumbnail_path)
return os.path.split(
thumbnail_abs_path)
def _get_representation_data(
self, filepath, repre_data, staging_dir, explicit_output_name=None
):
"""Get representation data
Args:
filepath (str): Filepath to representation file.
repre_data (dict): Representation data from CSV file.
staging_dir (str): Staging directory.
explicit_output_name (Optional[str]): Explicit output name.
For grouping purposes with reviewable components.
Defaults to None.
"""
# get extension of file
basename = os.path.basename(filepath)
extension = os.path.splitext(filepath)[-1].lower()
# validate filepath is having correct extension based on output
repre_name = repre_data["representationName"]
repre_config_data = None
for repre in self.representations_config["representations"]:
if repre["name"] == repre_name:
repre_config_data = repre
break
if not repre_config_data:
raise CreatorError(
f"Representation '{repre_name}' not found "
"in config representation data."
)
validate_extensions = repre_config_data["extensions"]
if extension not in validate_extensions:
raise CreatorError(
f"File extension '{extension}' not valid for "
f"output '{validate_extensions}'."
)
is_sequence = (extension in IMAGE_EXTENSIONS)
# convert ### string in file name to %03d
# this is for correct frame range validation
# example: file.###.exr -> file.%03d.exr
if "#" in basename:
padding = len(basename.split("#")) - 1
basename = basename.replace("#" * padding, f"%0{padding}d")
is_sequence = True
# make absolute path to file
absfilepath = os.path.normpath(os.path.join(staging_dir, filepath))
dirname = os.path.dirname(absfilepath)
# check if dirname exists
if not os.path.isdir(dirname):
raise CreatorError(
f"Directory '{dirname}' does not exist."
)
# collect all data from dirname
paths_for_collection = []
for file in os.listdir(dirname):
filepath = os.path.join(dirname, file)
paths_for_collection.append(filepath)
collections, _ = clique.assemble(paths_for_collection)
if collections:
collections = collections[0]
else:
if is_sequence:
raise CreatorError(
f"No collections found in directory '{dirname}'."
)
frame_start = None
frame_end = None
if is_sequence:
files = [os.path.basename(file) for file in collections]
frame_start = list(collections.indexes)[0]
frame_end = list(collections.indexes)[-1]
else:
files = basename
tags = deepcopy(repre_data["tags"])
# if slate in repre_data is True then remove one frame from start
if repre_data["slate"]:
tags.append("has_slate")
# get representation data
representation_data = {
"name": repre_name,
"ext": extension[1:],
"files": files,
"stagingDir": dirname,
"stagingDir_persistent": True,
"tags": tags,
}
if extension in VIDEO_EXTENSIONS:
representation_data.update({
"fps": repre_data["fps"],
"outputName": repre_name,
})
if explicit_output_name:
representation_data["outputName"] = explicit_output_name
if frame_start:
representation_data["frameStart"] = frame_start
if frame_end:
representation_data["frameEnd"] = frame_end
return representation_data
def _get_data_from_csv(
self, package_dir, filename
):
"""Generate instances from the csv file"""
# get current project name and code from context.data
project_name = self.create_context.get_current_project_name()
csv_file_path = os.path.join(
package_dir, filename
)
# make sure csv file contains columns from following list
required_columns = [
column["name"] for column in self.columns_config["columns"]
if column["required_column"]
]
# read csv file
with open(csv_file_path, "r") as csv_file:
csv_content = csv_file.read()
# read csv file with DictReader
csv_reader = csv.DictReader(
StringIO(csv_content),
delimiter=self.columns_config["csv_delimiter"]
)
# fix fieldnames
# sometimes someone can keep extra space at the start or end of
# the column name
all_columns = [
" ".join(column.rsplit()) for column in csv_reader.fieldnames]
# return back fixed fieldnames
csv_reader.fieldnames = all_columns
# check if csv file contains all required columns
if any(column not in all_columns for column in required_columns):
raise CreatorError(
f"Missing required columns: {required_columns}"
)
csv_data = {}
# get data from csv file
for row in csv_reader:
# Get required columns first
# TODO: will need to be folder path in CSV
# TODO: `context_asset_name` is now `folder_path`
folder_path = self._get_row_value_with_validation(
"Folder Path", row)
task_name = self._get_row_value_with_validation(
"Task Name", row)
version = self._get_row_value_with_validation(
"Version", row)
# Get optional columns
variant = self._get_row_value_with_validation(
"Variant", row)
product_type = self._get_row_value_with_validation(
"Product Type", row)
pre_product_name = (
f"{task_name}{variant}{product_type}"
f"{version}".replace(" ", "").lower()
)
# get representation data
filename, representation_data = \
self._get_representation_row_data(row)
# TODO: batch query of all folder paths and task names
# get folder entity from folder path
folder_entity = get_folder_by_path(
project_name, folder_path)
# make sure folder exists
if not folder_entity:
raise CreatorError(
f"Folder '{folder_path}' not found."
)
# get the task entity from the folder entity by name
task_entity = get_task_by_name(
project_name, folder_entity["id"], task_name)
# check if task name is a valid task on the folder entity
if not task_entity:
raise CreatorError(
f"Task '{task_name}' not found on folder '{folder_path}'."
)
# gather all csv data into one dict and make sure there are no
# duplicates; data are validated and sorted under the correct
# existing folder, and representations are distributed under
# products following variants
if folder_path not in csv_data:
csv_data[folder_path] = {
"folder_entity": folder_entity,
"products": {
pre_product_name: {
"task_name": task_name,
"task_type": task_entity["taskType"],
"variant": variant,
"product_type": product_type,
"version": version,
"representations": {
filename: representation_data,
},
}
}
}
else:
csv_products = csv_data[folder_path]["products"]
if pre_product_name not in csv_products:
csv_products[pre_product_name] = {
"task_name": task_name,
"task_type": task_entity["taskType"],
"variant": variant,
"product_type": product_type,
"version": version,
"representations": {
filename: representation_data,
},
}
else:
csv_representations = \
csv_products[pre_product_name]["representations"]
if filename in csv_representations:
raise CreatorError(
f"Duplicate filename '{filename}' in csv file."
)
csv_representations[filename] = representation_data
return csv_data
def _get_representation_row_data(self, row_data):
"""Get representation row data"""
# Get required columns first
file_path = self._get_row_value_with_validation(
"File Path", row_data)
frame_start = self._get_row_value_with_validation(
"Frame Start", row_data)
frame_end = self._get_row_value_with_validation(
"Frame End", row_data)
handle_start = self._get_row_value_with_validation(
"Handle Start", row_data)
handle_end = self._get_row_value_with_validation(
"Handle End", row_data)
fps = self._get_row_value_with_validation(
"FPS", row_data)
# Get optional columns
thumbnail_path = self._get_row_value_with_validation(
"Version Thumbnail", row_data)
colorspace = self._get_row_value_with_validation(
"Representation Colorspace", row_data)
comment = self._get_row_value_with_validation(
"Version Comment", row_data)
repre = self._get_row_value_with_validation(
"Representation", row_data)
slate_exists = self._get_row_value_with_validation(
"Slate Exists", row_data)
repre_tags = self._get_row_value_with_validation(
"Representation Tags", row_data)
# convert tags value to list
tags_list = copy(self.representations_config["default_tags"])
if repre_tags:
tags_list = []
tags_delimiter = self.representations_config["tags_delimiter"]
# strip spaces from repre_tags
if tags_delimiter in repre_tags:
tags = repre_tags.split(tags_delimiter)
for _tag in tags:
tags_list.append(("".join(_tag.strip())).lower())
else:
tags_list.append(repre_tags)
representation_data = {
"colorspace": colorspace,
"comment": comment,
"representationName": repre,
"slate": slate_exists,
"tags": tags_list,
"thumbnailPath": thumbnail_path,
"frameStart": int(frame_start),
"frameEnd": int(frame_end),
"handleStart": int(handle_start),
"handleEnd": int(handle_end),
"fps": float(fps),
}
return file_path, representation_data
def _get_row_value_with_validation(
self, column_name, row_data, default_value=None
):
"""Get row value with validation"""
# get column data from column config
column_data = None
for column in self.columns_config["columns"]:
if column["name"] == column_name:
column_data = column
break
if not column_data:
raise CreatorError(
f"Column '{column_name}' not found in column config."
)
# get column value from row
column_value = row_data.get(column_name)
column_required = column_data["required_column"]
# check if column value is not empty string and column is required
if column_value == "" and column_required:
raise CreatorError(
f"Value in column '{column_name}' is required."
)
# get column type
column_type = column_data["type"]
# get column validation regex
column_validation = column_data["validation_pattern"]
# get column default value
column_default = default_value or column_data["default"]
if column_type in ["number", "decimal"] and column_default == 0:
column_default = None
# check if column value is not empty string
if column_value == "":
# set default value if column value is empty string
column_value = column_default
# set column value to correct type following column type
if column_type == "number" and column_value is not None:
column_value = int(column_value)
elif column_type == "decimal" and column_value is not None:
column_value = float(column_value)
elif column_type == "bool":
column_value = column_value in ["true", "True"]
# check if column value matches validation regex
if (
column_value is not None and
not re.match(str(column_validation), str(column_value))
):
raise CreatorError(
f"Column '{column_name}' value '{column_value}' "
f"does not match validation regex '{column_validation}' \n"
f"Row data: {row_data} \n"
f"Column data: {column_data}"
)
return column_value
def _pass_data_to_csv_instance(
self, instance_data, staging_dir, filename
):
"""Pass CSV representation file to instance data"""
representation = {
"name": "csv",
"ext": "csv",
"files": filename,
"stagingDir": staging_dir,
"stagingDir_persistent": True,
}
instance_data.update({
"label": f"CSV: {filename}",
"representations": [representation],
"stagingDir": staging_dir,
"stagingDir_persistent": True,
})
def get_instance_attr_defs(self):
return [
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_pre_create_attr_defs(self):
"""Creating pre-create attributes at creator plugin.
Returns:
list: list of attribute object instances
"""
# Use same attributes as for instance attributes
attr_defs = [
FileDef(
"csv_filepath_data",
folders=False,
extensions=[".csv"],
allow_sequences=False,
single_item=True,
label="CSV File",
),
]
return attr_defs
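
A hedged example of input this creator can read: a header plus one data row. The column names match the "_get_row_value_with_validation" calls above; the delimiter, paths, and values are made up, and real column configuration comes from project settings.

# Sketch only: a CSV payload the creator could parse.
EXAMPLE_CSV = (
    "Folder Path,Task Name,Version,Variant,Product Type,File Path,"
    "Frame Start,Frame End,Handle Start,Handle End,FPS,Representation,"
    "Representation Colorspace,Representation Tags,Slate Exists,"
    "Version Thumbnail,Version Comment\n"
    "/shots/sh010,compositing,1,Main,render,renders/sh010.####.exr,"
    "1001,1100,0,0,24.0,exr,ACES - ACEScg,review,false,"
    "thumbs/sh010_thumb.jpg,first pass\n"
)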

@@ -0,0 +1,845 @@
import os
from copy import deepcopy
import ayon_api
import opentimelineio as otio
from ayon_core.hosts.traypublisher.api.plugin import (
TrayPublishCreator,
HiddenTrayPublishCreator
)
from ayon_core.hosts.traypublisher.api.editorial import (
ShotMetadataSolver
)
from ayon_core.pipeline import CreatedInstance
from ayon_core.lib import (
get_ffprobe_data,
convert_ffprobe_fps_value,
FileDef,
TextDef,
NumberDef,
EnumDef,
BoolDef,
UISeparatorDef,
UILabelDef
)
CLIP_ATTR_DEFS = [
EnumDef(
"fps",
items=[
{"value": "from_selection", "label": "From selection"},
{"value": 23.997, "label": "23.976"},
{"value": 24, "label": "24"},
{"value": 25, "label": "25"},
{"value": 29.97, "label": "29.97"},
{"value": 30, "label": "30"}
],
label="FPS"
),
NumberDef(
"workfile_start_frame",
default=1001,
label="Workfile start frame"
),
NumberDef(
"handle_start",
default=0,
label="Handle start"
),
NumberDef(
"handle_end",
default=0,
label="Handle end"
)
]
class EditorialClipInstanceCreatorBase(HiddenTrayPublishCreator):
"""Wrapper class for clip product type creators."""
host_name = "traypublisher"
def create(self, instance_data, source_data=None):
product_name = instance_data["productName"]
# Create new instance
new_instance = CreatedInstance(
self.product_type, product_name, instance_data, self
)
self._store_new_instance(new_instance)
return new_instance
def get_instance_attr_defs(self):
return [
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
class EditorialShotInstanceCreator(EditorialClipInstanceCreatorBase):
"""Shot product type class
The shot metadata instance carrier.
"""
identifier = "editorial_shot"
product_type = "shot"
label = "Editorial Shot"
def get_instance_attr_defs(self):
instance_attributes = [
TextDef(
"folderPath",
label="Folder path"
)
]
instance_attributes.extend(CLIP_ATTR_DEFS)
return instance_attributes
class EditorialPlateInstanceCreator(EditorialClipInstanceCreatorBase):
"""Plate product type class
Plate representation instance.
"""
identifier = "editorial_plate"
product_type = "plate"
label = "Editorial Plate"
class EditorialAudioInstanceCreator(EditorialClipInstanceCreatorBase):
"""Audio product type class
Audio representation instance.
"""
identifier = "editorial_audio"
product_type = "audio"
label = "Editorial Audio"
class EditorialReviewInstanceCreator(EditorialClipInstanceCreatorBase):
"""Review product type class
Review representation instance.
"""
identifier = "editorial_review"
product_type = "review"
label = "Editorial Review"
class EditorialSimpleCreator(TrayPublishCreator):
"""Editorial creator class
Simple workflow creator. This creator only disecting input
video file into clip chunks and then converts each to
defined format defined Settings for each product preset.
Args:
TrayPublishCreator (Creator): Tray publisher plugin class
"""
label = "Editorial Simple"
product_type = "editorial"
identifier = "editorial_simple"
default_variants = [
"main"
]
description = "Editorial files to generate shots."
detailed_description = """
Supports publishing new shots to a project
or updating already created ones. Publishing will create an OTIO file.
"""
icon = "fa.file"
product_type_presets = []
def __init__(self, *args, **kwargs):
self._shot_metadata_solver = ShotMetadataSolver(self.log)
super(EditorialSimpleCreator, self).__init__(*args, **kwargs)
def apply_settings(self, project_settings):
editorial_creators = deepcopy(
project_settings["traypublisher"]["editorial_creators"]
)
creator_settings = editorial_creators.get(self.identifier)
self._shot_metadata_solver.update_data(
creator_settings["clip_name_tokenizer"],
creator_settings["shot_rename"],
creator_settings["shot_hierarchy"],
creator_settings["shot_add_tasks"]
)
self.product_type_presets = creator_settings["product_type_presets"]
default_variants = creator_settings.get("default_variants")
if default_variants:
self.default_variants = default_variants
def create(self, product_name, instance_data, pre_create_data):
allowed_product_type_presets = self._get_allowed_product_type_presets(
pre_create_data)
product_types = {
item["product_type"]
for item in self.product_type_presets
}
clip_instance_properties = {
k: v
for k, v in pre_create_data.items()
if k != "sequence_filepath_data"
if k not in product_types
}
folder_path = instance_data["folderPath"]
folder_entity = ayon_api.get_folder_by_path(
self.project_name, folder_path
)
if pre_create_data["fps"] == "from_selection":
# get 'fps' from folder attributes
fps = folder_entity["attrib"]["fps"]
else:
fps = float(pre_create_data["fps"])
instance_data.update({
"fps": fps
})
# get path of sequence
sequence_path_data = pre_create_data["sequence_filepath_data"]
media_path_data = pre_create_data["media_filepaths_data"]
sequence_paths = self._get_path_from_file_data(
sequence_path_data, multi=True)
media_path = self._get_path_from_file_data(media_path_data)
first_otio_timeline = None
for seq_path in sequence_paths:
# get otio timeline
otio_timeline = self._create_otio_timeline(
seq_path, fps)
# Create all clip instances
clip_instance_properties.update({
"fps": fps,
"variant": instance_data["variant"]
})
# create clip instances
self._get_clip_instances(
folder_entity,
otio_timeline,
media_path,
clip_instance_properties,
allowed_product_type_presets,
os.path.basename(seq_path),
first_otio_timeline,
)
if not first_otio_timeline:
# assign otio timeline for multi file to layer
first_otio_timeline = otio_timeline
# create otio editorial instance
self._create_otio_instance(
product_name,
instance_data,
seq_path,
media_path,
first_otio_timeline
)
def _create_otio_instance(
self,
product_name,
data,
sequence_path,
media_path,
otio_timeline
):
"""Otio instance creating function
Args:
product_name (str): Product name.
data (dict): instance data
sequence_path (str): path to sequence file
media_path (str): path to media file
otio_timeline (otio.Timeline): otio timeline object
"""
# Pass precreate data to creator attributes
data.update({
"sequenceFilePath": sequence_path,
"editorialSourcePath": media_path,
"otioTimeline": otio.adapters.write_to_string(otio_timeline)
})
new_instance = CreatedInstance(
self.product_type, product_name, data, self
)
self._store_new_instance(new_instance)
def _create_otio_timeline(self, sequence_path, fps):
"""Creating otio timeline from sequence path
Args:
sequence_path (str): path to sequence file
fps (float): frame per second
Returns:
otio.Timeline: otio timeline object
"""
# get editorial sequence file into otio timeline object
extension = os.path.splitext(sequence_path)[1]
kwargs = {}
if extension == ".edl":
# EDL has no frame rate embedded so needs explicit
# frame rate else 24 is assumed.
kwargs["rate"] = fps
kwargs["ignore_timecode_mismatch"] = True
return otio.adapters.read_from_file(sequence_path, **kwargs)
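# Illustrative note (assumption: OTIO's bundled cmx_3600 EDL adapter,
# which accepts these kwargs):
#     otio.adapters.read_from_file(
#         "/path/to/cut.edl", rate=25.0, ignore_timecode_mismatch=True
#     )
# Other supported formats (.xml, .aaf, .fcpxml) carry their own rate,
# so they are read without extra kwargs.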
def _get_path_from_file_data(self, file_path_data, multi=False):
"""Converting creator path data to single path string
Args:
file_path_data (FileDefItem): creator path data inputs
multi (bool): switch to multiple files mode
Raises:
FileExistsError: in case nothing had been set
Returns:
str: path string
"""
return_path_list = []
if isinstance(file_path_data, list):
return_path_list = [
os.path.join(f["directory"], f["filenames"][0])
for f in file_path_data
]
if not return_path_list:
raise FileExistsError(
f"File path was not added: {file_path_data}")
return return_path_list if multi else return_path_list[0]
def _get_clip_instances(
self,
folder_entity,
otio_timeline,
media_path,
instance_data,
product_type_presets,
sequence_file_name,
first_otio_timeline=None
):
"""Helping function for creating clip instance
Args:
folder_entity (dict[str, Any]): Folder entity.
otio_timeline (otio.Timeline): otio timeline object
media_path (str): media file path string
instance_data (dict): clip instance data
product_type_presets (list): list of dict settings product presets
"""
tracks = [
track for track in otio_timeline.each_child(
descended_from_type=otio.schema.Track)
if track.kind == "Video"
]
# media data for audio stream and reference solving
media_data = self._get_media_source_metadata(media_path)
for track in tracks:
# set track name
track.name = f"{sequence_file_name} - {otio_timeline.name}"
try:
track_start_frame = (
abs(track.source_range.start_time.value)
)
track_start_frame -= self.timeline_frame_start
except AttributeError:
track_start_frame = 0
for otio_clip in track.each_child():
if not self._validate_clip_for_processing(otio_clip):
continue
# get available frames info to clip data
self._create_otio_reference(otio_clip, media_path, media_data)
# convert timeline range to source range
self._restore_otio_source_range(otio_clip)
base_instance_data = self._get_base_instance_data(
otio_clip,
instance_data,
track_start_frame,
folder_entity
)
parenting_data = {
"instance_label": None,
"instance_id": None
}
for product_type_preset in product_type_presets:
# exclude audio product type if no audio stream
if (
product_type_preset["product_type"] == "audio"
and not media_data.get("audio")
):
continue
self._make_product_instance(
otio_clip,
product_type_preset,
deepcopy(base_instance_data),
parenting_data
)
# add track to first otioTimeline if it is in input args
if first_otio_timeline:
first_otio_timeline.tracks.append(deepcopy(track))
def _restore_otio_source_range(self, otio_clip):
"""Infusing source range.
Otio clip is missing proper source clip range so
here we add them from from parent timeline frame range.
Args:
otio_clip (otio.Clip): otio clip object
"""
otio_clip.source_range = otio_clip.range_in_parent()
def _create_otio_reference(
self,
otio_clip,
media_path,
media_data
):
"""Creating otio reference at otio clip.
Args:
otio_clip (otio.Clip): otio clip object
media_path (str): media file path string
media_data (dict): media metadata
"""
start_frame = media_data["start_frame"]
frame_duration = media_data["duration"]
fps = media_data["fps"]
available_range = otio.opentime.TimeRange(
start_time=otio.opentime.RationalTime(
start_frame, fps),
duration=otio.opentime.RationalTime(
frame_duration, fps)
)
# for old OTIO versions or plain video files create `ExternalReference`
media_reference = otio.schema.ExternalReference(
target_url=media_path,
available_range=available_range
)
otio_clip.media_reference = media_reference
def _get_media_source_metadata(self, path):
"""Get all available metadata from file
Args:
path (str): media file path string
Raises:
AssertionError: ffprobe couldn't read metadata
Returns:
dict: media file metadata
"""
return_data = {}
try:
media_data = get_ffprobe_data(
path, self.log
)
# get video stream data
video_streams = []
audio_streams = []
for stream in media_data["streams"]:
codec_type = stream.get("codec_type")
if codec_type == "audio":
audio_streams.append(stream)
elif codec_type == "video":
video_streams.append(stream)
if not video_streams:
raise ValueError(
"Could not find video stream in source file."
)
video_stream = video_streams[0]
return_data = {
"video": True,
"start_frame": 0,
"duration": int(video_stream["nb_frames"]),
"fps": float(
convert_ffprobe_fps_value(
video_stream["r_frame_rate"]
)
)
}
# get audio streams data
if audio_streams:
return_data["audio"] = True
except Exception as exc:
raise AssertionError((
"FFprobe couldn't read information about input file: "
f"\"{path}\". Error message: {exc}"
))
return return_data
def _make_product_instance(
self,
otio_clip,
product_type_preset,
instance_data,
parenting_data
):
"""Making product instance from input preset
Args:
otio_clip (otio.Clip): otio clip object
product_type_preset (dict): single product type preset
instance_data (dict): instance data
parenting_data (dict): shot instance parent data
Returns:
CreatedInstance: creator instance object
"""
product_type = product_type_preset["product_type"]
label = self._make_product_naming(
product_type_preset,
instance_data
)
instance_data["label"] = label
# shot product type: store otio clip and fill parenting data
if product_type == "shot":
instance_data["otioClip"] = (
otio.adapters.write_to_string(otio_clip))
c_instance = self.create_context.creators[
"editorial_shot"].create(
instance_data)
parenting_data.update({
"instance_label": label,
"instance_id": c_instance.data["instance_id"]
})
else:
# add review family if defined
instance_data.update({
"outputFileType": product_type_preset["output_file_type"],
"parent_instance_id": parenting_data["instance_id"],
"creator_attributes": {
"parent_instance": parenting_data["instance_label"],
"add_review_family": product_type_preset.get("review")
}
})
creator_identifier = f"editorial_{product_type}"
editorial_clip_creator = self.create_context.creators[
creator_identifier]
c_instance = editorial_clip_creator.create(
instance_data)
return c_instance
def _make_product_naming(self, product_type_preset, instance_data):
"""Product name maker
Args:
product_type_preset (dict): single preset item
instance_data (dict): instance data
Returns:
str: label string
"""
folder_path = instance_data["creator_attributes"]["folderPath"]
variant_name = instance_data["variant"]
product_type = product_type_preset["product_type"]
# get variant name from preset or from inheritance
_variant_name = product_type_preset.get("variant") or variant_name
# product name
product_name = "{}{}".format(
product_type, _variant_name.capitalize()
)
label = "{} {}".format(
folder_path,
product_name
)
instance_data.update({
"label": label,
"variant": _variant_name,
"productType": product_type,
"productName": product_name,
})
return label
def _get_base_instance_data(
self,
otio_clip,
instance_data,
track_start_frame,
folder_entity,
):
"""Factoring basic set of instance data.
Args:
otio_clip (otio.Clip): otio clip object
instance_data (dict): precreate instance data
track_start_frame (int): track start frame
Returns:
dict: instance data
"""
parent_folder_path = folder_entity["path"]
parent_folder_name = parent_folder_path.rsplit("/", 1)[-1]
# get clip instance properties
handle_start = instance_data["handle_start"]
handle_end = instance_data["handle_end"]
timeline_offset = instance_data["timeline_offset"]
workfile_start_frame = instance_data["workfile_start_frame"]
fps = instance_data["fps"]
variant_name = instance_data["variant"]
# basic unique folder name
clip_name = os.path.splitext(otio_clip.name)[0]
project_entity = ayon_api.get_project(self.project_name)
shot_name, shot_metadata = self._shot_metadata_solver.generate_data(
clip_name,
{
"anatomy_data": {
"project": {
"name": self.project_name,
"code": project_entity["code"]
},
"parent": parent_folder_name,
"app": self.host_name
},
"selected_folder_entity": folder_entity,
"project_entity": project_entity
}
)
timing_data = self._get_timing_data(
otio_clip,
timeline_offset,
track_start_frame,
workfile_start_frame
)
# create creator attributes
creator_attributes = {
"workfile_start_frame": workfile_start_frame,
"fps": fps,
"handle_start": int(handle_start),
"handle_end": int(handle_end)
}
# add timing data
creator_attributes.update(timing_data)
# create base instance data
base_instance_data = {
"shotName": shot_name,
"variant": variant_name,
"task": None,
"newAssetPublishing": True,
"trackStartFrame": track_start_frame,
"timelineOffset": timeline_offset,
# creator_attributes
"creator_attributes": creator_attributes
}
# update base instance data with context data
# and also update creator attributes with context data
creator_attributes["folderPath"] = shot_metadata.pop("folderPath")
base_instance_data["folderPath"] = parent_folder_path
# add creator attributes to shared instance data
base_instance_data["creator_attributes"] = creator_attributes
# add hierarchy shot metadata
base_instance_data.update(shot_metadata)
return base_instance_data
def _get_timing_data(
self,
otio_clip,
timeline_offset,
track_start_frame,
workfile_start_frame
):
"""Returning available timing data
Args:
otio_clip (otio.Clip): otio clip object
timeline_offset (int): offset value
track_start_frame (int): starting frame input
workfile_start_frame (int): start frame for shot's workfiles
Returns:
dict: timing metadata
"""
# frame ranges data
clip_in = otio_clip.range_in_parent().start_time.value
clip_in += track_start_frame
clip_out = otio_clip.range_in_parent().end_time_inclusive().value
clip_out += track_start_frame
# add offset in case there is any
if timeline_offset:
clip_in += timeline_offset
clip_out += timeline_offset
clip_duration = otio_clip.duration().value
source_in = otio_clip.trimmed_range().start_time.value
source_out = source_in + clip_duration
# define starting frame for future shot
frame_start = (
clip_in if workfile_start_frame is None
else workfile_start_frame
)
frame_end = frame_start + (clip_duration - 1)
return {
"frameStart": int(frame_start),
"frameEnd": int(frame_end),
"clipIn": int(clip_in),
"clipOut": int(clip_out),
"clipDuration": int(otio_clip.duration().value),
"sourceIn": int(source_in),
"sourceOut": int(source_out)
}
def _get_allowed_product_type_presets(self, pre_create_data):
"""Filter out allowed product type presets.
Args:
pre_create_data (dict): precreate attributes inputs
Returns:
list: list of dicts with preset items
"""
return [
{"product_type": "shot"},
*[
preset
for preset in self.product_type_presets
if pre_create_data[preset["product_type"]]
]
]
def _validate_clip_for_processing(self, otio_clip):
"""Validate otio clip attributes
Args:
otio_clip (otio.Clip): otio clip object
Returns:
bool: True if all passing conditions
"""
if otio_clip.name is None:
return False
if isinstance(otio_clip, otio.schema.Gap):
return False
# skip all generators like black empty
if isinstance(
otio_clip.media_reference,
otio.schema.GeneratorReference):
return False
# Transitions are ignored, because Clips have the full frame
# range.
if isinstance(otio_clip, otio.schema.Transition):
return False
return True
def get_pre_create_attr_defs(self):
"""Creating pre-create attributes at creator plugin.
Returns:
list: list of attribute object instances
"""
# Use same attributes as for instance attributes
attr_defs = [
FileDef(
"sequence_filepath_data",
folders=False,
extensions=[
".edl",
".xml",
".aaf",
".fcpxml"
],
allow_sequences=False,
single_item=False,
label="Sequence file",
),
FileDef(
"media_filepaths_data",
folders=False,
extensions=[
".mov",
".mp4",
".wav"
],
allow_sequences=False,
single_item=False,
label="Media files",
),
# TODO: perhaps better would be timecode and fps input
NumberDef(
"timeline_offset",
default=0,
label="Timeline offset"
),
UISeparatorDef(),
UILabelDef("Clip instance attributes"),
UISeparatorDef()
]
# add product type toggles
attr_defs.extend(
BoolDef(item["product_type"], label=item["product_type"])
for item in self.product_type_presets
)
attr_defs.append(UISeparatorDef())
attr_defs.extend(CLIP_ATTR_DEFS)
return attr_defs
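# Illustrative sketch (not part of the plugin): the timing math from
# '_get_timing_data' with made-up numbers.
def _timing_sketch(clip_in, duration, workfile_start_frame):
    # Workfile start frame wins over the timeline position when set.
    frame_start = (
        clip_in if workfile_start_frame is None else workfile_start_frame
    )
    frame_end = frame_start + (duration - 1)
    return frame_start, frame_end
# A 50-frame clip placed at timeline frames 100-149, remapped to 1001:
assert _timing_sketch(100, 50, 1001) == (1001, 1050)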

View file

@ -0,0 +1,96 @@
from pathlib import Path
from ayon_core.pipeline import (
CreatedInstance,
)
from ayon_core.lib.attribute_definitions import (
FileDef,
BoolDef,
TextDef,
)
from ayon_core.hosts.traypublisher.api.plugin import TrayPublishCreator
class EditorialPackageCreator(TrayPublishCreator):
"""Creates instance for OTIO file from published folder.
Folder contains OTIO file and exported .mov files. Process should publish
the whole folder as a single `editorial_pckg` product type and (possibly)
convert .mov files into a different format and copy them into the
`publish` `resources` subfolder.
"""
identifier = "editorial_pckg"
label = "Editorial package"
product_type = "editorial_pckg"
description = "Publish folder with OTIO file and resources"
# Position batch creator after simple creators
order = 120
conversion_enabled = False
def apply_settings(self, project_settings):
self.conversion_enabled = (
project_settings["traypublisher"]
["publish"]
["ExtractEditorialPckgConversion"]
["conversion_enabled"]
)
def get_icon(self):
return "fa.folder"
def create(self, product_name, instance_data, pre_create_data):
folder_path = pre_create_data.get("folder_path")
if not folder_path:
return
instance_data["creator_attributes"] = {
"folder_path": (Path(folder_path["directory"]) /
Path(folder_path["filenames"][0])).as_posix(),
"conversion_enabled": pre_create_data["conversion_enabled"]
}
# Create new instance
new_instance = CreatedInstance(self.product_type, product_name,
instance_data, self)
self._store_new_instance(new_instance)
def get_pre_create_attr_defs(self):
# Use same attributes as for instance attributes
return [
FileDef(
"folder_path",
folders=True,
single_item=True,
extensions=[],
allow_sequences=False,
label="Folder path"
),
BoolDef("conversion_enabled",
tooltip="Convert to output defined in Settings.",
default=self.conversion_enabled,
label="Convert resources"),
]
def get_instance_attr_defs(self):
return [
TextDef(
"folder_path",
label="Folder path",
disabled=True
),
BoolDef("conversion_enabled",
tooltip="Convert to output defined in Settings.",
label="Convert resources"),
]
def get_detail_description(self):
return """# Publish folder with OTIO file and video clips
Folder contains OTIO file and exported .mov files. Process should
publish the whole folder as a single `editorial_pckg` product type and
(possibly) convert .mov files into a different format and copy them
into the `publish` `resources` subfolder.
"""

View file

@ -0,0 +1,22 @@
import os
from ayon_core.lib import Logger
from ayon_core.settings import get_project_settings
log = Logger.get_logger(__name__)
def initialize():
from ayon_core.hosts.traypublisher.api.plugin import SettingsCreator
project_name = os.environ["AYON_PROJECT_NAME"]
project_settings = get_project_settings(project_name)
simple_creators = project_settings["traypublisher"]["simple_creators"]
global_variables = globals()
for item in simple_creators:
dynamic_plugin = SettingsCreator.from_settings(item)
global_variables[dynamic_plugin.__name__] = dynamic_plugin
initialize()
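# Illustrative sketch (separate from the code above): the same pattern of
# exposing dynamically built classes through module globals so attribute
# scans discover them (names below are hypothetical).
def _make_plugins(settings_items):
    for item in settings_items:
        name = "SettingsCreator{}".format(item["identifier"].capitalize())
        # 'type()' builds a new class object at runtime
        globals()[name] = type(
            name, (object,), {"identifier": item["identifier"]}
        )
_make_plugins([{"identifier": "workfile"}])
assert "SettingsCreatorWorkfile" in globals()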

View file

@ -0,0 +1,170 @@
import copy
import os
import re
import collections
import ayon_api
from ayon_core.lib import (
FileDef,
BoolDef,
)
from ayon_core.pipeline import (
CreatedInstance,
)
from ayon_core.pipeline.create import (
get_product_name,
TaskNotSetError,
)
from ayon_core.hosts.traypublisher.api.plugin import TrayPublishCreator
from ayon_core.hosts.traypublisher.batch_parsing import (
get_folder_entity_from_filename
)
class BatchMovieCreator(TrayPublishCreator):
"""Creates instances from movie file(s).
Intended for .mov files, but should work for any video file.
Doesn't handle image sequences though.
"""
identifier = "render_movie_batch"
label = "Batch Movies"
product_type = "render"
description = "Publish batch of video files"
create_allow_context_change = False
version_regex = re.compile(r"^(.+)_v([0-9]+)$")
# Position batch creator after simple creators
order = 110
def apply_settings(self, project_settings):
creator_settings = (
project_settings["traypublisher"]["create"]["BatchMovieCreator"]
)
self.default_variants = creator_settings["default_variants"]
self.default_tasks = creator_settings["default_tasks"]
self.extensions = creator_settings["extensions"]
def get_icon(self):
return "fa.file"
def create(self, product_name, data, pre_create_data):
file_paths = pre_create_data.get("filepath")
if not file_paths:
return
data_by_folder_id = collections.defaultdict(list)
for file_info in file_paths:
instance_data = copy.deepcopy(data)
file_name = file_info["filenames"][0]
filepath = os.path.join(file_info["directory"], file_name)
instance_data["creator_attributes"] = {"filepath": filepath}
folder_entity, version = get_folder_entity_from_filename(
self.project_name, file_name, self.version_regex)
data_by_folder_id[folder_entity["id"]].append(
(instance_data, folder_entity)
)
all_task_entities = ayon_api.get_tasks(
self.project_name, folder_ids=set(data_by_folder_id.keys())
)
task_entity_by_folder_id = collections.defaultdict(dict)
for task_entity in all_task_entities:
folder_id = task_entity["folderId"]
task_name = task_entity["name"].lower()
task_entity_by_folder_id[folder_id][task_name] = task_entity
for folder_id, folder_items in data_by_folder_id.items():
task_entities_by_name = task_entity_by_folder_id[folder_id]
task_name = None
task_entity = None
for default_task_name in self.default_tasks:
_name = default_task_name.lower()
if _name in task_entities_by_name:
task_entity = task_entities_by_name[_name]
task_name = task_entity["name"]
break
for instance_data, folder_entity in folder_items:
product_name = self._get_product_name(
self.project_name, task_entity, data["variant"]
)
instance_data["folderPath"] = folder_entity["path"]
instance_data["task"] = task_name
# Create new instance
new_instance = CreatedInstance(self.product_type, product_name,
instance_data, self)
self._store_new_instance(new_instance)
def _get_product_name(self, project_name, task_entity, variant):
"""Create product name according to standard template process"""
host_name = self.create_context.host_name
task_name = task_type = None
if task_entity:
task_name = task_entity["name"]
task_type = task_entity["taskType"]
try:
product_name = get_product_name(
project_name,
task_name,
task_type,
host_name,
self.product_type,
variant,
)
except TaskNotSetError:
# Create instance with fake task
# - instance will be marked as invalid so it can't be published
# but the user has the ability to change it
# NOTE: This expects that there is no task 'Undefined' on the folder
dumb_value = "Undefined"
product_name = get_product_name(
project_name,
dumb_value,
dumb_value,
host_name,
self.product_type,
variant,
)
return product_name
def get_instance_attr_defs(self):
return [
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_pre_create_attr_defs(self):
# Use same attributes as for instance attributes
return [
FileDef(
"filepath",
folders=False,
single_item=False,
extensions=self.extensions,
allow_sequences=False,
label="Filepath"
),
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_detail_description(self):
return """# Publish batch of .mov to multiple folders.
File names must then contain only folder name, or folder name + version.
(eg. 'chair.mov', 'chair_v001.mov', not really safe `my_chair_v001.mov`
"""

View file

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
"""Creator of online files.
Online file retain their original name and use it as product name. To
avoid conflicts, this creator checks if product with this name already
exists under selected folder.
"""
from pathlib import Path
# import ayon_api
from ayon_core.lib.attribute_definitions import FileDef, BoolDef
from ayon_core.pipeline import (
CreatedInstance,
CreatorError
)
from ayon_core.hosts.traypublisher.api.plugin import TrayPublishCreator
class OnlineCreator(TrayPublishCreator):
"""Creates instance from file and retains its original name."""
identifier = "io.openpype.creators.traypublisher.online"
label = "Online"
product_type = "online"
description = "Publish file retaining its original file name"
extensions = [".mov", ".mp4", ".mxf", ".m4v", ".mpg", ".exr",
".dpx", ".tif", ".png", ".jpg"]
def get_detail_description(self):
return """# Create file retaining its original file name.
This will publish files using template helping to retain original
file name and that file name is used as product name.
Bz default it tries to guard against multiple publishes of the same
file."""
def get_icon(self):
return "fa.file"
def create(self, product_name, instance_data, pre_create_data):
repr_file = pre_create_data.get("representation_file")
if not repr_file:
raise CreatorError("No files specified")
files = repr_file.get("filenames")
if not files:
# this should never happen
raise CreatorError("Missing files from representation")
origin_basename = Path(files[0]).stem
# disable check for existing product with the same name
"""
folder_entity = ayon_api.get_folder_by_path(
self.project_name, instance_data["folderPath"], fields={"id"})
if ayon_api.get_product_by_name(
self.project_name, origin_basename, folder_entity["id"],
fields={"id"}):
raise CreatorError(f"product with {origin_basename} already "
"exists in selected folder")
"""
instance_data["originalBasename"] = origin_basename
product_name = origin_basename
instance_data["creator_attributes"] = {
"path": (Path(repr_file["directory"]) / files[0]).as_posix()
}
# Create new instance
new_instance = CreatedInstance(self.product_type, product_name,
instance_data, self)
self._store_new_instance(new_instance)
def get_instance_attr_defs(self):
return [
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_pre_create_attr_defs(self):
return [
FileDef(
"representation_file",
folders=False,
extensions=self.extensions,
allow_sequences=True,
single_item=True,
label="Representation",
),
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_product_name(
self,
project_name,
folder_entity,
task_entity,
variant,
host_name=None,
instance=None
):
if instance is None:
return "{originalBasename}"
return instance.data["productName"]
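# Illustrative sketch (not part of the creator): 'Path(...).stem' keeps
# the full file name verbatim, only dropping the extension (name made up).
assert Path("SH010_plate_v002.mov").stem == "SH010_plate_v002"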

View file

@ -0,0 +1,13 @@
import pyblish.api
class CollectTrayPublisherAppName(pyblish.api.ContextPlugin):
"""Collect app name and label."""
label = "Collect App Name/Label"
order = pyblish.api.CollectorOrder - 0.5
hosts = ["traypublisher"]
def process(self, context):
context.data["appName"] = "tray publisher"
context.data["appLabel"] = "Tray publisher"

View file

@ -0,0 +1,36 @@
from pprint import pformat
import pyblish.api
class CollectClipInstance(pyblish.api.InstancePlugin):
"""Collect clip instances and resolve its parent"""
label = "Collect Clip Instances"
order = pyblish.api.CollectorOrder - 0.081
hosts = ["traypublisher"]
families = ["plate", "review", "audio"]
def process(self, instance):
creator_identifier = instance.data["creator_identifier"]
if creator_identifier not in [
"editorial_plate",
"editorial_audio",
"editorial_review"
]:
return
instance.data["families"].append("clip")
parent_instance_id = instance.data["parent_instance_id"]
edit_shared_data = instance.context.data["editorialSharedData"]
instance.data.update(
edit_shared_data[parent_instance_id]
)
if "editorialSourcePath" in instance.context.data.keys():
instance.data["editorialSourcePath"] = (
instance.context.data["editorialSourcePath"])
instance.data["families"].append("trimming")
self.log.debug(pformat(instance.data))

View file

@ -0,0 +1,86 @@
import os
from pprint import pformat
import pyblish.api
from ayon_core.pipeline import publish
from ayon_core.pipeline import colorspace
class CollectColorspaceLook(pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin):
"""Collect OCIO colorspace look from LUT file
"""
label = "Collect Colorspace Look"
order = pyblish.api.CollectorOrder
hosts = ["traypublisher"]
families = ["ociolook"]
def process(self, instance):
creator_attrs = instance.data["creator_attributes"]
lut_repre_name = "LUTfile"
file_url = creator_attrs["abs_lut_path"]
file_name = os.path.basename(file_url)
base_name, ext = os.path.splitext(file_name)
# build output name from base_name with separator symbols removed
# and each word capitalized
output_name = (base_name.replace("_", " ")
.replace(".", " ")
.replace("-", " ")
.title()
.replace(" ", ""))
# get config items
config_items = instance.data["transientData"]["config_items"]
config_data = instance.data["transientData"]["config_data"]
# get colorspace items
converted_color_data = {}
for colorspace_key in [
"working_colorspace",
"input_colorspace",
"output_colorspace"
]:
if creator_attrs[colorspace_key]:
color_data = colorspace.convert_colorspace_enumerator_item(
creator_attrs[colorspace_key], config_items)
converted_color_data[colorspace_key] = color_data
else:
converted_color_data[colorspace_key] = None
# add colorspace to config data
if converted_color_data["working_colorspace"]:
config_data["colorspace"] = (
converted_color_data["working_colorspace"]["name"]
)
# create lut representation data
lut_repre = {
"name": lut_repre_name,
"output": output_name,
"ext": ext.lstrip("."),
"files": file_name,
"stagingDir": os.path.dirname(file_url),
"tags": []
}
instance.data.update({
"representations": [lut_repre],
"source": file_url,
"ocioLookWorkingSpace": converted_color_data["working_colorspace"],
"ocioLookItems": [
{
"name": lut_repre_name,
"ext": ext.lstrip("."),
"input_colorspace": converted_color_data[
"input_colorspace"],
"output_colorspace": converted_color_data[
"output_colorspace"],
"direction": creator_attrs["direction"],
"interpolation": creator_attrs["interpolation"],
"config_data": config_data
}
],
})
self.log.debug(pformat(instance.data))
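# Illustrative sketch (not part of the plugin): the output name transform
# above applied to a made-up LUT base name.
_base_name = "show_grade-v001"
_output_name = (_base_name.replace("_", " ")
                          .replace(".", " ")
                          .replace("-", " ")
                          .title()
                          .replace(" ", ""))
assert _output_name == "ShowGradeV001"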

View file

@ -0,0 +1,47 @@
from pprint import pformat
import pyblish.api
from ayon_core.pipeline import publish
class CollectCSVIngestInstancesData(
pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin,
publish.ColormanagedPyblishPluginMixin
):
"""Collect CSV Ingest data from instance.
"""
label = "Collect CSV Ingest instances data"
order = pyblish.api.CollectorOrder + 0.1
hosts = ["traypublisher"]
families = ["csv_ingest"]
def process(self, instance):
# expecting [(colorspace, repre_data), ...]
prepared_repres_data_items = instance.data[
"prepared_data_for_repres"]
for prep_repre_data in prepared_repres_data_items:
item_type = prep_repre_data["type"]
colorspace = prep_repre_data["colorspace"]
repre_data = prep_repre_data["representation"]
if item_type == "media" and colorspace is not None:
# colorspace name is passed from CSV column
self.set_representation_colorspace(
repre_data, instance.context, colorspace
)
elif item_type == "media":
# TODO: implement colorspace file rules file parsing
self.log.warning(
"Colorspace is not defined in csv for following"
f" representation: {pformat(repre_data)}"
)
elif item_type == "thumbnail":
# thumbnails should be skipped
pass
instance.data["representations"].append(repre_data)

View file

@ -0,0 +1,48 @@
import os
from pprint import pformat
import pyblish.api
import opentimelineio as otio
class CollectEditorialInstance(pyblish.api.InstancePlugin):
"""Collect data for instances created by settings creators."""
label = "Collect Editorial Instances"
order = pyblish.api.CollectorOrder - 0.1
hosts = ["traypublisher"]
families = ["editorial"]
def process(self, instance):
if "families" not in instance.data:
instance.data["families"] = []
if "representations" not in instance.data:
instance.data["representations"] = []
fpath = instance.data["sequenceFilePath"]
otio_timeline_string = instance.data.pop("otioTimeline")
otio_timeline = otio.adapters.read_from_string(
otio_timeline_string)
instance.context.data["otioTimeline"] = otio_timeline
instance.context.data["editorialSourcePath"] = (
instance.data["editorialSourcePath"])
self.log.info(fpath)
instance.data["stagingDir"] = os.path.dirname(fpath)
_, ext = os.path.splitext(fpath)
instance.data["representations"].append({
"ext": ext[1:],
"name": ext[1:],
"stagingDir": instance.data["stagingDir"],
"files": os.path.basename(fpath)
})
self.log.debug("Created Editorial Instance {}".format(
pformat(instance.data)
))

View file

@ -0,0 +1,58 @@
"""Produces instance.data["editorial_pckg"] data used during integration.
Requires:
instance.data["creator_attributes"]["path"] - from creator
Provides:
instance -> editorial_pckg (dict):
folder_path (str)
otio_path (str) - from dragged folder
resource_paths (list)
"""
import os
import pyblish.api
from ayon_core.lib.transcoding import VIDEO_EXTENSIONS
class CollectEditorialPackage(pyblish.api.InstancePlugin):
"""Collects path to OTIO file and resources"""
label = "Collect Editorial Package"
order = pyblish.api.CollectorOrder - 0.1
hosts = ["traypublisher"]
families = ["editorial_pckg"]
def process(self, instance):
folder_path = instance.data["creator_attributes"]["folder_path"]
if not folder_path or not os.path.exists(folder_path):
self.log.info((
"Instance doesn't contain collected existing folder path."
))
return
instance.data["editorial_pckg"] = {}
instance.data["editorial_pckg"]["folder_path"] = folder_path
otio_path, resource_paths = (
self._get_otio_and_resource_paths(folder_path))
instance.data["editorial_pckg"]["otio_path"] = otio_path
instance.data["editorial_pckg"]["resource_paths"] = resource_paths
def _get_otio_and_resource_paths(self, folder_path):
otio_path = None
resource_paths = []
file_names = os.listdir(folder_path)
for filename in file_names:
_, ext = os.path.splitext(filename)
file_path = os.path.join(folder_path, filename)
if ext == ".otio":
otio_path = file_path
elif ext in VIDEO_EXTENSIONS:
resource_paths.append(file_path)
return otio_path, resource_paths
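# Illustrative sketch (not part of the plugin): the extension based split
# above over a made-up listing, with a tiny stand-in for VIDEO_EXTENSIONS.
_video_exts = {".mov", ".mp4"}
_names = ["cut.otio", "sh010.mov", "notes.txt"]
_otio = [n for n in _names if os.path.splitext(n)[1] == ".otio"]
_resources = [n for n in _names if os.path.splitext(n)[1] in _video_exts]
assert _otio == ["cut.otio"] and _resources == ["sh010.mov"]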

View file

@ -0,0 +1,30 @@
import pyblish.api
class CollectEditorialReviewable(pyblish.api.InstancePlugin):
""" Collect review input from user.
Adds the input to instance data.
"""
label = "Collect Editorial Reviewable"
order = pyblish.api.CollectorOrder
families = ["plate", "review", "audio"]
hosts = ["traypublisher"]
def process(self, instance):
creator_identifier = instance.data["creator_identifier"]
if creator_identifier not in [
"editorial_plate",
"editorial_audio",
"editorial_review"
]:
return
creator_attributes = instance.data["creator_attributes"]
if creator_attributes["add_review_family"]:
instance.data["families"].append("review")
self.log.debug("instance.data {}".format(instance.data))

View file

@ -0,0 +1,100 @@
import pyblish.api
from ayon_core.lib import EnumDef
from ayon_core.pipeline import colorspace
from ayon_core.pipeline import publish
from ayon_core.pipeline.publish import KnownPublishError
class CollectColorspace(pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin,
publish.ColormanagedPyblishPluginMixin):
"""Collect explicit user defined representation colorspaces"""
label = "Choose representation colorspace"
order = pyblish.api.CollectorOrder + 0.49
hosts = ["traypublisher"]
families = ["render", "plate", "reference", "image", "online"]
enabled = False
default_colorspace_items = [
(None, "Don't override")
]
colorspace_items = list(default_colorspace_items)
colorspace_attr_show = False
config_items = None
def process(self, instance):
values = self.get_attr_values_from_data(instance.data)
colorspace_value = values.get("colorspace", None)
if colorspace_value is None:
return
color_data = colorspace.convert_colorspace_enumerator_item(
colorspace_value, self.config_items)
colorspace_name = self._colorspace_name_by_type(color_data)
self.log.debug("Explicit colorspace name: {}".format(colorspace_name))
context = instance.context
for repre in instance.data.get("representations", {}):
self.set_representation_colorspace(
representation=repre,
context=context,
colorspace=colorspace_name
)
def _colorspace_name_by_type(self, colorspace_data):
"""
Returns colorspace name by type
Arguments:
colorspace_data (dict): colorspace data
Returns:
str: colorspace name
"""
if colorspace_data["type"] == "colorspaces":
return colorspace_data["name"]
elif colorspace_data["type"] == "roles":
return colorspace_data["colorspace"]
else:
raise KnownPublishError(
(
"Collecting of colorspace failed. used config is missing "
"colorspace type: '{}' . Please contact your pipeline TD."
).format(colorspace_data['type'])
)
@classmethod
def apply_settings(cls, project_settings):
config_data = colorspace.get_current_context_imageio_config_preset(
project_settings=project_settings
)
enabled = False
colorspace_items = list(cls.default_colorspace_items)
config_items = None
if config_data:
enabled = True
filepath = config_data["path"]
config_items = colorspace.get_ocio_config_colorspaces(filepath)
labeled_colorspaces = colorspace.get_colorspaces_enumerator_items(
config_items,
include_aliases=True,
include_roles=True
)
colorspace_items.extend(labeled_colorspaces)
cls.config_items = config_items
cls.colorspace_items = colorspace_items
cls.enabled = enabled
@classmethod
def get_attribute_defs(cls):
return [
EnumDef(
"colorspace",
cls.colorspace_items,
default="Don't override",
label="Override Colorspace"
)
]

View file

@ -0,0 +1,51 @@
import pyblish.api
class CollectFrameDataFromAssetEntity(pyblish.api.InstancePlugin):
"""Collect Frame Data From 'folderEntity' found in context.
Frame range data will only be collected if the keys
are not yet collected for the instance.
"""
order = pyblish.api.CollectorOrder + 0.491
label = "Collect Missing Frame Data From Folder"
families = [
"plate",
"pointcache",
"vdbcache",
"online",
"render",
]
hosts = ["traypublisher"]
def process(self, instance):
missing_keys = []
for key in (
"fps",
"frameStart",
"frameEnd",
"handleStart",
"handleEnd",
):
if key not in instance.data:
missing_keys.append(key)
# Skip the logic if all keys are already collected.
# NOTE: In editorial 'folderEntity' is not filled, so this would crash
# even though we don't need it.
if not missing_keys:
return
keys_set = []
folder_attributes = instance.data["folderEntity"]["attrib"]
for key in missing_keys:
if key in folder_attributes:
instance.data[key] = folder_attributes[key]
keys_set.append(key)
if keys_set:
self.log.debug(
f"Frame range data {keys_set} "
"has been collected from folder entity."
)
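# Illustrative sketch (not part of the plugin): already collected values
# are never overwritten by folder attributes (sample data invented).
_folder_attrib = {"fps": 24.0, "frameStart": 1001, "frameEnd": 1100}
_instance_data = {"fps": 25.0}
for _key in ("fps", "frameStart", "frameEnd"):
    if _key not in _instance_data and _key in _folder_attrib:
        _instance_data[_key] = _folder_attrib[_key]
assert _instance_data == {"fps": 25.0, "frameStart": 1001, "frameEnd": 1100}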

View file

@ -0,0 +1,48 @@
import os
import pyblish.api
from ayon_core.pipeline import AYONPyblishPluginMixin
class CollectMovieBatch(
pyblish.api.InstancePlugin, AYONPyblishPluginMixin
):
"""Collect file url for batch movies and create representation.
Adds review on instance and to repre.tags based on value of toggle button
on creator.
"""
label = "Collect Movie Batch Files"
order = pyblish.api.CollectorOrder
hosts = ["traypublisher"]
def process(self, instance):
if instance.data.get("creator_identifier") != "render_movie_batch":
return
creator_attributes = instance.data["creator_attributes"]
file_url = creator_attributes["filepath"]
file_name = os.path.basename(file_url)
_, ext = os.path.splitext(file_name)
repre = {
"name": ext[1:],
"ext": ext[1:],
"files": file_name,
"stagingDir": os.path.dirname(file_url),
"tags": []
}
instance.data["representations"].append(repre)
if creator_attributes["add_review_family"]:
repre["tags"].append("review")
instance.data["families"].append("review")
if not instance.data.get("thumbnailSource"):
instance.data["thumbnailSource"] = file_url
instance.data["source"] = file_url
self.log.debug("instance.data {}".format(instance.data))

View file

@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
import pyblish.api
from pathlib import Path
class CollectOnlineFile(pyblish.api.InstancePlugin):
"""Collect online file and retain its file name."""
label = "Collect Online File"
order = pyblish.api.CollectorOrder
families = ["online"]
hosts = ["traypublisher"]
def process(self, instance):
file = Path(instance.data["creator_attributes"]["path"])
review = instance.data["creator_attributes"]["add_review_family"]
instance.data["review"] = review
if "review" not in instance.data["families"]:
instance.data["families"].append("review")
self.log.info(f"Adding review: {review}")
instance.data["representations"].append(
{
"name": file.suffix.lstrip("."),
"ext": file.suffix.lstrip("."),
"files": file.name,
"stagingDir": file.parent.as_posix(),
"tags": ["review"] if review else []
}
)

View file

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
import pyblish.api
class CollectReviewInfo(pyblish.api.InstancePlugin):
"""Collect data required for review instances.
ExtractReview plugin requires frame start/end and fps on instance data,
which are missing on instances from TrayPublisher.
Warning:
This is temporary solution to "make it work". Contains removed changes
from https://github.com/ynput/OpenPype/pull/4383 reduced only for
review instances.
"""
label = "Collect Review Info"
order = pyblish.api.CollectorOrder + 0.491
families = ["review"]
hosts = ["traypublisher"]
def process(self, instance):
folder_entity = instance.data.get("folderEntity")
if instance.data.get("frameStart") is not None or not folder_entity:
self.log.debug("Missing required data on instance")
return
folder_attributes = folder_entity["attrib"]
# Store collected data for logging
collected_data = {}
for key in (
"fps",
"frameStart",
"frameEnd",
"handleStart",
"handleEnd",
):
if key in instance.data or key not in folder_attributes:
continue
value = folder_attributes[key]
collected_data[key] = value
instance.data[key] = value
self.log.debug("Collected data: {}".format(str(collected_data)))

View file

@ -0,0 +1,78 @@
import pyblish.api
import clique
from ayon_core.pipeline import OptionalPyblishPluginMixin
class CollectSequenceFrameData(
pyblish.api.InstancePlugin,
OptionalPyblishPluginMixin
):
"""Collect Original Sequence Frame Data
If the representation includes files with frame numbers,
then set `frameStart` and `frameEnd` for the instance to the
start and end frame respectively
"""
order = pyblish.api.CollectorOrder + 0.4905
label = "Collect Original Sequence Frame Data"
families = ["plate", "pointcache",
"vdbcache", "online",
"render"]
hosts = ["traypublisher"]
optional = True
def process(self, instance):
if not self.is_active(instance.data):
return
# editorial would fail since they might not be in database yet
new_folder_publishing = instance.data.get("newAssetPublishing")
if new_folder_publishing:
self.log.debug("Instance is creating new folders. Skipping.")
return
frame_data = self.get_frame_data_from_repre_sequence(instance)
if not frame_data:
# if no dict data skip collecting the frame range data
return
for key, value in frame_data.items():
instance.data[key] = value
self.log.debug(f"Collected Frame range data '{key}':{value} ")
def get_frame_data_from_repre_sequence(self, instance):
repres = instance.data.get("representations")
folder_attributes = instance.data["folderEntity"]["attrib"]
if repres:
first_repre = repres[0]
if "ext" not in first_repre:
self.log.warning("Cannot find file extension"
" in representation data")
return
files = first_repre["files"]
if not isinstance(files, list):
files = [files]
collections, _ = clique.assemble(files)
if not collections:
# No sequences detected and we can't retrieve
# frame range
self.log.debug(
"No sequences detected in the representation data."
" Skipping collecting frame range data.")
return
collection = collections[0]
repres_frames = list(collection.indexes)
return {
"frameStart": repres_frames[0],
"frameEnd": repres_frames[-1],
"handleStart": 0,
"handleEnd": 0,
"fps": folder_attributes["fps"]
}
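# Illustrative sketch (not part of the plugin): how 'clique.assemble'
# derives a frame range from file names (names invented).
_colls, _rem = clique.assemble(
    ["render.1001.exr", "render.1002.exr", "render.1003.exr"]
)
_frames = list(_colls[0].indexes)
assert (_frames[0], _frames[-1]) == (1001, 1003)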

View file

@ -0,0 +1,209 @@
from pprint import pformat
import pyblish.api
import opentimelineio as otio
class CollectShotInstance(pyblish.api.InstancePlugin):
""" Collect shot instances
Resolving its user inputs from creator attributes
to instance data.
"""
label = "Collect Shot Instances"
order = pyblish.api.CollectorOrder - 0.09
hosts = ["traypublisher"]
families = ["shot"]
SHARED_KEYS = [
"folderPath",
"fps",
"handleStart",
"handleEnd",
"frameStart",
"frameEnd",
"clipIn",
"clipOut",
"clipDuration",
"sourceIn",
"sourceOut",
"otioClip",
"workfileFrameStart"
]
def process(self, instance):
creator_identifier = instance.data["creator_identifier"]
if "editorial" not in creator_identifier:
return
# get otio clip object
otio_clip = self._get_otio_clip(instance)
instance.data["otioClip"] = otio_clip
# first solve the inputs from creator attr
data = self._solve_inputs_to_data(instance)
instance.data.update(data)
# distribute all shared keys to clips instances
self._distribute_shared_data(instance)
self._solve_hierarchy_context(instance)
self.log.debug(pformat(instance.data))
def _get_otio_clip(self, instance):
""" Converts otio string data.
Convert them to proper otio object
and finds its equivalent at otio timeline.
This process is a hack to support also
resolving parent range.
Args:
instance (obj): publishing instance
Returns:
otio.Clip: otio clip object
"""
context = instance.context
# convert otio clip from string to object
otio_clip_string = instance.data.pop("otioClip")
otio_clip = otio.adapters.read_from_string(
otio_clip_string)
otio_timeline = context.data["otioTimeline"]
clips = [
clip for clip in otio_timeline.each_child(
descended_from_type=otio.schema.Clip)
if clip.name == otio_clip.name
if clip.parent().kind == "Video"
]
otio_clip = clips.pop()
return otio_clip
def _distribute_shared_data(self, instance):
""" Distribute all defined keys.
All data are shared between all related
instances in context.
Args:
instance (obj): publishing instance
"""
context = instance.context
instance_id = instance.data["instance_id"]
if not context.data.get("editorialSharedData"):
context.data["editorialSharedData"] = {}
context.data["editorialSharedData"][instance_id] = {
_k: _v for _k, _v in instance.data.items()
if _k in self.SHARED_KEYS
}
def _solve_inputs_to_data(self, instance):
""" Resolve all user inputs into instance data.
Args:
instance (obj): publishing instance
Returns:
dict: instance data updating data
"""
_cr_attrs = instance.data["creator_attributes"]
workfile_start_frame = _cr_attrs["workfile_start_frame"]
frame_start = _cr_attrs["frameStart"]
frame_end = _cr_attrs["frameEnd"]
frame_dur = frame_end - frame_start
return {
"fps": float(_cr_attrs["fps"]),
"handleStart": _cr_attrs["handle_start"],
"handleEnd": _cr_attrs["handle_end"],
"frameStart": workfile_start_frame,
"frameEnd": workfile_start_frame + frame_dur,
"clipIn": _cr_attrs["clipIn"],
"clipOut": _cr_attrs["clipOut"],
"clipDuration": _cr_attrs["clipDuration"],
"sourceIn": _cr_attrs["sourceIn"],
"sourceOut": _cr_attrs["sourceOut"],
"workfileFrameStart": workfile_start_frame,
"folderPath": _cr_attrs["folderPath"],
}
def _solve_hierarchy_context(self, instance):
""" Adding hierarchy data to context shared data.
Args:
instance (obj): publishing instance
"""
context = instance.context
final_context = (
context.data["hierarchyContext"]
if context.data.get("hierarchyContext")
else {}
)
# get handles
handle_start = int(instance.data["handleStart"])
handle_end = int(instance.data["handleEnd"])
in_info = {
"entity_type": "folder",
"folder_type": "Shot",
"attributes": {
"handleStart": handle_start,
"handleEnd": handle_end,
"frameStart": instance.data["frameStart"],
"frameEnd": instance.data["frameEnd"],
"clipIn": instance.data["clipIn"],
"clipOut": instance.data["clipOut"],
"fps": instance.data["fps"]
},
"tasks": instance.data["tasks"]
}
parents = instance.data.get('parents', [])
folder_name = instance.data["folderPath"].split("/")[-1]
actual = {folder_name: in_info}
for parent in reversed(parents):
parent_name = parent["entity_name"]
parent_info = {
"entity_type": parent["entity_type"],
"children": actual,
}
if parent_info["entity_type"] == "folder":
parent_info["folder_type"] = parent["folder_type"]
actual = {parent_name: parent_info}
final_context = self._update_dict(final_context, actual)
# adding hierarchy context to instance
context.data["hierarchyContext"] = final_context
def _update_dict(self, ex_dict, new_dict):
""" Recursion function
Updating nested data with another nested data.
Args:
ex_dict (dict): nested data
new_dict (dict): nested data
Returns:
dict: updated nested data
"""
for key in ex_dict:
if key in new_dict and isinstance(ex_dict[key], dict):
new_dict[key] = self._update_dict(ex_dict[key], new_dict[key])
elif not ex_dict.get(key) or not new_dict.get(key):
new_dict[key] = ex_dict[key]
return new_dict
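# Illustrative sketch (not part of the plugin): the same merge logic as
# '_update_dict', exercised standalone (sample hierarchy invented).
def _merge(ex_dict, new_dict):
    for key in ex_dict:
        if key in new_dict and isinstance(ex_dict[key], dict):
            new_dict[key] = _merge(ex_dict[key], new_dict[key])
        elif not ex_dict.get(key) or not new_dict.get(key):
            new_dict[key] = ex_dict[key]
    return new_dict
_merged = _merge(
    {"seq01": {"children": {"sh010": {}}}},
    {"seq01": {"children": {"sh020": {}}}},
)
assert set(_merged["seq01"]["children"]) == {"sh010", "sh020"}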

View file

@ -0,0 +1,272 @@
import os
import tempfile
from pathlib import Path
import clique
import pyblish.api
class CollectSettingsSimpleInstances(pyblish.api.InstancePlugin):
"""Collect data for instances created by settings creators.
Plugin creates representations for simple instances based
on the 'representation_files' attribute stored on instance data.
There is also the possibility of a reviewable representation stored
under the 'reviewable' attribute on instance data. If a representation
with the same files as 'reviewable' already exists, it is reused for
review instead of creating a new one.
Representations can be marked for review, in which case the 'review'
family is also added to instance families. Only one representation can
be marked for review, so the **first** representation with an extension
available in '_review_extensions' is used for review.
Instance 'source' is the path from the last representation created
from 'representation_files'.
Staging directory is set on the instance. That is probably never used
because each created representation has its own staging dir.
"""
label = "Collect Settings Simple Instances"
order = pyblish.api.CollectorOrder - 0.49
hosts = ["traypublisher"]
def process(self, instance):
if not instance.data.get("settings_creator"):
return
instance_label = instance.data["name"]
# Create instance's staging dir in temp
tmp_folder = tempfile.mkdtemp(prefix="traypublisher_")
instance.data["stagingDir"] = tmp_folder
instance.context.data["cleanupFullPaths"].append(tmp_folder)
self.log.debug((
"Created temp staging directory for instance {}. {}"
).format(instance_label, tmp_folder))
self._fill_version(instance, instance_label)
# Store filepaths for validation of their existence
source_filepaths = []
# Make sure there are no representations with same name
repre_names_counter = {}
# Store created names for logging
repre_names = []
# Store set of filepaths per each representation
representation_files_mapping = []
source = self._create_main_representations(
instance,
source_filepaths,
repre_names_counter,
repre_names,
representation_files_mapping
)
self._create_review_representation(
instance,
source_filepaths,
repre_names_counter,
repre_names,
representation_files_mapping
)
source_filepaths = list(set(source_filepaths))
instance.data["source"] = source
instance.data["sourceFilepaths"] = source_filepaths
# NOTE: Missing filepaths should not cause crashes (at least not here)
# - if filepaths are required they should crash on validation
if source_filepaths:
# NOTE: Original basename is not handling sequences
# - we should maybe not fill the key when sequence is used?
origin_basename = Path(source_filepaths[0]).stem
instance.data["originalBasename"] = origin_basename
self.log.debug(
(
"Created Simple Settings instance \"{}\""
" with {} representations: {}"
).format(
instance_label,
len(instance.data["representations"]),
", ".join(repre_names)
)
)
def _fill_version(self, instance, instance_label):
"""Fill instance version under which will be instance integrated.
Instance must have set 'use_next_version' to 'False'
and 'version_to_use' to version to use.
Args:
instance (pyblish.api.Instance): Instance to fill version for.
instance_label (str): Label of instance to fill version for.
"""
creator_attributes = instance.data["creator_attributes"]
use_next_version = creator_attributes.get("use_next_version", True)
# If 'version_to_use' is '0' it means that next version should be used
version_to_use = creator_attributes.get("version_to_use", 0)
if use_next_version or not version_to_use:
return
instance.data["version"] = version_to_use
self.log.debug(
"Version for instance \"{}\" was set to \"{}\"".format(
instance_label, version_to_use))
def _create_main_representations(
self,
instance,
source_filepaths,
repre_names_counter,
repre_names,
representation_files_mapping
):
creator_attributes = instance.data["creator_attributes"]
filepath_items = creator_attributes["representation_files"]
if not isinstance(filepath_items, list):
filepath_items = [filepath_items]
source = None
for filepath_item in filepath_items:
# Skip if filepath item does not have filenames
if not filepath_item["filenames"]:
continue
filepaths = {
os.path.join(filepath_item["directory"], filename)
for filename in filepath_item["filenames"]
}
source_filepaths.extend(filepaths)
source = self._calculate_source(filepaths)
representation = self._create_representation_data(
filepath_item, repre_names_counter, repre_names
)
instance.data["representations"].append(representation)
representation_files_mapping.append(
(filepaths, representation, source)
)
return source
def _create_review_representation(
self,
instance,
source_filepaths,
repre_names_counter,
repre_names,
representation_files_mapping
):
# Skip review representation creation if there are no representations
# created for "main" part
# - review representation must not be created in that case so
# validation can care about it
if not representation_files_mapping:
self.log.warning((
"There are missing source representations."
" Creation of review representation was skipped."
))
return
creator_attributes = instance.data["creator_attributes"]
review_file_item = creator_attributes["reviewable"]
filenames = review_file_item.get("filenames")
if not filenames:
self.log.debug((
"Filepath for review is not defined."
" Skipping review representation creation."
))
return
item_dir = review_file_item["directory"]
first_filepath = os.path.join(item_dir, filenames[0])
filepaths = {
os.path.join(item_dir, filename)
for filename in filenames
}
source_filepaths.extend(filepaths)
# First try to find out representation with same filepaths
# so it's not needed to create new representation just for review
review_representation = None
# Review path (only for logging)
review_path = None
for item in representation_files_mapping:
_filepaths, representation, repre_path = item
if _filepaths == filepaths:
review_representation = representation
review_path = repre_path
break
if review_representation is None:
self.log.debug("Creating new review representation")
review_path = self._calculate_source(filepaths)
review_representation = self._create_representation_data(
review_file_item, repre_names_counter, repre_names
)
instance.data["representations"].append(review_representation)
if "review" not in instance.data["families"]:
instance.data["families"].append("review")
if not instance.data.get("thumbnailSource"):
instance.data["thumbnailSource"] = first_filepath
review_representation["tags"].append("review")
# Adding "review" to representation name since it can clash with main
# representation if they share the same extension.
review_representation["outputName"] = "review"
self.log.debug("Representation {} was marked for review. {}".format(
review_representation["name"], review_path
))
def _create_representation_data(
self, filepath_item, repre_names_counter, repre_names
):
"""Create new representation data based on file item.
Args:
filepath_item (Dict[str, Any]): Item with information about
representation paths.
repre_names_counter (Dict[str, int]): Store count of representation
names.
repre_names (List[str]): All used representation names. For
logging purposes.
Returns:
Dict: Prepared base representation data.
"""
filenames = filepath_item["filenames"]
_, ext = os.path.splitext(filenames[0])
if len(filenames) == 1:
filenames = filenames[0]
repre_name = repre_ext = ext[1:]
if repre_name not in repre_names_counter:
repre_names_counter[repre_name] = 2
else:
counter = repre_names_counter[repre_name]
repre_names_counter[repre_name] += 1
repre_name = "{}_{}".format(repre_name, counter)
repre_names.append(repre_name)
return {
"ext": repre_ext,
"name": repre_name,
"stagingDir": filepath_item["directory"],
"files": filenames,
"tags": []
}
def _calculate_source(self, filepaths):
source = None
cols, rems = clique.assemble(filepaths)
if cols:
source = cols[0].format("{head}{padding}{tail}")
elif rems:
source = rems[0]
return source
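# Illustrative sketch (not part of the plugin): duplicate representation
# names receive a numeric suffix via the counter (extensions invented).
_counter = {}
_result = []
for _ext in ("mov", "mov", "wav"):
    _name = _ext
    if _name not in _counter:
        _counter[_name] = 2
    else:
        _count = _counter[_name]
        _counter[_name] += 1
        _name = "{}_{}".format(_name, _count)
    _result.append(_name)
assert _result == ["mov", "mov_2", "wav"]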

View file

@ -0,0 +1,24 @@
import pyblish.api
class CollectSource(pyblish.api.ContextPlugin):
"""Collecting instances from traypublisher host."""
label = "Collect source"
order = pyblish.api.CollectorOrder - 0.49
hosts = ["traypublisher"]
def process(self, context):
# get json paths from os and load them
source_name = "traypublisher"
for instance in context:
source = instance.data.get("source")
if not source:
instance.data["source"] = source_name
self.log.info((
"Source of instance \"{}\" is changed to \"{}\""
).format(instance.data["name"], source_name))
else:
self.log.info((
"Source of instance \"{}\" was already set to \"{}\""
).format(instance.data["name"], source))

View file

@ -0,0 +1,45 @@
import os
import json
import pyblish.api
from ayon_core.pipeline import publish
class ExtractColorspaceLook(publish.Extractor,
publish.AYONPyblishPluginMixin):
"""Extract OCIO colorspace look from LUT file
"""
label = "Extract Colorspace Look"
order = pyblish.api.ExtractorOrder
hosts = ["traypublisher"]
families = ["ociolook"]
def process(self, instance):
ociolook_items = instance.data["ocioLookItems"]
ociolook_working_color = instance.data["ocioLookWorkingSpace"]
staging_dir = self.staging_dir(instance)
# create ociolook file attributes
ociolook_file_name = "ocioLookFile.json"
ociolook_file_content = {
"version": 1,
"data": {
"ocioLookItems": ociolook_items,
"ocioLookWorkingSpace": ociolook_working_color
}
}
# write ociolook content into json file saved in staging dir
file_url = os.path.join(staging_dir, ociolook_file_name)
with open(file_url, "w") as f_:
json.dump(ociolook_file_content, f_, indent=4)
# create lut representation data
ociolook_repre = {
"name": "ocioLookFile",
"ext": "json",
"files": ociolook_file_name,
"stagingDir": staging_dir,
"tags": []
}
instance.data["representations"].append(ociolook_repre)

View file

@ -0,0 +1,31 @@
import pyblish.api
from ayon_core.pipeline import publish
class ExtractCSVFile(publish.Extractor):
"""
Extractor export CSV file
"""
label = "Extract CSV file"
order = pyblish.api.ExtractorOrder - 0.45
families = ["csv_ingest_file"]
hosts = ["traypublisher"]
def process(self, instance):
csv_file_data = instance.data["csvFileData"]
representation_csv = {
'name': "csv_data",
'ext': "csv",
'files': csv_file_data["filename"],
"stagingDir": csv_file_data["staging_dir"],
"stagingDir_persistent": True
}
instance.data["representations"].append(representation_csv)
self.log.info("Added CSV file representation: {}".format(
representation_csv))

View file

@ -0,0 +1,232 @@
import copy
import os.path
import subprocess
import opentimelineio
import pyblish.api
from ayon_core.lib import get_ffmpeg_tool_args, run_subprocess
from ayon_core.pipeline import publish
class ExtractEditorialPckgConversion(publish.Extractor):
"""Replaces movie paths in otio file with publish rootless
Prepares movie resources for integration (adds them to `transfers`).
Converts .mov files according to output definition.
"""
label = "Extract Editorial Package"
order = pyblish.api.ExtractorOrder - 0.45
hosts = ["traypublisher"]
families = ["editorial_pckg"]
def process(self, instance):
editorial_pckg_data = instance.data.get("editorial_pckg")
otio_path = editorial_pckg_data["otio_path"]
otio_basename = os.path.basename(otio_path)
staging_dir = self.staging_dir(instance)
editorial_pckg_repre = {
'name': "editorial_pckg",
'ext': "otio",
'files': otio_basename,
"stagingDir": staging_dir,
}
otio_staging_path = os.path.join(staging_dir, otio_basename)
instance.data["representations"].append(editorial_pckg_repre)
publish_resource_folder = self._get_publish_resource_folder(instance)
resource_paths = editorial_pckg_data["resource_paths"]
transfers = self._get_transfers(resource_paths,
publish_resource_folder)
project_settings = instance.context.data["project_settings"]
output_def = (project_settings["traypublisher"]
["publish"]
["ExtractEditorialPckgConversion"]
["output"])
conversion_enabled = (instance.data["creator_attributes"]
["conversion_enabled"])
if conversion_enabled and output_def["ext"]:
transfers = self._convert_resources(output_def, transfers)
instance.data["transfers"] = transfers
source_to_rootless = self._get_resource_path_mapping(instance,
transfers)
otio_data = editorial_pckg_data["otio_data"]
otio_data = self._replace_target_urls(otio_data, source_to_rootless)
opentimelineio.adapters.write_to_file(otio_data, otio_staging_path)
self.log.info("Added Editorial Package representation: {}".format(
editorial_pckg_repre))
def _get_publish_resource_folder(self, instance):
"""Calculates publish folder and create it."""
publish_path = self._get_published_path(instance)
publish_folder = os.path.dirname(publish_path)
publish_resource_folder = os.path.join(publish_folder, "resources")
if not os.path.exists(publish_resource_folder):
os.makedirs(publish_resource_folder, exist_ok=True)
return publish_resource_folder
def _get_resource_path_mapping(self, instance, transfers):
"""Returns dict of {source_mov_path: rootless_published_path}."""
replace_paths = {}
anatomy = instance.context.data["anatomy"]
for source, destination in transfers:
rootless_path = self._get_rootless(anatomy, destination)
source_file_name = os.path.basename(source)
replace_paths[source_file_name] = rootless_path
return replace_paths
def _get_transfers(self, resource_paths, publish_resource_folder):
"""Returns list of tuples (source, destination) with movie paths."""
transfers = []
for res_path in resource_paths:
res_basename = os.path.basename(res_path)
pub_res_path = os.path.join(publish_resource_folder, res_basename)
transfers.append((res_path, pub_res_path))
return transfers
def _replace_target_urls(self, otio_data, replace_paths):
"""Replace original movie paths with published rootless ones."""
for track in otio_data.tracks:
for clip in track:
# Check if the clip has a media reference
if clip.media_reference is not None:
# Access the target_url from the media reference
target_url = clip.media_reference.target_url
if not target_url:
continue
file_name = os.path.basename(target_url)
replace_path = replace_paths.get(file_name)
if replace_path:
clip.media_reference.target_url = replace_path
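                        # Keep clip name in sync when it was named after
                        # the original media file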
if clip.name == file_name:
clip.name = os.path.basename(replace_path)
return otio_data
def _get_rootless(self, anatomy, path):
"""Try to find rootless {root[work]} path from `path`"""
success, rootless_path = anatomy.find_root_template_from_path(
path)
if not success:
# `rootless_path` is not set to `output_dir` if none of roots match
self.log.warning(
f"Could not find root path for remapping '{path}'."
)
rootless_path = path
return rootless_path
def _get_published_path(self, instance):
"""Calculates expected `publish` folder"""
# determine published path from Anatomy.
template_data = instance.data.get("anatomyData")
rep = instance.data["representations"][0]
template_data["representation"] = rep.get("name")
template_data["ext"] = rep.get("ext")
template_data["comment"] = None
anatomy = instance.context.data["anatomy"]
template_data["root"] = anatomy.roots
template = anatomy.get_template_item("publish", "default", "path")
template_filled = template.format_strict(template_data)
return os.path.normpath(template_filled)
def _convert_resources(self, output_def, transfers):
"""Converts all resource files to configured format."""
out_extension = output_def["ext"]
if not out_extension:
self.log.warning("No output extension configured in "
"ayon+settings://traypublisher/publish/ExtractEditorialPckgConversion") # noqa
return transfers
final_transfers = []
out_def_ffmpeg_args = output_def["ffmpeg_args"]
ffmpeg_input_args = [
value.strip()
for value in out_def_ffmpeg_args["input"]
if value.strip()
]
ffmpeg_video_filters = [
value.strip()
for value in out_def_ffmpeg_args["video_filters"]
if value.strip()
]
ffmpeg_audio_filters = [
value.strip()
for value in out_def_ffmpeg_args["audio_filters"]
if value.strip()
]
ffmpeg_output_args = [
value.strip()
for value in out_def_ffmpeg_args["output"]
if value.strip()
]
ffmpeg_input_args = self._split_ffmpeg_args(ffmpeg_input_args)
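        # Executable path may contain spaces; join tool arguments into
        # a single quoted command line fragment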
generic_args = [
subprocess.list2cmdline(get_ffmpeg_tool_args("ffmpeg"))
]
generic_args.extend(ffmpeg_input_args)
if ffmpeg_video_filters:
generic_args.append("-filter:v")
generic_args.append(
"\"{}\"".format(",".join(ffmpeg_video_filters)))
if ffmpeg_audio_filters:
generic_args.append("-filter:a")
generic_args.append(
"\"{}\"".format(",".join(ffmpeg_audio_filters)))
for source, destination in transfers:
base_name = os.path.basename(destination)
file_name, ext = os.path.splitext(base_name)
dest_path = os.path.join(os.path.dirname(destination),
f"{file_name}.{out_extension}")
final_transfers.append((source, dest_path))
all_args = copy.deepcopy(generic_args)
all_args.append(f"-i \"{source}\"")
all_args.extend(ffmpeg_output_args) # order matters
all_args.append(f"\"{dest_path}\"")
subprcs_cmd = " ".join(all_args)
# run subprocess
self.log.debug("Executing: {}".format(subprcs_cmd))
run_subprocess(subprcs_cmd, shell=True, logger=self.log)
return final_transfers
def _split_ffmpeg_args(self, in_args):
"""Makes sure all entered arguments are separated in individual items.
Split each argument string with " -" to identify if string contains
one or more arguments.
"""
splitted_args = []
for arg in in_args:
sub_args = arg.split(" -")
if len(sub_args) == 1:
if arg and arg not in splitted_args:
splitted_args.append(arg)
continue
            for idx, sub_arg in enumerate(sub_args):
                if idx != 0:
                    sub_arg = "-" + sub_arg
                if sub_arg and sub_arg not in splitted_args:
                    splitted_args.append(sub_arg)
return splitted_args

View file

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<root>
<error id="main">
<title>Version already exists</title>
<description>
## Version already exists
Version {version} you have set on instance '{product_name}' under '{folder_path}' already exists. This validation is enabled by default to prevent accidental override of existing versions.
### How to repair?
- Click on 'Repair' action -> this will change version to next available.
- Disable validation on the instance if you are sure you want to override the version.
- Reset publishing and manually change the version number.
</description>
</error>
</root>

View file

@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<root>
<error id="main">
<title>Invalid frame range</title>
<description>
## Invalid frame range
Expected duration of '{duration}' frames is set in the database, but the workfile contains only '{found}' frames.
### How to repair?
Modify configuration in the database or tweak frame range in the workfile.
</description>
</error>
</root>

View file

@ -0,0 +1,66 @@
import pyblish.api
from ayon_core.pipeline import (
publish,
PublishValidationError
)
from ayon_core.pipeline.colorspace import (
get_ocio_config_colorspaces
)
class ValidateColorspace(pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin,
publish.ColormanagedPyblishPluginMixin):
"""Validate representation colorspaces"""
label = "Validate representation colorspace"
order = pyblish.api.ValidatorOrder
hosts = ["traypublisher"]
families = ["render", "plate", "reference", "image", "online"]
def process(self, instance):
config_colorspaces = {} # cache of colorspaces per config path
for repre in instance.data.get("representations", {}):
colorspace_data = repre.get("colorspaceData", {})
if not colorspace_data:
# Nothing to validate
continue
config_path = colorspace_data["config"]["path"]
if config_path not in config_colorspaces:
colorspaces = get_ocio_config_colorspaces(config_path)
if not colorspaces.get("colorspaces"):
                    message = (
                        f"OCIO config '{config_path}' does not contain any "
                        "colorspaces. This is an error in the OCIO config. "
                        "Contact your pipeline TD."
                    )
raise PublishValidationError(
title="Colorspace validation",
message=message,
description=message
)
config_colorspaces[config_path] = set(
colorspaces["colorspaces"])
colorspace = colorspace_data["colorspace"]
self.log.debug(
f"Validating representation '{repre['name']}' "
f"colorspace '{colorspace}'"
)
if colorspace not in config_colorspaces[config_path]:
message = (
f"Representation '{repre['name']}' colorspace "
f"'{colorspace}' does not exist in OCIO config: "
f"{config_path}"
)
raise PublishValidationError(
title="Representation colorspace",
message=message,
description=message
)

View file

@ -0,0 +1,89 @@
import pyblish.api
from ayon_core.pipeline import (
publish,
PublishValidationError
)
class ValidateColorspaceLook(pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin):
"""Validate colorspace look attributes"""
label = "Validate colorspace look attributes"
order = pyblish.api.ValidatorOrder
hosts = ["traypublisher"]
families = ["ociolook"]
def process(self, instance):
create_context = instance.context.data["create_context"]
created_instance = create_context.get_instance_by_id(
instance.data["instance_id"])
creator_defs = created_instance.creator_attribute_defs
ociolook_working_color = instance.data.get("ocioLookWorkingSpace")
ociolook_items = instance.data.get("ocioLookItems", [])
creator_defs_by_key = {_def.key: _def.label for _def in creator_defs}
not_set_keys = {}
if not ociolook_working_color:
not_set_keys["working_colorspace"] = creator_defs_by_key[
"working_colorspace"]
for ociolook_item in ociolook_items:
item_not_set_keys = self.validate_colorspace_set_attrs(
ociolook_item, creator_defs_by_key)
if item_not_set_keys:
not_set_keys[ociolook_item["name"]] = item_not_set_keys
if not_set_keys:
message = (
"Colorspace look attributes are not set: \n"
)
for key, value in not_set_keys.items():
if isinstance(value, list):
values_string = "\n\t- ".join(value)
message += f"\n\t{key}:\n\t- {values_string}"
else:
message += f"\n\t{value}"
raise PublishValidationError(
title="Colorspace Look attributes",
message=message,
description=message
)
def validate_colorspace_set_attrs(
self,
ociolook_item,
creator_defs_by_key
):
"""Validate colorspace look attributes"""
self.log.debug(f"Validate colorspace look attributes: {ociolook_item}")
check_keys = [
"input_colorspace",
"output_colorspace",
"direction",
"interpolation"
]
not_set_keys = []
for key in check_keys:
if ociolook_item[key]:
# key is set and it is correct
continue
def_label = creator_defs_by_key.get(key)
if not def_label:
# raise since key is not recognized by creator defs
raise KeyError(
f"Colorspace look attribute '{key}' is not "
f"recognized by creator attributes: {creator_defs_by_key}"
)
not_set_keys.append(def_label)
return not_set_keys

View file

@ -0,0 +1,68 @@
import os
import opentimelineio
import pyblish.api
from ayon_core.pipeline import PublishValidationError
class ValidateEditorialPackage(pyblish.api.InstancePlugin):
"""Checks that published folder contains all resources from otio
Currently checks only by file names and expects flat structure.
It ignores path to resources in otio file as folder might be dragged in and
published from different location than it was created.
"""
label = "Validate Editorial Package"
order = pyblish.api.ValidatorOrder - 0.49
hosts = ["traypublisher"]
families = ["editorial_pckg"]
def process(self, instance):
editorial_pckg_data = instance.data.get("editorial_pckg")
if not editorial_pckg_data:
raise PublishValidationError("Editorial package not collected")
folder_path = editorial_pckg_data["folder_path"]
otio_path = editorial_pckg_data["otio_path"]
if not otio_path:
raise PublishValidationError(
f"Folder {folder_path} missing otio file")
resource_paths = editorial_pckg_data["resource_paths"]
resource_file_names = {os.path.basename(path)
for path in resource_paths}
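        # Compare by file names only; the package may be published from
        # a different location than where the otio file was created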
otio_data = opentimelineio.adapters.read_from_file(otio_path)
target_urls = self._get_all_target_urls(otio_data)
missing_files = set()
for target_url in target_urls:
target_basename = os.path.basename(target_url)
if target_basename not in resource_file_names:
missing_files.add(target_basename)
if missing_files:
raise PublishValidationError(
f"Otio file contains missing files `{missing_files}`.\n\n"
f"Please add them to `{folder_path}` and republish.")
instance.data["editorial_pckg"]["otio_data"] = otio_data
def _get_all_target_urls(self, otio_data):
target_urls = []
# Iterate through tracks, clips, or other elements
for track in otio_data.tracks:
for clip in track:
# Check if the clip has a media reference
if clip.media_reference is not None:
# Access the target_url from the media reference
target_url = clip.media_reference.target_url
if target_url:
target_urls.append(target_url)
return target_urls

View file

@ -0,0 +1,58 @@
import pyblish.api
from ayon_core.pipeline.publish import (
ValidateContentsOrder,
PublishXmlValidationError,
OptionalPyblishPluginMixin,
RepairAction,
)
class ValidateExistingVersion(
OptionalPyblishPluginMixin,
pyblish.api.InstancePlugin
):
label = "Validate Existing Version"
order = ValidateContentsOrder
hosts = ["traypublisher"]
targets = ["local"]
actions = [RepairAction]
settings_category = "traypublisher"
optional = True
def process(self, instance):
if not self.is_active(instance.data):
return
version = instance.data.get("version")
if version is None:
return
last_version = instance.data.get("latestVersion")
if last_version is None or last_version < version:
return
product_name = instance.data["productName"]
msg = "Version {} already exists for product {}.".format(
version, product_name)
formatting_data = {
"product_name": product_name,
"folder_path": instance.data["folderPath"],
"version": version
}
raise PublishXmlValidationError(
self, msg, formatting_data=formatting_data)
@classmethod
def repair(cls, instance):
create_context = instance.context.data["create_context"]
created_instance = create_context.get_instance_by_id(
instance.data["instance_id"])
creator_attributes = created_instance["creator_attributes"]
# Disable version override
creator_attributes["use_next_version"] = True
create_context.save_changes()

View file

@ -0,0 +1,68 @@
import os
import pyblish.api
from ayon_core.pipeline import PublishValidationError
class ValidateFilePath(pyblish.api.InstancePlugin):
"""Validate existence of source filepaths on instance.
Plugins looks into key 'sourceFilepaths' and validate if paths there
actually exist on disk.
Also validate if the key is filled but is empty. In that case also
crashes so do not fill the key if unfilled value should not cause error.
This is primarily created for Simple Creator instances.
"""
label = "Validate Filepaths"
order = pyblish.api.ValidatorOrder - 0.49
hosts = ["traypublisher"]
def process(self, instance):
if "sourceFilepaths" not in instance.data:
self.log.info((
"Skipped validation of source filepaths existence."
" Instance does not have collected 'sourceFilepaths'"
))
return
product_type = instance.data["productType"]
label = instance.data["name"]
filepaths = instance.data["sourceFilepaths"]
if not filepaths:
raise PublishValidationError(
(
"Source filepaths of '{}' instance \"{}\" are not filled"
).format(product_type, label),
"File not filled",
(
"## Files were not filled"
"\nThis mean that you didn't enter any files into required"
" file input."
"\n- Please refresh publishing and check instance"
" <b>{}</b>"
).format(label)
)
not_found_files = [
filepath
for filepath in filepaths
if not os.path.exists(filepath)
]
if not_found_files:
joined_paths = "\n".join([
"- {}".format(filepath)
for filepath in not_found_files
])
raise PublishValidationError(
(
"Filepath of '{}' instance \"{}\" does not exist:\n{}"
).format(product_type, label, joined_paths),
"File not found",
(
"## Files were not found\nFiles\n{}"
"\n\nCheck if the path is still available."
).format(joined_paths)
)

View file

@ -0,0 +1,82 @@
import re
import pyblish.api
from ayon_core.pipeline.publish import (
ValidateContentsOrder,
PublishXmlValidationError,
OptionalPyblishPluginMixin,
)
class ValidateFrameRange(OptionalPyblishPluginMixin,
pyblish.api.InstancePlugin):
"""Validating frame range of rendered files against state in DB."""
label = "Validate Frame Range"
hosts = ["traypublisher"]
families = ["render", "plate"]
targets = ["local"]
order = ValidateContentsOrder
optional = True
    # published data might be a movie file (.mov, .mp4); in that case
    # counting files doesn't make sense
check_extensions = ["exr", "dpx", "jpg", "jpeg", "png", "tiff", "tga",
"gif", "svg"]
skip_timelines_check = [] # skip for specific task names (regex)
def process(self, instance):
# Skip the instance if is not active by data on the instance
if not self.is_active(instance.data):
return
# editorial would fail since they might not be in database yet
new_folder_publishing = instance.data.get("newAssetPublishing")
if new_folder_publishing:
self.log.debug("Instance is creating new folder. Skipping.")
return
        if (self.skip_timelines_check and
                any(re.search(pattern, instance.data["task"])
                    for pattern in self.skip_timelines_check)):
            self.log.info("Skipping for {} task".format(instance.data["task"]))
            return
folder_attributes = instance.data["folderEntity"]["attrib"]
frame_start = folder_attributes["frameStart"]
frame_end = folder_attributes["frameEnd"]
handle_start = folder_attributes["handleStart"]
handle_end = folder_attributes["handleEnd"]
duration = (frame_end - frame_start + 1) + handle_start + handle_end
repres = instance.data.get("representations")
if not repres:
self.log.info("No representations, skipping.")
return
first_repre = repres[0]
ext = first_repre['ext'].replace(".", '')
if not ext or ext.lower() not in self.check_extensions:
self.log.warning("Cannot check for extension {}".format(ext))
return
files = first_repre["files"]
if isinstance(files, str):
files = [files]
frames = len(files)
        msg = (
            "Frame duration from DB '{}' doesn't match number of files '{}'."
            " Please change frame range for the folder or limit the number"
            " of files."
        ).format(int(duration), frames)
formatting_data = {"duration": duration,
"found": frames}
if frames != duration:
raise PublishXmlValidationError(self, msg,
formatting_data=formatting_data)
self.log.debug("Valid ranges expected '{}' - found '{}'".
format(int(duration), frames))

View file

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
import ayon_api
import pyblish.api
from ayon_core.pipeline.publish import (
ValidateContentsOrder,
PublishValidationError,
OptionalPyblishPluginMixin,
)
class ValidateOnlineFile(OptionalPyblishPluginMixin,
pyblish.api.InstancePlugin):
"""Validate that product doesn't exist yet."""
label = "Validate Existing Online Files"
hosts = ["traypublisher"]
families = ["online"]
order = ValidateContentsOrder
optional = True
def process(self, instance):
if not self.is_active(instance.data):
return
project_name = instance.context.data["projectName"]
folder_id = instance.data["folderEntity"]["id"]
product_entity = ayon_api.get_product_by_name(
project_name, instance.data["productName"], folder_id)
if product_entity:
raise PublishValidationError(
"Product to be published already exists.",
title=self.label
)