mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 21:04:40 +01:00
Change to REST API using web server
This commit is contained in:
parent
ed96f1d5b3
commit
82be7ce8d0
5 changed files with 112 additions and 33 deletions
|
|
@ -1,14 +1,14 @@
|
|||
import gc
|
||||
import os
|
||||
import shutil
|
||||
from time import sleep
|
||||
from openpype.client.entities import (
|
||||
get_last_version_by_subset_id,
|
||||
get_representations,
|
||||
get_subsets,
|
||||
)
|
||||
from openpype.lib import PreLaunchHook
|
||||
from openpype.lib.local_settings import get_local_site_id
|
||||
from openpype.lib.profiles_filtering import filter_profiles
|
||||
from openpype.modules.base import ModulesManager
|
||||
from openpype.pipeline.load.utils import get_representation_path
|
||||
from openpype.settings.lib import get_project_settings
|
||||
|
||||
|
|
@ -137,33 +137,37 @@ class CopyLastPublishedWorkfile(PreLaunchHook):
|
|||
).format(task_name, host_name)
|
||||
return
|
||||
|
||||
# Get sync server from Tray,
|
||||
# which handles the asynchronous thread instance
|
||||
sync_server = next(
|
||||
(
|
||||
t["sync_server"]
|
||||
for t in [
|
||||
obj
|
||||
for obj in gc.get_objects()
|
||||
if isinstance(obj, ModulesManager)
|
||||
]
|
||||
if t["sync_server"].sync_server_thread
|
||||
),
|
||||
None,
|
||||
)
|
||||
# POST to webserver sites to add to representations
|
||||
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
|
||||
if not webserver_url:
|
||||
self.log.warning("Couldn't find webserver url")
|
||||
return
|
||||
|
||||
# Add site and reset timer
|
||||
active_site = sync_server.get_active_site(project_name)
|
||||
sync_server.add_site(
|
||||
project_name,
|
||||
workfile_representation["_id"],
|
||||
active_site,
|
||||
force=True,
|
||||
entry_point_url = "{}/sync_server".format(webserver_url)
|
||||
rest_api_url = "{}/add_sites_to_representations".format(
|
||||
entry_point_url
|
||||
)
|
||||
try:
|
||||
import requests
|
||||
except Exception:
|
||||
self.log.warning(
|
||||
"Couldn't add sites to representations ('requests' is not available)"
|
||||
)
|
||||
return
|
||||
|
||||
requests.post(
|
||||
rest_api_url,
|
||||
json={
|
||||
"project_name": project_name,
|
||||
"sites": [get_local_site_id()],
|
||||
"representations": [str(workfile_representation["_id"])],
|
||||
},
|
||||
)
|
||||
sync_server.reset_timer()
|
||||
|
||||
# Wait for the download loop to end
|
||||
sync_server.sync_server_thread.files_processed.wait()
|
||||
rest_api_url = "{}/files_are_processed".format(entry_point_url)
|
||||
while requests.get(rest_api_url).content:
|
||||
sleep(5)
|
||||
|
||||
# Get paths
|
||||
published_workfile_path = get_representation_path(
|
||||
|
|
|
|||
68
openpype/modules/sync_server/rest_api.py
Normal file
68
openpype/modules/sync_server/rest_api.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
from aiohttp.web_response import Response
|
||||
from openpype.lib import Logger
|
||||
|
||||
|
||||
class SyncServerModuleRestApi:
|
||||
"""
|
||||
REST API endpoint used for calling from hosts when context change
|
||||
happens in Workfile app.
|
||||
"""
|
||||
|
||||
def __init__(self, user_module, server_manager):
|
||||
self._log = None
|
||||
self.module = user_module
|
||||
self.server_manager = server_manager
|
||||
|
||||
self.prefix = "/sync_server"
|
||||
|
||||
self.register()
|
||||
|
||||
@property
|
||||
def log(self):
|
||||
if self._log is None:
|
||||
self._log = Logger.get_logger(self.__class__.__name__)
|
||||
return self._log
|
||||
|
||||
def register(self):
|
||||
self.server_manager.add_route(
|
||||
"POST",
|
||||
self.prefix + "/add_sites_to_representations",
|
||||
self.add_sites_to_representations,
|
||||
)
|
||||
self.server_manager.add_route(
|
||||
"GET",
|
||||
self.prefix + "/files_are_processed",
|
||||
self.files_are_processed,
|
||||
)
|
||||
|
||||
async def add_sites_to_representations(self, request):
|
||||
# Extract data from request
|
||||
data = await request.json()
|
||||
try:
|
||||
project_name = data["project_name"]
|
||||
sites = data["sites"]
|
||||
representations = data["representations"]
|
||||
except KeyError:
|
||||
msg = (
|
||||
"Payload must contain fields 'project_name,"
|
||||
" 'sites' (list of names) and 'representations' (list of IDs)"
|
||||
)
|
||||
self.log.error(msg)
|
||||
return Response(status=400, message=msg)
|
||||
|
||||
# Add all sites to each representation
|
||||
for representation_id in representations:
|
||||
for site in sites:
|
||||
self.module.add_site(
|
||||
project_name, representation_id, site, force=True
|
||||
)
|
||||
|
||||
# Force timer to run immediately
|
||||
self.module.reset_timer()
|
||||
|
||||
return Response(status=200)
|
||||
|
||||
async def files_are_processed(self, _request):
|
||||
return Response(
|
||||
body=bytes(self.module.sync_server_thread.files_are_processed)
|
||||
)
|
||||
|
|
@ -237,15 +237,13 @@ class SyncServerThread(threading.Thread):
|
|||
def __init__(self, module):
|
||||
self.log = Logger.get_logger(self.__class__.__name__)
|
||||
|
||||
# Event to trigger files have been processed
|
||||
self.files_processed = threading.Event()
|
||||
|
||||
super(SyncServerThread, self).__init__(args=(self.files_processed,))
|
||||
super(SyncServerThread, self).__init__()
|
||||
self.module = module
|
||||
self.loop = None
|
||||
self.is_running = False
|
||||
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
|
||||
self.timer = None
|
||||
self.files_are_processed = False
|
||||
|
||||
def run(self):
|
||||
self.is_running = True
|
||||
|
|
@ -400,8 +398,8 @@ class SyncServerThread(threading.Thread):
|
|||
representation,
|
||||
site,
|
||||
error)
|
||||
# Trigger files are processed
|
||||
self.files_processed.set()
|
||||
# Trigger files process finished
|
||||
self.files_are_processed = False
|
||||
|
||||
duration = time.time() - start_time
|
||||
self.log.debug("One loop took {:.2f}s".format(duration))
|
||||
|
|
@ -460,7 +458,6 @@ class SyncServerThread(threading.Thread):
|
|||
|
||||
async def run_timer(self, delay):
|
||||
"""Wait for 'delay' seconds to start next loop"""
|
||||
self.files_processed.clear()
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
def reset_timer(self):
|
||||
|
|
@ -469,6 +466,7 @@ class SyncServerThread(threading.Thread):
|
|||
if self.timer:
|
||||
self.timer.cancel()
|
||||
self.timer = None
|
||||
self.files_are_processed = True
|
||||
|
||||
def _working_sites(self, project_name):
|
||||
if self.module.is_project_paused(project_name):
|
||||
|
|
|
|||
|
|
@ -2089,6 +2089,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
|
|||
def cli(self, click_group):
|
||||
click_group.add_command(cli_main)
|
||||
|
||||
# Webserver module implementation
|
||||
def webserver_initialization(self, server_manager):
|
||||
"""Add routes for syncs."""
|
||||
if self.tray_initialized:
|
||||
from .rest_api import SyncServerModuleRestApi
|
||||
self.rest_api_obj = SyncServerModuleRestApi(
|
||||
self, server_manager
|
||||
)
|
||||
|
||||
|
||||
@click.group(SyncServerModule.name, help="SyncServer module related commands.")
|
||||
def cli_main():
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ class TimersManagerModuleRestApi:
|
|||
@property
|
||||
def log(self):
|
||||
if self._log is None:
|
||||
self._log = Logger.get_logger(self.__ckass__.__name__)
|
||||
self._log = Logger.get_logger(self.__class__.__name__)
|
||||
return self._log
|
||||
|
||||
def register(self):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue