Sync Server - fixed usage of settings

Implemented settings per project
Updated default settings
This commit is contained in:
Petr Kalis 2020-12-07 20:50:24 +01:00
parent ecf220156e
commit 76e3d70cca
5 changed files with 151 additions and 119 deletions

View file

@@ -25,10 +25,14 @@ class GDriveHandler(AbstractProvider):
slow and should be run only when necessary. Currently set to
lazy creation: created only on the first call that needs it.
Configuration for provider is in pype-config/presets/gdrive.json
Configuration for provider is in
'settings/defaults/project_settings/global.json'
Settings can be overridden per project.
Example of config:
"gdrive": { - site name
"provider": "gdrive", - type of provider, label must be registered
"credentials_url": "/my_secret_folder/credentials.json",
"root": { - could be "root": "/My Drive" for single root
"root_one": "/My Drive",
@@ -39,12 +43,12 @@ class GDriveHandler(AbstractProvider):
FOLDER_STR = 'application/vnd.google-apps.folder'
MY_DRIVE_STR = 'My Drive' # name of root folder of regular Google Drive
def __init__(self, site_name, tree=None):
def __init__(self, site_name, tree=None, presets=None):
self.presets = None
self.active = False
self.site_name = site_name
self.presets = self.get_presets().get(site_name, None)
self.presets = presets
if not self.presets:
log.info("Sync Server: There are no presets for {}.".
format(site_name))
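For illustration, a minimal sketch of constructing the handler with the new
per-site preset (the config values below are placeholders, not real paths):

site_config = {
    "provider": "gdrive",
    "credentials_url": "/my_secret_folder/credentials.json",
    "root": "/My Drive",
}
handler = GDriveHandler("gdrive", presets=site_config)
if handler.is_active():  # credentials resolved, provider usable
    ...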

View file

@@ -29,7 +29,7 @@ class ProviderFactory:
"""
self.providers[provider] = (creator, batch_limit)
def get_provider(self, provider, site_name, tree=None):
def get_provider(self, provider, site_name, tree=None, presets=None):
"""
Returns new instance of provider client for specific site.
One provider could have multiple sites.
@@ -42,11 +42,13 @@ class ProviderFactory:
site_name (string): descriptor of site, different service accounts
must have different site name
tree (dictionary): - folder paths to folder id structure
presets (dictionary): config for provider and site (e.g.
"credentials_url"..)
Returns:
(implementation of AbstractProvider)
"""
creator_info = self._get_creator_info(provider)
site = creator_info[0](site_name, tree) # call init
site = creator_info[0](site_name, tree, presets) # call init
return site
@@ -68,10 +70,16 @@ class ProviderFactory:
def _get_creator_info(self, provider):
"""
Collect all necessary info for provider. Currently only creator
class and batch limit
class and batch limit.
Args:
provider (string): 'gdrive' etc
Returns:
(tuple): (creator, batch_limit)
creator is class of a provider (ex: GDriveHandler)
batch_limit denotes how many files are synced in a single loop;
it is provided via 'register_provider' as it is needed even
before the provider class itself is initialized
(setting it as a class variable didn't work)
"""
creator_info = self.providers.get(provider)
if not creator_info:
@@ -81,4 +89,8 @@ class ProviderFactory:
factory = ProviderFactory()
# registers a provider implementation under the label 'gdrive',
# implemented by the 'GDriveHandler' class
# 7 denotes the number of files that can be synced in a single loop -
# value learned by trial and error
factory.register_provider('gdrive', GDriveHandler, 7)
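
A hedged usage sketch of the factory with the new 'presets' argument
('site_config' stands for a site settings dict as in the docstring example):

handler = factory.get_provider('gdrive', 'gdrive', presets=site_config)
limit = factory.get_provider_batch_limit('gdrive')  # -> 7 files per loop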

View file

@@ -1,4 +1,4 @@
from pype.api import get_system_settings, Logger
from pype.api import get_system_settings, get_project_settings, Logger
import threading
import asyncio
@@ -92,79 +92,89 @@ class SyncServer():
self.connection = AvalonMongoDB()
try:
self.presets = get_system_settings()["sync_server"]["config"]
self.presets = self.get_synced_presets()
self.set_active_sites(self.presets)
self.sync_server_thread = SyncServerThread(self)
except KeyError as exp:
log.debug(("There are not set presets for SyncServer OR "
"Credentials provided are invalid, "
"no syncing possible").
format(str(self.presets)), exc_info=True)
self.active_site = self.presets["active_site"]
self.remote_site = self.presets["remote_site"]
# try to activate providers, need to have valid credentials
self.active_sites = []
for provider in lib.factory.providers.keys():
for site in lib.factory.providers[provider][0].get_presets(). \
keys():
handler = lib.factory.get_provider(provider, site)
if handler.is_active():
self.active_sites.append((provider, site))
except KeyError:
log.debug("There are no presets set for SyncServer and "
"no credentials provided, no syncing possible")
@property
def active_site(self):
def get_synced_presets(self):
"""
Returns active 'local' site (could be personal location on user
laptop or general 'studio' mounted disk.
Its 'mine' part of synchronization.
Collects settings of all projects that have syncing enabled
Returns:
(string)
(dict): of settings, keys are project names
"""
return self._active_site
sync_presets = {}
for collection in self.connection.database.collection_names(False):
sync_settings = self.get_synced_preset(collection)
if sync_settings:
sync_presets[collection] = sync_settings
@active_site.setter
def active_site(self, value):
return sync_presets
def get_synced_preset(self, project_name):
""" Handles pulling sync_server's settings for enabled 'project_name'
Args:
project_name (str): used in project settings
Returns:
(dict): settings dictionary for the enabled project,
empty if no settings or sync is disabled
"""
Sets 'mine' part of synchronization process. It is expected only
single site is active at the time. Active site could be changed
though on different location (user working in studio has
'active_site' = 'studio', when user is at home changes
'active_site' to 'john_doe_local_001'.
settings = get_project_settings(project_name)
sync_settings = settings.get("global")["sync_server"]
if not sync_settings:
log.debug("No project setting for sync_server, not syncing.")
return {}
if sync_settings.get("enabled"):
return sync_settings
return {}
def set_active_sites(self, settings):
"""
Sets 'self.active_sites' as a dictionary from provided 'settings'
Format:
{ 'project_name' : [('provider_name', 'site_name')] }
Args:
value (string): label for site, needs to match representation's
'files.site'.keys()
Returns:
(string)
settings (dict): sync settings of all enabled projects (site labels,
retry counts etc.)
"""
self._active_site = value
self.active_sites = {}
for project_name, project_setting in settings.items():
for site_name, config in project_setting.get("sites").items():
handler = lib.factory.get_provider(config["provider"],
site_name,
presets=config)
if handler.is_active():
if not self.active_sites.get(project_name):
self.active_sites[project_name] = []
@property
def remote_site(self):
"""
Remote side of synchronization, where "to synchronize to".
Currently expected only single remote destination ('gdrive'..),
but prepared for multiple.
Denotes 'theirs' side of synchronization.
self.active_sites[project_name].append(
(config["provider"], site_name))
Returns:
(list) of strings (['gdrive'])
"""
return [self._remote_site]
if not self.active_sites:
log.debug("No sync sites active, no working credentials provided")
@remote_site.setter
def remote_site(self, value):
self._remote_site = value
def get_active_sites(self, project_name):
"""
Returns active sites (provider configured and able to connect) per
project.
def get_collections(self):
Args:
project_name (str): used as a key in dict
Returns:
(list): of ('provider_name', 'site_name') tuples active
for the project
"""
Returns:
(list) of strings with collection names in avalon DB
"""
return self.connection.database.collection_names(False)
return self.active_sites[project_name]
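To make the new structures concrete, a hypothetical shape of the two
dictionaries built above (project and site names are invented):

# self.presets, as returned by get_synced_presets():
#   {"ProjectA": {"enabled": True,
#                 "config": {"active_site": "studio",
#                            "remote_site": "gdrive", ...},
#                 "sites": {"gdrive": {"provider": "gdrive", ...}}}}
# self.active_sites, as built by set_active_sites():
#   {"ProjectA": [("gdrive", "gdrive")]}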
@time_function
def get_sync_representations(self, collection, active_site, remote_site):
@@ -191,8 +201,8 @@ class SyncServer():
log.debug("Check representations for : {}".format(collection))
self.connection.Session["AVALON_PROJECT"] = collection
# retry_cnt - number of attempts to sync specific file before giving up
retries_arr = self._get_retries_arr()
active_providers_str = ",".join(remote_site)
retries_arr = self._get_retries_arr(collection)
#active_providers_str = ",".join(remote_site)
query = {
"type": "representation",
"$or": [
@@ -206,7 +216,7 @@ class SyncServer():
}}, {
"files.sites": {
"$elemMatch": {
"name": {"$in": [active_providers_str]},
"name": {"$in": [remote_site]},
"created_dt": {"$exists": False},
"tries": {"$in": retries_arr}
}
@@ -223,7 +233,7 @@ class SyncServer():
}}, {
"files.sites": {
"$elemMatch": {
"name": {"$in": [active_providers_str]},
"name": {"$in": [remote_site]},
"created_dt": {"$exists": True}
}
}
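For context, a hypothetical (abridged) representation document the query
above would match for download - the remote site record has 'created_dt',
the local one does not yet:

{
    "type": "representation",
    "files": [{
        "path": "{root}/project/asset/lookdev/v10/scene.ma",
        "sites": [
            {"name": "studio", "tries": 1},
            {"name": "gdrive", "created_dt": "2020-12-07T20:50:00"}
        ]
    }]
}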
@@ -237,18 +247,19 @@ class SyncServer():
return representations
def check_status(self, file, provider_name):
def check_status(self, file, provider_name, config_preset):
"""
Check synchronization status for single 'file' of single
'representation' by single 'provider'.
(E.g. check if 'scene.ma' of lookdev.v10 should be synced to GDrive.)
Always compares against the local record, e.g. the site with
'name' == self.presets["active_site"]
'name' == self.presets[PROJECT_NAME]['config']["active_site"]
Args:
file (dictionary): of file from representation in Mongo
provider_name (string): - gdrive etc.
config_preset (dict): config about active site, retries
Returns:
(string) - one of SyncStatus
"""
@@ -262,25 +273,25 @@ class SyncServer():
tries = self._get_tries_count_from_rec(provider_rec)
# file will be skipped if unsuccessfully tried over threshold
# error metadata needs to be purged manually in DB to reset
if tries < self.presets["retry_cnt"]:
if tries < config_preset["retry_cnt"]:
return SyncStatus.DO_UPLOAD
else:
_, local_rec = self._get_provider_rec(
sites,
self.presets["active_site"]) or {}
config_preset["active_site"]) or {}
if not local_rec or not local_rec.get("created_dt"):
tries = self._get_tries_count_from_rec(local_rec)
# file will be skipped if unsuccessfully tried over
# threshold times, error metadata needs to be purged
# manually in DB to reset
if tries < self.presets["retry_cnt"]:
if tries < config_preset["retry_cnt"]:
return SyncStatus.DO_DOWNLOAD
return SyncStatus.DO_NOTHING
async def upload(self, file, representation, provider_name, site_name,
tree=None):
tree=None, preset=None):
"""
Upload single 'file' of a 'representation' to 'provider'.
Source url is taken from 'file' portion, where {root} placeholder
@@ -297,6 +308,7 @@ class SyncServer():
site_name (string): site on provider, single provider (gdrive) could
have multiple sites (different accounts, credentials)
tree (dictionary): injected memory structure for performance
preset (dictionary): site config ('credentials_url', 'root'...)
"""
# create ids sequentially, upload file in parallel later
@@ -304,7 +316,8 @@ class SyncServer():
# this part modifies structure on 'remote_site', only single
# thread can do that at a time, upload/download to prepared
# structure should be run in parallel
handler = lib.factory.get_provider(provider_name, site_name, tree)
handler = lib.factory.get_provider(provider_name, site_name,
tree=tree, presets=preset)
remote_file = self._get_remote_file_path(file,
handler.get_roots_config()
)
@@ -328,7 +341,7 @@ class SyncServer():
return file_id
async def download(self, file, representation, provider_name,
site_name, tree=None):
site_name, tree=None, preset=None):
"""
Downloads file to local folder denoted in representation.Context.
@@ -339,12 +352,14 @@ class SyncServer():
site_name (string): site on provider, single provider (gdrive) could
have multiple sites (different accounts, credentials)
tree (dictionary): injected memory structure for performance
preset (dictionary): site config ('credentials_url', 'root'...)
Returns:
(string) - 'name' of local file
"""
with self.lock:
handler = lib.factory.get_provider(provider_name, site_name, tree)
handler = lib.factory.get_provider(provider_name, site_name,
tree=tree, presets=preset)
remote_file = self._get_remote_file_path(file,
handler.get_roots_config()
)
@@ -538,14 +553,14 @@ class SyncServer():
update
)
def get_loop_delay(self):
def get_loop_delay(self, project_name):
"""
Return the number of seconds to wait after one synchronization
loop finishes before the next one starts.
Returns:
(int): in seconds
"""
return self.presets["loop_delay"]
return int(self.presets[project_name]["config"]["loop_delay"])
def _get_success_dict(self, file_index, site_index, new_file_id):
"""
@@ -642,7 +657,7 @@ class SyncServer():
path = path.format(**root_config)
return path
def _get_retries_arr(self):
def _get_retries_arr(self, project_name):
"""
Returns array of allowed values for the 'tries' field. If a repre
contains one of these values, synchronization was already attempted
@@ -651,7 +666,8 @@ class SyncServer():
Returns:
(list)
"""
arr = [i for i in range(self.presets["retry_cnt"])]
retry_cnt = self.presets[project_name].get("config")["retry_cnt"]
arr = [i for i in range(int(retry_cnt))]
arr.append(None)
return arr
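A quick worked example of the array built here (retry_cnt comes from
project settings):

# retry_cnt == "3"  ->  arr == [0, 1, 2, None]
# None lets the Mongo "$in" also match files whose 'tries' field is
# not set yet, i.e. files never attempted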
@@ -706,15 +722,16 @@ class SyncServerThread(threading.Thread):
while self.is_running:
import time
start_time = None
for collection in self.module.get_collections():
for collection, preset in self.module.get_synced_presets().\
items():
start_time = time.time()
sync_repres = self.module.get_sync_representations(
collection,
self.module.active_site,
self.module.remote_site
preset.get('config')["active_site"],
preset.get('config')["remote_site"]
)
local = self.module.active_site
local = preset.get('config')["active_site"]
task_files_to_process = []
files_processed_info = []
# process only unique file paths in one batch
@@ -723,9 +740,12 @@ class SyncServerThread(threading.Thread):
# upload process can find already uploaded file and
# reuse same id
processed_file_path = set()
for active_site in self.module.active_sites:
provider, site = active_site
handler = lib.factory.get_provider(provider, site)
for check_site in self.module.get_active_sites(collection):
provider, site = check_site
site_preset = preset.get('sites')[site]
handler = lib.factory.get_provider(provider,
site,
presets=site_preset)
limit = lib.factory.get_provider_batch_limit(provider)
# first call to get_provider could be expensive; it is
# building the folder tree structure in memory
@@ -741,8 +761,10 @@ class SyncServerThread(threading.Thread):
if file_path in processed_file_path:
continue
status = self.module.check_status(file,
provider)
status = self.module.check_status(
file,
provider,
preset.get('config'))
if status == SyncStatus.DO_UPLOAD:
tree = handler.get_tree()
limit -= 1
@@ -751,7 +773,8 @@ class SyncServerThread(threading.Thread):
sync,
provider,
site,
tree))
tree,
site_preset))
task_files_to_process.append(task)
# store info for exception handling
files_processed_info.append((file,
@@ -762,11 +785,12 @@ class SyncServerThread(threading.Thread):
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.download(file,
sync,
provider,
site,
tree))
self.module.download(file,
sync,
provider,
site,
tree,
site_preset))
task_files_to_process.append(task)
files_processed_info.append((file,
@@ -794,7 +818,7 @@ class SyncServerThread(threading.Thread):
duration = time.time() - start_time
log.debug("One loop took {:.2f}s".format(duration))
await asyncio.sleep(self.module.get_loop_delay())
await asyncio.sleep(self.module.get_loop_delay(collection))
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, trying next loop",
exc_info=True)

View file

@@ -180,16 +180,20 @@
}
},
"sync_server": {
"enabled": false,
"enabled": true,
"config": {
"local_id": "",
"retry_cnt": "",
"loop_delay": "",
"active_site": "",
"remote_site": ""
"local_id": "local_0",
"retry_cnt": "3",
"loop_delay": "60",
"active_site": "studio",
"remote_site": "gdrive"
},
"providers": {
"gdrive": {}
"sites": {
"gdrive": {
"provider": "gdrive",
"credentials_url": "",
"root": ""
}
}
}
}
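
A minimal sketch of reading these per-project defaults at runtime, matching
the get_synced_preset() code above ("MyProject" is a placeholder name):

from pype.api import get_project_settings

sync_settings = get_project_settings("MyProject")["global"]["sync_server"]
if sync_settings["enabled"]:
    active_site = sync_settings["config"]["active_site"]   # "studio"
    remote_site = sync_settings["config"]["remote_site"]   # "gdrive"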

View file

@@ -146,20 +146,8 @@
"enabled": false,
"workspace_name": "studio name"
},
"sync_server": {
"enabled": false,
"config": {
"local_id": "local_0",
"retry_cnt": "3",
"loop_delay": "60",
"active_site": "studio",
"remote_site": "gdrive"
},
"providers": {
"gdrive": {
"credentials_url": ""
}
}
"Sync Server": {
"enabled": true
},
"deadline": {
"enabled": true,