Merged in feature/PYPE-713_event_server_status (pull request #478)

Feature/PYPE-713 event server status

Approved-by: Milan Kolar <milan@orbi.tools>
This commit is contained in:
Jakub Trllo 2020-02-17 12:01:04 +00:00 committed by Milan Kolar
commit 2eaf1e5cb5
7 changed files with 718 additions and 49 deletions

View file

@ -4,9 +4,13 @@ import signal
import datetime
import subprocess
import socket
import json
import platform
import argparse
import getpass
import atexit
import time
import uuid
import ftrack_api
from pype.ftrack.lib import credentials
@ -175,6 +179,7 @@ def main_loop(ftrack_url):
otherwise thread will be killed.
"""
os.environ["FTRACK_EVENT_SUB_ID"] = str(uuid.uuid1())
# Get mongo hostname and port for testing mongo connection
mongo_list = ftrack_events_mongo_settings()
mongo_hostname = mongo_list[0]
@ -202,6 +207,13 @@ def main_loop(ftrack_url):
processor_last_failed = datetime.datetime.now()
processor_failed_count = 0
statuser_name = "StorerThread"
statuser_port = 10021
statuser_path = "{}/sub_event_status.py".format(file_path)
statuser_thread = None
statuser_last_failed = datetime.datetime.now()
statuser_failed_count = 0
ftrack_accessible = False
mongo_accessible = False
@ -210,7 +222,7 @@ def main_loop(ftrack_url):
# stop threads on exit
# TODO check if works and args have thread objects!
def on_exit(processor_thread, storer_thread):
def on_exit(processor_thread, storer_thread, statuser_thread):
if processor_thread is not None:
processor_thread.stop()
processor_thread.join()
@ -221,9 +233,27 @@ def main_loop(ftrack_url):
storer_thread.join()
storer_thread = None
if statuser_thread is not None:
statuser_thread.stop()
statuser_thread.join()
statuser_thread = None
atexit.register(
on_exit, processor_thread=processor_thread, storer_thread=storer_thread
on_exit,
processor_thread=processor_thread,
storer_thread=storer_thread,
statuser_thread=statuser_thread
)
system_name, pc_name = platform.uname()[:2]
host_name = socket.gethostname()
main_info = {
"created_at": datetime.datetime.now().strftime("%Y.%m.%d %H:%M:%S"),
"Username": getpass.getuser(),
"Host Name": host_name,
"Host IP": socket.gethostbyname(host_name)
}
main_info_str = json.dumps(main_info)
# Main loop
while True:
# Check if accessible Ftrack and Mongo url
@ -261,6 +291,52 @@ def main_loop(ftrack_url):
printed_ftrack_error = False
printed_mongo_error = False
# ====== STATUSER =======
if statuser_thread is None:
if statuser_failed_count < max_fail_count:
statuser_thread = socket_thread.StatusSocketThread(
statuser_name, statuser_port, statuser_path,
[main_info_str]
)
statuser_thread.start()
elif statuser_failed_count == max_fail_count:
print((
"Statuser failed {}times in row"
" I'll try to run again {}s later"
).format(str(max_fail_count), str(wait_time_after_max_fail)))
statuser_failed_count += 1
elif ((
datetime.datetime.now() - statuser_last_failed
).seconds > wait_time_after_max_fail):
statuser_failed_count = 0
# If thread failed test Ftrack and Mongo connection
elif not statuser_thread.isAlive():
statuser_thread.join()
statuser_thread = None
ftrack_accessible = False
mongo_accessible = False
_processor_last_failed = datetime.datetime.now()
delta_time = (
_processor_last_failed - statuser_last_failed
).seconds
if delta_time < min_fail_seconds:
statuser_failed_count += 1
else:
statuser_failed_count = 0
statuser_last_failed = _processor_last_failed
elif statuser_thread.stop_subprocess:
print("Main process was stopped by action")
on_exit(processor_thread, storer_thread, statuser_thread)
os.kill(os.getpid(), signal.SIGTERM)
return 1
# ====== STORER =======
# Run backup thread which does not requeire mongo to work
if storer_thread is None:
if storer_failed_count < max_fail_count:
@ -268,6 +344,7 @@ def main_loop(ftrack_url):
storer_name, storer_port, storer_path
)
storer_thread.start()
elif storer_failed_count == max_fail_count:
print((
"Storer failed {}times I'll try to run again {}s later"
@ -295,6 +372,7 @@ def main_loop(ftrack_url):
storer_failed_count = 0
storer_last_failed = _storer_last_failed
# ====== PROCESSOR =======
if processor_thread is None:
if processor_failed_count < max_fail_count:
processor_thread = socket_thread.SocketThread(
@ -336,6 +414,10 @@ def main_loop(ftrack_url):
processor_failed_count = 0
processor_last_failed = _processor_last_failed
if statuser_thread is not None:
statuser_thread.set_process("storer", storer_thread)
statuser_thread.set_process("processor", processor_thread)
time.sleep(1)

View file

@ -28,6 +28,10 @@ from pypeapp import Logger
from pype.ftrack.lib.custom_db_connector import DbConnector
TOPIC_STATUS_SERVER = "pype.event.server.status"
TOPIC_STATUS_SERVER_RESULT = "pype.event.server.status.result"
def ftrack_events_mongo_settings():
host = None
port = None
@ -123,20 +127,59 @@ def check_ftrack_url(url, log_errors=True):
return url
class StorerEventHub(ftrack_api.event.hub.EventHub):
class SocketBaseEventHub(ftrack_api.event.hub.EventHub):
hearbeat_msg = b"hearbeat"
heartbeat_callbacks = []
def __init__(self, *args, **kwargs):
self.sock = kwargs.pop("sock")
super(StorerEventHub, self).__init__(*args, **kwargs)
super(SocketBaseEventHub, self).__init__(*args, **kwargs)
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "heartbeat":
# Reply with heartbeat.
self.sock.sendall(b"storer")
return self._send_packet(self._code_name_mapping['heartbeat'])
for callback in self.heartbeat_callbacks:
callback()
elif code_name == "connect":
self.sock.sendall(self.hearbeat_msg)
return self._send_packet(self._code_name_mapping["heartbeat"])
return super(SocketBaseEventHub, self)._handle_packet(
code, packet_identifier, path, data
)
class StatusEventHub(SocketBaseEventHub):
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "connect":
event = ftrack_api.event.base.Event(
topic="pype.status.started",
data={},
source={
"id": self.id,
"user": {"username": self._api_user}
}
)
self._event_queue.put(event)
return super(StatusEventHub, self)._handle_packet(
code, packet_identifier, path, data
)
class StorerEventHub(SocketBaseEventHub):
hearbeat_msg = b"storer"
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "connect":
event = ftrack_api.event.base.Event(
topic="pype.storer.started",
data={},
@ -152,7 +195,9 @@ class StorerEventHub(ftrack_api.event.hub.EventHub):
)
class ProcessEventHub(ftrack_api.event.hub.EventHub):
class ProcessEventHub(SocketBaseEventHub):
hearbeat_msg = b"processor"
url, database, table_name = get_ftrack_event_mongo_info()
is_table_created = False
@ -164,7 +209,6 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub):
database_name=self.database,
table_name=self.table_name
)
self.sock = kwargs.pop("sock")
super(ProcessEventHub, self).__init__(*args, **kwargs)
def prepare_dbcon(self):
@ -260,42 +304,10 @@ class ProcessEventHub(ftrack_api.event.hub.EventHub):
code_name = self._code_name_mapping[code]
if code_name == "event":
return
if code_name == "heartbeat":
self.sock.sendall(b"processor")
return self._send_packet(self._code_name_mapping["heartbeat"])
return super()._handle_packet(code, packet_identifier, path, data)
class UserEventHub(ftrack_api.event.hub.EventHub):
def __init__(self, *args, **kwargs):
self.sock = kwargs.pop("sock")
super(UserEventHub, self).__init__(*args, **kwargs)
def _handle_packet(self, code, packet_identifier, path, data):
"""Override `_handle_packet` which extend heartbeat"""
code_name = self._code_name_mapping[code]
if code_name == "heartbeat":
# Reply with heartbeat.
self.sock.sendall(b"hearbeat")
return self._send_packet(self._code_name_mapping['heartbeat'])
elif code_name == "connect":
event = ftrack_api.event.base.Event(
topic="pype.storer.started",
data={},
source={
"id": self.id,
"user": {"username": self._api_user}
}
)
self._event_queue.put(event)
return super(UserEventHub, self)._handle_packet(
code, packet_identifier, path, data
)
class SocketSession(ftrack_api.session.Session):
'''An isolated session for interaction with an ftrack server.'''
def __init__(

View file

@ -3,6 +3,7 @@ import sys
import time
import socket
import threading
import traceback
import subprocess
from pypeapp import Logger
@ -12,13 +13,15 @@ class SocketThread(threading.Thread):
MAX_TIMEOUT = 35
def __init__(self, name, port, filepath):
def __init__(self, name, port, filepath, additional_args=[]):
super(SocketThread, self).__init__()
self.log = Logger().get_logger("SocketThread", "Event Thread")
self.log = Logger().get_logger(self.__class__.__name__)
self.setName(name)
self.name = name
self.port = port
self.filepath = filepath
self.additional_args = additional_args
self.sock = None
self.subproc = None
self.connection = None
@ -53,7 +56,13 @@ class SocketThread(threading.Thread):
)
self.subproc = subprocess.Popen(
[sys.executable, self.filepath, "-port", str(self.port)]
[
sys.executable,
self.filepath,
*self.additional_args,
str(self.port)
],
stdin=subprocess.PIPE
)
# Listen for incoming connections
@ -127,3 +136,52 @@ class SocketThread(threading.Thread):
if data == b"MongoError":
self.mongo_error = True
connection.sendall(data)
class StatusSocketThread(SocketThread):
process_name_mapping = {
b"RestartS": "storer",
b"RestartP": "processor",
b"RestartM": "main"
}
def __init__(self, *args, **kwargs):
self.process_threads = {}
self.stop_subprocess = False
super(StatusSocketThread, self).__init__(*args, **kwargs)
def set_process(self, process_name, thread):
try:
if not self.subproc:
self.process_threads[process_name] = None
return
if (
process_name in self.process_threads and
self.process_threads[process_name] == thread
):
return
self.process_threads[process_name] = thread
self.subproc.stdin.write(
str.encode("reset:{}\r\n".format(process_name))
)
self.subproc.stdin.flush()
except Exception:
print("Could not set thread in StatusSocketThread")
traceback.print_exception(*sys.exc_info())
def _handle_data(self, connection, data):
if not data:
return
process_name = self.process_name_mapping.get(data)
if process_name:
if process_name == "main":
self.stop_subprocess = True
else:
subp = self.process_threads.get(process_name)
if subp:
subp.stop()
connection.sendall(data)

View file

@ -1,13 +1,59 @@
import os
import sys
import signal
import socket
import datetime
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import SocketSession, ProcessEventHub
from pype.ftrack.ftrack_server.lib import (
SocketSession, ProcessEventHub, TOPIC_STATUS_SERVER
)
import ftrack_api
from pypeapp import Logger
log = Logger().get_logger("Event processor")
subprocess_started = datetime.datetime.now()
class SessionFactory:
session = None
def send_status(event):
subprocess_id = event["data"].get("subprocess_id")
if not subprocess_id:
return
if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
return
session = SessionFactory.session
if not session:
return
new_event_data = {
"subprocess_id": subprocess_id,
"source": "processor",
"status_info": {
"created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S")
}
}
new_event = ftrack_api.event.base.Event(
topic="pype.event.server.status.result",
data=new_event_data
)
session.event_hub.publish(new_event)
def register(session):
'''Registers the event, subscribing the discover and launch topics.'''
session.event_hub.subscribe(
"topic={}".format(TOPIC_STATUS_SERVER), send_status
)
def main(args):
port = int(args[-1])
@ -24,6 +70,9 @@ def main(args):
session = SocketSession(
auto_connect_event_hub=True, sock=sock, Eventhub=ProcessEventHub
)
register(session)
SessionFactory.session = session
server = FtrackServer("event")
log.debug("Launched Ftrack Event processor")
server.run_server(session)

View file

@ -0,0 +1,436 @@
import os
import sys
import json
import threading
import signal
import socket
import datetime
import ftrack_api
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import (
SocketSession, StatusEventHub,
TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT
)
from pypeapp import Logger, config
log = Logger().get_logger("Event storer")
action_identifier = (
"event.server.status" + os.environ["FTRACK_EVENT_SUB_ID"]
)
host_ip = socket.gethostbyname(socket.gethostname())
action_data = {
"label": "Pype Admin",
"variant": "- Event server Status ({})".format(host_ip),
"description": "Get Infromation about event server",
"actionIdentifier": action_identifier,
"icon": "{}/ftrack/action_icons/PypeAdmin.svg".format(
os.environ.get(
"PYPE_STATICS_SERVER",
"http://localhost:{}".format(
config.get_presets().get("services", {}).get(
"rest_api", {}
).get("default_port", 8021)
)
)
)
}
class ObjectFactory:
session = None
status_factory = None
checker_thread = None
last_trigger = None
class Status:
default_item = {
"type": "label",
"value": "Process info is not available at this moment."
}
def __init__(self, name, label, parent):
self.name = name
self.label = label or name
self.parent = parent
self.info = None
self.last_update = None
def update(self, info):
self.last_update = datetime.datetime.now()
self.info = info
def get_delta_string(self, delta):
days, hours, minutes = (
delta.days, delta.seconds // 3600, delta.seconds // 60 % 60
)
delta_items = [
"{}d".format(days),
"{}h".format(hours),
"{}m".format(minutes)
]
if not days:
delta_items.pop(0)
if not hours:
delta_items.pop(0)
delta_items.append("{}s".format(delta.seconds % 60))
if not minutes:
delta_items.pop(0)
return " ".join(delta_items)
def get_items(self):
items = []
last_update = "N/A"
if self.last_update:
delta = datetime.datetime.now() - self.last_update
last_update = "{} ago".format(
self.get_delta_string(delta)
)
last_update = "Updated: {}".format(last_update)
items.append({
"type": "label",
"value": "#{}".format(self.label)
})
items.append({
"type": "label",
"value": "##{}".format(last_update)
})
if not self.info:
if self.info is None:
trigger_info_get()
items.append(self.default_item)
return items
info = {}
for key, value in self.info.items():
if key not in ["created_at:", "created_at"]:
info[key] = value
continue
datetime_value = datetime.datetime.strptime(
value, "%Y.%m.%d %H:%M:%S"
)
delta = datetime.datetime.now() - datetime_value
running_for = self.get_delta_string(delta)
info["Started at"] = "{} [running: {}]".format(value, running_for)
for key, value in info.items():
items.append({
"type": "label",
"value": "<b>{}:</b> {}".format(key, value)
})
return items
class StatusFactory:
note_item = {
"type": "label",
"value": (
"<i>HINT: To refresh data uncheck"
" all checkboxes and hit `Submit` button.</i>"
)
}
splitter_item = {
"type": "label",
"value": "---"
}
def __init__(self, statuses={}):
self.statuses = []
for status in statuses.items():
self.create_status(*status)
def __getitem__(self, key):
return self.get(key)
def get(self, key, default=None):
for status in self.statuses:
if status.name == key:
return status
return default
def is_filled(self):
for status in self.statuses:
if status.info is None:
return False
return True
def create_status(self, name, label):
new_status = Status(name, label, self)
self.statuses.append(new_status)
def process_event_result(self, event):
subprocess_id = event["data"].get("subprocess_id")
if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
return
source = event["data"]["source"]
data = event["data"]["status_info"]
self.update_status_info(source, data)
def update_status_info(self, process_name, info):
for status in self.statuses:
if status.name == process_name:
status.update(info)
break
def bool_items(self):
items = []
items.append({
"type": "label",
"value": "#Restart process"
})
items.append({
"type": "label",
"value": (
"<i><b>WARNING:</b> Main process may shut down when checked"
" if does not run as a service!</i>"
)
})
name_labels = {}
for status in self.statuses:
name_labels[status.name] = status.label
for name, label in name_labels.items():
items.append({
"type": "boolean",
"value": False,
"label": label,
"name": name
})
return items
def items(self):
items = []
items.append(self.note_item)
items.extend(self.bool_items())
for status in self.statuses:
items.append(self.splitter_item)
items.extend(status.get_items())
return items
def server_activity_validate_user(event):
"""Validate user permissions to show server info."""
session = ObjectFactory.session
username = event["source"].get("user", {}).get("username")
if not username:
return False
user_ent = session.query(
"User where username = \"{}\"".format(username)
).first()
if not user_ent:
return False
role_list = ["Pypeclub", "Administrator"]
for role in user_ent["user_security_roles"]:
if role["security_role"]["name"] in role_list:
return True
return False
def server_activity_discover(event):
"""Discover action in actions menu conditions."""
session = ObjectFactory.session
if session is None:
return
if not server_activity_validate_user(event):
return
return {"items": [action_data]}
def server_activity(event):
session = ObjectFactory.session
if session is None:
msg = "Session is not set. Can't trigger Reset action."
log.warning(msg)
return {
"success": False,
"message": msg
}
if not server_activity_validate_user(event):
return {
"success": False,
"message": "You don't have permissions to see Event server status!"
}
values = event["data"].get("values") or {}
is_checked = False
for value in values.values():
if value:
is_checked = True
break
if not is_checked:
return {
"items": ObjectFactory.status_factory.items(),
"title": "Server current status"
}
session = ObjectFactory.session
if values["main"]:
session.event_hub.sock.sendall(b"RestartM")
return
if values["storer"]:
session.event_hub.sock.sendall(b"RestartS")
if values["processor"]:
session.event_hub.sock.sendall(b"RestartP")
def trigger_info_get():
if ObjectFactory.last_trigger:
delta = datetime.datetime.now() - ObjectFactory.last_trigger
if delta.seconds() < 5:
return
session = ObjectFactory.session
session.event_hub.publish(
ftrack_api.event.base.Event(
topic=TOPIC_STATUS_SERVER,
data={"subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"]}
),
on_error="ignore"
)
def on_start(event):
session = ObjectFactory.session
source_id = event.get("source", {}).get("id")
if not source_id or source_id != session.event_hub.id:
return
if session is None:
log.warning("Session is not set. Can't trigger Sync to avalon action.")
return True
trigger_info_get()
def register(session):
'''Registers the event, subscribing the discover and launch topics.'''
session.event_hub.subscribe(
"topic=ftrack.action.discover",
server_activity_discover
)
session.event_hub.subscribe("topic=pype.status.started", on_start)
status_launch_subscription = (
"topic=ftrack.action.launch and data.actionIdentifier={}"
).format(action_identifier)
session.event_hub.subscribe(
status_launch_subscription,
server_activity
)
session.event_hub.subscribe(
"topic={}".format(TOPIC_STATUS_SERVER_RESULT),
ObjectFactory.status_factory.process_event_result
)
def heartbeat():
if ObjectFactory.status_factory.is_filled():
return
trigger_info_get()
def main(args):
port = int(args[-1])
server_info = json.loads(args[-2])
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = ("localhost", port)
log.debug("Statuser connected to {} port {}".format(*server_address))
sock.connect(server_address)
sock.sendall(b"CreatedStatus")
# store socket connection object
ObjectFactory.sock = sock
statuse_names = {
"main": "Main process",
"storer": "Event Storer",
"processor": "Event Processor"
}
ObjectFactory.status_factory = StatusFactory(statuse_names)
ObjectFactory.status_factory["main"].update(server_info)
_returncode = 0
try:
session = SocketSession(
auto_connect_event_hub=True, sock=sock, Eventhub=StatusEventHub
)
ObjectFactory.session = session
session.event_hub.heartbeat_callbacks.append(heartbeat)
register(session)
server = FtrackServer("event")
log.debug("Launched Ftrack Event statuser")
server.run_server(session, load_files=False)
except Exception:
_returncode = 1
log.error("ServerInfo subprocess crashed", exc_info=True)
finally:
log.debug("Ending. Closing socket.")
sock.close()
return _returncode
class OutputChecker(threading.Thread):
read_input = True
def run(self):
while self.read_input:
for line in sys.stdin:
line = line.rstrip().lower()
if not line.startswith("reset:"):
continue
process_name = line.replace("reset:", "")
ObjectFactory.status_factory.update_status_info(
process_name, None
)
def stop(self):
self.read_input = False
if __name__ == "__main__":
# Register interupt signal
def signal_handler(sig, frame):
print("You pressed Ctrl+C. Process ended.")
ObjectFactory.checker_thread.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
checker_thread = OutputChecker()
ObjectFactory.checker_thread = checker_thread
checker_thread.start()
sys.exit(main(sys.argv))

View file

@ -8,14 +8,15 @@ import pymongo
import ftrack_api
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import (
SocketSession, StorerEventHub,
get_ftrack_event_mongo_info,
SocketSession,
StorerEventHub
TOPIC_STATUS_SERVER, TOPIC_STATUS_SERVER_RESULT
)
from pype.ftrack.lib.custom_db_connector import DbConnector
from pypeapp import Logger
log = Logger().get_logger("Event storer")
subprocess_started = datetime.datetime.now()
class SessionFactory:
@ -138,11 +139,42 @@ def trigger_sync(event):
)
def send_status(event):
session = SessionFactory.session
if not session:
return
subprocess_id = event["data"].get("subprocess_id")
if not subprocess_id:
return
if subprocess_id != os.environ["FTRACK_EVENT_SUB_ID"]:
return
new_event_data = {
"subprocess_id": os.environ["FTRACK_EVENT_SUB_ID"],
"source": "storer",
"status_info": {
"created_at": subprocess_started.strftime("%Y.%m.%d %H:%M:%S")
}
}
new_event = ftrack_api.event.base.Event(
topic=TOPIC_STATUS_SERVER_RESULT,
data=new_event_data
)
session.event_hub.publish(new_event)
def register(session):
'''Registers the event, subscribing the discover and launch topics.'''
install_db()
session.event_hub.subscribe("topic=*", launch)
session.event_hub.subscribe("topic=pype.storer.started", trigger_sync)
session.event_hub.subscribe(
"topic={}".format(TOPIC_STATUS_SERVER), send_status
)
def main(args):

View file

@ -5,7 +5,7 @@ import socket
import traceback
from ftrack_server import FtrackServer
from pype.ftrack.ftrack_server.lib import SocketSession, UserEventHub
from pype.ftrack.ftrack_server.lib import SocketSession, SocketBaseEventHub
from pypeapp import Logger
@ -28,7 +28,7 @@ def main(args):
try:
session = SocketSession(
auto_connect_event_hub=True, sock=sock, Eventhub=UserEventHub
auto_connect_event_hub=True, sock=sock, Eventhub=SocketBaseEventHub
)
server = FtrackServer("action")
log.debug("Launched User Ftrack Server")