diff --git a/openpype/modules/default_modules/sync_server/providers/abstract_provider.py b/openpype/modules/default_modules/sync_server/providers/abstract_provider.py index 8ae0ceed79..688a17f14f 100644 --- a/openpype/modules/default_modules/sync_server/providers/abstract_provider.py +++ b/openpype/modules/default_modules/sync_server/providers/abstract_provider.py @@ -201,5 +201,9 @@ class AbstractProvider: msg = "Error in resolving local root from anatomy" log.error(msg) raise ValueError(msg) + except IndexError: + msg = "Path {} contains unfillable placeholder" + log.error(msg) + raise ValueError(msg) return path diff --git a/openpype/modules/default_modules/sync_server/resources/refresh.png b/openpype/modules/default_modules/sync_server/resources/refresh.png new file mode 100644 index 0000000000..5ddd181fe6 Binary files /dev/null and b/openpype/modules/default_modules/sync_server/resources/refresh.png differ diff --git a/openpype/modules/default_modules/sync_server/sync_server.py b/openpype/modules/default_modules/sync_server/sync_server.py index 2227ec9366..6aca2460e3 100644 --- a/openpype/modules/default_modules/sync_server/sync_server.py +++ b/openpype/modules/default_modules/sync_server/sync_server.py @@ -421,6 +421,12 @@ class SyncServerThread(threading.Thread): periodically. """ while self.is_running: + if self.module.long_running_tasks: + task = self.module.long_running_tasks.pop() + log.info("starting long running") + await self.loop.run_in_executor(None, task["func"]) + log.info("finished long running") + self.module.projects_processed.remove(task["project_name"]) await asyncio.sleep(0.5) tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()] diff --git a/openpype/modules/default_modules/sync_server/sync_server_module.py b/openpype/modules/default_modules/sync_server/sync_server_module.py index f2e9237542..d60147a989 100644 --- a/openpype/modules/default_modules/sync_server/sync_server_module.py +++ b/openpype/modules/default_modules/sync_server/sync_server_module.py @@ -4,6 +4,7 @@ from datetime import datetime import threading import platform import copy +from collections import deque from avalon.api import AvalonMongoDB @@ -120,6 +121,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self._connection = None + # list of long blocking tasks + self.long_running_tasks = deque() + # projects that long tasks are running on + self.projects_processed = set() + """ Start of Public API """ def add_site(self, collection, representation_id, site_name=None, force=False): @@ -197,6 +203,105 @@ class SyncServerModule(OpenPypeModule, ITrayModule): for repre in representations: self.remove_site(collection, repre.get("_id"), site_name, True) + def create_validate_project_task(self, collection, site_name): + """Adds metadata about project files validation on a queue. + + This process will loop through all representation and check if + their files actually exist on an active site. + + This might be useful for edge cases when artists is switching + between sites, remote site is actually physically mounted and + active site has same file urls etc. + + Task will run on a asyncio loop, shouldn't be blocking. + """ + task = { + "type": "validate", + "project_name": collection, + "func": lambda: self.validate_project(collection, site_name) + } + self.projects_processed.add(collection) + self.long_running_tasks.append(task) + + def validate_project(self, collection, site_name, remove_missing=False): + """ + Validate 'collection' of 'site_name' and its local files + + If file present and not marked with a 'site_name' in DB, DB is + updated with site name and file modified date. + + Args: + module (SyncServerModule) + collection (string): project name + site_name (string): active site name + remove_missing (bool): if True remove sites in DB if missing + physically + """ + self.log.debug("Validation of {} for {} started".format(collection, + site_name)) + query = { + "type": "representation" + } + + representations = list( + self.connection.database[collection].find(query)) + if not representations: + self.log.debug("No repre found") + return + + sites_added = 0 + sites_removed = 0 + for repre in representations: + repre_id = repre["_id"] + for repre_file in repre.get("files", []): + try: + has_site = site_name in [site["name"] + for site in repre_file["sites"]] + except TypeError: + self.log.debug("Structure error in {}".format(repre_id)) + continue + + if has_site and not remove_missing: + continue + + file_path = repre_file.get("path", "") + local_file_path = self.get_local_file_path(collection, + site_name, + file_path) + + if local_file_path and os.path.exists(local_file_path): + self.log.debug("Adding site {} for {}".format(site_name, + repre_id)) + if not has_site: + query = { + "_id": repre_id + } + created_dt = datetime.fromtimestamp( + os.path.getmtime(local_file_path)) + elem = {"name": site_name, + "created_dt": created_dt} + self._add_site(collection, query, [repre], elem, + site_name=site_name, + file_id=repre_file["_id"]) + sites_added += 1 + else: + if has_site and remove_missing: + self.log.debug("Removing site {} for {}". + format(site_name, repre_id)) + self.reset_provider_for_file(collection, + repre_id, + file_id=repre_file["_id"], + remove=True) + sites_removed += 1 + + if sites_added % 100 == 0: + self.log.debug("Sites added {}".format(sites_added)) + + self.log.debug("Validation of {} for {} ended".format(collection, + site_name)) + self.log.info("Sites added {}, sites removed {}".format(sites_added, + sites_removed)) + def pause_representation(self, collection, representation_id, site_name): """ Sets 'representation_id' as paused, eg. no syncing should be @@ -711,22 +816,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return self.lock = threading.Lock() - - try: - self.sync_server_thread = SyncServerThread(self) - - from .tray.app import SyncServerWindow - self.widget = SyncServerWindow(self) - except ValueError: - log.info("No system setting for sync. Not syncing.", exc_info=True) - self.enabled = False - except KeyError: - log.info(( - "There are not set presets for SyncServer OR " - "Credentials provided are invalid, " - "no syncing possible"). - format(str(self.sync_project_settings)), exc_info=True) - self.enabled = False + self.sync_server_thread = SyncServerThread(self) def tray_start(self): """ @@ -1347,7 +1437,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule): found = False for repre_file in representation.pop().get("files"): for site in repre_file.get("sites"): - if site["name"] == site_name: + if site.get("name") == site_name: found = True break if not found: @@ -1398,13 +1488,20 @@ class SyncServerModule(OpenPypeModule, ITrayModule): self._update_site(collection, query, update, arr_filter) def _add_site(self, collection, query, representation, elem, site_name, - force=False): + force=False, file_id=None): """ Adds 'site_name' to 'representation' on 'collection' + Args: + representation (list of 1 dict) + file_id (ObjectId) + Use 'force' to remove existing or raises ValueError """ for repre_file in representation.pop().get("files"): + if file_id and file_id != repre_file["_id"]: + continue + for site in repre_file.get("sites"): if site["name"] == site_name: if force: @@ -1417,11 +1514,19 @@ class SyncServerModule(OpenPypeModule, ITrayModule): log.info(msg) raise ValueError(msg) - update = { - "$push": {"files.$[].sites": elem} - } + if not file_id: + update = { + "$push": {"files.$[].sites": elem} + } - arr_filter = [] + arr_filter = [] + else: + update = { + "$push": {"files.$[f].sites": elem} + } + arr_filter = [ + {'f._id': file_id} + ] self._update_site(collection, query, update, arr_filter) @@ -1496,7 +1601,24 @@ class SyncServerModule(OpenPypeModule, ITrayModule): return int(ld) def show_widget(self): - """Show dialog to enter credentials""" + """Show dialog for Sync Queue""" + no_errors = False + try: + from .tray.app import SyncServerWindow + self.widget = SyncServerWindow(self) + no_errors = True + except ValueError: + log.info("No system setting for sync. Not syncing.", exc_info=True) + except KeyError: + log.info(( + "There are not set presets for SyncServer OR " + "Credentials provided are invalid, " + "no syncing possible"). + format(str(self.sync_project_settings)), exc_info=True) + except: + log.error("Uncaught exception durin start of SyncServer", + exc_info=True) + self.enabled = no_errors self.widget.show() def _get_success_dict(self, new_file_id): diff --git a/openpype/modules/default_modules/sync_server/tray/models.py b/openpype/modules/default_modules/sync_server/tray/models.py index 5642c5b34a..713e167a6a 100644 --- a/openpype/modules/default_modules/sync_server/tray/models.py +++ b/openpype/modules/default_modules/sync_server/tray/models.py @@ -124,7 +124,8 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel): if not representations: self.query = self.get_query(load_records) - representations = self.dbcon.aggregate(self.query) + representations = self.dbcon.aggregate(pipeline=self.query, + allowDiskUse=True) self.add_page_records(self.active_site, self.remote_site, representations) @@ -159,7 +160,8 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel): items_to_fetch = min(self._total_records - self._rec_loaded, self.PAGE_SIZE) self.query = self.get_query(self._rec_loaded) - representations = self.dbcon.aggregate(self.query) + representations = self.dbcon.aggregate(pipeline=self.query, + allowDiskUse=True) self.beginInsertRows(index, self._rec_loaded, self._rec_loaded + items_to_fetch - 1) @@ -192,16 +194,16 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel): else: order = -1 - backup_sort = dict(self.sort) + backup_sort = dict(self.sort_criteria) - self.sort = {self.SORT_BY_COLUMN[index]: order} # reset + self.sort_criteria = {self.SORT_BY_COLUMN[index]: order} # reset # add last one for key, val in backup_sort.items(): if key != '_id' and key != self.SORT_BY_COLUMN[index]: - self.sort[key] = val + self.sort_criteria[key] = val break # add default one - self.sort['_id'] = 1 + self.sort_criteria['_id'] = 1 self.query = self.get_query() # import json @@ -209,7 +211,8 @@ class _SyncRepresentationModel(QtCore.QAbstractTableModel): # replace('False', 'false').\ # replace('True', 'true').replace('None', 'null')) - representations = self.dbcon.aggregate(self.query) + representations = self.dbcon.aggregate(pipeline=self.query, + allowDiskUse=True) self.refresh(representations) def set_word_filter(self, word_filter): @@ -440,12 +443,13 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel): self.active_site = self.sync_server.get_active_site(self.project) self.remote_site = self.sync_server.get_remote_site(self.project) - self.sort = self.DEFAULT_SORT + self.sort_criteria = self.DEFAULT_SORT self.query = self.get_query() self.default_query = list(self.get_query()) - representations = self.dbcon.aggregate(self.query) + representations = self.dbcon.aggregate(pipeline=self.query, + allowDiskUse=True) self.refresh(representations) self.timer = QtCore.QTimer() @@ -732,7 +736,7 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel): ) aggr.extend( - [{"$sort": self.sort}, + [{"$sort": self.sort_criteria}, { '$facet': { 'paginatedResults': [{'$skip': self._rec_loaded}, @@ -970,10 +974,11 @@ class SyncRepresentationDetailModel(_SyncRepresentationModel): self.active_site = self.sync_server.get_active_site(self.project) self.remote_site = self.sync_server.get_remote_site(self.project) - self.sort = self.DEFAULT_SORT + self.sort_criteria = self.DEFAULT_SORT self.query = self.get_query() - representations = self.dbcon.aggregate(self.query) + representations = self.dbcon.aggregate(pipeline=self.query, + allowDiskUse=True) self.refresh(representations) self.timer = QtCore.QTimer() @@ -1235,7 +1240,7 @@ class SyncRepresentationDetailModel(_SyncRepresentationModel): print(self.column_filtering) aggr.extend([ - {"$sort": self.sort}, + {"$sort": self.sort_criteria}, { '$facet': { 'paginatedResults': [{'$skip': self._rec_loaded}, diff --git a/openpype/modules/default_modules/sync_server/tray/widgets.py b/openpype/modules/default_modules/sync_server/tray/widgets.py index 45537c1c2e..5e368f9e0b 100644 --- a/openpype/modules/default_modules/sync_server/tray/widgets.py +++ b/openpype/modules/default_modules/sync_server/tray/widgets.py @@ -32,6 +32,8 @@ class SyncProjectListWidget(QtWidgets.QWidget): project_changed = QtCore.Signal() message_generated = QtCore.Signal(str) + refresh_msec = 10000 + def __init__(self, sync_server, parent): super(SyncProjectListWidget, self).__init__(parent) self.setObjectName("ProjectListWidget") @@ -56,8 +58,8 @@ class SyncProjectListWidget(QtWidgets.QWidget): layout.addWidget(project_list, 1) project_list.customContextMenuRequested.connect(self._on_context_menu) - project_list.selectionModel().currentChanged.connect( - self._on_index_change + project_list.selectionModel().selectionChanged.connect( + self._on_selection_changed ) self.project_model = project_model @@ -69,17 +71,43 @@ class SyncProjectListWidget(QtWidgets.QWidget): self.remote_site = None self.icons = {} - def _on_index_change(self, new_idx, _old_idx): - project_name = new_idx.data(QtCore.Qt.DisplayRole) + self._selection_changed = False + self._model_reset = False + timer = QtCore.QTimer() + timer.setInterval(self.refresh_msec) + timer.timeout.connect(self.refresh) + timer.start() + + self.timer = timer + + def _on_selection_changed(self, new_selection, _old_selection): + # block involuntary selection changes + if self._selection_changed or self._model_reset: + return + + indexes = new_selection.indexes() + if not indexes: + return + + project_name = indexes[0].data(QtCore.Qt.DisplayRole) + + if self.current_project == project_name: + return + self._selection_changed = True self.current_project = project_name self.project_changed.emit() + self.refresh() + self._selection_changed = False def refresh(self): + selected_index = None model = self.project_model + self._model_reset = True model.clear() + self._model_reset = False - project_name = None + selected_item = None for project_name in self.sync_server.sync_project_settings.\ keys(): if self.sync_server.is_paused() or \ @@ -88,20 +116,38 @@ class SyncProjectListWidget(QtWidgets.QWidget): else: icon = self._get_icon("synced") - model.appendRow(QtGui.QStandardItem(icon, project_name)) + if project_name in self.sync_server.projects_processed: + icon = self._get_icon("refresh") + + item = QtGui.QStandardItem(icon, project_name) + model.appendRow(item) + + if self.current_project == project_name: + selected_item = item + + if selected_item: + selected_index = model.indexFromItem(selected_item) if len(self.sync_server.sync_project_settings.keys()) == 0: model.appendRow(QtGui.QStandardItem(lib.DUMMY_PROJECT)) - self.current_project = self.project_list.currentIndex().data( - QtCore.Qt.DisplayRole - ) if not self.current_project: self.current_project = model.item(0).data(QtCore.Qt.DisplayRole) - if project_name: - self.local_site = self.sync_server.get_active_site(project_name) - self.remote_site = self.sync_server.get_remote_site(project_name) + self.project_model = model + + if selected_index and \ + selected_index.isValid() and \ + not self._selection_changed: + mode = QtCore.QItemSelectionModel.Select | \ + QtCore.QItemSelectionModel.Rows + self.project_list.selectionModel().select(selected_index, mode) + + if self.current_project: + self.local_site = self.sync_server.get_active_site( + self.current_project) + self.remote_site = self.sync_server.get_remote_site( + self.current_project) def _can_edit(self): """Returns true if some site is user local site, eg. could edit""" @@ -143,6 +189,11 @@ class SyncProjectListWidget(QtWidgets.QWidget): actions_mapping[action] = self._clear_project menu.addAction(action) + if self.project_name not in self.sync_server.projects_processed: + action = QtWidgets.QAction("Validate files on active site") + actions_mapping[action] = self._validate_site + menu.addAction(action) + result = menu.exec_(QtGui.QCursor.pos()) if result: to_run = actions_mapping[result] @@ -167,6 +218,13 @@ class SyncProjectListWidget(QtWidgets.QWidget): self.project_name = None self.refresh() + def _validate_site(self): + if self.project_name: + self.sync_server.create_validate_project_task(self.project_name, + self.local_site) + self.project_name = None + self.refresh() + class _SyncRepresentationWidget(QtWidgets.QWidget): """ diff --git a/openpype/tests/test_mongo_performance.py b/openpype/tests/mongo_performance.py similarity index 82% rename from openpype/tests/test_mongo_performance.py rename to openpype/tests/mongo_performance.py index cd606d6483..9220c6c730 100644 --- a/openpype/tests/test_mongo_performance.py +++ b/openpype/tests/mongo_performance.py @@ -80,7 +80,7 @@ class TestPerformance(): file_id3 = bson.objectid.ObjectId() self.inserted_ids.extend([file_id, file_id2, file_id3]) - version_str = "v{0:03}".format(i + 1) + version_str = "v{:03d}".format(i + 1) file_name = "test_Cylinder_workfileLookdev_{}.mb".\ format(version_str) @@ -95,7 +95,7 @@ class TestPerformance(): "family": "workfile", "hierarchy": "Assets", "project": {"code": "test", "name": "Test"}, - "version": 1, + "version": i + 1, "asset": "Cylinder", "representation": "mb", "root": self.ROOT_DIR @@ -104,8 +104,8 @@ class TestPerformance(): "name": "mb", "parent": {"oid": '{}'.format(id)}, "data": { - "path": "C:\\projects\\Test\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\{}\\{}".format(version_str, file_name), - "template": "{root}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}" + "path": "C:\\projects\\test_performance\\Assets\\Cylinder\\publish\\workfile\\workfileLookdev\\{}\\{}".format(version_str, file_name), # noqa + "template": "{root[work]}\\{project[name]}\\{hierarchy}\\{asset}\\publish\\{family}\\{subset}\\v{version:0>3}\\{project[code]}_{asset}_{subset}_v{version:0>3}<_{output}><.{frame:0>4}>.{representation}" # noqa }, "type": "representation", "schema": "openpype:representation-2.0" @@ -188,30 +188,21 @@ class TestPerformance(): create_files=False): ret = [ { - "path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/" + - "workfileLookdev/v{0:03}/" + - "test_Cylinder_A_workfileLookdev_v{0:03}.dat" - .format(i, i), + "path": "{root[work]}" + "{root[work]}/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_A_workfileLookdev_v{:03d}.dat".format(i, i), #noqa "_id": '{}'.format(file_id), "hash": "temphash", "sites": self.get_sites(self.MAX_NUMBER_OF_SITES), "size": random.randint(0, self.MAX_FILE_SIZE_B) }, { - "path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/" + - "workfileLookdev/v{0:03}/" + - "test_Cylinder_B_workfileLookdev_v{0:03}.dat" - .format(i, i), + "path": "{root[work]}" + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_B_workfileLookdev_v{:03d}.dat".format(i, i), #noqa "_id": '{}'.format(file_id2), "hash": "temphash", "sites": self.get_sites(self.MAX_NUMBER_OF_SITES), "size": random.randint(0, self.MAX_FILE_SIZE_B) }, { - "path": "{root}" + "/Test/Assets/Cylinder/publish/workfile/" + - "workfileLookdev/v{0:03}/" + - "test_Cylinder_C_workfileLookdev_v{0:03}.dat" - .format(i, i), + "path": "{root[work]}" + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/v{:03d}/test_Cylinder_C_workfileLookdev_v{:03d}.dat".format(i, i), #noqa "_id": '{}'.format(file_id3), "hash": "temphash", "sites": self.get_sites(self.MAX_NUMBER_OF_SITES), @@ -221,7 +212,7 @@ class TestPerformance(): ] if create_files: for f in ret: - path = f.get("path").replace("{root}", self.ROOT_DIR) + path = f.get("path").replace("{root[work]}", self.ROOT_DIR) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'wb') as fp: fp.write(os.urandom(f.get("size"))) @@ -231,26 +222,26 @@ class TestPerformance(): def get_files_doc(self, i, file_id, file_id2, file_id3): ret = {} ret['{}'.format(file_id)] = { - "path": "{root}" + - "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" - "v001/test_CylinderA_workfileLookdev_v{0:03}.mb".format(i), + "path": "{root[work]}" + + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" #noqa + "v{:03d}/test_CylinderA_workfileLookdev_v{:03d}.mb".format(i, i), # noqa "hash": "temphash", "sites": ["studio"], "size": 87236 } ret['{}'.format(file_id2)] = { - "path": "{root}" + - "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" - "v001/test_CylinderB_workfileLookdev_v{0:03}.mb".format(i), + "path": "{root[work]}" + + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" #noqa + "v{:03d}/test_CylinderB_workfileLookdev_v{:03d}.mb".format(i, i), # noqa "hash": "temphash", "sites": ["studio"], "size": 87236 } ret['{}'.format(file_id3)] = { - "path": "{root}" + - "/Test/Assets/Cylinder/publish/workfile/workfileLookdev/" - "v001/test_CylinderC_workfileLookdev_v{0:03}.mb".format(i), + "path": "{root[work]}" + + "/test_performance/Assets/Cylinder/publish/workfile/workfileLookdev/" #noqa + "v{:03d}/test_CylinderC_workfileLookdev_v{:03d}.mb".format(i, i), # noqa "hash": "temphash", "sites": ["studio"], "size": 87236 @@ -287,7 +278,7 @@ class TestPerformance(): if __name__ == '__main__': tp = TestPerformance('array') - tp.prepare(no_of_records=10, create_files=True) # enable to prepare data + tp.prepare(no_of_records=10000, create_files=True) # tp.run(10, 3) # print('-'*50) diff --git a/website/docs/artist_tools.md b/website/docs/artist_tools.md index f099b48a9a..ae7fbbca49 100644 --- a/website/docs/artist_tools.md +++ b/website/docs/artist_tools.md @@ -474,5 +474,13 @@ Actions accessible by context menu on single (or multiple representations): Double click on any of the representation open Detail dialog with information about all files for particular representation. In this dialog error details could be accessed in the context menu. +#### Context menu on project name Artists can also Pause whole server or specific project for synchronization. In that state no download/upload is being run. -This might be helpful if the artist is not interested in a particular project for a while or wants to save bandwidth data limit for a bit. \ No newline at end of file +This might be helpful if the artist is not interested in a particular project for a while or wants to save bandwidth data limit for a bit. + +Another option is `Validate files on active site`. This option triggers process where all representation of the selected project are looped through, file paths are resolved for active site and +if paths point to local system, paths are physically checked if files are existing. If file exists and representation is not marked to be present on 'active_site' in DB, DB is updated +to follow that. + +This might be useful if artist has representation files that Site Sync doesn't know about (newly attached external drive with representations from studio). +This project might take a while! \ No newline at end of file