From 399f9bd059a190c8b1c640bb950e070e1b6dec5e Mon Sep 17 00:00:00 2001 From: Petr Kalis Date: Tue, 6 Apr 2021 12:27:49 +0200 Subject: [PATCH] SyncServer adding functionality to Loader In one big commit as PR wasnt merged before rebranding and merge exploded --- openpype/modules/__init__.py | 4 +- openpype/modules/sync_server/__init__.py | 4 +- .../modules/sync_server/providers/__init__.py | 0 .../providers/abstract_provider.py | 109 +- .../modules/sync_server/providers/gdrive.py | 336 ++-- openpype/modules/sync_server/providers/lib.py | 8 +- .../sync_server/providers/local_drive.py | 60 +- openpype/modules/sync_server/sync_server.py | 1567 +++-------------- .../modules/sync_server/sync_server_module.py | 1194 +++++++++++++ openpype/modules/sync_server/tray/app.py | 298 +++- openpype/modules/sync_server/utils.py | 8 +- openpype/plugins/load/add_site.py | 33 + openpype/plugins/load/delete_old_versions.py | 6 +- openpype/plugins/load/remove_site.py | 33 + 14 files changed, 1986 insertions(+), 1674 deletions(-) create mode 100644 openpype/modules/sync_server/providers/__init__.py create mode 100644 openpype/modules/sync_server/sync_server_module.py create mode 100644 openpype/plugins/load/add_site.py create mode 100644 openpype/plugins/load/remove_site.py diff --git a/openpype/modules/__init__.py b/openpype/modules/__init__.py index 4b120647e1..d7c6d99fe6 100644 --- a/openpype/modules/__init__.py +++ b/openpype/modules/__init__.py @@ -41,7 +41,7 @@ from .log_viewer import LogViewModule from .muster import MusterModule from .deadline import DeadlineModule from .standalonepublish_action import StandAlonePublishAction -from .sync_server import SyncServer +from .sync_server import SyncServerModule __all__ = ( @@ -82,5 +82,5 @@ __all__ = ( "DeadlineModule", "StandAlonePublishAction", - "SyncServer" + "SyncServerModule" ) diff --git a/openpype/modules/sync_server/__init__.py b/openpype/modules/sync_server/__init__.py index 7123536fcf..a814f0db62 100644 --- a/openpype/modules/sync_server/__init__.py +++ b/openpype/modules/sync_server/__init__.py @@ -1,5 +1,5 @@ -from openpype.modules.sync_server.sync_server import SyncServer +from openpype.modules.sync_server.sync_server_module import SyncServerModule def tray_init(tray_widget, main_widget): - return SyncServer() + return SyncServerModule() diff --git a/openpype/modules/sync_server/providers/__init__.py b/openpype/modules/sync_server/providers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openpype/modules/sync_server/providers/abstract_provider.py b/openpype/modules/sync_server/providers/abstract_provider.py index 001d4c4d50..35dca87acf 100644 --- a/openpype/modules/sync_server/providers/abstract_provider.py +++ b/openpype/modules/sync_server/providers/abstract_provider.py @@ -1,16 +1,22 @@ -from abc import ABCMeta, abstractmethod +import abc, six +from openpype.api import Anatomy, Logger + +log = Logger().get_logger("SyncServer") -class AbstractProvider(metaclass=ABCMeta): +@six.add_metaclass(abc.ABCMeta) +class AbstractProvider: - def __init__(self, site_name, tree=None, presets=None): + def __init__(self, project_name, site_name, tree=None, presets=None): self.presets = None self.active = False self.site_name = site_name self.presets = presets - @abstractmethod + super(AbstractProvider, self).__init__() + + @abc.abstractmethod def is_active(self): """ Returns True if provider is activated, eg. has working credentials. @@ -18,36 +24,54 @@ class AbstractProvider(metaclass=ABCMeta): (boolean) """ - @abstractmethod - def upload_file(self, source_path, target_path, overwrite=True): + @abc.abstractmethod + def upload_file(self, source_path, path, + server, collection, file, representation, site, + overwrite=False): """ Copy file from 'source_path' to 'target_path' on provider. Use 'overwrite' boolean to rewrite existing file on provider Args: - source_path (string): absolute path on local system - target_path (string): absolute path on provider (GDrive etc.) - overwrite (boolean): True if overwite existing + source_path (string): + path (string): absolute path with or without name of the file + overwrite (boolean): replace existing file + + arguments for saving progress: + server (SyncServer): server instance to call update_db on + collection (str): name of collection + file (dict): info about uploaded file (matches structure from db) + representation (dict): complete repre containing 'file' + site (str): site name Returns: (string) file_id of created file, raises exception """ pass - @abstractmethod - def download_file(self, source_path, local_path, overwrite=True): + @abc.abstractmethod + def download_file(self, source_path, local_path, + server, collection, file, representation, site, + overwrite=False): """ Download file from provider into local system Args: source_path (string): absolute path on provider - local_path (string): absolute path on local - overwrite (bool): default set to True + local_path (string): absolute path with or without name of the file + overwrite (boolean): replace existing file + + arguments for saving progress: + server (SyncServer): server instance to call update_db on + collection (str): name of collection + file (dict): info about uploaded file (matches structure from db) + representation (dict): complete repre containing 'file' + site (str): site name Returns: None """ pass - @abstractmethod + @abc.abstractmethod def delete_file(self, path): """ Deletes file from 'path'. Expects path to specific file. @@ -60,7 +84,7 @@ class AbstractProvider(metaclass=ABCMeta): """ pass - @abstractmethod + @abc.abstractmethod def list_folder(self, folder_path): """ List all files and subfolders of particular path non-recursively. @@ -72,7 +96,7 @@ class AbstractProvider(metaclass=ABCMeta): """ pass - @abstractmethod + @abc.abstractmethod def create_folder(self, folder_path): """ Create all nonexistent folders and subfolders in 'path'. @@ -85,7 +109,7 @@ class AbstractProvider(metaclass=ABCMeta): """ pass - @abstractmethod + @abc.abstractmethod def get_tree(self): """ Creates folder structure for providers which do not provide @@ -94,16 +118,49 @@ class AbstractProvider(metaclass=ABCMeta): """ pass - @abstractmethod - def resolve_path(self, path, root_config, anatomy=None): + @abc.abstractmethod + def get_roots_config(self, anatomy=None): """ - Replaces root placeholders with appropriate real value from - 'root_configs' (from Settings or Local Settings) or Anatomy - (mainly for 'studio' site) + Returns root values for path resolving - Args: - path(string): path with '{root[work]}/...' - root_config(dict): from Settings or Local Settings - anatomy (Anatomy): prepared anatomy object for project + Takes value from Anatomy which takes values from Settings + overridden by Local Settings + + Returns: + (dict) - {"root": {"root": "/My Drive"}} + OR + {"root": {"root_ONE": "value", "root_TWO":"value}} + Format is importing for usage of python's format ** approach """ pass + + def resolve_path(self, path, root_config=None, anatomy=None): + """ + Replaces all root placeholders with proper values + + Args: + path(string): root[work]/folder... + root_config (dict): {'work': "c:/..."...} + anatomy (Anatomy): object of Anatomy + Returns: + (string): proper url + """ + if root_config and not root_config.get("root"): + root_config = {"root": root_config} + else: + root_config = self.get_roots_config(anatomy) + + try: + if not root_config: + raise KeyError + + path = path.format(**root_config) + except KeyError: + try: + path = anatomy.fill_root(path) + except KeyError: + msg = "Error in resolving local root from anatomy" + log.error(msg) + raise ValueError(msg) + + return path diff --git a/openpype/modules/sync_server/providers/gdrive.py b/openpype/modules/sync_server/providers/gdrive.py index 6c01bc4e6f..b6ece5263b 100644 --- a/openpype/modules/sync_server/providers/gdrive.py +++ b/openpype/modules/sync_server/providers/gdrive.py @@ -6,10 +6,11 @@ from googleapiclient import errors from .abstract_provider import AbstractProvider from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload from openpype.api import Logger -from openpype.api import get_system_settings +from openpype.api import get_system_settings, Anatomy from ..utils import time_function import time + SCOPES = ['https://www.googleapis.com/auth/drive.metadata.readonly', 'https://www.googleapis.com/auth/drive.file', 'https://www.googleapis.com/auth/drive.readonly'] # for write|delete @@ -45,9 +46,10 @@ class GDriveHandler(AbstractProvider): MY_DRIVE_STR = 'My Drive' # name of root folder of regular Google drive CHUNK_SIZE = 2097152 # must be divisible by 256! - def __init__(self, site_name, tree=None, presets=None): + def __init__(self, project_name, site_name, tree=None, presets=None): self.presets = None self.active = False + self.project_name = project_name self.site_name = site_name self.presets = presets @@ -65,137 +67,6 @@ class GDriveHandler(AbstractProvider): self._tree = tree self.active = True - def _get_gd_service(self): - """ - Authorize client with 'credentials.json', uses service account. - Service account needs to have target folder shared with. - Produces service that communicates with GDrive API. - - Returns: - None - """ - creds = service_account.Credentials.from_service_account_file( - self.presets["credentials_url"], - scopes=SCOPES) - service = build('drive', 'v3', - credentials=creds, cache_discovery=False) - return service - - def _prepare_root_info(self): - """ - Prepare info about roots and theirs folder ids from 'presets'. - Configuration might be for single or multiroot projects. - Regular My Drive and Shared drives are implemented, their root - folder ids need to be queried in slightly different way. - - Returns: - (dicts) of dicts where root folders are keys - """ - roots = {} - for path in self.get_roots_config().values(): - if self.MY_DRIVE_STR in path: - roots[self.MY_DRIVE_STR] = self.service.files()\ - .get(fileId='root').execute() - else: - shared_drives = [] - page_token = None - - while True: - response = self.service.drives().list( - pageSize=100, - pageToken=page_token).execute() - shared_drives.extend(response.get('drives', [])) - page_token = response.get('nextPageToken', None) - if page_token is None: - break - - folders = path.split('/') - if len(folders) < 2: - raise ValueError("Wrong root folder definition {}". - format(path)) - - for shared_drive in shared_drives: - if folders[1] in shared_drive["name"]: - roots[shared_drive["name"]] = { - "name": shared_drive["name"], - "id": shared_drive["id"]} - if self.MY_DRIVE_STR not in roots: # add My Drive always - roots[self.MY_DRIVE_STR] = self.service.files() \ - .get(fileId='root').execute() - - return roots - - @time_function - def _build_tree(self, folders): - """ - Create in-memory structure resolving paths to folder id as - recursive querying might be slower. - Initialized in the time of class initialization. - Maybe should be persisted - Tree is structure of path to id: - '/ROOT': {'id': '1234567'} - '/ROOT/PROJECT_FOLDER': {'id':'222222'} - '/ROOT/PROJECT_FOLDER/Assets': {'id': '3434545'} - Args: - folders (list): list of dictionaries with folder metadata - Returns: - (dictionary) path as a key, folder id as a value - """ - log.debug("build_tree len {}".format(len(folders))) - root_ids = [] - default_root_id = None - tree = {} - ending_by = {} - for root_name, root in self.root.items(): # might be multiple roots - if root["id"] not in root_ids: - tree["/" + root_name] = {"id": root["id"]} - ending_by[root["id"]] = "/" + root_name - root_ids.append(root["id"]) - - if self.MY_DRIVE_STR == root_name: - default_root_id = root["id"] - - no_parents_yet = {} - while folders: - folder = folders.pop(0) - parents = folder.get("parents", []) - # weird cases, shared folders, etc, parent under root - if not parents: - parent = default_root_id - else: - parent = parents[0] - - if folder["id"] in root_ids: # do not process root - continue - - if parent in ending_by: - path_key = ending_by[parent] + "/" + folder["name"] - ending_by[folder["id"]] = path_key - tree[path_key] = {"id": folder["id"]} - else: - no_parents_yet.setdefault(parent, []).append((folder["id"], - folder["name"])) - loop_cnt = 0 - # break if looped more then X times - safety against infinite loop - while no_parents_yet and loop_cnt < 20: - - keys = list(no_parents_yet.keys()) - for parent in keys: - if parent in ending_by.keys(): - subfolders = no_parents_yet.pop(parent) - for folder_id, folder_name in subfolders: - path_key = ending_by[parent] + "/" + folder_name - ending_by[folder_id] = path_key - tree[path_key] = {"id": folder_id} - loop_cnt += 1 - - if len(no_parents_yet) > 0: - log.debug("Some folders path are not resolved {}". - format(no_parents_yet)) - log.debug("Remove deleted folders from trash.") - - return tree - def is_active(self): """ Returns True if provider is activated, eg. has working credentials. @@ -204,6 +75,21 @@ class GDriveHandler(AbstractProvider): """ return self.active + def get_roots_config(self, anatomy=None): + """ + Returns root values for path resolving + + Use only Settings as GDrive cannot be modified by Local Settings + + Returns: + (dict) - {"root": {"root": "/My Drive"}} + OR + {"root": {"root_ONE": "value", "root_TWO":"value}} + Format is importing for usage of python's format ** approach + """ + # GDrive roots cannot be locally overridden + return self.presets['root'] + def get_tree(self): """ Building of the folder tree could be potentially expensive, @@ -217,26 +103,6 @@ class GDriveHandler(AbstractProvider): self._tree = self._build_tree(self.list_folders()) return self._tree - def get_roots_config(self): - """ - Returns value from presets of roots. It calculates with multi - roots. Config should be simple key value, or dictionary. - - Examples: - "root": "/My Drive" - OR - "root": {"root_ONE": "value", "root_TWO":"value} - Returns: - (dict) - {"root": {"root": "/My Drive"}} - OR - {"root": {"root_ONE": "value", "root_TWO":"value}} - Format is importing for usage of python's format ** approach - """ - roots = self.presets["root"] - if isinstance(roots, str): - roots = {"root": roots} - return roots - def create_folder(self, path): """ Create all nonexistent folders and subfolders in 'path'. @@ -510,20 +376,6 @@ class GDriveHandler(AbstractProvider): self.service.files().delete(fileId=file["id"], supportsAllDrives=True).execute() - def _get_folder_metadata(self, path): - """ - Get info about folder with 'path' - Args: - path (string): - - Returns: - (dictionary) with metadata or raises ValueError - """ - try: - return self.get_tree()[path] - except Exception: - raise ValueError("Uknown folder id {}".format(id)) - def list_folder(self, folder_path): """ List all files and subfolders of particular path non-recursively. @@ -678,15 +530,151 @@ class GDriveHandler(AbstractProvider): return return provider_presets - def resolve_path(self, path, root_config, anatomy=None): - if not root_config.get("root"): - root_config = {"root": root_config} + def _get_gd_service(self): + """ + Authorize client with 'credentials.json', uses service account. + Service account needs to have target folder shared with. + Produces service that communicates with GDrive API. + Returns: + None + """ + creds = service_account.Credentials.from_service_account_file( + self.presets["credentials_url"], + scopes=SCOPES) + service = build('drive', 'v3', + credentials=creds, cache_discovery=False) + return service + + def _prepare_root_info(self): + """ + Prepare info about roots and theirs folder ids from 'presets'. + Configuration might be for single or multiroot projects. + Regular My Drive and Shared drives are implemented, their root + folder ids need to be queried in slightly different way. + + Returns: + (dicts) of dicts where root folders are keys + """ + roots = {} + config_roots = self.get_roots_config() + for path in config_roots.values(): + if self.MY_DRIVE_STR in path: + roots[self.MY_DRIVE_STR] = self.service.files()\ + .get(fileId='root').execute() + else: + shared_drives = [] + page_token = None + + while True: + response = self.service.drives().list( + pageSize=100, + pageToken=page_token).execute() + shared_drives.extend(response.get('drives', [])) + page_token = response.get('nextPageToken', None) + if page_token is None: + break + + folders = path.split('/') + if len(folders) < 2: + raise ValueError("Wrong root folder definition {}". + format(path)) + + for shared_drive in shared_drives: + if folders[1] in shared_drive["name"]: + roots[shared_drive["name"]] = { + "name": shared_drive["name"], + "id": shared_drive["id"]} + if self.MY_DRIVE_STR not in roots: # add My Drive always + roots[self.MY_DRIVE_STR] = self.service.files() \ + .get(fileId='root').execute() + + return roots + + @time_function + def _build_tree(self, folders): + """ + Create in-memory structure resolving paths to folder id as + recursive querying might be slower. + Initialized in the time of class initialization. + Maybe should be persisted + Tree is structure of path to id: + '/ROOT': {'id': '1234567'} + '/ROOT/PROJECT_FOLDER': {'id':'222222'} + '/ROOT/PROJECT_FOLDER/Assets': {'id': '3434545'} + Args: + folders (list): list of dictionaries with folder metadata + Returns: + (dictionary) path as a key, folder id as a value + """ + log.debug("build_tree len {}".format(len(folders))) + root_ids = [] + default_root_id = None + tree = {} + ending_by = {} + for root_name, root in self.root.items(): # might be multiple roots + if root["id"] not in root_ids: + tree["/" + root_name] = {"id": root["id"]} + ending_by[root["id"]] = "/" + root_name + root_ids.append(root["id"]) + + if self.MY_DRIVE_STR == root_name: + default_root_id = root["id"] + + no_parents_yet = {} + while folders: + folder = folders.pop(0) + parents = folder.get("parents", []) + # weird cases, shared folders, etc, parent under root + if not parents: + parent = default_root_id + else: + parent = parents[0] + + if folder["id"] in root_ids: # do not process root + continue + + if parent in ending_by: + path_key = ending_by[parent] + "/" + folder["name"] + ending_by[folder["id"]] = path_key + tree[path_key] = {"id": folder["id"]} + else: + no_parents_yet.setdefault(parent, []).append((folder["id"], + folder["name"])) + loop_cnt = 0 + # break if looped more then X times - safety against infinite loop + while no_parents_yet and loop_cnt < 20: + + keys = list(no_parents_yet.keys()) + for parent in keys: + if parent in ending_by.keys(): + subfolders = no_parents_yet.pop(parent) + for folder_id, folder_name in subfolders: + path_key = ending_by[parent] + "/" + folder_name + ending_by[folder_id] = path_key + tree[path_key] = {"id": folder_id} + loop_cnt += 1 + + if len(no_parents_yet) > 0: + log.debug("Some folders path are not resolved {}". + format(no_parents_yet)) + log.debug("Remove deleted folders from trash.") + + return tree + + def _get_folder_metadata(self, path): + """ + Get info about folder with 'path' + Args: + path (string): + + Returns: + (dictionary) with metadata or raises ValueError + """ try: - return path.format(**root_config) - except KeyError: - msg = "Error in resolving remote root, unknown key" - log.error(msg) + return self.get_tree()[path] + except Exception: + raise ValueError("Uknown folder id {}".format(id)) def _handle_q(self, q, trashed=False): """ API list call contain trashed and hidden files/folder by default. diff --git a/openpype/modules/sync_server/providers/lib.py b/openpype/modules/sync_server/providers/lib.py index 144594ecbe..58947e115d 100644 --- a/openpype/modules/sync_server/providers/lib.py +++ b/openpype/modules/sync_server/providers/lib.py @@ -1,4 +1,3 @@ -from enum import Enum from .gdrive import GDriveHandler from .local_drive import LocalDriveHandler @@ -25,7 +24,8 @@ class ProviderFactory: """ self.providers[provider] = (creator, batch_limit) - def get_provider(self, provider, site_name, tree=None, presets=None): + def get_provider(self, provider, project_name, site_name, + tree=None, presets=None): """ Returns new instance of provider client for specific site. One provider could have multiple sites. @@ -37,6 +37,7 @@ class ProviderFactory: provider (string): 'gdrive','S3' site_name (string): descriptor of site, different service accounts must have different site name + project_name (string): different projects could have diff. sites tree (dictionary): - folder paths to folder id structure presets (dictionary): config for provider and site (eg. "credentials_url"..) @@ -44,7 +45,8 @@ class ProviderFactory: (implementation of AbstractProvider) """ creator_info = self._get_creator_info(provider) - site = creator_info[0](site_name, tree, presets) # call init + # call init + site = creator_info[0](project_name, site_name, tree, presets) return site diff --git a/openpype/modules/sync_server/providers/local_drive.py b/openpype/modules/sync_server/providers/local_drive.py index fa8dd4c183..1f4fca80eb 100644 --- a/openpype/modules/sync_server/providers/local_drive.py +++ b/openpype/modules/sync_server/providers/local_drive.py @@ -4,7 +4,7 @@ import shutil import threading import time -from openpype.api import Logger +from openpype.api import Logger, Anatomy from .abstract_provider import AbstractProvider log = Logger().get_logger("SyncServer") @@ -12,6 +12,14 @@ log = Logger().get_logger("SyncServer") class LocalDriveHandler(AbstractProvider): """ Handles required operations on mounted disks with OS """ + def __init__(self, project_name, site_name, tree=None, presets=None): + self.presets = None + self.active = False + self.project_name = project_name + self.site_name = site_name + + self.active = self.is_active() + def is_active(self): return True @@ -82,27 +90,37 @@ class LocalDriveHandler(AbstractProvider): os.makedirs(folder_path, exist_ok=True) return folder_path + def get_roots_config(self, anatomy=None): + """ + Returns root values for path resolving + + Takes value from Anatomy which takes values from Settings + overridden by Local Settings + + Returns: + (dict) - {"root": {"root": "/My Drive"}} + OR + {"root": {"root_ONE": "value", "root_TWO":"value}} + Format is importing for usage of python's format ** approach + """ + if not anatomy: + anatomy = Anatomy(self.project_name, + self._normalize_site_name(self.site_name)) + + return {'root': anatomy.roots} + def get_tree(self): return - def resolve_path(self, path, root_config, anatomy=None): - if root_config and not root_config.get("root"): - root_config = {"root": root_config} + def get_configurable_items_for_site(self): + """ + Returns list of items that should be configurable by User - try: - if not root_config: - raise KeyError - - path = path.format(**root_config) - except KeyError: - try: - path = anatomy.fill_root(path) - except KeyError: - msg = "Error in resolving local root from anatomy" - log.error(msg) - raise ValueError(msg) - - return path + Returns: + (list of dict) + [{key:"root", label:"root", value:"valueFromSettings"}] + """ + pass def _copy(self, source_path, target_path): print("copying {}->{}".format(source_path, target_path)) @@ -133,3 +151,9 @@ class LocalDriveHandler(AbstractProvider): ) target_file_size = os.path.getsize(target_path) time.sleep(0.5) + + def _normalize_site_name(self, site_name): + """Transform user id to 'local' for Local settings""" + if site_name != 'studio': + return 'local' + return site_name diff --git a/openpype/modules/sync_server/sync_server.py b/openpype/modules/sync_server/sync_server.py index 62a5dc675c..e97c0e8844 100644 --- a/openpype/modules/sync_server/sync_server.py +++ b/openpype/modules/sync_server/sync_server.py @@ -1,1391 +1,225 @@ -from openpype.api import ( - Anatomy, - get_project_settings, - get_local_site_id) - +"""Python 3 only implementation.""" +import os +import asyncio import threading import concurrent.futures from concurrent.futures._base import CancelledError -from enum import Enum -from datetime import datetime - from .providers import lib -import os -from bson.objectid import ObjectId - -from avalon.api import AvalonMongoDB -from .utils import time_function - -import six from openpype.lib import PypeLogger -from .. import PypeModule, ITrayModule -from .providers.local_drive import LocalDriveHandler -if six.PY2: - web = asyncio = STATIC_DIR = WebSocketAsync = None -else: - import asyncio +from .utils import SyncStatus + log = PypeLogger().get_logger("SyncServer") -class SyncStatus(Enum): - DO_NOTHING = 0 - DO_UPLOAD = 1 - DO_DOWNLOAD = 2 - - -class SyncServer(PypeModule, ITrayModule): +async def upload(module, collection, file, representation, provider_name, + remote_site_name, tree=None, preset=None): """ - Synchronization server that is syncing published files from local to - any of implemented providers (like GDrive, S3 etc.) - Runs in the background and checks all representations, looks for files - that are marked to be in different location than 'studio' (temporary), - checks if 'created_dt' field is present denoting successful sync - with provider destination. - Sites structure is created during publish OR by calling 'add_site' - method. + Upload single 'file' of a 'representation' to 'provider'. + Source url is taken from 'file' portion, where {root} placeholder + is replaced by 'representation.Context.root' + Provider could be one of implemented in provider.py. - By default it will always contain 1 record with - "name" == self.presets["active_site"] and - filled "created_dt" AND 1 or multiple records for all defined - remote sites, where "created_dt" is not present. - This highlights that file should be uploaded to - remote destination + Updates MongoDB, fills in id of file from provider (ie. file_id + from GDrive), 'created_dt' - time of upload - ''' - example of synced file test_Cylinder_lookMain_v010.ma to GDrive - "files" : [ - { - "path" : "{root}/Test/Assets/Cylinder/publish/look/lookMain/v010/ - test_Cylinder_lookMain_v010.ma", - "_id" : ObjectId("5eeb25e411e06a16209ab78f"), - "hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822", - "size" : NumberLong(4822), - "sites" : [ - { - "name": "john_local_XD4345", - "created_dt" : ISODate("2020-05-22T08:05:44.000Z") - }, - { - "id" : ObjectId("5eeb25e411e06a16209ab78f"), - "name": "gdrive", - "created_dt" : ISODate("2020-05-55T08:54:35.833Z") - ] - } - }, - ''' - Each Tray app has assigned its own self.presets["local_id"] - used in sites as a name. - Tray is searching only for records where name matches its - self.presets["active_site"] + self.presets["remote_site"]. - "active_site" could be storage in studio ('studio'), or specific - "local_id" when user is working disconnected from home. - If the local record has its "created_dt" filled, it is a source and - process will try to upload the file to all defined remote sites. + 'provider_name' doesn't have to match to 'site_name', single + provider (GDrive) might have multiple sites ('projectA', + 'projectB') - Remote files "id" is real id that could be used in appropriate API. - Local files have "id" too, for conformity, contains just file name. - It is expected that multiple providers will be implemented in separate - classes and registered in 'providers.py'. + Args: + module(SyncServerModule): object to run SyncServerModule API + collection (str): source collection + file (dictionary): of file from representation in Mongo + representation (dictionary): of representation + provider_name (string): gdrive, gdc etc. + site_name (string): site on provider, single provider(gdrive) could + have multiple sites (different accounts, credentials) + tree (dictionary): injected memory structure for performance + preset (dictionary): site config ('credentials_url', 'root'...) """ - # limit querying DB to look for X number of representations that should - # be sync, we try to run more loops with less records - # actual number of files synced could be lower as providers can have - # different limits imposed by its API - # set 0 to no limit - REPRESENTATION_LIMIT = 100 - DEFAULT_SITE = 'studio' - LOCAL_SITE = 'local' - LOG_PROGRESS_SEC = 5 # how often log progress to DB + # create ids sequentially, upload file in parallel later + with module.lock: + # this part modifies structure on 'remote_site', only single + # thread can do that at a time, upload/download to prepared + # structure should be run in parallel + remote_handler = lib.factory.get_provider(provider_name, + collection, + remote_site_name, + tree=tree, + presets=preset) - name = "sync_server" - label = "Sync Server" - - def initialize(self, module_settings): - """ - Called during Module Manager creation. - - Collects needed data, checks asyncio presence. - Sets 'enabled' according to global settings for the module. - Shouldnt be doing any initialization, thats a job for 'tray_init' - """ - self.enabled = module_settings[self.name]["enabled"] - if asyncio is None: - raise AssertionError( - "SyncServer module requires Python 3.5 or higher." + file_path = file.get("path", "") + try: + local_file_path, remote_file_path = resolve_paths(module, + file_path, collection, remote_site_name, remote_handler ) - # some parts of code need to run sequentially, not in async - self.lock = None - self.connection = None # connection to avalon DB to update state - # settings for all enabled projects for sync - self.sync_project_settings = None - self.sync_server_thread = None # asyncio requires new thread - - self.action_show_widget = None - self._paused = False - self._paused_projects = set() - self._paused_representations = set() - self._anatomies = {} - - """ Start of Public API """ - def add_site(self, collection, representation_id, site_name=None): - """ - Adds new site to representation to be synced. - - 'collection' must have synchronization enabled (globally or - project only) - - Used as a API endpoint from outside applications (Loader etc) - - Args: - collection (string): project name (must match DB) - representation_id (string): MongoDB _id value - site_name (string): name of configured and active site - - Returns: - throws ValueError if any issue - """ - if not self.get_sync_project_setting(collection): - raise ValueError("Project not configured") - - if not site_name: - site_name = self.DEFAULT_SITE - - self.reset_provider_for_file(collection, - representation_id, - site_name=site_name) - - # public facing API - def remove_site(self, collection, representation_id, site_name, - remove_local_files=False): - """ - Removes 'site_name' for particular 'representation_id' on - 'collection' - - Args: - collection (string): project name (must match DB) - representation_id (string): MongoDB _id value - site_name (string): name of configured and active site - remove_local_files (bool): remove only files for 'local_id' - site - - Returns: - throws ValueError if any issue - """ - if not self.get_sync_project_setting(collection): - raise ValueError("Project not configured") - - self.reset_provider_for_file(collection, - representation_id, - site_name=site_name, - remove=True) - if remove_local_files: - self._remove_local_file(collection, representation_id, site_name) - - def clear_project(self, collection, site_name): - """ - Clear 'collection' of 'site_name' and its local files - - Works only on real local sites, not on 'studio' - """ - query = { - "type": "representation", - "files.sites.name": site_name - } - - representations = list( - self.connection.database[collection].find(query)) - if not representations: - self.log.debug("No repre found") - return - - for repre in representations: - self.remove_site(collection, repre.get("_id"), site_name, True) - - def pause_representation(self, collection, representation_id, site_name): - """ - Sets 'representation_id' as paused, eg. no syncing should be - happening on it. - - Args: - collection (string): project name - representation_id (string): MongoDB objectId value - site_name (string): 'gdrive', 'studio' etc. - """ - log.info("Pausing SyncServer for {}".format(representation_id)) - self._paused_representations.add(representation_id) - self.reset_provider_for_file(collection, representation_id, - site_name=site_name, pause=True) - - def unpause_representation(self, collection, representation_id, site_name): - """ - Sets 'representation_id' as unpaused. - - Does not fail or warn if repre wasn't paused. - - Args: - collection (string): project name - representation_id (string): MongoDB objectId value - site_name (string): 'gdrive', 'studio' etc. - """ - log.info("Unpausing SyncServer for {}".format(representation_id)) - try: - self._paused_representations.remove(representation_id) - except KeyError: - pass - # self.paused_representations is not persistent - self.reset_provider_for_file(collection, representation_id, - site_name=site_name, pause=False) - - def is_representation_paused(self, representation_id, - check_parents=False, project_name=None): - """ - Returns if 'representation_id' is paused or not. - - Args: - representation_id (string): MongoDB objectId value - check_parents (bool): check if parent project or server itself - are not paused - project_name (string): project to check if paused - - if 'check_parents', 'project_name' should be set too - Returns: - (bool) - """ - condition = representation_id in self._paused_representations - if check_parents and project_name: - condition = condition or \ - self.is_project_paused(project_name) or \ - self.is_paused() - return condition - - def pause_project(self, project_name): - """ - Sets 'project_name' as paused, eg. no syncing should be - happening on all representation inside. - - Args: - project_name (string): collection name - """ - log.info("Pausing SyncServer for {}".format(project_name)) - self._paused_projects.add(project_name) - - def unpause_project(self, project_name): - """ - Sets 'project_name' as unpaused - - Does not fail or warn if project wasn't paused. - - Args: - project_name (string): collection name - """ - log.info("Unpausing SyncServer for {}".format(project_name)) - try: - self._paused_projects.remove(project_name) - except KeyError: - pass - - def is_project_paused(self, project_name, check_parents=False): - """ - Returns if 'project_name' is paused or not. - - Args: - project_name (string): collection name - check_parents (bool): check if server itself - is not paused - Returns: - (bool) - """ - condition = project_name in self._paused_projects - if check_parents: - condition = condition or self.is_paused() - return condition - - def pause_server(self): - """ - Pause sync server - - It won't check anything, not uploading/downloading... - """ - log.info("Pausing SyncServer") - self._paused = True - - def unpause_server(self): - """ - Unpause server - """ - log.info("Unpausing SyncServer") - self._paused = False - - def is_paused(self): - """ Is server paused """ - return self._paused - - def get_active_sites(self, project_name): - """ - Returns list of active sites for 'project_name'. - - By default it returns ['studio'], this site is default - and always present even if SyncServer is not enabled. (for publish) - - Used mainly for Local settings for user override. - - Args: - project_name (string): - - Returns: - (list) of strings - """ - return self.get_active_sites_from_settings( - get_project_settings(project_name)) - - def get_active_sites_from_settings(self, settings): - """ - List available active sites from incoming 'settings'. Used for - returning 'default' values for Local Settings - - Args: - settings (dict): full settings (global + project) - Returns: - (list) of strings - """ - sync_settings = self._parse_sync_settings_from_settings(settings) - - return self._get_active_sites_from_settings(sync_settings) - - def get_active_site(self, project_name): - """ - Returns active (mine) site for 'project_name' from settings - - Returns: - (string) - """ - active_site = self.get_sync_project_setting( - project_name)['config']['active_site'] - if active_site == self.LOCAL_SITE: - return get_local_site_id() - return active_site - - # remote sites - def get_remote_sites(self, project_name): - """ - Returns all remote sites configured on 'project_name'. - - If 'project_name' is not enabled for syncing returns []. - - Used by Local setting to allow user choose remote site. - - Args: - project_name (string): - - Returns: - (list) of strings - """ - return self.get_remote_sites_from_settings( - get_project_settings(project_name)) - - def get_remote_sites_from_settings(self, settings): - """ - Get remote sites for returning 'default' values for Local Settings - """ - sync_settings = self._parse_sync_settings_from_settings(settings) - - return self._get_remote_sites_from_settings(sync_settings) - - def get_remote_site(self, project_name): - """ - Returns remote (theirs) site for 'project_name' from settings - """ - remote_site = self.get_sync_project_setting( - project_name)['config']['remote_site'] - if remote_site == self.LOCAL_SITE: - return get_local_site_id() - - return remote_site - - """ End of Public API """ - - def get_local_file_path(self, collection, file_path): - """ - Externalized for app - """ - local_file_path, _ = self._resolve_paths(file_path, collection) - - return local_file_path - - def _get_remote_sites_from_settings(self, sync_settings): - if not self.enabled or not sync_settings['enabled']: - return [] - - remote_sites = [self.DEFAULT_SITE, self.LOCAL_SITE] - if sync_settings: - remote_sites.extend(sync_settings.get("sites").keys()) - - return list(set(remote_sites)) - - def _get_active_sites_from_settings(self, sync_settings): - sites = [self.DEFAULT_SITE] - if self.enabled and sync_settings['enabled']: - sites.append(self.LOCAL_SITE) - - return sites - - def connect_with_modules(self, *_a, **kw): - return - - def tray_init(self): - """ - Actual initialization of Sync Server. - - Called when tray is initialized, it checks if module should be - enabled. If not, no initialization necessary. - """ - if not self.enabled: - return - - self.sync_project_settings = None - self.lock = threading.Lock() - - self.connection = AvalonMongoDB() - self.connection.install() - - try: - self.set_sync_project_settings() - self.sync_server_thread = SyncServerThread(self) - from .tray.app import SyncServerWindow - self.widget = SyncServerWindow(self) - except ValueError: - log.info("No system setting for sync. Not syncing.", exc_info=True) - self.enabled = False - except KeyError: - log.info(( - "There are not set presets for SyncServer OR " - "Credentials provided are invalid, " - "no syncing possible"). - format(str(self.sync_project_settings)), exc_info=True) - self.enabled = False - - def tray_start(self): - """ - Triggered when Tray is started. - - Checks if configuration presets are available and if there is - any provider ('gdrive', 'S3') that is activated - (eg. has valid credentials). + except Exception as exp: + print(exp) + + target_folder = os.path.dirname(remote_file_path) + folder_id = remote_handler.create_folder(target_folder) + + if not folder_id: + err = "Folder {} wasn't created. Check permissions.". \ + format(target_folder) + raise NotADirectoryError(err) + + loop = asyncio.get_running_loop() + file_id = await loop.run_in_executor(None, + remote_handler.upload_file, + local_file_path, + remote_file_path, + module, + collection, + file, + representation, + remote_site_name, + True + ) + return file_id + + +async def download(module, collection, file, representation, provider_name, + remote_site_name, tree=None, preset=None): + """ + Downloads file to local folder denoted in representation.Context. + + Args: + module(SyncServerModule): object to run SyncServerModule API + collection (str): source collection + file (dictionary) : info about processed file + representation (dictionary): repr that 'file' belongs to + provider_name (string): 'gdrive' etc + site_name (string): site on provider, single provider(gdrive) could + have multiple sites (different accounts, credentials) + tree (dictionary): injected memory structure for performance + preset (dictionary): site config ('credentials_url', 'root'...) Returns: - None - """ - if self.sync_project_settings and self.enabled: - self.sync_server_thread.start() - else: - log.info("No presets or active providers. " + - "Synchronization not possible.") + (string) - 'name' of local file + """ + with module.lock: + remote_handler = lib.factory.get_provider(provider_name, + collection, + remote_site_name, + tree=tree, + presets=preset) - def tray_exit(self): - """ - Stops sync thread if running. + file_path = file.get("path", "") + local_file_path, remote_file_path = resolve_paths( + module, file_path, collection, remote_site_name, remote_handler + ) - Called from Module Manager - """ - if not self.sync_server_thread: - return + local_folder = os.path.dirname(local_file_path) + os.makedirs(local_folder, exist_ok=True) - if not self.is_running: - return - try: - log.info("Stopping sync server server") - self.sync_server_thread.is_running = False - self.sync_server_thread.stop() - except Exception: - log.warning( - "Error has happened during Killing sync server", - exc_info=True - ) + local_site = module.get_active_site(collection) - def tray_menu(self, parent_menu): - if not self.enabled: - return + loop = asyncio.get_running_loop() + file_id = await loop.run_in_executor(None, + remote_handler.download_file, + remote_file_path, + local_file_path, + module, + collection, + file, + representation, + local_site, + True + ) + return file_id - from Qt import QtWidgets - """Add menu or action to Tray(or parent)'s menu""" - action = QtWidgets.QAction("SyncServer", parent_menu) - action.triggered.connect(self.show_widget) - parent_menu.addAction(action) - parent_menu.addSeparator() - self.action_show_widget = action +def resolve_paths(module, file_path, collection, + remote_site_name=None, remote_handler=None): + """ + Returns tuple of local and remote file paths with {root} + placeholders replaced with proper values from Settings or Anatomy - @property - def is_running(self): - return self.sync_server_thread.is_running + Ejected here because of Python 2 hosts (GDriveHandler is an issue) - def get_anatomy(self, project_name): - """ - Get already created or newly created anatomy for project - - Args: - project_name (string): - - Return: - (Anatomy) - """ - return self._anatomies.get('project_name') or Anatomy(project_name) - - def set_sync_project_settings(self): - """ - Set sync_project_settings for all projects (caching) - - For performance - """ - sync_project_settings = {} - if not self.connection: - self.connection = AvalonMongoDB() - self.connection.install() - - for collection in self.connection.database.collection_names(False): - sync_settings = self._parse_sync_settings_from_settings( - get_project_settings(collection)) - if sync_settings: - default_sites = self._get_default_site_configs() - sync_settings['sites'].update(default_sites) - sync_project_settings[collection] = sync_settings - - if not sync_project_settings: - log.info("No enabled and configured projects for sync.") - - self.sync_project_settings = sync_project_settings - - def get_sync_project_settings(self, refresh=False): - """ - Collects all projects which have enabled syncing and their settings Args: - refresh (bool): refresh presets from settings - used when user - changes site in Local Settings or any time up-to-date values - are necessary + module(SyncServerModule): object to run SyncServerModule API + file_path(string): path with {root} + collection(string): project name + remote_site_name(string): remote site + remote_handler(AbstractProvider): implementation Returns: - (dict): of settings, keys are project names - {'projectA':{enabled: True, sites:{}...} - """ - # presets set already, do not call again and again - if refresh or not self.sync_project_settings: - self.set_sync_project_settings() + (string, string) - proper absolute paths, remote path is optional + """ + remote_file_path = '' + if remote_handler: + remote_file_path = remote_handler.resolve_path(file_path) - return self.sync_project_settings + local_handler = lib.factory.get_provider( + 'local_drive', collection, module.get_active_site(collection)) + local_file_path = local_handler.resolve_path(file_path) - def get_sync_project_setting(self, project_name): - """ Handles pulling sync_server's settings for enabled 'project_name' + return local_file_path, remote_file_path - Args: - project_name (str): used in project settings - Returns: - (dict): settings dictionary for the enabled project, - empty if no settings or sync is disabled - """ - # presets set already, do not call again and again - # self.log.debug("project preset {}".format(self.presets)) - if self.sync_project_settings and \ - self.sync_project_settings.get(project_name): - return self.sync_project_settings.get(project_name) - settings = get_project_settings(project_name) - return self._parse_sync_settings_from_settings(settings) +def site_is_working(module, project_name, site_name): + """ + Confirm that 'site_name' is configured correctly for 'project_name'. - def site_is_working(self, project_name, site_name): - """ - Confirm that 'site_name' is configured correctly for 'project_name' - Args: - project_name(string): - site_name(string): - Returns - (bool) - """ - if self._get_configured_sites(project_name).get(site_name): - return True - return False + Must be here as lib.factory access doesn't work in Python 2 hosts. - def _parse_sync_settings_from_settings(self, settings): - """ settings from api.get_project_settings, TOOD rename """ - sync_settings = settings.get("global").get("sync_server") - if not sync_settings: - log.info("No project setting not syncing.") - return {} - if sync_settings.get("enabled"): - return sync_settings + Args: + module (SyncServerModule) + project_name(string): + site_name(string): + Returns + (bool) + """ + if _get_configured_sites(module, project_name).get(site_name): + return True + return False + +def _get_configured_sites(module, project_name): + """ + Loops through settings and looks for configured sites and checks + its handlers for particular 'project_name'. + + Args: + project_setting(dict): dictionary from Settings + only_project_name(string, optional): only interested in + particular project + Returns: + (dict of dict) + {'ProjectA': {'studio':True, 'gdrive':False}} + """ + settings = module.get_sync_project_setting(project_name) + return _get_configured_sites_from_setting(module, project_name, settings) + + +def _get_configured_sites_from_setting(module, project_name, project_setting): + if not project_setting.get("enabled"): return {} - def _get_configured_sites(self, project_name): - """ - Loops through settings and looks for configured sites and checks - its handlers for particular 'project_name'. - - Args: - project_setting(dict): dictionary from Settings - only_project_name(string, optional): only interested in - particular project - Returns: - (dict of dict) - {'ProjectA': {'studio':True, 'gdrive':False}} - """ - settings = self.get_sync_project_setting(project_name) - return self._get_configured_sites_from_setting(settings) - - def _get_configured_sites_from_setting(self, project_setting): - if not project_setting.get("enabled"): - return {} - - initiated_handlers = {} - configured_sites = {} - all_sites = self._get_default_site_configs() - all_sites.update(project_setting.get("sites")) - for site_name, config in all_sites.items(): - handler = initiated_handlers. \ - get((config["provider"], site_name)) - if not handler: - handler = lib.factory.get_provider(config["provider"], - site_name, - presets=config) - initiated_handlers[(config["provider"], site_name)] = \ - handler - - if handler.is_active(): - configured_sites[site_name] = True - - return configured_sites - - def _get_default_site_configs(self): - """ - Returns skeleton settings for 'studio' and user's local site - """ - default_config = {'provider': 'local_drive'} - all_sites = {self.DEFAULT_SITE: default_config, - get_local_site_id(): default_config} - return all_sites - - def get_provider_for_site(self, project_name, site): - """ - Return provider name for site. - """ - site_preset = self.get_sync_project_setting(project_name)["sites"].\ - get(site) - if site_preset: - return site_preset["provider"] - - return "NA" - - @time_function - def get_sync_representations(self, collection, active_site, remote_site): - """ - Get representations that should be synced, these could be - recognised by presence of document in 'files.sites', where key is - a provider (GDrive, S3) and value is empty document or document - without 'created_dt' field. (Don't put null to 'created_dt'!). - - Querying of 'to-be-synched' files is offloaded to Mongod for - better performance. Goal is to get as few representations as - possible. - Args: - collection (string): name of collection (in most cases matches - project name - active_site (string): identifier of current active site (could be - 'local_0' when working from home, 'studio' when working in the - studio (default) - remote_site (string): identifier of remote site I want to sync to - - Returns: - (list) of dictionaries - """ - log.debug("Check representations for : {}".format(collection)) - self.connection.Session["AVALON_PROJECT"] = collection - # retry_cnt - number of attempts to sync specific file before giving up - retries_arr = self._get_retries_arr(collection) - query = { - "type": "representation", - "$or": [ - {"$and": [ - { - "files.sites": { - "$elemMatch": { - "name": active_site, - "created_dt": {"$exists": True} - } - }}, { - "files.sites": { - "$elemMatch": { - "name": {"$in": [remote_site]}, - "created_dt": {"$exists": False}, - "tries": {"$in": retries_arr} - } - } - }]}, - {"$and": [ - { - "files.sites": { - "$elemMatch": { - "name": active_site, - "created_dt": {"$exists": False}, - "tries": {"$in": retries_arr} - } - }}, { - "files.sites": { - "$elemMatch": { - "name": {"$in": [remote_site]}, - "created_dt": {"$exists": True} - } - } - } - ]} - ] - } - log.debug("active_site:{} - remote_site:{}".format(active_site, - remote_site)) - log.debug("query: {}".format(query)) - representations = self.connection.find(query) - - return representations - - def check_status(self, file, local_site, remote_site, config_preset): - """ - Check synchronization status for single 'file' of single - 'representation' by single 'provider'. - (Eg. check if 'scene.ma' of lookdev.v10 should be synced to GDrive - - Always is comparing local record, eg. site with - 'name' == self.presets[PROJECT_NAME]['config']["active_site"] - - Args: - file (dictionary): of file from representation in Mongo - local_site (string): - local side of compare (usually 'studio') - remote_site (string): - gdrive etc. - config_preset (dict): config about active site, retries - Returns: - (string) - one of SyncStatus - """ - sites = file.get("sites") or [] - # if isinstance(sites, list): # temporary, old format of 'sites' - # return SyncStatus.DO_NOTHING - _, remote_rec = self._get_site_rec(sites, remote_site) or {} - if remote_rec: # sync remote target - created_dt = remote_rec.get("created_dt") - if not created_dt: - tries = self._get_tries_count_from_rec(remote_rec) - # file will be skipped if unsuccessfully tried over threshold - # error metadata needs to be purged manually in DB to reset - if tries < int(config_preset["retry_cnt"]): - return SyncStatus.DO_UPLOAD - else: - _, local_rec = self._get_site_rec(sites, local_site) or {} - if not local_rec or not local_rec.get("created_dt"): - tries = self._get_tries_count_from_rec(local_rec) - # file will be skipped if unsuccessfully tried over - # threshold times, error metadata needs to be purged - # manually in DB to reset - if tries < int(config_preset["retry_cnt"]): - return SyncStatus.DO_DOWNLOAD - - return SyncStatus.DO_NOTHING - - async def upload(self, collection, file, representation, provider_name, - remote_site_name, tree=None, preset=None): - """ - Upload single 'file' of a 'representation' to 'provider'. - Source url is taken from 'file' portion, where {root} placeholder - is replaced by 'representation.Context.root' - Provider could be one of implemented in provider.py. - - Updates MongoDB, fills in id of file from provider (ie. file_id - from GDrive), 'created_dt' - time of upload - - 'provider_name' doesn't have to match to 'site_name', single - provider (GDrive) might have multiple sites ('projectA', - 'projectB') - - Args: - collection (str): source collection - file (dictionary): of file from representation in Mongo - representation (dictionary): of representation - provider_name (string): gdrive, gdc etc. - site_name (string): site on provider, single provider(gdrive) could - have multiple sites (different accounts, credentials) - tree (dictionary): injected memory structure for performance - preset (dictionary): site config ('credentials_url', 'root'...) - - """ - # create ids sequentially, upload file in parallel later - with self.lock: - # this part modifies structure on 'remote_site', only single - # thread can do that at a time, upload/download to prepared - # structure should be run in parallel - remote_handler = lib.factory.get_provider(provider_name, - remote_site_name, - tree=tree, - presets=preset) - - file_path = file.get("path", "") - local_file_path, remote_file_path = self._resolve_paths( - file_path, collection, remote_site_name, remote_handler - ) - - target_folder = os.path.dirname(remote_file_path) - folder_id = remote_handler.create_folder(target_folder) - - if not folder_id: - err = "Folder {} wasn't created. Check permissions.".\ - format(target_folder) - raise NotADirectoryError(err) - - loop = asyncio.get_running_loop() - file_id = await loop.run_in_executor(None, - remote_handler.upload_file, - local_file_path, - remote_file_path, - self, - collection, - file, - representation, - remote_site_name, - True - ) - return file_id - - async def download(self, collection, file, representation, provider_name, - remote_site_name, tree=None, preset=None): - """ - Downloads file to local folder denoted in representation.Context. - - Args: - collection (str): source collection - file (dictionary) : info about processed file - representation (dictionary): repr that 'file' belongs to - provider_name (string): 'gdrive' etc - site_name (string): site on provider, single provider(gdrive) could - have multiple sites (different accounts, credentials) - tree (dictionary): injected memory structure for performance - preset (dictionary): site config ('credentials_url', 'root'...) - - Returns: - (string) - 'name' of local file - """ - with self.lock: - remote_handler = lib.factory.get_provider(provider_name, - remote_site_name, - tree=tree, - presets=preset) - - file_path = file.get("path", "") - local_file_path, remote_file_path = self._resolve_paths( - file_path, collection, remote_site_name, remote_handler - ) - - local_folder = os.path.dirname(local_file_path) - os.makedirs(local_folder, exist_ok=True) - - local_site = self.get_active_site(collection) - - loop = asyncio.get_running_loop() - file_id = await loop.run_in_executor(None, - remote_handler.download_file, - remote_file_path, - local_file_path, - self, - collection, - file, - representation, - local_site, - True - ) - return file_id - - def update_db(self, collection, new_file_id, file, representation, - site, error=None, progress=None): - """ - Update 'provider' portion of records in DB with success (file_id) - or error (exception) - - Args: - collection (string): name of project - force to db connection as - each file might come from different collection - new_file_id (string): - file (dictionary): info about processed file (pulled from DB) - representation (dictionary): parent repr of file (from DB) - site (string): label ('gdrive', 'S3') - error (string): exception message - progress (float): 0-1 of progress of upload/download - - Returns: - None - """ - representation_id = representation.get("_id") - file_id = file.get("_id") - query = { - "_id": representation_id - } - - update = {} - if new_file_id: - update["$set"] = self._get_success_dict(new_file_id) - # reset previous errors if any - update["$unset"] = self._get_error_dict("", "", "") - elif progress is not None: - update["$set"] = self._get_progress_dict(progress) - else: - tries = self._get_tries_count(file, site) - tries += 1 - - update["$set"] = self._get_error_dict(error, tries) - - arr_filter = [ - {'s.name': site}, - {'f._id': ObjectId(file_id)} - ] - - self.connection.database[collection].update_one( - query, - update, - upsert=True, - array_filters=arr_filter - ) - - if progress is not None: - return - - status = 'failed' - error_str = 'with error {}'.format(error) - if new_file_id: - status = 'succeeded with id {}'.format(new_file_id) - error_str = '' - - source_file = file.get("path", "") - log.debug("File for {} - {source_file} process {status} {error_str}". - format(representation_id, - status=status, - source_file=source_file, - error_str=error_str)) - - def _get_file_info(self, files, _id): - """ - Return record from list of records which name matches to 'provider' - Could be possibly refactored with '_get_provider_rec' together. - - Args: - files (list): of dictionaries with info about published files - _id (string): _id of specific file - - Returns: - (int, dictionary): index from list and record with metadata - about site (if/when created, errors..) - OR (-1, None) if not present - """ - for index, rec in enumerate(files): - if rec.get("_id") == _id: - return index, rec - - return -1, None - - def _get_site_rec(self, sites, site_name): - """ - Return record from list of records which name matches to - 'remote_site_name' - - Args: - sites (list): of dictionaries - site_name (string): 'local_XXX', 'gdrive' - - Returns: - (int, dictionary): index from list and record with metadata - about site (if/when created, errors..) - OR (-1, None) if not present - """ - for index, rec in enumerate(sites): - if rec.get("name") == site_name: - return index, rec - - return -1, None - - def reset_provider_for_file(self, collection, representation_id, - side=None, file_id=None, site_name=None, - remove=False, pause=None): - """ - Reset information about synchronization for particular 'file_id' - and provider. - Useful for testing or forcing file to be reuploaded. - - 'side' and 'site_name' are disjunctive. - - 'side' is used for resetting local or remote side for - current user for repre. - - 'site_name' is used to set synchronization for particular site. - Should be used when repre should be synced to new site. - - Args: - collection (string): name of project (eg. collection) in DB - representation_id(string): _id of representation - file_id (string): file _id in representation - side (string): local or remote side - site_name (string): for adding new site - remove (bool): if True remove site altogether - pause (bool or None): if True - pause, False - unpause - - Returns: - throws ValueError - """ - query = { - "_id": ObjectId(representation_id) - } - - representation = list(self.connection.database[collection].find(query)) - if not representation: - raise ValueError("Representation {} not found in {}". - format(representation_id, collection)) - if side and site_name: - raise ValueError("Misconfiguration, only one of side and " + - "site_name arguments should be passed.") - - local_site = self.get_active_site(collection) - remote_site = self.get_remote_site(collection) - - if side: - if side == 'local': - site_name = local_site - else: - site_name = remote_site - - elem = {"name": site_name} - - if file_id: # reset site for particular file - self._reset_site_for_file(collection, query, - elem, file_id, site_name) - elif side: # reset site for whole representation - self._reset_site(collection, query, elem, site_name) - elif remove: # remove site for whole representation - self._remove_site(collection, query, representation, site_name) - elif pause is not None: - self._pause_unpause_site(collection, query, - representation, site_name, pause) - else: # add new site to all files for representation - self._add_site(collection, query, representation, elem, site_name) - - def _update_site(self, collection, query, update, arr_filter): - """ - Auxiliary method to call update_one function on DB - - Used for refactoring ugly reset_provider_for_file - """ - self.connection.database[collection].update_one( - query, - update, - upsert=True, - array_filters=arr_filter - ) - - def _reset_site_for_file(self, collection, query, - elem, file_id, site_name): - """ - Resets 'site_name' for 'file_id' on representation in 'query' on - 'collection' - """ - update = { - "$set": {"files.$[f].sites.$[s]": elem} - } - arr_filter = [ - {'s.name': site_name}, - {'f._id': ObjectId(file_id)} - ] - - self._update_site(collection, query, update, arr_filter) - - def _reset_site(self, collection, query, elem, site_name): - """ - Resets 'site_name' for all files of representation in 'query' - """ - update = { - "$set": {"files.$[].sites.$[s]": elem} - } - - arr_filter = [ - {'s.name': site_name} - ] - - self._update_site(collection, query, update, arr_filter) - - def _remove_site(self, collection, query, representation, site_name): - """ - Removes 'site_name' for 'representation' in 'query' - - Throws ValueError if 'site_name' not found on 'representation' - """ - found = False - for file in representation.pop().get("files"): - for site in file.get("sites"): - if site["name"] == site_name: - found = True - break - if not found: - msg = "Site {} not found".format(site_name) - log.info(msg) - raise ValueError(msg) - - update = { - "$pull": {"files.$[].sites": {"name": site_name}} - } - arr_filter = [] - - self._update_site(collection, query, update, arr_filter) - - def _pause_unpause_site(self, collection, query, - representation, site_name, pause): - """ - Pauses/unpauses all files for 'representation' based on 'pause' - - Throws ValueError if 'site_name' not found on 'representation' - """ - found = False - site = None - for file in representation.pop().get("files"): - for site in file.get("sites"): - if site["name"] == site_name: - found = True - break - if not found: - msg = "Site {} not found".format(site_name) - log.info(msg) - raise ValueError(msg) - - if pause: - site['paused'] = pause - else: - if site.get('paused'): - site.pop('paused') - - update = { - "$set": {"files.$[].sites.$[s]": site} - } - - arr_filter = [ - {'s.name': site_name} - ] - - self._update_site(collection, query, update, arr_filter) - - def _add_site(self, collection, query, representation, elem, site_name): - """ - Adds 'site_name' to 'representation' on 'collection' - - Throws ValueError if already present - """ - for file in representation.pop().get("files"): - for site in file.get("sites"): - if site["name"] == site_name: - msg = "Site {} already present".format(site_name) - log.info(msg) - raise ValueError(msg) - - update = { - "$push": {"files.$[].sites": elem} - } - - arr_filter = [] - - self._update_site(collection, query, update, arr_filter) - - def _remove_local_file(self, collection, representation_id, site_name): - """ - Removes all local files for 'site_name' of 'representation_id' - - Args: - collection (string): project name (must match DB) - representation_id (string): MongoDB _id value - site_name (string): name of configured and active site - - Returns: - only logs, catches IndexError and OSError - """ - my_local_site = get_local_site_id() - if my_local_site != site_name: - self.log.warning("Cannot remove non local file for {}". - format(site_name)) - return - - provider_name = self.get_provider_for_site(collection, site_name) - handler = lib.factory.get_provider(provider_name, site_name) - - if handler and isinstance(handler, LocalDriveHandler): - query = { - "_id": ObjectId(representation_id) - } - - representation = list( - self.connection.database[collection].find(query)) - if not representation: - self.log.debug("No repre {} found".format( - representation_id)) - return - - representation = representation.pop() - local_file_path = '' - for file in representation.get("files"): - local_file_path, _ = self._resolve_paths(file.get("path", ""), - collection - ) - try: - self.log.debug("Removing {}".format(local_file_path)) - os.remove(local_file_path) - except IndexError: - msg = "No file set for {}".format(representation_id) - self.log.debug(msg) - raise ValueError(msg) - except OSError: - msg = "File {} cannot be removed".format(file["path"]) - self.log.warning(msg) - raise ValueError(msg) - - try: - folder = os.path.dirname(local_file_path) - os.rmdir(folder) - except OSError: - msg = "folder {} cannot be removed".format(folder) - self.log.warning(msg) - raise ValueError(msg) - - def get_loop_delay(self, project_name): - """ - Return count of seconds before next synchronization loop starts - after finish of previous loop. - Returns: - (int): in seconds - """ - ld = self.sync_project_settings[project_name]["config"]["loop_delay"] - return int(ld) - - def show_widget(self): - """Show dialog to enter credentials""" - self.widget.show() - - def _get_success_dict(self, new_file_id): - """ - Provide success metadata ("id", "created_dt") to be stored in Db. - Used in $set: "DICT" part of query. - Sites are array inside of array(file), so real indexes for both - file and site are needed for upgrade in DB. - Args: - new_file_id: id of created file - Returns: - (dictionary) - """ - val = {"files.$[f].sites.$[s].id": new_file_id, - "files.$[f].sites.$[s].created_dt": datetime.now()} - return val - - def _get_error_dict(self, error="", tries="", progress=""): - """ - Provide error metadata to be stored in Db. - Used for set (error and tries provided) or unset mode. - Args: - error: (string) - message - tries: how many times failed - Returns: - (dictionary) - """ - val = {"files.$[f].sites.$[s].last_failed_dt": datetime.now(), - "files.$[f].sites.$[s].error": error, - "files.$[f].sites.$[s].tries": tries, - "files.$[f].sites.$[s].progress": progress - } - return val - - def _get_tries_count_from_rec(self, rec): - """ - Get number of failed attempts to sync from site record - Args: - rec (dictionary): info about specific site record - Returns: - (int) - number of failed attempts - """ - if not rec: - return 0 - return rec.get("tries", 0) - - def _get_tries_count(self, file, provider): - """ - Get number of failed attempts to sync - Args: - file (dictionary): info about specific file - provider (string): name of site ('gdrive' or specific user site) - Returns: - (int) - number of failed attempts - """ - _, rec = self._get_site_rec(file.get("sites", []), provider) - return rec.get("tries", 0) - - def _get_progress_dict(self, progress): - """ - Provide progress metadata to be stored in Db. - Used during upload/download for GUI to show. - Args: - progress: (float) - 0-1 progress of upload/download - Returns: - (dictionary) - """ - val = {"files.$[f].sites.$[s].progress": progress} - return val - - def _resolve_paths(self, file_path, collection, - remote_site_name=None, remote_handler=None): - """ - Returns tuple of local and remote file paths with {root} - placeholders replaced with proper values from Settings or Anatomy - - Args: - file_path(string): path with {root} - collection(string): project name - remote_site_name(string): remote site - remote_handler(AbstractProvider): implementation - Returns: - (string, string) - proper absolute paths - """ - remote_file_path = '' - if remote_handler: - root_configs = self._get_roots_config(self.sync_project_settings, - collection, - remote_site_name) - - remote_file_path = remote_handler.resolve_path(file_path, - root_configs) - - local_handler = lib.factory.get_provider( - 'local_drive', self.get_active_site(collection)) - local_file_path = local_handler.resolve_path( - file_path, None, self.get_anatomy(collection)) - - return local_file_path, remote_file_path - - def _get_retries_arr(self, project_name): - """ - Returns array with allowed values in 'tries' field. If repre - contains these values, it means it was tried to be synchronized - but failed. We try up to 'self.presets["retry_cnt"]' times before - giving up and skipping representation. - Returns: - (list) - """ - retry_cnt = self.sync_project_settings[project_name].\ - get("config")["retry_cnt"] - arr = [i for i in range(int(retry_cnt))] - arr.append(None) - - return arr - - def _get_roots_config(self, presets, project_name, site_name): - """ - Returns configured root(s) for 'project_name' and 'site_name' from - settings ('presets') - """ - return presets[project_name]['sites'][site_name]['root'] - + initiated_handlers = {} + configured_sites = {} + all_sites = module._get_default_site_configs() + all_sites.update(project_setting.get("sites")) + for site_name, config in all_sites.items(): + handler = initiated_handlers. \ + get((config["provider"], site_name)) + if not handler: + handler = lib.factory.get_provider(config["provider"], + project_name, + site_name, + presets=config) + initiated_handlers[(config["provider"], site_name)] = \ + handler + + if handler.is_active(): + configured_sites[site_name] = True + + return configured_sites class SyncServerThread(threading.Thread): """ @@ -1437,7 +271,7 @@ class SyncServerThread(threading.Thread): import time start_time = None self.module.set_sync_project_settings() # clean cache - for collection, preset in self.module.get_sync_project_settings().\ + for collection, preset in self.module.sync_project_settings.\ items(): start_time = time.time() local_site, remote_site = self._working_sites(collection) @@ -1462,6 +296,7 @@ class SyncServerThread(threading.Thread): site_preset = preset.get('sites')[remote_site] remote_provider = site_preset['provider'] handler = lib.factory.get_provider(remote_provider, + collection, remote_site, presets=site_preset) limit = lib.factory.get_provider_batch_limit( @@ -1491,13 +326,14 @@ class SyncServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.upload(collection, - file, - sync, - remote_provider, - remote_site, - tree, - site_preset)) + upload(self.module, + collection, + file, + sync, + remote_provider, + remote_site, + tree, + site_preset)) task_files_to_process.append(task) # store info for exception handlingy files_processed_info.append((file, @@ -1510,13 +346,14 @@ class SyncServerThread(threading.Thread): tree = handler.get_tree() limit -= 1 task = asyncio.create_task( - self.module.download(collection, - file, - sync, - remote_provider, - remote_site, - tree, - site_preset)) + download(self.module, + collection, + file, + sync, + remote_provider, + remote_site, + tree, + site_preset)) task_files_to_process.append(task) files_processed_info.append((file, @@ -1592,8 +429,8 @@ class SyncServerThread(threading.Thread): remote_site)) return None, None - if not all([self.module.site_is_working(collection, local_site), - self.module.site_is_working(collection, remote_site)]): + if not all([site_is_working(self.module, collection, local_site), + site_is_working(self.module, collection, remote_site)]): log.debug("Some of the sites {} - {} is not ".format(local_site, remote_site) + "working properly") diff --git a/openpype/modules/sync_server/sync_server_module.py b/openpype/modules/sync_server/sync_server_module.py new file mode 100644 index 0000000000..4b4b3517ee --- /dev/null +++ b/openpype/modules/sync_server/sync_server_module.py @@ -0,0 +1,1194 @@ +import os +from bson.objectid import ObjectId +from datetime import datetime +import threading + +from avalon.api import AvalonMongoDB + +from .. import PypeModule, ITrayModule +from openpype.api import ( + Anatomy, + get_project_settings, + get_local_site_id) +from openpype.lib import PypeLogger + +from .providers.local_drive import LocalDriveHandler + +from .utils import time_function, SyncStatus + + +log = PypeLogger().get_logger("SyncServer") + + +class SyncServerModule(PypeModule, ITrayModule): + """ + Synchronization server that is syncing published files from local to + any of implemented providers (like GDrive, S3 etc.) + Runs in the background and checks all representations, looks for files + that are marked to be in different location than 'studio' (temporary), + checks if 'created_dt' field is present denoting successful sync + with provider destination. + Sites structure is created during publish OR by calling 'add_site' + method. + + By default it will always contain 1 record with + "name" == self.presets["active_site"] and + filled "created_dt" AND 1 or multiple records for all defined + remote sites, where "created_dt" is not present. + This highlights that file should be uploaded to + remote destination + + ''' - example of synced file test_Cylinder_lookMain_v010.ma to GDrive + "files" : [ + { + "path" : "{root}/Test/Assets/Cylinder/publish/look/lookMain/v010/ + test_Cylinder_lookMain_v010.ma", + "_id" : ObjectId("5eeb25e411e06a16209ab78f"), + "hash" : "test_Cylinder_lookMain_v010,ma|1592468963,24|4822", + "size" : NumberLong(4822), + "sites" : [ + { + "name": "john_local_XD4345", + "created_dt" : ISODate("2020-05-22T08:05:44.000Z") + }, + { + "id" : ObjectId("5eeb25e411e06a16209ab78f"), + "name": "gdrive", + "created_dt" : ISODate("2020-05-55T08:54:35.833Z") + ] + } + }, + ''' + Each Tray app has assigned its own self.presets["local_id"] + used in sites as a name. + Tray is searching only for records where name matches its + self.presets["active_site"] + self.presets["remote_site"]. + "active_site" could be storage in studio ('studio'), or specific + "local_id" when user is working disconnected from home. + If the local record has its "created_dt" filled, it is a source and + process will try to upload the file to all defined remote sites. + + Remote files "id" is real id that could be used in appropriate API. + Local files have "id" too, for conformity, contains just file name. + It is expected that multiple providers will be implemented in separate + classes and registered in 'providers.py'. + + """ + # limit querying DB to look for X number of representations that should + # be sync, we try to run more loops with less records + # actual number of files synced could be lower as providers can have + # different limits imposed by its API + # set 0 to no limit + REPRESENTATION_LIMIT = 100 + DEFAULT_SITE = 'studio' + LOCAL_SITE = 'local' + LOG_PROGRESS_SEC = 5 # how often log progress to DB + + name = "sync_server" + label = "Sync Server" + + def initialize(self, module_settings): + """ + Called during Module Manager creation. + + Collects needed data, checks asyncio presence. + Sets 'enabled' according to global settings for the module. + Shouldnt be doing any initialization, thats a job for 'tray_init' + """ + self.enabled = module_settings[self.name]["enabled"] + + # some parts of code need to run sequentially, not in async + self.lock = None + # settings for all enabled projects for sync + self._sync_project_settings = None + self.sync_server_thread = None # asyncio requires new thread + + self.action_show_widget = None + self._paused = False + self._paused_projects = set() + self._paused_representations = set() + self._anatomies = {} + + self._connection = None + + """ Start of Public API """ + def add_site(self, collection, representation_id, site_name=None, + force=False): + """ + Adds new site to representation to be synced. + + 'collection' must have synchronization enabled (globally or + project only) + + Used as a API endpoint from outside applications (Loader etc) + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + force (bool): reset site if exists + + Returns: + throws ValueError if any issue + """ + if not self.get_sync_project_setting(collection): + raise ValueError("Project not configured") + + if not site_name: + site_name = self.DEFAULT_SITE + + self.reset_provider_for_file(collection, + representation_id, + site_name=site_name, force=force) + + # public facing API + def remove_site(self, collection, representation_id, site_name, + remove_local_files=False): + """ + Removes 'site_name' for particular 'representation_id' on + 'collection' + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + remove_local_files (bool): remove only files for 'local_id' + site + + Returns: + throws ValueError if any issue + """ + if not self.get_sync_project_setting(collection): + raise ValueError("Project not configured") + + self.reset_provider_for_file(collection, + representation_id, + site_name=site_name, + remove=True) + if remove_local_files: + self._remove_local_file(collection, representation_id, site_name) + + def clear_project(self, collection, site_name): + """ + Clear 'collection' of 'site_name' and its local files + + Works only on real local sites, not on 'studio' + """ + query = { + "type": "representation", + "files.sites.name": site_name + } + + representations = list( + self.connection.database[collection].find(query)) + if not representations: + self.log.debug("No repre found") + return + + for repre in representations: + self.remove_site(collection, repre.get("_id"), site_name, True) + + def pause_representation(self, collection, representation_id, site_name): + """ + Sets 'representation_id' as paused, eg. no syncing should be + happening on it. + + Args: + collection (string): project name + representation_id (string): MongoDB objectId value + site_name (string): 'gdrive', 'studio' etc. + """ + log.info("Pausing SyncServer for {}".format(representation_id)) + self._paused_representations.add(representation_id) + self.reset_provider_for_file(collection, representation_id, + site_name=site_name, pause=True) + + def unpause_representation(self, collection, representation_id, site_name): + """ + Sets 'representation_id' as unpaused. + + Does not fail or warn if repre wasn't paused. + + Args: + collection (string): project name + representation_id (string): MongoDB objectId value + site_name (string): 'gdrive', 'studio' etc. + """ + log.info("Unpausing SyncServer for {}".format(representation_id)) + try: + self._paused_representations.remove(representation_id) + except KeyError: + pass + # self.paused_representations is not persistent + self.reset_provider_for_file(collection, representation_id, + site_name=site_name, pause=False) + + def is_representation_paused(self, representation_id, + check_parents=False, project_name=None): + """ + Returns if 'representation_id' is paused or not. + + Args: + representation_id (string): MongoDB objectId value + check_parents (bool): check if parent project or server itself + are not paused + project_name (string): project to check if paused + + if 'check_parents', 'project_name' should be set too + Returns: + (bool) + """ + condition = representation_id in self._paused_representations + if check_parents and project_name: + condition = condition or \ + self.is_project_paused(project_name) or \ + self.is_paused() + return condition + + def pause_project(self, project_name): + """ + Sets 'project_name' as paused, eg. no syncing should be + happening on all representation inside. + + Args: + project_name (string): collection name + """ + log.info("Pausing SyncServer for {}".format(project_name)) + self._paused_projects.add(project_name) + + def unpause_project(self, project_name): + """ + Sets 'project_name' as unpaused + + Does not fail or warn if project wasn't paused. + + Args: + project_name (string): collection name + """ + log.info("Unpausing SyncServer for {}".format(project_name)) + try: + self._paused_projects.remove(project_name) + except KeyError: + pass + + def is_project_paused(self, project_name, check_parents=False): + """ + Returns if 'project_name' is paused or not. + + Args: + project_name (string): collection name + check_parents (bool): check if server itself + is not paused + Returns: + (bool) + """ + condition = project_name in self._paused_projects + if check_parents: + condition = condition or self.is_paused() + return condition + + def pause_server(self): + """ + Pause sync server + + It won't check anything, not uploading/downloading... + """ + log.info("Pausing SyncServer") + self._paused = True + + def unpause_server(self): + """ + Unpause server + """ + log.info("Unpausing SyncServer") + self._paused = False + + def is_paused(self): + """ Is server paused """ + return self._paused + + def get_active_sites(self, project_name): + """ + Returns list of active sites for 'project_name'. + + By default it returns ['studio'], this site is default + and always present even if SyncServer is not enabled. (for publish) + + Used mainly for Local settings for user override. + + Args: + project_name (string): + + Returns: + (list) of strings + """ + return self.get_active_sites_from_settings( + get_project_settings(project_name)) + + def get_active_sites_from_settings(self, settings): + """ + List available active sites from incoming 'settings'. Used for + returning 'default' values for Local Settings + + Args: + settings (dict): full settings (global + project) + Returns: + (list) of strings + """ + sync_settings = self._parse_sync_settings_from_settings(settings) + + return self._get_enabled_sites_from_settings(sync_settings) + + def get_configurable_items_for_site(self, project_name, site_name): + """ + Returns list of items that should be configurable by User + + Returns: + (list of dict) + [{key:"root", label:"root", value:"valueFromSettings"}] + """ + # if project_name is None: ..for get_default_project_settings + # return handler.get_configurable_items() + pass + + def get_active_site(self, project_name): + """ + Returns active (mine) site for 'project_name' from settings + + Returns: + (string) + """ + active_site = self.get_sync_project_setting( + project_name)['config']['active_site'] + if active_site == self.LOCAL_SITE: + return get_local_site_id() + return active_site + + # remote sites + def get_remote_sites(self, project_name): + """ + Returns all remote sites configured on 'project_name'. + + If 'project_name' is not enabled for syncing returns []. + + Used by Local setting to allow user choose remote site. + + Args: + project_name (string): + + Returns: + (list) of strings + """ + return self.get_remote_sites_from_settings( + get_project_settings(project_name)) + + def get_remote_sites_from_settings(self, settings): + """ + Get remote sites for returning 'default' values for Local Settings + """ + sync_settings = self._parse_sync_settings_from_settings(settings) + + return self._get_remote_sites_from_settings(sync_settings) + + def get_remote_site(self, project_name): + """ + Returns remote (theirs) site for 'project_name' from settings + """ + remote_site = self.get_sync_project_setting( + project_name)['config']['remote_site'] + if remote_site == self.LOCAL_SITE: + return get_local_site_id() + + return remote_site + + """ End of Public API """ + + def get_local_file_path(self, collection, site_name, file_path): + """ + Externalized for app + """ + handler = LocalDriveHandler(collection, site_name) + local_file_path = handler.resolve_path(file_path) + + return local_file_path + + def _get_remote_sites_from_settings(self, sync_settings): + if not self.enabled or not sync_settings['enabled']: + return [] + + remote_sites = [self.DEFAULT_SITE, self.LOCAL_SITE] + if sync_settings: + remote_sites.extend(sync_settings.get("sites").keys()) + + return list(set(remote_sites)) + + def _get_enabled_sites_from_settings(self, sync_settings): + sites = [self.DEFAULT_SITE] + if self.enabled and sync_settings['enabled']: + sites.append(self.LOCAL_SITE) + + return sites + + def connect_with_modules(self, *_a, **kw): + return + + def tray_init(self): + """ + Actual initialization of Sync Server. + + Called when tray is initialized, it checks if module should be + enabled. If not, no initialization necessary. + """ + # import only in tray, because of Python2 hosts + from .sync_server import SyncServerThread + + if not self.enabled: + return + + self.lock = threading.Lock() + + try: + self.sync_server_thread = SyncServerThread(self) + from .tray.app import SyncServerWindow + self.widget = SyncServerWindow(self) + except ValueError: + log.info("No system setting for sync. Not syncing.", exc_info=True) + self.enabled = False + except KeyError: + log.info(( + "There are not set presets for SyncServer OR " + "Credentials provided are invalid, " + "no syncing possible"). + format(str(self.sync_project_settings)), exc_info=True) + self.enabled = False + + def tray_start(self): + """ + Triggered when Tray is started. + + Checks if configuration presets are available and if there is + any provider ('gdrive', 'S3') that is activated + (eg. has valid credentials). + + Returns: + None + """ + if self.sync_project_settings and self.enabled: + self.sync_server_thread.start() + else: + log.info("No presets or active providers. " + + "Synchronization not possible.") + + def tray_exit(self): + """ + Stops sync thread if running. + + Called from Module Manager + """ + if not self.sync_server_thread: + return + + if not self.is_running: + return + try: + log.info("Stopping sync server server") + self.sync_server_thread.is_running = False + self.sync_server_thread.stop() + except Exception: + log.warning( + "Error has happened during Killing sync server", + exc_info=True + ) + + def tray_menu(self, parent_menu): + if not self.enabled: + return + + from Qt import QtWidgets + """Add menu or action to Tray(or parent)'s menu""" + action = QtWidgets.QAction("SyncServer", parent_menu) + action.triggered.connect(self.show_widget) + parent_menu.addAction(action) + parent_menu.addSeparator() + + self.action_show_widget = action + + @property + def is_running(self): + return self.sync_server_thread.is_running + + def get_anatomy(self, project_name): + """ + Get already created or newly created anatomy for project + + Args: + project_name (string): + + Return: + (Anatomy) + """ + return self._anatomies.get('project_name') or Anatomy(project_name) + + @property + def connection(self): + if self._connection is None: + self._connection = AvalonMongoDB() + + return self._connection + + @property + def sync_project_settings(self): + if self._sync_project_settings is None: + self.set_sync_project_settings() + + return self._sync_project_settings + + def set_sync_project_settings(self): + """ + Set sync_project_settings for all projects (caching) + + For performance + """ + sync_project_settings = {} + + for collection in self.connection.database.collection_names(False): + sync_settings = self._parse_sync_settings_from_settings( + get_project_settings(collection)) + if sync_settings: + default_sites = self._get_default_site_configs() + sync_settings['sites'].update(default_sites) + sync_project_settings[collection] = sync_settings + + if not sync_project_settings: + log.info("No enabled and configured projects for sync.") + + self._sync_project_settings = sync_project_settings + + def get_sync_project_setting(self, project_name): + """ Handles pulling sync_server's settings for enabled 'project_name' + + Args: + project_name (str): used in project settings + Returns: + (dict): settings dictionary for the enabled project, + empty if no settings or sync is disabled + """ + # presets set already, do not call again and again + # self.log.debug("project preset {}".format(self.presets)) + if self.sync_project_settings and \ + self.sync_project_settings.get(project_name): + return self.sync_project_settings.get(project_name) + + settings = get_project_settings(project_name) + return self._parse_sync_settings_from_settings(settings) + + def _parse_sync_settings_from_settings(self, settings): + """ settings from api.get_project_settings, TOOD rename """ + sync_settings = settings.get("global").get("sync_server") + if not sync_settings: + log.info("No project setting not syncing.") + return {} + if sync_settings.get("enabled"): + return sync_settings + + return {} + + def _get_default_site_configs(self): + """ + Returns skeleton settings for 'studio' and user's local site + """ + default_config = {'provider': 'local_drive'} + all_sites = {self.DEFAULT_SITE: default_config, + get_local_site_id(): default_config} + return all_sites + + def get_provider_for_site(self, project_name, site): + """ + Return provider name for site. + """ + site_preset = self.get_sync_project_setting(project_name)["sites"].\ + get(site) + if site_preset: + return site_preset["provider"] + + return "NA" + + @time_function + def get_sync_representations(self, collection, active_site, remote_site): + """ + Get representations that should be synced, these could be + recognised by presence of document in 'files.sites', where key is + a provider (GDrive, S3) and value is empty document or document + without 'created_dt' field. (Don't put null to 'created_dt'!). + + Querying of 'to-be-synched' files is offloaded to Mongod for + better performance. Goal is to get as few representations as + possible. + Args: + collection (string): name of collection (in most cases matches + project name + active_site (string): identifier of current active site (could be + 'local_0' when working from home, 'studio' when working in the + studio (default) + remote_site (string): identifier of remote site I want to sync to + + Returns: + (list) of dictionaries + """ + log.debug("Check representations for : {}".format(collection)) + self.connection.Session["AVALON_PROJECT"] = collection + # retry_cnt - number of attempts to sync specific file before giving up + retries_arr = self._get_retries_arr(collection) + query = { + "type": "representation", + "$or": [ + {"$and": [ + { + "files.sites": { + "$elemMatch": { + "name": active_site, + "created_dt": {"$exists": True} + } + }}, { + "files.sites": { + "$elemMatch": { + "name": {"$in": [remote_site]}, + "created_dt": {"$exists": False}, + "tries": {"$in": retries_arr} + } + } + }]}, + {"$and": [ + { + "files.sites": { + "$elemMatch": { + "name": active_site, + "created_dt": {"$exists": False}, + "tries": {"$in": retries_arr} + } + }}, { + "files.sites": { + "$elemMatch": { + "name": {"$in": [remote_site]}, + "created_dt": {"$exists": True} + } + } + } + ]} + ] + } + log.debug("active_site:{} - remote_site:{}".format(active_site, + remote_site)) + log.debug("query: {}".format(query)) + representations = self.connection.find(query) + + return representations + + def check_status(self, file, local_site, remote_site, config_preset): + """ + Check synchronization status for single 'file' of single + 'representation' by single 'provider'. + (Eg. check if 'scene.ma' of lookdev.v10 should be synced to GDrive + + Always is comparing local record, eg. site with + 'name' == self.presets[PROJECT_NAME]['config']["active_site"] + + Args: + file (dictionary): of file from representation in Mongo + local_site (string): - local side of compare (usually 'studio') + remote_site (string): - gdrive etc. + config_preset (dict): config about active site, retries + Returns: + (string) - one of SyncStatus + """ + sites = file.get("sites") or [] + # if isinstance(sites, list): # temporary, old format of 'sites' + # return SyncStatus.DO_NOTHING + _, remote_rec = self._get_site_rec(sites, remote_site) or {} + if remote_rec: # sync remote target + created_dt = remote_rec.get("created_dt") + if not created_dt: + tries = self._get_tries_count_from_rec(remote_rec) + # file will be skipped if unsuccessfully tried over threshold + # error metadata needs to be purged manually in DB to reset + if tries < int(config_preset["retry_cnt"]): + return SyncStatus.DO_UPLOAD + else: + _, local_rec = self._get_site_rec(sites, local_site) or {} + if not local_rec or not local_rec.get("created_dt"): + tries = self._get_tries_count_from_rec(local_rec) + # file will be skipped if unsuccessfully tried over + # threshold times, error metadata needs to be purged + # manually in DB to reset + if tries < int(config_preset["retry_cnt"]): + return SyncStatus.DO_DOWNLOAD + + return SyncStatus.DO_NOTHING + + def update_db(self, collection, new_file_id, file, representation, + site, error=None, progress=None): + """ + Update 'provider' portion of records in DB with success (file_id) + or error (exception) + + Args: + collection (string): name of project - force to db connection as + each file might come from different collection + new_file_id (string): + file (dictionary): info about processed file (pulled from DB) + representation (dictionary): parent repr of file (from DB) + site (string): label ('gdrive', 'S3') + error (string): exception message + progress (float): 0-1 of progress of upload/download + + Returns: + None + """ + representation_id = representation.get("_id") + file_id = file.get("_id") + query = { + "_id": representation_id + } + + update = {} + if new_file_id: + update["$set"] = self._get_success_dict(new_file_id) + # reset previous errors if any + update["$unset"] = self._get_error_dict("", "", "") + elif progress is not None: + update["$set"] = self._get_progress_dict(progress) + else: + tries = self._get_tries_count(file, site) + tries += 1 + + update["$set"] = self._get_error_dict(error, tries) + + arr_filter = [ + {'s.name': site}, + {'f._id': ObjectId(file_id)} + ] + + self.connection.database[collection].update_one( + query, + update, + upsert=True, + array_filters=arr_filter + ) + + if progress is not None: + return + + status = 'failed' + error_str = 'with error {}'.format(error) + if new_file_id: + status = 'succeeded with id {}'.format(new_file_id) + error_str = '' + + source_file = file.get("path", "") + log.debug("File for {} - {source_file} process {status} {error_str}". + format(representation_id, + status=status, + source_file=source_file, + error_str=error_str)) + + def _get_file_info(self, files, _id): + """ + Return record from list of records which name matches to 'provider' + Could be possibly refactored with '_get_provider_rec' together. + + Args: + files (list): of dictionaries with info about published files + _id (string): _id of specific file + + Returns: + (int, dictionary): index from list and record with metadata + about site (if/when created, errors..) + OR (-1, None) if not present + """ + for index, rec in enumerate(files): + if rec.get("_id") == _id: + return index, rec + + return -1, None + + def _get_site_rec(self, sites, site_name): + """ + Return record from list of records which name matches to + 'remote_site_name' + + Args: + sites (list): of dictionaries + site_name (string): 'local_XXX', 'gdrive' + + Returns: + (int, dictionary): index from list and record with metadata + about site (if/when created, errors..) + OR (-1, None) if not present + """ + for index, rec in enumerate(sites): + if rec.get("name") == site_name: + return index, rec + + return -1, None + + def reset_provider_for_file(self, collection, representation_id, + side=None, file_id=None, site_name=None, + remove=False, pause=None, force=False): + """ + Reset information about synchronization for particular 'file_id' + and provider. + Useful for testing or forcing file to be reuploaded. + + 'side' and 'site_name' are disjunctive. + + 'side' is used for resetting local or remote side for + current user for repre. + + 'site_name' is used to set synchronization for particular site. + Should be used when repre should be synced to new site. + + Args: + collection (string): name of project (eg. collection) in DB + representation_id(string): _id of representation + file_id (string): file _id in representation + side (string): local or remote side + site_name (string): for adding new site + remove (bool): if True remove site altogether + pause (bool or None): if True - pause, False - unpause + force (bool): hard reset - currently only for add_site + + Returns: + throws ValueError + """ + query = { + "_id": ObjectId(representation_id) + } + + representation = list(self.connection.database[collection].find(query)) + if not representation: + raise ValueError("Representation {} not found in {}". + format(representation_id, collection)) + if side and site_name: + raise ValueError("Misconfiguration, only one of side and " + + "site_name arguments should be passed.") + + local_site = self.get_active_site(collection) + remote_site = self.get_remote_site(collection) + + if side: + if side == 'local': + site_name = local_site + else: + site_name = remote_site + + elem = {"name": site_name} + + if file_id: # reset site for particular file + self._reset_site_for_file(collection, query, + elem, file_id, site_name) + elif side: # reset site for whole representation + self._reset_site(collection, query, elem, site_name) + elif remove: # remove site for whole representation + self._remove_site(collection, query, representation, site_name) + elif pause is not None: + self._pause_unpause_site(collection, query, + representation, site_name, pause) + else: # add new site to all files for representation + self._add_site(collection, query, representation, elem, site_name, + force) + + def _update_site(self, collection, query, update, arr_filter): + """ + Auxiliary method to call update_one function on DB + + Used for refactoring ugly reset_provider_for_file + """ + self.connection.database[collection].update_one( + query, + update, + upsert=True, + array_filters=arr_filter + ) + + def _reset_site_for_file(self, collection, query, + elem, file_id, site_name): + """ + Resets 'site_name' for 'file_id' on representation in 'query' on + 'collection' + """ + update = { + "$set": {"files.$[f].sites.$[s]": elem} + } + arr_filter = [ + {'s.name': site_name}, + {'f._id': ObjectId(file_id)} + ] + + self._update_site(collection, query, update, arr_filter) + + def _reset_site(self, collection, query, elem, site_name): + """ + Resets 'site_name' for all files of representation in 'query' + """ + update = { + "$set": {"files.$[].sites.$[s]": elem} + } + + arr_filter = [ + {'s.name': site_name} + ] + + self._update_site(collection, query, update, arr_filter) + + def _remove_site(self, collection, query, representation, site_name): + """ + Removes 'site_name' for 'representation' in 'query' + + Throws ValueError if 'site_name' not found on 'representation' + """ + found = False + for repre_file in representation.pop().get("files"): + for site in repre_file.get("sites"): + if site["name"] == site_name: + found = True + break + if not found: + msg = "Site {} not found".format(site_name) + log.info(msg) + raise ValueError(msg) + + update = { + "$pull": {"files.$[].sites": {"name": site_name}} + } + arr_filter = [] + + self._update_site(collection, query, update, arr_filter) + + def _pause_unpause_site(self, collection, query, + representation, site_name, pause): + """ + Pauses/unpauses all files for 'representation' based on 'pause' + + Throws ValueError if 'site_name' not found on 'representation' + """ + found = False + site = None + for repre_file in representation.pop().get("files"): + for site in repre_file.get("sites"): + if site["name"] == site_name: + found = True + break + if not found: + msg = "Site {} not found".format(site_name) + log.info(msg) + raise ValueError(msg) + + if pause: + site['paused'] = pause + else: + if site.get('paused'): + site.pop('paused') + + update = { + "$set": {"files.$[].sites.$[s]": site} + } + + arr_filter = [ + {'s.name': site_name} + ] + + self._update_site(collection, query, update, arr_filter) + + def _add_site(self, collection, query, representation, elem, site_name, + force=False): + """ + Adds 'site_name' to 'representation' on 'collection' + + Use 'force' to remove existing or raises ValueError + """ + for repre_file in representation.pop().get("files"): + for site in repre_file.get("sites"): + if site["name"] == site_name: + if force: + self._reset_site_for_file(collection, query, + elem, repre_file["_id"], + site_name) + return + else: + msg = "Site {} already present".format(site_name) + log.info(msg) + raise ValueError(msg) + + update = { + "$push": {"files.$[].sites": elem} + } + + arr_filter = [] + + self._update_site(collection, query, update, arr_filter) + + def _remove_local_file(self, collection, representation_id, site_name): + """ + Removes all local files for 'site_name' of 'representation_id' + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + + Returns: + only logs, catches IndexError and OSError + """ + my_local_site = get_local_site_id() + if my_local_site != site_name: + self.log.warning("Cannot remove non local file for {}". + format(site_name)) + return + + provider_name = self.get_provider_for_site(collection, site_name) + + if provider_name == 'local_drive': + handler = LocalDriveHandler(collection, site_name) + query = { + "_id": ObjectId(representation_id) + } + + representation = list( + self.connection.database[collection].find(query)) + if not representation: + self.log.debug("No repre {} found".format( + representation_id)) + return + + representation = representation.pop() + local_file_path = '' + for file in representation.get("files"): + local_file_path = self.get_local_file_path(collection, + site_name, + file.get("path", "") + ) + try: + self.log.debug("Removing {}".format(local_file_path)) + os.remove(local_file_path) + except IndexError: + msg = "No file set for {}".format(representation_id) + self.log.debug(msg) + raise ValueError(msg) + except OSError: + msg = "File {} cannot be removed".format(file["path"]) + self.log.warning(msg) + raise ValueError(msg) + + folder = None + try: + folder = os.path.dirname(local_file_path) + os.rmdir(folder) + except OSError: + msg = "folder {} cannot be removed".format(folder) + self.log.warning(msg) + raise ValueError(msg) + + def get_loop_delay(self, project_name): + """ + Return count of seconds before next synchronization loop starts + after finish of previous loop. + Returns: + (int): in seconds + """ + ld = self.sync_project_settings[project_name]["config"]["loop_delay"] + return int(ld) + + def show_widget(self): + """Show dialog to enter credentials""" + self.widget.show() + + def _get_success_dict(self, new_file_id): + """ + Provide success metadata ("id", "created_dt") to be stored in Db. + Used in $set: "DICT" part of query. + Sites are array inside of array(file), so real indexes for both + file and site are needed for upgrade in DB. + Args: + new_file_id: id of created file + Returns: + (dictionary) + """ + val = {"files.$[f].sites.$[s].id": new_file_id, + "files.$[f].sites.$[s].created_dt": datetime.now()} + return val + + def _get_error_dict(self, error="", tries="", progress=""): + """ + Provide error metadata to be stored in Db. + Used for set (error and tries provided) or unset mode. + Args: + error: (string) - message + tries: how many times failed + Returns: + (dictionary) + """ + val = {"files.$[f].sites.$[s].last_failed_dt": datetime.now(), + "files.$[f].sites.$[s].error": error, + "files.$[f].sites.$[s].tries": tries, + "files.$[f].sites.$[s].progress": progress + } + return val + + def _get_tries_count_from_rec(self, rec): + """ + Get number of failed attempts to sync from site record + Args: + rec (dictionary): info about specific site record + Returns: + (int) - number of failed attempts + """ + if not rec: + return 0 + return rec.get("tries", 0) + + def _get_tries_count(self, file, provider): + """ + Get number of failed attempts to sync + Args: + file (dictionary): info about specific file + provider (string): name of site ('gdrive' or specific user site) + Returns: + (int) - number of failed attempts + """ + _, rec = self._get_site_rec(file.get("sites", []), provider) + return rec.get("tries", 0) + + def _get_progress_dict(self, progress): + """ + Provide progress metadata to be stored in Db. + Used during upload/download for GUI to show. + Args: + progress: (float) - 0-1 progress of upload/download + Returns: + (dictionary) + """ + val = {"files.$[f].sites.$[s].progress": progress} + return val + + def _get_retries_arr(self, project_name): + """ + Returns array with allowed values in 'tries' field. If repre + contains these values, it means it was tried to be synchronized + but failed. We try up to 'self.presets["retry_cnt"]' times before + giving up and skipping representation. + Returns: + (list) + """ + retry_cnt = self.sync_project_settings[project_name].\ + get("config")["retry_cnt"] + arr = [i for i in range(int(retry_cnt))] + arr.append(None) + + return arr + + def _get_roots_config(self, presets, project_name, site_name): + """ + Returns configured root(s) for 'project_name' and 'site_name' from + settings ('presets') + """ + return presets[project_name]['sites'][site_name]['root'] diff --git a/openpype/modules/sync_server/tray/app.py b/openpype/modules/sync_server/tray/app.py index 476e9d16e8..41a0f84afb 100644 --- a/openpype/modules/sync_server/tray/app.py +++ b/openpype/modules/sync_server/tray/app.py @@ -159,7 +159,7 @@ class SyncProjectListWidget(ProjectListWidget): model.clear() project_name = None - for project_name in self.sync_server.get_sync_project_settings().\ + for project_name in self.sync_server.sync_project_settings.\ keys(): if self.sync_server.is_paused() or \ self.sync_server.is_project_paused(project_name): @@ -169,7 +169,7 @@ class SyncProjectListWidget(ProjectListWidget): model.appendRow(QtGui.QStandardItem(icon, project_name)) - if len(self.sync_server.get_sync_project_settings().keys()) == 0: + if len(self.sync_server.sync_project_settings.keys()) == 0: model.appendRow(QtGui.QStandardItem(DUMMY_PROJECT)) self.current_project = self.project_list.currentIndex().data( @@ -271,15 +271,29 @@ class SyncRepresentationWidget(QtWidgets.QWidget): ("subset", 190), ("version", 10), ("representation", 90), - ("created_dt", 100), - ("sync_dt", 100), - ("local_site", 60), - ("remote_site", 70), - ("files_count", 70), - ("files_size", 70), + ("created_dt", 105), + ("sync_dt", 105), + ("local_site", 80), + ("remote_site", 80), + ("files_count", 50), + ("files_size", 60), ("priority", 20), ("state", 50) ) + column_labels = ( + ("asset", "Asset"), + ("subset", "Subset"), + ("version", "Version"), + ("representation", "Representation"), + ("created_dt", "Created"), + ("sync_dt", "Synced"), + ("local_site", "Active site"), + ("remote_site", "Remote site"), + ("files_count", "Files"), + ("files_size", "Size"), + ("priority", "Priority"), + ("state", "Status") + ) def __init__(self, sync_server, project=None, parent=None): super(SyncRepresentationWidget, self).__init__(parent) @@ -298,8 +312,10 @@ class SyncRepresentationWidget(QtWidgets.QWidget): self.table_view = QtWidgets.QTableView() headers = [item[0] for item in self.default_widths] + header_labels = [item[1] for item in self.column_labels] - model = SyncRepresentationModel(sync_server, headers, project) + model = SyncRepresentationModel(sync_server, headers, + project, header_labels) self.table_view.setModel(model) self.table_view.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) self.table_view.setSelectionMode( @@ -376,7 +392,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): """ _id = self.table_view.model().data(index, Qt.UserRole) detail_window = SyncServerDetailWindow( - self.sync_server, _id, self.table_view.model()._project) + self.sync_server, _id, self.table_view.model().project) detail_window.exec() def _on_context_menu(self, point): @@ -394,15 +410,28 @@ class SyncRepresentationWidget(QtWidgets.QWidget): menu = QtWidgets.QMenu() actions_mapping = {} + actions_kwargs_mapping = {} - action = QtWidgets.QAction("Open in explorer") - actions_mapping[action] = self._open_in_explorer - menu.addAction(action) + local_site = self.item.local_site + local_progress = self.item.local_progress + remote_site = self.item.remote_site + remote_progress = self.item.remote_progress - local_site, local_progress = self.item.local_site.split() - remote_site, remote_progress = self.item.remote_site.split() - local_progress = float(local_progress) - remote_progress = float(remote_progress) + for site, progress in {local_site: local_progress, + remote_site: remote_progress}.items(): + project = self.table_view.model().project + provider = self.sync_server.get_provider_for_site(project, + site) + if provider == 'local_drive': + if 'studio' in site: + txt = " studio version" + else: + txt = " local version" + action = QtWidgets.QAction("Open in explorer" + txt) + if progress == 1.0: + actions_mapping[action] = self._open_in_explorer + actions_kwargs_mapping[action] = {'site': site} + menu.addAction(action) # progress smaller then 1.0 --> in progress or queued if local_progress < 1.0: @@ -452,13 +481,14 @@ class SyncRepresentationWidget(QtWidgets.QWidget): result = menu.exec_(QtGui.QCursor.pos()) if result: to_run = actions_mapping[result] + to_run_kwargs = actions_kwargs_mapping.get(result, {}) if to_run: - to_run() + to_run(**to_run_kwargs) self.table_view.model().refresh() def _pause(self): - self.sync_server.pause_representation(self.table_view.model()._project, + self.sync_server.pause_representation(self.table_view.model().project, self.representation_id, self.site_name) self.site_name = None @@ -466,7 +496,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): def _unpause(self): self.sync_server.unpause_representation( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, self.site_name) self.site_name = None @@ -476,7 +506,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): # temporary here for testing, will be removed TODO def _add_site(self): log.info(self.representation_id) - project_name = self.table_view.model()._project + project_name = self.table_view.model().project local_site_name = self.sync_server.get_my_local_site() try: self.sync_server.add_site( @@ -504,7 +534,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): try: local_site = get_local_site_id() self.sync_server.remove_site( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, local_site, True @@ -519,7 +549,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): redo of upload/download """ self.sync_server.reset_provider_for_file( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, 'local' ) @@ -530,18 +560,20 @@ class SyncRepresentationWidget(QtWidgets.QWidget): redo of upload/download """ self.sync_server.reset_provider_for_file( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, 'remote' ) - def _open_in_explorer(self): + def _open_in_explorer(self, site): if not self.item: return fpath = self.item.path - project = self.table_view.model()._project - fpath = self.sync_server.get_local_file_path(project, fpath) + project = self.table_view.model().project + fpath = self.sync_server.get_local_file_path(project, + site, + fpath) fpath = os.path.normpath(os.path.dirname(fpath)) if os.path.isdir(fpath): @@ -556,6 +588,10 @@ class SyncRepresentationWidget(QtWidgets.QWidget): raise OSError('unsupported xdg-open call??') +ProviderRole = QtCore.Qt.UserRole + 2 +ProgressRole = QtCore.Qt.UserRole + 4 + + class SyncRepresentationModel(QtCore.QAbstractTableModel): """ Model for summary of representations. @@ -612,15 +648,20 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): sync_dt = attr.ib(default=None) local_site = attr.ib(default=None) remote_site = attr.ib(default=None) + local_provider = attr.ib(default=None) + remote_provider = attr.ib(default=None) + local_progress = attr.ib(default=None) + remote_progress = attr.ib(default=None) files_count = attr.ib(default=None) files_size = attr.ib(default=None) priority = attr.ib(default=None) state = attr.ib(default=None) path = attr.ib(default=None) - def __init__(self, sync_server, header, project=None): + def __init__(self, sync_server, header, project=None, header_labels=None): super(SyncRepresentationModel, self).__init__() self._header = header + self._header_labels = header_labels self._data = [] self._project = project self._rec_loaded = 0 @@ -634,8 +675,8 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self.sync_server = sync_server # TODO think about admin mode # this is for regular user, always only single local and single remote - self.local_site = self.sync_server.get_active_site(self._project) - self.remote_site = self.sync_server.get_remote_site(self._project) + self.local_site = self.sync_server.get_active_site(self.project) + self.remote_site = self.sync_server.get_remote_site(self.project) self.projection = self.get_default_projection() @@ -659,26 +700,46 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): All queries should go through this (because of collection). """ - return self.sync_server.connection.database[self._project] + return self.sync_server.connection.database[self.project] + + @property + def project(self): + """Returns project""" + return self._project def data(self, index, role): item = self._data[index.row()] + if role == ProviderRole: + if self._header[index.column()] == 'local_site': + return item.local_provider + if self._header[index.column()] == 'remote_site': + return item.remote_provider + + if role == ProgressRole: + if self._header[index.column()] == 'local_site': + return item.local_progress + if self._header[index.column()] == 'remote_site': + return item.remote_progress + if role == Qt.DisplayRole: return attr.asdict(item)[self._header[index.column()]] if role == Qt.UserRole: return item._id - def rowCount(self, index): + def rowCount(self, _index): return len(self._data) - def columnCount(self, index): + def columnCount(self, _index): return len(self._header) def headerData(self, section, orientation, role): if role == Qt.DisplayRole: if orientation == Qt.Horizontal: - return str(self._header[section]) + if self._header_labels: + return str(self._header_labels[section]) + else: + return str(self._header[section]) def tick(self): """ @@ -718,7 +779,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): than single page of records) """ if self.sync_server.is_paused() or \ - self.sync_server.is_project_paused(self._project): + self.sync_server.is_project_paused(self.project): return self.beginResetModel() @@ -751,10 +812,10 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self._total_records = count local_provider = _translate_provider_for_icon(self.sync_server, - self._project, + self.project, local_site) remote_provider = _translate_provider_for_icon(self.sync_server, - self._project, + self.project, remote_site) for repre in result.get("paginatedResults"): @@ -784,7 +845,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): if context.get("version"): version = "v{:0>3d}".format(context.get("version")) else: - version = "hero" + version = "master" item = self.SyncRepresentation( repre.get("_id"), @@ -794,8 +855,12 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): context.get("representation"), local_updated, remote_updated, - '{} {}'.format(local_provider, avg_progress_local), - '{} {}'.format(remote_provider, avg_progress_remote), + local_site, + remote_site, + local_provider, + remote_provider, + avg_progress_local, + avg_progress_remote, repre.get("files_count", 1), repre.get("files_size", 0), 1, @@ -806,7 +871,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self._data.append(item) self._rec_loaded += 1 - def canFetchMore(self, index): + def canFetchMore(self, _index): """ Check if there are more records than currently loaded """ @@ -858,7 +923,8 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self.sort = {self.SORT_BY_COLUMN[index]: order, '_id': 1} self.query = self.get_default_query() # import json - # log.debug(json.dumps(self.query, indent=4).replace('False', 'false').\ + # log.debug(json.dumps(self.query, indent=4).\ + # replace('False', 'false').\ # replace('True', 'true').replace('None', 'null')) representations = self.dbcon.aggregate(self.query) @@ -883,8 +949,8 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): """ self._project = project self.sync_server.set_sync_project_settings() - self.local_site = self.sync_server.get_active_site(self._project) - self.remote_site = self.sync_server.get_remote_site(self._project) + self.local_site = self.sync_server.get_active_site(self.project) + self.remote_site = self.sync_server.get_remote_site(self.project) self.refresh() def get_index(self, id): @@ -1206,15 +1272,26 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): default_widths = ( ("file", 290), - ("created_dt", 120), - ("sync_dt", 120), - ("local_site", 60), - ("remote_site", 60), + ("created_dt", 105), + ("sync_dt", 105), + ("local_site", 80), + ("remote_site", 80), ("size", 60), ("priority", 20), ("state", 90) ) + column_labels = ( + ("file", "File name"), + ("created_dt", "Created"), + ("sync_dt", "Synced"), + ("local_site", "Active site"), + ("remote_site", "Remote site"), + ("files_size", "Size"), + ("priority", "Priority"), + ("state", "Status") + ) + def __init__(self, sync_server, _id=None, project=None, parent=None): super(SyncRepresentationDetailWidget, self).__init__(parent) @@ -1235,9 +1312,10 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): self.table_view = QtWidgets.QTableView() headers = [item[0] for item in self.default_widths] + header_labels = [item[1] for item in self.column_labels] model = SyncRepresentationDetailModel(sync_server, headers, _id, - project) + project, header_labels) self.table_view.setModel(model) self.table_view.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) self.table_view.setSelectionMode( @@ -1330,23 +1408,39 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): menu = QtWidgets.QMenu() actions_mapping = {} + actions_kwargs_mapping = {} - action = QtWidgets.QAction("Open in explorer") - actions_mapping[action] = self._open_in_explorer - menu.addAction(action) + local_site = self.item.local_site + local_progress = self.item.local_progress + remote_site = self.item.remote_site + remote_progress = self.item.remote_progress + + for site, progress in {local_site: local_progress, + remote_site: remote_progress}.items(): + project = self.table_view.model().project + provider = self.sync_server.get_provider_for_site(project, + site) + if provider == 'local_drive': + if 'studio' in site: + txt = " studio version" + else: + txt = " local version" + action = QtWidgets.QAction("Open in explorer" + txt) + if progress == 1: + actions_mapping[action] = self._open_in_explorer + actions_kwargs_mapping[action] = {'site': site} + menu.addAction(action) if self.item.state == STATUS[1]: action = QtWidgets.QAction("Open error detail") actions_mapping[action] = self._show_detail menu.addAction(action) - remote_site, remote_progress = self.item.remote_site.split() if float(remote_progress) == 1.0: action = QtWidgets.QAction("Reset local site") actions_mapping[action] = self._reset_local_site menu.addAction(action) - local_site, local_progress = self.item.local_site.split() if float(local_progress) == 1.0: action = QtWidgets.QAction("Reset remote site") actions_mapping[action] = self._reset_remote_site @@ -1360,8 +1454,9 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): result = menu.exec_(QtGui.QCursor.pos()) if result: to_run = actions_mapping[result] + to_run_kwargs = actions_kwargs_mapping.get(result, {}) if to_run: - to_run() + to_run(**to_run_kwargs) def _reset_local_site(self): """ @@ -1369,7 +1464,7 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): redo of upload/download """ self.sync_server.reset_provider_for_file( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, 'local', self.item._id) @@ -1381,19 +1476,19 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): redo of upload/download """ self.sync_server.reset_provider_for_file( - self.table_view.model()._project, + self.table_view.model().project, self.representation_id, 'remote', self.item._id) self.table_view.model().refresh() - def _open_in_explorer(self): + def _open_in_explorer(self, site): if not self.item: return fpath = self.item.path - project = self.table_view.model()._project - fpath = self.sync_server.get_local_file_path(project, fpath) + project = self.project + fpath = self.sync_server.get_local_file_path(project, site, fpath) fpath = os.path.normpath(os.path.dirname(fpath)) if os.path.isdir(fpath): @@ -1415,6 +1510,8 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): Used in detail window accessible after clicking on single repre in the summary. + TODO refactor - merge with SyncRepresentationModel if possible + Args: sync_server (SyncServer) - object to call server operations (update db status, set site status...) @@ -1424,7 +1521,6 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): a specific collection """ PAGE_SIZE = 30 - # TODO add filter filename DEFAULT_SORT = { "files.path": 1 } @@ -1452,6 +1548,10 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): sync_dt = attr.ib(default=None) local_site = attr.ib(default=None) remote_site = attr.ib(default=None) + local_provider = attr.ib(default=None) + remote_provider = attr.ib(default=None) + local_progress = attr.ib(default=None) + remote_progress = attr.ib(default=None) size = attr.ib(default=None) priority = attr.ib(default=None) state = attr.ib(default=None) @@ -1459,9 +1559,11 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): error = attr.ib(default=None) path = attr.ib(default=None) - def __init__(self, sync_server, header, _id, project=None): + def __init__(self, sync_server, header, _id, + project=None, header_labels=None): super(SyncRepresentationDetailModel, self).__init__() self._header = header + self._header_labels = header_labels self._data = [] self._project = project self._rec_loaded = 0 @@ -1473,8 +1575,8 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): self.sync_server = sync_server # TODO think about admin mode # this is for regular user, always only single local and single remote - self.local_site = self.sync_server.get_active_site(self._project) - self.remote_site = self.sync_server.get_remote_site(self._project) + self.local_site = self.sync_server.get_active_site(self.project) + self.remote_site = self.sync_server.get_remote_site(self.project) self.sort = self.DEFAULT_SORT @@ -1491,9 +1593,26 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): @property def dbcon(self): - return self.sync_server.connection.database[self._project] + """ + Database object with preselected project (collection) to run DB + operations (find, aggregate). + + All queries should go through this (because of collection). + """ + return self.sync_server.connection.database[self.project] + + @property + def project(self): + """Returns project""" + return self.project def tick(self): + """ + Triggers refresh of model. + + Because of pagination, prepared (sorting, filtering) query needs + to be run on DB every X seconds. + """ self.refresh(representations=None, load_records=self._rec_loaded) self.timer.start(SyncRepresentationModel.REFRESH_SEC) @@ -1510,21 +1629,37 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): def data(self, index, role): item = self._data[index.row()] + + if role == ProviderRole: + if self._header[index.column()] == 'local_site': + return item.local_provider + if self._header[index.column()] == 'remote_site': + return item.remote_provider + + if role == ProgressRole: + if self._header[index.column()] == 'local_site': + return item.local_progress + if self._header[index.column()] == 'remote_site': + return item.remote_progress + if role == Qt.DisplayRole: return attr.asdict(item)[self._header[index.column()]] if role == Qt.UserRole: return item._id - def rowCount(self, index): + def rowCount(self, _index): return len(self._data) - def columnCount(self, index): + def columnCount(self, _index): return len(self._header) def headerData(self, section, orientation, role): if role == Qt.DisplayRole: if orientation == Qt.Horizontal: - return str(self._header[section]) + if self._header_labels: + return str(self._header_labels[section]) + else: + return str(self._header[section]) def refresh(self, representations=None, load_records=0): if self.sync_server.is_paused(): @@ -1561,10 +1696,10 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): self._total_records = count local_provider = _translate_provider_for_icon(self.sync_server, - self._project, + self.project, local_site) remote_provider = _translate_provider_for_icon(self.sync_server, - self._project, + self.project, remote_site) for repre in result.get("paginatedResults"): @@ -1585,9 +1720,9 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): repre.get('updated_dt_remote').strftime( "%Y%m%dT%H%M%SZ") - progress_remote = _convert_progress( + remote_progress = _convert_progress( repre.get('progress_remote', '0')) - progress_local = _convert_progress( + local_progress = _convert_progress( repre.get('progress_local', '0')) errors = [] @@ -1601,8 +1736,12 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): os.path.basename(file["path"]), local_updated, remote_updated, - '{} {}'.format(local_provider, progress_local), - '{} {}'.format(remote_provider, progress_remote), + local_site, + remote_site, + local_provider, + remote_provider, + local_progress, + remote_progress, file.get('size', 0), 1, STATUS[repre.get("status", -1)], @@ -1614,7 +1753,7 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): self._data.append(item) self._rec_loaded += 1 - def canFetchMore(self, index): + def canFetchMore(self, _index): """ Check if there are more records than currently loaded """ @@ -1918,11 +2057,8 @@ class ImageDelegate(QtWidgets.QStyledItemDelegate): option.palette.highlight()) painter.setOpacity(1) - d = index.data(QtCore.Qt.DisplayRole) - if d: - provider, value = d.split() - else: - return + provider = index.data(ProviderRole) + value = index.data(ProgressRole) if not self.icons.get(provider): resource_path = os.path.dirname(__file__) @@ -2008,7 +2144,7 @@ class SizeDelegate(QtWidgets.QStyledItemDelegate): def __init__(self, parent=None): super(SizeDelegate, self).__init__(parent) - def displayText(self, value, locale): + def displayText(self, value, _locale): if value is None: # Ignore None value return diff --git a/openpype/modules/sync_server/utils.py b/openpype/modules/sync_server/utils.py index 0762766783..36f3444399 100644 --- a/openpype/modules/sync_server/utils.py +++ b/openpype/modules/sync_server/utils.py @@ -1,8 +1,14 @@ import time -from openpype.api import Logger +from openpype.api import Logger log = Logger().get_logger("SyncServer") +class SyncStatus: + DO_NOTHING = 0 + DO_UPLOAD = 1 + DO_DOWNLOAD = 2 + + def time_function(method): """ Decorator to print how much time function took. For debugging. diff --git a/openpype/plugins/load/add_site.py b/openpype/plugins/load/add_site.py new file mode 100644 index 0000000000..09448d553c --- /dev/null +++ b/openpype/plugins/load/add_site.py @@ -0,0 +1,33 @@ +from avalon import api +from openpype.modules import ModulesManager + + +class AddSyncSite(api.Loader): + """Add sync site to representation""" + representations = ["*"] + families = ["*"] + + label = "Add Sync Site" + order = 2 # lower means better + icon = "download" + color = "#999999" + + def load(self, context, name=None, namespace=None, data=None): + self.log.info("Adding {} to representation: {}".format( + data["site_name"], data["_id"])) + self.add_site_to_representation(data["project_name"], + data["_id"], + data["site_name"]) + self.log.debug("Site added.") + + @staticmethod + def add_site_to_representation(project_name, representation_id, site_name): + """Adds new site to representation_id, resets if exists""" + manager = ModulesManager() + sync_server = manager.modules_by_name["sync_server"] + sync_server.add_site(project_name, representation_id, site_name, + force=True) + + def filepath_from_context(self, context): + """No real file loading""" + return "" diff --git a/openpype/plugins/load/delete_old_versions.py b/openpype/plugins/load/delete_old_versions.py index e5132e0f8a..8e3999e9c4 100644 --- a/openpype/plugins/load/delete_old_versions.py +++ b/openpype/plugins/load/delete_old_versions.py @@ -15,11 +15,12 @@ from openpype.api import Anatomy class DeleteOldVersions(api.Loader): - + """Deletes specific number of old version""" representations = ["*"] families = ["*"] label = "Delete Old Versions" + order = 35 icon = "trash" color = "#d8d8d8" @@ -421,8 +422,9 @@ class DeleteOldVersions(api.Loader): class CalculateOldVersions(DeleteOldVersions): - + """Calculate file size of old versions""" label = "Calculate Old Versions" + order = 30 options = [ qargparse.Integer( diff --git a/openpype/plugins/load/remove_site.py b/openpype/plugins/load/remove_site.py new file mode 100644 index 0000000000..aedb5d1f2f --- /dev/null +++ b/openpype/plugins/load/remove_site.py @@ -0,0 +1,33 @@ +from avalon import api +from openpype.modules import ModulesManager + + +class RemoveSyncSite(api.Loader): + """Remove sync site and its files on representation""" + representations = ["*"] + families = ["*"] + + label = "Remove Sync Site" + order = 4 + icon = "download" + color = "#999999" + + def load(self, context, name=None, namespace=None, data=None): + self.log.info("Removing {} on representation: {}".format( + data["site_name"], data["_id"])) + self.remove_site_on_representation(data["project_name"], + data["_id"], + data["site_name"]) + self.log.debug("Site added.") + + @staticmethod + def remove_site_on_representation(project_name, representation_id, + site_name): + manager = ModulesManager() + sync_server = manager.modules_by_name["sync_server"] + sync_server.remove_site(project_name, representation_id, + site_name, True) + + def filepath_from_context(self, context): + """No real file loading""" + return ""