Merge pull request #2149 from pypeclub/OP-1206_Add-loader-for-linked-smart-objects-in-Photoshop

Add loader for linked smart objects in Photoshop
This commit is contained in:
Petr Kalis 2021-10-25 18:33:57 +02:00 committed by GitHub
commit e25265db7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 726 additions and 117 deletions

View file

@ -158,6 +158,25 @@ def publish(debug, paths, targets):
PypeCommands.publish(list(paths), targets)
@main.command()
@click.argument("path")
@click.option("-d", "--debug", is_flag=True, help="Print debug messages")
@click.option("-h", "--host", help="Host")
@click.option("-u", "--user", help="User email address")
@click.option("-p", "--project", help="Project")
@click.option("-t", "--targets", help="Targets", default=None,
              multiple=True)
def remotepublishfromapp(debug, project, path, host, targets=None, user=None):
    """Start remote publishing through an installed host application.

    Opens the installed 'host' application and runs the publish of the
    batch folder at 'path' there (delegates to
    PypeCommands.remotepublishfromapp).

    NOTE(review): the original docstring was copied from 'publish' and
    claimed multiple paths are accepted - this command takes a single
    batch folder path.
    """
    if debug:
        os.environ['OPENPYPE_DEBUG'] = '3'
    PypeCommands.remotepublishfromapp(project, path, host, user,
                                      targets=targets)
@main.command()
@click.argument("path")
@click.option("-d", "--debug", is_flag=True, help="Print debug messages")

View file

@ -13,7 +13,7 @@ class LaunchFoundryAppsWindows(PreLaunchHook):
# Should be as last hook because must change launch arguments to string
order = 1000
app_groups = ["nuke", "nukex", "hiero", "nukestudio"]
app_groups = ["nuke", "nukex", "hiero", "nukestudio", "photoshop"]
platforms = ["windows"]
def execute(self):

View file

