Added proper shutdown

This commit is contained in:
petr.kalis 2020-07-10 11:15:46 +02:00
parent d02e9596a6
commit dc7a104214

View file

@ -1,9 +1,8 @@
from pype.api import config, Logger
import threading
from aiohttp import web, WSCloseCode
from aiohttp import web
import asyncio
import weakref
from wsrpc_aiohttp import STATIC_DIR, WebSocketAsync
import os
@ -40,7 +39,6 @@ class WebSocketServer():
).format(str(self.presets)))
self.app = web.Application()
self.app["websockets"] = weakref.WeakSet()
self.app.router.add_route("*", "/ws/", WebSocketAsync)
self.app.router.add_static("/js", STATIC_DIR)
@ -50,8 +48,6 @@ class WebSocketServer():
directories_with_routes = ['hosts']
self.add_routes_for_directories(directories_with_routes)
self.app.on_shutdown.append(self.on_shutdown)
self.websocket_thread = WebsocketServerThread(self, default_port)
def add_routes_for_directories(self, directories_with_routes):
@ -85,38 +81,33 @@ class WebSocketServer():
def tray_start(self):
self.websocket_thread.start()
# log.info("Starting websocket server")
# loop = asyncio.get_event_loop()
# self.runner = web.AppRunner(self.app)
# loop.run_until_complete(self.runner.setup())
# self.site = web.TCPSite(self.runner, 'localhost', 8044)
# loop.run_until_complete(self.site.start())
# log.info('site {}'.format(self.site._server))
# asyncio.ensure_future()
# #loop.run_forever()
# #web.run_app(self.app, port=8044)
# log.info("Started websocket server")
def tray_exit(self):
self.stop()
def stop_websocket_server(self):
self.stop()
@property
def is_running(self):
return self.websocket_thread.is_running
def stop(self):
self.websocket_thread.is_running = False
if not self.is_running:
return
try:
log.debug("Stopping websocket server")
self.websocket_thread.is_running = False
self.websocket_thread.stop()
except Exception:
log.warning(
"Error has happened during Killing websocket server",
exc_info=True
)
def thread_stopped(self):
self._is_running = False
async def on_shutdown(self):
"""
Gracefully remove all connected websocket connections
:return: None
"""
log.info('Shutting down websocket server')
for ws in set(self.app['websockets']):
await ws.close(code=WSCloseCode.GOING_AWAY,
message='Server shutdown')
class WebsocketServerThread(threading.Thread):
""" Listener for websocket rpc requests.
@ -130,26 +121,67 @@ class WebsocketServerThread(threading.Thread):
self.is_running = False
self.port = port
self.module = module
self.loop = None
self.runner = None
self.site = None
def run(self):
self.is_running = True
try:
log.info("Starting websocket server")
self.loop = asyncio.new_event_loop() # create new loop for thread
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.start_server())
log.debug(
"Running Websocket server on URL:"
" \"ws://localhost:{}\"".format(self.port)
)
log.info("Starting websocket server")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
web.run_app(self.module.app, port=self.port) # blocking
log.info("Started websocket server")
asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
self.loop.run_forever()
except Exception:
log.warning(
"Websocket Server service has failed", exc_info=True
)
finally:
self.loop.close() # optional
self.is_running = False
self.module.thread_stopped()
log.info("Websocket server stopped")
async def start_server(self):
""" Starts runner and TCPsite """
self.runner = web.AppRunner(self.module.app)
await self.runner.setup()
self.site = web.TCPSite(self.runner, 'localhost', self.port)
await self.site.start()
def stop(self):
""" Sets is_running flag to false, 'check_shutdown' shuts server down"""
self.is_running = False
async def check_shutdown(self):
""" Future that is running and checks if server should be running
periodically.
"""
while self.is_running:
await asyncio.sleep(0.5)
log.debug("Starting shutdown")
await self.site.stop()
log.debug("Site stopped")
await self.runner.cleanup()
log.debug("Runner stopped")
tasks = [task for task in asyncio.all_tasks() if
task is not asyncio.current_task()]
list(map(lambda task: task.cancel(), tasks)) # cancel all the tasks
results = await asyncio.gather(*tasks, return_exceptions=True)
log.debug(f'Finished awaiting cancelled tasks, results: {results}...')
await self.loop.shutdown_asyncgens()
# to really make sure everything else has time to stop
await asyncio.sleep(0.07)
self.loop.stop()