Added active_site, remote_site

Added reset site for representation and file
Removed io, switched to AvalonMongoDB
Added implementation for multiple projects in DB
Petr Kalis 2020-09-30 19:27:20 +02:00
parent e2420c2919
commit a5e8b15a38

@@ -1,5 +1,4 @@
from pype.api import config, Logger
from avalon import io
from pype.lib import timeit
import threading
@@ -12,6 +11,9 @@ from datetime import datetime
from .providers import lib
import os
from avalon import io
from avalon.api import AvalonMongoDB
log = Logger().get_logger("SyncServer")
@@ -25,14 +27,15 @@ class SyncStatus(Enum):
class SyncServer():
"""
WIP
Synchronization server that is synching published files from local to
For testing set
Synchronization server that is syncing published files from local to
any of the implemented providers (like GDrive, S3 etc.)
Runs in the background and checks all representations, looking for
files that are marked to be in a different location than 'studio'
(temporary), and checks if the 'created_dt' field is present,
denoting a successful sync with the provider destination.
Sites structure is created during publish and by default it will
always contain 1 record with "name" == self.presets["local_id"] and
always contain 1 record with "name" == self.presets["active_site"] and
filled "created_dt" AND 1 or multiple records for all defined
remote sites, where "created_dt" is not present.
This highlights that file should be uploaded to
@@ -60,19 +63,20 @@ class SyncServer():
},
'''
Each Tray app has assigned its own self.presets["local_id"]
used in sites as a name. Tray is searching only for records where
name matches its self.presets["local_id"] + any defined remote sites.
If the local record has its "created_dt" filled, its a source and
used in sites as a name.
Tray is searching only for records where name matches its
self.presets["active_site"] + self.presets["remote_site"].
"active_site" could be storage in studio ('studio'), or specific
"local_id" when user is working disconnected from home.
If the local record has its "created_dt" filled, it is a source and
process will try to upload the file to all defined remote sites.
Remote files "id" is real id that could be used in approeckpriate API.
Remote files "id" is real id that could be used in appropriate API.
Local files have "id" too for conformity; it contains just the file name.
It is expected that multiple providers will be implemented in separate
classes and registered in 'providers.py'.
"""
# TODO all these move to presets
LOCAL_PROVIDER = 'studio'
# limit querying DB to look for X number of representations that should
# be synced; we try to run more loops with fewer records
# actual number of files synced could be lower as providers can have
@@ -87,17 +91,20 @@ class SyncServer():
self.presets = None
self.lock = threading.Lock()
if not io.Session:
io.install()
self.connection = AvalonMongoDB()
log.debug("connection {}".format(self.connection))
io.Session['AVALON_PROJECT'] = 'performance_test' # temp TODO
try:
self.presets = config.get_presets()["sync_server"]["config"]
except KeyError:
log.debug(("There are not set presets for SyncServer."
" No credentials provided, no synching possible").
" No credentials provided, no syncing possible").
format(str(self.presets)))
self.sync_server_thread = SynchServerThread(self)
self.sync_server_thread = SyncServerThread(self)
self.active_site = self.presets["active_site"]
self.remote_site = self.presets["remote_site"]
# try to activate providers, need to have valid credentials
self.active_provider_names = []
@@ -106,10 +113,64 @@ class SyncServer():
if handler.is_active():
self.active_provider_names.append(provider)
@timeit
def get_sync_representations(self):
@property
def active_site(self):
"""
Get representations that should be synched, these could be
Returns active 'local' site (could be a personal location on a user
laptop or the general 'studio' mounted disk).
It is the 'mine' part of synchronization.
Returns:
(string)
"""
return self._active_site
@active_site.setter
def active_site(self, value):
"""
Sets 'mine' part of synchronization process. It is expected that
only a single site is active at a time. The active site can change
with location though (a user working in the studio has
'active_site' = 'studio'; when working from home they change
'active_site' to 'john_doe_local_001').
Args:
value (string): label for site, needs to match representation's
'files.sites' keys
"""
self._active_site = value
@property
def remote_site(self):
"""
Remote side of synchronization, i.e. where "to synchronize to".
Currently only a single remote destination ('gdrive'..) is expected,
but prepared for multiple.
Denotes 'theirs' side of synchronization.
Returns:
(list) of strings (['gdrive'])
"""
return [self._remote_site]
@remote_site.setter
def remote_site(self, value):
self._remote_site = value
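A short usage sketch of the two properties above (the 'sync_server'
instance name is hypothetical):

    # user leaves the studio and works from home
    sync_server.active_site = "john_doe_local_001"  # 'mine' side
    sync_server.remote_site = "gdrive"              # 'theirs' side
    assert sync_server.remote_site == ["gdrive"]    # getter wraps in list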
def get_collections(self):
"""
Returns:
(list) of strings with collection names in avalon DB
"""
return self.connection.database.collection_names(False)
@timeit
def get_sync_representations(self, collection, active_site, remote_site):
"""
Get representations that should be synced; these could be
recognised by presence of a document in 'files.sites', where key is
a provider (GDrive, S3) and value is an empty document or a document
without the 'created_dt' field. (Don't set 'created_dt' to null!).
@@ -120,9 +181,11 @@ class SyncServer():
Returns:
(list)
"""
log.debug("Check representations for : {}".format(collection))
self.connection.Session["AVALON_PROJECT"] = collection
# retry_cnt - number of attempts to sync specific file before giving up
retries_arr = self._get_retries_arr()
active_providers_str = ",".join(self.active_provider_names)
active_providers_str = ",".join(remote_site)
query = {
"type": "representation",
"$or": [
@@ -130,7 +193,7 @@ class SyncServer():
{
"files.sites": {
"$elemMatch": {
"name": self.presets["local_id"],
"name": active_site,
"created_dt": {"$exists": True}
}
}}, {
@@ -146,7 +209,7 @@ class SyncServer():
{
"files.sites": {
"$elemMatch": {
"name": self.presets["local_id"],
"name": active_site,
"created_dt": {"$exists": False},
"tries": {"$in": retries_arr}
}
@@ -161,8 +224,9 @@ class SyncServer():
]}
]
}
log.debug("query: {}".format(query))
representations = io.find(query).limit(self.REPRESENTATION_LIMIT)
log.debug("get_sync_representations.query: {}".format(query))
representations = self.connection.find(query)
return representations
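A usage sketch mirroring how sync_loop below drives this method (the
'sync_server' instance name is hypothetical):

    for collection in sync_server.get_collections():
        representations = sync_server.get_sync_representations(
            collection,
            sync_server.active_site,   # eg. 'studio'
            sync_server.remote_site)   # eg. ['gdrive']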
@@ -170,18 +234,17 @@ class SyncServer():
"""
Check synchronization status for single 'file' of single
'representation' by single 'provider'.
(Eg. check if 'scene.ma' of lookdev.v10 should be synched to GDrive
(Eg. check if 'scene.ma' of lookdev.v10 should be synced to GDrive
Always is comparing againts local record, eg. site with
'name' == self.presets["local_id"]
Always compares against the local record, eg. site with
'name' == self.presets["active_site"]
Args:
file (dictionary): of file from representation in Mongo
provider_name (string): - gdrive, gdc etc.
provider_name (string): gdrive etc.
Returns:
(string) - one of SyncStatus
"""
log.debug("file: {}".format(file))
sites = file.get("sites") or []
# if isinstance(sites, list): # temporary, old format of 'sites'
# return SyncStatus.DO_NOTHING
@@ -197,7 +260,7 @@ class SyncServer():
else:
_, local_rec = self._get_provider_rec(
sites,
self.presets["local_id"]) or {}
self.presets["active_site"]) or {}
if not local_rec or not local_rec.get("created_dt"):
tries = self._get_tries_count_from_rec(local_rec)
@@ -219,14 +282,18 @@ class SyncServer():
Updates MongoDB, fills in id of file from provider (ie. file_id
from GDrive), 'created_dt' - time of upload
:param file: <dictionary> of file from representation in Mongo
:param representation: <dictionary> of representation
:param provider_name: <string> - gdrive, gdc etc.
:param tree: <dictionary> - injected memory structure for performance
:return:
Args:
file <dictionary>: of file from representation in Mongo
representation <dictionary>: of representation
provider_name <string>: gdrive, gdc etc.
tree <dictionary>: injected memory structure for performance
"""
# create ids sequentially, upload file in parallel later
with self.lock:
# this part modifies structure on 'remote_site'; only a single
# thread can do that at a time, while upload/download to the prepared
# structure should run in parallel
handler = lib.factory.get_provider(provider_name, tree)
remote_file = self._get_remote_file_path(file,
handler.get_root_name())
@@ -237,8 +304,9 @@ class SyncServer():
folder_id = handler.create_folder(target_folder)
if not folder_id:
raise NotADirectoryError("Folder {} wasn't created"
.format(target_folder))
err = "Folder {} wasn't created. Check permissions.".\
format(target_folder)
raise NotADirectoryError(err)
loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None,
@@ -319,8 +387,7 @@ class SyncServer():
update["$set"] = self._get_error_dict(file_index, site_index,
error, tries)
# modifies a single matching representation document
io.update_many(
self.connection.update_one(
query,
update
)
@@ -353,6 +420,9 @@ class SyncServer():
def tray_exit(self):
self.stop()
def thread_stopped(self):
self._is_running = False
@property
def is_running(self):
return self.sync_server_thread.is_running
@@ -361,12 +431,12 @@ class SyncServer():
if not self.is_running:
return
try:
log.debug("Stopping synch server server")
log.debug("Stopping sync server server")
self.sync_server_thread.is_running = False
self.sync_server_thread.stop()
except Exception:
log.warning(
"Error has happened during Killing synchserver server",
"Error has happened during Killing sync server",
exc_info=True
)
@@ -409,31 +479,48 @@ class SyncServer():
return -1, None
def thread_stopped(self):
self._is_running = False
def reset_provider_for_file(self, file_id, site_index):
def reset_provider_for_file(self, collection, representation_id,
file_id, site_name):
"""
Reset information about synchronization for particular 'file_id'
and site.
Useful for testing or forcing a file to be re-uploaded.
Args:
file_id (string): file id in representation
site_index(int): 'gdrive', 'S3' etc
collection (string): name of project (eg. collection) in DB
representation_id(string): _id of representation
file_id (string): file _id in representation
site_name (string): 'gdrive', 'S3' etc
Returns:
None
"""
# TODO - implement reset for ALL files or ALL sites
query = {
"files._id": file_id
"_id": io.ObjectId(representation_id)
}
update = {
"$unset": {"files.$.sites.{}".format(site_index): ""}
}
# it actually modifies single _id, but io.update_one not implemented
io.update_many(
query,
update
)
self.connection.Session["AVALON_PROJECT"] = collection
representation = list(self.connection.find(query))
if not representation:
raise ValueError("Representation {} not found in {}".
format(representation_id, collection))
files = representation[0].get('files', [])
file_index, _ = self._get_file_info(files, file_id)
site_index, _ = self._get_provider_rec(
files[file_index].get('sites', []),
site_name)
if file_index >= 0 and site_index >= 0:
elem = {"name": site_name}
update = {
"$set": {"files.{}.sites.{}".format(file_index, site_index):
elem
}
}
self.connection.update_one(
query,
update
)
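A hypothetical call forcing a single file to be re-synced to GDrive
(both ids are illustrative placeholders, not real ObjectIds):

    # resets the 'gdrive' record of one file back to {"name": "gdrive"},
    # so the next sync loop picks it up for upload again
    sync_server.reset_provider_for_file(
        "my_project",                  # collection (project) name
        "5f74c1a2b3c4d5e6f7a8b9c0",    # representation _id
        "5f74c1a2b3c4d5e6f7a8b9c1",    # file _id inside the representation
        "gdrive")                      # site name to reset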
def get_loop_delay(self):
"""
@@ -484,7 +571,7 @@ class SyncServer():
def _get_tries_count_from_rec(self, rec):
"""
Get number of failed attempts to synch from site record
Get number of failed attempts to sync from site record
Args:
rec (dictionary): info about specific site record
Returns:
@@ -496,10 +583,10 @@ class SyncServer():
def _get_tries_count(self, file, provider):
"""
Get number of failed attempts to synch
Get number of failed attempts to sync
Args:
file (dictionary): info about specific file
provider (string): name of site ('gdrive' or specific LOCAL_ID)
provider (string): name of site ('gdrive' or specific user site)
Returns:
(int) - number of failed attempts
"""
@@ -549,13 +636,13 @@ class SyncServer():
return arr
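For reference, a minimal sketch of what _get_retries_arr above
presumably builds, assuming a numeric retry limit in the presets (the
preset key name below is an assumption):

    def _get_retries_arr_sketch(presets):
        # with retry_cnt == 3 this yields [0, 1, 2]: only site records
        # whose 'tries' value is below the limit are queried again
        retry_cnt = int(presets.get("retry_cnt", 3))  # key name assumed
        return list(range(retry_cnt))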
class SynchServerThread(threading.Thread):
class SyncServerThread(threading.Thread):
"""
Separate thread running synchronization server with asyncio loop.
Stopped when tray is closed.
"""
def __init__(self, module):
super(SynchServerThread, self).__init__()
super(SyncServerThread, self).__init__()
self.module = module
self.loop = None
self.is_running = False
@@ -575,95 +662,114 @@ class SynchServerThread(threading.Thread):
self.loop.run_forever()
except Exception:
log.warning(
"Synch Server service has failed", exc_info=True
"Sync Server service has failed", exc_info=True
)
finally:
self.loop.close() # optional
async def sync_loop(self):
"""
Runs permanently, each time:
- gets list of collections in DB
- gets list of active remote providers (has configuration,
credentials)
- for each collection it looks for representations that should
be synced
- synchronizes found representations
- updates representations - fills error messages for exceptions
- waits X seconds and repeats
"""
try:
while self.is_running:
import time
start_time = time.time()
sync_representations = self.module.get_sync_representations()
start_time = None
for collection in self.module.get_collections():
start_time = time.time()
sync_repres = self.module.get_sync_representations(
collection,
self.module.active_site,
self.module.remote_site
)
local_label = lib.Providers.LOCAL.value
task_files_to_process = []
files_processed_info = []
# process only unique file paths in one batch
# multiple representation could have same file path (textures),
# upload process can find already uploaded file and reuse same
# id
processed_file_path = set()
cnt = 0 # TODO remove
for provider in self.module.active_provider_names:
handler = lib.factory.get_provider(provider)
limit = lib.factory.get_provider_batch_limit(provider)
# first call to get_provider could be expensive, its
# building folder tree structure in memory
# call only if needed, eg. DO_UPLOAD or DO_DOWNLOAD
for sync in sync_representations:
if limit <= 0:
continue
files = sync.get("files") or []
if files:
for file in files:
cnt += 1
# skip already processed files
file_path = file.get('path', '')
if file_path in processed_file_path:
continue
local = self.module.active_site
task_files_to_process = []
files_processed_info = []
# process only unique file paths in one batch
# multiple representation could have same file path
# (textures),
# upload process can find already uploaded file and
# reuse same id
processed_file_path = set()
for provider in self.module.active_provider_names:
handler = lib.factory.get_provider(provider)
limit = lib.factory.get_provider_batch_limit(provider)
# first call to get_provider could be expensive, as it's
# building the folder tree structure in memory;
# call only if needed, eg. DO_UPLOAD or DO_DOWNLOAD
for sync in sync_repres:
if limit <= 0:
continue
files = sync.get("files") or []
if files:
for file in files:
# skip already processed files
file_path = file.get('path', '')
if file_path in processed_file_path:
continue
status = self.module.check_status(file,
provider)
if status == SyncStatus.DO_UPLOAD:
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.upload(file,
sync,
provider,
tree))
task_files_to_process.append(task)
# store info for exception handling
files_processed_info.append((file,
sync,
provider))
processed_file_path.add(file_path)
if status == SyncStatus.DO_DOWNLOAD:
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.download(file,
sync,
provider,
tree))
task_files_to_process.append(task)
status = self.module.check_status(file,
provider)
if status == SyncStatus.DO_UPLOAD:
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.upload(file,
sync,
provider,
tree))
task_files_to_process.append(task)
# store info for exception handling
files_processed_info.append((file,
sync,
provider))
processed_file_path.add(file_path)
if status == SyncStatus.DO_DOWNLOAD:
tree = handler.get_tree()
limit -= 1
task = asyncio.create_task(
self.module.download(file,
sync,
provider,
tree))
task_files_to_process.append(task)
files_processed_info.append((file,
sync,
local_label))
processed_file_path.add(file_path)
files_processed_info.append((file,
sync,
local))
processed_file_path.add(file_path)
log.debug("gather tasks len {}".
format(len(task_files_to_process)))
log.debug("checked {} files".format(cnt))
files_created = await asyncio.gather(*task_files_to_process,
return_exceptions=True)
for file_id, info in zip(files_created, files_processed_info):
file, representation, provider = info
error = None
if isinstance(file_id, BaseException):
error = str(file_id)
file_id = None
self.module.update_db(file_id,
file,
representation,
provider,
error)
log.debug("Sync tasks count {}".
format(len(task_files_to_process)))
files_created = await asyncio.gather(
*task_files_to_process,
return_exceptions=True)
for file_id, info in zip(files_created,
files_processed_info):
file, representation, provider = info
error = None
if isinstance(file_id, BaseException):
error = str(file_id)
file_id = None
self.module.update_db(file_id,
file,
representation,
provider,
error)
duration = time.time() - start_time
log.debug("One loop took {}".format(duration))
log.debug("One loop took {:.2f}s".format(duration))
await asyncio.sleep(self.module.get_loop_delay())
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, trying next loop",