Merge pull request #3597 from pypeclub/chore/OP-3405_Use-query-functions-in-sync-server

SyncServer: use query functions
This commit is contained in:
Petr Kalis 2022-08-15 13:43:05 +02:00 committed by GitHub
commit 1bd2d53172
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 279 additions and 268 deletions

View file

@ -62,7 +62,7 @@ class AbstractProvider:
@abc.abstractmethod
def upload_file(self, source_path, path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Copy file from 'source_path' to 'target_path' on provider.
@ -75,7 +75,7 @@ class AbstractProvider:
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str): name of project
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -87,7 +87,7 @@ class AbstractProvider:
@abc.abstractmethod
def download_file(self, source_path, local_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Download file from provider into local system
@ -99,7 +99,7 @@ class AbstractProvider:
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name

View file

@ -224,7 +224,7 @@ class DropboxHandler(AbstractProvider):
return False
def upload_file(self, source_path, path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Copy file from 'source_path' to 'target_path' on provider.
@ -237,7 +237,7 @@ class DropboxHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -290,7 +290,7 @@ class DropboxHandler(AbstractProvider):
cursor.offset = f.tell()
server.update_db(
collection=collection,
project_name=project_name,
new_file_id=None,
file=file,
representation=representation,
@ -301,7 +301,7 @@ class DropboxHandler(AbstractProvider):
return path
def download_file(self, source_path, local_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Download file from provider into local system
@ -313,7 +313,7 @@ class DropboxHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -337,7 +337,7 @@ class DropboxHandler(AbstractProvider):
self.dbx.files_download_to_file(local_path, source_path)
server.update_db(
collection=collection,
project_name=project_name,
new_file_id=None,
file=file,
representation=representation,

View file

@ -251,7 +251,7 @@ class GDriveHandler(AbstractProvider):
return folder_id
def upload_file(self, source_path, path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Uploads single file from 'source_path' to destination 'path'.
@ -264,7 +264,7 @@ class GDriveHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -324,7 +324,7 @@ class GDriveHandler(AbstractProvider):
while response is None:
if server.is_representation_paused(representation['_id'],
check_parents=True,
project_name=collection):
project_name=project_name):
raise ValueError("Paused during process, please redo.")
if status:
status_val = float(status.progress())
@ -333,7 +333,7 @@ class GDriveHandler(AbstractProvider):
last_tick = time.time()
log.debug("Uploaded %d%%." %
int(status_val * 100))
server.update_db(collection=collection,
server.update_db(project_name=project_name,
new_file_id=None,
file=file,
representation=representation,
@ -358,7 +358,7 @@ class GDriveHandler(AbstractProvider):
return response['id']
def download_file(self, source_path, local_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Downloads single file from 'source_path' (remote) to 'local_path'.
@ -372,7 +372,7 @@ class GDriveHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -410,7 +410,7 @@ class GDriveHandler(AbstractProvider):
while response is None:
if server.is_representation_paused(representation['_id'],
check_parents=True,
project_name=collection):
project_name=project_name):
raise ValueError("Paused during process, please redo.")
if status:
status_val = float(status.progress())
@ -419,7 +419,7 @@ class GDriveHandler(AbstractProvider):
last_tick = time.time()
log.debug("Downloaded %d%%." %
int(status_val * 100))
server.update_db(collection=collection,
server.update_db(project_name=project_name,
new_file_id=None,
file=file,
representation=representation,

View file

@ -82,7 +82,7 @@ class LocalDriveHandler(AbstractProvider):
return editable
def upload_file(self, source_path, target_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False, direction="Upload"):
"""
Copies file from 'source_path' to 'target_path'
@ -95,7 +95,7 @@ class LocalDriveHandler(AbstractProvider):
thread = threading.Thread(target=self._copy,
args=(source_path, target_path))
thread.start()
self._mark_progress(collection, file, representation, server,
self._mark_progress(project_name, file, representation, server,
site, source_path, target_path, direction)
else:
if os.path.exists(target_path):
@ -105,13 +105,14 @@ class LocalDriveHandler(AbstractProvider):
return os.path.basename(target_path)
def download_file(self, source_path, local_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Download a file form 'source_path' to 'local_path'
"""
return self.upload_file(source_path, local_path,
server, collection, file, representation, site,
server, project_name, file,
representation, site,
overwrite, direction="Download")
def delete_file(self, path):
@ -188,7 +189,7 @@ class LocalDriveHandler(AbstractProvider):
except shutil.SameFileError:
print("same files, skipping")
def _mark_progress(self, collection, file, representation, server, site,
def _mark_progress(self, project_name, file, representation, server, site,
source_path, target_path, direction):
"""
Updates progress field in DB by values 0-1.
@ -204,7 +205,7 @@ class LocalDriveHandler(AbstractProvider):
status_val = target_file_size / source_file_size
last_tick = time.time()
log.debug(direction + "ed %d%%." % int(status_val * 100))
server.update_db(collection=collection,
server.update_db(project_name=project_name,
new_file_id=None,
file=file,
representation=representation,

View file

@ -222,7 +222,7 @@ class SFTPHandler(AbstractProvider):
return os.path.basename(path)
def upload_file(self, source_path, target_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Uploads single file from 'source_path' to destination 'path'.
@ -235,7 +235,7 @@ class SFTPHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -256,7 +256,7 @@ class SFTPHandler(AbstractProvider):
thread = threading.Thread(target=self._upload,
args=(source_path, target_path))
thread.start()
self._mark_progress(collection, file, representation, server,
self._mark_progress(project_name, file, representation, server,
site, source_path, target_path, "upload")
return os.path.basename(target_path)
@ -267,7 +267,7 @@ class SFTPHandler(AbstractProvider):
conn.put(source_path, target_path)
def download_file(self, source_path, target_path,
server, collection, file, representation, site,
server, project_name, file, representation, site,
overwrite=False):
"""
Downloads single file from 'source_path' (remote) to 'target_path'.
@ -281,7 +281,7 @@ class SFTPHandler(AbstractProvider):
arguments for saving progress:
server (SyncServer): server instance to call update_db on
collection (str): name of collection
project_name (str):
file (dict): info about uploaded file (matches structure from db)
representation (dict): complete repre containing 'file'
site (str): site name
@ -302,7 +302,7 @@ class SFTPHandler(AbstractProvider):
thread = threading.Thread(target=self._download,
args=(source_path, target_path))
thread.start()
self._mark_progress(collection, file, representation, server,
self._mark_progress(project_name, file, representation, server,
site, source_path, target_path, "download")
return os.path.basename(target_path)
@ -425,7 +425,7 @@ class SFTPHandler(AbstractProvider):
pysftp.exceptions.ConnectionException):
log.warning("Couldn't connect", exc_info=True)
def _mark_progress(self, collection, file, representation, server, site,
def _mark_progress(self, project_name, file, representation, server, site,
source_path, target_path, direction):
"""
Updates progress field in DB by values 0-1.
@ -446,7 +446,7 @@ class SFTPHandler(AbstractProvider):
status_val = target_file_size / source_file_size
last_tick = time.time()
log.debug(direction + "ed %d%%." % int(status_val * 100))
server.update_db(collection=collection,
server.update_db(project_name=project_name,
new_file_id=None,
file=file,
representation=representation,

View file

@ -14,7 +14,7 @@ from .utils import SyncStatus, ResumableError
log = PypeLogger().get_logger("SyncServer")
async def upload(module, collection, file, representation, provider_name,
async def upload(module, project_name, file, representation, provider_name,
remote_site_name, tree=None, preset=None):
"""
Upload single 'file' of a 'representation' to 'provider'.
@ -31,7 +31,7 @@ async def upload(module, collection, file, representation, provider_name,
Args:
module(SyncServerModule): object to run SyncServerModule API
collection (str): source collection
project_name (str): source db
file (dictionary): of file from representation in Mongo
representation (dictionary): of representation
provider_name (string): gdrive, gdc etc.
@ -47,15 +47,16 @@ async def upload(module, collection, file, representation, provider_name,
# thread can do that at a time, upload/download to prepared
# structure should be run in parallel
remote_handler = lib.factory.get_provider(provider_name,
collection,
project_name,
remote_site_name,
tree=tree,
presets=preset)
file_path = file.get("path", "")
try:
local_file_path, remote_file_path = resolve_paths(module,
file_path, collection, remote_site_name, remote_handler
local_file_path, remote_file_path = resolve_paths(
module, file_path, project_name,
remote_site_name, remote_handler
)
except Exception as exp:
print(exp)
@ -74,27 +75,28 @@ async def upload(module, collection, file, representation, provider_name,
local_file_path,
remote_file_path,
module,
collection,
project_name,
file,
representation,
remote_site_name,
True
)
module.handle_alternate_site(collection, representation, remote_site_name,
module.handle_alternate_site(project_name, representation,
remote_site_name,
file["_id"], file_id)
return file_id
async def download(module, collection, file, representation, provider_name,
async def download(module, project_name, file, representation, provider_name,
remote_site_name, tree=None, preset=None):
"""
Downloads file to local folder denoted in representation.Context.
Args:
module(SyncServerModule): object to run SyncServerModule API
collection (str): source collection
project_name (str): source
file (dictionary) : info about processed file
representation (dictionary): repr that 'file' belongs to
provider_name (string): 'gdrive' etc
@ -108,20 +110,20 @@ async def download(module, collection, file, representation, provider_name,
"""
with module.lock:
remote_handler = lib.factory.get_provider(provider_name,
collection,
project_name,
remote_site_name,
tree=tree,
presets=preset)
file_path = file.get("path", "")
local_file_path, remote_file_path = resolve_paths(
module, file_path, collection, remote_site_name, remote_handler
module, file_path, project_name, remote_site_name, remote_handler
)
local_folder = os.path.dirname(local_file_path)
os.makedirs(local_folder, exist_ok=True)
local_site = module.get_active_site(collection)
local_site = module.get_active_site(project_name)
loop = asyncio.get_running_loop()
file_id = await loop.run_in_executor(None,
@ -129,20 +131,20 @@ async def download(module, collection, file, representation, provider_name,
remote_file_path,
local_file_path,
module,
collection,
project_name,
file,
representation,
local_site,
True
)
module.handle_alternate_site(collection, representation, local_site,
module.handle_alternate_site(project_name, representation, local_site,
file["_id"], file_id)
return file_id
def resolve_paths(module, file_path, collection,
def resolve_paths(module, file_path, project_name,
remote_site_name=None, remote_handler=None):
"""
Returns tuple of local and remote file paths with {root}
@ -153,7 +155,7 @@ def resolve_paths(module, file_path, collection,
Args:
module(SyncServerModule): object to run SyncServerModule API
file_path(string): path with {root}
collection(string): project name
project_name(string): project name
remote_site_name(string): remote site
remote_handler(AbstractProvider): implementation
Returns:
@ -164,7 +166,7 @@ def resolve_paths(module, file_path, collection,
remote_file_path = remote_handler.resolve_path(file_path)
local_handler = lib.factory.get_provider(
'local_drive', collection, module.get_active_site(collection))
'local_drive', project_name, module.get_active_site(project_name))
local_file_path = local_handler.resolve_path(file_path)
return local_file_path, remote_file_path
@ -269,8 +271,8 @@ class SyncServerThread(threading.Thread):
- gets list of collections in DB
- gets list of active remote providers (has configuration,
credentials)
- for each collection it looks for representations that should
be synced
- for each project_name it looks for representations that
should be synced
- synchronize found collections
- update representations - fills error messages for exceptions
- waits X seconds and repeat
@ -282,17 +284,17 @@ class SyncServerThread(threading.Thread):
import time
start_time = time.time()
self.module.set_sync_project_settings() # clean cache
collection = None
project_name = None
enabled_projects = self.module.get_enabled_projects()
for collection in enabled_projects:
preset = self.module.sync_project_settings[collection]
for project_name in enabled_projects:
preset = self.module.sync_project_settings[project_name]
local_site, remote_site = self._working_sites(collection)
local_site, remote_site = self._working_sites(project_name)
if not all([local_site, remote_site]):
continue
sync_repres = self.module.get_sync_representations(
collection,
project_name,
local_site,
remote_site
)
@ -310,7 +312,7 @@ class SyncServerThread(threading.Thread):
remote_provider = \
self.module.get_provider_for_site(site=remote_site)
handler = lib.factory.get_provider(remote_provider,
collection,
project_name,
remote_site,
presets=site_preset)
limit = lib.factory.get_provider_batch_limit(
@ -341,7 +343,7 @@ class SyncServerThread(threading.Thread):
limit -= 1
task = asyncio.create_task(
upload(self.module,
collection,
project_name,
file,
sync,
remote_provider,
@ -353,7 +355,7 @@ class SyncServerThread(threading.Thread):
files_processed_info.append((file,
sync,
remote_site,
collection
project_name
))
processed_file_path.add(file_path)
if status == SyncStatus.DO_DOWNLOAD:
@ -361,7 +363,7 @@ class SyncServerThread(threading.Thread):
limit -= 1
task = asyncio.create_task(
download(self.module,
collection,
project_name,
file,
sync,
remote_provider,
@ -373,7 +375,7 @@ class SyncServerThread(threading.Thread):
files_processed_info.append((file,
sync,
local_site,
collection
project_name
))
processed_file_path.add(file_path)
@ -384,12 +386,12 @@ class SyncServerThread(threading.Thread):
return_exceptions=True)
for file_id, info in zip(files_created,
files_processed_info):
file, representation, site, collection = info
file, representation, site, project_name = info
error = None
if isinstance(file_id, BaseException):
error = str(file_id)
file_id = None
self.module.update_db(collection,
self.module.update_db(project_name,
file_id,
file,
representation,
@ -399,7 +401,7 @@ class SyncServerThread(threading.Thread):
duration = time.time() - start_time
log.debug("One loop took {:.2f}s".format(duration))
delay = self.module.get_loop_delay(collection)
delay = self.module.get_loop_delay(project_name)
log.debug("Waiting for {} seconds to new loop".format(delay))
self.timer = asyncio.create_task(self.run_timer(delay))
await asyncio.gather(self.timer)
@ -458,19 +460,19 @@ class SyncServerThread(threading.Thread):
self.timer.cancel()
self.timer = None
def _working_sites(self, collection):
if self.module.is_project_paused(collection):
def _working_sites(self, project_name):
if self.module.is_project_paused(project_name):
log.debug("Both sites same, skipping")
return None, None
local_site = self.module.get_active_site(collection)
remote_site = self.module.get_remote_site(collection)
local_site = self.module.get_active_site(project_name)
remote_site = self.module.get_remote_site(project_name)
if local_site == remote_site:
log.debug("{}-{} sites same, skipping".format(local_site,
remote_site))
return None, None
configured_sites = _get_configured_sites(self.module, collection)
configured_sites = _get_configured_sites(self.module, project_name)
if not all([local_site in configured_sites,
remote_site in configured_sites]):
log.debug("Some of the sites {} - {} is not ".format(local_site,

View file

@ -25,6 +25,8 @@ from .providers import lib
from .utils import time_function, SyncStatus, SiteAlreadyPresentError
from openpype.client import get_representations, get_representation_by_id
log = PypeLogger.get_logger("SyncServer")
@ -128,12 +130,12 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
self.projects_processed = set()
""" Start of Public API """
def add_site(self, collection, representation_id, site_name=None,
def add_site(self, project_name, representation_id, site_name=None,
force=False):
"""
Adds new site to representation to be synced.
'collection' must have synchronization enabled (globally or
'project_name' must have synchronization enabled (globally or
project only)
Used as a API endpoint from outside applications (Loader etc).
@ -141,7 +143,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Use 'force' to reset existing site.
Args:
collection (string): project name (must match DB)
project_name (string): project name (must match DB)
representation_id (string): MongoDB _id value
site_name (string): name of configured and active site
force (bool): reset site if exists
@ -151,25 +153,25 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
not 'force'
ValueError - other errors (repre not found, misconfiguration)
"""
if not self.get_sync_project_setting(collection):
if not self.get_sync_project_setting(project_name):
raise ValueError("Project not configured")
if not site_name:
site_name = self.DEFAULT_SITE
self.reset_site_on_representation(collection,
self.reset_site_on_representation(project_name,
representation_id,
site_name=site_name,
force=force)
def remove_site(self, collection, representation_id, site_name,
def remove_site(self, project_name, representation_id, site_name,
remove_local_files=False):
"""
Removes 'site_name' for particular 'representation_id' on
'collection'
'project_name'
Args:
collection (string): project name (must match DB)
project_name (string): project name (must match DB)
representation_id (string): MongoDB _id value
site_name (string): name of configured and active site
remove_local_files (bool): remove only files for 'local_id'
@ -178,15 +180,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Returns:
throws ValueError if any issue
"""
if not self.get_sync_project_setting(collection):
if not self.get_sync_project_setting(project_name):
raise ValueError("Project not configured")
self.reset_site_on_representation(collection,
self.reset_site_on_representation(project_name,
representation_id,
site_name=site_name,
remove=True)
if remove_local_files:
self._remove_local_file(collection, representation_id, site_name)
self._remove_local_file(project_name, representation_id, site_name)
def compute_resource_sync_sites(self, project_name):
"""Get available resource sync sites state for publish process.
@ -333,9 +335,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return alt_site_pairs
def clear_project(self, collection, site_name):
def clear_project(self, project_name, site_name):
"""
Clear 'collection' of 'site_name' and its local files
Clear 'project_name' of 'site_name' and its local files
Works only on real local sites, not on 'studio'
"""
@ -344,16 +346,17 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
"files.sites.name": site_name
}
# TODO currently not possible to replace with get_representations
representations = list(
self.connection.database[collection].find(query))
self.connection.database[project_name].find(query))
if not representations:
self.log.debug("No repre found")
return
for repre in representations:
self.remove_site(collection, repre.get("_id"), site_name, True)
self.remove_site(project_name, repre.get("_id"), site_name, True)
def create_validate_project_task(self, collection, site_name):
def create_validate_project_task(self, project_name, site_name):
"""Adds metadata about project files validation on a queue.
This process will loop through all representation and check if
@ -370,33 +373,28 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
"""
task = {
"type": "validate",
"project_name": collection,
"func": lambda: self.validate_project(collection, site_name,
"project_name": project_name,
"func": lambda: self.validate_project(project_name, site_name,
reset_missing=True)
}
self.projects_processed.add(collection)
self.projects_processed.add(project_name)
self.long_running_tasks.append(task)
def validate_project(self, collection, site_name, reset_missing=False):
"""Validate 'collection' of 'site_name' and its local files
def validate_project(self, project_name, site_name, reset_missing=False):
"""Validate 'project_name' of 'site_name' and its local files
If file present and not marked with a 'site_name' in DB, DB is
updated with site name and file modified date.
Args:
collection (string): project name
project_name (string): project name
site_name (string): active site name
reset_missing (bool): if True reset site in DB if missing
physically
"""
self.log.debug("Validation of {} for {} started".format(collection,
self.log.debug("Validation of {} for {} started".format(project_name,
site_name))
query = {
"type": "representation"
}
representations = list(
self.connection.database[collection].find(query))
representations = list(get_representations(project_name))
if not representations:
self.log.debug("No repre found")
return
@ -416,7 +414,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
continue
file_path = repre_file.get("path", "")
local_file_path = self.get_local_file_path(collection,
local_file_path = self.get_local_file_path(project_name,
site_name,
file_path)
@ -428,14 +426,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
"Adding site {} for {}".format(site_name,
repre_id))
query = {
"_id": repre_id
}
created_dt = datetime.fromtimestamp(
os.path.getmtime(local_file_path))
elem = {"name": site_name,
"created_dt": created_dt}
self._add_site(collection, query, repre, elem,
self._add_site(project_name, repre, elem,
site_name=site_name,
file_id=repre_file["_id"],
force=True)
@ -445,41 +440,42 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
self.log.debug("Resetting site {} for {}".
format(site_name, repre_id))
self.reset_site_on_representation(
collection, repre_id, site_name=site_name,
project_name, repre_id, site_name=site_name,
file_id=repre_file["_id"])
sites_reset += 1
if sites_added % 100 == 0:
self.log.debug("Sites added {}".format(sites_added))
self.log.debug("Validation of {} for {} ended".format(collection,
self.log.debug("Validation of {} for {} ended".format(project_name,
site_name))
self.log.info("Sites added {}, sites reset {}".format(sites_added,
reset_missing))
def pause_representation(self, collection, representation_id, site_name):
def pause_representation(self, project_name, representation_id, site_name):
"""
Sets 'representation_id' as paused, eg. no syncing should be
happening on it.
Args:
collection (string): project name
project_name (string): project name
representation_id (string): MongoDB objectId value
site_name (string): 'gdrive', 'studio' etc.
"""
log.info("Pausing SyncServer for {}".format(representation_id))
self._paused_representations.add(representation_id)
self.reset_site_on_representation(collection, representation_id,
self.reset_site_on_representation(project_name, representation_id,
site_name=site_name, pause=True)
def unpause_representation(self, collection, representation_id, site_name):
def unpause_representation(self, project_name,
representation_id, site_name):
"""
Sets 'representation_id' as unpaused.
Does not fail or warn if repre wasn't paused.
Args:
collection (string): project name
project_name (string): project name
representation_id (string): MongoDB objectId value
site_name (string): 'gdrive', 'studio' etc.
"""
@ -489,7 +485,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
except KeyError:
pass
# self.paused_representations is not persistent
self.reset_site_on_representation(collection, representation_id,
self.reset_site_on_representation(project_name, representation_id,
site_name=site_name, pause=False)
def is_representation_paused(self, representation_id,
@ -520,7 +516,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
happening on all representation inside.
Args:
project_name (string): collection name
project_name (string): name of project
"""
log.info("Pausing SyncServer for {}".format(project_name))
self._paused_projects.add(project_name)
@ -532,7 +528,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Does not fail or warn if project wasn't paused.
Args:
project_name (string): collection name
project_name (string):
"""
log.info("Unpausing SyncServer for {}".format(project_name))
try:
@ -545,7 +541,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Returns if 'project_name' is paused or not.
Args:
project_name (string): collection name
project_name (string):
check_parents (bool): check if server itself
is not paused
Returns:
@ -944,8 +940,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return True
return False
def handle_alternate_site(self, collection, representation, processed_site,
file_id, synced_file_id):
def handle_alternate_site(self, project_name, representation,
processed_site, file_id, synced_file_id):
"""
For special use cases where one site vendors another.
@ -958,7 +954,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
same location >> file is accesible on 'sftp' site right away.
Args:
collection (str): name of project
project_name (str): name of project
representation (dict)
processed_site (str): real site_name of published/uploaded file
file_id (ObjectId): DB id of file handled
@ -982,26 +978,112 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
alternate_sites = set(alternate_sites)
for alt_site in alternate_sites:
query = {
"_id": representation["_id"]
}
elem = {"name": alt_site,
"created_dt": datetime.now(),
"id": synced_file_id}
self.log.debug("Adding alternate {} to {}".format(
alt_site, representation["_id"]))
self._add_site(collection, query,
self._add_site(project_name,
representation, elem,
alt_site, file_id=file_id, force=True)
def get_repre_info_for_versions(self, project_name, version_ids,
active_site, remote_site):
"""Returns representation documents for versions and sites combi
Args:
project_name (str)
version_ids (list): of version[_id]
active_site (string): 'local', 'studio' etc
remote_site (string): dtto
Returns:
"""
self.connection.Session["AVALON_PROJECT"] = project_name
query = [
{"$match": {"parent": {"$in": version_ids},
"type": "representation",
"files.sites.name": {"$exists": 1}}},
{"$unwind": "$files"},
{'$addFields': {
'order_local': {
'$filter': {
'input': '$files.sites', 'as': 'p',
'cond': {'$eq': ['$$p.name', active_site]}
}
}
}},
{'$addFields': {
'order_remote': {
'$filter': {
'input': '$files.sites', 'as': 'p',
'cond': {'$eq': ['$$p.name', remote_site]}
}
}
}},
{'$addFields': {
'progress_local': {"$arrayElemAt": [{
'$cond': [
{'$size': "$order_local.progress"},
"$order_local.progress",
# if exists created_dt count is as available
{'$cond': [
{'$size': "$order_local.created_dt"},
[1],
[0]
]}
]},
0
]}
}},
{'$addFields': {
'progress_remote': {"$arrayElemAt": [{
'$cond': [
{'$size': "$order_remote.progress"},
"$order_remote.progress",
# if exists created_dt count is as available
{'$cond': [
{'$size': "$order_remote.created_dt"},
[1],
[0]
]}
]},
0
]}
}},
{'$group': { # first group by repre
'_id': '$_id',
'parent': {'$first': '$parent'},
'avail_ratio_local': {
'$first': {
'$divide': [{'$sum': "$progress_local"}, {'$sum': 1}]
}
},
'avail_ratio_remote': {
'$first': {
'$divide': [{'$sum': "$progress_remote"}, {'$sum': 1}]
}
}
}},
{'$group': { # second group by parent, eg version_id
'_id': '$parent',
'repre_count': {'$sum': 1}, # total representations
# fully available representation for site
'avail_repre_local': {'$sum': "$avail_ratio_local"},
'avail_repre_remote': {'$sum': "$avail_ratio_remote"},
}},
]
# docs = list(self.connection.aggregate(query))
return self.connection.aggregate(query)
""" End of Public API """
def get_local_file_path(self, collection, site_name, file_path):
def get_local_file_path(self, project_name, site_name, file_path):
"""
Externalized for app
"""
handler = LocalDriveHandler(collection, site_name)
handler = LocalDriveHandler(project_name, site_name)
local_file_path = handler.resolve_path(file_path)
return local_file_path
@ -1288,7 +1370,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return sites.get(site, 'N/A')
@time_function
def get_sync_representations(self, collection, active_site, remote_site):
def get_sync_representations(self, project_name, active_site, remote_site):
"""
Get representations that should be synced, these could be
recognised by presence of document in 'files.sites', where key is
@ -1299,8 +1381,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
better performance. Goal is to get as few representations as
possible.
Args:
collection (string): name of collection (in most cases matches
project name
project_name (string):
active_site (string): identifier of current active site (could be
'local_0' when working from home, 'studio' when working in the
studio (default)
@ -1309,10 +1390,10 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Returns:
(list) of dictionaries
"""
log.debug("Check representations for : {}".format(collection))
self.connection.Session["AVALON_PROJECT"] = collection
log.debug("Check representations for : {}".format(project_name))
self.connection.Session["AVALON_PROJECT"] = project_name
# retry_cnt - number of attempts to sync specific file before giving up
retries_arr = self._get_retries_arr(collection)
retries_arr = self._get_retries_arr(project_name)
match = {
"type": "representation",
"$or": [
@ -1449,14 +1530,14 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return SyncStatus.DO_NOTHING
def update_db(self, collection, new_file_id, file, representation,
def update_db(self, project_name, new_file_id, file, representation,
site, error=None, progress=None, priority=None):
"""
Update 'provider' portion of records in DB with success (file_id)
or error (exception)
Args:
collection (string): name of project - force to db connection as
            project_name (string): name of project - forced to db connection
                as each file might come from a different project
new_file_id (string):
file (dictionary): info about processed file (pulled from DB)
@ -1499,7 +1580,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
if file_id:
arr_filter.append({'f._id': ObjectId(file_id)})
self.connection.database[collection].update_one(
self.connection.database[project_name].update_one(
query,
update,
upsert=True,
@ -1562,7 +1643,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return -1, None
def reset_site_on_representation(self, collection, representation_id,
def reset_site_on_representation(self, project_name, representation_id,
side=None, file_id=None, site_name=None,
remove=False, pause=None, force=False):
"""
@ -1579,7 +1660,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Should be used when repre should be synced to new site.
Args:
collection (string): name of project (eg. collection) in DB
project_name (string): name of project (eg. collection) in DB
representation_id(string): _id of representation
file_id (string): file _id in representation
side (string): local or remote side
@ -1593,20 +1674,18 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
not 'force'
ValueError - other errors (repre not found, misconfiguration)
"""
query = {
"_id": ObjectId(representation_id)
}
representation = self.connection.database[collection].find_one(query)
representation = get_representation_by_id(project_name,
representation_id)
if not representation:
raise ValueError("Representation {} not found in {}".
format(representation_id, collection))
format(representation_id, project_name))
if side and site_name:
raise ValueError("Misconfiguration, only one of side and " +
"site_name arguments should be passed.")
local_site = self.get_active_site(collection)
remote_site = self.get_remote_site(collection)
local_site = self.get_active_site(project_name)
remote_site = self.get_remote_site(project_name)
if side:
if side == 'local':
@ -1617,37 +1696,43 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
elem = {"name": site_name}
if file_id: # reset site for particular file
self._reset_site_for_file(collection, query,
self._reset_site_for_file(project_name, representation_id,
elem, file_id, site_name)
elif side: # reset site for whole representation
self._reset_site(collection, query, elem, site_name)
self._reset_site(project_name, representation_id, elem, site_name)
elif remove: # remove site for whole representation
self._remove_site(collection, query, representation, site_name)
self._remove_site(project_name,
representation, site_name)
elif pause is not None:
self._pause_unpause_site(collection, query,
self._pause_unpause_site(project_name,
representation, site_name, pause)
else: # add new site to all files for representation
self._add_site(collection, query, representation, elem, site_name,
self._add_site(project_name, representation, elem, site_name,
force=force)
def _update_site(self, collection, query, update, arr_filter):
def _update_site(self, project_name, representation_id,
update, arr_filter):
"""
Auxiliary method to call update_one function on DB
Used for refactoring ugly reset_provider_for_file
"""
self.connection.database[collection].update_one(
query = {
"_id": ObjectId(representation_id)
}
self.connection.database[project_name].update_one(
query,
update,
upsert=True,
array_filters=arr_filter
)
def _reset_site_for_file(self, collection, query,
def _reset_site_for_file(self, project_name, representation_id,
elem, file_id, site_name):
"""
Resets 'site_name' for 'file_id' on representation in 'query' on
'collection'
'project_name'
"""
update = {
"$set": {"files.$[f].sites.$[s]": elem}
@ -1660,9 +1745,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
{'f._id': file_id}
]
self._update_site(collection, query, update, arr_filter)
self._update_site(project_name, representation_id, update, arr_filter)
def _reset_site(self, collection, query, elem, site_name):
def _reset_site(self, project_name, representation_id, elem, site_name):
"""
Resets 'site_name' for all files of representation in 'query'
"""
@ -1674,9 +1759,9 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
{'s.name': site_name}
]
self._update_site(collection, query, update, arr_filter)
self._update_site(project_name, representation_id, update, arr_filter)
def _remove_site(self, collection, query, representation, site_name):
def _remove_site(self, project_name, representation, site_name):
"""
Removes 'site_name' for 'representation' in 'query'
@ -1698,10 +1783,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
}
arr_filter = []
self._update_site(collection, query, update, arr_filter)
self._update_site(project_name, representation["_id"],
update, arr_filter)
def _pause_unpause_site(self, collection, query,
representation, site_name, pause):
def _pause_unpause_site(self, project_name, representation,
site_name, pause):
"""
Pauses/unpauses all files for 'representation' based on 'pause'
@ -1733,12 +1819,13 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
{'s.name': site_name}
]
self._update_site(collection, query, update, arr_filter)
self._update_site(project_name, representation["_id"],
update, arr_filter)
def _add_site(self, collection, query, representation, elem, site_name,
def _add_site(self, project_name, representation, elem, site_name,
force=False, file_id=None):
"""
Adds 'site_name' to 'representation' on 'collection'
Adds 'site_name' to 'representation' on 'project_name'
Args:
representation (dict)
@ -1746,10 +1833,11 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Use 'force' to remove existing or raises ValueError
"""
representation_id = representation["_id"]
reset_existing = False
files = representation.get("files", [])
if not files:
log.debug("No files for {}".format(representation["_id"]))
log.debug("No files for {}".format(representation_id))
return
for repre_file in files:
@ -1759,7 +1847,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
for site in repre_file.get("sites"):
if site["name"] == site_name:
if force or site.get("error"):
self._reset_site_for_file(collection, query,
self._reset_site_for_file(project_name,
representation_id,
elem, repre_file["_id"],
site_name)
reset_existing = True
@ -1785,14 +1874,15 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
{'f._id': file_id}
]
self._update_site(collection, query, update, arr_filter)
self._update_site(project_name, representation_id,
update, arr_filter)
def _remove_local_file(self, collection, representation_id, site_name):
def _remove_local_file(self, project_name, representation_id, site_name):
"""
Removes all local files for 'site_name' of 'representation_id'
Args:
collection (string): project name (must match DB)
project_name (string): project name (must match DB)
representation_id (string): MongoDB _id value
site_name (string): name of configured and active site
@ -1808,21 +1898,17 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
provider_name = self.get_provider_for_site(site=site_name)
if provider_name == 'local_drive':
query = {
"_id": ObjectId(representation_id)
}
representation = list(
self.connection.database[collection].find(query))
representation = get_representation_by_id(project_name,
representation_id,
fields=["files"])
if not representation:
self.log.debug("No repre {} found".format(
representation_id))
return
representation = representation.pop()
local_file_path = ''
for file in representation.get("files"):
local_file_path = self.get_local_file_path(collection,
local_file_path = self.get_local_file_path(project_name,
site_name,
file.get("path", "")
)

View file

@ -11,6 +11,7 @@ from openpype.tools.utils.delegates import pretty_timestamp
from openpype.lib import PypeLogger
from openpype.api import get_local_site_id
from openpype.client import get_representation_by_id
from . import lib
@ -440,7 +441,7 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel):
full text filtering.
Allows pagination, most of heavy lifting is being done on DB side.
Single model matches to single collection. When project is changed,
Single model matches to single project. When project is changed,
model is reset and refreshed.
Args:
@ -919,11 +920,10 @@ class SyncRepresentationSummaryModel(_SyncRepresentationModel):
repre_id = self.data(index, Qt.UserRole)
representation = list(self.dbcon.find({"type": "representation",
"_id": repre_id}))
representation = get_representation_by_id(self.project, repre_id)
if representation:
self.sync_server.update_db(self.project, None, None,
representation.pop(),
representation,
get_local_site_id(),
priority=value)
self.is_editing = False
@ -1357,11 +1357,10 @@ class SyncRepresentationDetailModel(_SyncRepresentationModel):
file_id = self.data(index, Qt.UserRole)
updated_file = None
# conversion from cursor to list
representations = list(self.dbcon.find({"type": "representation",
"_id": self._id}))
representation = get_representation_by_id(self.project, self._id)
if not representation:
return
representation = representations.pop()
for repre_file in representation["files"]:
if repre_file["_id"] == file_id:
updated_file = repre_file

View file

@ -272,15 +272,15 @@ class SubsetsModel(TreeModel, BaseRepresentationModel):
# update availability on active site when version changes
if self.sync_server.enabled and version_doc:
query = self._repre_per_version_pipeline(
repre_info = self.sync_server.get_repre_info_for_versions(
project_name,
[version_doc["_id"]],
self.active_site,
self.remote_site
)
docs = list(self.dbcon.aggregate(query))
if docs:
repre = docs.pop()
version_doc["data"].update(self._get_repre_dict(repre))
if repre_info:
version_doc["data"].update(
self._get_repre_dict(repre_info[0]))
self.set_version(index, version_doc)
@ -478,16 +478,16 @@ class SubsetsModel(TreeModel, BaseRepresentationModel):
for _subset_id, doc in last_versions_by_subset_id.items():
version_ids.add(doc["_id"])
query = self._repre_per_version_pipeline(
repres = self.sync_server.get_repre_info_for_versions(
project_name,
list(version_ids), self.active_site, self.remote_site
)
for doc in self.dbcon.aggregate(query):
for repre in repres:
if self._doc_fetching_stop:
return
doc["active_provider"] = self.active_provider
doc["remote_provider"] = self.remote_provider
repre_info[doc["_id"]] = doc
repre_info[repre["_id"]] = repre
self._doc_payload = {
"asset_docs_by_id": asset_docs_by_id,
@ -827,83 +827,6 @@ class SubsetsModel(TreeModel, BaseRepresentationModel):
return data
def _repre_per_version_pipeline(self, version_ids,
active_site, remote_site):
query = [
{"$match": {"parent": {"$in": version_ids},
"type": "representation",
"files.sites.name": {"$exists": 1}}},
{"$unwind": "$files"},
{'$addFields': {
'order_local': {
'$filter': {
'input': '$files.sites', 'as': 'p',
'cond': {'$eq': ['$$p.name', active_site]}
}
}
}},
{'$addFields': {
'order_remote': {
'$filter': {
'input': '$files.sites', 'as': 'p',
'cond': {'$eq': ['$$p.name', remote_site]}
}
}
}},
{'$addFields': {
'progress_local': {"$arrayElemAt": [{
'$cond': [
{'$size': "$order_local.progress"},
"$order_local.progress",
# if exists created_dt count is as available
{'$cond': [
{'$size': "$order_local.created_dt"},
[1],
[0]
]}
]},
0
]}
}},
{'$addFields': {
'progress_remote': {"$arrayElemAt": [{
'$cond': [
{'$size': "$order_remote.progress"},
"$order_remote.progress",
# if exists created_dt count is as available
{'$cond': [
{'$size': "$order_remote.created_dt"},
[1],
[0]
]}
]},
0
]}
}},
{'$group': { # first group by repre
'_id': '$_id',
'parent': {'$first': '$parent'},
'avail_ratio_local': {
'$first': {
'$divide': [{'$sum': "$progress_local"}, {'$sum': 1}]
}
},
'avail_ratio_remote': {
'$first': {
'$divide': [{'$sum': "$progress_remote"}, {'$sum': 1}]
}
}
}},
{'$group': { # second group by parent, eg version_id
'_id': '$parent',
'repre_count': {'$sum': 1}, # total representations
# fully available representation for site
'avail_repre_local': {'$sum': "$avail_ratio_local"},
'avail_repre_remote': {'$sum': "$avail_ratio_remote"},
}},
]
return query
class GroupMemberFilterProxyModel(QtCore.QSortFilterProxyModel):
"""Provide the feature of filtering group by the acceptance of members