diff --git a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py index 3e9dbbfd7f..5dc1df5eb0 100644 --- a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py +++ b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py @@ -1,6 +1,7 @@ import sys import datetime import asyncio +import traceback from aiohttp_json_rpc import JsonRpcClient @@ -83,7 +84,7 @@ class WorkerJobsConnection: start_time = datetime.datetime.now() await self._connection_loop() delta = datetime.datetime.now() - start_time - print("Client was connected {}".format(str(delta))) + print("Connection loop took {}s".format(str(delta))) # Check if was stopped and stop while loop in that case if self._stopped: break @@ -99,18 +100,25 @@ class WorkerJobsConnection: async def _connect(self): self.client = WorkerClient() print("Connecting to {}".format(self._server_url)) - await self.client.connect_url(self._server_url) + try: + await self.client.connect_url(self._server_url) + except KeyboardInterrupt: + raise + except Exception: + traceback.print_exception(*sys.exc_info()) + async def _connection_loop(self): self._connecting = True - asyncio.run_coroutine_threadsafe( + future = asyncio.run_coroutine_threadsafe( self._connect(), loop=self._loop ) while self._connecting: - if self.client is None: + if not future.done(): await asyncio.sleep(0.07) continue + session = getattr(self.client, "_session", None) ws = getattr(self.client, "_ws", None) if session is not None: @@ -155,6 +163,9 @@ class WorkerJobsConnection: await self._stop_cleanup() + async def disconnect(self): + await self._stop_cleanup() + async def _stop_cleanup(self): print("Cleanup after stop") if self.client is not None and hasattr(self.client, "_ws"):