@ -6,7 +6,6 @@ from openpype.hosts.photoshop.plugins.lib import get_unique_layer_name
stub = photoshop.stub()
class ImageLoader(api.Loader):
"""Load images
@ -21,7 +20,7 @@ class ImageLoader(api.Loader):
context["asset"]["name"],
name)
with photoshop.maintained_selection():
layer = stub.import_smart_object(self.fname, layer_name)
layer = self.import_layer(self.fname, layer_name)
self[:] = [layer]
namespace = namespace or layer_name
@ -45,8 +44,9 @@ class ImageLoader(api.Loader):
layer_name = "{}_{}".format(context["asset"], context["subset"])
# switching assets
if namespace_from_container != layer_name:
layer_name = self._get_unique_layer_name(context["asset"],
context["subset"])
layer_name = get_unique_layer_name(stub.get_layers(),
context["asset"],
context["subset"])
else: # switching version - keep same name
layer_name = container["namespace"]
@ -72,3 +72,6 @@ class ImageLoader(api.Loader):
def switch(self, container, representation):
self.update(container, representation)
def import_layer(self, file_name, layer_name):
return stub.import_smart_object(file_name, layer_name)

View file

@ -0,0 +1,82 @@
import re
from avalon import api, photoshop
from openpype.hosts.photoshop.plugins.lib import get_unique_layer_name
stub = photoshop.stub()
class ReferenceLoader(api.Loader):
    """Load reference images (linked smart objects).

    Stores the imported asset in a container named after the asset.

    Inheriting from 'load_image' didn't work because of
    "Cannot write to closing transport", possible refactor.
    """

    families = ["image", "render"]
    representations = ["*"]

    def load(self, context, name=None, namespace=None, data=None):
        # layer name must be unique within the PSD document - helper appends
        # a numeric suffix if needed
        layer_name = get_unique_layer_name(stub.get_layers(),
                                           context["asset"]["name"],
                                           name)
        with photoshop.maintained_selection():
            layer = self.import_layer(self.fname, layer_name)

        self[:] = [layer]
        namespace = namespace or layer_name

        return photoshop.containerise(
            name,
            namespace,
            layer,
            context,
            self.__class__.__name__
        )

    def update(self, container, representation):
        """ Switch asset or change version """
        layer = container.pop("layer")

        context = representation.get("context", {})

        # strip the trailing "_NNN" uniqueness suffix so base names compare
        namespace_from_container = re.sub(r'_\d{3}$', '',
                                          container["namespace"])
        layer_name = "{}_{}".format(context["asset"], context["subset"])
        # switching assets
        if namespace_from_container != layer_name:
            layer_name = get_unique_layer_name(stub.get_layers(),
                                               context["asset"],
                                               context["subset"])
        else:  # switching version - keep same name
            layer_name = container["namespace"]

        path = api.get_representation_path(representation)
        with photoshop.maintained_selection():
            # swap content of the existing smart object in place
            stub.replace_smart_object(
                layer, path, layer_name
            )

        # store id of new representation on the layer
        stub.imprint(
            layer, {"representation": str(representation["_id"])}
        )

    def remove(self, container):
        """
        Removes element from scene: deletes layer + removes from Headline
        Args:
            container (dict): container to be removed - used to get layer_id
        """
        layer = container.pop("layer")
        # wipe metadata first, then delete the layer itself
        stub.imprint(layer, {})
        stub.delete_layer(layer.id)

    def switch(self, container, representation):
        self.update(container, representation)

    def import_layer(self, file_name, layer_name):
        # as_reference=True is what differentiates this loader from
        # ImageLoader (linked instead of embedded smart object)
        return stub.import_smart_object(file_name, layer_name,
                                        as_reference=True)

View file

@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
"""Close PS after publish. For Webpublishing only."""
import os
import pyblish.api
from avalon import photoshop
class ClosePS(pyblish.api.ContextPlugin):
    """Close PS after publish. For Webpublishing only."""

    # runs well after integration so the document is only closed at the end
    order = pyblish.api.IntegratorOrder + 14
    label = "Close PS"
    optional = True
    active = True

    hosts = ["photoshop"]

    def process(self, context):
        self.log.info("ClosePS")

        # only applies to headless (webpublish) runs - an artist's
        # interactive session must stay open
        if not os.environ.get("IS_HEADLESS"):
            return

        stub = photoshop.stub()
        self.log.info("Shutting down PS")
        # save before close so the processed document is not lost
        stub.save()
        stub.close()
        self.log.info("PS closed")

View file

@ -0,0 +1,136 @@
import pyblish.api
import os
import re
from avalon import photoshop
from openpype.lib import prepare_template_data
from openpype.lib.plugin_tools import parse_json
class CollectRemoteInstances(pyblish.api.ContextPlugin):
    """Gather instances by configured color code of a layer.

    Used in remote publishing when artists marks publishable layers by color-
    coding.

    Identifier:
        id (str): "pyblish.avalon.instance"
    """
    label = "Instances"
    # NOTE: 'order' was originally assigned twice; only the effective (last)
    # value is kept - plain CollectorOrder.
    order = pyblish.api.CollectorOrder
    hosts = ["photoshop"]

    # configurable by Settings - list of dicts with keys "color_code",
    # "layer_name_regex", "family", "subset_template_name"
    color_code_mapping = []

    def process(self, context):
        self.log.info("CollectRemoteInstances")
        self.log.info("mapping:: {}".format(self.color_code_mapping))
        # only applies to headless (webpublish) runs
        if not os.environ.get("IS_HEADLESS"):
            self.log.debug("Not headless publishing, skipping.")
            return

        # parse variant if used in webpublishing, comes from webpublisher batch
        batch_dir = os.environ.get("OPENPYPE_PUBLISH_DATA")
        variant = "Main"
        if batch_dir and os.path.exists(batch_dir):
            # TODO check if batch manifest is same as tasks manifests
            task_data = parse_json(os.path.join(batch_dir,
                                                "manifest.json"))
            if not task_data:
                raise ValueError(
                    "Cannot parse batch meta in {} folder".format(batch_dir))
            variant = task_data["variant"]

        stub = photoshop.stub()
        layers = stub.get_layers()

        instance_names = []
        for layer in layers:
            self.log.info("Layer:: {}".format(layer))
            resolved_family, resolved_subset_template = self._resolve_mapping(
                layer
            )
            self.log.info("resolved_family {}".format(resolved_family))
            self.log.info("resolved_subset_template {}".format(
                resolved_subset_template))

            if not resolved_subset_template or not resolved_family:
                self.log.debug("!!! Not marked, skip")
                continue

            # only top-level layers are publishable instances
            if layer.parents:
                self.log.debug("!!! Not a top layer, skip")
                continue

            instance = context.create_instance(layer.name)
            instance.append(layer)
            instance.data["family"] = resolved_family
            instance.data["publish"] = layer.visible
            instance.data["asset"] = context.data["assetEntity"]["name"]
            instance.data["task"] = context.data["taskType"]

            fill_pairs = {
                "variant": variant,
                "family": instance.data["family"],
                "task": instance.data["task"],
                "layer": layer.name
            }
            subset = resolved_subset_template.format(
                **prepare_template_data(fill_pairs))
            instance.data["subset"] = subset

            instance_names.append(layer.name)

            # Produce diagnostic message for any graphical
            # user interface interested in visualising it.
            self.log.info("Found: \"%s\" " % instance.data["name"])
            self.log.info("instance: {} ".format(instance.data))

        if len(instance_names) != len(set(instance_names)):
            self.log.warning("Duplicate instances found. " +
                             "Remove unwanted via SubsetManager")

    def _resolve_mapping(self, layer):
        """Matches 'layer' color code and name to mapping.

        If both color code AND name regex is configured, BOTH must be valid
        If layer matches to multiple mappings, only first is used!

        Args:
            layer: stub layer object with 'color_code' and 'name' attributes

        Returns:
            (str, str): (family, subset template name); either may be None
                when no mapping matched.
        """
        family_list = []
        subset_name_list = []
        for mapping in self.color_code_mapping:
            # BUGFIX: was 'break' - a single non-matching mapping must not
            # stop evaluation of the remaining mappings
            if mapping["color_code"] and \
                    layer.color_code not in mapping["color_code"]:
                continue

            if mapping["layer_name_regex"] and \
                    not any(re.search(pattern, layer.name)
                            for pattern in mapping["layer_name_regex"]):
                continue

            family_list.append(mapping["family"])
            subset_name_list.append(mapping["subset_template_name"])

        if len(subset_name_list) > 1:
            self.log.warning("Multiple mappings found for '{}'".
                             format(layer.name))
            self.log.warning("Only first subset name template used!")
            # BUGFIX: was 'subset_name_list[:] = subset_name_list[0]' which
            # splatted the template STRING into single characters
            del subset_name_list[1:]

        if len(family_list) > 1:
            self.log.warning("Multiple mappings found for '{}'".
                             format(layer.name))
            self.log.warning("Only first family used!")
            del family_list[1:]

        resolved_subset_template = None
        if subset_name_list:
            resolved_subset_template = subset_name_list.pop()

        family = None
        if family_list:
            family = family_list.pop()

        return family, resolved_subset_template

View file

@ -12,7 +12,7 @@ class ExtractImage(openpype.api.Extractor):
label = "Extract Image"
hosts = ["photoshop"]
families = ["image"]
families = ["image", "background"]
formats = ["png", "jpg"]
def process(self, instance):

View file

@ -15,6 +15,7 @@ import tempfile
import pyblish.api
from avalon import io
from openpype.lib import prepare_template_data
from openpype.lib.plugin_tools import parse_json, get_batch_asset_task_info
class CollectPublishedFiles(pyblish.api.ContextPlugin):
@ -33,22 +34,6 @@ class CollectPublishedFiles(pyblish.api.ContextPlugin):
# from Settings
task_type_to_family = {}
def _load_json(self, path):
path = path.strip('\"')
assert os.path.isfile(path), (
"Path to json file doesn't exist. \"{}\"".format(path)
)
data = None
with open(path, "r") as json_file:
try:
data = json.load(json_file)
except Exception as exc:
self.log.error(
"Error loading json: "
"{} - Exception: {}".format(path, exc)
)
return data
def _process_batch(self, dir_url):
task_subfolders = [
os.path.join(dir_url, o)
@ -56,22 +41,15 @@ class CollectPublishedFiles(pyblish.api.ContextPlugin):
if os.path.isdir(os.path.join(dir_url, o))]
self.log.info("task_sub:: {}".format(task_subfolders))
for task_dir in task_subfolders:
task_data = self._load_json(os.path.join(task_dir,
"manifest.json"))
task_data = parse_json(os.path.join(task_dir,
"manifest.json"))
self.log.info("task_data:: {}".format(task_data))
ctx = task_data["context"]
task_type = "default_task_type"
task_name = None
if ctx["type"] == "task":
items = ctx["path"].split('/')
asset = items[-2]
os.environ["AVALON_TASK"] = ctx["name"]
task_name = ctx["name"]
task_type = ctx["attributes"]["type"]
else:
asset = ctx["name"]
os.environ["AVALON_TASK"] = ""
asset, task_name, task_type = get_batch_asset_task_info(ctx)
if task_name:
os.environ["AVALON_TASK"] = task_name
is_sequence = len(task_data["files"]) > 1
@ -261,7 +239,7 @@ class CollectPublishedFiles(pyblish.api.ContextPlugin):
assert batch_dir, (
"Missing `OPENPYPE_PUBLISH_DATA`")
assert batch_dir, \
assert os.path.exists(batch_dir), \
"Folder {} doesn't exist".format(batch_dir)
project_name = os.environ.get("AVALON_PROJECT")

View file

@ -487,3 +487,48 @@ def should_decompress(file_url):
"compression: \"dwab\"" in output
return False
def parse_json(path):
    """Parses json file at 'path' location

    Args:
        path (str): path to json file; surrounding double quotes are
            stripped first (paths arrive quoted from command line)

    Returns:
        (dict) or None if unparsable

    Raises:
        AssertionError if 'path' doesn't exist
    """
    path = path.strip('\"')
    assert os.path.isfile(path), (
        "Path to json file doesn't exist. \"{}\"".format(path)
    )
    data = None
    with open(path, "r") as json_file:
        try:
            data = json.load(json_file)
        except Exception as exc:
            # parse errors are logged, not raised - caller gets None
            log.error(
                "Error loading json: "
                "{} - Exception: {}".format(path, exc)
            )
    return data
def get_batch_asset_task_info(ctx):
    """Parses context data from webpublisher's batch metadata.

    Args:
        ctx (dict): "context" portion of a batch manifest.

    Returns:
        (tuple): asset, task_name (Optional), task_type - task_name is
            None and task_type falls back to "default_task_type" when the
            context is not task based.
    """
    if ctx["type"] == "task":
        # task context: asset name is the second-to-last path segment
        asset_name = ctx["path"].split('/')[-2]
        return asset_name, ctx["name"], ctx["attributes"]["type"]

    # asset-level context: no task information available
    return ctx["name"], None, "default_task_type"

View file

@ -0,0 +1,110 @@
import os
from datetime import datetime
import sys
from bson.objectid import ObjectId
import pyblish.util
import pyblish.api
from openpype import uninstall
from openpype.lib.mongo import OpenPypeMongoConnection
def get_webpublish_conn():
    """Get connection to OP 'webpublishes' collection."""
    client = OpenPypeMongoConnection.get_mongo_client()
    db_name = os.environ["OPENPYPE_DATABASE_NAME"]
    return client[db_name]["webpublishes"]
def start_webpublish_log(dbcon, batch_id, user):
    """Start new log record for 'batch_id'

    Args:
        dbcon (OpenPypeMongoConnection)
        batch_id (str)
        user (str)
    Returns
        (ObjectId) from DB
    """
    record = {
        "batch_id": batch_id,
        "start_date": datetime.now(),
        "user": user,
        "status": "in_progress",
    }
    return dbcon.insert_one(record).inserted_id
def publish_and_log(dbcon, _id, log, close_plugin_name=None):
    """Loops through all plugins, logs ok and fails into OP DB.

    Runs the pyblish publish loop; after each plugin the accumulated log
    is pushed to the 'webpublishes' record. On the first plugin error the
    record is marked "error" and the process exits with code 1.

    Args:
        dbcon (OpenPypeMongoConnection)
        _id (str): id of the webpublish log record (str is converted to
            ObjectId)
        log (OpenPypeLogger)
        close_plugin_name (str): name of plugin with responsibility to
            close host app

    Raises:
        SystemExit: when any plugin reports an error.
    """
    # Error exit as soon as any error occurs.
    error_format = "Failed {plugin.__name__}: {error} -- {error.traceback}"

    close_plugin = _get_close_plugin(close_plugin_name, log)

    if isinstance(_id, str):
        _id = ObjectId(_id)

    log_lines = []
    for result in pyblish.util.publish_iter():
        for record in result["records"]:
            log_lines.append("{}: {}".format(
                result["plugin"].label, record.msg))

        if result["error"]:
            log.error(error_format.format(**result))
            uninstall()
            log_lines.append(error_format.format(**result))
            dbcon.update_one(
                {"_id": _id},
                {"$set":
                    {
                        "finish_date": datetime.now(),
                        "status": "error",
                        "log": os.linesep.join(log_lines)
                    }}
            )
            if close_plugin:  # close host app explicitly after error
                context = pyblish.api.Context()
                close_plugin().process(context)
            sys.exit(1)
        else:
            # NOTE(review): max() forces reported progress to at least 0.95
            # from the very first plugin - presumably min() (cap below 1
            # until the final update) was intended; confirm.
            dbcon.update_one(
                {"_id": _id},
                {"$set":
                    {
                        "progress": max(result["progress"], 0.95),
                        "log": os.linesep.join(log_lines)
                    }}
            )

    # final update
    dbcon.update_one(
        {"_id": _id},
        {"$set":
            {
                "finish_date": datetime.now(),
                "status": "finished_ok",
                "progress": 1,
                "log": os.linesep.join(log_lines)
            }}
    )
def _get_close_plugin(close_plugin_name, log):
    """Find a discovered pyblish plugin class by its name.

    Args:
        close_plugin_name (str): plugin class name (eg. "ClosePS") or None
        log (OpenPypeLogger)

    Returns:
        plugin class or None (with a warning logged)
    """
    if close_plugin_name:
        plugins = pyblish.api.discover()
        for plugin in plugins:
            if plugin.__name__ == close_plugin_name:
                return plugin

    # NOTE(review): this warning also fires when no close_plugin_name was
    # requested at all - confirm that is intended
    log.warning("Close plugin not found, app might not close.")

View file

@ -26,14 +26,21 @@ class CollectUsername(pyblish.api.ContextPlugin):
"""
order = pyblish.api.CollectorOrder - 0.488
label = "Collect ftrack username"
hosts = ["webpublisher"]
hosts = ["webpublisher", "photoshop"]
_context = None
def process(self, context):
self.log.info("CollectUsername")
# photoshop could be triggered remotely in webpublisher fashion
if os.environ["AVALON_APP"] == "photoshop":
if not os.environ.get("IS_HEADLESS"):
self.log.debug("Regular process, skipping")
return
os.environ["FTRACK_API_USER"] = os.environ["FTRACK_BOT_API_USER"]
os.environ["FTRACK_API_KEY"] = os.environ["FTRACK_BOT_API_KEY"]
self.log.info("CollectUsername")
for instance in context:
email = instance.data["user_email"]
self.log.info("email:: {}".format(email))

