diff --git a/openpype/hooks/pre_copy_last_published_workfile.py b/openpype/hooks/pre_copy_last_published_workfile.py index 7a835507f7..cefc7e5d40 100644 --- a/openpype/hooks/pre_copy_last_published_workfile.py +++ b/openpype/hooks/pre_copy_last_published_workfile.py @@ -1,14 +1,14 @@ -import gc import os import shutil +from time import sleep from openpype.client.entities import ( get_last_version_by_subset_id, get_representations, get_subsets, ) from openpype.lib import PreLaunchHook +from openpype.lib.local_settings import get_local_site_id from openpype.lib.profiles_filtering import filter_profiles -from openpype.modules.base import ModulesManager from openpype.pipeline.load.utils import get_representation_path from openpype.settings.lib import get_project_settings @@ -137,33 +137,37 @@ class CopyLastPublishedWorkfile(PreLaunchHook): ).format(task_name, host_name) return - # Get sync server from Tray, - # which handles the asynchronous thread instance - sync_server = next( - ( - t["sync_server"] - for t in [ - obj - for obj in gc.get_objects() - if isinstance(obj, ModulesManager) - ] - if t["sync_server"].sync_server_thread - ), - None, - ) + # POST to webserver sites to add to representations + webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL") + if not webserver_url: + self.log.warning("Couldn't find webserver url") + return - # Add site and reset timer - active_site = sync_server.get_active_site(project_name) - sync_server.add_site( - project_name, - workfile_representation["_id"], - active_site, - force=True, + entry_point_url = "{}/sync_server".format(webserver_url) + rest_api_url = "{}/add_sites_to_representations".format( + entry_point_url + ) + try: + import requests + except Exception: + self.log.warning( + "Couldn't add sites to representations ('requests' is not available)" + ) + return + + requests.post( + rest_api_url, + json={ + "project_name": project_name, + "sites": [get_local_site_id()], + "representations": [str(workfile_representation["_id"])], + }, ) - sync_server.reset_timer() # Wait for the download loop to end - sync_server.sync_server_thread.files_processed.wait() + rest_api_url = "{}/files_are_processed".format(entry_point_url) + while requests.get(rest_api_url).content: + sleep(5) # Get paths published_workfile_path = get_representation_path( diff --git a/openpype/modules/sync_server/rest_api.py b/openpype/modules/sync_server/rest_api.py new file mode 100644 index 0000000000..b7c5d26d15 --- /dev/null +++ b/openpype/modules/sync_server/rest_api.py @@ -0,0 +1,68 @@ +from aiohttp.web_response import Response +from openpype.lib import Logger + + +class SyncServerModuleRestApi: + """ + REST API endpoint used for calling from hosts when context change + happens in Workfile app. + """ + + def __init__(self, user_module, server_manager): + self._log = None + self.module = user_module + self.server_manager = server_manager + + self.prefix = "/sync_server" + + self.register() + + @property + def log(self): + if self._log is None: + self._log = Logger.get_logger(self.__class__.__name__) + return self._log + + def register(self): + self.server_manager.add_route( + "POST", + self.prefix + "/add_sites_to_representations", + self.add_sites_to_representations, + ) + self.server_manager.add_route( + "GET", + self.prefix + "/files_are_processed", + self.files_are_processed, + ) + + async def add_sites_to_representations(self, request): + # Extract data from request + data = await request.json() + try: + project_name = data["project_name"] + sites = data["sites"] + representations = data["representations"] + except KeyError: + msg = ( + "Payload must contain fields 'project_name," + " 'sites' (list of names) and 'representations' (list of IDs)" + ) + self.log.error(msg) + return Response(status=400, message=msg) + + # Add all sites to each representation + for representation_id in representations: + for site in sites: + self.module.add_site( + project_name, representation_id, site, force=True + ) + + # Force timer to run immediately + self.module.reset_timer() + + return Response(status=200) + + async def files_are_processed(self, _request): + return Response( + body=bytes(self.module.sync_server_thread.files_are_processed) + ) diff --git a/openpype/modules/sync_server/sync_server.py b/openpype/modules/sync_server/sync_server.py index 353b39c4e1..7fd2311c2d 100644 --- a/openpype/modules/sync_server/sync_server.py +++ b/openpype/modules/sync_server/sync_server.py @@ -237,15 +237,13 @@ class SyncServerThread(threading.Thread): def __init__(self, module): self.log = Logger.get_logger(self.__class__.__name__) - # Event to trigger files have been processed - self.files_processed = threading.Event() - - super(SyncServerThread, self).__init__(args=(self.files_processed,)) + super(SyncServerThread, self).__init__() self.module = module self.loop = None self.is_running = False self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) self.timer = None + self.files_are_processed = False def run(self): self.is_running = True @@ -400,8 +398,8 @@ class SyncServerThread(threading.Thread): representation, site, error) - # Trigger files are processed - self.files_processed.set() + # Trigger files process finished + self.files_are_processed = False duration = time.time() - start_time self.log.debug("One loop took {:.2f}s".format(duration)) @@ -460,7 +458,6 @@ class SyncServerThread(threading.Thread): async def run_timer(self, delay): """Wait for 'delay' seconds to start next loop""" - self.files_processed.clear() await asyncio.sleep(delay) def reset_timer(self): @@ -469,6 +466,7 @@ class SyncServerThread(threading.Thread): if self.timer: self.timer.cancel() self.timer = None + self.files_are_processed = True def _working_sites(self, project_name): if self.module.is_project_paused(project_name): diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index a478faa9ef..7aaf42006c 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -2089,6 +2089,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule): def cli(self, click_group): click_group.add_command(cli_main) + # Webserver module implementation + def webserver_initialization(self, server_manager): + """Add routes for syncs.""" + if self.tray_initialized: + from .rest_api import SyncServerModuleRestApi + self.rest_api_obj = SyncServerModuleRestApi( + self, server_manager + ) + @click.group(SyncServerModule.name, help="SyncServer module related commands.") def cli_main(): diff --git a/openpype/modules/timers_manager/rest_api.py b/openpype/modules/timers_manager/rest_api.py index 4a2e9e6575..979db9075b 100644 --- a/openpype/modules/timers_manager/rest_api.py +++ b/openpype/modules/timers_manager/rest_api.py @@ -21,7 +21,7 @@ class TimersManagerModuleRestApi: @property def log(self): if self._log is None: - self._log = Logger.get_logger(self.__ckass__.__name__) + self._log = Logger.get_logger(self.__class__.__name__) return self._log def register(self):