From 7666a5e6c8a48e804601324c4c758ee848a85aa4 Mon Sep 17 00:00:00 2001 From: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Date: Fri, 21 Jun 2024 12:00:57 +0200 Subject: [PATCH] basic implementation publish model --- .../tools/publisher/models/__init__.py | 10 +- .../tools/publisher/models/publish.py | 666 ++++++++++++++++-- 2 files changed, 592 insertions(+), 84 deletions(-) diff --git a/client/ayon_core/tools/publisher/models/__init__.py b/client/ayon_core/tools/publisher/models/__init__.py index 2dbf8fb63c..156141969c 100644 --- a/client/ayon_core/tools/publisher/models/__init__.py +++ b/client/ayon_core/tools/publisher/models/__init__.py @@ -1,15 +1,9 @@ from .create import CreatorItem -from .publish import ( - PublishReportMaker, - PublishValidationErrors, - PublishPluginsProxy, -) +from .publish import PublishModel __all__ = ( "CreatorItem", - "PublishReportMaker", - "PublishValidationErrors", - "PublishPluginsProxy", + "PublishModel", ) diff --git a/client/ayon_core/tools/publisher/models/publish.py b/client/ayon_core/tools/publisher/models/publish.py index 1cd787a2f5..9660862ee8 100644 --- a/client/ayon_core/tools/publisher/models/publish.py +++ b/client/ayon_core/tools/publisher/models/publish.py @@ -1,12 +1,24 @@ import uuid import copy +import inspect import traceback import collections +from functools import partial import arrow +import pyblish.plugin +from ayon_core.pipeline import ( + PublishValidationError, + KnownPublishError, + OptionalPyblishPluginMixin, +) from ayon_core.pipeline.publish import get_publish_instance_label +PUBLISH_EVENT_SOURCE = "publisher.publish.model" +# Define constant for plugin orders offset +PLUGIN_ORDER_OFFSET = 0.5 + class PublishReportMaker: """Report for single publishing process. @@ -14,91 +26,74 @@ class PublishReportMaker: Report keeps current state of publishing and currently processed plugin. """ - def __init__(self, controller): - self.controller = controller + def __init__( + self, + creator_discover_result=None, + convertor_discover_result=None, + publish_discover_result=None, + ): self._create_discover_result = None self._convert_discover_result = None self._publish_discover_result = None - self._plugin_data_by_id = {} - self._current_plugin = None - self._current_plugin_data = {} self._all_instances_by_id = {} - self._current_context = None + self._plugin_data_by_id = {} + self._current_plugin_id = None - def reset(self, context, create_context): + self.reset( + creator_discover_result, + convertor_discover_result, + publish_discover_result, + ) + + def reset( + self, + creator_discover_result, + convertor_discover_result, + publish_discover_result, + ): """Reset report and clear all data.""" - self._create_discover_result = create_context.creator_discover_result - self._convert_discover_result = ( - create_context.convertor_discover_result - ) - self._publish_discover_result = create_context.publish_discover_result + self._create_discover_result = creator_discover_result + self._convert_discover_result = convertor_discover_result + self._publish_discover_result = publish_discover_result - self._plugin_data_by_id = {} - self._current_plugin = None - self._current_plugin_data = {} self._all_instances_by_id = {} - self._current_context = context + self._plugin_data_by_id = {} + self._current_plugin_id = None - for plugin in create_context.publish_plugins_mismatch_targets: - plugin_data = self._add_plugin_data_item(plugin) - plugin_data["skipped"] = True + publish_plugins = [] + if publish_discover_result is not None: + publish_plugins = publish_discover_result.plugins - def add_plugin_iter(self, plugin, context): + for plugin in publish_plugins: + self._add_plugin_data_item(plugin) + + def add_plugin_iter(self, plugin_id, context): """Add report about single iteration of plugin.""" for instance in context: self._all_instances_by_id[instance.id] = instance - if self._current_plugin_data: - self._current_plugin_data["passed"] = True + self._current_plugin_id = plugin_id - self._current_plugin = plugin - self._current_plugin_data = self._add_plugin_data_item(plugin) + def set_plugin_passed(self, plugin_id): + plugin_data = self._plugin_data_by_id[plugin_id] + plugin_data["passed"] = True - def _add_plugin_data_item(self, plugin): - if plugin.id in self._plugin_data_by_id: - # A plugin would be processed more than once. What can cause it: - # - there is a bug in controller - # - plugin class is imported into multiple files - # - this can happen even with base classes from 'pyblish' - raise ValueError( - "Plugin '{}' is already stored".format(str(plugin))) - - plugin_data_item = self._create_plugin_data_item(plugin) - self._plugin_data_by_id[plugin.id] = plugin_data_item - - return plugin_data_item - - def _create_plugin_data_item(self, plugin): - label = None - if hasattr(plugin, "label"): - label = plugin.label - - return { - "id": plugin.id, - "name": plugin.__name__, - "label": label, - "order": plugin.order, - "targets": list(plugin.targets), - "instances_data": [], - "actions_data": [], - "skipped": False, - "passed": False - } - - def set_plugin_skipped(self): + def set_plugin_skipped(self, plugin_id): """Set that current plugin has been skipped.""" - self._current_plugin_data["skipped"] = True + plugin_data = self._plugin_data_by_id[plugin_id] + plugin_data["skipped"] = True - def add_result(self, result): + def add_result(self, plugin_id, result): """Handle result of one plugin and it's instance.""" instance = result["instance"] instance_id = None if instance is not None: instance_id = instance.id - self._current_plugin_data["instances_data"].append({ + plugin_data = self._plugin_data_by_id[plugin_id] + plugin_data["instances_data"].append({ "id": instance_id, "logs": self._extract_instance_log_items(result), "process_time": result["duration"] @@ -108,9 +103,7 @@ class PublishReportMaker: """Add result of single action.""" plugin = result["plugin"] - store_item = self._plugin_data_by_id.get(plugin.id) - if store_item is None: - store_item = self._add_plugin_data_item(plugin) + store_item = self._plugin_data_by_id[plugin.id] action_name = action.__name__ action_label = action.label or action_name @@ -122,15 +115,16 @@ class PublishReportMaker: "logs": log_items }) - def get_report(self, publish_plugins=None): + def get_report(self, publish_context): """Report data with all details of current state.""" now = arrow.utcnow().to("local") - instances_details = {} - for instance in self._all_instances_by_id.values(): - instances_details[instance.id] = self._extract_instance_data( - instance, instance in self._current_context + instances_details = { + instance.id: self._extract_instance_data( + instance, instance in publish_context ) + for instance in self._all_instances_by_id.values() + } plugins_data_by_id = copy.deepcopy( self._plugin_data_by_id @@ -138,19 +132,13 @@ class PublishReportMaker: # Ensure the current plug-in is marked as `passed` in the result # so that it shows on reports for paused publishes - if self._current_plugin is not None: + if self._current_plugin_id is not None: current_plugin_data = plugins_data_by_id.get( - self._current_plugin.id + self._current_plugin_id ) if current_plugin_data and not current_plugin_data["passed"]: current_plugin_data["passed"] = True - if publish_plugins: - for plugin in publish_plugins: - if plugin.id not in plugins_data_by_id: - plugins_data_by_id[plugin.id] = \ - self._create_plugin_data_item(plugin) - reports = [] if self._create_discover_result is not None: reports.append(self._create_discover_result) @@ -172,13 +160,42 @@ class PublishReportMaker: return { "plugins_data": list(plugins_data_by_id.values()), "instances": instances_details, - "context": self._extract_context_data(self._current_context), + "context": self._extract_context_data(publish_context), "crashed_file_paths": crashed_file_paths, "id": uuid.uuid4().hex, "created_at": now.isoformat(), "report_version": "1.0.1", } + def _add_plugin_data_item(self, plugin): + if plugin.id in self._plugin_data_by_id: + # A plugin would be processed more than once. What can cause it: + # - there is a bug in controller + # - plugin class is imported into multiple files + # - this can happen even with base classes from 'pyblish' + raise ValueError( + "Plugin '{}' is already stored".format(str(plugin))) + + plugin_data_item = self._create_plugin_data_item(plugin) + self._plugin_data_by_id[plugin.id] = plugin_data_item + + def _create_plugin_data_item(self, plugin): + label = None + if hasattr(plugin, "label"): + label = plugin.label + + return { + "id": plugin.id, + "name": plugin.__name__, + "label": label, + "order": plugin.order, + "targets": list(plugin.targets), + "instances_data": [], + "actions_data": [], + "skipped": False, + "passed": False + } + def _extract_context_data(self, context): context_label = "Context" if context is not None: @@ -674,3 +691,500 @@ class PublishValidationErrors: plugin_id ) self._plugin_action_items[plugin_id] = plugin_actions + + +def collect_families_from_instances(instances, only_active=False): + """Collect all families for passed publish instances. + + Args: + instances(list): List of publish instances from + which are families collected. + only_active(bool): Return families only for active instances. + + Returns: + list[str]: Families available on instances. + """ + + all_families = set() + for instance in instances: + if only_active: + if instance.data.get("publish") is False: + continue + family = instance.data.get("family") + if family: + all_families.add(family) + + families = instance.data.get("families") or tuple() + for family in families: + all_families.add(family) + + return list(all_families) + + +class PublishModel: + def __init__(self, controller): + self._controller = controller + + # Publishing should stop at validation stage + self._publish_up_validation = False + self._publish_comment_is_set = False + + # Any other exception that happened during publishing + self._publish_error_msg = None + # Publishing is in progress + self._publish_is_running = False + # Publishing is over validation order + self._publish_has_validated = False + + self._publish_has_validation_errors = False + self._publish_has_crashed = False + # All publish plugins are processed + self._publish_has_started = False + self._publish_has_finished = False + self._publish_max_progress = 0 + self._publish_progress = 0 + + self._publish_plugins = [] + self._publish_plugins_proxy = None + + # pyblish.api.Context + self._publish_context = None + # Pyblish report + self._publish_report = PublishReportMaker() + # Store exceptions of validation error + self._publish_validation_errors = PublishValidationErrors() + + # This information is not much important for controller but for widget + # which can change (and set) the comment. + self._publish_comment_is_set = False + + # Validation order + # - plugin with order same or higher than this value is extractor or + # higher + self._validation_order = ( + pyblish.api.ValidatorOrder + PLUGIN_ORDER_OFFSET + ) + + # Plugin iterator + self._main_thread_iter = None + + def reset(self, create_context): + self._publish_up_validation = False + self._publish_comment_is_set = False + self._publish_has_started = False + + self._set_publish_error_msg(None) + self._set_progress(0) + self._set_is_running(False) + self._set_has_validated(False) + self._set_is_crashed(False) + self._set_has_validation_errors(False) + self._set_finished(False) + + self._main_thread_iter = self._publish_iterator() + self._publish_context = pyblish.api.Context() + # Make sure "comment" is set on publish context + self._publish_context.data["comment"] = "" + # Add access to create context during publishing + # - must not be used for changing CreatedInstances during publishing! + # QUESTION + # - pop the key after first collector using it would be safest option? + self._publish_context.data["create_context"] = create_context + publish_plugins = create_context.publish_plugins + self._publish_plugins = publish_plugins + self._publish_plugins_proxy = PublishPluginsProxy( + publish_plugins + ) + + self._publish_report.reset( + create_context.creator_discover_result, + create_context.convertor_discover_result, + create_context.publish_discover_result, + ) + for plugin in create_context.publish_plugins_mismatch_targets: + self._publish_report.set_plugin_skipped(plugin.id) + self._publish_validation_errors.reset(self._publish_plugins_proxy) + + self._set_max_progress(len(publish_plugins)) + + self._emit_event("publish.reset.finished") + + def set_publish_up_validation(self, value): + self._publish_up_validation = value + + def start_publish(self, wait=True): + """Run publishing. + + Make sure all changes are saved before method is called (Call + 'save_changes' and check output). + """ + if self._publish_up_validation and self._publish_has_validated: + return + + self._start_publish() + + if not wait: + return + + while self.is_running(): + func = self.get_next_process_func() + func() + + def get_next_process_func(self): + # Validations of progress before using iterator + # - same conditions may be inside iterator but they may be used + # only in specific cases (e.g. when it happens for a first time) + + # There are validation errors and validation is passed + # - can't do any progree + if ( + self._publish_has_validated + and self._publish_has_validation_errors + ): + item = partial(self.stop_publish) + + # Any unexpected error happened + # - everything should stop + elif self._publish_has_crashed: + item = partial(self.stop_publish) + + # Everything is ok so try to get new processing item + else: + item = next(self._main_thread_iter) + + return item + + def stop_publish(self): + if self._publish_is_running: + self._stop_publish() + + def is_running(self): + return self._publish_is_running + + def is_crashed(self): + return self._publish_has_crashed + + def has_started(self): + return self._publish_has_started + + def has_finished(self): + return self._publish_has_finished + + def has_validated(self): + return self._publish_has_validated + + def has_validation_errors(self): + return self._publish_has_validation_errors + + def get_progress(self): + return self._publish_progress + + def get_max_progress(self): + return self._publish_max_progress + + def get_publish_report(self): + return self._publish_report.get_report( + self._publish_context + ) + + def get_validation_errors(self): + return self._publish_validation_errors.create_report() + + def get_error_msg(self): + return self._publish_error_msg + + def set_comment(self, comment): + # Ignore change of comment when publishing started + if self._publish_has_started: + return + self._publish_context.data["comment"] = comment + self._publish_comment_is_set = True + + def run_action(self, plugin_id, action_id): + # TODO handle result in UI + plugin = self._publish_plugins_proxy.get_plugin(plugin_id) + action = self._publish_plugins_proxy.get_action(plugin_id, action_id) + + result = pyblish.plugin.process( + plugin, self._publish_context, None, action.id + ) + exception = result.get("error") + if exception: + self._emit_event( + "publish.action.failed", + { + "title": "Action failed", + "message": "Action failed.", + "traceback": "".join( + traceback.format_exception( + type(exception), + exception, + exception.__traceback__ + ) + ), + "label": action.__name__, + "identifier": action.id + } + ) + + self._publish_report.add_action_result(action, result) + + self._controller.emit_card_message("Action finished.") + + def _emit_event(self, topic, data=None): + self._controller.emit_event(topic, data, PUBLISH_EVENT_SOURCE) + + def _set_finished(self, value): + if self._publish_has_finished != value: + self._publish_has_finished = value + self._emit_event( + "publish.finished.changed", + {"value": value} + ) + + def _set_is_running(self, value): + if self._publish_is_running != value: + self._publish_is_running = value + self._emit_event( + "publish.is_running.changed", + {"value": value} + ) + + def _set_has_validated(self, value): + if self._publish_has_validated != value: + self._publish_has_validated = value + self._emit_event( + "publish.has_validated.changed", + {"value": value} + ) + + def _set_is_crashed(self, value): + if self._publish_has_crashed != value: + self._publish_has_crashed = value + self._emit_event( + "publish.has_crashed.changed", + {"value": value} + ) + + def _set_has_validation_errors(self, value): + if self._publish_has_validation_errors != value: + self._publish_has_validation_errors = value + self._emit_event( + "publish.has_validation_errors.changed", + {"value": value} + ) + + def _set_max_progress(self, value): + if self._publish_max_progress != value: + self._publish_max_progress = value + self._emit_event( + "publish.max_progress.changed", + {"value": value} + ) + + def _set_progress(self, value): + if self._publish_progress != value: + self._publish_progress = value + self._emit_event( + "publish.progress.changed", + {"value": value} + ) + + def _set_publish_error_msg(self, value): + if self._publish_error_msg != value: + self._publish_error_msg = value + self._emit_event( + "publish.publish_error.changed", + {"value": value} + ) + + def _start_publish(self): + """Start or continue in publishing.""" + if self._publish_is_running: + return + + self._set_is_running(True) + self._publish_has_started = True + + self._emit_event("publish.process.started") + + def _stop_publish(self): + """Stop or pause publishing.""" + self._set_is_running(False) + + self._emit_event("publish.process.stopped") + + def _publish_iterator(self): + """Main logic center of publishing. + + Iterator returns `partial` objects with callbacks that should be + processed in main thread (threaded in future?). Cares about changing + states of currently processed publish plugin and instance. Also + change state of processed orders like validation order has passed etc. + + Also stops publishing, if should stop on validation. + """ + + for idx, plugin in enumerate(self._publish_plugins): + self._publish_progress = idx + + # Check if plugin is over validation order + if not self._publish_has_validated: + self._set_has_validated( + plugin.order >= self._validation_order + ) + + # Stop if plugin is over validation order and process + # should process up to validation. + if self._publish_up_validation and self._publish_has_validated: + yield partial(self.stop_publish) + + # Stop if validation is over and validation errors happened + if ( + self._publish_has_validated + and self.has_validation_errors() + ): + yield partial(self.stop_publish) + + # Add plugin to publish report + self._publish_report.add_plugin_iter( + plugin.id, self._publish_context) + + # WARNING This is hack fix for optional plugins + if not self._is_publish_plugin_active(plugin): + self._publish_report.set_plugin_skipped(plugin.id) + continue + + # Trigger callback that new plugin is going to be processed + plugin_label = plugin.__name__ + if hasattr(plugin, "label") and plugin.label: + plugin_label = plugin.label + self._emit_event( + "publish.process.plugin.changed", + {"plugin_label": plugin_label} + ) + + # Plugin is instance plugin + if plugin.__instanceEnabled__: + instances = pyblish.logic.instances_by_plugin( + self._publish_context, plugin + ) + if not instances: + self._publish_report.set_plugin_skipped(plugin.id) + continue + + for instance in instances: + if instance.data.get("publish") is False: + continue + + instance_label = ( + instance.data.get("label") + or instance.data["name"] + ) + self._emit_event( + "publish.process.instance.changed", + {"instance_label": instance_label} + ) + + yield partial( + self._process_and_continue, plugin, instance + ) + else: + families = collect_families_from_instances( + self._publish_context, only_active=True + ) + plugins = pyblish.logic.plugins_by_families( + [plugin], families + ) + if not plugins: + self._publish_report.set_plugin_skipped(plugin.id) + continue + + instance_label = ( + self._publish_context.data.get("label") + or self._publish_context.data.get("name") + or "Context" + ) + self._emit_event( + "publish.process.instance.changed", + {"instance_label": instance_label} + ) + yield partial( + self._process_and_continue, plugin, None + ) + + self._publish_report.set_plugin_passed(plugin.id) + + # Cleanup of publishing process + self._set_finished(True) + self._set_progress(self._publish_max_progress) + yield partial(self.stop_publish) + + def _process_and_continue(self, plugin, instance): + result = pyblish.plugin.process( + plugin, self._publish_context, instance + ) + + exception = result.get("error") + if exception: + has_validation_error = False + if ( + isinstance(exception, PublishValidationError) + and not self._publish_has_validated + ): + has_validation_error = True + self._add_validation_error(result) + + else: + if isinstance(exception, KnownPublishError): + msg = str(exception) + else: + msg = ( + "Something went wrong. Send report" + " to your supervisor or Ynput team." + ) + self._set_publish_error_msg(msg) + self._set_is_crashed(True) + + result["is_validation_error"] = has_validation_error + + self._publish_report.add_result(plugin.id, result) + + def _add_validation_error(self, result): + self._set_has_validation_errors(True) + self._publish_validation_errors.add_error( + result["plugin"], + result["error"], + result["instance"] + ) + + def _is_publish_plugin_active(self, plugin): + """Decide if publish plugin is active. + + This is hack because 'active' is mis-used in mixin + 'OptionalPyblishPluginMixin' where 'active' is used for default value + of optional plugins. Because of that is 'active' state of plugin + which inherit from 'OptionalPyblishPluginMixin' ignored. That affects + headless publishing inside host, potentially remote publishing. + + We have to change that to match pyblish base, but we can do that + only when all hosts use Publisher because the change requires + change of settings schemas. + + Args: + plugin (pyblish.Plugin): Plugin which should be checked if is + active. + + Returns: + bool: Is plugin active. + """ + + if plugin.active: + return True + + if not plugin.optional: + return False + + if OptionalPyblishPluginMixin in inspect.getmro(plugin): + return True + return False