Merge pull request #2966 from pypeclub/feature/OP-2951_Download-all-workfile-inputs

SiteSync: Download all workfile inputs
This commit is contained in:
Petr Kalis 2022-04-25 16:13:09 +02:00 committed by GitHub
commit b59a408a39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 266 additions and 93 deletions

View file

@ -1973,3 +1973,119 @@ def get_last_workfile(
return os.path.normpath(os.path.join(workdir, filename))
return filename
@with_avalon
def get_linked_ids_for_representations(project_name, repre_ids, dbcon=None,
                                       link_type=None, max_depth=0):
    """Collect ids of representations linked to the given representations.

    Goes from the representations to their parent versions, follows
    'data.inputLinks' of those versions with a '$graphLookup' aggregation
    and returns ids of representations parented to the referenced versions.

    Args:
        project_name (str): project name, also used as the collection name
            in the graph lookup
        repre_ids (list or ObjectId): representation id(s) to start from,
            a value which is not a list is wrapped into one
        dbcon (avalon.mongodb.AvalonMongoDB, optional): Avalon Mongo
            connection with Session, new one is created if not passed
        link_type (str, optional): keep only links of this type
            (e.g. 'reference'), falsy value keeps all links
        max_depth (int): limit how many levels of recursion, with 0
            (default) no 'maxDepth' is set on the lookup

    Returns:
        (list) of ObjectId - linked representations

    Raises:
        ValueError: passed 'dbcon' is set to a different project
    """
    if dbcon:
        # Passed connection must already be set to the requested project
        if dbcon.Session["AVALON_PROJECT"] != project_name:
            raise ValueError("Passed connection does not have right project")
    else:
        # No connection passed, create one for the requested project
        from avalon.api import AvalonMongoDB

        dbcon = AvalonMongoDB()
        dbcon.Session["AVALON_PROJECT"] = project_name

    if not isinstance(repre_ids, list):
        repre_ids = [repre_ids]

    # Representations -> their parent versions
    repre_query = {
        "_id": {"$in": repre_ids},
        "type": "representation"
    }
    parent_version_ids = dbcon.distinct("parent", repre_query)

    lookup_stage = {
        "from": project_name,
        "startWith": "$data.inputLinks.id",
        "connectFromField": "data.inputLinks.id",
        "connectToField": "_id",
        "as": "outputs_recursive",
        "depthField": "depth"
    }
    if max_depth != 0:
        # Offset by -1, recursion only starts after the initial lookup so
        # 'maxDepth' 0 already returns the first level of links
        lookup_stage["maxDepth"] = max_depth - 1

    version_docs = dbcon.aggregate([
        # Start from parent versions of passed representations
        {"$match": {"_id": {"$in": parent_version_ids}, "type": "version"}},
        # Recursively collect versions used as inputs
        {"$graphLookup": lookup_stage}
    ])
    linked_version_ids = _process_referenced_pipeline_result(version_docs,
                                                             link_type)

    # Referenced versions -> their representations
    linked_repre_ids = dbcon.distinct(
        "_id",
        filter={
            "parent": {"$in": list(linked_version_ids)},
            "type": "representation"
        }
    )
    return list(linked_repre_ids)
def _process_referenced_pipeline_result(result, link_type):
"""Filters result from pipeline for particular link_type.
Pipeline cannot use link_type directly in a query.
Returns:
(list)
"""
referenced_version_ids = set()
correctly_linked_ids = set()
for item in result:
input_links = item["data"].get("inputLinks", [])
correctly_linked_ids = _filter_input_links(input_links,
link_type,
correctly_linked_ids)
# outputs_recursive in random order, sort by depth
outputs_recursive = sorted(item.get("outputs_recursive", []),
key=lambda d: d["depth"])
for output in outputs_recursive:
if output["_id"] not in correctly_linked_ids: # leaf
continue
correctly_linked_ids = _filter_input_links(
output["data"].get("inputLinks", []),
link_type,
correctly_linked_ids)
referenced_version_ids.add(output["_id"])
return referenced_version_ids
def _filter_input_links(input_links, link_type, correctly_linked_ids):
for input_link in input_links:
if not link_type or input_link["type"] == link_type:
correctly_linked_ids.add(input_link.get("id") or
input_link.get("_id")) # legacy
return correctly_linked_ids

View file

@ -23,7 +23,7 @@ from openpype.settings.lib import (
from .providers.local_drive import LocalDriveHandler
from .providers import lib
from .utils import time_function, SyncStatus
from .utils import time_function, SyncStatus, SiteAlreadyPresentError
log = PypeLogger().get_logger("SyncServer")
@ -131,21 +131,25 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
def add_site(self, collection, representation_id, site_name=None,
force=False):
"""
Adds new site to representation to be synced.
Adds new site to representation to be synced.
'collection' must have synchronization enabled (globally or
project only)
'collection' must have synchronization enabled (globally or
project only)
Used as a API endpoint from outside applications (Loader etc)
Used as a API endpoint from outside applications (Loader etc).
Args:
collection (string): project name (must match DB)
representation_id (string): MongoDB _id value
site_name (string): name of configured and active site
force (bool): reset site if exists
Use 'force' to reset existing site.
Returns:
throws ValueError if any issue
Args:
collection (string): project name (must match DB)
representation_id (string): MongoDB _id value
site_name (string): name of configured and active site
force (bool): reset site if exists
Throws:
SiteAlreadyPresentError - if adding already existing site and
not 'force'
ValueError - other errors (repre not found, misconfiguration)
"""
if not self.get_sync_project_setting(collection):
raise ValueError("Project not configured")
@ -155,7 +159,8 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
self.reset_site_on_representation(collection,
representation_id,
site_name=site_name, force=force)
site_name=site_name,
force=force)
def remove_site(self, collection, representation_id, site_name,
remove_local_files=False):
@ -351,36 +356,38 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
def create_validate_project_task(self, collection, site_name):
"""Adds metadata about project files validation on a queue.
This process will loop through all representation and check if
their files actually exist on an active site.
This process will loop through all representation and check if
their files actually exist on an active site.
This might be useful for edge cases when artists is switching
between sites, remote site is actually physically mounted and
active site has same file urls etc.
It also checks if site is set in DB, but file is physically not
present
Task will run on a asyncio loop, shouldn't be blocking.
This might be useful for edge cases when artists is switching
between sites, remote site is actually physically mounted and
active site has same file urls etc.
Task will run on a asyncio loop, shouldn't be blocking.
"""
task = {
"type": "validate",
"project_name": collection,
"func": lambda: self.validate_project(collection, site_name)
"func": lambda: self.validate_project(collection, site_name,
reset_missing=True)
}
self.projects_processed.add(collection)
self.long_running_tasks.append(task)
def validate_project(self, collection, site_name, remove_missing=False):
"""
Validate 'collection' of 'site_name' and its local files
def validate_project(self, collection, site_name, reset_missing=False):
"""Validate 'collection' of 'site_name' and its local files
If file present and not marked with a 'site_name' in DB, DB is
updated with site name and file modified date.
If file present and not marked with a 'site_name' in DB, DB is
updated with site name and file modified date.
Args:
module (SyncServerModule)
collection (string): project name
site_name (string): active site name
remove_missing (bool): if True remove sites in DB if missing
physically
Args:
collection (string): project name
site_name (string): active site name
reset_missing (bool): if True reset site in DB if missing
physically
"""
self.log.debug("Validation of {} for {} started".format(collection,
site_name))
@ -395,29 +402,32 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
return
sites_added = 0
sites_removed = 0
sites_reset = 0
for repre in representations:
repre_id = repre["_id"]
for repre_file in repre.get("files", []):
try:
has_site = site_name in [site["name"]
for site in repre_file["sites"]]
except TypeError:
is_on_site = site_name in [site["name"]
for site in repre_file["sites"]
if (site.get("created_dt") and
not site.get("error"))]
except (TypeError, AttributeError):
self.log.debug("Structure error in {}".format(repre_id))
continue
if has_site and not remove_missing:
continue
file_path = repre_file.get("path", "")
local_file_path = self.get_local_file_path(collection,
site_name,
file_path)
if local_file_path and os.path.exists(local_file_path):
self.log.debug("Adding site {} for {}".format(site_name,
repre_id))
if not has_site:
file_exists = (local_file_path and
os.path.exists(local_file_path))
if not is_on_site:
if file_exists:
self.log.debug(
"Adding site {} for {}".format(site_name,
repre_id))
query = {
"_id": repre_id
}
@ -425,27 +435,27 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
os.path.getmtime(local_file_path))
elem = {"name": site_name,
"created_dt": created_dt}
self._add_site(collection, query, [repre], elem,
self._add_site(collection, query, repre, elem,
site_name=site_name,
file_id=repre_file["_id"])
file_id=repre_file["_id"],
force=True)
sites_added += 1
else:
if has_site and remove_missing:
self.log.debug("Removing site {} for {}".
if not file_exists and reset_missing:
self.log.debug("Resetting site {} for {}".
format(site_name, repre_id))
self.reset_provider_for_file(collection,
repre_id,
file_id=repre_file["_id"],
remove=True)
sites_removed += 1
self.reset_site_on_representation(
collection, repre_id, site_name=site_name,
file_id=repre_file["_id"])
sites_reset += 1
if sites_added % 100 == 0:
self.log.debug("Sites added {}".format(sites_added))
self.log.debug("Validation of {} for {} ended".format(collection,
site_name))
self.log.info("Sites added {}, sites removed {}".format(sites_added,
sites_removed))
self.log.info("Sites added {}, sites reset {}".format(sites_added,
reset_missing))
def pause_representation(self, collection, representation_id, site_name):
"""
@ -963,7 +973,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
self.log.debug("Adding alternate {} to {}".format(
alt_site, representation["_id"]))
self._add_site(collection, query,
[representation], elem,
representation, elem,
alt_site, file_id=file_id, force=True)
""" End of Public API """
@ -1567,14 +1577,16 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
pause (bool or None): if True - pause, False - unpause
force (bool): hard reset - currently only for add_site
Returns:
throws ValueError
Raises:
SiteAlreadyPresentError - if adding already existing site and
not 'force'
ValueError - other errors (repre not found, misconfiguration)
"""
query = {
"_id": ObjectId(representation_id)
}
representation = list(self.connection.database[collection].find(query))
representation = self.connection.database[collection].find_one(query)
if not representation:
raise ValueError("Representation {} not found in {}".
format(representation_id, collection))
@ -1605,7 +1617,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
representation, site_name, pause)
else: # add new site to all files for representation
self._add_site(collection, query, representation, elem, site_name,
force)
force=force)
def _update_site(self, collection, query, update, arr_filter):
"""
@ -1660,7 +1672,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Throws ValueError if 'site_name' not found on 'representation'
"""
found = False
for repre_file in representation.pop().get("files"):
for repre_file in representation.get("files"):
for site in repre_file.get("sites"):
if site.get("name") == site_name:
found = True
@ -1686,7 +1698,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
"""
found = False
site = None
for repre_file in representation.pop().get("files"):
for repre_file in representation.get("files"):
for site in repre_file.get("sites"):
if site["name"] == site_name:
found = True
@ -1718,29 +1730,34 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
Adds 'site_name' to 'representation' on 'collection'
Args:
representation (list of 1 dict)
representation (dict)
file_id (ObjectId)
Use 'force' to remove existing or raises ValueError
"""
reseted_existing = False
for repre_file in representation.pop().get("files"):
reset_existing = False
files = representation.get("files", [])
if not files:
log.debug("No files for {}".format(representation["_id"]))
return
for repre_file in files:
if file_id and file_id != repre_file["_id"]:
continue
for site in repre_file.get("sites"):
if site["name"] == site_name:
if force:
if force or site.get("error"):
self._reset_site_for_file(collection, query,
elem, repre_file["_id"],
site_name)
reseted_existing = True
reset_existing = True
else:
msg = "Site {} already present".format(site_name)
log.info(msg)
raise ValueError(msg)
raise SiteAlreadyPresentError(msg)
if reseted_existing:
if reset_existing:
return
if not file_id:
@ -1904,7 +1921,7 @@ class SyncServerModule(OpenPypeModule, ITrayModule):
(int) - number of failed attempts
"""
_, rec = self._get_site_rec(file.get("sites", []), provider)
return rec.get("tries", 0)
return self._get_tries_count_from_rec(rec)
def _get_progress_dict(self, progress):
"""

View file

@ -8,6 +8,11 @@ class ResumableError(Exception):
pass
class SiteAlreadyPresentError(Exception):
    """Raised when the site is already present on a representation."""
class SyncStatus:
DO_NOTHING = 0
DO_UPLOAD = 1

View file

@ -1,9 +1,19 @@
from openpype.modules import ModulesManager
from openpype.pipeline import load
from openpype.lib.avalon_context import get_linked_ids_for_representations
from openpype.modules.sync_server.utils import SiteAlreadyPresentError
class AddSyncSite(load.LoaderPlugin):
"""Add sync site to representation"""
"""Add sync site to representation
If family of synced representation is 'workfile', it looks for all
representations which are referenced (loaded) in workfile with content of
'inputLinks'.
It doesn't do any checks for site, most common use case is when artist is
downloading workfile to his local site, but it might be helpful when
artist is re-uploading broken representation on remote site also.
"""
representations = ["*"]
families = ["*"]
@ -12,21 +22,42 @@ class AddSyncSite(load.LoaderPlugin):
icon = "download"
color = "#999999"
_sync_server = None
is_add_site_loader = True
@property
def sync_server(self):
    """Sync server module, looked up on first access and then reused.

    Returns:
        object stored under "sync_server" key in 'modules_by_name' of
            a 'ModulesManager'
    """
    cached = self._sync_server
    if cached:
        return cached

    # Nothing stored yet, resolve the module through modules manager
    self._sync_server = ModulesManager().modules_by_name["sync_server"]
    return self._sync_server
def load(self, context, name=None, namespace=None, data=None):
self.log.info("Adding {} to representation: {}".format(
data["site_name"], data["_id"]))
self.add_site_to_representation(data["project_name"],
data["_id"],
data["site_name"])
self.log.debug("Site added.")
family = context["representation"]["context"]["family"]
project_name = data["project_name"]
repre_id = data["_id"]
site_name = data["site_name"]
@staticmethod
def add_site_to_representation(project_name, representation_id, site_name):
"""Adds new site to representation_id, resets if exists"""
manager = ModulesManager()
sync_server = manager.modules_by_name["sync_server"]
sync_server.add_site(project_name, representation_id, site_name,
force=True)
self.sync_server.add_site(project_name, repre_id, site_name,
force=True)
if family == "workfile":
links = get_linked_ids_for_representations(project_name,
[repre_id],
link_type="reference")
for link_repre_id in links:
try:
self.sync_server.add_site(project_name, link_repre_id,
site_name,
force=False)
except SiteAlreadyPresentError:
# do not add/reset working site for references
self.log.debug("Site present", exc_info=True)
self.log.debug("Site added.")
def filepath_from_context(self, context):
"""No real file loading"""

View file

@ -12,22 +12,26 @@ class RemoveSyncSite(load.LoaderPlugin):
icon = "download"
color = "#999999"
_sync_server = None
is_remove_site_loader = True
@property
def sync_server(self):
    """Sync server module, looked up on first access and then reused.

    Returns:
        object stored under "sync_server" key in 'modules_by_name' of
            a 'ModulesManager'
    """
    cached = self._sync_server
    if cached:
        return cached

    # Nothing stored yet, resolve the module through modules manager
    self._sync_server = ModulesManager().modules_by_name["sync_server"]
    return self._sync_server
def load(self, context, name=None, namespace=None, data=None):
self.log.info("Removing {} on representation: {}".format(
data["site_name"], data["_id"]))
self.remove_site_on_representation(data["project_name"],
data["_id"],
data["site_name"])
self.sync_server.remove_site(data["project_name"],
data["_id"],
data["site_name"],
True)
self.log.debug("Site added.")
@staticmethod
def remove_site_on_representation(project_name, representation_id,
site_name):
manager = ModulesManager()
sync_server = manager.modules_by_name["sync_server"]
sync_server.remove_site(project_name, representation_id,
site_name, True)
def filepath_from_context(self, context):
    """No real file loading

    Always returns an empty string, 'context' is not used.
    """
    return ""

View file

@ -727,11 +727,11 @@ def is_sync_loader(loader):
def is_remove_site_loader(loader):
return hasattr(loader, "remove_site_on_representation")
return hasattr(loader, "is_remove_site_loader")
def is_add_site_loader(loader):
return hasattr(loader, "add_site_to_representation")
return hasattr(loader, "is_add_site_loader")
class WrappedCallbackItem: