extract js code from plugins

This commit is contained in:
Ondřej Samohel 2020-10-20 16:28:17 +02:00
parent f37eb3e109
commit 3d54b9f754
No known key found for this signature in database
GPG key ID: 8A29C663C672C2B7
16 changed files with 975 additions and 95 deletions

View file

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
"""Pype Harmony Host implementation."""
import os
from pathlib import Path
from avalon import api, io, harmony
import avalon.tools.sceneinventory
@ -130,11 +131,24 @@ def check_inventory():
def application_launch():
"""Event that is executed after Harmony is launched."""
# FIXME: This is breaking server <-> client communication.
# It is now moved so it it manually called.
# ensure_scene_settings()
# check_inventory()
pass
pype_harmony_path = Path(__file__).parent / "js" / "PypeHarmony.js"
pype_harmony_js = pype_harmony_path.read_text()
# go through js/creators, loaders and publish folders and load all scripts
script = ""
for item in ["creators", "loaders", "publish"]:
dir_to_scan = Path(__file__).parent / "js" / item
for child in dir_to_scan.iterdir():
script += child.read_text()
# send scripts to Harmony
harmony.send({"script": pype_harmony_js})
harmony.send({"script": script})
def export_template(backdrops, nodes, filepath):

View file

@ -59,11 +59,31 @@ PypeHarmony.setSceneSettings = function(settings) {
};
/**
* Get scene settings.
* @function
* @return {array} Scene settings.
*/
PypeHarmony.getSceneSettings = function() {
return [
about.getApplicationPath(),
scene.currentProjectPath(),
scene.currentScene(),
scene.getFrameRate(),
scene.getStartFrame(),
scene.getStopFrame(),
sound.getSoundtrackAll().path(),
scene.defaultResolutionX(),
scene.defaultResolutionY()
];
};
/**
* Set color of nodes.
* @function
* @param {array} nodes List of nodes.
* @param {array} rgba array of RGBA components of color.
* @param {array} rgba array of RGBA components of color.
*/
PypeHarmony.setColor = function(nodes, rgba) {
for (var i =0; i <= nodes.length - 1; ++i) {
@ -152,9 +172,26 @@ PypeHarmony.copyFile = function(src, dst) {
/**
* create RGBA color from array.
* @function
* @param {array} rgba array of rgba values.
* @return {ColorRGBA} ColorRGBA Harmony class.
* @param {array} rgba array of rgba values.
* @return {ColorRGBA} ColorRGBA Harmony class.
*/
PypeHarmony.color = function(rgba) {
return new ColorRGBA(rgba[0], rgba[1], rgba[2], rgba[3]);
};
/**
* get all dependencies for given node.
* @function
* @param {string} node node path.
* @return {array} List of dependent nodes.
*/
PypeHarmony.getDependencies = function(node) {
var target_node = node;
var numInput = node.numberOfInputPorts(target_node);
var dependencies = [];
for (var i = 0 ; i < numInput; i++) {
dependencies.push(node.srcNode(target_node, i));
}
return dependencies;
};

View file

@ -0,0 +1,33 @@
/* global PypeHarmony:writable, include */
// ***************************************************************************
// * CreateRender *
// ***************************************************************************
// check if PypeHarmony is defined and if not, load it.
if (typeof PypeHarmony !== 'undefined') {
var PYPE_HARMONY_JS = System.getenv('PYPE_HARMONY_JS');
include(PYPE_HARMONY_JS + '/pype_harmony.js');
}
/**
* @namespace
* @classdesc Code creating render containers in Harmony.
*/
var CreateRender = function() {};
/**
* Create render instance.
* @function
* @param {array} args Arguments for instance.
*/
CreateRender.prototype.create = function(args) {
node.setTextAttr(args[0], 'DRAWING_TYPE', 1, 'PNG4');
node.setTextAttr(args[0], 'DRAWING_NAME', 1, args[1]);
node.setTextAttr(args[0], 'MOVIE_PATH', 1, args[1]);
};
// add self to Pype Loaders
PypeHarmony.Creators.CreateRender = new CreateRender();

View file

@ -85,12 +85,11 @@ TemplateLoader.prototype.loadContainer = function(args) {
* @param {string} srcNodePath Harmony path to source Node.
* @param {string} renameSrc ...
* @param {boolean} cloneSrc ...
* @param {array} linkColumns ...
* @return {boolean} Success
* @todo This is work in progress.
*/
TemplateLoader.prototype.replaceNode = function(
dstNodePath, srcNodePath, renameSrc, cloneSrc, linkColumns) {
dstNodePath, srcNodePath, renameSrc, cloneSrc) {
var doc = $.scn;
var srcNode = doc.$node(srcNodePath);
var dstNode = doc.$node(dstNodePath);

View file

@ -0,0 +1,38 @@
/* global PypeHarmony:writable, include */
// ***************************************************************************
// * ExtractPalette *
// ***************************************************************************
// check if PypeHarmony is defined and if not, load it.
if (typeof PypeHarmony !== 'undefined') {
var PYPE_HARMONY_JS = System.getenv('PYPE_HARMONY_JS');
include(PYPE_HARMONY_JS + '/pype_harmony.js');
}
/**
* @namespace
* @classdesc Code for extracting palettes.
*/
var ExtractPalette = function() {};
/**
* Get palette from Harmony.
* @function
* @param {string} paletteId ID of palette to get.
* @return {array} [paletteName, palettePath]
*/
ExtractPalette.prototype.getPalette = function(paletteId) {
var palette_list = PaletteObjectManager.getScenePaletteList();
var palette = palette_list.getPaletteById(paletteId);
var palette_name = palette.getName();
return [
palette_name,
(palette.getPath() + '/' + palette.getName() + '.plt')
];
};
// add self to Pype Loaders
PypeHarmony.Publish.ExtractPalette = new ExtractPalette();

View file

@ -0,0 +1,54 @@
/* global PypeHarmony:writable, include */
// ***************************************************************************
// * ExtractTemplate *
// ***************************************************************************
// check if PypeHarmony is defined and if not, load it.
if (typeof PypeHarmony !== 'undefined') {
var PYPE_HARMONY_JS = System.getenv('PYPE_HARMONY_JS');
include(PYPE_HARMONY_JS + '/pype_harmony.js');
}
/**
* @namespace
* @classdesc Code for extracting palettes.
*/
var ExtractTemplate = function() {};
/**
* Get backdrops for given node.
* @function
* @param {string} probeNode Node path to probe for backdrops.
* @return {array} list of backdrops.
*/
ExtractTemplate.prototype.getBackdropsByNode = function(probeNode) {
var backdrops = Backdrop.backdrops('Top');
var valid_backdrops = [];
for(var i=0; i<backdrops.length; i++)
{
var position = backdrops[i].position;
var x_valid = false;
var node_x = node.coordX(probeNode);
if (position.x < node_x && node_x < (position.x + position.w)){
x_valid = true;
}
var y_valid = false;
var node_y = node.coordY(probeNode);
if (position.y < node_y && node_y < (position.y + position.h)){
y_valid = true;
}
if (x_valid && y_valid){
valid_backdrops.push(backdrops[i]);
}
}
return valid_backdrops;
};
// add self to Pype Loaders
PypeHarmony.Publish.ExtractTemplate = new ExtractTemplate();

View file

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
"""Create render node."""
from avalon import harmony
@ -10,17 +12,15 @@ class CreateRender(harmony.Creator):
node_type = "WRITE"
def __init__(self, *args, **kwargs):
"""Constructor."""
super(CreateRender, self).__init__(*args, **kwargs)
def setup_node(self, node):
sig = harmony.signature()
func = """function %s(args)
{
node.setTextAttr(args[0], "DRAWING_TYPE", 1, "PNG4");
node.setTextAttr(args[0], "DRAWING_NAME", 1, args[1]);
node.setTextAttr(args[0], "MOVIE_PATH", 1, args[1]);
}
%s
""" % (sig, sig)
"""Set render node."""
self_name = self.__class__.__name__
path = "{0}/{0}".format(node.split("/")[-1])
harmony.send({"function": func, "args": [node, path]})
harmony.send(
{
"function": f"PypeHarmony.Creators.{self_name}.create",
"args": [node, path]
})

View file

@ -28,7 +28,7 @@ class CollectPalettes(pyblish.api.ContextPlugin):
"id": id,
"family": "harmony.palette",
"asset": os.environ["AVALON_ASSET"],
"subset": "palette" + name
"subset": "{}{}".format("palette", name)
})
self.log.info(
"Created instance:\n" + json.dumps(

View file

@ -14,26 +14,11 @@ class CollectScene(pyblish.api.ContextPlugin):
hosts = ["harmony"]
def process(self, context):
sig = harmony.signature()
func = """function %s()
{
return [
about.getApplicationPath(),
scene.currentProjectPath(),
scene.currentScene(),
scene.getFrameRate(),
scene.getStartFrame(),
scene.getStopFrame(),
sound.getSoundtrackAll().path(),
scene.defaultResolutionX(),
scene.defaultResolutionY()
]
}
%s
""" % (sig, sig)
"""Plugin entry point."""
result = harmony.send(
{"function": func, "args": []}
{
f"function": "PypeHarmony.getSceneSettings",
"args": []}
)["result"]
context.data["applicationPath"] = result[0]

View file

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
"""Collect current workfile from Harmony."""
import pyblish.api
import os
@ -10,6 +12,7 @@ class CollectWorkfile(pyblish.api.ContextPlugin):
hosts = ["harmony"]
def process(self, context):
"""Plugin entry point."""
family = "workfile"
task = os.getenv("AVALON_TASK", None)
sanitized_task_name = task[0].upper() + task[1:]

View file

@ -1,4 +1,9 @@
# -*- coding: utf-8 -*-
"""Extract palette from Harmony."""
import os
import csv
from PIL import Image, ImageDraw, ImageFont
from avalon import harmony
import pype.api
@ -13,18 +18,34 @@ class ExtractPalette(pype.api.Extractor):
families = ["harmony.palette"]
def process(self, instance):
sig = harmony.signature()
func = """function %s(args)
{
var palette_list = PaletteObjectManager.getScenePaletteList();
var palette = palette_list.getPaletteById(args[0]);
return (palette.getPath() + "/" + palette.getName() + ".plt");
"""Plugin entry point."""
self_name = self.__class__.__name__
result = harmony.send(
{
"function": f"PypeHarmony.Publish.{self_name}",
"args": instance.data["id"]
})["result"]
palette_name = result[0]
palette_file = result[1]
tmp_thumb_path = os.path.join(os.path.dirname(palette_file),
os.path.basename(palette_file)
.split(".plt")[0] + "_swatches.png"
)
palette_version = str(instance.data.get("version")).zfill(3)
thumbnail_path = self.create_palette_thumbnail(palette_name,
palette_version,
palette_file,
tmp_thumb_path)
thumbnail = {
"name": "thumbnail",
"ext": "png",
"files": os.path.basename(thumbnail_path),
"stagingDir": os.path.dirname(thumbnail_path),
"tags": ["thumbnail"]
}
%s
""" % (sig, sig)
palette_file = harmony.send(
{"function": func, "args": [instance.data["id"]]}
)["result"]
representation = {
"name": "plt",
@ -32,4 +53,128 @@ class ExtractPalette(pype.api.Extractor):
"files": os.path.basename(palette_file),
"stagingDir": os.path.dirname(palette_file)
}
instance.data["representations"] = [representation]
instance.data["representations"] = [representation, thumbnail]
def create_palette_thumbnail(self,
palette_name,
palette_version,
palette_path,
dst_path):
"""Create thumbnail for palette file.
Args:
palette_name (str): Name of palette.
palette_version (str): Version of palette.
palette_path (str): Path to palette file.
dst_path (str): Thumbnail path.
Returns:
str: Thumbnail path.
"""
colors = {}
with open(palette_path, newline='') as plt:
plt_parser = csv.reader(plt, delimiter=" ")
for i, line in enumerate(plt_parser):
if i == 0:
continue
while ("" in line):
line.remove("")
print(line)
color_name = line[1].strip('"')
colors[color_name] = {"type": line[0],
"uuid": line[2],
"rgba": (int(line[3]),
int(line[4]),
int(line[5]),
int(line[6])),
}
plt.close()
img_pad_top = 80
label_pad_name = 30
label_pad_rgb = 580
swatch_pad_left = 300
swatch_pad_top = 10
swatch_w = 120
swatch_h = 50
image_w = 800
image_h = (img_pad_top +
(len(colors.keys()) *
swatch_h) +
(swatch_pad_top *
len(colors.keys()))
)
img = Image.new("RGBA", (image_w, image_h), "white")
# For bg of colors with alpha, create checkerboard image
checkers = Image.new("RGB", (swatch_w, swatch_h))
pixels = checkers.load()
# Make pixels white where (row+col) is odd
for i in range(swatch_w):
for j in range(swatch_h):
if (i + j) % 2:
pixels[i, j] = (255, 255, 255)
draw = ImageDraw.Draw(img)
# TODO: This needs to be font included with Pype because
# arial is not available on other platforms then Windows.
title_font = ImageFont.truetype("arial.ttf", 28)
label_font = ImageFont.truetype("arial.ttf", 20)
draw.text((label_pad_name, 20),
"{} (v{})".format(palette_name, palette_version),
"black",
font=title_font)
for i, name in enumerate(colors):
rgba = colors[name]["rgba"]
# @TODO: Fix this so alpha colors are displayed with checkboard
# if not rgba[3] == "255":
# img.paste(checkers,
# (swatch_pad_left,
# img_pad_top + swatch_pad_top + (i * swatch_h))
# )
#
# half_y = (img_pad_top + swatch_pad_top + (i * swatch_h))/2
#
# draw.rectangle((
# swatch_pad_left, # upper LX
# img_pad_top + swatch_pad_top + (i * swatch_h), # upper LY
# swatch_pad_left + (swatch_w * 2), # lower RX
# half_y), # lower RY
# fill=rgba[:-1], outline=(0, 0, 0), width=2)
# draw.rectangle((
# swatch_pad_left, # upper LX
# half_y, # upper LY
# swatch_pad_left + (swatch_w * 2), # lower RX
# img_pad_top + swatch_h + (i * swatch_h)), # lower RY
# fill=rgba, outline=(0, 0, 0), width=2)
# else:
draw.rectangle((
swatch_pad_left, # upper left x
img_pad_top + swatch_pad_top + (i * swatch_h), # upper left y
swatch_pad_left + (swatch_w * 2), # lower right x
img_pad_top + swatch_h + (i * swatch_h)), # lower right y
fill=rgba, outline=(0, 0, 0), width=2)
draw.text((label_pad_name, img_pad_top + (i * swatch_h) + swatch_pad_top + (swatch_h / 4)), # noqa: E501
name,
"black",
font=label_font)
draw.text((label_pad_rgb, img_pad_top + (i * swatch_h) + swatch_pad_top + (swatch_h / 4)), # noqa: E501
str(rgba),
"black",
font=label_font)
draw = ImageDraw.Draw(img)
img.save(dst_path)
return dst_path

View file

@ -1,3 +1,5 @@
# -*- coding: utf-8 -*-
"""Extract template."""
import os
import shutil
@ -14,6 +16,7 @@ class ExtractTemplate(pype.api.Extractor):
families = ["harmony.template"]
def process(self, instance):
"""Plugin entry point."""
staging_dir = self.staging_dir(instance)
filepath = os.path.join(staging_dir, f"{instance.name}.tpl")
@ -61,60 +64,49 @@ class ExtractTemplate(pype.api.Extractor):
"files": f"{instance.name}.zip",
"stagingDir": staging_dir
}
instance.data["representations"] = [representation]
def get_backdrops(self, node):
sig = harmony.signature()
func = """function %s(probe_node)
{
var backdrops = Backdrop.backdrops("Top");
var valid_backdrops = [];
for(var i=0; i<backdrops.length; i++)
{
var position = backdrops[i].position;
self.log.info(instance.data.get("representations"))
if instance.data.get("representations"):
instance.data["representations"].extend([representation])
else:
instance.data["representations"] = [representation]
var x_valid = false;
var node_x = node.coordX(probe_node);
if (position.x < node_x && node_x < (position.x + position.w)){
x_valid = true
};
instance.data["version_name"] = "{}_{}".format(
instance.data["subset"], os.environ["AVALON_TASK"])
var y_valid = false;
var node_y = node.coordY(probe_node);
if (position.y < node_y && node_y < (position.y + position.h)){
y_valid = true
};
def get_backdrops(self, node: str) -> list:
"""Get backdrops for the node.
if (x_valid && y_valid){
valid_backdrops.push(backdrops[i])
};
}
return valid_backdrops;
}
%s
""" % (sig, sig)
return harmony.send(
{"function": func, "args": [node]}
)["result"]
Args:
node (str): Node path.
def get_dependencies(self, node, dependencies):
sig = harmony.signature()
func = """function %s(args)
{
var target_node = args[0];
var numInput = node.numberOfInputPorts(target_node);
var dependencies = [];
for (var i = 0 ; i < numInput; i++)
{
dependencies.push(node.srcNode(target_node, i));
}
return dependencies;
}
%s
""" % (sig, sig)
Returns:
list: list of Backdrops.
"""
self_name = self.__class__.__name__
return harmony.send({
"function": f"PypeHarmony.Publish.{self_name}.getBackdropsByNode",
"args": node})["result"]
def get_dependencies(
self, node: str, dependencies: list = None) -> list:
"""Get node dependencies.
This will return recursive dependency list of given node.
Args:
node (str): Path to the node.
dependencies (list, optional): existing dependency list.
Returns:
list: List of dependent nodes.
"""
current_dependencies = harmony.send(
{"function": func, "args": [node]}
{
"function": "PypeHarmony.getDependencies",
"args": node}
)["result"]
for dependency in current_dependencies:

View file

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
"""Collect Harmony scenes in Standalone Publisher."""
import copy
import glob
import os
from pprint import pformat
import pyblish.api
class CollectHarmonyScenes(pyblish.api.InstancePlugin):
"""Collect Harmony xstage files."""
order = pyblish.api.CollectorOrder + 0.498
label = "Collect Harmony Scene"
hosts = ["standalonepublisher"]
families = ["harmony.scene"]
# presets
ignored_instance_data_keys = ("name", "label", "stagingDir", "version")
def process(self, instance):
"""Plugin entry point."""
context = instance.context
asset_data = instance.context.data["assetEntity"]
asset_name = instance.data["asset"]
subset_name = instance.data.get("subset", "sceneMain")
anatomy_data = instance.context.data["anatomyData"]
repres = instance.data["representations"]
staging_dir = repres[0]["stagingDir"]
files = repres[0]["files"]
if not files.endswith(".zip"):
# A harmony project folder / .xstage was dropped
instance_name = f"{asset_name}_{subset_name}"
task = instance.data.get("task", "harmonyIngest")
# create new instance
new_instance = context.create_instance(instance_name)
# add original instance data except name key
for key, value in instance.data.items():
# Make sure value is copy since value may be object which
# can be shared across all new created objects
if key not in self.ignored_instance_data_keys:
new_instance.data[key] = copy.deepcopy(value)
self.log.info("Copied data: {}".format(new_instance.data))
# fix anatomy data
anatomy_data_new = copy.deepcopy(anatomy_data)
# updating hierarchy data
anatomy_data_new.update({
"asset": asset_data["name"],
"task": task,
"subset": subset_name
})
new_instance.data["label"] = f"{instance_name}"
new_instance.data["subset"] = subset_name
new_instance.data["extension"] = ".zip"
new_instance.data["anatomyData"] = anatomy_data_new
new_instance.data["publish"] = True
# When a project folder was dropped vs. just an xstage file, find
# the latest file xstage version and update the instance
if not files.endswith(".xstage"):
source_dir = os.path.join(
staging_dir, files
).replace("\\", "/")
latest_file = max(glob.iglob(source_dir + "/*.xstage"),
key=os.path.getctime).replace("\\", "/")
new_instance.data["representations"][0]["stagingDir"] = (
source_dir
)
new_instance.data["representations"][0]["files"] = (
os.path.basename(latest_file)
)
self.log.info(f"Created new instance: {instance_name}")
self.log.debug(f"_ inst_data: {pformat(new_instance.data)}")
# set original instance for removal
self.log.info("Context data: {}".format(context.data))
instance.data["remove"] = True

View file

@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-
"""Collect zips as Harmony scene files."""
import copy
from pprint import pformat
import pyblish.api
class CollectHarmonyZips(pyblish.api.InstancePlugin):
"""Collect Harmony zipped projects."""
order = pyblish.api.CollectorOrder + 0.497
label = "Collect Harmony Zipped Projects"
hosts = ["standalonepublisher"]
families = ["harmony.scene"]
extensions = ["zip"]
# presets
ignored_instance_data_keys = ("name", "label", "stagingDir", "version")
def process(self, instance):
"""Plugin entry point."""
context = instance.context
asset_data = instance.context.data["assetEntity"]
asset_name = instance.data["asset"]
subset_name = instance.data.get("subset", "sceneMain")
anatomy_data = instance.context.data["anatomyData"]
repres = instance.data["representations"]
files = repres[0]["files"]
if files.endswith(".zip"):
# A zip file was dropped
instance_name = f"{asset_name}_{subset_name}"
task = instance.data.get("task", "harmonyIngest")
# create new instance
new_instance = context.create_instance(instance_name)
# add original instance data except name key
for key, value in instance.data.items():
# Make sure value is copy since value may be object which
# can be shared across all new created objects
if key not in self.ignored_instance_data_keys:
new_instance.data[key] = copy.deepcopy(value)
self.log.info("Copied data: {}".format(new_instance.data))
# fix anatomy data
anatomy_data_new = copy.deepcopy(anatomy_data)
# updating hierarchy data
anatomy_data_new.update({
"asset": asset_data["name"],
"task": task,
"subset": subset_name
})
new_instance.data["label"] = f"{instance_name}"
new_instance.data["subset"] = subset_name
new_instance.data["extension"] = ".zip"
new_instance.data["anatomyData"] = anatomy_data_new
new_instance.data["publish"] = True
self.log.info(f"Created new instance: {instance_name}")
self.log.debug(f"_ inst_data: {pformat(new_instance.data)}")
# set original instance for removal
self.log.info("Context data: {}".format(context.data))
instance.data["remove"] = True

View file

@ -0,0 +1,21 @@
# -*- coding: utf-8 -*-
"""Collect instances that are marked for removal and remove them."""
import pyblish.api
class CollectRemoveMarked(pyblish.api.ContextPlugin):
"""Clean up instances marked for removal.
Note:
This is a workaround for race conditions and removing of instances
used to generate other instances.
"""
order = pyblish.api.CollectorOrder + 0.499
label = 'Remove Marked Instances'
def process(self, context):
"""Plugin entry point."""
for instance in context:
if instance.data.get('remove'):
context.remove(instance)

View file

@ -0,0 +1,404 @@
# -*- coding: utf-8 -*-
"""Extract Harmony scene from zip file."""
import glob
import os
import shutil
import six
import sys
import tempfile
import zipfile
import pyblish.api
from avalon import api, io
import pype.api
class ExtractHarmonyZip(pype.api.Extractor):
"""Extract Harmony zip."""
# Pyblish settings
label = "Extract Harmony zip"
order = pyblish.api.ExtractorOrder + 0.02
hosts = ["standalonepublisher"]
families = ["scene"]
# Properties
session = None
task_types = None
task_statuses = None
assetversion_statuses = None
# Presets
create_workfile = True
default_task = "harmonyIngest"
default_task_type = "Ingest"
default_task_status = "Ingested"
assetversion_status = "Ingested"
def process(self, instance):
"""Plugin entry point."""
context = instance.context
self.session = context.data["ftrackSession"]
asset_doc = context.data["assetEntity"]
# asset_name = instance.data["asset"]
subset_name = instance.data["subset"]
instance_name = instance.data["name"]
family = instance.data["family"]
task = context.data["anatomyData"]["task"] or self.default_task
project_entity = instance.context.data["projectEntity"]
ftrack_id = asset_doc["data"]["ftrackId"]
repres = instance.data["representations"]
submitted_staging_dir = repres[0]["stagingDir"]
submitted_files = repres[0]["files"]
# Get all the ftrack entities needed
# Asset Entity
query = 'AssetBuild where id is "{}"'.format(ftrack_id)
asset_entity = self.session.query(query).first()
# Project Entity
query = 'Project where full_name is "{}"'.format(
project_entity["name"]
)
project_entity = self.session.query(query).one()
# Get Task types and Statuses for creation if needed
self.task_types = self._get_all_task_types(project_entity)
self.task_statuses = self.get_all_task_statuses(project_entity)
# Get Statuses of AssetVersions
self.assetversion_statuses = self.get_all_assetversion_statuses(
project_entity
)
# Setup the status that we want for the AssetVersion
if self.assetversion_status:
instance.data["assetversion_status"] = self.assetversion_status
# Create the default_task if it does not exist
if task == self.default_task:
existing_tasks = []
entity_children = asset_entity.get('children', [])
for child in entity_children:
if child.entity_type.lower() == 'task':
existing_tasks.append(child['name'].lower())
if task.lower() in existing_tasks:
print("Task {} already exists".format(task))
else:
self.create_task(
name=task,
task_type=self.default_task_type,
task_status=self.default_task_status,
parent=asset_entity,
)
# Find latest version
latest_version = self._find_last_version(subset_name, asset_doc)
version_number = 1
if latest_version is not None:
version_number += latest_version
self.log.info(
"Next version of instance \"{}\" will be {}".format(
instance_name, version_number
)
)
# update instance info
instance.data["task"] = task
instance.data["version_name"] = "{}_{}".format(subset_name, task)
instance.data["family"] = family
instance.data["subset"] = subset_name
instance.data["version"] = version_number
instance.data["latestVersion"] = latest_version
instance.data["anatomyData"].update({
"subset": subset_name,
"family": family,
"version": version_number
})
# Copy `families` and check if `family` is not in current families
families = instance.data.get("families") or list()
if families:
families = list(set(families))
instance.data["families"] = families
# Prepare staging dir for new instance and zip + sanitize scene name
staging_dir = tempfile.mkdtemp(prefix="pyblish_tmp_")
# Handle if the representation is a .zip and not an .xstage
pre_staged = False
if submitted_files.endswith(".zip"):
submitted_zip_file = os.path.join(submitted_staging_dir,
submitted_files
).replace("\\", "/")
pre_staged = self.sanitize_prezipped_project(instance,
submitted_zip_file,
staging_dir)
# Get the file to work with
source_dir = str(repres[0]["stagingDir"])
source_file = str(repres[0]["files"])
staging_scene_dir = os.path.join(staging_dir, "scene")
staging_scene = os.path.join(staging_scene_dir, source_file)
# If the file is an .xstage / directory, we must stage it
if not pre_staged:
shutil.copytree(source_dir, staging_scene_dir)
# Rename this latest file as 'scene.xstage'
# This is is determined in the collector from the latest scene in a
# submitted directory / directory the submitted .xstage is in.
# In the case of a zip file being submitted, this is determined within
# the self.sanitize_project() method in this extractor.
os.rename(staging_scene,
os.path.join(staging_scene_dir, "scene.xstage")
)
# Required to set the current directory where the zip will end up
os.chdir(staging_dir)
# Create the zip file
zip_filepath = shutil.make_archive(os.path.basename(source_dir),
"zip",
staging_scene_dir
)
zip_filename = os.path.basename(zip_filepath)
self.log.info("Zip file: {}".format(zip_filepath))
# Setup representation
new_repre = {
"name": "zip",
"ext": "zip",
"files": zip_filename,
"stagingDir": staging_dir
}
self.log.debug(
"Creating new representation: {}".format(new_repre)
)
instance.data["representations"] = [new_repre]
self.log.debug("Completed prep of zipped Harmony scene: {}"
.format(zip_filepath)
)
# If this extractor is setup to also extract a workfile...
if self.create_workfile:
workfile_path = self.extract_workfile(instance,
staging_scene
)
self.log.debug("Extracted Workfile to: {}".format(workfile_path))
def extract_workfile(self, instance, staging_scene):
"""Extract a valid workfile for this corresponding publish.
Args:
instance (:class:`pyblish.api.Instance`): Instance data.
staging_scene (str): path of staging scene.
Returns:
str: Path to workdir.
"""
# Since the staging scene was renamed to "scene.xstage" for publish
# rename the staging scene in the temp stagingdir
staging_scene = os.path.join(os.path.dirname(staging_scene),
"scene.xstage")
# Setup the data needed to form a valid work path filename
anatomy = pype.api.Anatomy()
project_entity = instance.context.data["projectEntity"]
data = {
"root": api.registered_root(),
"project": {
"name": project_entity["name"],
"code": project_entity["data"].get("code", '')
},
"asset": instance.data["asset"],
"hierarchy": pype.api.get_hierarchy(instance.data["asset"]),
"family": instance.data["family"],
"task": instance.data.get("task"),
"subset": instance.data["subset"],
"version": 1,
"ext": "zip",
}
# Get a valid work filename first with version 1
file_template = anatomy.templates["work"]["file"]
anatomy_filled = anatomy.format(data)
work_path = anatomy_filled["work"]["path"]
# Get the final work filename with the proper version
data["version"] = api.last_workfile_with_version(
os.path.dirname(work_path), file_template, data, [".zip"]
)[1]
work_path = anatomy_filled["work"]["path"]
base_name = os.path.splitext(os.path.basename(work_path))[0]
staging_work_path = os.path.join(os.path.dirname(staging_scene),
base_name + ".xstage"
)
# Rename this latest file after the workfile path filename
os.rename(staging_scene, staging_work_path)
# Required to set the current directory where the zip will end up
os.chdir(os.path.dirname(os.path.dirname(staging_scene)))
# Create the zip file
zip_filepath = shutil.make_archive(base_name,
"zip",
os.path.dirname(staging_scene)
)
self.log.info(staging_scene)
self.log.info(work_path)
self.log.info(staging_work_path)
self.log.info(os.path.dirname(os.path.dirname(staging_scene)))
self.log.info(base_name)
self.log.info(zip_filepath)
# Create the work path on disk if it does not exist
os.makedirs(os.path.dirname(work_path), exist_ok=True)
shutil.copy(zip_filepath, work_path)
return work_path
def sanitize_prezipped_project(
self, instance, zip_filepath, staging_dir):
"""Fix when a zip contains a folder.
Handle zip file root contains folder instead of the project.
Args:
instance (:class:`pyblish.api.Instance`): Instance data.
zip_filepath (str): Path to zip.
staging_dir (str): Path to staging directory.
"""
zip = zipfile.ZipFile(zip_filepath)
zip_contents = zipfile.ZipFile.namelist(zip)
# Determine if any xstage file is in root of zip
project_in_root = [pth for pth in zip_contents
if "/" not in pth and pth.endswith(".xstage")]
staging_scene_dir = os.path.join(staging_dir, "scene")
# The project is nested, so we must extract and move it
if not project_in_root:
staging_tmp_dir = os.path.join(staging_dir, "tmp")
with zipfile.ZipFile(zip_filepath, "r") as zip_ref:
zip_ref.extractall(staging_tmp_dir)
nested_project_folder = os.path.join(staging_tmp_dir,
zip_contents[0]
)
shutil.copytree(nested_project_folder, staging_scene_dir)
else:
# The project is not nested, so we just extract to scene folder
with zipfile.ZipFile(zip_filepath, "r") as zip_ref:
zip_ref.extractall(staging_scene_dir)
latest_file = max(glob.iglob(staging_scene_dir + "/*.xstage"),
key=os.path.getctime).replace("\\", "/")
instance.data["representations"][0]["stagingDir"] = staging_scene_dir
instance.data["representations"][0]["files"] = os.path.basename(
latest_file)
# We have staged the scene already so return True
return True
def _find_last_version(self, subset_name, asset_doc):
"""Find last version of subset."""
subset_doc = io.find_one({
"type": "subset",
"name": subset_name,
"parent": asset_doc["_id"]
})
if subset_doc is None:
self.log.debug("Subset entity does not exist yet.")
else:
version_doc = io.find_one(
{
"type": "version",
"parent": subset_doc["_id"]
},
sort=[("name", -1)]
)
if version_doc:
return int(version_doc["name"])
return None
def _get_all_task_types(self, project):
"""Get all task types."""
tasks = {}
proj_template = project['project_schema']
temp_task_types = proj_template['_task_type_schema']['types']
for type in temp_task_types:
if type['name'] not in tasks:
tasks[type['name']] = type
return tasks
def _get_all_task_statuses(self, project):
"""Get all statuses of tasks."""
statuses = {}
proj_template = project['project_schema']
temp_task_statuses = proj_template.get_statuses("Task")
for status in temp_task_statuses:
if status['name'] not in statuses:
statuses[status['name']] = status
return statuses
def _get_all_assetversion_statuses(self, project):
"""Get statuses of all asset versions."""
statuses = {}
proj_template = project['project_schema']
temp_task_statuses = proj_template.get_statuses("AssetVersion")
for status in temp_task_statuses:
if status['name'] not in statuses:
statuses[status['name']] = status
return statuses
def _create_task(self, name, task_type, parent, task_status):
"""Create task."""
task_data = {
'name': name,
'parent': parent,
}
self.log.info(task_type)
task_data['type'] = self.task_types[task_type]
task_data['status'] = self.task_statuses[task_status]
self.log.info(task_data)
task = self.session.create('Task', task_data)
try:
self.session.commit()
except Exception:
tp, value, tb = sys.exc_info()
self.session.rollback()
six.reraise(tp, value, tb)
return task