diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 0000000000..567bb92773 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,76 @@ +# Contributor Covenant Code of Conduct + +## Our Pledge + +In the interest of fostering an open and welcoming environment, we as +contributors and maintainers pledge to making participation in our project and +our community a harassment-free experience for everyone, regardless of age, body +size, disability, ethnicity, sex characteristics, gender identity and expression, +level of experience, education, socio-economic status, nationality, personal +appearance, race, religion, or sexual identity and orientation. + +## Our Standards + +Examples of behavior that contributes to creating a positive environment +include: + +* Using welcoming and inclusive language +* Being respectful of differing viewpoints and experiences +* Gracefully accepting constructive criticism +* Focusing on what is best for the community +* Showing empathy towards other community members + +Examples of unacceptable behavior by participants include: + +* The use of sexualized language or imagery and unwelcome sexual attention or + advances +* Trolling, insulting/derogatory comments, and personal or political attacks +* Public or private harassment +* Publishing others' private information, such as a physical or electronic + address, without explicit permission +* Other conduct which could reasonably be considered inappropriate in a + professional setting + +## Our Responsibilities + +Project maintainers are responsible for clarifying the standards of acceptable +behavior and are expected to take appropriate and fair corrective action in +response to any instances of unacceptable behavior. + +Project maintainers have the right and responsibility to remove, edit, or +reject comments, commits, code, wiki edits, issues, and other contributions +that are not aligned to this Code of Conduct, or to ban temporarily or +permanently any contributor for other behaviors that they deem inappropriate, +threatening, offensive, or harmful. + +## Scope + +This Code of Conduct applies both within project spaces and in public spaces +when an individual is representing the project or its community. Examples of +representing a project or community include using an official project e-mail +address, posting via an official social media account, or acting as an appointed +representative at an online or offline event. Representation of a project may be +further defined and clarified by project maintainers. + +## Enforcement + +Instances of abusive, harassing, or otherwise unacceptable behavior may be +reported by contacting the project team at info@pype.club. All +complaints will be reviewed and investigated and will result in a response that +is deemed necessary and appropriate to the circumstances. The project team is +obligated to maintain confidentiality with regard to the reporter of an incident. +Further details of specific enforcement policies may be posted separately. + +Project maintainers who do not follow or enforce the Code of Conduct in good +faith may face temporary or permanent repercussions as determined by other +members of the project's leadership. 
+ +## Attribution + +This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, +available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html + +[homepage]: https://www.contributor-covenant.org + +For answers to common questions about this code of conduct, see +https://www.contributor-covenant.org/faq diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000000..e9473eb4e8 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,53 @@ +## How to contribute to Pype + +#### **Did you find a bug?** + +1. Check the issues and our [bug triage](https://github.com/pypeclub/pype/projects/2) to make sure it wasn't reported already. +2. Ask on our [discord](http://pype.community/chat). Often, what appears to be a bug might be the intended behaviour for someone else. +3. Create a new issue. +4. Please use the issue template for your PR. + + +#### **Did you write a patch that fixes a bug?** + +- Open a new GitHub pull request with the patch. +- Ensure the PR description clearly describes the problem and the solution. Include the relevant issue number if applicable. + + +#### **Do you intend to add a new feature or change an existing one?** + +- Open a new thread in the [github discussions](https://github.com/pypeclub/pype/discussions/new). +- Do not open an issue until the suggestion is discussed. We will convert accepted suggestions into backlog issues and point them to the relevant discussion thread to keep the context. + +#### **Do you have questions about the source code?** + +Open a new question on [github discussions](https://github.com/pypeclub/pype/discussions/new). + +## Branching Strategy + +As we move to 3.x as the primary supported version of pype and keep 2.15 only for bugfixes and client-sponsored feature requests, we need to be very careful with our merging strategy. + +We are also using this opportunity to change the branch naming. The 3.0 production branch will no longer be called MASTER, but will be renamed to MAIN. Develop will stay as it is. + +A few important notes about 2.x and 3.x development: + +- 3.x features are not backported to 2.x unless specifically requested +- 3.x bugs and hotfixes can be ported to 2.x if they are relevant or severe +- 2.x features and bugs MUST be ported to 3.x at the same time + +## Pull Requests + +- Each 2.x PR MUST have a corresponding 3.x PR in GitHub. Without a 3.x PR, 2.x features will not be merged! Luckily most of the code is compatible, albeit sometimes in a different place after the refactor, so porting from 2.x to 3.x should be easy. +- Please keep the corresponding 2.x and 3.x PR names the same so they can be easily identified from the PR list page. +- Each 2.x PR should be labeled with the `2.x-dev` label. + +Inside each PR, put a link to the corresponding PR for the other version. + +Of course, if you want to contribute, feel free to make a PR only to 2.x/develop or develop, based on what you are using. While reviewing the PR, we might create the corresponding PR for the other release ourselves. + +We might also change the target of your PR to an intermediate branch, rather than `develop`, if we feel it requires some extra work on our end. That way we preserve all your commits, so you don't lose out on the contribution credits. + + + + +If a PR is targeted at a 2.x release, it must be labelled with the `2.x-dev` label in GitHub.
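A minimal sketch of the paired-PR workflow described above, using the GitHub CLI; the branch names and PR title are placeholders (only the `2.x/develop` / `develop` targets and the `2.x-dev` label come from the guidelines), and any other tooling that achieves the same result works equally well:

```bash
# 2.x PR: target 2.x/develop and add the 2.x-dev label (branch/title are illustrative)
gh pr create --base 2.x/develop --head fix-example-2.x \
    --title "Fix example bug" --label "2.x-dev"

# Matching 3.x PR: keep the title identical so the pair is easy to spot in the PR list
gh pr create --base develop --head fix-example-3.x \
    --title "Fix example bug"
```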
diff --git a/pype/modules/sync_server/README.md b/pype/modules/sync_server/README.md index 8ecf849a4e..d7d7f3718b 100644 --- a/pype/modules/sync_server/README.md +++ b/pype/modules/sync_server/README.md @@ -62,7 +62,7 @@ Needed configuration: - `"local_id": "local_0",` -- identifier of user pype - `"retry_cnt": 3,` -- how many times try to synch file in case of error - `"loop_delay": 60,` -- how many seconds between sync loops - - `"active_site": "studio",` -- which site user current, 'studio' by default, + - `"publish_site": "studio",` -- which site user current, 'studio' by default, could by same as 'local_id' if user is working from home without connection to studio infrastructure @@ -71,7 +71,7 @@ Needed configuration: Used in IntegrateNew to prepare skeleton for syncing in the representation record. Leave empty if no syncing is wanted. - This is a general configuration, 'local_id', 'active_site' and 'remote_site' + This is a general configuration, 'local_id', 'publish_site' and 'remote_site' will be set and changed by some GUI in the future. `pype/settings/defaults/project_settings/global.json`.`sync_server`.`sites`: diff --git a/pype/modules/sync_server/providers/abstract_provider.py b/pype/modules/sync_server/providers/abstract_provider.py index 6931373561..9130a06d94 100644 --- a/pype/modules/sync_server/providers/abstract_provider.py +++ b/pype/modules/sync_server/providers/abstract_provider.py @@ -3,6 +3,13 @@ from abc import ABCMeta, abstractmethod class AbstractProvider(metaclass=ABCMeta): + def __init__(self, site_name, tree=None, presets=None): + self.presets = None + self.active = False + self.site_name = site_name + + self.presets = presets + @abstractmethod def is_active(self): """ @@ -27,13 +34,14 @@ class AbstractProvider(metaclass=ABCMeta): pass @abstractmethod - def download_file(self, source_path, local_path): + def download_file(self, source_path, local_path, overwrite=True): """ Download file from provider into local system Args: source_path (string): absolute path on provider local_path (string): absolute path on local + overwrite (bool): default set to True Returns: None """ diff --git a/pype/modules/sync_server/providers/gdrive.py b/pype/modules/sync_server/providers/gdrive.py index 5bc6f21b38..b141131203 100644 --- a/pype/modules/sync_server/providers/gdrive.py +++ b/pype/modules/sync_server/providers/gdrive.py @@ -351,6 +351,10 @@ class GDriveHandler(AbstractProvider): last_tick = status = response = None status_val = 0 while response is None: + if server.is_representation_paused(representation['_id'], + check_parents=True, + project_name=collection): + raise ValueError("Paused during process, please redo.") if status: status_val = float(status.progress()) if not last_tick or \ @@ -433,6 +437,10 @@ class GDriveHandler(AbstractProvider): last_tick = status = response = None status_val = 0 while response is None: + if server.is_representation_paused(representation['_id'], + check_parents=True, + project_name=collection): + raise ValueError("Paused during process, please redo.") if status: status_val = float(status.progress()) if not last_tick or \ diff --git a/pype/modules/sync_server/providers/lib.py b/pype/modules/sync_server/providers/lib.py index a6a52f0624..144594ecbe 100644 --- a/pype/modules/sync_server/providers/lib.py +++ b/pype/modules/sync_server/providers/lib.py @@ -1,10 +1,6 @@ from enum import Enum from .gdrive import GDriveHandler - - -class Providers(Enum): - LOCAL = 'studio' - GDRIVE = 'gdrive' +from .local_drive import LocalDriveHandler class 
ProviderFactory: @@ -94,3 +90,4 @@ factory = ProviderFactory() # 7 denotes number of files that could be synced in single loop - learned by # trial and error factory.register_provider('gdrive', GDriveHandler, 7) +factory.register_provider('local_drive', LocalDriveHandler, 10) diff --git a/pype/modules/sync_server/providers/local_drive.py b/pype/modules/sync_server/providers/local_drive.py new file mode 100644 index 0000000000..4d16b8b930 --- /dev/null +++ b/pype/modules/sync_server/providers/local_drive.py @@ -0,0 +1,59 @@ +from __future__ import print_function +import os.path +import shutil + +from pype.api import Logger +from .abstract_provider import AbstractProvider + +log = Logger().get_logger("SyncServer") + + +class LocalDriveHandler(AbstractProvider): + """ Handles required operations on disks mounted in the OS """ + def is_active(self): + return True + + def upload_file(self, source_path, target_path, overwrite=True): + """ + Copies file from 'source_path' to 'target_path' + """ + if os.path.exists(source_path): + if overwrite: + shutil.copy(source_path, target_path) + else: + if os.path.exists(target_path): + raise ValueError("File {} exists, set overwrite". + format(target_path)) + + def download_file(self, source_path, local_path, overwrite=True): + """ + Download a file from 'source_path' to 'local_path' + """ + if os.path.exists(source_path): + if overwrite: + shutil.copy(source_path, local_path) + else: + if os.path.exists(local_path): + raise ValueError("File {} exists, set overwrite". + format(local_path)) + + def delete_file(self, path): + """ + Deletes a file at 'path' + """ + if os.path.exists(path): + os.remove(path) + + def list_folder(self, folder_path): + """ + Returns list of files and subfolders in 'folder_path' (walks the whole tree) + """ + lst = [] + if os.path.isdir(folder_path): + for (dir_path, dir_names, file_names) in os.walk(folder_path): + for name in file_names: + lst.append(os.path.join(dir_path, name)) + for name in dir_names: + lst.append(os.path.join(dir_path, name)) + + return lst diff --git a/pype/modules/sync_server/providers/resources/local_drive.png b/pype/modules/sync_server/providers/resources/local_drive.png new file mode 100644 index 0000000000..b53bdccac9 Binary files /dev/null and b/pype/modules/sync_server/providers/resources/local_drive.png differ diff --git a/pype/modules/sync_server/providers/resources/studio.png b/pype/modules/sync_server/providers/resources/studio.png index e95e9762f8..d61b7832bd 100644 Binary files a/pype/modules/sync_server/providers/resources/studio.png and b/pype/modules/sync_server/providers/resources/studio.png differ diff --git a/pype/modules/sync_server/resources/paused.png b/pype/modules/sync_server/resources/paused.png new file mode 100644 index 0000000000..c18d25d2f1 Binary files /dev/null and b/pype/modules/sync_server/resources/paused.png differ diff --git a/pype/modules/sync_server/resources/synced.png b/pype/modules/sync_server/resources/synced.png new file mode 100644 index 0000000000..d69c22992f Binary files /dev/null and b/pype/modules/sync_server/resources/synced.png differ diff --git a/pype/modules/sync_server/sync_server.py b/pype/modules/sync_server/sync_server.py index 84637a1d62..22dede66d8 100644 --- a/pype/modules/sync_server/sync_server.py +++ b/pype/modules/sync_server/sync_server.py @@ -1,4 +1,5 @@ from pype.api import ( + Anatomy, get_project_settings, get_current_project_settings) @@ -19,6 +20,7 @@ from .utils import time_function import six from pype.lib import PypeLogger from ..
import PypeModule, ITrayModule +from .providers.local_drive import LocalDriveHandler if six.PY2: web = asyncio = STATIC_DIR = WebSocketAsync = None @@ -42,8 +44,11 @@ class SyncServer(PypeModule, ITrayModule): that are marked to be in different location than 'studio' (temporary), checks if 'created_dt' field is present denoting successful sync with provider destination. - Sites structure is created during publish and by default it will - always contain 1 record with "name" == self.presets["active_site"] and + Sites structure is created during publish OR by calling 'add_site' + method. + + By default it will always contain 1 record with + "name" == self.presets["active_site"] and filled "created_dt" AND 1 or multiple records for all defined remote sites, where "created_dt" is not present. This highlights that file should be uploaded to @@ -118,6 +123,204 @@ class SyncServer(PypeModule, ITrayModule): self.sync_server_thread = None # asyncio requires new thread self.action_show_widget = None + self._paused = False + self._paused_projects = set() + self._paused_representations = set() + self._anatomies = {} + + # public facing API + def add_site(self, collection, representation_id, site_name=None): + """ + Adds new site to representation to be synced. + + 'collection' must have synchronization enabled (globally or + project only) + + Used as a API endpoint from outside applications (Loader etc) + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + + Returns: + throws ValueError if any issue + """ + if not self.get_synced_preset(collection): + raise ValueError("Project not configured") + + if not site_name: + site_name = self.DEFAULT_SITE + + self.reset_provider_for_file(collection, + representation_id, + site_name=site_name) + + # public facing API + def remove_site(self, collection, representation_id, site_name, + remove_local_files=False): + """ + Removes 'site_name' for particular 'representation_id' on + 'collection' + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + remove_local_files (bool): remove only files for 'local_id' + site + + Returns: + throws ValueError if any issue + """ + if not self.get_synced_preset(collection): + raise ValueError("Project not configured") + + self.reset_provider_for_file(collection, + representation_id, + site_name=site_name, + remove=True) + if remove_local_files: + self._remove_local_file(collection, representation_id, site_name) + + def clear_project(self, collection, site_name): + """ + Clear 'collection' of 'site_name' and its local files + + Works only on real local sites, not on 'studio' + """ + query = { + "type": "representation", + "files.sites.name": site_name + } + + representations = list( + self.connection.database[collection].find(query)) + if not representations: + self.log.debug("No repre found") + return + + for repre in representations: + self.remove_site(collection, repre.get("_id"), site_name, True) + + def pause_representation(self, collection, representation_id, site_name): + """ + Sets 'representation_id' as paused, eg. no syncing should be + happening on it. + + Args: + collection (string): project name + representation_id (string): MongoDB objectId value + site_name (string): 'gdrive', 'studio' etc. 
+ """ + log.info("Pausing SyncServer for {}".format(representation_id)) + self._paused_representations.add(representation_id) + self.reset_provider_for_file(collection, representation_id, + site_name=site_name, pause=True) + + def unpause_representation(self, collection, representation_id, site_name): + """ + Sets 'representation_id' as unpaused. + + Does not fail or warn if repre wasn't paused. + + Args: + collection (string): project name + representation_id (string): MongoDB objectId value + site_name (string): 'gdrive', 'studio' etc. + """ + log.info("Unpausing SyncServer for {}".format(representation_id)) + try: + self._paused_representations.remove(representation_id) + except KeyError: + pass + # self.paused_representations is not persistent + self.reset_provider_for_file(collection, representation_id, + site_name=site_name, pause=False) + + def is_representation_paused(self, representation_id, + check_parents=False, project_name=None): + """ + Returns if 'representation_id' is paused or not. + + Args: + representation_id (string): MongoDB objectId value + check_parents (bool): check if parent project or server itself + are not paused + project_name (string): project to check if paused + + if 'check_parents', 'project_name' should be set too + Returns: + (bool) + """ + condition = representation_id in self._paused_representations + if check_parents and project_name: + condition = condition or \ + self.is_project_paused(project_name) or \ + self.is_paused() + return condition + + def pause_project(self, project_name): + """ + Sets 'project_name' as paused, eg. no syncing should be + happening on all representation inside. + + Args: + project_name (string): collection name + """ + log.info("Pausing SyncServer for {}".format(project_name)) + self._paused_projects.add(project_name) + + def unpause_project(self, project_name): + """ + Sets 'project_name' as unpaused + + Does not fail or warn if project wasn't paused. + + Args: + project_name (string): collection name + """ + log.info("Unpausing SyncServer for {}".format(project_name)) + try: + self._paused_projects.remove(project_name) + except KeyError: + pass + + def is_project_paused(self, project_name, check_parents=False): + """ + Returns if 'project_name' is paused or not. + + Args: + project_name (string): collection name + check_parents (bool): check if server itself + is not paused + Returns: + (bool) + """ + condition = project_name in self._paused_projects + if check_parents: + condition = condition or self.is_paused() + return condition + + def pause_server(self): + """ + Pause sync server + + It won't check anything, not uploading/downloading... + """ + log.info("Pausing SyncServer") + self._paused = True + + def unpause_server(self): + """ + Unpause server + """ + log.info("Unpausing SyncServer") + self._paused = False + + def is_paused(self): + """ Is server paused """ + return self._paused def connect_with_modules(self, *_a, **kw): return @@ -210,32 +413,17 @@ class SyncServer(PypeModule, ITrayModule): def is_running(self): return self.sync_server_thread.is_running - def get_sites_for_project(self, project_name=None): + def get_anatomy(self, project_name): """ - Checks if sync is enabled globally and on project. 
- In that case return local and remote site + Get already created or newly created anatomy for project Args: - project_name (str): + project_name (string): - Returns: - (tuple): of strings, labels for (local_site, remote_site) + Return: + (Anatomy) """ - if self.enabled: - if project_name: - settings = get_project_settings(project_name) - else: - settings = get_current_project_settings() - - sync_server_presets = settings["global"]["sync_server"]["config"] - if settings["global"]["sync_server"]["enabled"]: - local_site = sync_server_presets.get("active_site", - "studio").strip() - remote_site = sync_server_presets.get("remote_site") - - return local_site, remote_site - - return self.DEFAULT_SITE, None + return self._anatomies.get('project_name') or Anatomy(project_name) def get_synced_presets(self): """ @@ -243,7 +431,14 @@ class SyncServer(PypeModule, ITrayModule): Returns: (dict): of settings, keys are project names """ + if self.presets: # presets set already, do not call again and again + return self.presets + sync_presets = {} + if not self.connection: + self.connection = AvalonMongoDB() + self.connection.install() + for collection in self.connection.database.collection_names(False): sync_settings = self.get_synced_preset(collection) if sync_settings: @@ -263,6 +458,11 @@ class SyncServer(PypeModule, ITrayModule): (dict): settings dictionary for the enabled project, empty if no settings or sync is disabled """ + # presets set already, do not call again and again + # self.log.debug("project preset {}".format(self.presets)) + if self.presets and self.presets.get(project_name): + return self.presets.get(project_name) + settings = get_project_settings(project_name) sync_settings = settings.get("global")["sync_server"] if not sync_settings: @@ -285,11 +485,16 @@ class SyncServer(PypeModule, ITrayModule): retries count etc.) """ self.active_sites = {} + initiated_handlers = {} for project_name, project_setting in settings.items(): for site_name, config in project_setting.get("sites").items(): - handler = lib.factory.get_provider(config["provider"], - site_name, - presets=config) + handler = initiated_handlers.get(config["provider"]) + if not handler: + handler = lib.factory.get_provider(config["provider"], + site_name, + presets=config) + initiated_handlers[config["provider"]] = handler + if handler.is_active(): if not self.active_sites.get('project_name'): self.active_sites[project_name] = [] @@ -315,6 +520,28 @@ class SyncServer(PypeModule, ITrayModule): """ return self.active_sites[project_name] + def get_local_site(self, project_name): + """ + Returns active (mine) site for 'project_name' from settings + """ + return self.get_synced_preset(project_name)['config']['active_site'] + + def get_remote_site(self, project_name): + """ + Returns remote (theirs) site for 'project_name' from settings + """ + return self.get_synced_preset(project_name)['config']['remote_site'] + + def get_provider_for_site(self, project_name, site): + """ + Return provider name for site. 
+ """ + site_preset = self.get_synced_preset(project_name)["sites"].get(site) + if site_preset: + return site_preset["provider"] + + return "NA" + @time_function def get_sync_representations(self, collection, active_site, remote_site): """ @@ -385,7 +612,7 @@ class SyncServer(PypeModule, ITrayModule): return representations - def check_status(self, file, provider_name, config_preset): + def check_status(self, file, local_site, remote_site, config_preset): """ Check synchronization status for single 'file' of single 'representation' by single 'provider'. @@ -396,7 +623,8 @@ class SyncServer(PypeModule, ITrayModule): Args: file (dictionary): of file from representation in Mongo - provider_name (string): - gdrive etc. + local_site (string): - local side of compare (usually 'studio') + remote_site (string): - gdrive etc. config_preset (dict): config about active site, retries Returns: (string) - one of SyncStatus @@ -404,20 +632,17 @@ class SyncServer(PypeModule, ITrayModule): sites = file.get("sites") or [] # if isinstance(sites, list): # temporary, old format of 'sites' # return SyncStatus.DO_NOTHING - _, provider_rec = self._get_provider_rec(sites, provider_name) or {} - if provider_rec: # sync remote target - created_dt = provider_rec.get("created_dt") + _, remote_rec = self._get_site_rec(sites, remote_site) or {} + if remote_rec: # sync remote target + created_dt = remote_rec.get("created_dt") if not created_dt: - tries = self._get_tries_count_from_rec(provider_rec) + tries = self._get_tries_count_from_rec(remote_rec) # file will be skipped if unsuccessfully tried over threshold # error metadata needs to be purged manually in DB to reset if tries < int(config_preset["retry_cnt"]): return SyncStatus.DO_UPLOAD else: - _, local_rec = self._get_provider_rec( - sites, - config_preset["active_site"]) or {} - + _, local_rec = self._get_site_rec(sites, local_site) or {} if not local_rec or not local_rec.get("created_dt"): tries = self._get_tries_count_from_rec(local_rec) # file will be skipped if unsuccessfully tried over @@ -439,6 +664,10 @@ class SyncServer(PypeModule, ITrayModule): Updates MongoDB, fills in id of file from provider (ie. file_id from GDrive), 'created_dt' - time of upload + 'provider_name' doesn't have to match to 'site_name', single + provider (GDrive) might have multiple sites ('projectA', + 'projectB') + Args: collection (str): source collection file (dictionary): of file from representation in Mongo @@ -460,8 +689,9 @@ class SyncServer(PypeModule, ITrayModule): remote_file = self._get_remote_file_path(file, handler.get_roots_config() ) - local_root = representation.get("context", {}).get("root") - local_file = self._get_local_file_path(file, local_root) + + local_file = self.get_local_file_path(collection, + file.get("path", "")) target_folder = os.path.dirname(remote_file) folder_id = handler.create_folder(target_folder) @@ -470,7 +700,8 @@ class SyncServer(PypeModule, ITrayModule): err = "Folder {} wasn't created. 
Check permissions.".\ format(target_folder) raise NotADirectoryError(err) - _, remote_site = self.get_sites_for_project(collection) + + remote_site = self.get_remote_site(collection) loop = asyncio.get_running_loop() file_id = await loop.run_in_executor(None, handler.upload_file, @@ -506,28 +737,27 @@ class SyncServer(PypeModule, ITrayModule): with self.lock: handler = lib.factory.get_provider(provider_name, site_name, tree=tree, presets=preset) - remote_file = self._get_remote_file_path(file, - handler.get_roots_config() - ) - local_root = representation.get("context", {}).get("root") - local_file = self._get_local_file_path(file, local_root) + remote_file_path = self._get_remote_file_path( + file, handler.get_roots_config()) - local_folder = os.path.dirname(local_file) + local_file_path = self.get_local_file_path(collection, + file.get("path", "")) + local_folder = os.path.dirname(local_file_path) os.makedirs(local_folder, exist_ok=True) - local_site, _ = self.get_sites_for_project(collection) + local_site = self.get_local_site(collection) loop = asyncio.get_running_loop() file_id = await loop.run_in_executor(None, handler.download_file, - remote_file, - local_file, - False, + remote_file_path, + local_file_path, self, collection, file, representation, - local_site + local_site, + True ) return file_id @@ -553,34 +783,32 @@ class SyncServer(PypeModule, ITrayModule): representation_id = representation.get("_id") file_id = file.get("_id") query = { - "_id": representation_id, - "files._id": file_id + "_id": representation_id } - file_index, _ = self._get_file_info(representation.get('files', []), - file_id) - site_index, _ = self._get_provider_rec(file.get('sites', []), - site) + update = {} if new_file_id: - update["$set"] = self._get_success_dict(file_index, site_index, - new_file_id) + update["$set"] = self._get_success_dict(new_file_id) # reset previous errors if any - update["$unset"] = self._get_error_dict(file_index, site_index, - "", "", "") + update["$unset"] = self._get_error_dict("", "", "") elif progress is not None: - update["$set"] = self._get_progress_dict(file_index, site_index, - progress) + update["$set"] = self._get_progress_dict(progress) else: tries = self._get_tries_count(file, site) tries += 1 - update["$set"] = self._get_error_dict(file_index, site_index, - error, tries) + update["$set"] = self._get_error_dict(error, tries) - self.connection.Session["AVALON_PROJECT"] = collection - self.connection.update_one( + arr_filter = [ + {'s.name': site}, + {'f._id': ObjectId(file_id)} + ] + + self.connection.database[collection].update_one( query, - update + update, + upsert=True, + array_filters=arr_filter ) if progress is not None: @@ -593,8 +821,9 @@ class SyncServer(PypeModule, ITrayModule): error_str = '' source_file = file.get("path", "") - log.debug("File {source_file} process {status} {error_str}". - format(status=status, + log.debug("File for {} - {source_file} process {status} {error_str}". 
+ format(representation_id, + status=status, source_file=source_file, error_str=error_str)) @@ -618,13 +847,14 @@ class SyncServer(PypeModule, ITrayModule): return -1, None - def _get_provider_rec(self, sites, provider): + def _get_site_rec(self, sites, site_name): """ - Return record from list of records which name matches to 'provider' + Return record from list of records which name matches to + 'remote_site_name' Args: sites (list): of dictionaries - provider (string): 'local_XXX', 'gdrive' + site_name (string): 'local_XXX', 'gdrive' Returns: (int, dictionary): index from list and record with metadata @@ -632,26 +862,39 @@ class SyncServer(PypeModule, ITrayModule): OR (-1, None) if not present """ for index, rec in enumerate(sites): - if rec.get("name") == provider: + if rec.get("name") == site_name: return index, rec return -1, None def reset_provider_for_file(self, collection, representation_id, - file_id, side): + side=None, file_id=None, site_name=None, + remove=False, pause=None): """ Reset information about synchronization for particular 'file_id' and provider. Useful for testing or forcing file to be reuploaded. + + 'side' and 'site_name' are disjunctive. + + 'side' is used for resetting local or remote side for + current user for repre. + + 'site_name' is used to set synchronization for particular site. + Should be used when repre should be synced to new site. + Args: collection (string): name of project (eg. collection) in DB representation_id(string): _id of representation file_id (string): file _id in representation side (string): local or remote side + site_name (string): for adding new site + remove (bool): if True remove site altogether + pause (bool or None): if True - pause, False - unpause + Returns: - None + throws ValueError """ - # TODO - implement reset for ALL files or ALL sites query = { "_id": ObjectId(representation_id) } @@ -660,31 +903,236 @@ class SyncServer(PypeModule, ITrayModule): if not representation: raise ValueError("Representation {} not found in {}". 
format(representation_id, collection)) + if side and site_name: + raise ValueError("Misconfiguration, only one of side and " + + "site_name arguments should be passed.") - local_site, remote_site = self.get_sites_for_project(collection) - if side == 'local': - site_name = local_site + local_site = self.get_local_site(collection) + remote_site = self.get_remote_site(collection) + + if side: + if side == 'local': + site_name = local_site + else: + site_name = remote_site + + elem = {"name": site_name} + + if file_id: # reset site for particular file + self._reset_site_for_file(collection, query, + elem, file_id, site_name) + elif side: # reset site for whole representation + self._reset_site(collection, query, elem, site_name) + elif remove: # remove site for whole representation + self._remove_site(collection, query, representation, site_name) + elif pause is not None: + self._pause_unpause_site(collection, query, + representation, site_name, pause) + else: # add new site to all files for representation + self._add_site(collection, query, representation, elem, site_name) + + def _update_site(self, collection, query, update, arr_filter): + """ + Auxiliary method to call update_one function on DB + + Used for refactoring ugly reset_provider_for_file + """ + self.connection.database[collection].update_one( + query, + update, + upsert=True, + array_filters=arr_filter + ) + + def _reset_site_for_file(self, collection, query, + elem, file_id, site_name): + """ + Resets 'site_name' for 'file_id' on representation in 'query' on + 'collection' + """ + update = { + "$set": {"files.$[f].sites.$[s]": elem} + } + arr_filter = [ + {'s.name': site_name}, + {'f._id': ObjectId(file_id)} + ] + + self._update_site(collection, query, update, arr_filter) + + def _reset_site(self, collection, query, elem, site_name): + """ + Resets 'site_name' for all files of representation in 'query' + """ + update = { + "$set": {"files.$[].sites.$[s]": elem} + } + + arr_filter = [ + {'s.name': site_name} + ] + + self._update_site(collection, query, update, arr_filter) + + def _remove_site(self, collection, query, representation, site_name): + """ + Removes 'site_name' for 'representation' in 'query' + + Throws ValueError if 'site_name' not found on 'representation' + """ + found = False + for file in representation.pop().get("files"): + for site in file.get("sites"): + if site["name"] == site_name: + found = True + break + if not found: + msg = "Site {} not found".format(site_name) + log.info(msg) + raise ValueError(msg) + + update = { + "$pull": {"files.$[].sites": {"name": site_name}} + } + arr_filter = [] + + self._update_site(collection, query, update, arr_filter) + + def _pause_unpause_site(self, collection, query, + representation, site_name, pause): + """ + Pauses/unpauses all files for 'representation' based on 'pause' + + Throws ValueError if 'site_name' not found on 'representation' + """ + found = False + site = None + for file in representation.pop().get("files"): + for site in file.get("sites"): + if site["name"] == site_name: + found = True + break + if not found: + msg = "Site {} not found".format(site_name) + log.info(msg) + raise ValueError(msg) + + if pause: + site['paused'] = pause else: - site_name = remote_site + if site.get('paused'): + site.pop('paused') - files = representation[0].get('files', []) - file_index, _ = self._get_file_info(files, - file_id) - site_index, _ = self._get_provider_rec(files[file_index]. 
- get('sites', []), - site_name) - if file_index >= 0 and site_index >= 0: - elem = {"name": site_name} - update = { - "$set": {"files.{}.sites.{}".format(file_index, site_index): - elem - } + update = { + "$set": {"files.$[].sites.$[s]": site} + } + + arr_filter = [ + {'s.name': site_name} + ] + + self._update_site(collection, query, update, arr_filter) + + def _add_site(self, collection, query, representation, elem, site_name): + """ + Adds 'site_name' to 'representation' on 'collection' + + Throws ValueError if already present + """ + for file in representation.pop().get("files"): + for site in file.get("sites"): + if site["name"] == site_name: + msg = "Site {} already present".format(site_name) + log.info(msg) + raise ValueError(msg) + + update = { + "$push": {"files.$[].sites": elem} + } + + arr_filter = [] + + self._update_site(collection, query, update, arr_filter) + + def _remove_local_file(self, collection, representation_id, site_name): + """ + Removes all local files for 'site_name' of 'representation_id' + + Args: + collection (string): project name (must match DB) + representation_id (string): MongoDB _id value + site_name (string): name of configured and active site + + Returns: + only logs, catches IndexError and OSError + """ + my_local_site = self.get_my_local_site(collection) + if my_local_site != site_name: + self.log.warning("Cannot remove non local file for {}". + format(site_name)) + return + + handler = None + sites = self.get_active_sites(collection) + for provider_name, provider_site_name in sites: + if provider_site_name == site_name: + handler = lib.factory.get_provider(provider_name, + site_name) + break + + if handler and isinstance(handler, LocalDriveHandler): + query = { + "_id": ObjectId(representation_id) } - self.connection.database[collection].update_one( - query, - update - ) + representation = list( + self.connection.database[collection].find(query)) + if not representation: + self.log.debug("No repre {} found".format( + representation_id)) + return + + representation = representation.pop() + for file in representation.get("files"): + try: + self.log.debug("Removing {}".format(file["path"])) + local_file = self.get_local_file_path(collection, + file.get("path", "")) + os.remove(local_file) + except IndexError: + msg = "No file set for {}".format(representation_id) + self.log.debug(msg) + raise ValueError(msg) + except OSError: + msg = "File {} cannot be removed".format(file["path"]) + self.log.warning(msg) + raise ValueError(msg) + + try: + folder = os.path.dirname(local_file) + os.rmdir(folder) + except OSError: + msg = "folder {} cannot be removed".format(folder) + self.log.warning(msg) + raise ValueError(msg) + + def get_my_local_site(self, project_name=None): + """ + Returns name of current user local_site + + Args: + project_name (string): + Returns: + (string) + """ + if project_name: + settings = get_project_settings(project_name) + else: + settings = get_current_project_settings() + + sync_server_presets = settings["global"]["sync_server"]["config"] + + return sync_server_presets.get("local_id") def get_loop_delay(self, project_name): """ @@ -699,44 +1147,35 @@ class SyncServer(PypeModule, ITrayModule): """Show dialog to enter credentials""" self.widget.show() - def _get_success_dict(self, file_index, site_index, new_file_id): + def _get_success_dict(self, new_file_id): """ Provide success metadata ("id", "created_dt") to be stored in Db. Used in $set: "DICT" part of query. 
Sites are array inside of array(file), so real indexes for both file and site are needed for upgrade in DB. Args: - file_index: (int) - index of modified file - site_index: (int) - index of modified site of modified file new_file_id: id of created file Returns: (dictionary) """ - val = {"files.{}.sites.{}.id".format(file_index, site_index): - new_file_id, - "files.{}.sites.{}.created_dt".format(file_index, site_index): - datetime.utcnow()} + val = {"files.$[f].sites.$[s].id": new_file_id, + "files.$[f].sites.$[s].created_dt": datetime.now()} return val - def _get_error_dict(self, file_index, site_index, - error="", tries="", progress=""): + def _get_error_dict(self, error="", tries="", progress=""): """ Provide error metadata to be stored in Db. Used for set (error and tries provided) or unset mode. Args: - file_index: (int) - index of modified file - site_index: (int) - index of modified site of modified file error: (string) - message tries: how many times failed Returns: (dictionary) """ - val = {"files.{}.sites.{}.last_failed_dt". - format(file_index, site_index): datetime.utcnow(), - "files.{}.sites.{}.error".format(file_index, site_index): error, - "files.{}.sites.{}.tries".format(file_index, site_index): tries, - "files.{}.sites.{}.progress".format(file_index, site_index): - progress + val = {"files.$[f].sites.$[s].last_failed_dt": datetime.now(), + "files.$[f].sites.$[s].error": error, + "files.$[f].sites.$[s].tries": tries, + "files.$[f].sites.$[s].progress": progress } return val @@ -761,41 +1200,54 @@ class SyncServer(PypeModule, ITrayModule): Returns: (int) - number of failed attempts """ - _, rec = self._get_provider_rec(file.get("sites", []), provider) + _, rec = self._get_site_rec(file.get("sites", []), provider) return rec.get("tries", 0) - def _get_progress_dict(self, file_index, site_index, progress): + def _get_progress_dict(self, progress): """ Provide progress metadata to be stored in Db. Used during upload/download for GUI to show. Args: - file_index: (int) - index of modified file - site_index: (int) - index of modified site of modified file progress: (float) - 0-1 progress of upload/download Returns: (dictionary) """ - val = {"files.{}.sites.{}.progress". - format(file_index, site_index): progress - } + val = {"files.$[f].sites.$[s].progress": progress} return val - def _get_local_file_path(self, file, local_root): + def get_local_file_path(self, collection, path): """ Auxiliary function for replacing rootless path with real path + Works with multi roots. 
+ If root definition is not found in Settings, anatomy is used + Args: - file (dictionary): file info, get 'path' to file with {root} - local_root (string): value of {root} for local projects + collection (string): project name + path (dictionary): 'path' to file with {root} Returns: (string) - absolute path on local system """ - if not local_root: - raise ValueError("Unknown local root for file {}") - path = file.get("path", "") + local_active_site = self.get_local_site(collection) + sites = self.get_synced_preset(collection)["sites"] + root_config = sites[local_active_site]["root"] - return path.format(**{"root": local_root}) + if not root_config.get("root"): + root_config = {"root": root_config} + + try: + path = path.format(**root_config) + except KeyError: + try: + anatomy = self.get_anatomy(collection) + path = anatomy.fill_root(path) + except KeyError: + msg = "Error in resolving local root from anatomy" + self.log.error(msg) + raise ValueError(msg) + + return path def _get_remote_file_path(self, file, root_config): """ @@ -810,8 +1262,12 @@ class SyncServer(PypeModule, ITrayModule): path = file.get("path", "") if not root_config.get("root"): root_config = {"root": root_config} - path = path.format(**root_config) - return path + + try: + return path.format(**root_config) + except KeyError: + msg = "Error in resolving remote root, unknown key" + self.log.error(msg) def _get_retries_arr(self, project_name): """ @@ -875,11 +1331,14 @@ class SyncServerThread(threading.Thread): """ try: - while self.is_running: + while self.is_running and not self.module.is_paused(): import time start_time = None for collection, preset in self.module.get_synced_presets().\ items(): + if self.module.is_project_paused(collection): + continue + start_time = time.time() sync_repres = self.module.get_sync_representations( collection, @@ -887,7 +1346,8 @@ class SyncServerThread(threading.Thread): preset.get('config')["remote_site"] ) - local = preset.get('config')["active_site"] + local_site = preset.get('config')["active_site"] + remote_site = preset.get('config')["remote_site"] task_files_to_process = [] files_processed_info = [] # process only unique file paths in one batch @@ -896,69 +1356,73 @@ class SyncServerThread(threading.Thread): # upload process can find already uploaded file and # reuse same id processed_file_path = set() - for check_site in self.module.get_active_sites(collection): - provider, site = check_site - site_preset = preset.get('sites')[site] - handler = lib.factory.get_provider(provider, - site, - presets=site_preset) - limit = lib.factory.get_provider_batch_limit(provider) - # first call to get_provider could be expensive, its - # building folder tree structure in memory - # call only if needed, eg. 
DO_UPLOAD or DO_DOWNLOAD - for sync in sync_repres: - if limit <= 0: - continue - files = sync.get("files") or [] - if files: - for file in files: - # skip already processed files - file_path = file.get('path', '') - if file_path in processed_file_path: - continue - status = self.module.check_status( - file, - provider, - preset.get('config')) - if status == SyncStatus.DO_UPLOAD: - tree = handler.get_tree() - limit -= 1 - task = asyncio.create_task( - self.module.upload(collection, - file, - sync, - provider, - site, - tree, - site_preset)) - task_files_to_process.append(task) - # store info for exception handlingy - files_processed_info.append((file, - sync, - site, - collection - )) - processed_file_path.add(file_path) - if status == SyncStatus.DO_DOWNLOAD: - tree = handler.get_tree() - limit -= 1 - task = asyncio.create_task( - self.module.download(collection, - file, + site_preset = preset.get('sites')[remote_site] + remote_provider = site_preset['provider'] + handler = lib.factory.get_provider(remote_provider, + remote_site, + presets=site_preset) + limit = lib.factory.get_provider_batch_limit( + site_preset['provider']) + # first call to get_provider could be expensive, its + # building folder tree structure in memory + # call only if needed, eg. DO_UPLOAD or DO_DOWNLOAD + for sync in sync_repres: + if self.module.\ + is_representation_paused(sync['_id']): + continue + if limit <= 0: + continue + files = sync.get("files") or [] + if files: + for file in files: + # skip already processed files + file_path = file.get('path', '') + if file_path in processed_file_path: + continue + status = self.module.check_status( + file, + local_site, + remote_site, + preset.get('config')) + if status == SyncStatus.DO_UPLOAD: + tree = handler.get_tree() + limit -= 1 + task = asyncio.create_task( + self.module.upload(collection, + file, + sync, + remote_provider, + remote_site, + tree, + site_preset)) + task_files_to_process.append(task) + # store info for exception handlingy + files_processed_info.append((file, sync, - provider, - site, - tree, - site_preset)) - task_files_to_process.append(task) + remote_site, + collection + )) + processed_file_path.add(file_path) + if status == SyncStatus.DO_DOWNLOAD: + tree = handler.get_tree() + limit -= 1 + task = asyncio.create_task( + self.module.download(collection, + file, + sync, + remote_provider, + remote_site, + tree, + site_preset)) + task_files_to_process.append(task) - files_processed_info.append((file, - sync, - local, - collection - )) - processed_file_path.add(file_path) + files_processed_info.append((file, + sync, + local_site, + collection + )) + processed_file_path.add(file_path) log.debug("Sync tasks count {}". 
format(len(task_files_to_process))) diff --git a/pype/modules/sync_server/tray/app.py b/pype/modules/sync_server/tray/app.py index 2a35f5341f..2adec8382b 100644 --- a/pype/modules/sync_server/tray/app.py +++ b/pype/modules/sync_server/tray/app.py @@ -2,11 +2,17 @@ from Qt import QtWidgets, QtCore, QtGui from Qt.QtCore import Qt import attr import os + +import sys +import subprocess + from pype.tools.settings import ( ProjectListWidget, style ) + from avalon.tools.delegates import PrettyTimeDelegate, pretty_timestamp +from bson.objectid import ObjectId from pype.lib import PypeLogger @@ -32,6 +38,7 @@ class SyncServerWindow(QtWidgets.QDialog): def __init__(self, sync_server, parent=None): super(SyncServerWindow, self).__init__(parent) + self.sync_server = sync_server self.setWindowFlags(QtCore.Qt.Window) self.setFocusPolicy(QtCore.Qt.StrongFocus) @@ -39,21 +46,33 @@ class SyncServerWindow(QtWidgets.QDialog): self.setWindowIcon(QtGui.QIcon(style.app_icon_path())) self.resize(1400, 800) + self.timer = QtCore.QTimer() + self.timer.timeout.connect(self._hide_message) + body = QtWidgets.QWidget(self) footer = QtWidgets.QWidget(self) footer.setFixedHeight(20) - container = QtWidgets.QWidget() - projects = SyncProjectListWidget(sync_server, self) - projects.refresh() # force selection of default - repres = SyncRepresentationWidget(sync_server, - project=projects.current_project, - parent=self) + left_column = QtWidgets.QWidget(body) + left_column_layout = QtWidgets.QVBoxLayout(left_column) + self.projects = SyncProjectListWidget(sync_server, self) + self.projects.refresh() # force selection of default + left_column_layout.addWidget(self.projects) + self.pause_btn = QtWidgets.QPushButton("Pause server") + + left_column_layout.addWidget(self.pause_btn) + left_column.setLayout(left_column_layout) + + repres = SyncRepresentationWidget( + sync_server, + project=self.projects.current_project, + parent=self) + container = QtWidgets.QWidget() container_layout = QtWidgets.QHBoxLayout(container) container_layout.setContentsMargins(0, 0, 0, 0) split = QtWidgets.QSplitter() - split.addWidget(projects) + split.addWidget(left_column) split.addWidget(repres) split.setSizes([180, 950, 200]) container_layout.addWidget(split) @@ -64,12 +83,12 @@ class SyncServerWindow(QtWidgets.QDialog): body_layout.addWidget(container) body_layout.setContentsMargins(0, 0, 0, 0) - message = QtWidgets.QLabel(footer) - message.hide() + self.message = QtWidgets.QLabel(footer) + self.message.hide() footer_layout = QtWidgets.QVBoxLayout(footer) - footer_layout.addWidget(message) - footer_layout.setContentsMargins(0, 0, 0, 0) + footer_layout.addWidget(self.message) + footer_layout.setContentsMargins(20, 0, 0, 0) layout = QtWidgets.QVBoxLayout(self) layout.addWidget(body) @@ -78,9 +97,42 @@ class SyncServerWindow(QtWidgets.QDialog): self.setLayout(body_layout) self.setWindowTitle("Sync Server") - projects.project_changed.connect( + self.projects.project_changed.connect( lambda: repres.table_view.model().set_project( - projects.current_project)) + self.projects.current_project)) + + self.pause_btn.clicked.connect(self._pause) + repres.message_generated.connect(self._update_message) + + def _pause(self): + if self.sync_server.is_paused(): + self.sync_server.unpause_server() + self.pause_btn.setText("Pause server") + else: + self.sync_server.pause_server() + self.pause_btn.setText("Unpause server") + self.projects.refresh() + + def _update_message(self, value): + """ + Update and show message in the footer + """ + self.message.setText(value) 
+ if self.message.isVisible(): + self.message.repaint() + else: + self.message.show() + msec_delay = 3000 + self.timer.start(msec_delay) + + def _hide_message(self): + """ + Hide message in footer + + Called automatically by self.timer after a while + """ + self.message.setText("") + self.message.hide() class SyncProjectListWidget(ProjectListWidget): @@ -91,6 +143,12 @@ class SyncProjectListWidget(ProjectListWidget): def __init__(self, sync_server, parent): super(SyncProjectListWidget, self).__init__(parent) self.sync_server = sync_server + self.project_list.setContextMenuPolicy(QtCore.Qt.CustomContextMenu) + self.project_list.customContextMenuRequested.connect( + self._on_context_menu) + self.project_name = None + self.local_site = None + self.icons = {} def validate_context_change(self): return True @@ -100,7 +158,13 @@ class SyncProjectListWidget(ProjectListWidget): model.clear() for project_name in self.sync_server.get_synced_presets().keys(): - model.appendRow(QtGui.QStandardItem(project_name)) + if self.sync_server.is_paused() or \ + self.sync_server.is_project_paused(project_name): + icon = self._get_icon("paused") + else: + icon = self._get_icon("synced") + + model.appendRow(QtGui.QStandardItem(icon, project_name)) if len(self.sync_server.get_synced_presets().keys()) == 0: model.appendRow(QtGui.QStandardItem("No project configured")) @@ -112,6 +176,83 @@ class SyncProjectListWidget(ProjectListWidget): self.current_project = self.project_list.model().item(0). \ data(QtCore.Qt.DisplayRole) + self.local_site = self.sync_server.get_local_site(project_name) + + def _get_icon(self, status): + if not self.icons.get(status): + resource_path = os.path.dirname(__file__) + resource_path = os.path.join(resource_path, "..", + "resources") + pix_url = "{}/{}.png".format(resource_path, status) + icon = QtGui.QIcon(pix_url) + self.icons[status] = icon + else: + icon = self.icons[status] + return icon + + def _on_context_menu(self, point): + point_index = self.project_list.indexAt(point) + if not point_index.isValid(): + return + + self.project_name = point_index.data(QtCore.Qt.DisplayRole) + + menu = QtWidgets.QMenu() + actions_mapping = {} + + action = None + if self.sync_server.is_project_paused(self.project_name): + action = QtWidgets.QAction("Unpause") + actions_mapping[action] = self._unpause + else: + action = QtWidgets.QAction("Pause") + actions_mapping[action] = self._pause + menu.addAction(action) + + if self.local_site == self.sync_server.get_my_local_site( + self.project_name): + action = QtWidgets.QAction("Clear local project") + actions_mapping[action] = self._clear_project + menu.addAction(action) + + result = menu.exec_(QtGui.QCursor.pos()) + if result: + to_run = actions_mapping[result] + if to_run: + to_run() + + def _pause(self): + if self.project_name: + self.sync_server.pause_project(self.project_name) + self.project_name = None + self.refresh() + + def _unpause(self): + if self.project_name: + self.sync_server.unpause_project(self.project_name) + self.project_name = None + self.refresh() + + def _clear_project(self): + if self.project_name: + self.sync_server.clear_project(self.project_name, self.local_site) + self.project_name = None + self.refresh() + +class ProjectModel(QtCore.QAbstractListModel): + def __init__(self, *args, projects=None, **kwargs): + super(ProjectModel, self).__init__(*args, **kwargs) + self.projects = projects or [] + + def data(self, index, role): + if role == Qt.DisplayRole: + # See below for the data structure. 
+ status, text = self.projects[index.row()] + # Return the todo text only. + return text + + def rowCount(self, index): + return len(self.todos) class SyncRepresentationWidget(QtWidgets.QWidget): """ @@ -119,6 +260,7 @@ class SyncRepresentationWidget(QtWidgets.QWidget): settings 'local_site' and 'remote_site'. """ active_changed = QtCore.Signal() # active index changed + message_generated = QtCore.Signal(str) default_widths = ( ("asset", 210), @@ -141,6 +283,8 @@ class SyncRepresentationWidget(QtWidgets.QWidget): self.sync_server = sync_server self._selected_id = None # keep last selected _id + self.representation_id = None + self.site_name = None # to pause/unpause representation self.filter = QtWidgets.QLineEdit() self.filter.setPlaceholderText("Filter representations..") @@ -204,7 +348,8 @@ class SyncRepresentationWidget(QtWidgets.QWidget): def _selection_changed(self, new_selection): index = self.selection_model.currentIndex() - self._selected_id = self.table_view.model().data(index, Qt.UserRole) + self._selected_id = \ + self.table_view.model().data(index, Qt.UserRole) def _set_selection(self): """ @@ -238,10 +383,192 @@ class SyncRepresentationWidget(QtWidgets.QWidget): if not point_index.isValid(): return + self.item = self.table_view.model()._data[point_index.row()] + self.representation_id = self.item._id + log.debug("menu representation _id:: {}". + format(self.representation_id)) + + menu = QtWidgets.QMenu() + actions_mapping = {} + + action = QtWidgets.QAction("Open in explorer") + actions_mapping[action] = self._open_in_explorer + menu.addAction(action) + + local_site, local_progress = self.item.local_site.split() + remote_site, remote_progress = self.item.remote_site.split() + local_progress = float(local_progress) + remote_progress = float(remote_progress) + + # progress smaller then 1.0 --> in progress or queued + if local_progress < 1.0: + self.site_name = local_site + else: + self.site_name = remote_site + + if self.item.state in [STATUS[0], STATUS[2]]: + action = QtWidgets.QAction("Pause") + actions_mapping[action] = self._pause + menu.addAction(action) + + if self.item.state == STATUS[3]: + action = QtWidgets.QAction("Unpause") + actions_mapping[action] = self._unpause + menu.addAction(action) + + # if self.item.state == STATUS[1]: + # action = QtWidgets.QAction("Open error detail") + # actions_mapping[action] = self._show_detail + # menu.addAction(action) + + if remote_progress == 1.0: + action = QtWidgets.QAction("Reset local site") + actions_mapping[action] = self._reset_local_site + menu.addAction(action) + + if local_progress == 1.0: + action = QtWidgets.QAction("Reset remote site") + actions_mapping[action] = self._reset_remote_site + menu.addAction(action) + + if local_site != self.sync_server.DEFAULT_SITE: + action = QtWidgets.QAction("Completely remove from local") + actions_mapping[action] = self._remove_site + menu.addAction(action) + else: + action = QtWidgets.QAction("Mark for sync to local") + actions_mapping[action] = self._add_site + menu.addAction(action) + + if not actions_mapping: + action = QtWidgets.QAction("< No action >") + actions_mapping[action] = None + menu.addAction(action) + + result = menu.exec_(QtGui.QCursor.pos()) + if result: + to_run = actions_mapping[result] + if to_run: + to_run() + + self.table_view.model().refresh() + + def _pause(self): + self.sync_server.pause_representation(self.table_view.model()._project, + self.representation_id, + self.site_name) + self.site_name = None + self.message_generated.emit("Paused 
{}".format(self.representation_id)) + + def _unpause(self): + self.sync_server.unpause_representation( + self.table_view.model()._project, + self.representation_id, + self.site_name) + self.site_name = None + self.message_generated.emit("Unpaused {}".format( + self.representation_id)) + + # temporary here for testing, will be removed TODO + def _add_site(self): + log.info(self.representation_id) + project_name = self.table_view.model()._project + local_site_name = self.sync_server.get_my_local_site(project_name) + try: + self.sync_server.add_site( + self.table_view.model()._project, + self.representation_id, + local_site_name + ) + self.message_generated.emit( + "Site {} added for {}".format(local_site_name, + self.representation_id)) + except ValueError as exp: + self.message_generated.emit("Error {}".format(str(exp))) + + def _remove_site(self): + """ + Removes site record AND files. + + This is ONLY for representations stored on local site, which + cannot be same as SyncServer.DEFAULT_SITE. + + This could only happen when artist work on local machine, not + connected to studio mounted drives. + """ + log.info("Removing {}".format(self.representation_id)) + try: + self.sync_server.remove_site( + self.table_view.model()._project, + self.representation_id, + 'local_0', + True + ) + self.message_generated.emit("Site local_0 removed") + except ValueError as exp: + self.message_generated.emit("Error {}".format(str(exp))) + + def _reset_local_site(self): + """ + Removes errors or success metadata for particular file >> forces + redo of upload/download + """ + self.sync_server.reset_provider_for_file( + self.table_view.model()._project, + self.representation_id, + 'local' + ) + + def _reset_remote_site(self): + """ + Removes errors or success metadata for particular file >> forces + redo of upload/download + """ + self.sync_server.reset_provider_for_file( + self.table_view.model()._project, + self.representation_id, + 'remote' + ) + + def _open_in_explorer(self): + if not self.item: + return + + fpath = self.item.path + fpath = os.path.normpath(os.path.dirname(fpath)) + if os.path.isdir(fpath): + if 'win' in sys.platform: # windows + subprocess.Popen('explorer "%s"' % fpath) + elif sys.platform == 'darwin': # macOS + subprocess.Popen(['open', fpath]) + else: # linux + try: + subprocess.Popen(['xdg-open', fpath]) + except OSError: + raise OSError('unsupported xdg-open call??') + class SyncRepresentationModel(QtCore.QAbstractTableModel): - PAGE_SIZE = 19 - REFRESH_SEC = 5000 + """ + Model for summary of representations. + + Groups files information per representation. Allows sorting and + full text filtering. + + Allows pagination, most of heavy lifting is being done on DB side. + Single model matches to single collection. When project is changed, + model is reset and refreshed. + + Args: + sync_server (SyncServer) - object to call server operations (update + db status, set site status...) 
+ header (list) - names of visible columns + project (string) - collection name, all queries must be called on + a specific collection + + """ + PAGE_SIZE = 20 # default page size to query for + REFRESH_SEC = 5000 # in seconds, requery DB for new status DEFAULT_SORT = { "updated_dt_remote": -1, "_id": 1 @@ -261,8 +588,6 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): "status" # state ] - numberPopulated = QtCore.Signal(int) - @attr.s class SyncRepresentation: """ @@ -283,6 +608,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): files_size = attr.ib(default=None) priority = attr.ib(default=None) state = attr.ib(default=None) + path = attr.ib(default=None) def __init__(self, sync_server, header, project=None): super(SyncRepresentationModel, self).__init__() @@ -290,7 +616,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self._data = [] self._project = project self._rec_loaded = 0 - self._buffer = [] # stash one page worth of records (actually cursor) + self._total_records = 0 # how many documents query actually found self.filter = None self._initialized = False @@ -298,8 +624,8 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self.sync_server = sync_server # TODO think about admin mode # this is for regular user, always only single local and single remote - self.local_site, self.remote_site = \ - self.sync_server.get_sites_for_project(self._project) + self.local_site = self.sync_server.get_local_site(self._project) + self.remote_site = self.sync_server.get_remote_site(self._project) self.projection = self.get_default_projection() @@ -307,8 +633,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self.query = self.get_default_query() self.default_query = list(self.get_default_query()) - log.debug("!!! init query: {}".format(json.dumps(self.query, - indent=4))) + representations = self.dbcon.aggregate(self.query) self.refresh(representations) @@ -318,6 +643,12 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): @property def dbcon(self): + """ + Database object with preselected project (collection) to run DB + operations (find, aggregate). + + All queries should go through this (because of collection). + """ return self.sync_server.connection.database[self._project] def data(self, index, role): @@ -340,6 +671,12 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): return str(self._header[section]) def tick(self): + """ + Triggers refresh of model. + + Because of pagination, prepared (sorting, filtering) query needs + to be run on DB every X seconds. + """ self.refresh(representations=None, load_records=self._rec_loaded) self.timer.start(self.REFRESH_SEC) @@ -355,6 +692,25 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): return self._header.index(value) def refresh(self, representations=None, load_records=0): + """ + Reloads representations from DB if necessary, adds them to model. + + Runs periodically (every X seconds) or by demand (change of + sorting, filtering etc.) + + Emits 'modelReset' signal. 
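One detail worth keeping in mind when reading `tick()` and `REFRESH_SEC`: `QTimer.start()` takes milliseconds, so a value of 5000 re-queries the database every five seconds. A stripped-down sketch of the same polling loop, with a hypothetical `refresh` callable and the Qt.py shim assumed:

```python
from Qt import QtCore

REFRESH_MS = 5000  # QTimer.start() expects milliseconds, i.e. 5 seconds here


def start_polling(refresh, parent=None):
    """Call `refresh` every REFRESH_MS until the returned timer is stopped (sketch)."""
    timer = QtCore.QTimer(parent)
    timer.timeout.connect(refresh)
    timer.start(REFRESH_MS)
    return timer  # keep a reference, otherwise the timer can be garbage collected
```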
+ + Args: + representations (PaginationResult object): pass result of + aggregate query from outside - mostly for testing only + load_records (int) - enforces how many records should be + actually queried (scrolled a couple of times to list more + than single page of records) + """ + if self.sync_server.is_paused() or \ + self.sync_server.is_project_paused(self._project): + return + self.beginResetModel() self._data = [] self._rec_loaded = 0 @@ -368,7 +724,30 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): self.endResetModel() def _add_page_records(self, local_site, remote_site, representations): - for repre in representations: + """ + Process all records from 'representation' and add them to storage. + + Args: + local_site (str): name of local site (mine) + remote_site (str): name of cloud provider (theirs) + representations (Mongo Cursor) - mimics result set, 1 object + with paginatedResults array and totalCount array + """ + result = representations.next() + count = 0 + total_count = result.get("totalCount") + if total_count: + count = total_count.pop().get('count') + self._total_records = count + + local_provider = _translate_provider_for_icon(self.sync_server, + self._project, + local_site) + remote_provider = _translate_provider_for_icon(self.sync_server, + self._project, + remote_site) + + for repre in result.get("paginatedResults"): context = repre.get("context").pop() files = repre.get("files", []) if isinstance(files, dict): # aggregate returns dictionary @@ -387,37 +766,44 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): remote_updated = \ repre.get('updated_dt_remote').strftime("%Y%m%dT%H%M%SZ") - avg_progress_remote = repre.get('avg_progress_remote', '') - avg_progress_local = repre.get('avg_progress_local', '') + avg_progress_remote = _convert_progress( + repre.get('avg_progress_remote', '0')) + avg_progress_local = _convert_progress( + repre.get('avg_progress_local', '0')) + + if context.get("version"): + version = "v{:0>3d}".format(context.get("version")) + else: + version = "master" item = self.SyncRepresentation( repre.get("_id"), context.get("asset"), context.get("subset"), - "v{:0>3d}".format(context.get("version", 1)), + version, context.get("representation"), local_updated, remote_updated, - '{} {}'.format(local_site, avg_progress_local), - '{} {}'.format(remote_site, avg_progress_remote), + '{} {}'.format(local_provider, avg_progress_local), + '{} {}'.format(remote_provider, avg_progress_remote), repre.get("files_count", 1), repre.get("files_size", 0), 1, - STATUS[repre.get("status", -1)] + STATUS[repre.get("status", -1)], + self.sync_server.get_local_file_path(self._project, + files[0].get('path')) ) self._data.append(item) self._rec_loaded += 1 + def canFetchMore(self, index): """ Check if there are more records than currently loaded """ # 'skip' might be suboptimal when representation hits 500k+ - self._buffer = list(self.dbcon.aggregate(self.query)) - # log.info("!!! canFetchMore _rec_loaded::{} - {}".format( - # self._rec_loaded, len(self._buffer))) - return len(self._buffer) > self._rec_loaded + return self._total_records > self._rec_loaded def fetchMore(self, index): """ @@ -425,22 +811,21 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): Called when 'canFetchMore' returns true, which means there are more records in DB than loaded. 
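`_add_page_records` consumes the single document produced by the `$facet` stage: one `paginatedResults` array holding the page of records and one `totalCount` array holding the overall count. A hedged pymongo-style sketch of that unpacking, with field names taken from the pipeline used here:

```python
def read_facet_page(cursor):
    """Unpack one $facet aggregation result into (records, total_count); a sketch.

    `cursor` is what pymongo's `aggregate()` returns; a $facet stage always
    yields exactly one document, even when nothing matched the query.
    """
    result = next(cursor)
    total = result.get("totalCount") or []
    total_count = total[0].get("count", 0) if total else 0
    records = result.get("paginatedResults", [])
    return records, total_count
```

In the model this corresponds to setting `self._total_records` and iterating the page to build the row items.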
- 'self._buffer' is used to stash cursor to limit requery """ log.debug("fetchMore") - # cursor.count() returns always total number, not only skipped + limit - remainder = len(self._buffer) - self._rec_loaded - items_to_fetch = min(self.PAGE_SIZE, remainder) + items_to_fetch = min(self._total_records - self._rec_loaded, + self.PAGE_SIZE) + self.query = self.get_default_query(self._rec_loaded) + representations = self.dbcon.aggregate(self.query) self.beginInsertRows(index, self._rec_loaded, self._rec_loaded + items_to_fetch - 1) - self._add_page_records(self.local_site, self.remote_site, self._buffer) + self._add_page_records(self.local_site, self.remote_site, + representations) self.endInsertRows() - self.numberPopulated.emit(items_to_fetch) # ?? - def sort(self, index, order): """ Summary sort per representation. @@ -486,6 +871,8 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): project (str): name of project """ self._project = project + self.local_site = self.sync_server.get_local_site(self._project) + self.remote_site = self.sync_server.get_remote_site(self._project) self.refresh() def get_index(self, id): @@ -505,7 +892,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): value = self.data(index, Qt.UserRole) if value == id: return index - return index + return None def get_default_query(self, limit=0): """ @@ -523,7 +910,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): 0 - in progress 1 - failed 2 - queued - 3 - paused (not implemented yet) + 3 - paused 4 - finished on both sides are calculated and must be calculated in DB because of @@ -601,12 +988,29 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): 'failed_local': { '$cond': [{'$size': "$order_local.last_failed_dt"}, 1, - 0]} + 0]}, + 'failed_local_tries': { + '$cond': [{'$size': '$order_local.tries'}, + {'$first': '$order_local.tries'}, + 0]}, + 'failed_remote_tries': { + '$cond': [{'$size': '$order_remote.tries'}, + {'$first': '$order_local.tries'}, + 0]}, + 'paused_remote': { + '$cond': [{'$size': "$order_remote.paused"}, + 1, + 0]}, + 'paused_local': { + '$cond': [{'$size': "$order_local.paused"}, + 1, + 0]}, }}, {'$group': { '_id': '$_id', # pass through context - same for representation 'context': {'$addToSet': '$context'}, + 'data': {'$addToSet': '$data'}, # pass through files as a list 'files': {'$addToSet': '$files'}, # count how many files @@ -618,13 +1022,22 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): # select last touch of file 'updated_dt_remote': {'$max': "$updated_dt_remote"}, 'failed_remote': {'$sum': '$failed_remote'}, - 'failed_local': {'$sum': '$failed_local'}, + 'failed_local': {'$sum': '$paused_remote'}, + 'failed_local_tries': {'$sum': '$failed_local_tries'}, + 'failed_remote_tries': {'$sum': '$failed_remote_tries'}, + 'paused_remote': {'$sum': '$paused_remote'}, + 'paused_local': {'$sum': '$paused_local'}, 'updated_dt_local': {'$max': "$updated_dt_local"} }}, - {"$limit": limit}, - {"$skip": self._rec_loaded}, {"$project": self.projection}, - {"$sort": self.sort} + {"$sort": self.sort}, + { + '$facet': { + 'paginatedResults': [{'$skip': self._rec_loaded}, + {'$limit': limit}], + 'totalCount': [{'$count': 'count'}] + } + } ] def _get_match_part(self): @@ -635,37 +1048,31 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): checked. If performance issues are found, '$text' and text indexes should be investigated. 
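The paging relies on Qt's incremental population contract: the view calls `canFetchMore()` and, when it returns True, `fetchMore()`, which must wrap the newly added rows in `beginInsertRows()`/`endInsertRows()`. A reduced sketch of that contract, independent of MongoDB; `fetch_page` and the page size are illustrative:

```python
from Qt import QtCore


class PagedListModel(QtCore.QAbstractListModel):
    PAGE_SIZE = 20

    def __init__(self, fetch_page, total_records, parent=None):
        """`fetch_page(offset, limit)` returns a list of row values (sketch)."""
        super(PagedListModel, self).__init__(parent)
        self._fetch_page = fetch_page
        self._total = total_records
        self._rows = []

    def rowCount(self, parent=QtCore.QModelIndex()):
        return len(self._rows)

    def data(self, index, role=QtCore.Qt.DisplayRole):
        if role == QtCore.Qt.DisplayRole:
            return self._rows[index.row()]

    def canFetchMore(self, parent):
        return self._total > len(self._rows)

    def fetchMore(self, parent):
        count = min(self.PAGE_SIZE, self._total - len(self._rows))
        first, last = len(self._rows), len(self._rows) + count - 1
        self.beginInsertRows(parent, first, last)
        self._rows.extend(self._fetch_page(first, count))
        self.endInsertRows()
```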
+ + Fulltext searches in: + context.subset + context.asset + context.representation names AND _id (ObjectId) """ - if not self.filter: - return { + base_match = { "type": "representation", - 'files.sites': { - '$elemMatch': { - '$or': [ - {'name': self.local_site}, - {'name': self.remote_site} - ] - } - } - } + 'files.sites.name': {'$all': [self.local_site, + self.remote_site]} + } + if not self.filter: + return base_match else: regex_str = '.*{}.*'.format(self.filter) - return { - "type": "representation", - '$or': [ + base_match['$or'] = [ {'context.subset': {'$regex': regex_str, '$options': 'i'}}, {'context.asset': {'$regex': regex_str, '$options': 'i'}}, {'context.representation': {'$regex': regex_str, - '$options': 'i'}}], - 'files.sites': { - '$elemMatch': { - '$or': [ - {'name': self.local_site}, - {'name': self.remote_site} - ] - } - } - } + '$options': 'i'}}] + + if ObjectId.is_valid(self.filter): + base_match['$or'] = [{'_id': ObjectId(self.filter)}] + + return base_match def get_default_projection(self): """ @@ -681,6 +1088,7 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): "context.asset": 1, "context.version": 1, "context.representation": 1, + "data.path": 1, "files": 1, 'files_count': 1, "files_size": 1, @@ -688,19 +1096,28 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): 'avg_progress_local': 1, 'updated_dt_remote': 1, 'updated_dt_local': 1, + 'paused_remote': 1, + 'paused_local': 1, 'status': { '$switch': { 'branches': [ { 'case': { - '$or': [{'$eq': ['$avg_progress_remote', 0]}, - {'$eq': ['$avg_progress_local', 0]}]}, - 'then': 2 # Queued + '$or': ['$paused_remote', '$paused_local']}, + 'then': 3 # Paused }, { 'case': { - '$or': ['$failed_remote', '$failed_local']}, - 'then': 1 # Failed + '$or': [ + {'$gte': ['$failed_local_tries', 3]}, + {'$gte': ['$failed_remote_tries', 3]} + ]}, + 'then': 1}, + { + 'case': { + '$or': [{'$eq': ['$avg_progress_remote', 0]}, + {'$eq': ['$avg_progress_local', 0]}]}, + 'then': 2 # Queued }, { 'case': {'$or': [{'$and': [ @@ -714,10 +1131,6 @@ class SyncRepresentationModel(QtCore.QAbstractTableModel): ]}, 'then': 0 # In progress }, - { - 'case': {'$eq': ['dummy_placeholder', 'paused']}, - 'then': 3 # Paused - }, { 'case': {'$and': [ {'$eq': ['$avg_progress_remote', 1]}, @@ -754,11 +1167,11 @@ class SyncServerDetailWindow(QtWidgets.QDialog): body_layout.addWidget(container) body_layout.setContentsMargins(0, 0, 0, 0) - message = QtWidgets.QLabel() - message.hide() + self.message = QtWidgets.QLabel() + self.message.hide() footer_layout = QtWidgets.QVBoxLayout(footer) - footer_layout.addWidget(message) + footer_layout.addWidget(self.message) footer_layout.setContentsMargins(0, 0, 0, 0) layout = QtWidgets.QVBoxLayout(self) @@ -794,8 +1207,10 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): def __init__(self, sync_server, _id=None, project=None, parent=None): super(SyncRepresentationDetailWidget, self).__init__(parent) + log.debug("Representation_id:{}".format(_id)) self.representation_id = _id self.item = None # set to item that mouse was clicked over + self.project = project self.sync_server = sync_server @@ -905,19 +1320,23 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): menu = QtWidgets.QMenu() actions_mapping = {} + action = QtWidgets.QAction("Open in explorer") + actions_mapping[action] = self._open_in_explorer + menu.addAction(action) + if self.item.state == STATUS[1]: action = QtWidgets.QAction("Open error detail") actions_mapping[action] = self._show_detail menu.addAction(action) 
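Every context menu in this file uses the same dispatch idiom: build a `{QAction: handler}` mapping, show the menu with `exec_()`, and call whichever handler the chosen action maps to. A compact sketch of that idiom; the label/handler pairs are placeholders:

```python
from Qt import QtGui, QtWidgets


def run_context_menu(actions):
    """Show a menu built from {label: callable} and run the chosen handler (sketch)."""
    menu = QtWidgets.QMenu()
    mapping = {}
    for label, handler in actions.items():
        action = QtWidgets.QAction(label, menu)
        mapping[action] = handler
        menu.addAction(action)

    chosen = menu.exec_(QtGui.QCursor.pos())
    if chosen and mapping.get(chosen):
        mapping[chosen]()
```

Called, for example, as `run_context_menu({"Pause": self._pause, "Unpause": self._unpause})` from a `customContextMenuRequested` slot.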
remote_site, remote_progress = self.item.remote_site.split() - if remote_progress == '1': + if float(remote_progress) == 1.0: action = QtWidgets.QAction("Reset local site") actions_mapping[action] = self._reset_local_site menu.addAction(action) local_site, local_progress = self.item.local_site.split() - if local_progress == '1': + if float(local_progress) == 1.0: action = QtWidgets.QAction("Reset remote site") actions_mapping[action] = self._reset_remote_site menu.addAction(action) @@ -941,8 +1360,9 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): self.sync_server.reset_provider_for_file( self.table_view.model()._project, self.representation_id, - self.item._id, - 'local') + 'local', + self.item._id) + self.table_view.model().refresh() def _reset_remote_site(self): """ @@ -952,13 +1372,43 @@ class SyncRepresentationDetailWidget(QtWidgets.QWidget): self.sync_server.reset_provider_for_file( self.table_view.model()._project, self.representation_id, - self.item._id, - 'remote') + 'remote', + self.item._id) + self.table_view.model().refresh() + + def _open_in_explorer(self): + if not self.item: + return + + fpath = self.item.path + fpath = os.path.normpath(os.path.dirname(fpath)) + + if os.path.isdir(fpath): + if 'win' in sys.platform: # windows + subprocess.Popen('explorer "%s"' % fpath) + elif sys.platform == 'darwin': # macOS + subprocess.Popen(['open', fpath]) + else: # linux + try: + subprocess.Popen(['xdg-open', fpath]) + except OSError: + raise OSError('unsupported xdg-open call??') class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): """ List of all syncronizable files per single representation. + + Used in detail window accessible after clicking on single repre in the + summary. + + Args: + sync_server (SyncServer) - object to call server operations (update + db status, set site status...) 
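`_open_in_explorer` appears in both the summary and the detail widget; a standalone sketch of the same behaviour is below. It uses `sys.platform.startswith("win")` for the Windows branch, since a bare substring test would also match `"darwin"`; the helper name is hypothetical:

```python
import os
import subprocess
import sys


def open_in_file_browser(file_path):
    """Reveal the folder containing `file_path` in the platform file browser (sketch)."""
    folder = os.path.normpath(os.path.dirname(file_path))
    if not os.path.isdir(folder):
        return

    if sys.platform.startswith("win"):   # Windows ("win32")
        subprocess.Popen(["explorer", folder])
    elif sys.platform == "darwin":       # macOS; note "darwin" contains "win"
        subprocess.Popen(["open", folder])
    else:                                # assume an xdg-aware Linux desktop
        subprocess.Popen(["xdg-open", folder])
```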
+ header (list) - names of visible columns + _id (string) - MongoDB _id of representation + project (string) - collection name, all queries must be called on + a specific collection """ PAGE_SIZE = 30 # TODO add filter filename @@ -994,6 +1444,7 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): state = attr.ib(default=None) tries = attr.ib(default=None) error = attr.ib(default=None) + path = attr.ib(default=None) def __init__(self, sync_server, header, _id, project=None): super(SyncRepresentationDetailModel, self).__init__() @@ -1001,16 +1452,16 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): self._data = [] self._project = project self._rec_loaded = 0 + self._total_records = 0 # how many documents query actually found self.filter = None - self._buffer = [] # stash one page worth of records (actually cursor) self._id = _id self._initialized = False self.sync_server = sync_server # TODO think about admin mode # this is for regular user, always only single local and single remote - self.local_site, self.remote_site = \ - self.sync_server.get_sites_for_project(self._project) + self.local_site = self.sync_server.get_local_site(self._project) + self.remote_site = self.sync_server.get_remote_site(self._project) self.sort = self.DEFAULT_SORT @@ -1063,6 +1514,9 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): return str(self._header[section]) def refresh(self, representations=None, load_records=0): + if self.sync_server.is_paused(): + return + self.beginResetModel() self._data = [] self._rec_loaded = 0 @@ -1082,9 +1536,25 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): Args: local_site (str): name of local site (mine) remote_site (str): name of cloud provider (theirs) - representations (Mongo Cursor) + representations (Mongo Cursor) - mimics result set, 1 object + with paginatedResults array and totalCount array """ - for repre in representations: + # representations is a Cursor, get first + result = representations.next() + count = 0 + total_count = result.get("totalCount") + if total_count: + count = total_count.pop().get('count') + self._total_records = count + + local_provider = _translate_provider_for_icon(self.sync_server, + self._project, + local_site) + remote_provider = _translate_provider_for_icon(self.sync_server, + self._project, + remote_site) + + for repre in result.get("paginatedResults"): # log.info("!!! 
repre:: {}".format(repre)) files = repre.get("files", []) if isinstance(files, dict): # aggregate returns dictionary @@ -1102,8 +1572,10 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): repre.get('updated_dt_remote').strftime( "%Y%m%dT%H%M%SZ") - progress_remote = repre.get('progress_remote', '') - progress_local = repre.get('progress_local', '') + progress_remote = _convert_progress( + repre.get('progress_remote', '0')) + progress_local = _convert_progress( + repre.get('progress_local', '0')) errors = [] if repre.get('failed_remote_error'): @@ -1116,13 +1588,16 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): os.path.basename(file["path"]), local_updated, remote_updated, - '{} {}'.format(local_site, progress_local), - '{} {}'.format(remote_site, progress_remote), + '{} {}'.format(local_provider, progress_local), + '{} {}'.format(remote_provider, progress_remote), file.get('size', 0), 1, STATUS[repre.get("status", -1)], repre.get("tries"), - '\n'.join(errors) + '\n'.join(errors), + self.sync_server.get_local_file_path(self._project, + file.get('path')) + ) self._data.append(item) self._rec_loaded += 1 @@ -1132,8 +1607,7 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): Check if there are more records than currently loaded """ # 'skip' might be suboptimal when representation hits 500k+ - self._buffer = list(self.dbcon.aggregate(self.query)) - return len(self._buffer) > self._rec_loaded + return self._total_records > self._rec_loaded def fetchMore(self, index): """ @@ -1144,14 +1618,16 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): 'self._buffer' is used to stash cursor to limit requery """ log.debug("fetchMore") - # cursor.count() returns always total number, not only skipped + limit - remainder = len(self._buffer) - self._rec_loaded - items_to_fetch = min(self.PAGE_SIZE, remainder) - + items_to_fetch = min(self._total_records - self._rec_loaded, + self.PAGE_SIZE) + self.query = self.get_default_query(self._rec_loaded) + representations = self.dbcon.aggregate(self.query) self.beginInsertRows(index, self._rec_loaded, self._rec_loaded + items_to_fetch - 1) - self._add_page_records(self.local_site, self.remote_site, self._buffer) + + self._add_page_records(self.local_site, self.remote_site, + representations) self.endInsertRows() @@ -1194,7 +1670,7 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): value = self.data(index, Qt.UserRole) if value == id: return index - return index + return None def get_default_query(self, limit=0): """ @@ -1271,6 +1747,14 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): } ] }}, + 'paused_remote': { + '$cond': [{'$size': "$order_remote.paused"}, + 1, + 0]}, + 'paused_local': { + '$cond': [{'$size': "$order_local.paused"}, + 1, + 0]}, 'failed_remote': { '$cond': [{'$size': "$order_remote.last_failed_dt"}, 1, @@ -1297,10 +1781,15 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): ]} ]}} }}, - {"$limit": limit}, - {"$skip": self._rec_loaded}, {"$project": self.projection}, - {"$sort": self.sort} + {"$sort": self.sort}, + { + '$facet': { + 'paginatedResults': [{'$skip': self._rec_loaded}, + {'$limit': limit}], + 'totalCount': [{'$count': 'count'}] + } + } ] def _get_match_part(self): @@ -1339,12 +1828,26 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): 'progress_local': 1, 'updated_dt_remote': 1, 'updated_dt_local': 1, + 'paused_remote': 1, + 'paused_local': 1, 'failed_remote_error': 1, 'failed_local_error': 1, 'tries': 
1, 'status': { '$switch': { 'branches': [ + { + 'case': { + '$or': ['$paused_remote', '$paused_local']}, + 'then': 3 # Paused + }, + { + 'case': { + '$and': [{'$or': ['$failed_remote', + '$failed_local']}, + {'$eq': ['$tries', 3]}]}, + 'then': 1 # Failed (3 tries) + }, { 'case': { '$or': [{'$eq': ['$progress_remote', 0]}, @@ -1368,10 +1871,6 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): ]}, 'then': 0 # In Progress }, - { - 'case': {'$eq': ['dummy_placeholder', 'paused']}, - 'then': 3 - }, { 'case': {'$and': [ {'$eq': ['$progress_remote', 1]}, @@ -1382,7 +1881,8 @@ class SyncRepresentationDetailModel(QtCore.QAbstractTableModel): ], 'default': -1 } - } + }, + 'data.path': 1 } @@ -1509,3 +2009,24 @@ class SizeDelegate(QtWidgets.QStyledItemDelegate): return "%3.1f%s%s" % (value, unit, suffix) value /= 1024.0 return "%.1f%s%s" % (value, 'Yi', suffix) + +def _convert_progress(value): + try: + progress = float(value) + except (ValueError, TypeError): + progress = 0.0 + + return progress + + +def _translate_provider_for_icon(sync_server, project, site): + """ + Get provider for 'site' + + This is used for getting icon, 'studio' should have different icon + then local sites, even the provider 'local_drive' is same + + """ + if site == sync_server.DEFAULT_SITE: + return sync_server.DEFAULT_SITE + return sync_server.get_provider_for_site(project, site) diff --git a/pype/plugins/publish/integrate_master_version.py b/pype/plugins/publish/integrate_master_version.py index d82c3be075..7d72bb26d4 100644 --- a/pype/plugins/publish/integrate_master_version.py +++ b/pype/plugins/publish/integrate_master_version.py @@ -298,6 +298,62 @@ class IntegrateMasterVersion(pyblish.api.InstancePlugin): repre["data"] = repre_data repre.pop("_id", None) + # Prepare paths of source and destination files + if len(published_files) == 1: + src_to_dst_file_paths.append( + (published_files[0], template_filled) + ) + else: + collections, remainders = clique.assemble(published_files) + if remainders or not collections or len(collections) > 1: + raise Exception(( + "Integrity error. Files of published representation " + "is combination of frame collections and single files." 
+ "Collections: `{}` Single files: `{}`" + ).format(str(collections), + str(remainders))) + + src_col = collections[0] + + # Get head and tail for collection + frame_splitter = "_-_FRAME_SPLIT_-_" + anatomy_data["frame"] = frame_splitter + _anatomy_filled = anatomy.format(anatomy_data) + _template_filled = _anatomy_filled["master"]["path"] + head, tail = _template_filled.split(frame_splitter) + padding = int( + anatomy.templates["render"].get( + "frame_padding", + anatomy.templates["render"].get("padding") + ) + ) + + dst_col = clique.Collection( + head=head, padding=padding, tail=tail + ) + dst_col.indexes.clear() + dst_col.indexes.update(src_col.indexes) + for src_file, dst_file in zip(src_col, dst_col): + src_to_dst_file_paths.append( + (src_file, dst_file) + ) + + # replace original file name with master name in repre doc + for index in range(len(repre.get("files"))): + file = repre.get("files")[index] + file_name = os.path.basename(file.get('path')) + for src_file, dst_file in src_to_dst_file_paths: + src_file_name = os.path.basename(src_file) + if src_file_name == file_name: + repre["files"][index]["path"] = self._update_path( + anatomy, repre["files"][index]["path"], + src_file, dst_file) + + repre["files"][index]["hash"] = self._update_hash( + repre["files"][index]["hash"], + src_file_name, dst_file + ) + schema.validate(repre) repre_name_low = repre["name"].lower() @@ -333,46 +389,6 @@ class IntegrateMasterVersion(pyblish.api.InstancePlugin): InsertOne(repre) ) - # Prepare paths of source and destination files - if len(published_files) == 1: - src_to_dst_file_paths.append( - (published_files[0], template_filled) - ) - continue - - collections, remainders = clique.assemble(published_files) - if remainders or not collections or len(collections) > 1: - raise Exception(( - "Integrity error. Files of published representation " - "is combination of frame collections and single files." 
- "Collections: `{}` Single files: `{}`" - ).format(str(collections), str(remainders))) - - src_col = collections[0] - - # Get head and tail for collection - frame_splitter = "_-_FRAME_SPLIT_-_" - anatomy_data["frame"] = frame_splitter - _anatomy_filled = anatomy.format(anatomy_data) - _template_filled = _anatomy_filled["master"]["path"] - head, tail = _template_filled.split(frame_splitter) - padding = int( - anatomy.templates["render"].get( - "frame_padding", - anatomy.templates["render"].get("padding") - ) - ) - - dst_col = clique.Collection( - head=head, padding=padding, tail=tail - ) - dst_col.indexes.clear() - dst_col.indexes.update(src_col.indexes) - for src_file, dst_file in zip(src_col, dst_col): - src_to_dst_file_paths.append( - (src_file, dst_file) - ) - self.path_checks = [] # Copy(hardlink) paths of source and destination files @@ -533,3 +549,39 @@ class IntegrateMasterVersion(pyblish.api.InstancePlugin): "type": "representation" })) return (master_version, master_repres) + + def _update_path(self, anatomy, path, src_file, dst_file): + """ + Replaces source path with new master path + + 'path' contains original path with version, must be replaced with + 'master' path (with 'master' label and without version) + + Args: + anatomy (Anatomy) - to get rootless style of path + path (string) - path from DB + src_file (string) - original file path + dst_file (string) - master file path + """ + _, rootless = anatomy.find_root_template_from_path( + dst_file + ) + _, rtls_src = anatomy.find_root_template_from_path( + src_file + ) + return path.replace(rtls_src, rootless) + + def _update_hash(self, hash, src_file_name, dst_file): + """ + Updates hash value with proper master name + """ + src_file_name = self._get_name_without_ext( + src_file_name) + master_file_name = self._get_name_without_ext( + dst_file) + return hash.replace(src_file_name, master_file_name) + + def _get_name_without_ext(self, value): + file_name = os.path.basename(value) + file_name, _ = os.path.splitext(file_name) + return file_name diff --git a/pype/settings/defaults/project_settings/global.json b/pype/settings/defaults/project_settings/global.json index 5913277df3..c74cc9a2c5 100644 --- a/pype/settings/defaults/project_settings/global.json +++ b/pype/settings/defaults/project_settings/global.json @@ -180,7 +180,7 @@ } }, "sync_server": { - "enabled": false, + "enabled": true, "config": { "local_id": "local_0", "retry_cnt": "3", @@ -192,7 +192,23 @@ "gdrive": { "provider": "gdrive", "credentials_url": "", - "root": "/sync_testing/test" + "root": { + "work": "" + } + }, + "studio": { + "provider": "local_drive", + "credentials_url": "", + "root": { + "work": "" + } + }, + "local_0": { + "provider": "local_drive", + "credentials_url": "", + "root": { + "work": "" + } } } } diff --git a/pype/settings/entities/schemas/projects_schema/schema_project_syncserver.json b/pype/settings/entities/schemas/projects_schema/schema_project_syncserver.json index 005e188b4d..4bc3fb6c48 100644 --- a/pype/settings/entities/schemas/projects_schema/schema_project_syncserver.json +++ b/pype/settings/entities/schemas/projects_schema/schema_project_syncserver.json @@ -66,10 +66,14 @@ "label": "Credentials url" }, { - "type": "text", + "type": "dict-modifiable", "key": "root", - "label": "Root" - }] + "label": "Roots", + "collapsable": false, + "collapsable_key": false, + "object_type": "text" + } + ] } } ]