From 0c631e76f13f9cdf84253b8fcf20e1c329169b9a Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Mon, 3 May 2021 14:13:53 +0200 Subject: [PATCH] SyncServer - implemented priority in synchronization loop --- .../modules/sync_server/sync_server_module.py | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py index e411366fd9..5ee9d87791 100644 --- a/openpype/modules/sync_server/sync_server_module.py +++ b/openpype/modules/sync_server/sync_server_module.py @@ -83,6 +83,7 @@ class SyncServerModule(PypeModule, ITrayModule): DEFAULT_SITE = 'studio' LOCAL_SITE = 'local' LOG_PROGRESS_SEC = 5 # how often log progress to DB + DEFAULT_PRIORITY = 50 # higher is better, allowed range 1 - 1000 name = "sync_server" label = "Sync Queue" @@ -662,7 +663,7 @@ class SyncServerModule(PypeModule, ITrayModule): self.connection.Session["AVALON_PROJECT"] = collection # retry_cnt - number of attempts to sync specific file before giving up retries_arr = self._get_retries_arr(collection) - query = { + match = { "type": "representation", "$or": [ {"$and": [ @@ -700,10 +701,41 @@ class SyncServerModule(PypeModule, ITrayModule): ]} ] } + + aggr = [ + {"$match": match}, + {'$unwind': '$files'}, + {'$addFields': { + 'order_remote': { + '$filter': {'input': '$files.sites', 'as': 'p', + 'cond': {'$eq': ['$$p.name', remote_site]} + }}, + 'order_local': { + '$filter': {'input': '$files.sites', 'as': 'p', + 'cond': {'$eq': ['$$p.name', active_site]} + }}, + }}, + {'$addFields': { + 'priority': { + '$cond': [{'$size': '$order_local.priority'}, + {'$first': '$order_local.priority'}, + self.DEFAULT_PRIORITY]} + }}, + {'$group': { + '_id': '$_id', + # pass through context - same for representation + 'context': {'$addToSet': '$context'}, + 'data': {'$addToSet': '$data'}, + # pass through files as a list + 'files': {'$addToSet': '$files'}, + 'priority': {'$max': "$priority"}, + }}, + {"$sort": {'priority': -1, '_id': 1}}, + ] log.debug("active_site:{} - remote_site:{}".format(active_site, remote_site)) - log.debug("query: {}".format(query)) - representations = self.connection.find(query) + log.debug("query: {}".format(aggr)) + representations = self.connection.aggregate(aggr) return representations @@ -796,7 +828,7 @@ class SyncServerModule(PypeModule, ITrayModule): {'s.name': site} ] if file_id: - arr_filter['f._id'] = ObjectId(file_id) + arr_filter.append({'f._id': ObjectId(file_id)}) self.connection.database[collection].update_one( query, @@ -1212,7 +1244,7 @@ class SyncServerModule(PypeModule, ITrayModule): str_key = "files.$[f].sites.$[s].priority" else: str_key = "files.$[].sites.$[s].priority" - return {str_key: priority} + return {str_key: int(priority)} def _get_retries_arr(self, project_name): """