Merge branch 'develop' into bugfix/AY-5160_Max-The-asset-isnt-added-into-AYONOP-Data-parameter-after-rename

This commit is contained in:
Kayla Man 2024-04-29 14:58:27 +08:00 committed by GitHub
commit 4f76ff04cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 1428 additions and 2 deletions

View file

@ -0,0 +1,101 @@
from typing import List
from maya import cmds
def get_yeti_user_variables(yeti_shape_node: str) -> List[str]:
"""Get user defined yeti user variables for a `pgYetiMaya` shape node.
Arguments:
yeti_shape_node (str): The `pgYetiMaya` shape node.
Returns:
list: Attribute names (for a vector attribute it only lists the top
parent attribute, not the attribute per axis)
"""
attrs = cmds.listAttr(yeti_shape_node,
userDefined=True,
string=("yetiVariableV_*",
"yetiVariableF_*")) or []
valid_attrs = []
for attr in attrs:
attr_type = cmds.attributeQuery(attr, node=yeti_shape_node,
attributeType=True)
if attr.startswith("yetiVariableV_") and attr_type == "double3":
# vector
valid_attrs.append(attr)
elif attr.startswith("yetiVariableF_") and attr_type == "double":
valid_attrs.append(attr)
return valid_attrs
def create_yeti_variable(yeti_shape_node: str,
attr_name: str,
value=None,
force_value: bool = False) -> bool:
"""Get user defined yeti user variables for a `pgYetiMaya` shape node.
Arguments:
yeti_shape_node (str): The `pgYetiMaya` shape node.
attr_name (str): The fully qualified yeti variable name, e.g.
"yetiVariableF_myfloat" or "yetiVariableV_myvector"
value (object): The value to set (must match the type of the attribute)
When value is None it will ignored and not be set.
force_value (bool): Whether to set the value if the attribute already
exists or not.
Returns:
bool: Whether the attribute value was set or not.
"""
exists = cmds.attributeQuery(attr_name, node=yeti_shape_node, exists=True)
if not exists:
if attr_name.startswith("yetiVariableV_"):
_create_vector_yeti_user_variable(yeti_shape_node, attr_name)
if attr_name.startswith("yetiVariableF_"):
_create_float_yeti_user_variable(yeti_shape_node, attr_name)
if value is not None and (not exists or force_value):
plug = "{}.{}".format(yeti_shape_node, attr_name)
if (
isinstance(value, (list, tuple))
and attr_name.startswith("yetiVariableV_")
):
cmds.setAttr(plug, *value, type="double3")
else:
cmds.setAttr(plug, value)
return True
return False
def _create_vector_yeti_user_variable(yeti_shape_node: str, attr_name: str):
if not attr_name.startswith("yetiVariableV_"):
raise ValueError("Must start with yetiVariableV_")
cmds.addAttr(yeti_shape_node,
longName=attr_name,
attributeType="double3",
cachedInternally=True,
keyable=True)
for axis in "XYZ":
cmds.addAttr(yeti_shape_node,
longName="{}{}".format(attr_name, axis),
attributeType="double",
parent=attr_name,
cachedInternally=True,
keyable=True)
def _create_float_yeti_user_variable(yeti_node: str, attr_name: str):
if not attr_name.startswith("yetiVariableF_"):
raise ValueError("Must start with yetiVariableF_")
cmds.addAttr(yeti_node,
longName=attr_name,
attributeType="double",
cachedInternally=True,
softMinValue=0,
softMaxValue=100,
keyable=True)

View file

@ -12,6 +12,7 @@ from ayon_core.pipeline import (
get_representation_path
)
from ayon_core.hosts.maya.api import lib
from ayon_core.hosts.maya.api.yeti import create_yeti_variable
from ayon_core.hosts.maya.api.pipeline import containerise
from ayon_core.hosts.maya.api.plugin import get_load_color_for_product_type
@ -23,8 +24,19 @@ SKIP_UPDATE_ATTRS = {
"viewportDensity",
"viewportWidth",
"viewportLength",
"renderDensity",
"renderWidth",
"renderLength",
"increaseRenderBounds"
}
SKIP_ATTR_MESSAGE = (
"Skipping updating %s.%s to %s because it "
"is considered a local overridable attribute. "
"Either set manually or the load the cache "
"anew."
)
def set_attribute(node, attr, value):
"""Wrapper of set attribute which ignores None values"""
@ -209,9 +221,31 @@ class YetiCacheLoader(load.LoaderPlugin):
for attr, value in node_settings["attrs"].items():
if attr in SKIP_UPDATE_ATTRS:
self.log.info(
SKIP_ATTR_MESSAGE, yeti_node, attr, value
)
continue
set_attribute(attr, value, yeti_node)
# Set up user defined attributes
user_variables = node_settings.get("user_variables", {})
for attr, value in user_variables.items():
was_value_set = create_yeti_variable(
yeti_shape_node=yeti_node,
attr_name=attr,
value=value,
# We do not want to update the
# value if it already exists so
# that any local overrides that
# may have been applied still
# persist
force_value=False
)
if not was_value_set:
self.log.info(
SKIP_ATTR_MESSAGE, yeti_node, attr, value
)
cmds.setAttr("{}.representation".format(container_node),
repre_entity["id"],
typ="string")
@ -332,6 +366,13 @@ class YetiCacheLoader(load.LoaderPlugin):
for attr, value in attributes.items():
set_attribute(attr, value, yeti_node)
# Set up user defined attributes
user_variables = node_settings.get("user_variables", {})
for attr, value in user_variables.items():
create_yeti_variable(yeti_shape_node=yeti_node,
attr_name=attr,
value=value)
# Connect to the time node
cmds.connectAttr("time1.outTime", "%s.currentTime" % yeti_node)

