From dc7a1042145b9eb3ee5c20424b8c352a3e13affb Mon Sep 17 00:00:00 2001 From: "petr.kalis" Date: Fri, 10 Jul 2020 11:15:46 +0200 Subject: [PATCH] Added proper shutdown --- .../websocket_server/websocket_server.py | 98 ++++++++++++------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/pype/modules/websocket_server/websocket_server.py b/pype/modules/websocket_server/websocket_server.py index 8c8cb7be67..87f9bde8e0 100644 --- a/pype/modules/websocket_server/websocket_server.py +++ b/pype/modules/websocket_server/websocket_server.py @@ -1,9 +1,8 @@ from pype.api import config, Logger import threading -from aiohttp import web, WSCloseCode +from aiohttp import web import asyncio -import weakref from wsrpc_aiohttp import STATIC_DIR, WebSocketAsync import os @@ -40,7 +39,6 @@ class WebSocketServer(): ).format(str(self.presets))) self.app = web.Application() - self.app["websockets"] = weakref.WeakSet() self.app.router.add_route("*", "/ws/", WebSocketAsync) self.app.router.add_static("/js", STATIC_DIR) @@ -50,8 +48,6 @@ class WebSocketServer(): directories_with_routes = ['hosts'] self.add_routes_for_directories(directories_with_routes) - self.app.on_shutdown.append(self.on_shutdown) - self.websocket_thread = WebsocketServerThread(self, default_port) def add_routes_for_directories(self, directories_with_routes): @@ -85,38 +81,33 @@ class WebSocketServer(): def tray_start(self): self.websocket_thread.start() - # log.info("Starting websocket server") - # loop = asyncio.get_event_loop() - # self.runner = web.AppRunner(self.app) - # loop.run_until_complete(self.runner.setup()) - # self.site = web.TCPSite(self.runner, 'localhost', 8044) - # loop.run_until_complete(self.site.start()) - # log.info('site {}'.format(self.site._server)) - # asyncio.ensure_future() - # #loop.run_forever() - # #web.run_app(self.app, port=8044) - # log.info("Started websocket server") + def tray_exit(self): + self.stop() + + def stop_websocket_server(self): + + self.stop() @property def is_running(self): return self.websocket_thread.is_running def stop(self): - self.websocket_thread.is_running = False + if not self.is_running: + return + try: + log.debug("Stopping websocket server") + self.websocket_thread.is_running = False + self.websocket_thread.stop() + except Exception: + log.warning( + "Error has happened during Killing websocket server", + exc_info=True + ) def thread_stopped(self): self._is_running = False - async def on_shutdown(self): - """ - Gracefully remove all connected websocket connections - :return: None - """ - log.info('Shutting down websocket server') - for ws in set(self.app['websockets']): - await ws.close(code=WSCloseCode.GOING_AWAY, - message='Server shutdown') - class WebsocketServerThread(threading.Thread): """ Listener for websocket rpc requests. @@ -130,26 +121,67 @@ class WebsocketServerThread(threading.Thread): self.is_running = False self.port = port self.module = module + self.loop = None + self.runner = None + self.site = None def run(self): self.is_running = True try: + log.info("Starting websocket server") + self.loop = asyncio.new_event_loop() # create new loop for thread + asyncio.set_event_loop(self.loop) + + self.loop.run_until_complete(self.start_server()) + log.debug( "Running Websocket server on URL:" " \"ws://localhost:{}\"".format(self.port) ) - log.info("Starting websocket server") - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - web.run_app(self.module.app, port=self.port) # blocking - log.info("Started websocket server") - + asyncio.ensure_future(self.check_shutdown(), loop=self.loop) + self.loop.run_forever() except Exception: log.warning( "Websocket Server service has failed", exc_info=True ) + finally: + self.loop.close() # optional self.is_running = False self.module.thread_stopped() + log.info("Websocket server stopped") + + async def start_server(self): + """ Starts runner and TCPsite """ + self.runner = web.AppRunner(self.module.app) + await self.runner.setup() + self.site = web.TCPSite(self.runner, 'localhost', self.port) + await self.site.start() + + def stop(self): + """ Sets is_running flag to false, 'check_shutdown' shuts server down""" + self.is_running = False + + async def check_shutdown(self): + """ Future that is running and checks if server should be running + periodically. + """ + while self.is_running: + await asyncio.sleep(0.5) + + log.debug("Starting shutdown") + await self.site.stop() + log.debug("Site stopped") + await self.runner.cleanup() + log.debug("Runner stopped") + tasks = [task for task in asyncio.all_tasks() if + task is not asyncio.current_task()] + list(map(lambda task: task.cancel(), tasks)) # cancel all the tasks + results = await asyncio.gather(*tasks, return_exceptions=True) + log.debug(f'Finished awaiting cancelled tasks, results: {results}...') + await self.loop.shutdown_asyncgens() + # to really make sure everything else has time to stop + await asyncio.sleep(0.07) + self.loop.stop()