From ae387d09778607ec56b12c2d9d75a9e74740786a Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 11:39:22 +0100 Subject: [PATCH 01/17] added subproces for status --- pype/ftrack/ftrack_server/sub_event_info.py | 411 ++++++++++++++++++++ 1 file changed, 411 insertions(+) create mode 100644 pype/ftrack/ftrack_server/sub_event_info.py diff --git a/pype/ftrack/ftrack_server/sub_event_info.py b/pype/ftrack/ftrack_server/sub_event_info.py new file mode 100644 index 0000000000..d63b6acadd --- /dev/null +++ b/pype/ftrack/ftrack_server/sub_event_info.py @@ -0,0 +1,411 @@ +import os +import sys +import copy +import signal +import socket +import uuid +from datetime import datetime + +import ftrack_api +from ftrack_server import FtrackServer +from pype.ftrack.ftrack_server.lib import ( + SocketSession, SocketBaseEventHub, + TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT +) +from pypeapp import Logger + +log = Logger().get_logger("Event storer") +log.info(os.environ.get("FTRACK_EVENT_SUB_ID")) + + +class ObjectFactory: + session = None + sock = None + subprocess_id = os.environ["FTRACK_EVENT_SUB_ID"] + status_factory = None + + +def trigger_status_info(status_id=None, status=None): + if not status and not status_id: + log.warning( + "`status_id` or `status` must be specified to trigger action." + ) + return + + if not status: + status = ObjectFactory.status_factory[status_id] + + if not status: + return + + new_event_data = copy.deepcopy(action_data) + new_event_data.update({ + "selection": [] + }) + new_event_data["subprocess_id"] = ObjectFactory.subprocess_id + new_event_data["status_id"] = status.id + + new_event = ftrack_api.event.base.Event( + topic="ftrack.action.launch", + data=new_event_data, + source=status.source + ) + ObjectFactory.session.event_hub.publish(new_event) + + +action_identifier = ( + "event.server.status" + ObjectFactory.subprocess_id +) + +# TODO add IP adress to label +# TODO add icon +action_data = { + "label": "Pype Admin", + "variant": "Event server Status", + "description": "Get Infromation about event server", + "actionIdentifier": action_identifier, + "icon": None +} + + +class Status: + default_item = { + "type": "label", + "value": "Information not allowed." + } + note_item = { + "type": "label", + "value": "Hit `submit` to refresh data." + } + splitter_item = { + "type": "label", + "value": "---" + } + + def __init__(self, source_info, parent): + self.id = str(uuid.uuid1()) + self.created = datetime.now() + self.parent = parent + + self.source = source_info + + self.main_process = None + self.storer = None + self.processor = None + + def add_result(self, source, data): + if source.lower() == "storer": + self.storer = data + + elif source.lower() == "processor": + self.processor = data + + else: + self.main_process = data + + def filled(self): + # WARNING DEBUG PART!!!! + return True + return ( + self.main_process is not None and + self.storer is not None and + self.processor is not None + ) + + def get_items_from_dict(self, in_dict): + items = [] + for key, value in in_dict.items(): + items.append({ + "type": "label", + "value": "##{}".format(key) + }) + items.append({ + "type": "label", + "value": value + }) + return items + + def bool_items(self): + items = [] + name_labels = { + "shutdown_main": "Shutdown main process", + "reset_storer": "Reset storer", + "reset_processor": "Reset processor" + } + for name, label in name_labels.items(): + items.append({ + "type": "boolean", + "value": False, + "label": label, + "name": name + }) + return items + + def items(self): + items = [] + items.append(self.note_item) + + items.append({"type": "label", "value": "Main process"}) + if not self.main_process: + items.append(self.default_item) + else: + items.extend( + self.get_items_from_dict(self.main_process) + ) + + items.append(self.splitter_item) + items.append({"type": "label", "value": "Storer process"}) + if not self.storer: + items.append(self.default_item) + else: + items.extend( + self.get_items_from_dict(self.storer) + ) + + items.append(self.splitter_item) + items.append({"type": "label", "value": "Processor process"}) + if not self.processor: + items.append(self.default_item) + else: + items.extend( + self.get_items_from_dict(self.processor) + ) + + items.append(self.splitter_item) + items.extend(self.bool_items()) + + return items + + @property + def is_overtime(self): + time_delta = (datetime.now() - self.created).total_seconds() + return time_delta >= self.parent.max_delta_seconds + + +class StatusFactory: + max_delta_seconds = 30 + + def __init__(self): + self.statuses = {} + + def __getitem__(self, key): + return self.statuses.get(key) + + def create_status(self, source_info): + new_status = Status(source_info, self) + self.statuses[new_status.id] = new_status + return new_status + + def process_result(self, event): + subprocess_id = event["data"].get("subprocess_id") + if subprocess_id != ObjectFactory.subprocess_id: + return + + status_id = event["data"].get("status_id") + status = self.statuses[status_id] + if not status: + return + + source = event["data"]["source"] + data = event["data"]["status_info"] + + status.add_result(source, data) + if status.filled(): + trigger_status_info(status=status) + + +def server_activity_validate_user(event): + """Validate user permissions to show server info.""" + session = ObjectFactory.session + + username = event["source"].get("user", {}).get("username") + if not username: + return False + + user_ent = session.query( + "User where username = \"{}\"".format(username) + ).first() + if not user_ent: + return False + + role_list = ["Pypeclub", "Administrator"] + for role in user_ent["user_security_roles"]: + if role["security_role"]["name"] in role_list: + return True + return False + + +def server_activity_discover(event): + """Discover action in actions menu conditions.""" + session = ObjectFactory.session + if session is None: + return + + if not server_activity_validate_user(event): + return + + return {"items": [action_data]} + + +def handle_filled_event(event): + subprocess_id = event["data"].get("subprocess_id") + if subprocess_id != ObjectFactory.subprocess_id: + return None + + status_id = event["data"].get("status_id") + status = ObjectFactory.status_factory[status_id] + if not status: + return None + + values = event.get("values") + if values: + log.info(values) + + title = "Event server - Status" + + event_data = copy.deepcopy(event["data"]) + event_data.update({ + "type": "widget", + "items": status.items(), + "title": title + }) + + ObjectFactory.session.event_hub.publish( + ftrack_api.event.base.Event( + topic="ftrack.action.trigger-user-interface", + data=event_data + ), + on_error='ignore' + ) + + +def server_activity(event): + session = ObjectFactory.session + if session is None: + msg = "Session is not set. Can't trigger Reset action." + log.warning(msg) + return { + "success": False, + "message": msg + } + + valid = server_activity_validate_user(event) + if not valid: + return { + "success": False, + "message": "You don't have permissions to see Event server status!" + } + + subprocess_id = event["data"].get("subprocess_id") + if subprocess_id is not None: + return handle_filled_event(event) + + status = ObjectFactory.status_factory.create_status(event["source"]) + + event_data = { + "status_id": status.id, + "subprocess_id": ObjectFactory.subprocess_id + } + session.event_hub.publish( + ftrack_api.event.base.Event( + topic=TOPIC_STATUS_SERVER, + data=event_data + ), + on_error="ignore" + ) + + return { + "success": True, + "message": "Collecting information (this may take > 20s)" + } + + +def register(session): + '''Registers the event, subscribing the discover and launch topics.''' + session.event_hub.subscribe( + "topic=ftrack.action.discover", + server_activity_discover + ) + + status_launch_subscription = ( + "topic=ftrack.action.launch and data.actionIdentifier={}" + ).format(action_identifier) + + session.event_hub.subscribe( + status_launch_subscription, + server_activity + ) + + session.event_hub.subscribe( + "topic={}".format(TOPIC_STATUS_SERVER_RESULT), + ObjectFactory.status_factory.process_result + ) + + +def main(args): + port = int(args[-1]) + + # Create a TCP/IP socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + # Connect the socket to the port where the server is listening + server_address = ("localhost", port) + log.debug("Storer connected to {} port {}".format(*server_address)) + sock.connect(server_address) + sock.sendall(b"CreatedStatus") + # store socket connection object + ObjectFactory.sock = sock + ObjectFactory.status_factory = StatusFactory() + + _returncode = 0 + try: + session = SocketSession( + auto_connect_event_hub=True, sock=sock, Eventhub=SocketBaseEventHub + ) + ObjectFactory.session = session + register(session) + server = FtrackServer("event") + log.debug("Launched Ftrack Event storer") + server.run_server(session, load_files=False) + + except Exception: + _returncode = 1 + log.error("ServerInfo subprocess crashed", exc_info=True) + + finally: + log.debug("Ending. Closing socket.") + sock.close() + return _returncode + + +if __name__ == "__main__": + # Register interupt signal + def signal_handler(sig, frame): + print("You pressed Ctrl+C. Process ended.") + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + sys.exit(main(sys.argv)) + + +example_action_event = { + 'data': { + 'selection': [], + 'description': 'Test action2', + 'variant': None, + 'label': 'Test action2', + 'actionIdentifier': 'test.action2.3ceffe5e9acf40f8aa80603adebd0d06', + 'values': {}, + 'icon': None, + }, + 'topic': 'ftrack.action.launch', + 'sent': None, + 'source': { + 'id': 'eb67d186301c4cbbab73c1aee9b7c55d', + 'user': {'username': 'jakub.trllo', 'id': '2a8ae090-cbd3-11e8-a87a-0a580aa00121'} + }, + 'target': '', + 'in_reply_to_event': None +} From c937964dc8c80b54b95d5059670f845a83f4ca82 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 12:13:22 +0100 Subject: [PATCH 02/17] added subprocess to event server cli --- pype/ftrack/ftrack_server/event_server_cli.py | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py index b09b0bc84e..b2c540e993 100644 --- a/pype/ftrack/ftrack_server/event_server_cli.py +++ b/pype/ftrack/ftrack_server/event_server_cli.py @@ -7,6 +7,7 @@ import socket import argparse import atexit import time +import uuid import ftrack_api from pype.ftrack.lib import credentials @@ -175,6 +176,7 @@ def main_loop(ftrack_url): otherwise thread will be killed. """ + os.environ["FTRACK_EVENT_SUB_ID"] = str(uuid.uuid1()) # Get mongo hostname and port for testing mongo connection mongo_list = ftrack_events_mongo_settings() mongo_hostname = mongo_list[0] @@ -202,6 +204,13 @@ def main_loop(ftrack_url): processor_last_failed = datetime.datetime.now() processor_failed_count = 0 + statuser_name = "StorerThread" + statuser_port = 10021 + statuser_path = "{}/sub_event_info.py".format(file_path) + statuser_thread = None + statuser_last_failed = datetime.datetime.now() + statuser_failed_count = 0 + ftrack_accessible = False mongo_accessible = False @@ -336,6 +345,43 @@ def main_loop(ftrack_url): processor_failed_count = 0 processor_last_failed = _processor_last_failed + if statuser_thread is None: + if statuser_failed_count < max_fail_count: + statuser_thread = socket_thread.SocketThread( + statuser_name, statuser_port, statuser_path + ) + statuser_thread.start() + + elif statuser_failed_count == max_fail_count: + print(( + "Statuser failed {}times in row" + " I'll try to run again {}s later" + ).format(str(max_fail_count), str(wait_time_after_max_fail))) + statuser_failed_count += 1 + + elif (( + datetime.datetime.now() - statuser_last_failed + ).seconds > wait_time_after_max_fail): + statuser_failed_count = 0 + + # If thread failed test Ftrack and Mongo connection + elif not statuser_thread.isAlive(): + statuser_thread.join() + statuser_thread = None + ftrack_accessible = False + mongo_accessible = False + + _processor_last_failed = datetime.datetime.now() + delta_time = ( + _processor_last_failed - statuser_last_failed + ).seconds + + if delta_time < min_fail_seconds: + statuser_failed_count += 1 + else: + statuser_failed_count = 0 + statuser_last_failed = _processor_last_failed + time.sleep(1) From fa60c87c3e0f9e9261dd9b9e5c8b4188c50e0b4f Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:28:29 +0100 Subject: [PATCH 03/17] created base EventHub that can set callbacks on heartbeat and set message for sockets on heartbeat --- pype/ftrack/ftrack_server/lib.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index fefba580e0..2617b63614 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -123,20 +123,30 @@ def check_ftrack_url(url, log_errors=True): return url -class StorerEventHub(ftrack_api.event.hub.EventHub): +class SocketBaseEventHub(ftrack_api.event.hub.EventHub): + + hearbeat_msg = b"hearbeat" + heartbeat_callbacks = [] + def __init__(self, *args, **kwargs): self.sock = kwargs.pop("sock") - super(StorerEventHub, self).__init__(*args, **kwargs) + super(SocketBaseEventHub, self).__init__(*args, **kwargs) def _handle_packet(self, code, packet_identifier, path, data): """Override `_handle_packet` which extend heartbeat""" code_name = self._code_name_mapping[code] if code_name == "heartbeat": # Reply with heartbeat. - self.sock.sendall(b"storer") - return self._send_packet(self._code_name_mapping['heartbeat']) + for callback in self.heartbeat_callbacks: + callback() + + self.sock.sendall(self.hearbeat_msg) + return self._send_packet(self._code_name_mapping["heartbeat"]) + + return super(SocketBaseEventHub, self)._handle_packet( + code, packet_identifier, path, data + ) - elif code_name == "connect": event = ftrack_api.event.base.Event( topic="pype.storer.started", data={}, From 24022c583651f16d70b210e340472be523c447d8 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:28:44 +0100 Subject: [PATCH 04/17] Status event hub implemented --- pype/ftrack/ftrack_server/lib.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 2617b63614..71ce6861a4 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -147,6 +147,25 @@ class SocketBaseEventHub(ftrack_api.event.hub.EventHub): code, packet_identifier, path, data ) + +class StatusEventHub(SocketBaseEventHub): + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "connect": + event = ftrack_api.event.base.Event( + topic="pype.status.started", + data={}, + source={ + "id": self.id, + "user": {"username": self._api_user} + } + ) + self._event_queue.put(event) + + return super(StatusEventHub, self)._handle_packet( + code, packet_identifier, path, data + ) event = ftrack_api.event.base.Event( topic="pype.storer.started", data={}, From a97c73258e349291ae8f0899f37ac7ec9a8c13b5 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:29:01 +0100 Subject: [PATCH 05/17] removed user event hub --- pype/ftrack/ftrack_server/lib.py | 29 -------------------- pype/ftrack/ftrack_server/sub_user_server.py | 4 +-- 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 71ce6861a4..57c5b7d5dc 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -296,35 +296,6 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub): return super()._handle_packet(code, packet_identifier, path, data) -class UserEventHub(ftrack_api.event.hub.EventHub): - def __init__(self, *args, **kwargs): - self.sock = kwargs.pop("sock") - super(UserEventHub, self).__init__(*args, **kwargs) - - def _handle_packet(self, code, packet_identifier, path, data): - """Override `_handle_packet` which extend heartbeat""" - code_name = self._code_name_mapping[code] - if code_name == "heartbeat": - # Reply with heartbeat. - self.sock.sendall(b"hearbeat") - return self._send_packet(self._code_name_mapping['heartbeat']) - - elif code_name == "connect": - event = ftrack_api.event.base.Event( - topic="pype.storer.started", - data={}, - source={ - "id": self.id, - "user": {"username": self._api_user} - } - ) - self._event_queue.put(event) - - return super(UserEventHub, self)._handle_packet( - code, packet_identifier, path, data - ) - - class SocketSession(ftrack_api.session.Session): '''An isolated session for interaction with an ftrack server.''' def __init__( diff --git a/pype/ftrack/ftrack_server/sub_user_server.py b/pype/ftrack/ftrack_server/sub_user_server.py index f0d39447a8..8c1497a562 100644 --- a/pype/ftrack/ftrack_server/sub_user_server.py +++ b/pype/ftrack/ftrack_server/sub_user_server.py @@ -5,7 +5,7 @@ import socket import traceback from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.lib import SocketSession, UserEventHub +from pype.ftrack.ftrack_server.lib import SocketSession, SocketBaseEventHub from pypeapp import Logger @@ -28,7 +28,7 @@ def main(args): try: session = SocketSession( - auto_connect_event_hub=True, sock=sock, Eventhub=UserEventHub + auto_connect_event_hub=True, sock=sock, Eventhub=SocketBaseEventHub ) server = FtrackServer("action") log.debug("Launched User Ftrack Server") From 526f9282d1e4136b44eab6e5505b1adf23e4af5b Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:29:24 +0100 Subject: [PATCH 06/17] storer and processor eventhubs are modified --- pype/ftrack/ftrack_server/lib.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 57c5b7d5dc..478bede6ef 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -166,6 +166,16 @@ class StatusEventHub(SocketBaseEventHub): return super(StatusEventHub, self)._handle_packet( code, packet_identifier, path, data ) + + +class StorerEventHub(SocketBaseEventHub): + + hearbeat_msg = b"storer" + + def _handle_packet(self, code, packet_identifier, path, data): + """Override `_handle_packet` which extend heartbeat""" + code_name = self._code_name_mapping[code] + if code_name == "connect": event = ftrack_api.event.base.Event( topic="pype.storer.started", data={}, @@ -181,7 +191,9 @@ class StatusEventHub(SocketBaseEventHub): ) -class ProcessEventHub(ftrack_api.event.hub.EventHub): +class ProcessEventHub(SocketBaseEventHub): + + hearbeat_msg = b"processor" url, database, table_name = get_ftrack_event_mongo_info() is_table_created = False @@ -193,7 +205,6 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub): database_name=self.database, table_name=self.table_name ) - self.sock = kwargs.pop("sock") super(ProcessEventHub, self).__init__(*args, **kwargs) def prepare_dbcon(self): @@ -289,9 +300,6 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub): code_name = self._code_name_mapping[code] if code_name == "event": return - if code_name == "heartbeat": - self.sock.sendall(b"processor") - return self._send_packet(self._code_name_mapping["heartbeat"]) return super()._handle_packet(code, packet_identifier, path, data) From 4fd403bf54a167ea6d0621554b0a9b6768ca2bfb Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:29:38 +0100 Subject: [PATCH 07/17] added constants with topics to lib --- pype/ftrack/ftrack_server/lib.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py index 478bede6ef..e623cab8fb 100644 --- a/pype/ftrack/ftrack_server/lib.py +++ b/pype/ftrack/ftrack_server/lib.py @@ -28,6 +28,10 @@ from pypeapp import Logger from pype.ftrack.lib.custom_db_connector import DbConnector +TOPIC_STATUS_SERVER = "pype.event.server.status" +TOPIC_STATUS_SERVER_RESULT = "pype.event.server.status.result" + + def ftrack_events_mongo_settings(): host = None port = None From 37de60577809c2ace929f7dab880a95ddc0ed0c2 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:30:07 +0100 Subject: [PATCH 08/17] socket thread can use additional arguments to execute and -port arg was removed (not used) --- pype/ftrack/ftrack_server/socket_thread.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py index 8e217870ba..cb073d83a0 100644 --- a/pype/ftrack/ftrack_server/socket_thread.py +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -12,13 +12,14 @@ class SocketThread(threading.Thread): MAX_TIMEOUT = 35 - def __init__(self, name, port, filepath): + def __init__(self, name, port, filepath, additional_args=[]): super(SocketThread, self).__init__() self.log = Logger().get_logger("SocketThread", "Event Thread") self.setName(name) self.name = name self.port = port self.filepath = filepath + self.additional_args = additional_args self.sock = None self.subproc = None self.connection = None @@ -53,7 +54,12 @@ class SocketThread(threading.Thread): ) self.subproc = subprocess.Popen( - [sys.executable, self.filepath, "-port", str(self.port)] + [ + sys.executable, + self.filepath, + *self.additional_args, + str(self.port) + ] ) # Listen for incoming connections From 05929f2b02929b9652411e4f0b53d324f3a67b76 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:31:24 +0100 Subject: [PATCH 09/17] status get suprocess data ony if they are missing (why to collect same data for each action launch) --- pype/ftrack/ftrack_server/sub_event_info.py | 426 +++++++++----------- 1 file changed, 197 insertions(+), 229 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_info.py b/pype/ftrack/ftrack_server/sub_event_info.py index d63b6acadd..5a38c992f5 100644 --- a/pype/ftrack/ftrack_server/sub_event_info.py +++ b/pype/ftrack/ftrack_server/sub_event_info.py @@ -1,137 +1,189 @@ import os import sys -import copy +import json import signal import socket -import uuid -from datetime import datetime +import datetime import ftrack_api from ftrack_server import FtrackServer from pype.ftrack.ftrack_server.lib import ( - SocketSession, SocketBaseEventHub, + SocketSession, StatusEventHub, TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT ) from pypeapp import Logger log = Logger().get_logger("Event storer") -log.info(os.environ.get("FTRACK_EVENT_SUB_ID")) - - -class ObjectFactory: - session = None - sock = None - subprocess_id = os.environ["FTRACK_EVENT_SUB_ID"] - status_factory = None - - -def trigger_status_info(status_id=None, status=None): - if not status and not status_id: - log.warning( - "`status_id` or `status` must be specified to trigger action." - ) - return - - if not status: - status = ObjectFactory.status_factory[status_id] - - if not status: - return - - new_event_data = copy.deepcopy(action_data) - new_event_data.update({ - "selection": [] - }) - new_event_data["subprocess_id"] = ObjectFactory.subprocess_id - new_event_data["status_id"] = status.id - - new_event = ftrack_api.event.base.Event( - topic="ftrack.action.launch", - data=new_event_data, - source=status.source - ) - ObjectFactory.session.event_hub.publish(new_event) - - action_identifier = ( - "event.server.status" + ObjectFactory.subprocess_id + "event.server.status" + os.environ["FTRACK_EVENT_SUB_ID"] ) - -# TODO add IP adress to label -# TODO add icon action_data = { "label": "Pype Admin", - "variant": "Event server Status", + "variant": "- Event server Status", "description": "Get Infromation about event server", "actionIdentifier": action_identifier, "icon": None } +class ObjectFactory: + session = None + status_factory = None + + class Status: default_item = { "type": "label", - "value": "Information not allowed." + "value": "Process info is not available at this moment." } + + def __init__(self, name, label, parent): + self.name = name + self.label = label or name + self.parent = parent + + self.info = None + self.last_update = None + + def update(self, info): + self.last_update = datetime.datetime.now() + self.info = info + + def get_delta_string(self, delta): + days, hours, minutes = ( + delta.days, delta.seconds // 3600, delta.seconds // 60 % 60 + ) + delta_items = [ + "{}d".format(days), + "{}h".format(hours), + "{}m".format(minutes) + ] + if not days: + delta_items.pop(0) + if not hours: + delta_items.pop(0) + delta_items.append("{}s".format(delta.seconds % 60)) + if not minutes: + delta_items.pop(0) + + return " ".join(delta_items) + + def get_items(self): + items = [] + last_update = "N/A" + if self.last_update: + delta = datetime.datetime.now() - self.last_update + last_update = "{} ago".format( + self.get_delta_string(delta) + ) + + last_update = "Updated: {}".format(last_update) + items.append({ + "type": "label", + "value": "#{}".format(self.label) + }) + items.append({ + "type": "label", + "value": "##{}".format(last_update) + }) + + if not self.info: + if self.info is None: + trigger_info_get() + items.append(self.default_item) + return items + + info = {} + for key, value in self.info.items(): + if key not in ["created_at:", "created_at"]: + info[key] = value + continue + + datetime_value = datetime.datetime.strptime( + value, "%Y.%m.%d %H:%M:%S" + ) + delta = datetime.datetime.now() - datetime_value + + running_for = self.get_delta_string(delta) + info["Started at"] = "{} [running: {}]".format(value, running_for) + + for key, value in info.items(): + items.append({ + "type": "label", + "value": "{}: {}".format(key, value) + }) + + return items + + +class StatusFactory: + note_item = { "type": "label", - "value": "Hit `submit` to refresh data." + "value": ( + "NOTE: Hit `submit` and uncheck all" + " checkers to refresh data." + ) } splitter_item = { "type": "label", "value": "---" } - def __init__(self, source_info, parent): - self.id = str(uuid.uuid1()) - self.created = datetime.now() - self.parent = parent + def __init__(self, statuses={}): + self.statuses = [] + for status in statuses.items(): + self.create_status(*status) - self.source = source_info + def __getitem__(self, key): + return self.get(key) - self.main_process = None - self.storer = None - self.processor = None + def get(self, key, default=None): + for status in self.statuses: + if status.name == key: + return status + return default - def add_result(self, source, data): - if source.lower() == "storer": - self.storer = data - - elif source.lower() == "processor": - self.processor = data - - else: - self.main_process = data - - def filled(self): - # WARNING DEBUG PART!!!! + def is_filled(self): + for status in self.statuses: + if status.info is None: + return False return True - return ( - self.main_process is not None and - self.storer is not None and - self.processor is not None - ) - def get_items_from_dict(self, in_dict): - items = [] - for key, value in in_dict.items(): - items.append({ - "type": "label", - "value": "##{}".format(key) - }) - items.append({ - "type": "label", - "value": value - }) - return items + def create_status(self, name, label): + new_status = Status(name, label, self) + self.statuses.append(new_status) + + def process_event_result(self, event): + subprocess_id = event["data"].get("subprocess_id") + if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]: + return + + source = event["data"]["source"] + data = event["data"]["status_info"] + for status in self.statuses: + if status.name == source: + status.update(data) + break def bool_items(self): items = [] - name_labels = { - "shutdown_main": "Shutdown main process", - "reset_storer": "Reset storer", - "reset_processor": "Reset processor" - } + items.append({ + "type": "label", + "value": "#Restart process" + }) + items.append({ + "type": "label", + "value": ( + "WARNING: Main process may not restart" + " if does not run as a service!" + ) + }) + + name_labels = {} + for status in self.statuses: + name_labels[status.name] = status.label + for name, label in name_labels.items(): items.append({ "type": "boolean", @@ -144,75 +196,14 @@ class Status: def items(self): items = [] items.append(self.note_item) - - items.append({"type": "label", "value": "Main process"}) - if not self.main_process: - items.append(self.default_item) - else: - items.extend( - self.get_items_from_dict(self.main_process) - ) - - items.append(self.splitter_item) - items.append({"type": "label", "value": "Storer process"}) - if not self.storer: - items.append(self.default_item) - else: - items.extend( - self.get_items_from_dict(self.storer) - ) - - items.append(self.splitter_item) - items.append({"type": "label", "value": "Processor process"}) - if not self.processor: - items.append(self.default_item) - else: - items.extend( - self.get_items_from_dict(self.processor) - ) - - items.append(self.splitter_item) items.extend(self.bool_items()) + for status in self.statuses: + items.append(self.splitter_item) + items.extend(status.get_items()) + return items - @property - def is_overtime(self): - time_delta = (datetime.now() - self.created).total_seconds() - return time_delta >= self.parent.max_delta_seconds - - -class StatusFactory: - max_delta_seconds = 30 - - def __init__(self): - self.statuses = {} - - def __getitem__(self, key): - return self.statuses.get(key) - - def create_status(self, source_info): - new_status = Status(source_info, self) - self.statuses[new_status.id] = new_status - return new_status - - def process_result(self, event): - subprocess_id = event["data"].get("subprocess_id") - if subprocess_id != ObjectFactory.subprocess_id: - return - - status_id = event["data"].get("status_id") - status = self.statuses[status_id] - if not status: - return - - source = event["data"]["source"] - data = event["data"]["status_info"] - - status.add_result(source, data) - if status.filled(): - trigger_status_info(status=status) - def server_activity_validate_user(event): """Validate user permissions to show server info.""" @@ -247,38 +238,6 @@ def server_activity_discover(event): return {"items": [action_data]} -def handle_filled_event(event): - subprocess_id = event["data"].get("subprocess_id") - if subprocess_id != ObjectFactory.subprocess_id: - return None - - status_id = event["data"].get("status_id") - status = ObjectFactory.status_factory[status_id] - if not status: - return None - - values = event.get("values") - if values: - log.info(values) - - title = "Event server - Status" - - event_data = copy.deepcopy(event["data"]) - event_data.update({ - "type": "widget", - "items": status.items(), - "title": title - }) - - ObjectFactory.session.event_hub.publish( - ftrack_api.event.base.Event( - topic="ftrack.action.trigger-user-interface", - data=event_data - ), - on_error='ignore' - ) - - def server_activity(event): session = ObjectFactory.session if session is None: @@ -289,35 +248,47 @@ def server_activity(event): "message": msg } - valid = server_activity_validate_user(event) - if not valid: + if not server_activity_validate_user(event): return { "success": False, "message": "You don't have permissions to see Event server status!" } - subprocess_id = event["data"].get("subprocess_id") - if subprocess_id is not None: - return handle_filled_event(event) + values = event["data"].get("values") or {} + is_checked = False + for value in values.values(): + if value: + is_checked = True + break - status = ObjectFactory.status_factory.create_status(event["source"]) + if not is_checked: + return { + "items": ObjectFactory.status_factory.items(), + "title": "Server current status" + } - event_data = { - "status_id": status.id, - "subprocess_id": ObjectFactory.subprocess_id - } + +def trigger_info_get(): + session = ObjectFactory.session session.event_hub.publish( ftrack_api.event.base.Event( topic=TOPIC_STATUS_SERVER, - data=event_data + data={"subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"]} ), on_error="ignore" ) - return { - "success": True, - "message": "Collecting information (this may take > 20s)" - } + +def on_start(event): + session = ObjectFactory.session + source_id = event.get("source", {}).get("id") + if not source_id or source_id != session.event_hub.id: + return + + if session is None: + log.warning("Session is not set. Can't trigger Sync to avalon action.") + return True + trigger_info_get() def register(session): @@ -326,6 +297,7 @@ def register(session): "topic=ftrack.action.discover", server_activity_discover ) + session.event_hub.subscribe("topic=pype.status.started", on_start) status_launch_subscription = ( "topic=ftrack.action.launch and data.actionIdentifier={}" @@ -338,34 +310,51 @@ def register(session): session.event_hub.subscribe( "topic={}".format(TOPIC_STATUS_SERVER_RESULT), - ObjectFactory.status_factory.process_result + ObjectFactory.status_factory.process_event_result ) +def heartbeat(): + if ObjectFactory.status_factory.is_filled(): + return + + trigger_info_get() + + def main(args): port = int(args[-1]) + server_info = json.loads(args[-2]) # Create a TCP/IP socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Connect the socket to the port where the server is listening server_address = ("localhost", port) - log.debug("Storer connected to {} port {}".format(*server_address)) + log.debug("Statuser connected to {} port {}".format(*server_address)) sock.connect(server_address) sock.sendall(b"CreatedStatus") # store socket connection object ObjectFactory.sock = sock - ObjectFactory.status_factory = StatusFactory() + statuse_names = { + "main": "Main process", + "storer": "Storer", + "processor": "Processor" + } + + ObjectFactory.status_factory = StatusFactory(statuse_names) + ObjectFactory.status_factory["main"].update(server_info) _returncode = 0 try: session = SocketSession( - auto_connect_event_hub=True, sock=sock, Eventhub=SocketBaseEventHub + auto_connect_event_hub=True, sock=sock, Eventhub=StatusEventHub ) ObjectFactory.session = session + session.event_hub.heartbeat_callbacks.append(heartbeat) register(session) server = FtrackServer("event") - log.debug("Launched Ftrack Event storer") + log.debug("Launched Ftrack Event statuser") + server.run_server(session, load_files=False) except Exception: @@ -388,24 +377,3 @@ if __name__ == "__main__": signal.signal(signal.SIGTERM, signal_handler) sys.exit(main(sys.argv)) - - -example_action_event = { - 'data': { - 'selection': [], - 'description': 'Test action2', - 'variant': None, - 'label': 'Test action2', - 'actionIdentifier': 'test.action2.3ceffe5e9acf40f8aa80603adebd0d06', - 'values': {}, - 'icon': None, - }, - 'topic': 'ftrack.action.launch', - 'sent': None, - 'source': { - 'id': 'eb67d186301c4cbbab73c1aee9b7c55d', - 'user': {'username': 'jakub.trllo', 'id': '2a8ae090-cbd3-11e8-a87a-0a580aa00121'} - }, - 'target': '', - 'in_reply_to_event': None -} From 1b1a78cb6ed79be18fcf89bd340c4e09528fda56 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:31:47 +0100 Subject: [PATCH 10/17] processor suprocess can send status information on ask event --- .../ftrack_server/sub_event_processor.py | 51 ++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/pype/ftrack/ftrack_server/sub_event_processor.py b/pype/ftrack/ftrack_server/sub_event_processor.py index 9c971ca916..2a3ad3e76d 100644 --- a/pype/ftrack/ftrack_server/sub_event_processor.py +++ b/pype/ftrack/ftrack_server/sub_event_processor.py @@ -1,13 +1,59 @@ +import os import sys import signal import socket +import datetime from ftrack_server import FtrackServer -from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub +from pype.ftrack.ftrack_server.lib import ( + SocketSession, ProcessEventHub, TOPIC_STATUS_SERVER +) +import ftrack_api from pypeapp import Logger log = Logger().get_logger("Event processor") +subprocess_started = datetime.datetime.now() + + +class SessionFactory: + session = None + + +def send_status(event): + subprocess_id = event["data"].get("subprocess_id") + if not subprocess_id: + return + + if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]: + return + + session = SessionFactory.session + if not session: + return + + new_event_data = { + "subprocess_id": subprocess_id, + "source": "processor", + "status_info": { + "created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S") + } + } + + new_event = ftrack_api.event.base.Event( + topic="pype.event.server.status.result", + data=new_event_data + ) + + session.event_hub.publish(new_event) + + +def register(session): + '''Registers the event, subscribing the discover and launch topics.''' + session.event_hub.subscribe( + "topic={}".format(TOPIC_STATUS_SERVER), send_status + ) + def main(args): port = int(args[-1]) @@ -24,6 +70,9 @@ def main(args): session = SocketSession( auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub ) + register(session) + SessionFactory.session = session + server = FtrackServer("event") log.debug("Launched Ftrack Event processor") server.run_server(session) From 2ff7b87956651c3343d195b56f0f871aaa4afee1 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:32:02 +0100 Subject: [PATCH 11/17] storer can send status information on ask --- pype/ftrack/ftrack_server/sub_event_storer.py | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py index dfe8e21654..b4b9b8a7ab 100644 --- a/pype/ftrack/ftrack_server/sub_event_storer.py +++ b/pype/ftrack/ftrack_server/sub_event_storer.py @@ -8,14 +8,15 @@ import pymongo import ftrack_api from ftrack_server import FtrackServer from pype.ftrack.ftrack_server.lib import ( + SocketSession, StorerEventHub, get_ftrack_event_mongo_info, - SocketSession, - StorerEventHub + TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT ) from pype.ftrack.lib.custom_db_connector import DbConnector from pypeapp import Logger log = Logger().get_logger("Event storer") +subprocess_started = datetime.datetime.now() class SessionFactory: @@ -138,11 +139,42 @@ def trigger_sync(event): ) +def send_status(event): + session = SessionFactory.session + if not session: + return + + subprocess_id = event["data"].get("subprocess_id") + if not subprocess_id: + return + + if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]: + return + + new_event_data = { + "subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"], + "source": "storer", + "status_info": { + "created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S") + } + } + + new_event = ftrack_api.event.base.Event( + topic=TOPIC_STATUS_SERVER_RESULT, + data=new_event_data + ) + + session.event_hub.publish(new_event) + + def register(session): '''Registers the event, subscribing the discover and launch topics.''' install_db() session.event_hub.subscribe("topic=*", launch) session.event_hub.subscribe("topic=pype.storer.started", trigger_sync) + session.event_hub.subscribe( + "topic={}".format(TOPIC_STATUS_SERVER), send_status + ) def main(args): From 5433daf7b065eb7c16720009170b3400a5ee0fd5 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 18:32:40 +0100 Subject: [PATCH 12/17] event server cli sent his infomation on status subprocess startup --- pype/ftrack/ftrack_server/event_server_cli.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py index b2c540e993..2dadb5da25 100644 --- a/pype/ftrack/ftrack_server/event_server_cli.py +++ b/pype/ftrack/ftrack_server/event_server_cli.py @@ -4,7 +4,10 @@ import signal import datetime import subprocess import socket +import json +import platform import argparse +import getpass import atexit import time import uuid @@ -233,6 +236,16 @@ def main_loop(ftrack_url): atexit.register( on_exit, processor_thread=processor_thread, storer_thread=storer_thread ) + + system_name, pc_name = platform.uname()[:2] + host_name = socket.gethostname() + main_info = { + "created_at": datetime.datetime.now().strftime("%Y.%m.%d %H:%M:%S"), + "Username": getpass.getuser(), + "Host Name": host_name, + "Host IP": socket.gethostbyname(host_name) + } + main_info_str = json.dumps(main_info) # Main loop while True: # Check if accessible Ftrack and Mongo url @@ -270,6 +283,7 @@ def main_loop(ftrack_url): printed_ftrack_error = False printed_mongo_error = False + # ====== STORER ======= # Run backup thread which does not requeire mongo to work if storer_thread is None: if storer_failed_count < max_fail_count: @@ -304,6 +318,7 @@ def main_loop(ftrack_url): storer_failed_count = 0 storer_last_failed = _storer_last_failed + # ====== PROCESSOR ======= if processor_thread is None: if processor_failed_count < max_fail_count: processor_thread = socket_thread.SocketThread( @@ -345,10 +360,12 @@ def main_loop(ftrack_url): processor_failed_count = 0 processor_last_failed = _processor_last_failed + # ====== STATUSER ======= if statuser_thread is None: if statuser_failed_count < max_fail_count: statuser_thread = socket_thread.SocketThread( - statuser_name, statuser_port, statuser_path + statuser_name, statuser_port, statuser_path, + [main_info_str] ) statuser_thread.start() From 2f85cdf0be4ed0b54481013ebc57c201dad9f444 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Fri, 7 Feb 2020 19:53:16 +0100 Subject: [PATCH 13/17] restarting is working, need to add comunication between main proces and status process --- pype/ftrack/ftrack_server/event_server_cli.py | 99 +++++++++++-------- pype/ftrack/ftrack_server/socket_thread.py | 56 ++++++++++- pype/ftrack/ftrack_server/sub_event_info.py | 35 +++++++ 3 files changed, 148 insertions(+), 42 deletions(-) diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py index 2dadb5da25..19e889f77d 100644 --- a/pype/ftrack/ftrack_server/event_server_cli.py +++ b/pype/ftrack/ftrack_server/event_server_cli.py @@ -222,7 +222,7 @@ def main_loop(ftrack_url): # stop threads on exit # TODO check if works and args have thread objects! - def on_exit(processor_thread, storer_thread): + def on_exit(processor_thread, storer_thread, statuser_thread): if processor_thread is not None: processor_thread.stop() processor_thread.join() @@ -233,8 +233,16 @@ def main_loop(ftrack_url): storer_thread.join() storer_thread = None + if statuser_thread is not None: + statuser_thread.stop() + statuser_thread.join() + statuser_thread = None + atexit.register( - on_exit, processor_thread=processor_thread, storer_thread=storer_thread + on_exit, + processor_thread=processor_thread, + storer_thread=storer_thread, + statuser_thread=statuser_thread ) system_name, pc_name = platform.uname()[:2] @@ -283,6 +291,51 @@ def main_loop(ftrack_url): printed_ftrack_error = False printed_mongo_error = False + # ====== STATUSER ======= + if statuser_thread is None: + if statuser_failed_count < max_fail_count: + statuser_thread = socket_thread.StatusSocketThread( + statuser_name, statuser_port, statuser_path, + [main_info_str] + ) + statuser_thread.start() + + elif statuser_failed_count == max_fail_count: + print(( + "Statuser failed {}times in row" + " I'll try to run again {}s later" + ).format(str(max_fail_count), str(wait_time_after_max_fail))) + statuser_failed_count += 1 + + elif (( + datetime.datetime.now() - statuser_last_failed + ).seconds > wait_time_after_max_fail): + statuser_failed_count = 0 + + # If thread failed test Ftrack and Mongo connection + elif not statuser_thread.isAlive(): + statuser_thread.join() + statuser_thread = None + ftrack_accessible = False + mongo_accessible = False + + _processor_last_failed = datetime.datetime.now() + delta_time = ( + _processor_last_failed - statuser_last_failed + ).seconds + + if delta_time < min_fail_seconds: + statuser_failed_count += 1 + else: + statuser_failed_count = 0 + statuser_last_failed = _processor_last_failed + + elif statuser_thread.stop_subprocess: + print("Main process was stopped by action") + on_exit(processor_thread, storer_thread, statuser_thread) + os.kill(os.getpid(), signal.SIGTERM) + return 1 + # ====== STORER ======= # Run backup thread which does not requeire mongo to work if storer_thread is None: @@ -291,6 +344,7 @@ def main_loop(ftrack_url): storer_name, storer_port, storer_path ) storer_thread.start() + elif storer_failed_count == max_fail_count: print(( "Storer failed {}times I'll try to run again {}s later" @@ -360,44 +414,9 @@ def main_loop(ftrack_url): processor_failed_count = 0 processor_last_failed = _processor_last_failed - # ====== STATUSER ======= - if statuser_thread is None: - if statuser_failed_count < max_fail_count: - statuser_thread = socket_thread.SocketThread( - statuser_name, statuser_port, statuser_path, - [main_info_str] - ) - statuser_thread.start() - - elif statuser_failed_count == max_fail_count: - print(( - "Statuser failed {}times in row" - " I'll try to run again {}s later" - ).format(str(max_fail_count), str(wait_time_after_max_fail))) - statuser_failed_count += 1 - - elif (( - datetime.datetime.now() - statuser_last_failed - ).seconds > wait_time_after_max_fail): - statuser_failed_count = 0 - - # If thread failed test Ftrack and Mongo connection - elif not statuser_thread.isAlive(): - statuser_thread.join() - statuser_thread = None - ftrack_accessible = False - mongo_accessible = False - - _processor_last_failed = datetime.datetime.now() - delta_time = ( - _processor_last_failed - statuser_last_failed - ).seconds - - if delta_time < min_fail_seconds: - statuser_failed_count += 1 - else: - statuser_failed_count = 0 - statuser_last_failed = _processor_last_failed + if statuser_thread is not None: + statuser_thread.set_process("storer", storer_thread) + statuser_thread.set_process("processor", processor_thread) time.sleep(1) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py index cb073d83a0..cbe4f9dd8b 100644 --- a/pype/ftrack/ftrack_server/socket_thread.py +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -3,6 +3,7 @@ import sys import time import socket import threading +import traceback import subprocess from pypeapp import Logger @@ -14,12 +15,13 @@ class SocketThread(threading.Thread): def __init__(self, name, port, filepath, additional_args=[]): super(SocketThread, self).__init__() - self.log = Logger().get_logger("SocketThread", "Event Thread") + self.log = Logger().get_logger(self.__class__.__name__) self.setName(name) self.name = name self.port = port self.filepath = filepath self.additional_args = additional_args + self.sock = None self.subproc = None self.connection = None @@ -59,7 +61,8 @@ class SocketThread(threading.Thread): self.filepath, *self.additional_args, str(self.port) - ] + ], + stdin=subprocess.PIPE ) # Listen for incoming connections @@ -133,3 +136,52 @@ class SocketThread(threading.Thread): if data == b"MongoError": self.mongo_error = True connection.sendall(data) + + +class StatusSocketThread(SocketThread): + process_name_mapping = { + b"RestartS": "storer", + b"RestartP": "processor", + b"RestartM": "main" + } + + def __init__(self, *args, **kwargs): + self.process_threads = {} + self.stop_subprocess = False + super(StatusSocketThread, self).__init__(*args, **kwargs) + + def set_process(self, process_name, thread): + try: + if not self.subproc: + self.process_threads[process_name] = None + return + + if ( + process_name in self.process_threads and + self.process_threads[process_name] == thread + ): + return + + self.process_threads[process_name] = thread + self.subproc.stdin.write( + str.encode("reset:{}".format(process_name)) + ) + self.subproc.stdin.flush() + + except Exception: + print("Could not set thread in StatusSocketThread") + traceback.print_exception(*sys.exc_info()) + + def _handle_data(self, connection, data): + if not data: + return + + process_name = self.process_name_mapping.get(data) + if process_name: + if process_name == "main": + self.stop_subprocess = True + else: + subp = self.process_threads.get(process_name) + if subp: + subp.stop() + connection.sendall(data) diff --git a/pype/ftrack/ftrack_server/sub_event_info.py b/pype/ftrack/ftrack_server/sub_event_info.py index 5a38c992f5..a0c2564e10 100644 --- a/pype/ftrack/ftrack_server/sub_event_info.py +++ b/pype/ftrack/ftrack_server/sub_event_info.py @@ -1,6 +1,8 @@ import os import sys import json +import time +import threading import signal import socket import datetime @@ -29,6 +31,7 @@ action_data = { class ObjectFactory: session = None status_factory = None + checker_thread = None class Status: @@ -267,6 +270,17 @@ def server_activity(event): "title": "Server current status" } + session = ObjectFactory.session + if values["main"]: + session.event_hub.sock.sendall(b"RestartM") + return + + if values["storer"]: + session.event_hub.sock.sendall(b"RestartS") + + if values["processor"]: + session.event_hub.sock.sendall(b"RestartP") + def trigger_info_get(): session = ObjectFactory.session @@ -367,13 +381,34 @@ def main(args): return _returncode +class OutputChecker(threading.Thread): + read_input = True + + def run(self): + while self.read_input: + line = sys.stdin.readlines() + log.info(str(line)) + # for line in sys.stdin.readlines(): + # log.info(str(line)) + log.info("alive-end") + time.sleep(0.5) + + def stop(self): + self.read_input = False + + if __name__ == "__main__": # Register interupt signal def signal_handler(sig, frame): print("You pressed Ctrl+C. Process ended.") + ObjectFactory.checker_thread.stop() sys.exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) + checker_thread = OutputChecker() + ObjectFactory.checker_thread = checker_thread + checker_thread.start() + sys.exit(main(sys.argv)) From 10853e1ade753801109009d0497b389533419316 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Sat, 8 Feb 2020 11:26:43 +0100 Subject: [PATCH 14/17] process information are refreshed by main process now --- pype/ftrack/ftrack_server/socket_thread.py | 2 +- pype/ftrack/ftrack_server/sub_event_info.py | 39 ++++++++++++++------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py index cbe4f9dd8b..942965f9e2 100644 --- a/pype/ftrack/ftrack_server/socket_thread.py +++ b/pype/ftrack/ftrack_server/socket_thread.py @@ -164,7 +164,7 @@ class StatusSocketThread(SocketThread): self.process_threads[process_name] = thread self.subproc.stdin.write( - str.encode("reset:{}".format(process_name)) + str.encode("reset:{}\r\n".format(process_name)) ) self.subproc.stdin.flush() diff --git a/pype/ftrack/ftrack_server/sub_event_info.py b/pype/ftrack/ftrack_server/sub_event_info.py index a0c2564e10..4c94513eae 100644 --- a/pype/ftrack/ftrack_server/sub_event_info.py +++ b/pype/ftrack/ftrack_server/sub_event_info.py @@ -32,6 +32,7 @@ class ObjectFactory: session = None status_factory = None checker_thread = None + last_trigger = None class Status: @@ -124,8 +125,8 @@ class StatusFactory: note_item = { "type": "label", "value": ( - "NOTE: Hit `submit` and uncheck all" - " checkers to refresh data." + "HINT: To refresh data uncheck" + " all checkboxes and hit `Submit` button." ) } splitter_item = { @@ -164,9 +165,13 @@ class StatusFactory: source = event["data"]["source"] data = event["data"]["status_info"] + + self.update_status_info(source, data) + + def update_status_info(self, process_name, info): for status in self.statuses: - if status.name == source: - status.update(data) + if status.name == process_name: + status.update(info) break def bool_items(self): @@ -178,7 +183,7 @@ class StatusFactory: items.append({ "type": "label", "value": ( - "WARNING: Main process may not restart" + "WARNING: Main process may shut down when checked" " if does not run as a service!" ) }) @@ -283,6 +288,11 @@ def server_activity(event): def trigger_info_get(): + if ObjectFactory.last_trigger: + delta = datetime.datetime.now() - ObjectFactory.last_trigger + if delta.seconds() < 5: + return + session = ObjectFactory.session session.event_hub.publish( ftrack_api.event.base.Event( @@ -352,8 +362,8 @@ def main(args): statuse_names = { "main": "Main process", - "storer": "Storer", - "processor": "Processor" + "storer": "Event Storer", + "processor": "Event Processor" } ObjectFactory.status_factory = StatusFactory(statuse_names) @@ -386,12 +396,15 @@ class OutputChecker(threading.Thread): def run(self): while self.read_input: - line = sys.stdin.readlines() - log.info(str(line)) - # for line in sys.stdin.readlines(): - # log.info(str(line)) - log.info("alive-end") - time.sleep(0.5) + for line in sys.stdin: + line = line.rstrip().lower() + if not line.startswith("reset:"): + continue + process_name = line.replace("reset:", "") + + ObjectFactory.status_factory.update_status_info( + process_name, None + ) def stop(self): self.read_input = False From 49f9dbf4183f057ab2f0ad16fe4b0909de55eef1 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Sat, 8 Feb 2020 11:28:33 +0100 Subject: [PATCH 15/17] renamed sub_event_info to sub_event_status --- pype/ftrack/ftrack_server/event_server_cli.py | 2 +- .../ftrack_server/{sub_event_info.py => sub_event_status.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename pype/ftrack/ftrack_server/{sub_event_info.py => sub_event_status.py} (100%) diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py index 19e889f77d..90c7c566fc 100644 --- a/pype/ftrack/ftrack_server/event_server_cli.py +++ b/pype/ftrack/ftrack_server/event_server_cli.py @@ -209,7 +209,7 @@ def main_loop(ftrack_url): statuser_name = "StorerThread" statuser_port = 10021 - statuser_path = "{}/sub_event_info.py".format(file_path) + statuser_path = "{}/sub_event_status.py".format(file_path) statuser_thread = None statuser_last_failed = datetime.datetime.now() statuser_failed_count = 0 diff --git a/pype/ftrack/ftrack_server/sub_event_info.py b/pype/ftrack/ftrack_server/sub_event_status.py similarity index 100% rename from pype/ftrack/ftrack_server/sub_event_info.py rename to pype/ftrack/ftrack_server/sub_event_status.py From e9c4ec7fee46b87a067efc9a7566a09f071a4ea3 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Sat, 8 Feb 2020 11:30:38 +0100 Subject: [PATCH 16/17] label has IP adress of server --- pype/ftrack/ftrack_server/sub_event_status.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_status.py b/pype/ftrack/ftrack_server/sub_event_status.py index 4c94513eae..8dc176a091 100644 --- a/pype/ftrack/ftrack_server/sub_event_status.py +++ b/pype/ftrack/ftrack_server/sub_event_status.py @@ -1,7 +1,6 @@ import os import sys import json -import time import threading import signal import socket @@ -19,9 +18,10 @@ log = Logger().get_logger("Event storer") action_identifier = ( "event.server.status" + os.environ["FTRACK_EVENT_SUB_ID"] ) +host_ip = socket.gethostbyname(socket.gethostname()) action_data = { "label": "Pype Admin", - "variant": "- Event server Status", + "variant": "- Event server Status ({})".format(host_ip), "description": "Get Infromation about event server", "actionIdentifier": action_identifier, "icon": None From 4e85279771711e794330d414537381be9025a4b6 Mon Sep 17 00:00:00 2001 From: iLLiCiTiT Date: Sat, 8 Feb 2020 12:01:04 +0100 Subject: [PATCH 17/17] added icon to status action --- pype/ftrack/ftrack_server/sub_event_status.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/pype/ftrack/ftrack_server/sub_event_status.py b/pype/ftrack/ftrack_server/sub_event_status.py index 8dc176a091..1a15a1f28d 100644 --- a/pype/ftrack/ftrack_server/sub_event_status.py +++ b/pype/ftrack/ftrack_server/sub_event_status.py @@ -12,7 +12,7 @@ from pype.ftrack.ftrack_server.lib import ( SocketSession, StatusEventHub, TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT ) -from pypeapp import Logger +from pypeapp import Logger, config log = Logger().get_logger("Event storer") action_identifier = ( @@ -24,7 +24,16 @@ action_data = { "variant": "- Event server Status ({})".format(host_ip), "description": "Get Infromation about event server", "actionIdentifier": action_identifier, - "icon": None + "icon": "{}/ftrack/action_icons/PypeAdmin.svg".format( + os.environ.get( + "PYPE_STATICS_SERVER", + "http://localhost:{}".format( + config.get_presets().get("services", {}).get( + "rest_api", {} + ).get("default_port", 8021) + ) + ) + ) }