diff --git a/openpype/modules/default_modules/job_queue/job_server/utils.py b/openpype/modules/default_modules/job_queue/job_server/utils.py index 09d401a9c2..127ca5f090 100644 --- a/openpype/modules/default_modules/job_queue/job_server/utils.py +++ b/openpype/modules/default_modules/job_queue/job_server/utils.py @@ -25,7 +25,6 @@ def main(port=None, host=None): port = int(port or 8079) host = str(host or "localhost") - print(host, port) with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as con: result_of_check = con.connect_ex((host, port)) diff --git a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py index 9469a4d305..2336f91be2 100644 --- a/openpype/modules/default_modules/job_queue/job_workers/base_worker.py +++ b/openpype/modules/default_modules/job_queue/job_workers/base_worker.py @@ -81,12 +81,12 @@ class WorkerJobsConnection: else: self.client.finish_job(success, message, data) - async def main_loop(self): + async def main_loop(self, register_worker=True): self._is_running = True while not self._stopped: start_time = datetime.datetime.now() - await self._connection_loop() + await self._connection_loop(register_worker) delta = datetime.datetime.now() - start_time print("Connection loop took {}s".format(str(delta))) # Check if was stopped and stop while loop in that case @@ -111,7 +111,7 @@ class WorkerJobsConnection: except Exception: traceback.print_exception(*sys.exc_info()) - async def _connection_loop(self): + async def _connection_loop(self, register_worker): self._connecting = True future = asyncio.run_coroutine_threadsafe( self._connect(), loop=self._loop @@ -143,13 +143,10 @@ class WorkerJobsConnection: self.client = None return - worker_id = await self.client.call( - "register_worker", [self._host_name] - ) - self.client.set_id(worker_id) - print( - "Registered as worker with id {}".format(worker_id) - ) + print("Connected to job queue server") + if register_worker: + self.register_as_worker() + while self._connected and self._loop.is_running(): if self._stopped or ws.closed: break @@ -158,6 +155,18 @@ class WorkerJobsConnection: await self._stop_cleanup() + def register_as_worker(self): + asyncio.ensure_future(self._register_as_worker(), loop=self._loop) + + async def _register_as_worker(self): + worker_id = await self.client.call( + "register_worker", [self._host_name] + ) + self.client.set_id(worker_id) + print( + "Registered as worker with id {}".format(worker_id) + ) + async def disconnect(self): await self._stop_cleanup() diff --git a/openpype/modules/default_modules/job_queue/job_workers/tvpaint_worker.py b/openpype/modules/default_modules/job_queue/job_workers/tvpaint_worker.py index 74bce1c47f..463ec7cc99 100644 --- a/openpype/modules/default_modules/job_queue/job_workers/tvpaint_worker.py +++ b/openpype/modules/default_modules/job_queue/job_workers/tvpaint_worker.py @@ -24,11 +24,16 @@ class WorkerCommunicator(BaseCommunicator): self._server_url, "tvpaint", loop ) asyncio.ensure_future( - self._worker_connection.main_loop(), loop=loop + self._worker_connection.main_loop(register_worker=False), + loop=loop ) super()._start_webserver() + def _on_client_connect(self, *args, **kwargs): + super()._on_client_connect(*args, **kwargs) + self._worker_connection.register_as_worker() + def stop(self): self._worker_connection.stop() self.return_code = 0