diff --git a/pype/ftrack/ftrack_server/event_server_cli.py b/pype/ftrack/ftrack_server/event_server_cli.py
index b09b0bc84e..90c7c566fc 100644
--- a/pype/ftrack/ftrack_server/event_server_cli.py
+++ b/pype/ftrack/ftrack_server/event_server_cli.py
@@ -4,9 +4,13 @@ import signal
import datetime
import subprocess
import socket
+import json
+import platform
import argparse
+import getpass
import atexit
import time
+import uuid
import ftrack_api
from pype.ftrack.lib import credentials
@@ -175,6 +179,7 @@ def main_loop(ftrack_url):
otherwise thread will be killed.
"""
+ os.environ["FTRACK_EVENT_SUB_ID"] = str(uuid.uuid1())
# Get mongo hostname and port for testing mongo connection
mongo_list = ftrack_events_mongo_settings()
mongo_hostname = mongo_list[0]
@@ -202,6 +207,13 @@ def main_loop(ftrack_url):
processor_last_failed = datetime.datetime.now()
processor_failed_count = 0
+ statuser_name = "StorerThread"
+ statuser_port = 10021
+ statuser_path = "{}/sub_event_status.py".format(file_path)
+ statuser_thread = None
+ statuser_last_failed = datetime.datetime.now()
+ statuser_failed_count = 0
+
ftrack_accessible = False
mongo_accessible = False
@@ -210,7 +222,7 @@ def main_loop(ftrack_url):
# stop threads on exit
# TODO check if works and args have thread objects!
- def on_exit(processor_thread, storer_thread):
+ def on_exit(processor_thread, storer_thread, statuser_thread):
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
@@ -221,9 +233,27 @@ def main_loop(ftrack_url):
storer_thread.join()
storer_thread = None
+ if statuser_thread is not None:
+ statuser_thread.stop()
+ statuser_thread.join()
+ statuser_thread = None
+
atexit.register(
- on_exit, processor_thread=processor_thread, storer_thread=storer_thread
+ on_exit,
+ processor_thread=processor_thread,
+ storer_thread=storer_thread,
+ statuser_thread=statuser_thread
)
+
+ system_name, pc_name = platform.uname()[:2]
+ host_name = socket.gethostname()
+ main_info = {
+ "created_at": datetime.datetime.now().strftime("%Y.%m.%d %H:%M:%S"),
+ "Username": getpass.getuser(),
+ "Host Name": host_name,
+ "Host IP": socket.gethostbyname(host_name)
+ }
+ main_info_str = json.dumps(main_info)
# Main loop
while True:
# Check if accessible Ftrack and Mongo url
@@ -261,6 +291,52 @@ def main_loop(ftrack_url):
printed_ftrack_error = False
printed_mongo_error = False
+ # ====== STATUSER =======
+ if statuser_thread is None:
+ if statuser_failed_count < max_fail_count:
+ statuser_thread = socket_thread.StatusSocketThread(
+ statuser_name, statuser_port, statuser_path,
+ [main_info_str]
+ )
+ statuser_thread.start()
+
+ elif statuser_failed_count == max_fail_count:
+ print((
+ "Statuser failed {}times in row"
+ " I'll try to run again {}s later"
+ ).format(str(max_fail_count), str(wait_time_after_max_fail)))
+ statuser_failed_count += 1
+
+ elif ((
+ datetime.datetime.now() - statuser_last_failed
+ ).seconds > wait_time_after_max_fail):
+ statuser_failed_count = 0
+
+ # If thread failed test Ftrack and Mongo connection
+ elif not statuser_thread.isAlive():
+ statuser_thread.join()
+ statuser_thread = None
+ ftrack_accessible = False
+ mongo_accessible = False
+
+ _processor_last_failed = datetime.datetime.now()
+ delta_time = (
+ _processor_last_failed - statuser_last_failed
+ ).seconds
+
+ if delta_time < min_fail_seconds:
+ statuser_failed_count += 1
+ else:
+ statuser_failed_count = 0
+ statuser_last_failed = _processor_last_failed
+
+ elif statuser_thread.stop_subprocess:
+ print("Main process was stopped by action")
+ on_exit(processor_thread, storer_thread, statuser_thread)
+ os.kill(os.getpid(), signal.SIGTERM)
+ return 1
+
+ # ====== STORER =======
# Run backup thread which does not requeire mongo to work
if storer_thread is None:
if storer_failed_count < max_fail_count:
@@ -268,6 +344,7 @@ def main_loop(ftrack_url):
storer_name, storer_port, storer_path
)
storer_thread.start()
+
elif storer_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
@@ -295,6 +372,7 @@ def main_loop(ftrack_url):
storer_failed_count = 0
storer_last_failed = _storer_last_failed
+ # ====== PROCESSOR =======
if processor_thread is None:
if processor_failed_count < max_fail_count:
processor_thread = socket_thread.SocketThread(
@@ -336,6 +414,10 @@ def main_loop(ftrack_url):
processor_failed_count = 0
processor_last_failed = _processor_last_failed
+ if statuser_thread is not None:
+ statuser_thread.set_process("storer", storer_thread)
+ statuser_thread.set_process("processor", processor_thread)
+
time.sleep(1)
diff --git a/pype/ftrack/ftrack_server/lib.py b/pype/ftrack/ftrack_server/lib.py
index fefba580e0..e623cab8fb 100644
--- a/pype/ftrack/ftrack_server/lib.py
+++ b/pype/ftrack/ftrack_server/lib.py
@@ -28,6 +28,10 @@ from pypeapp import Logger
from pype.ftrack.lib.custom_db_connector import DbConnector
+TOPIC_STATUS_SERVER = "pype.event.server.status"
+TOPIC_STATUS_SERVER_RESULT = "pype.event.server.status.result"
+
+
def ftrack_events_mongo_settings():
host = None
port = None
@@ -123,20 +127,59 @@ def check_ftrack_url(url, log_errors=True):
return url
-class StorerEventHub(ftrack_api.event.hub.EventHub):
+class SocketBaseEventHub(ftrack_api.event.hub.EventHub):
+
+ hearbeat_msg = b"hearbeat"
+ heartbeat_callbacks = []
+
def __init__(self, *args, **kwargs):
self.sock = kwargs.pop("sock")
- super(StorerEventHub, self).__init__(*args, **kwargs)
+ super(SocketBaseEventHub, self).__init__(*args, **kwargs)
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "heartbeat":
# Reply with heartbeat.
- self.sock.sendall(b"storer")
- return self._send_packet(self._code_name_mapping['heartbeat'])
+ for callback in self.heartbeat_callbacks:
+ callback()
- elif code_name == "connect":
+ self.sock.sendall(self.hearbeat_msg)
+ return self._send_packet(self._code_name_mapping["heartbeat"])
+
+ return super(SocketBaseEventHub, self)._handle_packet(
+ code, packet_identifier, path, data
+ )
+
+
+class StatusEventHub(SocketBaseEventHub):
+ def _handle_packet(self, code, packet_identifier, path, data):
+ """Override `_handle_packet` which extend heartbeat"""
+ code_name = self._code_name_mapping[code]
+ if code_name == "connect":
+ event = ftrack_api.event.base.Event(
+ topic="pype.status.started",
+ data={},
+ source={
+ "id": self.id,
+ "user": {"username": self._api_user}
+ }
+ )
+ self._event_queue.put(event)
+
+ return super(StatusEventHub, self)._handle_packet(
+ code, packet_identifier, path, data
+ )
+
+
+class StorerEventHub(SocketBaseEventHub):
+
+ hearbeat_msg = b"storer"
+
+ def _handle_packet(self, code, packet_identifier, path, data):
+ """Override `_handle_packet` which extend heartbeat"""
+ code_name = self._code_name_mapping[code]
+ if code_name == "connect":
event = ftrack_api.event.base.Event(
topic="pype.storer.started",
data={},
@@ -152,7 +195,9 @@ class StorerEventHub(ftrack_api.event.hub.EventHub):
)
-class ProcessEventHub(ftrack_api.event.hub.EventHub):
+class ProcessEventHub(SocketBaseEventHub):
+
+ hearbeat_msg = b"processor"
url, database, table_name = get_ftrack_event_mongo_info()
is_table_created = False
@@ -164,7 +209,6 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub):
database_name=self.database,
table_name=self.table_name
)
- self.sock = kwargs.pop("sock")
super(ProcessEventHub, self).__init__(*args, **kwargs)
def prepare_dbcon(self):
@@ -260,42 +304,10 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub):
code_name = self._code_name_mapping[code]
if code_name == "event":
return
- if code_name == "heartbeat":
- self.sock.sendall(b"processor")
- return self._send_packet(self._code_name_mapping["heartbeat"])
return super()._handle_packet(code, packet_identifier, path, data)
-class UserEventHub(ftrack_api.event.hub.EventHub):
- def __init__(self, *args, **kwargs):
- self.sock = kwargs.pop("sock")
- super(UserEventHub, self).__init__(*args, **kwargs)
-
- def _handle_packet(self, code, packet_identifier, path, data):
- """Override `_handle_packet` which extend heartbeat"""
- code_name = self._code_name_mapping[code]
- if code_name == "heartbeat":
- # Reply with heartbeat.
- self.sock.sendall(b"hearbeat")
- return self._send_packet(self._code_name_mapping['heartbeat'])
-
- elif code_name == "connect":
- event = ftrack_api.event.base.Event(
- topic="pype.storer.started",
- data={},
- source={
- "id": self.id,
- "user": {"username": self._api_user}
- }
- )
- self._event_queue.put(event)
-
- return super(UserEventHub, self)._handle_packet(
- code, packet_identifier, path, data
- )
-
-
class SocketSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(
diff --git a/pype/ftrack/ftrack_server/socket_thread.py b/pype/ftrack/ftrack_server/socket_thread.py
index 8e217870ba..942965f9e2 100644
--- a/pype/ftrack/ftrack_server/socket_thread.py
+++ b/pype/ftrack/ftrack_server/socket_thread.py
@@ -3,6 +3,7 @@ import sys
import time
import socket
import threading
+import traceback
import subprocess
from pypeapp import Logger
@@ -12,13 +13,15 @@ class SocketThread(threading.Thread):
MAX_TIMEOUT = 35
- def __init__(self, name, port, filepath):
+ def __init__(self, name, port, filepath, additional_args=[]):
super(SocketThread, self).__init__()
- self.log = Logger().get_logger("SocketThread", "Event Thread")
+ self.log = Logger().get_logger(self.__class__.__name__)
self.setName(name)
self.name = name
self.port = port
self.filepath = filepath
+ self.additional_args = additional_args
+
self.sock = None
self.subproc = None
self.connection = None
@@ -53,7 +56,13 @@ class SocketThread(threading.Thread):
)
self.subproc = subprocess.Popen(
- [sys.executable, self.filepath, "-port", str(self.port)]
+ [
+ sys.executable,
+ self.filepath,
+ *self.additional_args,
+ str(self.port)
+ ],
+ stdin=subprocess.PIPE
)
# Listen for incoming connections
@@ -127,3 +136,52 @@ class SocketThread(threading.Thread):
if data == b"MongoError":
self.mongo_error = True
connection.sendall(data)
+
+
+class StatusSocketThread(SocketThread):
+ process_name_mapping = {
+ b"RestartS": "storer",
+ b"RestartP": "processor",
+ b"RestartM": "main"
+ }
+
+ def __init__(self, *args, **kwargs):
+ self.process_threads = {}
+ self.stop_subprocess = False
+ super(StatusSocketThread, self).__init__(*args, **kwargs)
+
+ def set_process(self, process_name, thread):
+ try:
+ if not self.subproc:
+ self.process_threads[process_name] = None
+ return
+
+ if (
+ process_name in self.process_threads and
+ self.process_threads[process_name] == thread
+ ):
+ return
+
+ self.process_threads[process_name] = thread
+ self.subproc.stdin.write(
+ str.encode("reset:{}\r\n".format(process_name))
+ )
+ self.subproc.stdin.flush()
+
+ except Exception:
+ print("Could not set thread in StatusSocketThread")
+ traceback.print_exception(*sys.exc_info())
+
+ def _handle_data(self, connection, data):
+ if not data:
+ return
+
+ process_name = self.process_name_mapping.get(data)
+ if process_name:
+ if process_name == "main":
+ self.stop_subprocess = True
+ else:
+ subp = self.process_threads.get(process_name)
+ if subp:
+ subp.stop()
+ connection.sendall(data)
diff --git a/pype/ftrack/ftrack_server/sub_event_processor.py b/pype/ftrack/ftrack_server/sub_event_processor.py
index 9c971ca916..2a3ad3e76d 100644
--- a/pype/ftrack/ftrack_server/sub_event_processor.py
+++ b/pype/ftrack/ftrack_server/sub_event_processor.py
@@ -1,13 +1,59 @@
+import os
import sys
import signal
import socket
+import datetime
from ftrack_server import FtrackServer
-from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub
+from pype.ftrack.ftrack_server.lib import (
+ SocketSession, ProcessEventHub, TOPIC_STATUS_SERVER
+)
+import ftrack_api
from pypeapp import Logger
log = Logger().get_logger("Event processor")
+subprocess_started = datetime.datetime.now()
+
+
+class SessionFactory:
+ session = None
+
+
+def send_status(event):
+ subprocess_id = event["data"].get("subprocess_id")
+ if not subprocess_id:
+ return
+
+ if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
+ return
+
+ session = SessionFactory.session
+ if not session:
+ return
+
+ new_event_data = {
+ "subprocess_id": subprocess_id,
+ "source": "processor",
+ "status_info": {
+ "created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S")
+ }
+ }
+
+ new_event = ftrack_api.event.base.Event(
+ topic="pype.event.server.status.result",
+ data=new_event_data
+ )
+
+ session.event_hub.publish(new_event)
+
+
+def register(session):
+ '''Registers the event, subscribing the discover and launch topics.'''
+ session.event_hub.subscribe(
+ "topic={}".format(TOPIC_STATUS_SERVER), send_status
+ )
+
def main(args):
port = int(args[-1])
@@ -24,6 +70,9 @@ def main(args):
session = SocketSession(
auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub
)
+ register(session)
+ SessionFactory.session = session
+
server = FtrackServer("event")
log.debug("Launched Ftrack Event processor")
server.run_server(session)
diff --git a/pype/ftrack/ftrack_server/sub_event_status.py b/pype/ftrack/ftrack_server/sub_event_status.py
new file mode 100644
index 0000000000..1a15a1f28d
--- /dev/null
+++ b/pype/ftrack/ftrack_server/sub_event_status.py
@@ -0,0 +1,436 @@
+import os
+import sys
+import json
+import threading
+import signal
+import socket
+import datetime
+
+import ftrack_api
+from ftrack_server import FtrackServer
+from pype.ftrack.ftrack_server.lib import (
+ SocketSession, StatusEventHub,
+ TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT
+)
+from pypeapp import Logger, config
+
+log = Logger().get_logger("Event storer")
+action_identifier = (
+ "event.server.status" + os.environ["FTRACK_EVENT_SUB_ID"]
+)
+host_ip = socket.gethostbyname(socket.gethostname())
+action_data = {
+ "label": "Pype Admin",
+ "variant": "- Event server Status ({})".format(host_ip),
+ "description": "Get Infromation about event server",
+ "actionIdentifier": action_identifier,
+ "icon": "{}/ftrack/action_icons/PypeAdmin.svg".format(
+ os.environ.get(
+ "PYPE_STATICS_SERVER",
+ "http://localhost:{}".format(
+ config.get_presets().get("services", {}).get(
+ "rest_api", {}
+ ).get("default_port", 8021)
+ )
+ )
+ )
+}
+
+
+class ObjectFactory:
+ session = None
+ status_factory = None
+ checker_thread = None
+ last_trigger = None
+
+
+class Status:
+ default_item = {
+ "type": "label",
+ "value": "Process info is not available at this moment."
+ }
+
+ def __init__(self, name, label, parent):
+ self.name = name
+ self.label = label or name
+ self.parent = parent
+
+ self.info = None
+ self.last_update = None
+
+ def update(self, info):
+ self.last_update = datetime.datetime.now()
+ self.info = info
+
+ def get_delta_string(self, delta):
+ days, hours, minutes = (
+ delta.days, delta.seconds // 3600, delta.seconds // 60 % 60
+ )
+ delta_items = [
+ "{}d".format(days),
+ "{}h".format(hours),
+ "{}m".format(minutes)
+ ]
+ if not days:
+ delta_items.pop(0)
+ if not hours:
+ delta_items.pop(0)
+ delta_items.append("{}s".format(delta.seconds % 60))
+ if not minutes:
+ delta_items.pop(0)
+
+ return " ".join(delta_items)
+
+ def get_items(self):
+ items = []
+ last_update = "N/A"
+ if self.last_update:
+ delta = datetime.datetime.now() - self.last_update
+ last_update = "{} ago".format(
+ self.get_delta_string(delta)
+ )
+
+ last_update = "Updated: {}".format(last_update)
+ items.append({
+ "type": "label",
+ "value": "#{}".format(self.label)
+ })
+ items.append({
+ "type": "label",
+ "value": "##{}".format(last_update)
+ })
+
+ if not self.info:
+ if self.info is None:
+ trigger_info_get()
+ items.append(self.default_item)
+ return items
+
+ info = {}
+ for key, value in self.info.items():
+ if key not in ["created_at:", "created_at"]:
+ info[key] = value
+ continue
+
+ datetime_value = datetime.datetime.strptime(
+ value, "%Y.%m.%d %H:%M:%S"
+ )
+ delta = datetime.datetime.now() - datetime_value
+
+ running_for = self.get_delta_string(delta)
+ info["Started at"] = "{} [running: {}]".format(value, running_for)
+
+ for key, value in info.items():
+ items.append({
+ "type": "label",
+ "value": "{}: {}".format(key, value)
+ })
+
+ return items
+
+
+class StatusFactory:
+
+ note_item = {
+ "type": "label",
+ "value": (
+ "HINT: To refresh data uncheck"
+ " all checkboxes and hit `Submit` button."
+ )
+ }
+ splitter_item = {
+ "type": "label",
+ "value": "---"
+ }
+
+ def __init__(self, statuses={}):
+ self.statuses = []
+ for status in statuses.items():
+ self.create_status(*status)
+
+ def __getitem__(self, key):
+ return self.get(key)
+
+ def get(self, key, default=None):
+ for status in self.statuses:
+ if status.name == key:
+ return status
+ return default
+
+ def is_filled(self):
+ for status in self.statuses:
+ if status.info is None:
+ return False
+ return True
+
+ def create_status(self, name, label):
+ new_status = Status(name, label, self)
+ self.statuses.append(new_status)
+
+ def process_event_result(self, event):
+ subprocess_id = event["data"].get("subprocess_id")
+ if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
+ return
+
+ source = event["data"]["source"]
+ data = event["data"]["status_info"]
+
+ self.update_status_info(source, data)
+
+ def update_status_info(self, process_name, info):
+ for status in self.statuses:
+ if status.name == process_name:
+ status.update(info)
+ break
+
+ def bool_items(self):
+ items = []
+ items.append({
+ "type": "label",
+ "value": "#Restart process"
+ })
+ items.append({
+ "type": "label",
+ "value": (
+ "WARNING: Main process may shut down when checked"
+ " if does not run as a service!"
+ )
+ })
+
+ name_labels = {}
+ for status in self.statuses:
+ name_labels[status.name] = status.label
+
+ for name, label in name_labels.items():
+ items.append({
+ "type": "boolean",
+ "value": False,
+ "label": label,
+ "name": name
+ })
+ return items
+
+ def items(self):
+ items = []
+ items.append(self.note_item)
+ items.extend(self.bool_items())
+
+ for status in self.statuses:
+ items.append(self.splitter_item)
+ items.extend(status.get_items())
+
+ return items
+
+
+def server_activity_validate_user(event):
+ """Validate user permissions to show server info."""
+ session = ObjectFactory.session
+
+ username = event["source"].get("user", {}).get("username")
+ if not username:
+ return False
+
+ user_ent = session.query(
+ "User where username = \"{}\"".format(username)
+ ).first()
+ if not user_ent:
+ return False
+
+ role_list = ["Pypeclub", "Administrator"]
+ for role in user_ent["user_security_roles"]:
+ if role["security_role"]["name"] in role_list:
+ return True
+ return False
+
+
+def server_activity_discover(event):
+ """Discover action in actions menu conditions."""
+ session = ObjectFactory.session
+ if session is None:
+ return
+
+ if not server_activity_validate_user(event):
+ return
+
+ return {"items": [action_data]}
+
+
+def server_activity(event):
+ session = ObjectFactory.session
+ if session is None:
+ msg = "Session is not set. Can't trigger Reset action."
+ log.warning(msg)
+ return {
+ "success": False,
+ "message": msg
+ }
+
+ if not server_activity_validate_user(event):
+ return {
+ "success": False,
+ "message": "You don't have permissions to see Event server status!"
+ }
+
+ values = event["data"].get("values") or {}
+ is_checked = False
+ for value in values.values():
+ if value:
+ is_checked = True
+ break
+
+ if not is_checked:
+ return {
+ "items": ObjectFactory.status_factory.items(),
+ "title": "Server current status"
+ }
+
+ session = ObjectFactory.session
+ if values["main"]:
+ session.event_hub.sock.sendall(b"RestartM")
+ return
+
+ if values["storer"]:
+ session.event_hub.sock.sendall(b"RestartS")
+
+ if values["processor"]:
+ session.event_hub.sock.sendall(b"RestartP")
+
+
+def trigger_info_get():
+ if ObjectFactory.last_trigger:
+ delta = datetime.datetime.now() - ObjectFactory.last_trigger
+ if delta.seconds() < 5:
+ return
+
+ session = ObjectFactory.session
+ session.event_hub.publish(
+ ftrack_api.event.base.Event(
+ topic=TOPIC_STATUS_SERVER,
+ data={"subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"]}
+ ),
+ on_error="ignore"
+ )
+
+
+def on_start(event):
+ session = ObjectFactory.session
+ source_id = event.get("source", {}).get("id")
+ if not source_id or source_id != session.event_hub.id:
+ return
+
+ if session is None:
+ log.warning("Session is not set. Can't trigger Sync to avalon action.")
+ return True
+ trigger_info_get()
+
+
+def register(session):
+ '''Registers the event, subscribing the discover and launch topics.'''
+ session.event_hub.subscribe(
+ "topic=ftrack.action.discover",
+ server_activity_discover
+ )
+ session.event_hub.subscribe("topic=pype.status.started", on_start)
+
+ status_launch_subscription = (
+ "topic=ftrack.action.launch and data.actionIdentifier={}"
+ ).format(action_identifier)
+
+ session.event_hub.subscribe(
+ status_launch_subscription,
+ server_activity
+ )
+
+ session.event_hub.subscribe(
+ "topic={}".format(TOPIC_STATUS_SERVER_RESULT),
+ ObjectFactory.status_factory.process_event_result
+ )
+
+
+def heartbeat():
+ if ObjectFactory.status_factory.is_filled():
+ return
+
+ trigger_info_get()
+
+
+def main(args):
+ port = int(args[-1])
+ server_info = json.loads(args[-2])
+
+ # Create a TCP/IP socket
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+
+ # Connect the socket to the port where the server is listening
+ server_address = ("localhost", port)
+ log.debug("Statuser connected to {} port {}".format(*server_address))
+ sock.connect(server_address)
+ sock.sendall(b"CreatedStatus")
+ # store socket connection object
+ ObjectFactory.sock = sock
+
+ statuse_names = {
+ "main": "Main process",
+ "storer": "Event Storer",
+ "processor": "Event Processor"
+ }
+
+ ObjectFactory.status_factory = StatusFactory(statuse_names)
+ ObjectFactory.status_factory["main"].update(server_info)
+ _returncode = 0
+ try:
+ session = SocketSession(
+ auto_connect_event_hub=True, sock=sock, Eventhub=StatusEventHub
+ )
+ ObjectFactory.session = session
+ session.event_hub.heartbeat_callbacks.append(heartbeat)
+ register(session)
+ server = FtrackServer("event")
+ log.debug("Launched Ftrack Event statuser")
+
+ server.run_server(session, load_files=False)
+
+ except Exception:
+ _returncode = 1
+ log.error("ServerInfo subprocess crashed", exc_info=True)
+
+ finally:
+ log.debug("Ending. Closing socket.")
+ sock.close()
+ return _returncode
+
+
+class OutputChecker(threading.Thread):
+ read_input = True
+
+ def run(self):
+ while self.read_input:
+ for line in sys.stdin:
+ line = line.rstrip().lower()
+ if not line.startswith("reset:"):
+ continue
+ process_name = line.replace("reset:", "")
+
+ ObjectFactory.status_factory.update_status_info(
+ process_name, None
+ )
+
+ def stop(self):
+ self.read_input = False
+
+
+if __name__ == "__main__":
+ # Register interupt signal
+ def signal_handler(sig, frame):
+ print("You pressed Ctrl+C. Process ended.")
+ ObjectFactory.checker_thread.stop()
+ sys.exit(0)
+
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGTERM, signal_handler)
+
+ checker_thread = OutputChecker()
+ ObjectFactory.checker_thread = checker_thread
+ checker_thread.start()
+
+ sys.exit(main(sys.argv))
diff --git a/pype/ftrack/ftrack_server/sub_event_storer.py b/pype/ftrack/ftrack_server/sub_event_storer.py
index dfe8e21654..b4b9b8a7ab 100644
--- a/pype/ftrack/ftrack_server/sub_event_storer.py
+++ b/pype/ftrack/ftrack_server/sub_event_storer.py
@@ -8,14 +8,15 @@ import pymongo
import ftrack_api
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import (
+ SocketSession, StorerEventHub,
get_ftrack_event_mongo_info,
- SocketSession,
- StorerEventHub
+ TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT
)
from pype.ftrack.lib.custom_db_connector import DbConnector
from pypeapp import Logger
log = Logger().get_logger("Event storer")
+subprocess_started = datetime.datetime.now()
class SessionFactory:
@@ -138,11 +139,42 @@ def trigger_sync(event):
)
+def send_status(event):
+ session = SessionFactory.session
+ if not session:
+ return
+
+ subprocess_id = event["data"].get("subprocess_id")
+ if not subprocess_id:
+ return
+
+ if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
+ return
+
+ new_event_data = {
+ "subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"],
+ "source": "storer",
+ "status_info": {
+ "created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S")
+ }
+ }
+
+ new_event = ftrack_api.event.base.Event(
+ topic=TOPIC_STATUS_SERVER_RESULT,
+ data=new_event_data
+ )
+
+ session.event_hub.publish(new_event)
+
+
def register(session):
'''Registers the event, subscribing the discover and launch topics.'''
install_db()
session.event_hub.subscribe("topic=*", launch)
session.event_hub.subscribe("topic=pype.storer.started", trigger_sync)
+ session.event_hub.subscribe(
+ "topic={}".format(TOPIC_STATUS_SERVER), send_status
+ )
def main(args):
diff --git a/pype/ftrack/ftrack_server/sub_user_server.py b/pype/ftrack/ftrack_server/sub_user_server.py
index f0d39447a8..8c1497a562 100644
--- a/pype/ftrack/ftrack_server/sub_user_server.py
+++ b/pype/ftrack/ftrack_server/sub_user_server.py
@@ -5,7 +5,7 @@ import socket
import traceback
from ftrack_server import FtrackServer
-from pype.ftrack.ftrack_server.lib import SocketSession, UserEventHub
+from pype.ftrack.ftrack_server.lib import SocketSession, SocketBaseEventHub
from pypeapp import Logger
@@ -28,7 +28,7 @@ def main(args):
try:
session = SocketSession(
- auto_connect_event_hub=True, sock=sock, Eventhub=UserEventHub
+ auto_connect_event_hub=True, sock=sock, Eventhub=SocketBaseEventHub
)
server = FtrackServer("action")
log.debug("Launched User Ftrack Server")