SyncServer - modified resolving of paths for local and remote

Fix - status for some failed was incorrectly set to Not available
Extended AbstracProvider with new method for resolvments of paths
Added defaults sites to configured sites
Name refactor
This commit is contained in:
Petr Kalis 2021-03-11 16:20:54 +01:00
parent fcfd537830
commit 83570d4998
5 changed files with 134 additions and 113 deletions

View file

@ -93,3 +93,17 @@ class AbstractProvider(metaclass=ABCMeta):
only parents and their parents) only parents and their parents)
""" """
pass pass
@abstractmethod
def resolve_path(self, path, root_config, anatomy=None):
"""
Replaces root placeholders with appropriate real value from
'root_configs' (from Settings or Local Settings) or Anatomy
(mainly for 'studio' site)
Args:
path(string): path with '{root[work]}/...'
root_config(dict): from Settings or Local Settings
anatomy (Anatomy): prepared anatomy object for project
"""
pass

View file

@ -678,6 +678,16 @@ class GDriveHandler(AbstractProvider):
return return
return provider_presets return provider_presets
def resolve_path(self, path, root_config, anatomy=None):
if not root_config.get("root"):
root_config = {"root": root_config}
try:
return path.format(**root_config)
except KeyError:
msg = "Error in resolving remote root, unknown key"
log.error(msg)
def _handle_q(self, q, trashed=False): def _handle_q(self, q, trashed=False):
""" API list call contain trashed and hidden files/folder by default. """ API list call contain trashed and hidden files/folder by default.
Usually we dont want those, must be included in query explicitly. Usually we dont want those, must be included in query explicitly.

View file

@ -85,6 +85,25 @@ class LocalDriveHandler(AbstractProvider):
def get_tree(self): def get_tree(self):
return return
def resolve_path(self, path, root_config, anatomy=None):
if root_config and not root_config.get("root"):
root_config = {"root": root_config}
try:
if not root_config:
raise KeyError
path = path.format(**root_config)
except KeyError:
try:
path = anatomy.fill_root(path)
except KeyError:
msg = "Error in resolving local root from anatomy"
log.error(msg)
raise ValueError(msg)
return path
def _copy(self, source_path, target_path): def _copy(self, source_path, target_path):
print("copying {}->{}".format(source_path, target_path)) print("copying {}->{}".format(source_path, target_path))
shutil.copy(source_path, target_path) shutil.copy(source_path, target_path)

View file