View file

@ -3,6 +3,7 @@ from maya import cmds
import pyblish.api
from ayon_core.hosts.maya.api import lib
from ayon_core.hosts.maya.api.yeti import get_yeti_user_variables
SETTINGS = {
@ -34,7 +35,7 @@ class CollectYetiCache(pyblish.api.InstancePlugin):
- "increaseRenderBounds"
- "imageSearchPath"
Other information is the name of the transform and it's Colorbleed ID
Other information is the name of the transform and its `cbId`
"""
order = pyblish.api.CollectorOrder + 0.45
@ -54,6 +55,16 @@ class CollectYetiCache(pyblish.api.InstancePlugin):
# Get specific node attributes
attr_data = {}
for attr in SETTINGS:
# Ignore non-existing attributes with a warning, e.g. cbId
# if they have not been generated yet
if not cmds.attributeQuery(attr, node=shape, exists=True):
self.log.warning(
"Attribute '{}' not found on Yeti node: {}".format(
attr, shape
)
)
continue
current = cmds.getAttr("%s.%s" % (shape, attr))
# change None to empty string as Maya doesn't support
# NoneType in attributes
@ -61,6 +72,12 @@ class CollectYetiCache(pyblish.api.InstancePlugin):
current = ""
attr_data[attr] = current
# Get user variable attributes
user_variable_attrs = {
attr: lib.get_attribute("{}.{}".format(shape, attr))
for attr in get_yeti_user_variables(shape)
}
# Get transform data
parent = cmds.listRelatives(shape, parent=True)[0]
transform_data = {"name": parent, "cbId": lib.get_id(parent)}
@ -70,6 +87,7 @@ class CollectYetiCache(pyblish.api.InstancePlugin):
"name": shape,
"cbId": lib.get_id(shape),
"attrs": attr_data,
"user_variables": user_variable_attrs
}
settings["nodes"].append(shape_data)

View file

@ -1,5 +1,6 @@
import os
from pathlib import Path
from ayon_core.lib import get_ayon_launcher_args
from ayon_core.lib.execute import run_detached_process
from ayon_core.addon import (
@ -57,3 +58,62 @@ def launch():
from ayon_core.tools import traypublisher
traypublisher.main()
@cli_main.command()
@click_wrap.option(
"--filepath",
help="Full path to CSV file with data",
type=str,
required=True
)
@click_wrap.option(
"--project",
help="Project name in which the context will be used",
type=str,
required=True
)
@click_wrap.option(
"--folder-path",
help="Asset name in which the context will be used",
type=str,
required=True
)
@click_wrap.option(
"--task",
help="Task name under Asset in which the context will be used",
type=str,
required=False
)
@click_wrap.option(
"--ignore-validators",
help="Option to ignore validators",
type=bool,
is_flag=True,
required=False
)
def ingestcsv(
filepath,
project,
folder_path,
task,
ignore_validators
):
"""Ingest CSV file into project.
This command will ingest CSV file into project. CSV file must be in
specific format. See documentation for more information.
"""
from .csv_publish import csvpublish
# use Path to check if csv_filepath exists
if not Path(filepath).exists():
raise FileNotFoundError(f"File {filepath} does not exist.")
csvpublish(
filepath,
project,
folder_path,
task,
ignore_validators
)

View file

@ -0,0 +1,86 @@
import os
import pyblish.api
import pyblish.util
from ayon_api import get_folder_by_path, get_task_by_name
from ayon_core.lib.attribute_definitions import FileDefItem
from ayon_core.pipeline import install_host
from ayon_core.pipeline.create import CreateContext
from ayon_core.hosts.traypublisher.api import TrayPublisherHost
def csvpublish(
filepath,
project_name,
folder_path,
task_name=None,
ignore_validators=False
):
"""Publish CSV file.
Args:
filepath (str): Path to CSV file.
project_name (str): Project name.
folder_path (str): Folder path.
task_name (Optional[str]): Task name.
ignore_validators (Optional[bool]): Option to ignore validators.
"""
# initialization of host
host = TrayPublisherHost()
install_host(host)
# setting host context into project
host.set_project_name(project_name)
# form precreate data with field values
file_field = FileDefItem.from_paths([filepath], False).pop().to_dict()
precreate_data = {
"csv_filepath_data": file_field,
}
# create context initialization
create_context = CreateContext(host, headless=True)
folder_entity = get_folder_by_path(
project_name,
folder_path=folder_path,
)
if not folder_entity:
ValueError(
f"Folder path '{folder_path}' doesn't "
f"exists at project '{project_name}'."
)
task_entity = get_task_by_name(
project_name,
folder_entity["id"],
task_name,
)
if not task_entity:
ValueError(
f"Task name '{task_name}' doesn't "
f"exists at folder '{folder_path}'."
)
create_context.create(
"io.ayon.creators.traypublisher.csv_ingest",
"Main",
folder_entity=folder_entity,
task_entity=task_entity,
pre_create_data=precreate_data,
)
# publishing context initialization
pyblish_context = pyblish.api.Context()
pyblish_context.data["create_context"] = create_context
# redefine targets (skip 'local' to disable validators)
if ignore_validators:
targets = ["default", "ingest"]
# publishing
pyblish.util.publish(context=pyblish_context, targets=targets)

View file

@ -0,0 +1,741 @@
import os
import re
import csv
import clique
from io import StringIO
from copy import deepcopy, copy
from ayon_api import get_folder_by_path, get_task_by_name
from ayon_core.pipeline.create import get_product_name
from ayon_core.pipeline import CreatedInstance
from ayon_core.lib import FileDef, BoolDef
from ayon_core.lib.transcoding import (
VIDEO_EXTENSIONS, IMAGE_EXTENSIONS
)
from ayon_core.pipeline.create import CreatorError
from ayon_core.hosts.traypublisher.api.plugin import (
TrayPublishCreator
)
class IngestCSV(TrayPublishCreator):
"""CSV ingest creator class"""
icon = "fa.file"
label = "CSV Ingest"
product_type = "csv_ingest_file"
identifier = "io.ayon.creators.traypublisher.csv_ingest"
default_variants = ["Main"]
description = "Ingest products' data from CSV file"
detailed_description = """
Ingest products' data from CSV file following column and representation
configuration in project settings.
"""
# Position in the list of creators.
order = 10
# settings for this creator
columns_config = {}
representations_config = {}
def create(self, subset_name, instance_data, pre_create_data):
"""Create an product from each row found in the CSV.
Args:
subset_name (str): The subset name.
instance_data (dict): The instance data.
pre_create_data (dict):
"""
csv_filepath_data = pre_create_data.get("csv_filepath_data", {})
folder = csv_filepath_data.get("directory", "")
if not os.path.exists(folder):
raise CreatorError(
f"Directory '{folder}' does not exist."
)
filename = csv_filepath_data.get("filenames", [])
self._process_csv_file(subset_name, instance_data, folder, filename[0])
def _process_csv_file(
self, subset_name, instance_data, staging_dir, filename):
"""Process CSV file.
Args:
subset_name (str): The subset name.
instance_data (dict): The instance data.
staging_dir (str): The staging directory.
filename (str): The filename.
"""
# create new instance from the csv file via self function
self._pass_data_to_csv_instance(
instance_data,
staging_dir,
filename
)
csv_instance = CreatedInstance(
self.product_type, subset_name, instance_data, self
)
self._store_new_instance(csv_instance)
csv_instance["csvFileData"] = {
"filename": filename,
"staging_dir": staging_dir,
}
# from special function get all data from csv file and convert them
# to new instances
csv_data_for_instances = self._get_data_from_csv(
staging_dir, filename)
# create instances from csv data via self function
self._create_instances_from_csv_data(
csv_data_for_instances, staging_dir
)
def _create_instances_from_csv_data(
self,
csv_data_for_instances,
staging_dir
):
"""Create instances from csv data"""
for folder_path, prepared_data in csv_data_for_instances.items():
project_name = self.create_context.get_current_project_name()
products = prepared_data["products"]
for instance_name, product_data in products.items():
# get important instance variables
task_name = product_data["task_name"]
task_type = product_data["task_type"]
variant = product_data["variant"]
product_type = product_data["product_type"]
version = product_data["version"]
# create subset/product name
product_name = get_product_name(
project_name,
task_name,
task_type,
self.host_name,
product_type,
variant
)
# make sure frame start/end is inherited from csv columns
# expected frame range data are handles excluded
for _, repre_data in product_data["representations"].items(): # noqa: E501
frame_start = repre_data["frameStart"]
frame_end = repre_data["frameEnd"]
handle_start = repre_data["handleStart"]
handle_end = repre_data["handleEnd"]
fps = repre_data["fps"]
break
# try to find any version comment in representation data
version_comment = next(
iter(
repre_data["comment"]
for repre_data in product_data["representations"].values() # noqa: E501
if repre_data["comment"]
),
None
)
# try to find any slate switch in representation data
slate_exists = any(
repre_data["slate"]
for _, repre_data in product_data["representations"].items() # noqa: E501
)
# get representations from product data
representations = product_data["representations"]
label = f"{folder_path}_{product_name}_v{version:>03}"
families = ["csv_ingest"]
if slate_exists:
# adding slate to families mainly for loaders to be able
# to filter out slates
families.append("slate")
# make product data
product_data = {
"name": instance_name,
"folderPath": folder_path,
"families": families,
"label": label,
"task": task_name,
"variant": variant,
"source": "csv",
"frameStart": frame_start,
"frameEnd": frame_end,
"handleStart": handle_start,
"handleEnd": handle_end,
"fps": fps,
"version": version,
"comment": version_comment,
}
# create new instance
new_instance = CreatedInstance(
product_type, product_name, product_data, self
)
self._store_new_instance(new_instance)
if not new_instance.get("prepared_data_for_repres"):
new_instance["prepared_data_for_repres"] = []
base_thumbnail_repre_data = {
"name": "thumbnail",
"ext": None,
"files": None,
"stagingDir": None,
"stagingDir_persistent": True,
"tags": ["thumbnail", "delete"],
}
# need to populate all thumbnails for all representations
# so we can check if unique thumbnail per representation
# is needed
thumbnails = [
repre_data["thumbnailPath"]
for repre_data in representations.values()
if repre_data["thumbnailPath"]
]
multiple_thumbnails = len(set(thumbnails)) > 1
explicit_output_name = None
thumbnails_processed = False
for filepath, repre_data in representations.items():
# check if any review derivate tag is present
reviewable = any(
tag for tag in repre_data.get("tags", [])
# tag can be `ftrackreview` or `review`
if "review" in tag
)
# since we need to populate multiple thumbnails as
# representation with outputName for (Ftrack instance
# integrator) pairing with reviewable video representations
if (
thumbnails
and multiple_thumbnails
and reviewable
):
# multiple unique thumbnails per representation needs
# grouping by outputName
# mainly used in Ftrack instance integrator
explicit_output_name = repre_data["representationName"]
relative_thumbnail_path = repre_data["thumbnailPath"]
# representation might not have thumbnail path
# so ignore this one
if not relative_thumbnail_path:
continue
thumb_dir, thumb_file = \
self._get_refactor_thumbnail_path(
staging_dir, relative_thumbnail_path)
filename, ext = os.path.splitext(thumb_file)
thumbnail_repr_data = deepcopy(
base_thumbnail_repre_data)
thumbnail_repr_data.update({
"name": "thumbnail_{}".format(filename),
"ext": ext[1:],
"files": thumb_file,
"stagingDir": thumb_dir,
"outputName": explicit_output_name,
})
new_instance["prepared_data_for_repres"].append({
"type": "thumbnail",
"colorspace": None,
"representation": thumbnail_repr_data,
})
# also add thumbnailPath for ayon to integrate
if not new_instance.get("thumbnailPath"):
new_instance["thumbnailPath"] = (
os.path.join(thumb_dir, thumb_file)
)
elif (
thumbnails
and not multiple_thumbnails
and not thumbnails_processed
or not reviewable
):
"""
For case where we have only one thumbnail
and not reviewable medias. This needs to be processed
only once per instance.
"""
if not thumbnails:
continue
# here we will use only one thumbnail for
# all representations
relative_thumbnail_path = repre_data["thumbnailPath"]
# popping last thumbnail from list since it is only one
# and we do not need to iterate again over it
if not relative_thumbnail_path:
relative_thumbnail_path = thumbnails.pop()
thumb_dir, thumb_file = \
self._get_refactor_thumbnail_path(
staging_dir, relative_thumbnail_path)
_, ext = os.path.splitext(thumb_file)
thumbnail_repr_data = deepcopy(
base_thumbnail_repre_data)
thumbnail_repr_data.update({
"ext": ext[1:],
"files": thumb_file,
"stagingDir": thumb_dir
})
new_instance["prepared_data_for_repres"].append({
"type": "thumbnail",
"colorspace": None,
"representation": thumbnail_repr_data,
})
# also add thumbnailPath for ayon to integrate
if not new_instance.get("thumbnailPath"):
new_instance["thumbnailPath"] = (
os.path.join(thumb_dir, thumb_file)
)
thumbnails_processed = True
# get representation data
representation_data = self._get_representation_data(
filepath, repre_data, staging_dir,
explicit_output_name
)
new_instance["prepared_data_for_repres"].append({
"type": "media",
"colorspace": repre_data["colorspace"],
"representation": representation_data,
})
def _get_refactor_thumbnail_path(
self, staging_dir, relative_thumbnail_path):
thumbnail_abs_path = os.path.join(
staging_dir, relative_thumbnail_path)
return os.path.split(
thumbnail_abs_path)
def _get_representation_data(
self, filepath, repre_data, staging_dir, explicit_output_name=None
):
"""Get representation data
Args:
filepath (str): Filepath to representation file.
repre_data (dict): Representation data from CSV file.
staging_dir (str): Staging directory.
explicit_output_name (Optional[str]): Explicit output name.
For grouping purposes with reviewable components.
Defaults to None.
"""
# get extension of file
basename = os.path.basename(filepath)
extension = os.path.splitext(filepath)[-1].lower()
# validate filepath is having correct extension based on output
repre_name = repre_data["representationName"]
repre_config_data = None
for repre in self.representations_config["representations"]:
if repre["name"] == repre_name:
repre_config_data = repre
break
if not repre_config_data:
raise CreatorError(
f"Representation '{repre_name}' not found "
"in config representation data."
)
validate_extensions = repre_config_data["extensions"]
if extension not in validate_extensions:
raise CreatorError(
f"File extension '{extension}' not valid for "
f"output '{validate_extensions}'."
)
is_sequence = (extension in IMAGE_EXTENSIONS)
# convert ### string in file name to %03d
# this is for correct frame range validation
# example: file.###.exr -> file.%03d.exr
if "#" in basename:
padding = len(basename.split("#")) - 1
basename = basename.replace("#" * padding, f"%0{padding}d")
is_sequence = True
# make absolute path to file
absfilepath = os.path.normpath(os.path.join(staging_dir, filepath))
dirname = os.path.dirname(absfilepath)
# check if dirname exists
if not os.path.isdir(dirname):
raise CreatorError(
f"Directory '{dirname}' does not exist."
)
# collect all data from dirname
paths_for_collection = []
for file in os.listdir(dirname):
filepath = os.path.join(dirname, file)
paths_for_collection.append(filepath)
collections, _ = clique.assemble(paths_for_collection)
if collections:
collections = collections[0]
else:
if is_sequence:
raise CreatorError(
f"No collections found in directory '{dirname}'."
)
frame_start = None
frame_end = None
if is_sequence:
files = [os.path.basename(file) for file in collections]
frame_start = list(collections.indexes)[0]
frame_end = list(collections.indexes)[-1]
else:
files = basename
tags = deepcopy(repre_data["tags"])
# if slate in repre_data is True then remove one frame from start
if repre_data["slate"]:
tags.append("has_slate")
# get representation data
representation_data = {
"name": repre_name,
"ext": extension[1:],
"files": files,
"stagingDir": dirname,
"stagingDir_persistent": True,
"tags": tags,
}
if extension in VIDEO_EXTENSIONS:
representation_data.update({
"fps": repre_data["fps"],
"outputName": repre_name,
})
if explicit_output_name:
representation_data["outputName"] = explicit_output_name
if frame_start:
representation_data["frameStart"] = frame_start
if frame_end:
representation_data["frameEnd"] = frame_end
return representation_data
def _get_data_from_csv(
self, package_dir, filename
):
"""Generate instances from the csv file"""
# get current project name and code from context.data
project_name = self.create_context.get_current_project_name()
csv_file_path = os.path.join(
package_dir, filename
)
# make sure csv file contains columns from following list
required_columns = [
column["name"] for column in self.columns_config["columns"]
if column["required_column"]
]
# read csv file
with open(csv_file_path, "r") as csv_file:
csv_content = csv_file.read()
# read csv file with DictReader
csv_reader = csv.DictReader(
StringIO(csv_content),
delimiter=self.columns_config["csv_delimiter"]
)
# fix fieldnames
# sometimes someone can keep extra space at the start or end of
# the column name
all_columns = [
" ".join(column.rsplit()) for column in csv_reader.fieldnames]
# return back fixed fieldnames
csv_reader.fieldnames = all_columns
# check if csv file contains all required columns
if any(column not in all_columns for column in required_columns):
raise CreatorError(
f"Missing required columns: {required_columns}"
)
csv_data = {}
# get data from csv file
for row in csv_reader:
# Get required columns first
# TODO: will need to be folder path in CSV
# TODO: `context_asset_name` is now `folder_path`
folder_path = self._get_row_value_with_validation(
"Folder Path", row)
task_name = self._get_row_value_with_validation(
"Task Name", row)
version = self._get_row_value_with_validation(
"Version", row)
# Get optional columns
variant = self._get_row_value_with_validation(
"Variant", row)
product_type = self._get_row_value_with_validation(
"Product Type", row)
pre_product_name = (
f"{task_name}{variant}{product_type}"
f"{version}".replace(" ", "").lower()
)
# get representation data
filename, representation_data = \
self._get_representation_row_data(row)
# TODO: batch query of all folder paths and task names
# get folder entity from folder path
folder_entity = get_folder_by_path(
project_name, folder_path)
# make sure asset exists
if not folder_entity:
raise CreatorError(
f"Asset '{folder_path}' not found."
)
# first get all tasks on the folder entity and then find
task_entity = get_task_by_name(
project_name, folder_entity["id"], task_name)
# check if task name is valid task in asset doc
if not task_entity:
raise CreatorError(
f"Task '{task_name}' not found in asset doc."
)
# get all csv data into one dict and make sure there are no
# duplicates data are already validated and sorted under
# correct existing asset also check if asset exists and if
# task name is valid task in asset doc and representations
# are distributed under products following variants
if folder_path not in csv_data:
csv_data[folder_path] = {
"folder_entity": folder_entity,
"products": {
pre_product_name: {
"task_name": task_name,
"task_type": task_entity["taskType"],
"variant": variant,
"product_type": product_type,
"version": version,
"representations": {
filename: representation_data,
},
}
}
}
else:
csv_products = csv_data[folder_path]["products"]
if pre_product_name not in csv_products:
csv_products[pre_product_name] = {
"task_name": task_name,
"task_type": task_entity["taskType"],
"variant": variant,
"product_type": product_type,
"version": version,
"representations": {
filename: representation_data,
},
}
else:
csv_representations = \
csv_products[pre_product_name]["representations"]
if filename in csv_representations:
raise CreatorError(
f"Duplicate filename '{filename}' in csv file."
)
csv_representations[filename] = representation_data
return csv_data
def _get_representation_row_data(self, row_data):
"""Get representation row data"""
# Get required columns first
file_path = self._get_row_value_with_validation(
"File Path", row_data)
frame_start = self._get_row_value_with_validation(
"Frame Start", row_data)
frame_end = self._get_row_value_with_validation(
"Frame End", row_data)
handle_start = self._get_row_value_with_validation(
"Handle Start", row_data)
handle_end = self._get_row_value_with_validation(
"Handle End", row_data)
fps = self._get_row_value_with_validation(
"FPS", row_data)
# Get optional columns
thumbnail_path = self._get_row_value_with_validation(
"Version Thumbnail", row_data)
colorspace = self._get_row_value_with_validation(
"Representation Colorspace", row_data)
comment = self._get_row_value_with_validation(
"Version Comment", row_data)
repre = self._get_row_value_with_validation(
"Representation", row_data)
slate_exists = self._get_row_value_with_validation(
"Slate Exists", row_data)
repre_tags = self._get_row_value_with_validation(
"Representation Tags", row_data)
# convert tags value to list
tags_list = copy(self.representations_config["default_tags"])
if repre_tags:
tags_list = []
tags_delimiter = self.representations_config["tags_delimiter"]
# strip spaces from repre_tags
if tags_delimiter in repre_tags:
tags = repre_tags.split(tags_delimiter)
for _tag in tags:
tags_list.append(("".join(_tag.strip())).lower())
else:
tags_list.append(repre_tags)
representation_data = {
"colorspace": colorspace,
"comment": comment,
"representationName": repre,
"slate": slate_exists,
"tags": tags_list,
"thumbnailPath": thumbnail_path,
"frameStart": int(frame_start),
"frameEnd": int(frame_end),
"handleStart": int(handle_start),
"handleEnd": int(handle_end),
"fps": float(fps),
}
return file_path, representation_data
def _get_row_value_with_validation(
self, column_name, row_data, default_value=None
):
"""Get row value with validation"""
# get column data from column config
column_data = None
for column in self.columns_config["columns"]:
if column["name"] == column_name:
column_data = column
break
if not column_data:
raise CreatorError(
f"Column '{column_name}' not found in column config."
)
# get column value from row
column_value = row_data.get(column_name)
column_required = column_data["required_column"]
# check if column value is not empty string and column is required
if column_value == "" and column_required:
raise CreatorError(
f"Value in column '{column_name}' is required."
)
# get column type
column_type = column_data["type"]
# get column validation regex
column_validation = column_data["validation_pattern"]
# get column default value
column_default = default_value or column_data["default"]
if column_type in ["number", "decimal"] and column_default == 0:
column_default = None
# check if column value is not empty string
if column_value == "":
# set default value if column value is empty string
column_value = column_default
# set column value to correct type following column type
if column_type == "number" and column_value is not None:
column_value = int(column_value)
elif column_type == "decimal" and column_value is not None:
column_value = float(column_value)
elif column_type == "bool":
column_value = column_value in ["true", "True"]
# check if column value matches validation regex
if (
column_value is not None and
not re.match(str(column_validation), str(column_value))
):
raise CreatorError(
f"Column '{column_name}' value '{column_value}' "
f"does not match validation regex '{column_validation}' \n"
f"Row data: {row_data} \n"
f"Column data: {column_data}"
)
return column_value
def _pass_data_to_csv_instance(
self, instance_data, staging_dir, filename
):
"""Pass CSV representation file to instance data"""
representation = {
"name": "csv",
"ext": "csv",
"files": filename,
"stagingDir": staging_dir,
"stagingDir_persistent": True,
}
instance_data.update({
"label": f"CSV: {filename}",
"representations": [representation],
"stagingDir": staging_dir,
"stagingDir_persistent": True,
})
def get_instance_attr_defs(self):
return [
BoolDef(
"add_review_family",
default=True,
label="Review"
)
]
def get_pre_create_attr_defs(self):
"""Creating pre-create attributes at creator plugin.
Returns:
list: list of attribute object instances
"""
# Use same attributes as for instance attributes
attr_defs = [
FileDef(
"csv_filepath_data",
folders=False,
extensions=[".csv"],
allow_sequences=False,
single_item=True,
label="CSV File",
),
]
return attr_defs

View file

@ -0,0 +1,47 @@
from pprint import pformat
import pyblish.api
from ayon_core.pipeline import publish
class CollectCSVIngestInstancesData(
pyblish.api.InstancePlugin,
publish.AYONPyblishPluginMixin,
publish.ColormanagedPyblishPluginMixin
):
"""Collect CSV Ingest data from instance.
"""
label = "Collect CSV Ingest instances data"
order = pyblish.api.CollectorOrder + 0.1
hosts = ["traypublisher"]
families = ["csv_ingest"]
def process(self, instance):
# expecting [(colorspace, repre_data), ...]
prepared_repres_data_items = instance.data[
"prepared_data_for_repres"]
for prep_repre_data in prepared_repres_data_items:
type = prep_repre_data["type"]
colorspace = prep_repre_data["colorspace"]
repre_data = prep_repre_data["representation"]
# thumbnails should be skipped
if type == "media":
# colorspace name is passed from CSV column
self.set_representation_colorspace(
repre_data, instance.context, colorspace
)
elif type == "media" and colorspace is None:
# TODO: implement colorspace file rules file parsing
self.log.warning(
"Colorspace is not defined in csv for following"
f" representation: {pformat(repre_data)}"
)
pass
elif type == "thumbnail":
# thumbnails should be skipped
pass
instance.data["representations"].append(repre_data)

View file

@ -0,0 +1,31 @@
import pyblish.api
from ayon_core.pipeline import publish
class ExtractCSVFile(publish.Extractor):
"""
Extractor export CSV file
"""
label = "Extract CSV file"
order = pyblish.api.ExtractorOrder - 0.45
families = ["csv_ingest_file"]
hosts = ["traypublisher"]
def process(self, instance):
csv_file_data = instance.data["csvFileData"]
representation_csv = {
'name': "csv_data",
'ext': "csv",
'files': csv_file_data["filename"],
"stagingDir": csv_file_data["staging_dir"],
"stagingDir_persistent": True
}
instance.data["representations"].append(representation_csv)
self.log.info("Added CSV file representation: {}".format(
representation_csv))

View file

@ -16,6 +16,7 @@ class ValidateExistingVersion(
order = ValidateContentsOrder
hosts = ["traypublisher"]
targets = ["local"]
actions = [RepairAction]

View file

@ -16,6 +16,8 @@ class ValidateFrameRange(OptionalPyblishPluginMixin,
label = "Validate Frame Range"
hosts = ["traypublisher"]
families = ["render", "plate"]
targets = ["local"]
order = ValidateContentsOrder
optional = True

View file

@ -167,7 +167,8 @@ class IntegrateAsset(pyblish.api.InstancePlugin):
"uasset",
"blendScene",
"yeticacheUE",
"tycache"
"tycache",
"csv_ingest_file",
]
default_template_name = "publish"

View file

@ -1,4 +1,7 @@
from pydantic import validator
from ayon_server.settings import BaseSettingsModel, SettingsField
from ayon_server.settings.validators import ensure_unique_names
from ayon_server.exceptions import BadRequestException
class BatchMovieCreatorPlugin(BaseSettingsModel):
@ -22,11 +25,139 @@ class BatchMovieCreatorPlugin(BaseSettingsModel):
)
class ColumnItemModel(BaseSettingsModel):
"""Allows to publish multiple video files in one go. <br />Name of matching
asset is parsed from file names ('asset.mov', 'asset_v001.mov',
'my_asset_to_publish.mov')"""
name: str = SettingsField(
title="Name",
default=""
)
type: str = SettingsField(
title="Type",
default=""
)
default: str = SettingsField(
title="Default",
default=""
)
required_column: bool = SettingsField(
title="Required Column",
default=False
)
validation_pattern: str = SettingsField(
title="Validation Regex Pattern",
default="^(.*)$"
)
class ColumnConfigModel(BaseSettingsModel):
"""Allows to publish multiple video files in one go. <br />Name of matching
asset is parsed from file names ('asset.mov', 'asset_v001.mov',
'my_asset_to_publish.mov')"""
csv_delimiter: str = SettingsField(
title="CSV delimiter",
default=","
)
columns: list[ColumnItemModel] = SettingsField(
title="Columns",
default_factory=list
)
@validator("columns")
def validate_unique_outputs(cls, value):
ensure_unique_names(value)
return value
class RepresentationItemModel(BaseSettingsModel):
"""Allows to publish multiple video files in one go.
Name of matching asset is parsed from file names
('asset.mov', 'asset_v001.mov', 'my_asset_to_publish.mov')
"""
name: str = SettingsField(
title="Name",
default=""
)
extensions: list[str] = SettingsField(
title="Extensions",
default_factory=list
)
@validator("extensions")
def validate_extension(cls, value):
for ext in value:
if not ext.startswith("."):
raise BadRequestException(f"Extension must start with '.': {ext}")
return value
class RepresentationConfigModel(BaseSettingsModel):
"""Allows to publish multiple video files in one go. <br />Name of matching
asset is parsed from file names ('asset.mov', 'asset_v001.mov',
'my_asset_to_publish.mov')"""
tags_delimiter: str = SettingsField(
title="Tags delimiter",
default=";"
)
default_tags: list[str] = SettingsField(
title="Default tags",
default_factory=list
)
representations: list[RepresentationItemModel] = SettingsField(
title="Representations",
default_factory=list
)
@validator("representations")
def validate_unique_outputs(cls, value):
ensure_unique_names(value)
return value
class IngestCSVPluginModel(BaseSettingsModel):
"""Allows to publish multiple video files in one go. <br />Name of matching
asset is parsed from file names ('asset.mov', 'asset_v001.mov',
'my_asset_to_publish.mov')"""
enabled: bool = SettingsField(
title="Enabled",
default=False
)
columns_config: ColumnConfigModel = SettingsField(
title="Columns config",
default_factory=ColumnConfigModel
)
representations_config: RepresentationConfigModel = SettingsField(
title="Representations config",
default_factory=RepresentationConfigModel
)
class TrayPublisherCreatePluginsModel(BaseSettingsModel):
BatchMovieCreator: BatchMovieCreatorPlugin = SettingsField(
title="Batch Movie Creator",
default_factory=BatchMovieCreatorPlugin
)
IngestCSV: IngestCSVPluginModel = SettingsField(
title="Ingest CSV",
default_factory=IngestCSVPluginModel
)
DEFAULT_CREATORS = {
@ -41,4 +172,170 @@ DEFAULT_CREATORS = {
".mov"
]
},
"IngestCSV": {
"enabled": True,
"columns_config": {
"csv_delimiter": ",",
"columns": [
{
"name": "File Path",
"type": "text",
"default": "",
"required_column": True,
"validation_pattern": "^([a-z0-9#._\\/]*)$"
},
{
"name": "Folder Path",
"type": "text",
"default": "",
"required_column": True,
"validation_pattern": "^([a-zA-Z0-9_\\/]*)$"
},
{
"name": "Task Name",
"type": "text",
"default": "",
"required_column": True,
"validation_pattern": "^(.*)$"
},
{
"name": "Product Type",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
},
{
"name": "Variant",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
},
{
"name": "Version",
"type": "number",
"default": 1,
"required_column": True,
"validation_pattern": "^(\\d{1,3})$"
},
{
"name": "Version Comment",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
},
{
"name": "Version Thumbnail",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^([a-zA-Z0-9#._\\/]*)$"
},
{
"name": "Frame Start",
"type": "number",
"default": 0,
"required_column": True,
"validation_pattern": "^(\\d{1,8})$"
},
{
"name": "Frame End",
"type": "number",
"default": 0,
"required_column": True,
"validation_pattern": "^(\\d{1,8})$"
},
{
"name": "Handle Start",
"type": "number",
"default": 0,
"required_column": True,
"validation_pattern": "^(\\d)$"
},
{
"name": "Handle End",
"type": "number",
"default": 0,
"required_column": True,
"validation_pattern": "^(\\d)$"
},
{
"name": "FPS",
"type": "decimal",
"default": 0.0,
"required_column": True,
"validation_pattern": "^[0-9]*\\.[0-9]+$|^[0-9]+$"
},
{
"name": "Slate Exists",
"type": "bool",
"default": True,
"required_column": False,
"validation_pattern": "(True|False)"
},
{
"name": "Representation",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
},
{
"name": "Representation Colorspace",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
},
{
"name": "Representation Tags",
"type": "text",
"default": "",
"required_column": False,
"validation_pattern": "^(.*)$"
}
]
},
"representations_config": {
"tags_delimiter": ";",
"default_tags": [
"review"
],
"representations": [
{
"name": "preview",
"extensions": [
".mp4",
".mov"
]
},
{
"name": "exr",
"extensions": [
".exr"
]
},
{
"name": "edit",
"extensions": [
".mov"
]
},
{
"name": "review",
"extensions": [
".mov"
]
},
{
"name": "nuke",
"extensions": [
".nk"
]
}
]
}
}
}