basic implementation publish model

This commit is contained in:
Jakub Trllo 2024-06-21 12:00:57 +02:00
parent d95fcaf9f4
commit 7666a5e6c8
2 changed files with 592 additions and 84 deletions

View file

@ -1,15 +1,9 @@
from .create import CreatorItem
from .publish import (
PublishReportMaker,
PublishValidationErrors,
PublishPluginsProxy,
)
from .publish import PublishModel
__all__ = (
"CreatorItem",
"PublishReportMaker",
"PublishValidationErrors",
"PublishPluginsProxy",
"PublishModel",
)

View file

@ -1,12 +1,24 @@
import uuid
import copy
import inspect
import traceback
import collections
from functools import partial
import arrow
import pyblish.plugin
from ayon_core.pipeline import (
PublishValidationError,
KnownPublishError,
OptionalPyblishPluginMixin,
)
from ayon_core.pipeline.publish import get_publish_instance_label
PUBLISH_EVENT_SOURCE = "publisher.publish.model"
# Define constant for plugin orders offset
PLUGIN_ORDER_OFFSET = 0.5
class PublishReportMaker:
"""Report for single publishing process.
@ -14,91 +26,74 @@ class PublishReportMaker:
Report keeps current state of publishing and currently processed plugin.
"""
def __init__(self, controller):
self.controller = controller
def __init__(
self,
creator_discover_result=None,
convertor_discover_result=None,
publish_discover_result=None,
):
self._create_discover_result = None
self._convert_discover_result = None
self._publish_discover_result = None
self._plugin_data_by_id = {}
self._current_plugin = None
self._current_plugin_data = {}
self._all_instances_by_id = {}
self._current_context = None
self._plugin_data_by_id = {}
self._current_plugin_id = None
def reset(self, context, create_context):
self.reset(
creator_discover_result,
convertor_discover_result,
publish_discover_result,
)
def reset(
self,
creator_discover_result,
convertor_discover_result,
publish_discover_result,
):
"""Reset report and clear all data."""
self._create_discover_result = create_context.creator_discover_result
self._convert_discover_result = (
create_context.convertor_discover_result
)
self._publish_discover_result = create_context.publish_discover_result
self._create_discover_result = creator_discover_result
self._convert_discover_result = convertor_discover_result
self._publish_discover_result = publish_discover_result
self._plugin_data_by_id = {}
self._current_plugin = None
self._current_plugin_data = {}
self._all_instances_by_id = {}
self._current_context = context
self._plugin_data_by_id = {}
self._current_plugin_id = None
for plugin in create_context.publish_plugins_mismatch_targets:
plugin_data = self._add_plugin_data_item(plugin)
plugin_data["skipped"] = True
publish_plugins = []
if publish_discover_result is not None:
publish_plugins = publish_discover_result.plugins
def add_plugin_iter(self, plugin, context):
for plugin in publish_plugins:
self._add_plugin_data_item(plugin)
def add_plugin_iter(self, plugin_id, context):
"""Add report about single iteration of plugin."""
for instance in context:
self._all_instances_by_id[instance.id] = instance
if self._current_plugin_data:
self._current_plugin_data["passed"] = True
self._current_plugin_id = plugin_id
self._current_plugin = plugin
self._current_plugin_data = self._add_plugin_data_item(plugin)
def set_plugin_passed(self, plugin_id):
plugin_data = self._plugin_data_by_id[plugin_id]
plugin_data["passed"] = True
def _add_plugin_data_item(self, plugin):
if plugin.id in self._plugin_data_by_id:
# A plugin would be processed more than once. What can cause it:
# - there is a bug in controller
# - plugin class is imported into multiple files
# - this can happen even with base classes from 'pyblish'
raise ValueError(
"Plugin '{}' is already stored".format(str(plugin)))
plugin_data_item = self._create_plugin_data_item(plugin)
self._plugin_data_by_id[plugin.id] = plugin_data_item
return plugin_data_item
def _create_plugin_data_item(self, plugin):
label = None
if hasattr(plugin, "label"):
label = plugin.label
return {
"id": plugin.id,
"name": plugin.__name__,
"label": label,
"order": plugin.order,
"targets": list(plugin.targets),
"instances_data": [],
"actions_data": [],
"skipped": False,
"passed": False
}
def set_plugin_skipped(self):
def set_plugin_skipped(self, plugin_id):
"""Set that current plugin has been skipped."""
self._current_plugin_data["skipped"] = True
plugin_data = self._plugin_data_by_id[plugin_id]
plugin_data["skipped"] = True
def add_result(self, result):
def add_result(self, plugin_id, result):
"""Handle result of one plugin and it's instance."""
instance = result["instance"]
instance_id = None
if instance is not None:
instance_id = instance.id
self._current_plugin_data["instances_data"].append({
plugin_data = self._plugin_data_by_id[plugin_id]
plugin_data["instances_data"].append({
"id": instance_id,
"logs": self._extract_instance_log_items(result),
"process_time": result["duration"]
@ -108,9 +103,7 @@ class PublishReportMaker:
"""Add result of single action."""
plugin = result["plugin"]
store_item = self._plugin_data_by_id.get(plugin.id)
if store_item is None:
store_item = self._add_plugin_data_item(plugin)
store_item = self._plugin_data_by_id[plugin.id]
action_name = action.__name__
action_label = action.label or action_name
@ -122,15 +115,16 @@ class PublishReportMaker:
"logs": log_items
})
def get_report(self, publish_plugins=None):
def get_report(self, publish_context):
"""Report data with all details of current state."""
now = arrow.utcnow().to("local")
instances_details = {}
for instance in self._all_instances_by_id.values():
instances_details[instance.id] = self._extract_instance_data(
instance, instance in self._current_context
instances_details = {
instance.id: self._extract_instance_data(
instance, instance in publish_context
)
for instance in self._all_instances_by_id.values()
}
plugins_data_by_id = copy.deepcopy(
self._plugin_data_by_id
@ -138,19 +132,13 @@ class PublishReportMaker:
# Ensure the current plug-in is marked as `passed` in the result
# so that it shows on reports for paused publishes
if self._current_plugin is not None:
if self._current_plugin_id is not None:
current_plugin_data = plugins_data_by_id.get(
self._current_plugin.id
self._current_plugin_id
)
if current_plugin_data and not current_plugin_data["passed"]:
current_plugin_data["passed"] = True
if publish_plugins:
for plugin in publish_plugins:
if plugin.id not in plugins_data_by_id:
plugins_data_by_id[plugin.id] = \
self._create_plugin_data_item(plugin)
reports = []
if self._create_discover_result is not None:
reports.append(self._create_discover_result)
@ -172,13 +160,42 @@ class PublishReportMaker:
return {
"plugins_data": list(plugins_data_by_id.values()),
"instances": instances_details,
"context": self._extract_context_data(self._current_context),
"context": self._extract_context_data(publish_context),
"crashed_file_paths": crashed_file_paths,
"id": uuid.uuid4().hex,
"created_at": now.isoformat(),
"report_version": "1.0.1",
}
def _add_plugin_data_item(self, plugin):
if plugin.id in self._plugin_data_by_id:
# A plugin would be processed more than once. What can cause it:
# - there is a bug in controller
# - plugin class is imported into multiple files
# - this can happen even with base classes from 'pyblish'
raise ValueError(
"Plugin '{}' is already stored".format(str(plugin)))
plugin_data_item = self._create_plugin_data_item(plugin)
self._plugin_data_by_id[plugin.id] = plugin_data_item
def _create_plugin_data_item(self, plugin):
label = None
if hasattr(plugin, "label"):
label = plugin.label
return {
"id": plugin.id,
"name": plugin.__name__,
"label": label,
"order": plugin.order,
"targets": list(plugin.targets),
"instances_data": [],
"actions_data": [],
"skipped": False,
"passed": False
}
def _extract_context_data(self, context):
context_label = "Context"
if context is not None:
@ -674,3 +691,500 @@ class PublishValidationErrors:
plugin_id
)
self._plugin_action_items[plugin_id] = plugin_actions
def collect_families_from_instances(instances, only_active=False):
"""Collect all families for passed publish instances.
Args:
instances(list<pyblish.api.Instance>): List of publish instances from
which are families collected.
only_active(bool): Return families only for active instances.
Returns:
list[str]: Families available on instances.
"""
all_families = set()
for instance in instances:
if only_active:
if instance.data.get("publish") is False:
continue
family = instance.data.get("family")
if family:
all_families.add(family)
families = instance.data.get("families") or tuple()
for family in families:
all_families.add(family)
return list(all_families)
class PublishModel:
def __init__(self, controller):
self._controller = controller
# Publishing should stop at validation stage
self._publish_up_validation = False
self._publish_comment_is_set = False
# Any other exception that happened during publishing
self._publish_error_msg = None
# Publishing is in progress
self._publish_is_running = False
# Publishing is over validation order
self._publish_has_validated = False
self._publish_has_validation_errors = False
self._publish_has_crashed = False
# All publish plugins are processed
self._publish_has_started = False
self._publish_has_finished = False
self._publish_max_progress = 0
self._publish_progress = 0
self._publish_plugins = []
self._publish_plugins_proxy = None
# pyblish.api.Context
self._publish_context = None
# Pyblish report
self._publish_report = PublishReportMaker()
# Store exceptions of validation error
self._publish_validation_errors = PublishValidationErrors()
# This information is not much important for controller but for widget
# which can change (and set) the comment.
self._publish_comment_is_set = False
# Validation order
# - plugin with order same or higher than this value is extractor or
# higher
self._validation_order = (
pyblish.api.ValidatorOrder + PLUGIN_ORDER_OFFSET
)
# Plugin iterator
self._main_thread_iter = None
def reset(self, create_context):
self._publish_up_validation = False
self._publish_comment_is_set = False
self._publish_has_started = False
self._set_publish_error_msg(None)
self._set_progress(0)
self._set_is_running(False)
self._set_has_validated(False)
self._set_is_crashed(False)
self._set_has_validation_errors(False)
self._set_finished(False)
self._main_thread_iter = self._publish_iterator()
self._publish_context = pyblish.api.Context()
# Make sure "comment" is set on publish context
self._publish_context.data["comment"] = ""
# Add access to create context during publishing
# - must not be used for changing CreatedInstances during publishing!
# QUESTION
# - pop the key after first collector using it would be safest option?
self._publish_context.data["create_context"] = create_context
publish_plugins = create_context.publish_plugins
self._publish_plugins = publish_plugins
self._publish_plugins_proxy = PublishPluginsProxy(
publish_plugins
)
self._publish_report.reset(
create_context.creator_discover_result,
create_context.convertor_discover_result,
create_context.publish_discover_result,
)
for plugin in create_context.publish_plugins_mismatch_targets:
self._publish_report.set_plugin_skipped(plugin.id)
self._publish_validation_errors.reset(self._publish_plugins_proxy)
self._set_max_progress(len(publish_plugins))
self._emit_event("publish.reset.finished")
def set_publish_up_validation(self, value):
self._publish_up_validation = value
def start_publish(self, wait=True):
"""Run publishing.
Make sure all changes are saved before method is called (Call
'save_changes' and check output).
"""
if self._publish_up_validation and self._publish_has_validated:
return
self._start_publish()
if not wait:
return
while self.is_running():
func = self.get_next_process_func()
func()
def get_next_process_func(self):
# Validations of progress before using iterator
# - same conditions may be inside iterator but they may be used
# only in specific cases (e.g. when it happens for a first time)
# There are validation errors and validation is passed
# - can't do any progree
if (
self._publish_has_validated
and self._publish_has_validation_errors
):
item = partial(self.stop_publish)
# Any unexpected error happened
# - everything should stop
elif self._publish_has_crashed:
item = partial(self.stop_publish)
# Everything is ok so try to get new processing item
else:
item = next(self._main_thread_iter)
return item
def stop_publish(self):
if self._publish_is_running:
self._stop_publish()
def is_running(self):
return self._publish_is_running
def is_crashed(self):
return self._publish_has_crashed
def has_started(self):
return self._publish_has_started
def has_finished(self):
return self._publish_has_finished
def has_validated(self):
return self._publish_has_validated
def has_validation_errors(self):
return self._publish_has_validation_errors
def get_progress(self):
return self._publish_progress
def get_max_progress(self):
return self._publish_max_progress
def get_publish_report(self):
return self._publish_report.get_report(
self._publish_context
)
def get_validation_errors(self):
return self._publish_validation_errors.create_report()
def get_error_msg(self):
return self._publish_error_msg
def set_comment(self, comment):
# Ignore change of comment when publishing started
if self._publish_has_started:
return
self._publish_context.data["comment"] = comment
self._publish_comment_is_set = True
def run_action(self, plugin_id, action_id):
# TODO handle result in UI
plugin = self._publish_plugins_proxy.get_plugin(plugin_id)
action = self._publish_plugins_proxy.get_action(plugin_id, action_id)
result = pyblish.plugin.process(
plugin, self._publish_context, None, action.id
)
exception = result.get("error")
if exception:
self._emit_event(
"publish.action.failed",
{
"title": "Action failed",
"message": "Action failed.",
"traceback": "".join(
traceback.format_exception(
type(exception),
exception,
exception.__traceback__
)
),
"label": action.__name__,
"identifier": action.id
}
)
self._publish_report.add_action_result(action, result)
self._controller.emit_card_message("Action finished.")
def _emit_event(self, topic, data=None):
self._controller.emit_event(topic, data, PUBLISH_EVENT_SOURCE)
def _set_finished(self, value):
if self._publish_has_finished != value:
self._publish_has_finished = value
self._emit_event(
"publish.finished.changed",
{"value": value}
)
def _set_is_running(self, value):
if self._publish_is_running != value:
self._publish_is_running = value
self._emit_event(
"publish.is_running.changed",
{"value": value}
)
def _set_has_validated(self, value):
if self._publish_has_validated != value:
self._publish_has_validated = value
self._emit_event(
"publish.has_validated.changed",
{"value": value}
)
def _set_is_crashed(self, value):
if self._publish_has_crashed != value:
self._publish_has_crashed = value
self._emit_event(
"publish.has_crashed.changed",
{"value": value}
)
def _set_has_validation_errors(self, value):
if self._publish_has_validation_errors != value:
self._publish_has_validation_errors = value
self._emit_event(
"publish.has_validation_errors.changed",
{"value": value}
)
def _set_max_progress(self, value):
if self._publish_max_progress != value:
self._publish_max_progress = value
self._emit_event(
"publish.max_progress.changed",
{"value": value}
)
def _set_progress(self, value):
if self._publish_progress != value:
self._publish_progress = value
self._emit_event(
"publish.progress.changed",
{"value": value}
)
def _set_publish_error_msg(self, value):
if self._publish_error_msg != value:
self._publish_error_msg = value
self._emit_event(
"publish.publish_error.changed",
{"value": value}
)
def _start_publish(self):
"""Start or continue in publishing."""
if self._publish_is_running:
return
self._set_is_running(True)
self._publish_has_started = True
self._emit_event("publish.process.started")
def _stop_publish(self):
"""Stop or pause publishing."""
self._set_is_running(False)
self._emit_event("publish.process.stopped")
def _publish_iterator(self):
"""Main logic center of publishing.
Iterator returns `partial` objects with callbacks that should be
processed in main thread (threaded in future?). Cares about changing
states of currently processed publish plugin and instance. Also
change state of processed orders like validation order has passed etc.
Also stops publishing, if should stop on validation.
"""
for idx, plugin in enumerate(self._publish_plugins):
self._publish_progress = idx
# Check if plugin is over validation order
if not self._publish_has_validated:
self._set_has_validated(
plugin.order >= self._validation_order
)
# Stop if plugin is over validation order and process
# should process up to validation.
if self._publish_up_validation and self._publish_has_validated:
yield partial(self.stop_publish)
# Stop if validation is over and validation errors happened
if (
self._publish_has_validated
and self.has_validation_errors()
):
yield partial(self.stop_publish)
# Add plugin to publish report
self._publish_report.add_plugin_iter(
plugin.id, self._publish_context)
# WARNING This is hack fix for optional plugins
if not self._is_publish_plugin_active(plugin):
self._publish_report.set_plugin_skipped(plugin.id)
continue
# Trigger callback that new plugin is going to be processed
plugin_label = plugin.__name__
if hasattr(plugin, "label") and plugin.label:
plugin_label = plugin.label
self._emit_event(
"publish.process.plugin.changed",
{"plugin_label": plugin_label}
)
# Plugin is instance plugin
if plugin.__instanceEnabled__:
instances = pyblish.logic.instances_by_plugin(
self._publish_context, plugin
)
if not instances:
self._publish_report.set_plugin_skipped(plugin.id)
continue
for instance in instances:
if instance.data.get("publish") is False:
continue
instance_label = (
instance.data.get("label")
or instance.data["name"]
)
self._emit_event(
"publish.process.instance.changed",
{"instance_label": instance_label}
)
yield partial(
self._process_and_continue, plugin, instance
)
else:
families = collect_families_from_instances(
self._publish_context, only_active=True
)
plugins = pyblish.logic.plugins_by_families(
[plugin], families
)
if not plugins:
self._publish_report.set_plugin_skipped(plugin.id)
continue
instance_label = (
self._publish_context.data.get("label")
or self._publish_context.data.get("name")
or "Context"
)
self._emit_event(
"publish.process.instance.changed",
{"instance_label": instance_label}
)
yield partial(
self._process_and_continue, plugin, None
)
self._publish_report.set_plugin_passed(plugin.id)
# Cleanup of publishing process
self._set_finished(True)
self._set_progress(self._publish_max_progress)
yield partial(self.stop_publish)
def _process_and_continue(self, plugin, instance):
result = pyblish.plugin.process(
plugin, self._publish_context, instance
)
exception = result.get("error")
if exception:
has_validation_error = False
if (
isinstance(exception, PublishValidationError)
and not self._publish_has_validated
):
has_validation_error = True
self._add_validation_error(result)
else:
if isinstance(exception, KnownPublishError):
msg = str(exception)
else:
msg = (
"Something went wrong. Send report"
" to your supervisor or Ynput team."
)
self._set_publish_error_msg(msg)
self._set_is_crashed(True)
result["is_validation_error"] = has_validation_error
self._publish_report.add_result(plugin.id, result)
def _add_validation_error(self, result):
self._set_has_validation_errors(True)
self._publish_validation_errors.add_error(
result["plugin"],
result["error"],
result["instance"]
)
def _is_publish_plugin_active(self, plugin):
"""Decide if publish plugin is active.
This is hack because 'active' is mis-used in mixin
'OptionalPyblishPluginMixin' where 'active' is used for default value
of optional plugins. Because of that is 'active' state of plugin
which inherit from 'OptionalPyblishPluginMixin' ignored. That affects
headless publishing inside host, potentially remote publishing.
We have to change that to match pyblish base, but we can do that
only when all hosts use Publisher because the change requires
change of settings schemas.
Args:
plugin (pyblish.Plugin): Plugin which should be checked if is
active.
Returns:
bool: Is plugin active.
"""
if plugin.active:
return True
if not plugin.optional:
return False
if OptionalPyblishPluginMixin in inspect.getmro(plugin):
return True
return False