Merge pull request #5 from pypeclub/feature/sync_server_modifications

Sync server modifications
This commit is contained in:
Félix David 2022-11-09 17:06:57 +01:00 committed by GitHub
commit d5b8b0e6e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 110 deletions

View file

@ -1368,33 +1368,6 @@ def get_representation_parents(project_name, representation):
return parents_by_repre_id[repre_id]
def get_representation_last_created_time_on_site(
representation: dict, site_name: str
) -> datetime:
"""Get `created_dt` value for representation on site.
Args:
representation (dict): Representation to get creation date of
site_name (str): Site from which to get the creation date
Returns:
datetime: Created time of representation on site
"""
created_time = next(
(
site.get("created_dt")
for site in representation["files"][0].get("sites", [])
if site["name"] == site_name
),
None,
)
if created_time:
return created_time
else:
# Use epoch as 'zero' time
return datetime.utcfromtimestamp(0)
def get_thumbnail_id_from_source(project_name, src_type, src_id):
"""Receive thumbnail id from source entity.

View file

@ -3,8 +3,6 @@ import shutil
from time import sleep
from openpype.client.entities import (
get_last_version_by_subset_id,
get_representation_by_id,
get_representation_last_created_time_on_site,
get_representations,
get_subsets,
)
@ -37,6 +35,12 @@ class CopyLastPublishedWorkfile(PreLaunchHook):
Returns:
None: This is a void method.
"""
sync_server = self.modules_manager.get("sync_server")
if not sync_server or not sync_server.enabled:
self.log.deubg("Sync server module is not enabled or available")
return
# Check there is no workfile available
last_workfile = self.data.get("last_workfile_path")
if os.path.exists(last_workfile):
@ -116,19 +120,19 @@ class CopyLastPublishedWorkfile(PreLaunchHook):
return
# Get workfile representation
last_version_doc = get_last_version_by_subset_id(
project_name, subset_id, fields=["_id"]
)
if not last_version_doc:
self.log.debug("Subset does not have any versions")
return
workfile_representation = next(
(
representation
for representation in get_representations(
project_name,
version_ids=[
(
get_last_version_by_subset_id(
project_name, subset_id, fields=["_id"]
)
or {}
).get("_id")
],
version_ids=[last_version_doc["_id"]]
)
if representation["context"]["task"]["name"] == task_name
),
@ -141,49 +145,20 @@ class CopyLastPublishedWorkfile(PreLaunchHook):
).format(task_name, host_name)
return
# POST to webserver sites to add to representations
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
if not webserver_url:
self.log.warning("Couldn't find webserver url")
return
entry_point_url = "{}/sync_server".format(webserver_url)
rest_api_url = "{}/add_sites_to_representations".format(
entry_point_url
)
try:
import requests
except Exception:
self.log.warning(
"Couldn't add sites to representations "
"('requests' is not available)"
)
return
local_site_id = get_local_site_id()
requests.post(
rest_api_url,
json={
"project_name": project_name,
"sites": [local_site_id],
"representations": [str(workfile_representation["_id"])],
},
sync_server.add_site(
project_name,
workfile_representation["_id"],
local_site_id,
force=True,
priority=99,
reset_timer=True
)
# Wait for the download loop to end
last_created_time = get_representation_last_created_time_on_site(
workfile_representation, local_site_id
)
while (
last_created_time
>= get_representation_last_created_time_on_site(
get_representation_by_id(
project_name,
workfile_representation["_id"],
fields=["files"],
),
local_site_id,
)
while not sync_server.is_representation_on_site(
project_name,
workfile_representation["_id"],
local_site_id
):
sleep(5)

View file

@ -26,36 +26,11 @@ class SyncServerModuleRestApi:
def register(self):
self.server_manager.add_route(
"POST",
self.prefix + "/add_sites_to_representations",
self.add_sites_to_representations,
self.prefix + "/reset_timer",
self.reset_timer,
)
async def add_sites_to_representations(self, request):
# Extract data from request
data = await request.json()
try:
project_name = data["project_name"]
sites = data["sites"]
representations = data["representations"]
except KeyError:
msg = (
"Payload must contain fields 'project_name,"
" 'sites' (list of names) and 'representations' (list of IDs)"
)
self.log.error(msg)
return Response(status=400, message=msg)
# Add all sites to each representation
for representation_id in representations:
for site in sites:
self.module.add_site(
project_name,
representation_id,
site,
force=True,
priority=99,
)
async def reset_timer(self, request):
# Force timer to run immediately
self.module.reset_timer()

View file

@ -136,14 +136,14 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
""" Start of Public API """
def add_site(self, project_name, representation_id, site_name=None,
force=False, priority=None):
force=False, priority=None, reset_timer=False):
"""
Adds new site to representation to be synced.
'project_name' must have synchronization enabled (globally or
project only)
Used as a API endpoint from outside applications (Loader etc).
Used as an API endpoint from outside applications (Loader etc).
Use 'force' to reset existing site.
@ -153,6 +153,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
site_name (string): name of configured and active site
force (bool): reset site if exists
priority (int): set priority
reset_timer (bool): if delay timer should be reset, eg. user mark
some representation to be synced manually
Throws:
SiteAlreadyPresentError - if adding already existing site and
@ -171,6 +173,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
force=force,
priority=priority)
if reset_timer:
self.reset_timer()
def remove_site(self, project_name, representation_id, site_name,
remove_local_files=False):
"""
@ -913,7 +918,59 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
In case of user's involvement (reset site), start that right away.
"""
self.sync_server_thread.reset_timer()
if not self.enabled:
return
if self.sync_server_thread is None:
self._reset_timer_with_rest_api()
else:
self.sync_server_thread.reset_timer()
def is_representation_on_site(
self, project_name, representation_id, site_name
):
"""Checks if 'representation_id' has all files avail. on 'site_name'"""
representation = get_representation_by_id(project_name,
representation_id,
fields=["_id", "files"])
if not representation:
return False
on_site = False
for file_info in representation.get("files", []):
for site in file_info.get("sites", []):
if site["name"] != site_name:
continue
if (site.get("progress") or site.get("error") or
not site.get("created_dt")):
return False
on_site = True
return on_site
def _reset_timer_with_rest_api(self):
# POST to webserver sites to add to representations
webserver_url = os.environ.get("OPENPYPE_WEBSERVER_URL")
if not webserver_url:
self.log.warning("Couldn't find webserver url")
return
rest_api_url = "{}/sync_server/reset_timer".format(
webserver_url
)
try:
import requests
except Exception:
self.log.warning(
"Couldn't add sites to representations "
"('requests' is not available)"
)
return
requests.post(rest_api_url)
def get_enabled_projects(self):
"""Returns list of projects which have SyncServer enabled."""
@ -1546,12 +1603,12 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Args:
project_name (string): name of project - force to db connection as
each file might come from different collection
new_file_id (string):
new_file_id (string): only present if file synced successfully
file (dictionary): info about processed file (pulled from DB)
representation (dictionary): parent repr of file (from DB)
site (string): label ('gdrive', 'S3')
error (string): exception message
progress (float): 0-1 of progress of upload/download
progress (float): 0-0.99 of progress of upload/download
priority (int): 0-100 set priority
Returns: