mirror of https://github.com/ynput/ayon-core.git
synced 2025-12-24 12:54:40 +01:00

Changed get_sync_representations

Changed sites from a dictionary to an array of dictionaries for better performance.
Added the @timeit decorator.
Changed initialization of gdrive._tree to be lazy.

This commit is contained in:
parent b39a636bb4
commit e793ee1c11

4 changed files with 349 additions and 114 deletions
22
pype/lib.py
@@ -15,6 +15,7 @@ import inspect
import acre
import platform
from abc import ABCMeta, abstractmethod
import time

from avalon import io, pipeline
import six
@@ -1647,3 +1648,24 @@ class ApplicationAction(avalon.api.Action):
        return launch_application(
            project_name, asset_name, task_name, self.name
        )


def timeit(method):
    """ Decorator to print how much time function took.
        For debugging.
        Depends on presence of 'log' object
    """
    def timed(*args, **kw):
        ts = time.time()
        result = method(*args, **kw)
        te = time.time()
        if 'log_time' in kw:
            name = kw.get('log_name', method.__name__.upper())
            kw['log_time'][name] = int((te - ts) * 1000)
        else:
            log.debug('%r %2.2f ms' % \
                      (method.__name__, (te - ts) * 1000))
            print('%r %2.2f ms' % \
                  (method.__name__, (te - ts) * 1000))
        return result
    return timed
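A minimal usage sketch of the new decorator (the logger setup and the decorated function below are illustrative, not part of this commit; note that extra keyword arguments such as log_time are forwarded to the wrapped function, so it has to accept them):

    import logging
    import time

    logging.basicConfig(level=logging.DEBUG)
    log = logging.getLogger("timeit_demo")      # stand-in for pype's module-level 'log'


    def timeit(method):
        """Same shape as the decorator added above."""
        def timed(*args, **kw):
            ts = time.time()
            result = method(*args, **kw)
            te = time.time()
            if 'log_time' in kw:
                # caller passed a dict to collect timings instead of logging them
                name = kw.get('log_name', method.__name__.upper())
                kw['log_time'][name] = int((te - ts) * 1000)
            else:
                log.debug('%r %2.2f ms' % (method.__name__, (te - ts) * 1000))
            return result
        return timed


    @timeit
    def build_something(n, **kwargs):
        # **kwargs only so forwarded 'log_time'/'log_name' keywords are accepted
        return sum(range(n))


    timings = {}
    build_something(1000000)                    # logged as "'build_something' ... ms"
    build_something(1000000, log_time=timings)  # collected into the dict instead
    print(timings)                              # {'BUILD_SOMETHING': <milliseconds>}
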
@@ -9,6 +9,7 @@ from .abstract_provider import AbstractProvider
# If modifying these scopes, delete the file token.pickle.
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from pype.api import Logger
from pype.lib import timeit

SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly',
          'https://www.googleapis.com/auth/drive.file']  # for write|delete
@@ -22,14 +23,15 @@ class GDriveHandler(AbstractProvider):
    As GD API doesn't have real folder structure, 'tree' in memory
    structure is build in constructor to map folder paths to folder ids,
    which are used in API. Building of this tree might be expensive and
    slow and should be run only when necessary.
    slow and should be run only when necessary. Currently is set to
    lazy creation, created only after first call when necessary
    """
    FOLDER_STR = 'application/vnd.google-apps.folder'

    def __init__(self, tree=None):
        self.service = self._get_gd_service()
        self.root = self.service.files().get(fileId='root').execute()
        self.tree = tree or self._build_tree(self.list_folders())
        self._tree = tree

    def _get_gd_service(self):
        """
@@ -60,18 +62,21 @@ class GDriveHandler(AbstractProvider):
            credentials=creds, cache_discovery=False)
        return service

    @timeit
    def _build_tree(self, folders):
        """
        Create in-memory structure resolving paths to folder id as
        recursive quering might be slower.
        recursive querying might be slower.
        Initialized in the time of class initialization.
        Maybe should be persisted
        Tree is structure of path to id:
            '/': {'id': '1234567'}
            '/PROJECT_FOLDER': {'id':'222222'}
            '/PROJECT_FOLDER/Assets': {'id': '3434545'}
        :param folders: list of dictionaries with folder metadata
        :return: <dictionary> - path as a key, folder id as a value
        Args:
            folders (list): list of dictionaries with folder metadata
        Returns:
            (dictionary) path as a key, folder id as a value
        """
        log.debug("build_tree len {}".format(len(folders)))
        tree = {"/": {"id": self.root["id"]}}
@@ -121,9 +126,12 @@ class GDriveHandler(AbstractProvider):
        constructor provides argument that could inject previously created
        tree.
        Tree structure must be handled in thread safe fashion!
        :return: <dictionary> - url to id
        Returns:
            (dictionary) - url to id mapping
        """
        return self.tree
        if not self._tree:
            self._tree = self._build_tree(self.list_folders())
        return self._tree

    def get_root_name(self):
        """
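The net effect: nothing is built in the constructor any more; the first get_tree() call pays for _build_tree() (or reuses a tree injected through the constructor), and every later call returns the cached dictionary. A toy, standalone sketch of that caching pattern (the fake _build_tree below only stands in for the Drive API traversal):

    class LazyTreeExample:
        """Toy version of the lazy '_tree' caching introduced in GDriveHandler."""

        def __init__(self, tree=None):
            # an externally prepared tree can be injected to skip the build entirely
            self._tree = tree

        def _build_tree(self):
            # stand-in for the expensive Google Drive folder traversal
            print("building tree...")
            return {"/": {"id": "root"}, "/PROJECT_FOLDER": {"id": "222222"}}

        def get_tree(self):
            # built only on first access, reused afterwards
            if not self._tree:
                self._tree = self._build_tree()
            return self._tree


    handler = LazyTreeExample()
    handler.get_tree()    # prints "building tree..." exactly once
    handler.get_tree()    # served from the cached dictionary
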
@@ -136,10 +144,13 @@ class GDriveHandler(AbstractProvider):
    def create_folder(self, path):
        """
        Create all nonexistent folders and subfolders in 'path'.
        Updates self.tree structure with new paths
        Updates self._tree structure with new paths

        :param path: absolute path, starts with GDrive root, without filename
        :return: <string> folder id of lowest subfolder from 'path'
        Args:
            path (string): absolute path, starts with GDrive root,
                without filename
        Returns:
            (string) folder id of lowest subfolder from 'path'
        """
        folder_id = self.folder_path_exists(path)
        if folder_id:
@@ -165,7 +176,7 @@ class GDriveHandler(AbstractProvider):
                folder_id = folder["id"]

            new_path_key = path + '/' + new_folder_name
            self.tree[new_path_key] = {"id": folder_id}
            self.get_tree()[new_path_key] = {"id": folder_id}

            path = new_path_key

@@ -260,7 +271,7 @@ class GDriveHandler(AbstractProvider):
            # full path with file name
            target_name = os.path.basename(local_path)
            local_path = os.path.dirname(local_path)
        else: # just folder, get file name from source
        else:  # just folder, get file name from source
            target_name = os.path.basename(source_path)

        file = os.path.isfile(local_path + "/" + target_name)
@@ -303,7 +314,8 @@ class GDriveHandler(AbstractProvider):
                                              fields=fields).execute()
        children = response.get('files', [])
        if children and not force:
            raise ValueError("Folder {} is not empty, use 'force'".format(path))
            raise ValueError("Folder {} is not empty, use 'force'".
                             format(path))

        self.service.files().delete(fileId=folder_id).execute()

@@ -325,7 +337,7 @@ class GDriveHandler(AbstractProvider):
        :return: <dictionary> with metadata or raises ValueError
        """
        try:
            return self.tree[path]
            return self.get_tree()[path]
        except Exception:
            raise ValueError("Uknown folder id {}".format(id))

@@ -337,6 +349,7 @@ class GDriveHandler(AbstractProvider):
        """
        pass

    @timeit
    def list_folders(self):
        """ Lists all folders in GDrive.
            Used to build in-memory structure of path to folder ids model.
@@ -351,7 +364,8 @@ class GDriveHandler(AbstractProvider):
                                                  pageSize=1000,
                                                  spaces='drive',
                                                  fields=fields,
                                                  pageToken=page_token).execute()
                                                  pageToken=page_token)\
                                            .execute()
            folders.extend(response.get('files', []))
            page_token = response.get('nextPageToken', None)
            if page_token is None:
@@ -373,7 +387,8 @@ class GDriveHandler(AbstractProvider):
            response = self.service.files().list(q=q,
                                                 spaces='drive',
                                                 fields=fields,
                                                 pageToken=page_token).execute()
                                                 pageToken=page_token).\
                                                 execute()
            files.extend(response.get('files', []))
            page_token = response.get('nextPageToken', None)
            if page_token is None:
@@ -383,9 +398,12 @@ class GDriveHandler(AbstractProvider):

    def folder_path_exists(self, file_path):
        """
        Checks if path from 'file_path' exists. If so, return its folder id.
        :param file_path: gdrive path with / as a separator
        :return: <string> folder id or False
        Checks if path from 'file_path' exists. If so, return its
        folder id.
        Args:
            file_path (string): gdrive path with / as a separator
        Returns:
            (string) folder id or False
        """
        if not file_path:
            return False
@@ -396,7 +414,7 @@ class GDriveHandler(AbstractProvider):

        dir_path = os.path.dirname(file_path)

        path = self.tree.get(dir_path, None)
        path = self.get_tree().get(dir_path, None)
        if path:
            return path["id"]

@@ -506,4 +524,4 @@ class GDriveHandler(AbstractProvider):
if __name__ == '__main__':
    gd = GDriveHandler()
    print(gd.root)
    print(gd.tree)
    print(gd.get_tree())

@@ -1,5 +1,6 @@
from pype.api import config, Logger
from avalon import io
from pype.lib import timeit

import threading
import asyncio
@@ -14,8 +15,6 @@ import os

log = Logger().get_logger("SyncServer")

# test object 5eeb25e411e06a16209ab78e


class SyncStatus(Enum):
    DO_NOTHING = 0
@@ -32,6 +31,11 @@ class SyncServer():
    that are marked to be in different location than 'studio' (temporary),
    checks if 'created_dt' field is present denoting successful sync
    with provider destination.
    Sites structure is created during publish and by default it will
    always contain 1 record with "name" == LOCAL_ID and filled "created_dt"
    AND 1 or multiple records for all defined remote sites, where
    "created_dt" is empty. This highlights that file should be uploaded to
    remote destination

    ''' - example of synced file test_Cylinder_lookMain_v010.ma to GDrive
    "files" : [
@@ -41,17 +45,25 @@ class SyncServer():
        "_id" : ObjectId("5eeb25e411e06a16209ab78f"),
        "hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822",
        "size" : NumberLong(4822),
        "sites" : {
            "studio" : {
        "sites" : [
            {
                "name": "john_local_XD4345",
                "created_dt" : ISODate("2020-05-22T08:05:44.000Z")
            },
            "gdrive" : {
            {
                "id" : ObjectId("5eeb25e411e06a16209ab78f"),
                "created_dt" : ISODate("2020-07-16T17:54:35.833Z")
            }
                "name": "gdrive",
                "created_dt" : ISODate("2020-05-55T08:54:35.833Z")
        ]
        }
    },
    '''
    Each Tray app has assigned its own LOCAL_ID (TODO from env) which is
    used in sites as a name. Tray is searching only for records where
    name matches its LOCAL_ID + any defined remote sites.
    If the local record has its "created_dt" filled, its a source and
    process will try to upload the file to all defined remote sites.

    Remote files "id" is real id that could be used in appropriate API.
    Local files have "id" too, for conformity, contains just file name.
    It is expected that multiple providers will be implemented in separate
@@ -60,6 +72,15 @@ class SyncServer():
    """
    RETRY_CNT = 3  # number of attempts to sync specific file
    LOCAL_PROVIDER = 'studio'
    LOCAL_ID = 'local_0'  # personal id of this tray TODO - from Env or preset
    # limit querying DB to look for X number of representations that should
    # be sync, we try to run more loops with less records
    # actual number of files synced could be lower as providers can have
    # different limits imposed by its API
    # set 0 to no limit
    REPRESENTATION_LIMIT = 100
    # after how many seconds start next loop after end of previous
    LOOP_DELAY = 60

    def __init__(self):
        self.qaction = None
@@ -71,7 +92,7 @@ class SyncServer():
        if not io.Session:
            io.install()

        io.Session['AVALON_PROJECT'] = 'Test'
        io.Session['AVALON_PROJECT'] = 'performance_test'  # temp TODO
        try:
            self.presets = config.get_presets()["services"]["sync_server"]
        except Exception:
@@ -82,19 +103,56 @@ class SyncServer():
            ).format(str(self.presets)))
        self.sync_server_thread = SynchServerThread(self)

    @timeit
    def get_sync_representations(self):
        """
        Get representations that should be synched, these could be
        recognised by presence of document in 'files.sites', where key is
        a provider (GDrive, S3) and value is empty document or document with
        no value for 'created_dt' field.
        Currently returning all representations.
        TODO: filter out representations that shouldnt be synced
        :return: <list>
        a provider (GDrive, S3) and value is empty document or document
        without 'created_dt' field. (Don't put null to 'created_dt'!)

        Returns:
            (list)
        """
        retries_str = "null,"+",".join([str(i) for i in range(self.RETRY_CNT)])
        representations = io.find({
            "type": "representation"
        })
            "type": "representation",
            "$or": [
                {"$and": [
                    {
                        "files.sites": {
                            "$elemMatch": {
                                "name": self.LOCAL_ID,
                                "created_dt": {"$exists": True}
                            }
                        }}, {
                        "files.sites": {
                            "$elemMatch": {
                                "name": "gdrive",
                                "created_dt": {"$exists": False},
                                "tries": {"$nin": [retries_str]}
                            }
                        }
                    }]},
                {"$and": [
                    {
                        "files.sites": {
                            "$elemMatch": {
                                "name": self.LOCAL_ID,
                                "created_dt": {"$exists": False},
                                "tries": {"$nin": [retries_str]}
                            }
                        }}, {
                        "files.sites": {
                            "$elemMatch": {
                                "name": "gdrive",
                                "created_dt": {"$exists": True}
                            }
                        }
                    }
                ]}
            ]
        }).limit(self.REPRESENTATION_LIMIT)

        return representations

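In plain terms the new filter says: pick a representation when one of the two paired site records (this tray's LOCAL_ID and 'gdrive') already has 'created_dt' while the other one is still missing it and has not exhausted its retries. A standalone pymongo sketch of the same $elemMatch shape against a throwaway collection (connection string, database and collection names are made up for the demo):

    from datetime import datetime

    import pymongo

    client = pymongo.MongoClient("mongodb://localhost:27017")   # illustrative URI
    coll = client["test_db"]["representations_demo"]            # throwaway collection

    LOCAL_ID = "local_0"
    RETRY_CNT = 3
    retries_str = "null," + ",".join(str(i) for i in range(RETRY_CNT))

    coll.insert_one({
        "type": "representation",
        "files": [{
            "_id": "file_1",
            "sites": [
                {"name": LOCAL_ID, "created_dt": datetime.utcnow()},  # present locally
                {"name": "gdrive"}                                    # not yet uploaded
            ]
        }]
    })

    # same shape as get_sync_representations(): local copy exists, gdrive copy missing
    query = {
        "type": "representation",
        "$or": [
            {"$and": [
                {"files.sites": {"$elemMatch": {
                    "name": LOCAL_ID, "created_dt": {"$exists": True}}}},
                {"files.sites": {"$elemMatch": {
                    "name": "gdrive", "created_dt": {"$exists": False},
                    "tries": {"$nin": [retries_str]}}}},
            ]},
            {"$and": [
                {"files.sites": {"$elemMatch": {
                    "name": LOCAL_ID, "created_dt": {"$exists": False},
                    "tries": {"$nin": [retries_str]}}}},
                {"files.sites": {"$elemMatch": {
                    "name": "gdrive", "created_dt": {"$exists": True}}}},
            ]},
        ],
    }
    print(coll.count_documents(query))   # -> 1: this representation needs syncing
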
@@ -103,29 +161,35 @@ class SyncServer():
        Check synchronization status for single 'file' of single
        'representation' by single 'provider'.
        (Eg. check if 'scene.ma' of lookdev.v10 should be synched to GDrive
        :param file: <dictionary> of file from representation in Mongo
        :param provider_name: <string> - gdrive, gdc etc.
        :return: <string> - one of SyncStatus

        Always is comparing againts local record, eg. site with
        'name' == self.LOCAL_ID

        Args:
            file (dictionary): of file from representation in Mongo
            provider_name (string): - gdrive, gdc etc.
        Returns:
            (string) - one of SyncStatus
        """
        sites = file.get("sites") or {}
        if isinstance(sites, list):  # temporary, old format of 'sites'
            return SyncStatus.DO_NOTHING
        provider_rec = sites.get(provider_name) or {}
        if provider_rec:
        sites = file.get("sites") or []
        # if isinstance(sites, list):  # temporary, old format of 'sites'
        #     return SyncStatus.DO_NOTHING
        _, provider_rec = self._get_provider_rec(sites, provider_name) or {}
        if provider_rec:  # sync remote target
            created_dt = provider_rec.get("created_dt")
            if not created_dt:
                tries = self._get_tries_count(file, provider_name)
                tries = self._get_tries_count_from_rec(provider_rec)
                # file will be skipped if unsuccessfully tried over threshold
                # error metadata needs to be purged manually in DB to reset
                if tries < self.RETRY_CNT:
                    return SyncStatus.DO_UPLOAD
        else:
            local_rec = sites.get(lib.Providers.LOCAL.value) or {}
            local_rec = self._get_provider_rec(sites, self.LOCAL_ID) or {}
            if not local_rec or not local_rec.get("created_dt"):
                tries = self._get_tries_count(file, self.LOCAL_PROVIDER)
                tries = self._get_tries_count_from_rec(local_rec)
                # file will be skipped if unsuccessfully tried over
                # threshold error metadata needs to be purged manually
                # in DB to reset
                # threshold times, error metadata needs to be purged
                # manually in DB to reset
                if tries < self.RETRY_CNT:
                    return SyncStatus.DO_DOWNLOAD

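The decision itself is easy to trace with toy data. A simplified, standalone rendition of the branching above (the LOCAL_ID value and the sample file are made up; the real method resolves the site records through _get_provider_rec and the retry counters through _get_tries_count_from_rec):

    from datetime import datetime
    from enum import Enum


    class SyncStatus(Enum):
        DO_NOTHING = 0
        DO_UPLOAD = 1
        DO_DOWNLOAD = 2


    def check_status(file, provider_name, local_id="local_0", retry_cnt=3):
        """Simplified stand-alone version of the branching above."""
        sites = file.get("sites") or []
        provider_rec = next((s for s in sites if s["name"] == provider_name), {})
        local_rec = next((s for s in sites if s["name"] == local_id), {})

        if provider_rec:  # remote site is listed -> maybe upload
            if not provider_rec.get("created_dt"):
                if provider_rec.get("tries", 0) < retry_cnt:
                    return SyncStatus.DO_UPLOAD
        else:             # remote site not listed -> maybe download
            if not local_rec or not local_rec.get("created_dt"):
                if local_rec.get("tries", 0) < retry_cnt:
                    return SyncStatus.DO_DOWNLOAD
        return SyncStatus.DO_NOTHING


    sample = {"sites": [{"name": "local_0", "created_dt": datetime.utcnow()},
                        {"name": "gdrive"}]}
    print(check_status(sample, "gdrive"))   # -> SyncStatus.DO_UPLOAD
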
@@ -173,11 +237,15 @@ class SyncServer():
    async def download(self, file, representation, provider_name, tree=None):
        """
        Downloads file to local folder denoted in representation.Context.
        :param file: <dictionary> - info about processed file
        :param representation: <dictionary> - repr that 'file' belongs to
        :param provider_name: <string> - 'gdrive' etc
        :param tree: <dictionary> - injected memory structure for performance
        :return: <string> - 'name' of local file

        Args:
            file (dictionary) : info about processed file
            representation (dictionary): repr that 'file' belongs to
            provider_name (string): 'gdrive' etc
            tree (dictionary): injected memory structure for performance

        Returns:
            (string) - 'name' of local file
        """
        with self.lock:
            handler = lib.factory.get_provider(provider_name, tree)
@@ -202,12 +270,16 @@ class SyncServer():
        """
        Update 'provider' portion of records in DB with success (file_id)
        or error (exception)
        :param new_file_id: <string>
        :param file: <dictionary> - info about processed file (pulled from DB)
        :param representation: <dictionary> - parent repr of file (from DB)
        :param provider_name: <string> - label ('gdrive', 'S3')
        :param error: <string> - exception message
        :return: None

        Args:
            new_file_id (string):
            file (dictionary): info about processed file (pulled from DB)
            representation (dictionary): parent repr of file (from DB)
            provider_name (string): label ('gdrive', 'S3')
            error (string): exception message

        Returns:
            None
        """
        representation_id = representation.get("_id")
        file_id = file.get("_id")
@@ -215,17 +287,26 @@ class SyncServer():
            "_id": representation_id,
            "files._id": file_id
        }
        file_index, _ = self._get_file_info(representation.get('files', []),
                                            file_id)
        site_index, _ = self._get_provider_rec(file.get('sites', []),
                                               provider_name)

        log.debug("file_index {}, site_index {}".format(file_index,
                                                        site_index))
        update = {}
        if new_file_id:
            update["$set"] = self._get_success_dict(provider_name, new_file_id)
            update["$set"] = self._get_success_dict(file_index, site_index,
                                                    new_file_id)
            # reset previous errors if any
            update["$unset"] = self._get_error_dict(provider_name, "", "")
            update["$unset"] = self._get_error_dict(file_index, site_index,
                                                    "", "")
        else:
            tries = self._get_tries_count(file, provider_name)
            tries += 1

            update["$set"] = self._get_error_dict(provider_name, error, tries)
            update["$set"] = self._get_error_dict(file_index, site_index,
                                                  error, tries)

        # it actually modifies single _id, but io.update_one not implemented
        io.update_many(
@@ -261,23 +342,64 @@ class SyncServer():
                exc_info=True
            )

    def _get_file_info(self, files, _id):
        """
        Return record from list of records which name matches to 'provider'
        Could be possibly refactored with '_get_file_info' together.

        Args:
            files (list): of dictionaries with info about published files
            _id (string): _id of specific file

        Returns:
            (int, dictionary): index from list and record with metadata
                about site (if/when created, errors..)
                OR (-1, None) if not present
        """
        for index, rec in enumerate(files):
            if rec._id == _id:
                return index, rec

        return -1, None

    def _get_provider_rec(self, sites, provider):
        """
        Return record from list of records which name matches to 'provider'

        Args:
            sites (list): of dictionaries
            provider (string): 'local_XXX', 'gdrive'

        Returns:
            (int, dictionary): index from list and record with metadata
                about site (if/when created, errors..)
                OR (-1, None) if not present
        """
        for index, rec in enumerate(sites):
            if rec["name"] == provider:
                return index, rec

        return -1, None

    def thread_stopped(self):
        self._is_running = False

    def reset_provider_for_file(self, file_id, provider):
    def reset_provider_for_file(self, file_id, site_index):
        """
        Reset information about synchronization for particular 'file_id'
        and provider.
        Useful for testing or forcing file to be reuploaded.
        :param file_id: <string> file id in representation
        :param provider: <string> - 'gdrive', 'S3' etc
        :return: None
        Args:
            file_id (string): file id in representation
            site_index(int): 'gdrive', 'S3' etc
        Returns:
            None
        """
        query = {
            "files._id": file_id
        }
        update = {
            "$unset": {"files.$.sites.{}".format(provider): ""}
            "$unset": {"files.$.sites.{}".format(site_index): ""}
        }
        # it actually modifies single _id, but io.update_one not implemented
        io.update_many(
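The new _get_provider_rec helper is what replaces the old dictionary lookup by key: it returns both the list index (needed later for the numeric update paths) and the record itself. A small standalone illustration (the site list is made up):

    def get_provider_rec(sites, provider):
        """Same lookup as SyncServer._get_provider_rec."""
        for index, rec in enumerate(sites):
            if rec["name"] == provider:
                return index, rec
        return -1, None


    sites = [{"name": "local_0", "created_dt": "2020-05-22"}, {"name": "gdrive"}]
    print(get_provider_rec(sites, "gdrive"))    # -> (1, {'name': 'gdrive'})
    print(get_provider_rec(sites, "dropbox"))   # -> (-1, None)
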
@@ -285,48 +407,78 @@ class SyncServer():
            update
        )

    def _get_success_dict(self, provider, new_file_id):
    def _get_success_dict(self, file_index, site_index, new_file_id):
        """
        Provide success metadata ("id", "created_dt") to be stored in Db.
        :param provider: used as part of path in DB (files.sites.gdrive)
        :param new_file_id: id of created file
        :return: <dict>
        Used in $set: "DICT" part of query.
        Sites are array inside of array(file), so real indexes for both
        file and site are needed for upgrade in DB.
        Args:
            file_index: (int) - index of modified file
            site_index: (int) - index of modified site of modified file
            new_file_id: id of created file
        Returns:
            (dictionary)
        """
        val = {"files.$.sites.{}.id".format(provider): new_file_id,
               "files.$.sites.{}.created_dt".format(provider):
        val = {"files.{}.sites.{}.id".format(file_index, site_index):
                   new_file_id,
               "files.{}.sites.{}.created_dt".format(file_index, site_index):
                   datetime.utcnow()}
        return val

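Because each site record now lives in an array nested inside the 'files' array, a single positional '$' operator is no longer enough to address it, so the update paths embed the concrete numeric indexes. A sketch of the document such a helper produces for file index 0 and site index 1 (the id value below is made up):

    from datetime import datetime


    def get_success_dict(file_index, site_index, new_file_id):
        """Same shape as SyncServer._get_success_dict."""
        return {
            "files.{}.sites.{}.id".format(file_index, site_index): new_file_id,
            "files.{}.sites.{}.created_dt".format(file_index, site_index):
                datetime.utcnow(),
        }


    print(get_success_dict(0, 1, "gdrive_file_id_123"))
    # {'files.0.sites.1.id': 'gdrive_file_id_123',
    #  'files.0.sites.1.created_dt': datetime.datetime(...)}
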
    def _get_error_dict(self, provider, error="", tries=""):
    def _get_error_dict(self, file_index, site_index, error="", tries=""):
        """
        Provide error metadata to be stored in Db.
        Used for set (error and tries provided) or unset mode.
        :param provider: used as part of path in DB (files.sites.gdrive)
        :param error: message
        :param tries: how many times failed
        :return: <dict>
        Args:
            file_index: (int) - index of modified file
            site_index: (int) - index of modified site of modified file
            error: (string) - message
            tries: how many times failed
        Returns:
            (dictionary)
        """
        val = {"files.$.sites.{}.last_failed_dt".format(provider):
                   datetime.utcnow(),
               "files.$.sites.{}.error".format(provider): error,
               "files.$.sites.{}.tries".format(provider): tries}
        val = {"files.{}.sites.{}.last_failed_dt".
                   format(file_index, site_index): datetime.utcnow(),
               "files.{}.sites.{}.error".format(file_index, site_index): error,
               "files.{}.sites.{}.tries".format(file_index, site_index): tries
               }
        return val

    def _get_tries_count_from_rec(self, rec):
        """
        Get number of failed attempts to synch from site record
        Args:
            rec (dictionary): info about specific site record
        Returns:
            (int) - number of failed attempts
        """
        if not rec:
            return 0
        return rec.get("tries", 0)

    def _get_tries_count(self, file, provider):
        """
        Get number of failed attempts to synch
        :param file: <dictionary> - info about specific file
        :param provider: <string> - gdrive, S3 etc
        :return: <int> - number of failed attempts
        Args:
            file (dictionary): info about specific file
            provider (string): name of site ('gdrive' or specific LOCAL_ID)
        Returns:
            (int) - number of failed attempts
        """
        return file.get("sites", {}).get(provider, {}).get("tries", 0)
        return self._get_provider_rec(file.get("sites", []), provider).\
            get("tries", 0)

    def _get_local_file_path(self, file, local_root):
        """
        Auxiliary function for replacing rootless path with real path
        :param file: url to file with {root}
        :param local_root: value of {root} for local projects
        :return: <string> - absolute path on local system

        Args:
            file (dictionary): file info, get 'path' to file with {root}
            local_root (string): value of {root} for local projects

        Returns:
            <string> - absolute path on local system
        """
        if not local_root:
            raise ValueError("Unknown local root for file {}")
@@ -335,9 +487,12 @@ class SyncServer():
    def _get_remote_file_path(self, file, root_name):
        """
        Auxiliary function for replacing rootless path with real path
        :param file: url to file with {root}
        :param root_name: value of {root} for remote location
        :return: <string> - absolute path on remote location
        Args:
            file (dictionary): file info, get 'path' to file with {root}
            root_name (string): value of {root} for remote location

        Returns:
            (string) - absolute path on remote location
        """
        target_root = '/{}'.format(root_name)
        return file.get("path", "").replace('{root}', target_root)
@@ -390,18 +545,20 @@ class SynchServerThread(threading.Thread):
                # upload process can find already uploaded file and reuse same
                # id
                processed_file_path = set()
                cnt = 0  # TODO remove
                for provider in lib.factory.providers.keys():
                    handler = lib.factory.get_provider(provider)
                    limit = lib.factory.get_provider_batch_limit(provider)
                    # first call to get_provider could be expensive, its
                    # building folder tree structure in memory
                    handler = lib.factory.get_provider(provider)
                    tree = handler.get_tree()
                    limit = lib.factory.get_provider_batch_limit(provider)
                    # call only if needed, eg. DO_UPLOAD or DO_DOWNLOAD
                    for sync in sync_representations:
                        if limit <= 0:
                            continue
                        files = sync.get("files") or {}
                        files = sync.get("files") or []
                        if files:
                            for file in files:
                                cnt += 1
                                # skip already processed files
                                file_path = file.get('path', '')
                                if file_path in processed_file_path:
@@ -409,8 +566,8 @@ class SynchServerThread(threading.Thread):

                                status = self.module.check_status(file,
                                                                  provider)

                                if status == SyncStatus.DO_UPLOAD:
                                    tree = handler.get_tree()
                                    limit -= 1
                                    task = asyncio.create_task(
                                        self.module.upload(
@@ -425,12 +582,14 @@ class SynchServerThread(threading.Thread):
                                            provider))
                                    processed_file_path.add(file_path)
                                if status == SyncStatus.DO_DOWNLOAD:
                                    tree = handler.get_tree()
                                    limit -= 1
                                    task = asyncio.create_task(
                                        self.module.download
                                        (file,
                                         sync,
                                         provider))
                                         provider,
                                         tree))
                                    task_files_to_process.append(task)

                                    files_processed_info.append((file,
@@ -440,6 +599,7 @@ class SynchServerThread(threading.Thread):

                log.debug("gather tasks len {}".
                          format(len(task_files_to_process)))
                log.debug("checked {} files".format(cnt))
                files_created = await asyncio.gather(*task_files_to_process,
                                                     return_exceptions=True)
                for file_id, info in zip(files_created, files_processed_info):
@@ -456,7 +616,7 @@ class SynchServerThread(threading.Thread):

                duration = time.time() - start_time
                log.debug("One loop took {}".format(duration))
                await asyncio.sleep(60)
                await asyncio.sleep(self.module.LOOP_DELAY)
            except ConnectionResetError:
                log.warning("ConnectionResetError in sync loop, trying next loop",
                            exc_info=True)
@ -1,11 +1,13 @@
|
|||
import pymongo
|
||||
import bson
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class TestPerformance():
|
||||
'''
|
||||
Class for testing performance of representation and their 'files' parts.
|
||||
Class for testing performance of representation and their 'files'
|
||||
parts.
|
||||
Discussion is if embedded array:
|
||||
'files' : [ {'_id': '1111', 'path':'....},
|
||||
{'_id'...}]
|
||||
|
|
@@ -16,13 +18,14 @@ class TestPerformance():
        }
    is faster.

    Current results: without additional partial index documents is 3x faster
    Current results:
        without additional partial index documents is 3x faster
        With index is array 50x faster then document

    Partial index something like:
        db.getCollection('performance_test').createIndex
            ({'files._id': 1},
            {partialFilterExpresion: {'files': {'$exists': true}})
            {partialFilterExpresion: {'files': {'$exists': true}}})
        !DIDNT work for me, had to create manually in Compass

    '''
|
|||
'''
|
||||
print('Testing version {} on {}'.format(self.version,
|
||||
self.collection_name))
|
||||
print('Queries rung {} in {} loops'.format(queries, loops))
|
||||
|
||||
inserted_ids = list(self.collection.
|
||||
find({"inserted_id": {"$exists": True}}))
|
||||
|
|
@@ -128,22 +132,27 @@ class TestPerformance():

        found_cnt = 0
        for _ in range(loops):
            print('Starting loop {}'.format(_))
            start = time.time()
            for _ in range(queries):
                val = random.choice(self.ids)
                val = val.replace("'", '')
                # val = random.choice(self.ids)
                # val = val.replace("'", '')
                val = random.randint(0, 50)
                print(val)

                if (self.version == 'array'):
                    # prepared for partial index, without 'files': exists
                    # wont engage
                    found = self.collection.\
                        find_one({'files': {"$exists": True},
                                  'files._id': "{}".format(val)})
                        find({'files': {"$exists": True},
                              'files.sites.name': "local_{}".format(val)}).\
                        count()
                else:
                    key = "files.{}".format(val)
                    found = self.collection.find_one({key: {"$exists": True}})
                if found:
                    found_cnt += 1
                print("found {} records".format(found))
                # if found:
                #     found_cnt += len(list(found))

            end = time.time()
            print('duration per loop {}'.format(end - start))
@@ -172,8 +181,8 @@ class TestPerformance():
                        "test_CylinderA_workfileLookdev_v{0:03}.mb".format(i),
                    "_id": '{}'.format(file_id),
                    "hash": "temphash",
                    "sites": ["studio"],
                    "size":87236
                    "sites": self.get_sites(50),
                    "size": 87236
                },
                {
                    "path": "c:/Test/Assets/Cylinder/publish/workfile/"
@@ -181,7 +190,7 @@ class TestPerformance():
                        "test_CylinderB_workfileLookdev_v{0:03}.mb".format(i),
                    "_id": '{}'.format(file_id2),
                    "hash": "temphash",
                    "sites": ["studio"],
                    "sites": self.get_sites(50),
                    "size": 87236
                },
                {
@@ -190,7 +199,7 @@ class TestPerformance():
                        "test_CylinderC_workfileLookdev_v{0:03}.mb".format(i),
                    "_id": '{}'.format(file_id3),
                    "hash": "temphash",
                    "sites": ["studio"],
                    "sites": self.get_sites(50),
                    "size": 87236
                }

@@ -223,11 +232,37 @@ class TestPerformance():

        return ret

    def get_sites(self, number_of_sites=50):
        """
        Return array of sites declaration.
        Currently on 1st site has "created_dt" fillled, which should
        trigger upload to 'gdrive' site.
        'gdrive' site is appended, its destination for syncing for
        Sync Server
        Args:
            number_of_sites:

        Returns:

        """
        sites = []
        for i in range(number_of_sites):
            site = {'name': "local_{}".format(i)}
            # do not create null 'created_dt' field, Mongo doesnt like it
            if i == 0:
                site['created_dt'] = datetime.now()

            sites.append(site)

        sites.append({'name': "gdrive"})

        return sites


if __name__ == '__main__':
    tp = TestPerformance('array')
    tp.prepare()  # enable to prepare data
    tp.run(1000, 3)
    tp.run(10, 3)

    print('-'*50)
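For a quick sanity check, the records this helper generates have the same array-of-sites shape introduced throughout the commit. A trimmed standalone copy and its output for two local sites (the dates are whatever datetime.now() returns):

    from datetime import datetime


    def get_sites(number_of_sites=50):
        """Standalone copy of TestPerformance.get_sites, for illustration."""
        sites = []
        for i in range(number_of_sites):
            site = {'name': "local_{}".format(i)}
            if i == 0:
                site['created_dt'] = datetime.now()   # only the first site counts as synced
            sites.append(site)
        sites.append({'name': "gdrive"})              # sync target, intentionally no 'created_dt'
        return sites


    print(get_sites(2))
    # [{'name': 'local_0', 'created_dt': datetime.datetime(...)},
    #  {'name': 'local_1'},
    #  {'name': 'gdrive'}]
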