View file

@ -4,9 +4,16 @@ import os
import sys
import json
from datetime import datetime
import time
from openpype.lib import PypeLogger
from openpype.api import get_app_environments_for_context
from openpype.lib.plugin_tools import parse_json, get_batch_asset_task_info
from openpype.lib.remote_publish import (
get_webpublish_conn,
start_webpublish_log,
publish_and_log
)
class PypeCommands:
@ -110,10 +117,100 @@ class PypeCommands:
log.info("Publish finished.")
uninstall()
    @staticmethod
    def remotepublishfromapp(project, batch_dir, host, user, targets=None):
        """Opens installed variant of 'host' and run remote publish there.

        Currently implemented and tested for Photoshop where customer
        wants to process uploaded .psd file and publish collected layers
        from there.

        Requires installed host application on the machine.

        Runs publish process as user would, in automatic fashion.

        Args:
            project (str): project name
            batch_dir (str): folder with batch "manifest.json" and workfile
            host (str): host application group name (eg. "photoshop")
            user (str): user identifier for the webpublish log record
            targets (list): pyblish targets
                NOTE(review): accepted but not used in this body - confirm.

        Raises:
            ValueError: when host app is not configured/installed or the
                batch manifest cannot be parsed.
        """
        from openpype import install, uninstall
        from openpype.api import Logger

        log = Logger.get_logger()

        log.info("remotepublishphotoshop command")

        install()

        from openpype.lib import ApplicationManager
        application_manager = ApplicationManager()

        app_group = application_manager.app_groups.get(host)
        if not app_group or not app_group.enabled:
            raise ValueError("No application {} configured".format(host))

        found_variant_key = None
        # finds most up-to-date variant if any installed
        # NOTE(review): keeps the LAST variant with an existing executable,
        # so "most up-to-date" relies on variants dict ordering - confirm.
        for variant_key, variant in app_group.variants.items():
            for executable in variant.executables:
                if executable.exists():
                    found_variant_key = variant_key

        if not found_variant_key:
            raise ValueError("No executable for {} found".format(host))

        # full app name as understood by ApplicationManager.launch
        app_name = "{}/{}".format(host, found_variant_key)

        batch_data = None
        if batch_dir and os.path.exists(batch_dir):
            # TODO check if batch manifest is same as tasks manifests
            batch_data = parse_json(os.path.join(batch_dir, "manifest.json"))

        if not batch_data:
            raise ValueError(
                "Cannot parse batch meta in {} folder".format(batch_dir))

        asset, task_name, _task_type = get_batch_asset_task_info(
            batch_data["context"])

        # workfile uploaded with the batch; only the first file is opened
        workfile_path = os.path.join(batch_dir,
                                     batch_data["task"],
                                     batch_data["files"][0])

        print("workfile_path {}".format(workfile_path))

        # must have for proper launch of app
        env = get_app_environments_for_context(
            project,
            asset,
            task_name,
            app_name
        )
        os.environ.update(env)

        _, batch_id = os.path.split(batch_dir)
        dbcon = get_webpublish_conn()
        # safer to start logging here, launch might be broken altogether
        _id = start_webpublish_log(dbcon, batch_id, user)

        # flags picked up by collectors/integrators inside the host
        os.environ["OPENPYPE_PUBLISH_DATA"] = batch_dir
        os.environ["IS_HEADLESS"] = "true"
        # must pass identifier to update log lines for a batch
        os.environ["BATCH_LOG_ID"] = str(_id)

        data = {
            "last_workfile_path": workfile_path,
            "start_last_workfile": True
        }

        launched_app = application_manager.launch(app_name, **data)
        # block until the host process exits (publish runs inside it)
        while launched_app.poll() is None:
            time.sleep(0.5)

        uninstall()
