Merge pull request #1430 from pypeclub/bugfix/sync_server_tweaks

This commit is contained in:
Milan Kolar 2021-04-29 22:18:25 +02:00 committed by GitHub
commit 1fa38637e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 93 additions and 21 deletions

View file

@ -7,7 +7,7 @@ from .abstract_provider import AbstractProvider
from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
from openpype.api import Logger
from openpype.api import get_system_settings
from ..utils import time_function
from ..utils import time_function, ResumableError
import time
@ -63,7 +63,14 @@ class GDriveHandler(AbstractProvider):
return
self.service = self._get_gd_service()
self.root = self._prepare_root_info()
try:
self.root = self._prepare_root_info()
except errors.HttpError:
log.warning("HttpError in sync loop, "
"trying next loop",
exc_info=True)
raise ResumableError
self._tree = tree
self.active = True

View file

@ -92,4 +92,4 @@ factory = ProviderFactory()
# 7 denotes number of files that could be synced in single loop - learned by
# trial and error
factory.register_provider('gdrive', GDriveHandler, 7)
factory.register_provider('local_drive', LocalDriveHandler, 10)
factory.register_provider('local_drive', LocalDriveHandler, 50)

View file

@ -8,7 +8,7 @@ from concurrent.futures._base import CancelledError
from .providers import lib
from openpype.lib import PypeLogger
from .utils import SyncStatus
from .utils import SyncStatus, ResumableError
log = PypeLogger().get_logger("SyncServer")
@ -232,6 +232,7 @@ class SyncServerThread(threading.Thread):
self.loop = None
self.is_running = False
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
self.timer = None
def run(self):
self.is_running = True
@ -266,8 +267,8 @@ class SyncServerThread(threading.Thread):
Returns:
"""
try:
while self.is_running and not self.module.is_paused():
while self.is_running and not self.module.is_paused():
try:
import time
start_time = None
self.module.set_sync_project_settings() # clean cache
@ -384,17 +385,27 @@ class SyncServerThread(threading.Thread):
duration = time.time() - start_time
log.debug("One loop took {:.2f}s".format(duration))
await asyncio.sleep(self.module.get_loop_delay(collection))
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except Exception:
self.stop()
log.warning("Unhandled exception in sync loop, stopping server",
exc_info=True)
delay = self.module.get_loop_delay(collection)
log.debug("Waiting for {} seconds to new loop".format(delay))
self.timer = asyncio.create_task(self.run_timer(delay))
await asyncio.gather(self.timer)
except ConnectionResetError:
log.warning("ConnectionResetError in sync loop, "
"trying next loop",
exc_info=True)
except CancelledError:
# just stopping server
pass
except ResumableError:
log.warning("ResumableError in sync loop, "
"trying next loop",
exc_info=True)
except Exception:
self.stop()
log.warning("Unhandled except. in sync loop, stopping server",
exc_info=True)
def stop(self):
"""Sets is_running flag to false, 'check_shutdown' shuts server down"""
@ -417,6 +428,17 @@ class SyncServerThread(threading.Thread):
await asyncio.sleep(0.07)
self.loop.stop()
async def run_timer(self, delay):
"""Wait for 'delay' seconds to start next loop"""
await asyncio.sleep(delay)
def reset_timer(self):
"""Called when waiting for next loop should be skipped"""
log.debug("Resetting timer")
if self.timer:
self.timer.cancel()
self.timer = None
def _working_sites(self, collection):
if self.module.is_project_paused(collection):
log.debug("Both sites same, skipping")

View file

@ -401,6 +401,24 @@ class SyncServerModule(PypeModule, ITrayModule):
return remote_site
def reset_timer(self):
"""
Called when waiting for next loop should be skipped.
In case of user's involvement (reset site), start that right away.
"""
self.sync_server_thread.reset_timer()
def get_enabled_projects(self):
"""Returns list of projects which have SyncServer enabled."""
enabled_projects = []
for project in self.connection.projects():
project_name = project["name"]
project_settings = self.get_sync_project_setting(project_name)
if project_settings:
enabled_projects.append(project_name)
return enabled_projects
""" End of Public API """
def get_local_file_path(self, collection, site_name, file_path):
@ -413,7 +431,7 @@ class SyncServerModule(PypeModule, ITrayModule):
return local_file_path
def _get_remote_sites_from_settings(self, sync_settings):
if not self.enabled or not sync_settings['enabled']:
if not self.enabled or not sync_settings.get('enabled'):
return []
remote_sites = [self.DEFAULT_SITE, self.LOCAL_SITE]
@ -424,7 +442,7 @@ class SyncServerModule(PypeModule, ITrayModule):
def _get_enabled_sites_from_settings(self, sync_settings):
sites = [self.DEFAULT_SITE]
if self.enabled and sync_settings['enabled']:
if self.enabled and sync_settings.get('enabled'):
sites.append(self.LOCAL_SITE)
return sites
@ -445,6 +463,11 @@ class SyncServerModule(PypeModule, ITrayModule):
if not self.enabled:
return
enabled_projects = self.get_enabled_projects()
if not enabled_projects:
self.enabled = False
return
self.lock = threading.Lock()
try:

View file

@ -78,7 +78,7 @@ class SyncServerWindow(QtWidgets.QDialog):
layout.addWidget(footer)
self.setLayout(body_layout)
self.setWindowTitle("Sync Server")
self.setWindowTitle("Sync Queue")
self.projects.project_changed.connect(
lambda: repres.table_view.model().set_project(

View file

@ -170,6 +170,8 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel):
Sort is happening on a DB side, model is reset, db queried
again.
It remembers one last sort, adds it as secondary after new sort.
Args:
index (int): column index
order (int): 0|
@ -184,7 +186,17 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel):
else:
order = -1
self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1}
backup_sort = dict(self.sort)
self.sort = {self.SORT_BY_COLUMN[index]: order} # reset
# add last one
for key, val in backup_sort.items():
if key != '_id':
self.sort[key] = val
break
# add default one
self.sort['_id'] = 1
self.query = self.get_query()
# import json
# log.debug(json.dumps(self.query, indent=4).\

View file

@ -316,6 +316,7 @@ class _SyncRepresentationWidget(QtWidgets.QWidget):
representation_id))
except ValueError as exp:
self.message_generated.emit("Error {}".format(str(exp)))
self.sync_server.reset_timer()
def _remove_site(self, selected_ids=None, site_name=None):
"""
@ -343,6 +344,7 @@ class _SyncRepresentationWidget(QtWidgets.QWidget):
self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()
def _reset_site(self, selected_ids=None, site_name=None):
"""
@ -368,6 +370,7 @@ class _SyncRepresentationWidget(QtWidgets.QWidget):
self.model.refresh(
load_records=self.model._rec_loaded)
self.sync_server.reset_timer()
def _open_in_explorer(self, selected_ids=None, site_name=None):
log.debug("Open in Explorer {}:{}".format(selected_ids, site_name))

View file

@ -3,6 +3,11 @@ from openpype.api import Logger
log = Logger().get_logger("SyncServer")
class ResumableError(Exception):
"""Error which could be temporary, skip current loop, try next time"""
pass
class SyncStatus:
DO_NOTHING = 0
DO_UPLOAD = 1