@ -111,8 +111,7 @@ class SyncServer(PypeModule, ITrayModule):
Sets 'enabled' according to global settings for the module. Sets 'enabled' according to global settings for the module.
Shouldnt be doing any initialization, thats a job for 'tray_init' Shouldnt be doing any initialization, thats a job for 'tray_init'
""" """
sync_server_settings = module_settings[self.name] self.enabled = module_settings[self.name]["enabled"]
self.enabled = sync_server_settings["enabled"]
if asyncio is None: if asyncio is None:
raise AssertionError( raise AssertionError(
"SyncServer module requires Python 3.5 or higher." "SyncServer module requires Python 3.5 or higher."
@ -404,6 +403,14 @@ class SyncServer(PypeModule, ITrayModule):
""" End of Public API """ """ End of Public API """
def get_local_file_path(self, collection, file_path):
"""
Externalized for app
"""
local_file_path, _ = self._resolve_paths(file_path, collection)
return local_file_path
def _get_remote_sites_from_settings(self, sync_settings): def _get_remote_sites_from_settings(self, sync_settings):
if not self.enabled or not sync_settings['enabled']: if not self.enabled or not sync_settings['enabled']:
return [] return []
@ -529,7 +536,7 @@ class SyncServer(PypeModule, ITrayModule):
For performance For performance
""" """
sync_project_presets = {} sync_project_settings = {}
if not self.connection: if not self.connection:
self.connection = AvalonMongoDB() self.connection = AvalonMongoDB()
self.connection.install() self.connection.install()
@ -537,12 +544,12 @@ class SyncServer(PypeModule, ITrayModule):
for collection in self.connection.database.collection_names(False): for collection in self.connection.database.collection_names(False):
sync_settings = self.get_sync_project_setting(collection) sync_settings = self.get_sync_project_setting(collection)
if sync_settings: if sync_settings:
sync_project_presets[collection] = sync_settings sync_project_settings[collection] = sync_settings
if not sync_project_presets: if not sync_project_settings:
log.info("No enabled and configured projects for sync.") log.info("No enabled and configured projects for sync.")
self.sync_project_settings = sync_project_presets self.sync_project_settings = sync_project_settings
def get_sync_project_settings(self, refresh=False): def get_sync_project_settings(self, refresh=False):
""" """
@ -767,7 +774,7 @@ class SyncServer(PypeModule, ITrayModule):
return SyncStatus.DO_NOTHING return SyncStatus.DO_NOTHING
async def upload(self, collection, file, representation, provider_name, async def upload(self, collection, file, representation, provider_name,
site_name, tree=None, preset=None): remote_site_name, tree=None, preset=None):
""" """
Upload single 'file' of a 'representation' to 'provider'. Upload single 'file' of a 'representation' to 'provider'.
Source url is taken from 'file' portion, where {root} placeholder Source url is taken from 'file' portion, where {root} placeholder
@ -797,42 +804,40 @@ class SyncServer(PypeModule, ITrayModule):
# this part modifies structure on 'remote_site', only single # this part modifies structure on 'remote_site', only single
# thread can do that at a time, upload/download to prepared # thread can do that at a time, upload/download to prepared
# structure should be run in parallel # structure should be run in parallel
handler = lib.factory.get_provider(provider_name, site_name, remote_handler = lib.factory.get_provider(provider_name,
tree=tree, presets=preset) remote_site_name,
tree=tree,
presets=preset)
root_configs = self._get_roots_config(self.sync_project_settings, file_path = file.get("path", "")
collection, local_file_path, remote_file_path = self._resolve_paths(
site_name) file_path, collection, remote_site_name, remote_handler
remote_file = self._get_remote_file_path(file, root_configs) )
local_file = self.get_local_file_path(collection, target_folder = os.path.dirname(remote_file_path)
file.get("path", "")) folder_id = remote_handler.create_folder(target_folder)
target_folder = os.path.dirname(remote_file)
folder_id = handler.create_folder(target_folder)
if not folder_id: if not folder_id:
err = "Folder {} wasn't created. Check permissions.".\ err = "Folder {} wasn't created. Check permissions.".\
format(target_folder) format(target_folder)
raise NotADirectoryError(err) raise NotADirectoryError(err)
remote_site = self.get_remote_site(collection)
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None, file_id = await loop.run_in_executor(None,
handler.upload_file, remote_handler.upload_file,
local_file, local_file_path,
remote_file, remote_file_path,
self, self,
collection, collection,
file, file,
representation, representation,
remote_site, remote_site_name,
True True
) )
return file_id return file_id
async def download(self, collection, file, representation, provider_name, async def download(self, collection, file, representation, provider_name,
site_name, tree=None, preset=None): remote_site_name, tree=None, preset=None):
""" """
Downloads file to local folder denoted in representation.Context. Downloads file to local folder denoted in representation.Context.
@ -850,16 +855,16 @@ class SyncServer(PypeModule, ITrayModule):
(string) - 'name' of local file (string) - 'name' of local file
""" """
with self.lock: with self.lock:
handler = lib.factory.get_provider(provider_name, site_name, remote_handler = lib.factory.get_provider(provider_name,
tree=tree, presets=preset) remote_site_name,
tree=tree,
presets=preset)
root_configs = self._get_roots_config(self.sync_project_settings, file_path = file.get("path", "")
collection, local_file_path, remote_file_path = self._resolve_paths(
site_name) file_path, collection, remote_site_name, remote_handler
remote_file_path = self._get_remote_file_path(file, root_configs) )
local_file_path = self.get_local_file_path(collection,
file.get("path", ""))
local_folder = os.path.dirname(local_file_path) local_folder = os.path.dirname(local_file_path)
os.makedirs(local_folder, exist_ok=True) os.makedirs(local_folder, exist_ok=True)
@ -867,7 +872,7 @@ class SyncServer(PypeModule, ITrayModule):
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None, file_id = await loop.run_in_executor(None,
handler.download_file, remote_handler.download_file,
remote_file_path, remote_file_path,
local_file_path, local_file_path,
self, self,
@ -1184,7 +1189,7 @@ class SyncServer(PypeModule, ITrayModule):
Returns: Returns:
only logs, catches IndexError and OSError only logs, catches IndexError and OSError
""" """
my_local_site = self.get_my_local_site() my_local_site = get_local_site_id()
if my_local_site != site_name: if my_local_site != site_name:
self.log.warning("Cannot remove non local file for {}". self.log.warning("Cannot remove non local file for {}".
format(site_name)) format(site_name))
@ -1206,12 +1211,14 @@ class SyncServer(PypeModule, ITrayModule):
return return
representation = representation.pop() representation = representation.pop()
local_file_path = ''
for file in representation.get("files"): for file in representation.get("files"):
local_file_path, _ = self._resolve_paths(file.get("path", ""),
collection
)
try: try:
self.log.debug("Removing {}".format(file["path"])) self.log.debug("Removing {}".format(local_file_path))
local_file = self.get_local_file_path(collection, os.remove(local_file_path)
file.get("path", ""))
os.remove(local_file)
except IndexError: except IndexError:
msg = "No file set for {}".format(representation_id) msg = "No file set for {}".format(representation_id)
self.log.debug(msg) self.log.debug(msg)
@ -1222,22 +1229,13 @@ class SyncServer(PypeModule, ITrayModule):
raise ValueError(msg) raise ValueError(msg)
try: try:
folder = os.path.dirname(local_file) folder = os.path.dirname(local_file_path)
os.rmdir(folder) os.rmdir(folder)
except OSError: except OSError:
msg = "folder {} cannot be removed".format(folder) msg = "folder {} cannot be removed".format(folder)
self.log.warning(msg) self.log.warning(msg)
raise ValueError(msg) raise ValueError(msg)
def get_my_local_site(self):
""" TODO remove
Returns name of current user local_site, its Pype wide.
Returns:
(string)
"""
return get_local_site_id()
def get_loop_delay(self, project_name): def get_loop_delay(self, project_name):
""" """
Return count of seconds before next synchronization loop starts Return count of seconds before next synchronization loop starts
@ -1320,59 +1318,35 @@ class SyncServer(PypeModule, ITrayModule):
val = {"files.$[f].sites.$[s].progress": progress} val = {"files.$[f].sites.$[s].progress": progress}
return val return val
def get_local_file_path(self, collection, path): def _resolve_paths(self, file_path, collection,
remote_site_name=None, remote_handler=None):
""" """
Auxiliary function for replacing rootless path with real path Returns tuple of local and remote file paths with {root}
placeholders replaced with proper values from Settings or Anatomy
Works with multi roots. Args:
If root definition is not found in Settings, anatomy is used file_path(string): path with {root}
collection(string): project name
Args: remote_site_name(string): remote site
collection (string): project name remote_handler(AbstractProvider): implementation
path (dictionary): 'path' to file with {root} Returns:
(string, string) - proper absolute paths
Returns:
(string) - absolute path on local system
""" """
local_active_site = self.get_active_site(collection) remote_file_path = ''
sites = self.get_sync_project_setting(collection)["sites"] if remote_handler:
root_config = sites[local_active_site]["root"] root_configs = self._get_roots_config(self.sync_project_settings,
collection,
remote_site_name)
if not root_config.get("root"): remote_file_path = remote_handler.resolve_path(file_path,
root_config = {"root": root_config} root_configs)
try: local_handler = lib.factory.get_provider(
path = path.format(**root_config) 'local_drive', self.get_active_site(collection))
except KeyError: local_file_path = local_handler.resolve_path(
try: file_path, None, self.get_anatomy(collection))
anatomy = self.get_anatomy(collection)
path = anatomy.fill_root(path)
except KeyError:
msg = "Error in resolving local root from anatomy"
self.log.error(msg)
raise ValueError(msg)
return path return local_file_path, remote_file_path
def _get_remote_file_path(self, file, root_config):
"""
Auxiliary function for replacing rootless path with real path
Args:
file (dictionary): file info, get 'path' to file with {root}
root_config (dict): value of {root} for remote location
Returns:
(string) - absolute path on remote location
"""
path = file.get("path", "")
if not root_config.get("root"):
root_config = {"root": root_config}
try:
return path.format(**root_config)
except KeyError:
msg = "Error in resolving remote root, unknown key"
self.log.error(msg)
def _get_retries_arr(self, project_name): def _get_retries_arr(self, project_name):
""" """

View file

@ -159,7 +159,8 @@ class SyncProjectListWidget(ProjectListWidget):
model.clear() model.clear()
project_name = None project_name = None
for project_name in self.sync_server.get_sync_project_settings().keys(): for project_name in self.sync_server.get_sync_project_settings().\
keys():
if self.sync_server.is_paused() or \ if self.sync_server.is_paused() or \
self.sync_server.is_project_paused(project_name): self.sync_server.is_project_paused(project_name):
icon = self._get_icon("paused") icon = self._get_icon("paused")
@ -203,7 +204,6 @@ class SyncProjectListWidget(ProjectListWidget):
menu = QtWidgets.QMenu() menu = QtWidgets.QMenu()
actions_mapping = {} actions_mapping = {}
action = None
if self.sync_server.is_project_paused(self.project_name): if self.sync_server.is_project_paused(self.project_name):
action = QtWidgets.QAction("Unpause") action = QtWidgets.QAction("Unpause")
actions_mapping[action] = self._unpause actions_mapping[action] = self._unpause
@ -212,7 +212,7 @@ class SyncProjectListWidget(ProjectListWidget):
actions_mapping[action] = self._pause actions_mapping[action] = self._pause
menu.addAction(action) menu.addAction(action)
if self.local_site == self.sync_server.get_my_local_site(): if self.local_site == get_local_site_id():
action = QtWidgets.QAction("Clear local project") action = QtWidgets.QAction("Clear local project")
actions_mapping[action] = self._clear_project actions_mapping[action] = self._clear_project
menu.addAction(action) menu.addAction(action)
@ -241,6 +241,7 @@ class SyncProjectListWidget(ProjectListWidget):
self.project_name = None self.project_name = None
self.refresh() self.refresh()
class ProjectModel(QtCore.QAbstractListModel): class ProjectModel(QtCore.QAbstractListModel):
def __init__(self, *args, projects=None, **kwargs): def __init__(self, *args, projects=None, **kwargs):
super(ProjectModel, self).__init__(*args, **kwargs) super(ProjectModel, self).__init__(*args, **kwargs)
@ -256,6 +257,7 @@ class ProjectModel(QtCore.QAbstractListModel):
def rowCount(self, index): def rowCount(self, index):
return len(self.todos) return len(self.todos)
class SyncRepresentationWidget(QtWidgets.QWidget): class SyncRepresentationWidget(QtWidgets.QWidget):
""" """
Summary dialog with list of representations that matches current Summary dialog with list of representations that matches current
@ -478,7 +480,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget):
local_site_name = self.sync_server.get_my_local_site() local_site_name = self.sync_server.get_my_local_site()
try: try:
self.sync_server.add_site( self.sync_server.add_site(
self.table_view.model()._project, project_name,
self.representation_id, self.representation_id,
local_site_name local_site_name
) )
@ -802,7 +804,6 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel):
self._data.append(item) self._data.append(item)
self._rec_loaded += 1 self._rec_loaded += 1
def canFetchMore(self, index): def canFetchMore(self, index):
""" """
Check if there are more records than currently loaded Check if there are more records than currently loaded
@ -854,6 +855,9 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel):
self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1} self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1}
self.query = self.get_default_query() self.query = self.get_default_query()
# import json
# log.debug(json.dumps(self.query, indent=4).replace('False', 'false').\
# replace('True', 'true').replace('None', 'null'))
representations = self.dbcon.aggregate(self.query) representations = self.dbcon.aggregate(self.query)
self.refresh(representations) self.refresh(representations)
@ -891,7 +895,6 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel):
Returns: Returns:
(QModelIndex) (QModelIndex)
""" """
index = None
for i in range(self.rowCount(None)): for i in range(self.rowCount(None)):
index = self.index(i, 0) index = self.index(i, 0)
value = self.data(index, Qt.UserRole) value = self.data(index, Qt.UserRole)
@ -1000,7 +1003,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel):
0]}, 0]},
'failed_remote_tries': { 'failed_remote_tries': {
'$cond': [{'$size': '$order_remote.tries'}, '$cond': [{'$size': '$order_remote.tries'},
{'$first': '$order_local.tries'}, {'$first': '$order_remote.tries'},
0]}, 0]},
'paused_remote': { 'paused_remote': {
'$cond': [{'$size': "$order_remote.paused"}, '$cond': [{'$size': "$order_remote.paused"},
@ -1027,9 +1030,9 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel):
# select last touch of file # select last touch of file
'updated_dt_remote': {'$max': "$updated_dt_remote"}, 'updated_dt_remote': {'$max': "$updated_dt_remote"},
'failed_remote': {'$sum': '$failed_remote'}, 'failed_remote': {'$sum': '$failed_remote'},
'failed_local': {'$sum': '$paused_remote'}, 'failed_local': {'$sum': '$failed_local'},
'failed_local_tries': {'$sum': '$failed_local_tries'},
'failed_remote_tries': {'$sum': '$failed_remote_tries'}, 'failed_remote_tries': {'$sum': '$failed_remote_tries'},
'failed_local_tries': {'$sum': '$failed_local_tries'},
'paused_remote': {'$sum': '$paused_remote'}, 'paused_remote': {'$sum': '$paused_remote'},
'paused_local': {'$sum': '$paused_local'}, 'paused_local': {'$sum': '$paused_local'},
'updated_dt_local': {'$max': "$updated_dt_local"} 'updated_dt_local': {'$max': "$updated_dt_local"}
@ -1669,7 +1672,6 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel):
Returns: Returns:
(QModelIndex) (QModelIndex)
""" """
index = None
for i in range(self.rowCount(None)): for i in range(self.rowCount(None)):
index = self.index(i, 0) index = self.index(i, 0)
value = self.data(index, Qt.UserRole) value = self.data(index, Qt.UserRole)
@ -1777,14 +1779,15 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel):
"$order_local.error", "$order_local.error",
[""]]}}, [""]]}},
'tries': {'$first': { 'tries': {'$first': {
'$cond': [{'$size': "$order_local.tries"}, '$cond': [
"$order_local.tries", {'$size': "$order_local.tries"},
{'$cond': [ "$order_local.tries",
{'$size': "$order_remote.tries"}, {'$cond': [
"$order_remote.tries", {'$size': "$order_remote.tries"},
[] "$order_remote.tries",
]} []
]}} ]}
]}}
}}, }},
{"$project": self.projection}, {"$project": self.projection},
{"$sort": self.sort}, {"$sort": self.sort},
@ -2015,6 +2018,7 @@ class SizeDelegate(QtWidgets.QStyledItemDelegate):
value /= 1024.0 value /= 1024.0
return "%.1f%s%s" % (value, 'Yi', suffix) return "%.1f%s%s" % (value, 'Yi', suffix)
def _convert_progress(value): def _convert_progress(value):
try: try:
progress = float(value) progress = float(value)