@staticmethod
def remotepublish(project, batch_path, host, user, targets=None):
"""Start headless publishing.
Used to publish rendered assets, workfiles etc.
Publish use json from passed paths argument.
Args:
@ -134,7 +231,6 @@ class PypeCommands:
from openpype import install, uninstall
from openpype.api import Logger
from openpype.lib import OpenPypeMongoConnection
# Register target and host
import pyblish.api
@ -166,62 +262,11 @@ class PypeCommands:
log.info("Running publish ...")
# Error exit as soon as any error occurs.
error_format = "Failed {plugin.__name__}: {error} -- {error.traceback}"
mongo_client = OpenPypeMongoConnection.get_mongo_client()
database_name = os.environ["OPENPYPE_DATABASE_NAME"]
dbcon = mongo_client[database_name]["webpublishes"]
_, batch_id = os.path.split(batch_path)
_id = dbcon.insert_one({
"batch_id": batch_id,
"start_date": datetime.now(),
"user": user,
"status": "in_progress"
}).inserted_id
dbcon = get_webpublish_conn()
_id = start_webpublish_log(dbcon, batch_id, user)
log_lines = []
for result in pyblish.util.publish_iter():
for record in result["records"]:
log_lines.append("{}: {}".format(
result["plugin"].label, record.msg))
if result["error"]:
log.error(error_format.format(**result))
uninstall()
log_lines.append(error_format.format(**result))
dbcon.update_one(
{"_id": _id},
{"$set":
{
"finish_date": datetime.now(),
"status": "error",
"log": os.linesep.join(log_lines)
}}
)
sys.exit(1)
else:
dbcon.update_one(
{"_id": _id},
{"$set":
{
"progress": max(result["progress"], 0.95),
"log": os.linesep.join(log_lines)
}}
)
dbcon.update_one(
{"_id": _id},
{"$set":
{
"finish_date": datetime.now(),
"status": "finished_ok",
"progress": 1,
"log": os.linesep.join(log_lines)
}}
)
publish_and_log(dbcon, _id, log)
log.info("Publish finished.")
uninstall()

View file

@ -162,9 +162,7 @@
]
}
],
"customNodes": [
]
"customNodes": []
},
"regexInputs": {
"inputs": [

View file

@ -12,6 +12,16 @@
"optional": true,
"active": true
},
"CollectRemoteInstances": {
"color_code_mapping": [
{
"color_code": [],
"layer_name_regex": [],
"family": "",
"subset_template_name": ""
}
]
},
"ExtractImage": {
"formats": [
"png",

View file

@ -43,6 +43,56 @@
}
]
},
{
"type": "dict",
"collapsible": true,
"is_group": true,
"key": "CollectRemoteInstances",
"label": "Collect Instances for Webpublish",
"children": [
{
"type": "label",
"label": "Set color for publishable layers, set publishable families."
},
{
"type": "list",
"key": "color_code_mapping",
"label": "Color code mappings",
"use_label_wrap": false,
"collapsible": false,
"object_type": {
"type": "dict",
"children": [
{
"type": "list",
"key": "color_code",
"label": "Color codes for layers",
"object_type": "text"
},
{
"type": "list",
"key": "layer_name_regex",
"label": "Layer name regex",
"object_type": "text"
},
{
"type": "splitter"
},
{
"key": "family",
"label": "Resulting family",
"type": "text"
},
{
"type": "text",
"key": "subset_template_name",
"label": "Subset template name"
}
]
}
}
]
},
{
"type": "dict",
"collapsible": true,

View file

@ -164,8 +164,9 @@ class LoaderWindow(QtWidgets.QDialog):
subsets_widget.load_started.connect(self._on_load_start)
subsets_widget.load_ended.connect(self._on_load_end)
repres_widget.load_started.connect(self._on_load_start)
repres_widget.load_ended.connect(self._on_load_end)
if repres_widget:
repres_widget.load_started.connect(self._on_load_start)
repres_widget.load_ended.connect(self._on_load_end)
self._sync_server_enabled = sync_server_enabled

View file

@ -142,18 +142,23 @@ class ConsoleTrayApp:
self.tray_reconnect = False
ConsoleTrayApp.webserver_client.close()
def _send_text(self, new_text):
def _send_text_queue(self):
"""Sends lines and purges queue"""
lines = tuple(self.new_text)
self.new_text.clear()
if lines:
self._send_lines(lines)
def _send_lines(self, lines):
""" Send console content. """
if not ConsoleTrayApp.webserver_client:
return
if isinstance(new_text, str):
new_text = collections.deque(new_text.split("\n"))
payload = {
"host": self.host_id,
"action": host_console_listener.MsgAction.ADD,
"text": "\n".join(new_text)
"text": "\n".join(lines)
}
self._send(payload)
@ -174,14 +179,7 @@ class ConsoleTrayApp:
if self.tray_reconnect:
self._connect() # reconnect
if ConsoleTrayApp.webserver_client and self.new_text:
self._send_text(self.new_text)
self.new_text = collections.deque()
if self.new_text: # no webserver_client, text keeps stashing
start = max(len(self.new_text) - self.MAX_LINES, 0)
self.new_text = itertools.islice(self.new_text,
start, self.MAX_LINES)
self._send_text_queue()
if not self.initialized:
if self.initializing:
@ -191,7 +189,7 @@ class ConsoleTrayApp:
elif not host_connected:
text = "{} process is not alive. Exiting".format(self.host)
print(text)
self._send_text([text])
self._send_lines([text])
ConsoleTrayApp.websocket_server.stop()
sys.exit(1)
elif host_connected:
@ -205,14 +203,15 @@ class ConsoleTrayApp:
self.initializing = True
self.launch_method(*self.subprocess_args)
elif ConsoleTrayApp.process.poll() is not None:
self.exit()
elif ConsoleTrayApp.callback_queue:
elif ConsoleTrayApp.callback_queue and \
not ConsoleTrayApp.callback_queue.empty():
try:
callback = ConsoleTrayApp.callback_queue.get(block=False)
callback()
except queue.Empty:
pass
elif ConsoleTrayApp.process.poll() is not None:
self.exit()
@classmethod
def execute_in_main_thread(cls, func_to_call_from_main_thread):
@ -232,8 +231,9 @@ class ConsoleTrayApp:
self._close()
if ConsoleTrayApp.websocket_server:
ConsoleTrayApp.websocket_server.stop()
ConsoleTrayApp.process.kill()
ConsoleTrayApp.process.wait()
if ConsoleTrayApp.process:
ConsoleTrayApp.process.kill()
ConsoleTrayApp.process.wait()
if self.timer:
self.timer.stop()
QtCore.QCoreApplication.exit()

View file

@ -0,0 +1,94 @@
import pytest
import os
import shutil
from tests.lib.testing_classes import PublishTest
class TestPublishInPhotoshop(PublishTest):
    """Basic test case for publishing in Photoshop

    Uses generic TestCase to prepare fixtures for test data, testing DBs,
    env vars.

    Opens Photoshop, runs publish on prepared workfile.

    Then checks content of DB (if subset, version, representations were
    created).

    Checks tmp folder if all expected files were published.
    """
    # keep downloaded data and test DB after the run (debugging aid)
    # NOTE(review): usually False for CI runs - confirm it should stay True
    PERSIST = True

    TEST_FILES = [
        ("1Bciy2pCwMKl1UIpxuPnlX_LHMo_Xkq0K", "test_photoshop_publish.zip", "")
    ]

    APP = "photoshop"
    APP_VARIANT = "2020"

    APP_NAME = "{}/{}".format(APP, APP_VARIANT)

    TIMEOUT = 120  # publish timeout

    @pytest.fixture(scope="module")
    def last_workfile_path(self, download_test_data):
        """Get last_workfile_path from source data.

        Host expects workfile in proper work folder, so copy is done first.
        """
        src_path = os.path.join(download_test_data,
                                "input",
                                "workfile",
                                "test_project_test_asset_TestTask_v001.psd")
        dest_folder = os.path.join(download_test_data,
                                   self.PROJECT,
                                   self.ASSET,
                                   "work",
                                   self.TASK)
        os.makedirs(dest_folder)
        dest_path = os.path.join(dest_folder,
                                 "test_project_test_asset_TestTask_v001.psd")
        shutil.copy(src_path, dest_path)

        yield dest_path

    @pytest.fixture(scope="module")
    def startup_scripts(self, monkeypatch_session, download_test_data):
        """Run the publish headlessly inside the host (no artist UI)."""
        os.environ["IS_HEADLESS"] = "true"

    def test_db_asserts(self, dbcon, publish_finished):
        """Host and input data dependent expected results in DB."""
        # NOTE(review): expected subsets ("modelMain", abc/ma extensions)
        # look copied from a Maya test - confirm against Photoshop data.
        print("test_db_asserts")
        assert 5 == dbcon.count_documents({"type": "version"}), \
            "Not expected no of versions"

        assert 0 == dbcon.count_documents({"type": "version",
                                           "name": {"$ne": 1}}), \
            "Only versions with 1 expected"

        assert 1 == dbcon.count_documents({"type": "subset",
                                           "name": "modelMain"}), \
            "modelMain subset must be present"

        assert 1 == dbcon.count_documents({"type": "subset",
                                           "name": "workfileTest_task"}), \
            "workfileTest_task subset must be present"

        assert 11 == dbcon.count_documents({"type": "representation"}), \
            "Not expected no of representations"

        assert 2 == dbcon.count_documents({"type": "representation",
                                           "context.subset": "modelMain",
                                           "context.ext": "abc"}), \
            "Not expected no of representations with ext 'abc'"

        # NOTE(review): message below says 'abc' but the query checks 'ma'
        assert 2 == dbcon.count_documents({"type": "representation",
                                           "context.subset": "modelMain",
                                           "context.ext": "ma"}), \
            "Not expected no of representations with ext 'abc'"
if __name__ == "__main__":
    # direct invocation only instantiates the class; the actual test is
    # driven by pytest fixtures
    test_case = TestPublishInPhotoshop()

View file

@ -228,6 +228,7 @@ class PublishTest(ModuleUnitTest):
while launched_app.poll() is None:
time.sleep(0.5)
if time.time() - time_start > self.TIMEOUT:
launched_app.terminate()
raise ValueError("Timeout reached")
# some clean exit test possible?