mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 21:04:40 +01:00
457 lines
19 KiB
Python
457 lines
19 KiB
Python
"""Python 3 only implementation."""
|
|
import os
|
|
import asyncio
|
|
import threading
|
|
import concurrent.futures
|
|
from concurrent.futures._base import CancelledError
|
|
|
|
from .providers import lib
|
|
from openpype.lib import Logger
|
|
|
|
from .utils import SyncStatus, ResumableError
|
|
|
|
|
|
async def upload(module, project_name, file, representation, provider_name,
|
|
remote_site_name, tree=None, preset=None):
|
|
"""
|
|
Upload single 'file' of a 'representation' to 'provider'.
|
|
Source url is taken from 'file' portion, where {root} placeholder
|
|
is replaced by 'representation.Context.root'
|
|
Provider could be one of implemented in provider.py.
|
|
|
|
Updates MongoDB, fills in id of file from provider (ie. file_id
|
|
from GDrive), 'created_dt' - time of upload
|
|
|
|
'provider_name' doesn't have to match to 'site_name', single
|
|
provider (GDrive) might have multiple sites ('projectA',
|
|
'projectB')
|
|
|
|
Args:
|
|
module(SyncServerModule): object to run SyncServerModule API
|
|
project_name (str): source db
|
|
file (dictionary): of file from representation in Mongo
|
|
representation (dictionary): of representation
|
|
provider_name (string): gdrive, gdc etc.
|
|
site_name (string): site on provider, single provider(gdrive) could
|
|
have multiple sites (different accounts, credentials)
|
|
tree (dictionary): injected memory structure for performance
|
|
preset (dictionary): site config ('credentials_url', 'root'...)
|
|
|
|
"""
|
|
# create ids sequentially, upload file in parallel later
|
|
with module.lock:
|
|
# this part modifies structure on 'remote_site', only single
|
|
# thread can do that at a time, upload/download to prepared
|
|
# structure should be run in parallel
|
|
remote_handler = lib.factory.get_provider(provider_name,
|
|
project_name,
|
|
remote_site_name,
|
|
tree=tree,
|
|
presets=preset)
|
|
|
|
file_path = file.get("path", "")
|
|
try:
|
|
local_file_path, remote_file_path = resolve_paths(
|
|
module, file_path, project_name,
|
|
remote_site_name, remote_handler
|
|
)
|
|
except Exception as exp:
|
|
print(exp)
|
|
|
|
target_folder = os.path.dirname(remote_file_path)
|
|
folder_id = remote_handler.create_folder(target_folder)
|
|
|
|
if not folder_id:
|
|
err = "Folder {} wasn't created. Check permissions.". \
|
|
format(target_folder)
|
|
raise NotADirectoryError(err)
|
|
|
|
loop = asyncio.get_running_loop()
|
|
file_id = await loop.run_in_executor(None,
|
|
remote_handler.upload_file,
|
|
local_file_path,
|
|
remote_file_path,
|
|
module,
|
|
project_name,
|
|
file,
|
|
representation,
|
|
remote_site_name,
|
|
True
|
|
)
|
|
|
|
module.handle_alternate_site(project_name, representation,
|
|
remote_site_name,
|
|
file["_id"], file_id)
|
|
|
|
return file_id
|
|
|
|
|
|
async def download(module, project_name, file, representation, provider_name,
|
|
remote_site_name, tree=None, preset=None):
|
|
"""
|
|
Downloads file to local folder denoted in representation.Context.
|
|
|
|
Args:
|
|
module(SyncServerModule): object to run SyncServerModule API
|
|
project_name (str): source
|
|
file (dictionary) : info about processed file
|
|
representation (dictionary): repr that 'file' belongs to
|
|
provider_name (string): 'gdrive' etc
|
|
site_name (string): site on provider, single provider(gdrive) could
|
|
have multiple sites (different accounts, credentials)
|
|
tree (dictionary): injected memory structure for performance
|
|
preset (dictionary): site config ('credentials_url', 'root'...)
|
|
|
|
Returns:
|
|
(string) - 'name' of local file
|
|
"""
|
|
with module.lock:
|
|
remote_handler = lib.factory.get_provider(provider_name,
|
|
project_name,
|
|
remote_site_name,
|
|
tree=tree,
|
|
presets=preset)
|
|
|
|
file_path = file.get("path", "")
|
|
local_file_path, remote_file_path = resolve_paths(
|
|
module, file_path, project_name, remote_site_name, remote_handler
|
|
)
|
|
|
|
local_folder = os.path.dirname(local_file_path)
|
|
os.makedirs(local_folder, exist_ok=True)
|
|
|
|
local_site = module.get_active_site(project_name)
|
|
|
|
loop = asyncio.get_running_loop()
|
|
file_id = await loop.run_in_executor(None,
|
|
remote_handler.download_file,
|
|
remote_file_path,
|
|
local_file_path,
|
|
module,
|
|
project_name,
|
|
file,
|
|
representation,
|
|
local_site,
|
|
True
|
|
)
|
|
|
|
module.handle_alternate_site(project_name, representation, local_site,
|
|
file["_id"], file_id)
|
|
|
|
return file_id
|
|
|
|
|
|
def resolve_paths(module, file_path, project_name,
|
|
remote_site_name=None, remote_handler=None):
|
|
"""
|
|
Returns tuple of local and remote file paths with {root}
|
|
placeholders replaced with proper values from Settings or Anatomy
|
|
|
|
Ejected here because of Python 2 hosts (GDriveHandler is an issue)
|
|
|
|
Args:
|
|
module(SyncServerModule): object to run SyncServerModule API
|
|
file_path(string): path with {root}
|
|
project_name(string): project name
|
|
remote_site_name(string): remote site
|
|
remote_handler(AbstractProvider): implementation
|
|
Returns:
|
|
(string, string) - proper absolute paths, remote path is optional
|
|
"""
|
|
remote_file_path = ''
|
|
if remote_handler:
|
|
remote_file_path = remote_handler.resolve_path(file_path)
|
|
|
|
local_handler = lib.factory.get_provider(
|
|
'local_drive', project_name, module.get_active_site(project_name))
|
|
local_file_path = local_handler.resolve_path(file_path)
|
|
|
|
return local_file_path, remote_file_path
|
|
|
|
|
|
def _site_is_working(module, project_name, site_name, site_config):
|
|
"""
|
|
Confirm that 'site_name' is configured correctly for 'project_name'.
|
|
|
|
Must be here as lib.factory access doesn't work in Python 2 hosts.
|
|
|
|
Args:
|
|
module (SyncServerModule)
|
|
project_name(string):
|
|
site_name(string):
|
|
site_config (dict): configuration for site from Settings
|
|
Returns
|
|
(bool)
|
|
"""
|
|
provider = module.get_provider_for_site(site=site_name)
|
|
handler = lib.factory.get_provider(provider,
|
|
project_name,
|
|
site_name,
|
|
presets=site_config)
|
|
|
|
return handler.is_active()
|
|
|
|
|
|
class SyncServerThread(threading.Thread):
|
|
"""
|
|
Separate thread running synchronization server with asyncio loop.
|
|
Stopped when tray is closed.
|
|
"""
|
|
def __init__(self, module):
|
|
self.log = Logger.get_logger(self.__class__.__name__)
|
|
|
|
super(SyncServerThread, self).__init__()
|
|
self.module = module
|
|
self.loop = None
|
|
self.is_running = False
|
|
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
|
|
self.timer = None
|
|
|
|
def run(self):
|
|
self.is_running = True
|
|
|
|
try:
|
|
self.log.info("Starting Sync Server")
|
|
self.loop = asyncio.new_event_loop() # create new loop for thread
|
|
asyncio.set_event_loop(self.loop)
|
|
self.loop.set_default_executor(self.executor)
|
|
|
|
asyncio.ensure_future(self.check_shutdown(), loop=self.loop)
|
|
asyncio.ensure_future(self.sync_loop(), loop=self.loop)
|
|
self.log.info("Sync Server Started")
|
|
self.loop.run_forever()
|
|
except Exception:
|
|
self.log.warning(
|
|
"Sync Server service has failed", exc_info=True
|
|
)
|
|
finally:
|
|
self.loop.close() # optional
|
|
|
|
async def sync_loop(self):
|
|
"""
|
|
Runs permanently, each time:
|
|
- gets list of collections in DB
|
|
- gets list of active remote providers (has configuration,
|
|
credentials)
|
|
- for each project_name it looks for representations that
|
|
should be synced
|
|
- synchronize found collections
|
|
- update representations - fills error messages for exceptions
|
|
- waits X seconds and repeat
|
|
Returns:
|
|
|
|
"""
|
|
while self.is_running and not self.module.is_paused():
|
|
try:
|
|
import time
|
|
start_time = time.time()
|
|
self.module.set_sync_project_settings() # clean cache
|
|
project_name = None
|
|
enabled_projects = self.module.get_enabled_projects()
|
|
for project_name in enabled_projects:
|
|
preset = self.module.sync_project_settings[project_name]
|
|
|
|
local_site, remote_site = self._working_sites(project_name,
|
|
preset)
|
|
if not all([local_site, remote_site]):
|
|
continue
|
|
|
|
sync_repres = self.module.get_sync_representations(
|
|
project_name,
|
|
local_site,
|
|
remote_site
|
|
)
|
|
|
|
task_files_to_process = []
|
|
files_processed_info = []
|
|
# process only unique file paths in one batch
|
|
# multiple representation could have same file path
|
|
# (textures),
|
|
# upload process can find already uploaded file and
|
|
# reuse same id
|
|
processed_file_path = set()
|
|
|
|
site_preset = preset.get('sites')[remote_site]
|
|
remote_provider = \
|
|
self.module.get_provider_for_site(site=remote_site)
|
|
handler = lib.factory.get_provider(remote_provider,
|
|
project_name,
|
|
remote_site,
|
|
presets=site_preset)
|
|
limit = lib.factory.get_provider_batch_limit(
|
|
remote_provider)
|
|
# first call to get_provider could be expensive, its
|
|
# building folder tree structure in memory
|
|
# call only if needed, eg. DO_UPLOAD or DO_DOWNLOAD
|
|
for sync in sync_repres:
|
|
if self.module.\
|
|
is_representation_paused(sync['_id']):
|
|
continue
|
|
if limit <= 0:
|
|
continue
|
|
files = sync.get("files") or []
|
|
if files:
|
|
for file in files:
|
|
# skip already processed files
|
|
file_path = file.get('path', '')
|
|
if file_path in processed_file_path:
|
|
continue
|
|
status = self.module.check_status(
|
|
file,
|
|
local_site,
|
|
remote_site,
|
|
preset.get('config'))
|
|
if status == SyncStatus.DO_UPLOAD:
|
|
tree = handler.get_tree()
|
|
limit -= 1
|
|
task = asyncio.create_task(
|
|
upload(self.module,
|
|
project_name,
|
|
file,
|
|
sync,
|
|
remote_provider,
|
|
remote_site,
|
|
tree,
|
|
site_preset))
|
|
task_files_to_process.append(task)
|
|
# store info for exception handlingy
|
|
files_processed_info.append((file,
|
|
sync,
|
|
remote_site,
|
|
project_name
|
|
))
|
|
processed_file_path.add(file_path)
|
|
if status == SyncStatus.DO_DOWNLOAD:
|
|
tree = handler.get_tree()
|
|
limit -= 1
|
|
task = asyncio.create_task(
|
|
download(self.module,
|
|
project_name,
|
|
file,
|
|
sync,
|
|
remote_provider,
|
|
remote_site,
|
|
tree,
|
|
site_preset))
|
|
task_files_to_process.append(task)
|
|
|
|
files_processed_info.append((file,
|
|
sync,
|
|
local_site,
|
|
project_name
|
|
))
|
|
processed_file_path.add(file_path)
|
|
|
|
self.log.debug("Sync tasks count {}".format(
|
|
len(task_files_to_process)
|
|
))
|
|
files_created = await asyncio.gather(
|
|
*task_files_to_process,
|
|
return_exceptions=True)
|
|
for file_id, info in zip(files_created,
|
|
files_processed_info):
|
|
file, representation, site, project_name = info
|
|
error = None
|
|
if isinstance(file_id, BaseException):
|
|
error = str(file_id)
|
|
file_id = None
|
|
self.module.update_db(project_name,
|
|
file_id,
|
|
file,
|
|
representation,
|
|
site,
|
|
error)
|
|
|
|
duration = time.time() - start_time
|
|
self.log.debug("One loop took {:.2f}s".format(duration))
|
|
|
|
delay = self.module.get_loop_delay(project_name)
|
|
self.log.debug(
|
|
"Waiting for {} seconds to new loop".format(delay)
|
|
)
|
|
self.timer = asyncio.create_task(self.run_timer(delay))
|
|
await asyncio.gather(self.timer)
|
|
|
|
except ConnectionResetError:
|
|
self.log.warning(
|
|
"ConnectionResetError in sync loop, trying next loop",
|
|
exc_info=True)
|
|
except CancelledError:
|
|
# just stopping server
|
|
pass
|
|
except ResumableError:
|
|
self.log.warning(
|
|
"ResumableError in sync loop, trying next loop",
|
|
exc_info=True)
|
|
except Exception:
|
|
self.stop()
|
|
self.log.warning(
|
|
"Unhandled except. in sync loop, stopping server",
|
|
exc_info=True)
|
|
|
|
def stop(self):
|
|
"""Sets is_running flag to false, 'check_shutdown' shuts server down"""
|
|
self.is_running = False
|
|
|
|
async def check_shutdown(self):
|
|
""" Future that is running and checks if server should be running
|
|
periodically.
|
|
"""
|
|
while self.is_running:
|
|
if self.module.long_running_tasks:
|
|
task = self.module.long_running_tasks.pop()
|
|
self.log.info("starting long running")
|
|
await self.loop.run_in_executor(None, task["func"])
|
|
self.log.info("finished long running")
|
|
self.module.projects_processed.remove(task["project_name"])
|
|
await asyncio.sleep(0.5)
|
|
tasks = [task for task in asyncio.all_tasks() if
|
|
task is not asyncio.current_task()]
|
|
list(map(lambda task: task.cancel(), tasks)) # cancel all the tasks
|
|
results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
self.log.debug(
|
|
f'Finished awaiting cancelled tasks, results: {results}...')
|
|
await self.loop.shutdown_asyncgens()
|
|
# to really make sure everything else has time to stop
|
|
self.executor.shutdown(wait=True)
|
|
await asyncio.sleep(0.07)
|
|
self.loop.stop()
|
|
|
|
async def run_timer(self, delay):
|
|
"""Wait for 'delay' seconds to start next loop"""
|
|
await asyncio.sleep(delay)
|
|
|
|
def reset_timer(self):
|
|
"""Called when waiting for next loop should be skipped"""
|
|
self.log.debug("Resetting timer")
|
|
if self.timer:
|
|
self.timer.cancel()
|
|
self.timer = None
|
|
|
|
def _working_sites(self, project_name, sync_config):
|
|
if self.module.is_project_paused(project_name):
|
|
self.log.debug("Both sites same, skipping")
|
|
return None, None
|
|
|
|
local_site = self.module.get_active_site(project_name)
|
|
remote_site = self.module.get_remote_site(project_name)
|
|
if local_site == remote_site:
|
|
self.log.debug("{}-{} sites same, skipping".format(
|
|
local_site, remote_site))
|
|
return None, None
|
|
|
|
local_site_config = sync_config.get('sites')[local_site]
|
|
remote_site_config = sync_config.get('sites')[remote_site]
|
|
if not all([_site_is_working(self.module, project_name, local_site,
|
|
local_site_config),
|
|
_site_is_working(self.module, project_name, remote_site,
|
|
remote_site_config)]):
|
|
self.log.debug(
|
|
"Some of the sites {} - {} is not working properly".format(
|
|
local_site, remote_site
|
|
)
|
|
)
|
|
|
|
return None, None
|
|
|
|
return local_site, remote_site
|