Merge branch 'develop' into feature/895-add-option-to-define-paht-to-workfile-template

This commit is contained in:
Milan Kolar 2021-06-01 20:07:39 +02:00
commit 37c7dff418
14 changed files with 630 additions and 49 deletions

View file

@ -1,4 +1,5 @@
import os
import subprocess
from openpype.lib import (
PreLaunchHook,
@ -17,6 +18,8 @@ class NonPythonHostHook(PreLaunchHook):
"""
app_groups = ["harmony", "photoshop", "aftereffects"]
order = 20
def execute(self):
# Pop executable
executable_path = self.launch_context.launch_args.pop(0)
@ -45,3 +48,6 @@ class NonPythonHostHook(PreLaunchHook):
if remainders:
self.launch_context.launch_args.extend(remainders)
self.launch_context.kwargs["stdout"] = subprocess.DEVNULL
self.launch_context.kwargs["stderr"] = subprocess.STDOUT

View file

@ -11,12 +11,14 @@ class LaunchWithWindowsShell(PreLaunchHook):
instead.
"""
# Should be as last hook becuase must change launch arguments to string
# Should be as last hook because must change launch arguments to string
order = 1000
app_groups = ["nuke", "nukex", "hiero", "nukestudio"]
platforms = ["windows"]
def execute(self):
launch_args = self.launch_context.clear_launch_args(
self.launch_context.launch_args)
new_args = [
# Get comspec which is cmd.exe in most cases.
os.environ.get("COMSPEC", "cmd.exe"),
@ -24,7 +26,7 @@ class LaunchWithWindowsShell(PreLaunchHook):
"/c",
# Convert arguments to command line arguments (as string)
"\"{}\"".format(
subprocess.list2cmdline(self.launch_context.launch_args)
subprocess.list2cmdline(launch_args)
)
]
# Convert list to string

View file

@ -43,7 +43,7 @@ def sizeof_fmt(num, suffix='B'):
return "%.1f%s%s" % (num, 'Yi', suffix)
def path_from_represenation(representation, anatomy):
def path_from_representation(representation, anatomy):
from avalon import pipeline # safer importing
try:
@ -126,18 +126,22 @@ def check_destination_path(repre_id,
anatomy_filled = anatomy.format_all(anatomy_data)
dest_path = anatomy_filled["delivery"][template_name]
report_items = collections.defaultdict(list)
sub_msg = None
if not dest_path.solved:
msg = (
"Missing keys in Representation's context"
" for anatomy template \"{}\"."
).format(template_name)
sub_msg = (
"Representation: {}<br>"
).format(repre_id)
if dest_path.missing_keys:
keys = ", ".join(dest_path.missing_keys)
sub_msg = (
"Representation: {}<br>- Missing keys: \"{}\"<br>"
).format(repre_id, keys)
sub_msg += (
"- Missing keys: \"{}\"<br>"
).format(keys)
if dest_path.invalid_types:
items = []
@ -145,10 +149,9 @@ def check_destination_path(repre_id,
items.append("\"{}\" {}".format(key, str(value)))
keys = ", ".join(items)
sub_msg = (
"Representation: {}<br>"
sub_msg += (
"- Invalid value DataType: \"{}\"<br>"
).format(repre_id, keys)
).format(keys)
report_items[msg].append(sub_msg)

View file

@ -139,6 +139,25 @@ class ITrayModule:
"""
pass
def execute_in_main_thread(self, callback):
""" Pushes callback to the queue or process 'callback' on a main thread
Some callbacks need to be processed on main thread (menu actions
must be added on main thread or they won't get triggered etc.)
"""
# called without initialized tray, still main thread needed
if not self.tray_initialized:
try:
callback = self._main_thread_callbacks.popleft()
callback()
except:
self.log.warning(
"Failed to execute {} in main thread".format(callback),
exc_info=True)
return
self.manager.tray_manager.execute_in_main_thread(callback)
def show_tray_message(self, title, message, icon=None, msecs=None):
"""Show tray message.

View file

@ -9,7 +9,7 @@ from openpype.api import Anatomy, config
from openpype.modules.ftrack.lib import BaseAction, statics_icon
from openpype.modules.ftrack.lib.avalon_sync import CUST_ATTR_ID_KEY
from openpype.lib.delivery import (
path_from_represenation,
path_from_representation,
get_format_dict,
check_destination_path,
process_single_file,
@ -74,7 +74,7 @@ class Delivery(BaseAction):
"value": project_name
})
# Prpeare anatomy data
# Prepare anatomy data
anatomy = Anatomy(project_name)
new_anatomies = []
first = None
@ -368,12 +368,18 @@ class Delivery(BaseAction):
def launch(self, session, entities, event):
if "values" not in event["data"]:
return
return {
"success": True,
"message": "Nothing to do"
}
values = event["data"]["values"]
skipped = values.pop("__skipped__")
if skipped:
return None
return {
"success": False,
"message": "Action skipped"
}
user_id = event["source"]["user"]["id"]
user_entity = session.query(
@ -391,27 +397,45 @@ class Delivery(BaseAction):
try:
self.db_con.install()
self.real_launch(session, entities, event)
job["status"] = "done"
report = self.real_launch(session, entities, event)
except Exception:
except Exception as exc:
report = {
"success": False,
"title": "Delivery failed",
"items": [{
"type": "label",
"value": (
"Error during delivery action process:<br>{}"
"<br><br>Check logs for more information."
).format(str(exc))
}]
}
self.log.warning(
"Failed during processing delivery action.",
exc_info=True
)
finally:
if job["status"] != "done":
if report["success"]:
job["status"] = "done"
else:
job["status"] = "failed"
session.commit()
self.db_con.uninstall()
if job["status"] == "failed":
if not report["success"]:
self.show_interface(
items=report["items"],
title=report["title"],
event=event
)
return {
"success": False,
"message": "Delivery failed. Check logs for more information."
"message": "Errors during delivery process. See report."
}
return True
return report
def real_launch(self, session, entities, event):
self.log.info("Delivery action just started.")
@ -431,7 +455,7 @@ class Delivery(BaseAction):
if not repre_names:
return {
"success": True,
"message": "Not selected components to deliver."
"message": "No selected components to deliver."
}
location_path = location_path.strip()
@ -479,7 +503,7 @@ class Delivery(BaseAction):
if frame:
repre["context"]["frame"] = len(str(frame)) * "#"
repre_path = path_from_represenation(repre, anatomy)
repre_path = path_from_representation(repre, anatomy)
# TODO add backup solution where root of path from component
# is replaced with root
args = (
@ -502,7 +526,7 @@ class Delivery(BaseAction):
def report(self, report_items):
"""Returns dict with final status of delivery (succes, fail etc.)."""
items = []
title = "Delivery report"
for msg, _items in report_items.items():
if not _items:
continue
@ -533,9 +557,8 @@ class Delivery(BaseAction):
return {
"items": items,
"title": title,
"success": False,
"message": "Delivery Finished"
"title": "Delivery report",
"success": False
}

View file

@ -0,0 +1,151 @@
import aiohttp
from aiohttp import web
import json
import logging
from concurrent.futures import CancelledError
from Qt import QtWidgets
from openpype.modules import ITrayService
from openpype.tools.tray_app.app import ConsoleDialog
log = logging.getLogger(__name__)
class IconType:
IDLE = "idle"
RUNNING = "running"
FAILED = "failed"
class MsgAction:
CONNECTING = "connecting"
INITIALIZED = "initialized"
ADD = "add"
CLOSE = "close"
class HostListener:
def __init__(self, webserver, module):
self._window_per_id = {}
self.module = module
self.webserver = webserver
self._window_per_id = {} # dialogs per host name
self._action_per_id = {} # QAction per host name
webserver.add_route('*', "/ws/host_listener", self.websocket_handler)
def _host_is_connecting(self, host_name, label):
""" Initialize dialog, adds to submenu. """
services_submenu = self.module._services_submenu
action = QtWidgets.QAction(label, services_submenu)
action.triggered.connect(lambda: self.show_widget(host_name))
services_submenu.addAction(action)
self._action_per_id[host_name] = action
self._set_host_icon(host_name, IconType.IDLE)
widget = ConsoleDialog("")
self._window_per_id[host_name] = widget
def _set_host_icon(self, host_name, icon_type):
"""Assigns icon to action for 'host_name' with 'icon_type'.
Action must exist in self._action_per_id
Args:
host_name (str)
icon_type (IconType)
"""
action = self._action_per_id.get(host_name)
if not action:
raise ValueError("Unknown host {}".format(host_name))
icon = None
if icon_type == IconType.IDLE:
icon = ITrayService.get_icon_idle()
elif icon_type == IconType.RUNNING:
icon = ITrayService.get_icon_running()
elif icon_type == IconType.FAILED:
icon = ITrayService.get_icon_failed()
else:
log.info("Unknown icon type {} for {}".format(icon_type,
host_name))
action.setIcon(icon)
def show_widget(self, host_name):
"""Shows prepared widget for 'host_name'.
Dialog get initialized when 'host_name' is connecting.
"""
self.module.execute_in_main_thread(
lambda: self._show_widget(host_name))
def _show_widget(self, host_name):
widget = self._window_per_id[host_name]
widget.show()
widget.raise_()
widget.activateWindow()
async def websocket_handler(self, request):
ws = web.WebSocketResponse()
await ws.prepare(request)
widget = None
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
host_name, action, text = self._parse_message(msg)
if action == MsgAction.CONNECTING:
self._action_per_id[host_name] = None
# must be sent to main thread, or action wont trigger
self.module.execute_in_main_thread(
lambda: self._host_is_connecting(host_name, text))
elif action == MsgAction.CLOSE:
# clean close
self._close(host_name)
await ws.close()
elif action == MsgAction.INITIALIZED:
self.module.execute_in_main_thread(
# must be queued as _host_is_connecting might not
# be triggered/finished yet
lambda: self._set_host_icon(host_name,
IconType.RUNNING))
elif action == MsgAction.ADD:
self.module.execute_in_main_thread(
lambda: self._add_text(host_name, text))
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())
host_name, _, _ = self._parse_message(msg)
self._set_host_icon(host_name, IconType.FAILED)
except CancelledError: # recoverable
pass
except Exception as exc:
log.warning("Exception during communication", exc_info=True)
if widget:
error_msg = str(exc)
widget.append_text(error_msg)
return ws
def _add_text(self, host_name, text):
widget = self._window_per_id[host_name]
widget.append_text(text)
def _close(self, host_name):
""" Clean close - remove from menu, delete widget."""
services_submenu = self.module._services_submenu
action = self._action_per_id.pop(host_name)
services_submenu.removeAction(action)
widget = self._window_per_id.pop(host_name)
if widget.isVisible():
widget.hide()
widget.deleteLater()
def _parse_message(self, msg):
data = json.loads(msg.data)
action = data.get("action")
host_name = data["host"]
value = data.get("text")
return host_name, action, value

View file

@ -7,6 +7,8 @@ import six
from openpype import resources
from .. import PypeModule, ITrayService
from openpype.modules.webserver.host_console_listener import HostListener
@six.add_metaclass(ABCMeta)
class IWebServerRoutes:
@ -23,6 +25,7 @@ class WebServerModule(PypeModule, ITrayService):
def initialize(self, _module_settings):
self.enabled = True
self.server_manager = None
self._host_listener = None
self.port = self.find_free_port()
@ -37,6 +40,7 @@ class WebServerModule(PypeModule, ITrayService):
def tray_init(self):
self.create_server_manager()
self._add_resources_statics()
self._add_listeners()
def tray_start(self):
self.start_server()
@ -54,6 +58,9 @@ class WebServerModule(PypeModule, ITrayService):
webserver_url, static_prefix
)
def _add_listeners(self):
self._host_listener = HostListener(self.server_manager, self)
def start_server(self):
if self.server_manager:
self.server_manager.start_server()

View file

@ -11,7 +11,7 @@ from openpype import resources
from openpype.lib.delivery import (
sizeof_fmt,
path_from_represenation,
path_from_representation,
get_format_dict,
check_destination_path,
process_single_file,
@ -170,7 +170,7 @@ class DeliveryOptionsDialog(QtWidgets.QDialog):
if repre["name"] not in selected_repres:
continue
repre_path = path_from_represenation(repre, self.anatomy)
repre_path = path_from_representation(repre, self.anatomy)
anatomy_data = copy.deepcopy(repre["context"])
new_report_items = check_destination_path(str(repre["_id"]),

View file

@ -81,11 +81,11 @@ def main(argv):
host_name = os.environ["AVALON_APP"].lower()
if host_name == "photoshop":
from avalon.photoshop.lib import launch
from avalon.photoshop.lib import main
elif host_name == "aftereffects":
from avalon.aftereffects.lib import launch
from avalon.aftereffects.lib import main
elif host_name == "harmony":
from avalon.harmony.lib import launch
from avalon.harmony.lib import main
else:
title = "Unknown host name"
message = (
@ -97,7 +97,7 @@ def main(argv):
if launch_args:
# Launch host implementation
launch(*launch_args)
main(*launch_args)
else:
# Show message box
on_invalid_args(after_script_idx is None)

View file

@ -76,27 +76,33 @@ def _fill_schema_template_data(
if key not in template_data:
template_data[key] = value
# Store paths by first part if path
# - None value says that whole key should be skipped
skip_paths_by_first_key = {}
for path in skip_paths:
parts = path.split("/")
key = parts.pop(0)
if key not in skip_paths_by_first_key:
skip_paths_by_first_key[key] = []
value = "/".join(parts)
skip_paths_by_first_key[key].append(value or None)
if not template:
output = template
elif isinstance(template, list):
# Store paths by first part if path
# - None value says that whole key should be skipped
skip_paths_by_first_key = {}
for path in skip_paths:
parts = path.split("/")
key = parts.pop(0)
if key not in skip_paths_by_first_key:
skip_paths_by_first_key[key] = []
value = "/".join(parts)
skip_paths_by_first_key[key].append(value or None)
output = []
for item in template:
# Get skip paths for children item
_skip_paths = []
if skip_paths_by_first_key and isinstance(item, dict):
if not isinstance(item, dict):
pass
elif item.get("type") in WRAPPER_TYPES:
_skip_paths = copy.deepcopy(skip_paths)
elif skip_paths_by_first_key:
# Check if this item should be skipped
key = item.get("key")
if key and key in skip_paths_by_first_key:
@ -108,7 +114,8 @@ def _fill_schema_template_data(
output_item = _fill_schema_template_data(
item, template_data, _skip_paths, required_keys, missing_keys
)
output.append(output_item)
if output_item:
output.append(output_item)
elif isinstance(template, dict):
output = {}
@ -116,6 +123,8 @@ def _fill_schema_template_data(
output[key] = _fill_schema_template_data(
value, template_data, skip_paths, required_keys, missing_keys
)
if output.get("type") in WRAPPER_TYPES and not output.get("children"):
return {}
elif isinstance(template, STRING_TYPE):
# TODO find much better way how to handle filling template data

View file

@ -1,3 +1,4 @@
import collections
import os
import sys
import atexit
@ -23,7 +24,6 @@ class TrayManager:
def __init__(self, tray_widget, main_window):
self.tray_widget = tray_widget
self.main_window = main_window
self.pype_info_widget = None
self.log = Logger.get_logger(self.__class__.__name__)
@ -34,6 +34,28 @@ class TrayManager:
self.errors = []
self.main_thread_timer = None
self._main_thread_callbacks = collections.deque()
self._execution_in_progress = None
def execute_in_main_thread(self, callback):
self._main_thread_callbacks.append(callback)
def _main_thread_execution(self):
if self._execution_in_progress:
return
self._execution_in_progress = True
while self._main_thread_callbacks:
try:
callback = self._main_thread_callbacks.popleft()
callback()
except:
self.log.warning(
"Failed to execute {} in main thread".format(callback),
exc_info=True)
self._execution_in_progress = False
def initialize_modules(self):
"""Add modules to tray."""
@ -59,6 +81,14 @@ class TrayManager:
# Print time report
self.modules_manager.print_report()
# create timer loop to check callback functions
main_thread_timer = QtCore.QTimer()
main_thread_timer.setInterval(300)
main_thread_timer.timeout.connect(self._main_thread_execution)
main_thread_timer.start()
self.main_thread_timer = main_thread_timer
def show_tray_message(self, title, message, icon=None, msecs=None):
"""Show tray message.

View file

View file

@ -0,0 +1,331 @@
import os
import sys
import re
import collections
import queue
import websocket
import json
import itertools
from datetime import datetime
from avalon import style
from openpype.modules.webserver import host_console_listener
from Qt import QtWidgets, QtCore
class ConsoleTrayApp:
"""
Application showing console in Services tray for non python hosts
instead of cmd window.
"""
callback_queue = None
process = None
webserver_client = None
MAX_LINES = 10000
sdict = {
r">>> ":
'<span style="font-weight: bold;color:#EE5C42"> >>> </span>',
r"!!!(?!\sCRI|\sERR)":
'<span style="font-weight: bold;color:red"> !!! </span>',
r"\-\-\- ":
'<span style="font-weight: bold;color:cyan"> --- </span>',
r"\*\*\*(?!\sWRN)":
'<span style="font-weight: bold;color:#FFD700"> *** </span>',
r"\*\*\* WRN":
'<span style="font-weight: bold;color:#FFD700"> *** WRN</span>',
r" \- ":
'<span style="font-weight: bold;color:#FFD700"> - </span>',
r"\[ ":
'<span style="font-weight: bold;color:#66CDAA">[</span>',
r"\]":
'<span style="font-weight: bold;color:#66CDAA">]</span>',
r"{":
'<span style="color:#66CDAA">{',
r"}":
r"}</span>",
r"\(":
'<span style="color:#66CDAA">(',
r"\)":
r")</span>",
r"^\.\.\. ":
'<span style="font-weight: bold;color:#EE5C42"> ... </span>',
r"!!! ERR: ":
'<span style="font-weight: bold;color:#EE5C42"> !!! ERR: </span>',
r"!!! CRI: ":
'<span style="font-weight: bold;color:red"> !!! CRI: </span>',
r"(?i)failed":
'<span style="font-weight: bold;color:#EE5C42"> FAILED </span>',
r"(?i)error":
'<span style="font-weight: bold;color:#EE5C42"> ERROR </span>'
}
def __init__(self, host, launch_method, subprocess_args, is_host_connected,
parent=None):
self.host = host
self.initialized = False
self.websocket_server = None
self.initializing = False
self.tray = False
self.launch_method = launch_method
self.subprocess_args = subprocess_args
self.is_host_connected = is_host_connected
self.tray_reconnect = True
self.original_stdout_write = None
self.original_stderr_write = None
self.new_text = collections.deque()
timer = QtCore.QTimer()
timer.timeout.connect(self.on_timer)
timer.setInterval(200)
timer.start()
self.timer = timer
self.catch_std_outputs()
date_str = datetime.now().strftime("%d%m%Y%H%M%S")
self.host_id = "{}_{}".format(self.host, date_str)
def _connect(self):
""" Connect to Tray webserver to pass console output. """
ws = websocket.WebSocket()
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
if not webserver_url:
print("Unknown webserver url, cannot connect to pass log")
self.tray_reconnect = False
return
webserver_url = webserver_url.replace("http", "ws")
ws.connect("{}/ws/host_listener".format(webserver_url))
ConsoleTrayApp.webserver_client = ws
payload = {
"host": self.host_id,
"action": host_console_listener.MsgAction.CONNECTING,
"text": "Integration with {}".format(str.capitalize(self.host))
}
self.tray_reconnect = False
self._send(payload)
def _connected(self):
""" Send to Tray console that host is ready - icon change. """
print("Host {} connected".format(self.host))
if not ConsoleTrayApp.webserver_client:
return
payload = {
"host": self.host_id,
"action": host_console_listener.MsgAction.INITIALIZED,
"text": "Integration with {}".format(str.capitalize(self.host))
}
self.tray_reconnect = False
self._send(payload)
def _close(self):
""" Send to Tray that host is closing - remove from Services. """
print("Host {} closing".format(self.host))
if not ConsoleTrayApp.webserver_client:
return
payload = {
"host": self.host_id,
"action": host_console_listener.MsgAction.CLOSE,
"text": "Integration with {}".format(str.capitalize(self.host))
}
self._send(payload)
self.tray_reconnect = False
ConsoleTrayApp.webserver_client.close()
def _send_text(self, new_text):
""" Send console content. """
if not ConsoleTrayApp.webserver_client:
return
if isinstance(new_text, str):
new_text = collections.deque(new_text.split("\n"))
payload = {
"host": self.host_id,
"action": host_console_listener.MsgAction.ADD,
"text": "\n".join(new_text)
}
self._send(payload)
def _send(self, payload):
""" Worker method to send to existing websocket connection. """
if not ConsoleTrayApp.webserver_client:
return
try:
ConsoleTrayApp.webserver_client.send(json.dumps(payload))
except ConnectionResetError: # Tray closed
ConsoleTrayApp.webserver_client = None
self.tray_reconnect = True
def on_timer(self):
"""Called periodically to initialize and run function on main thread"""
if self.tray_reconnect:
self._connect() # reconnect
if ConsoleTrayApp.webserver_client and self.new_text:
self._send_text(self.new_text)
self.new_text = collections.deque()
if self.new_text: # no webserver_client, text keeps stashing
start = max(len(self.new_text) - self.MAX_LINES, 0)
self.new_text = itertools.islice(self.new_text,
start, self.MAX_LINES)
if not self.initialized:
if self.initializing:
host_connected = self.is_host_connected()
if host_connected is None: # keep trying
return
elif not host_connected:
text = "{} process is not alive. Exiting".format(self.host)
print(text)
self._send_text([text])
ConsoleTrayApp.websocket_server.stop()
sys.exit(1)
elif host_connected:
self.initialized = True
self.initializing = False
self._connected()
return
ConsoleTrayApp.callback_queue = queue.Queue()
self.initializing = True
self.launch_method(*self.subprocess_args)
elif ConsoleTrayApp.process.poll() is not None:
self.exit()
elif ConsoleTrayApp.callback_queue:
try:
callback = ConsoleTrayApp.callback_queue.get(block=False)
callback()
except queue.Empty:
pass
@classmethod
def execute_in_main_thread(cls, func_to_call_from_main_thread):
"""Put function to the queue to be picked by 'on_timer'"""
if not cls.callback_queue:
cls.callback_queue = queue.Queue()
cls.callback_queue.put(func_to_call_from_main_thread)
@classmethod
def restart_server(cls):
if ConsoleTrayApp.websocket_server:
ConsoleTrayApp.websocket_server.stop_server(restart=True)
# obsolete
def exit(self):
""" Exit whole application. """
self._close()
if ConsoleTrayApp.websocket_server:
ConsoleTrayApp.websocket_server.stop()
ConsoleTrayApp.process.kill()
ConsoleTrayApp.process.wait()
if self.timer:
self.timer.stop()
QtCore.QCoreApplication.exit()
def catch_std_outputs(self):
"""Redirects standard out and error to own functions"""
if not sys.stdout:
self.dialog.append_text("Cannot read from stdout!")
else:
self.original_stdout_write = sys.stdout.write
sys.stdout.write = self.my_stdout_write
if not sys.stderr:
self.dialog.append_text("Cannot read from stderr!")
else:
self.original_stderr_write = sys.stderr.write
sys.stderr.write = self.my_stderr_write
def my_stdout_write(self, text):
"""Appends outputted text to queue, keep writing to original stdout"""
if self.original_stdout_write is not None:
self.original_stdout_write(text)
self.new_text.append(text)
def my_stderr_write(self, text):
"""Appends outputted text to queue, keep writing to original stderr"""
if self.original_stderr_write is not None:
self.original_stderr_write(text)
self.new_text.append(text)
@staticmethod
def _multiple_replace(text, adict):
"""Replace multiple tokens defined in dict.
Find and replace all occurances of strings defined in dict is
supplied string.
Args:
text (str): string to be searched
adict (dict): dictionary with `{'search': 'replace'}`
Returns:
str: string with replaced tokens
"""
for r, v in adict.items():
text = re.sub(r, v, text)
return text
@staticmethod
def color(message):
""" Color message with html tags. """
message = ConsoleTrayApp._multiple_replace(message,
ConsoleTrayApp.sdict)
return message
class ConsoleDialog(QtWidgets.QDialog):
"""Qt dialog to show stdout instead of unwieldy cmd window"""
WIDTH = 720
HEIGHT = 450
MAX_LINES = 10000
def __init__(self, text, parent=None):
super(ConsoleDialog, self).__init__(parent)
layout = QtWidgets.QHBoxLayout(parent)
plain_text = QtWidgets.QPlainTextEdit(self)
plain_text.setReadOnly(True)
plain_text.resize(self.WIDTH, self.HEIGHT)
plain_text.maximumBlockCount = self.MAX_LINES
while text:
plain_text.appendPlainText(text.popleft().strip())
layout.addWidget(plain_text)
self.setWindowTitle("Console output")
self.plain_text = plain_text
self.setStyleSheet(style.load_stylesheet())
self.resize(self.WIDTH, self.HEIGHT)
def append_text(self, new_text):
if isinstance(new_text, str):
new_text = collections.deque(new_text.split("\n"))
while new_text:
text = new_text.popleft()
if text:
self.plain_text.appendHtml(
ConsoleTrayApp.color(text))

@ -1 +1 @@
Subproject commit 0d9a228fdb2eb08fe6caa30f25fe2a34fead1a03
Subproject commit e9882d0ffff27fed03a03459f496c29da0310cd2