From fd2d07e94c0fb34730547c396e09ddc314b56983 Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Tue, 5 Jul 2022 09:22:29 +0200 Subject: [PATCH] Revert integrator to latest develop --- openpype/plugins/publish/integrate_new.py | 1710 +++++++++++++-------- 1 file changed, 1088 insertions(+), 622 deletions(-) diff --git a/openpype/plugins/publish/integrate_new.py b/openpype/plugins/publish/integrate_new.py index a07e8a1e0f..4c14c17dae 100644 --- a/openpype/plugins/publish/integrate_new.py +++ b/openpype/plugins/publish/integrate_new.py @@ -1,111 +1,63 @@ import os +from os.path import getsize import logging import sys import copy import clique +import errno import six +import re +import shutil +from collections import deque, defaultdict +from datetime import datetime from bson.objectid import ObjectId -from pymongo import DeleteMany, ReplaceOne, InsertOne, UpdateOne +from pymongo import DeleteOne, InsertOne import pyblish.api import openpype.api -from openpype.modules import ModulesManager from openpype.lib.profiles_filtering import filter_profiles -from openpype.lib.file_transaction import FileTransaction +from openpype.lib import ( + prepare_template_data, + create_hard_link, + StringTemplate, + TemplateUnsolved +) from openpype.pipeline import legacy_io +# this is needed until speedcopy for linux is fixed +if sys.platform == "win32": + from speedcopy import copyfile +else: + from shutil import copyfile + log = logging.getLogger(__name__) -def assemble(files): - """Convenience `clique.assemble` wrapper for files of a single collection. - - Unlike `clique.assemble` this wrapper does not allow more than a single - Collection nor any remainder files. Errors will be raised when not only - a single collection is assembled. - - Returns: - clique.Collection: A single sequence Collection - - Raises: - ValueError: Error is raised when files do not result in a single - collected Collection. - - """ - # todo: move this to lib? - # Get the sequence as a collection. The files must be of a single - # sequence and have no remainder outside of the collections. - patterns = [clique.PATTERNS["frames"]] - collections, remainder = clique.assemble(files, - minimum_items=1, - patterns=patterns) - if not collections: - raise ValueError("No collections found in files: " - "{}".format(files)) - if remainder: - raise ValueError("Files found not detected as part" - " of a sequence: {}".format(remainder)) - if len(collections) > 1: - raise ValueError("Files in sequence are not part of a" - " single sequence collection: " - "{}".format(collections)) - return collections[0] - - -def get_instance_families(instance): - """Get all families of the instance""" - # todo: move this to lib? 
- family = instance.data.get("family") - families = [] - if family: - families.append(family) - - for _family in (instance.data.get("families") or []): - if _family not in families: - families.append(_family) - - return families - - -def get_frame_padded(frame, padding): - """Return frame number as string with `padding` amount of padded zeros""" - return "{frame:0{padding}d}".format(padding=padding, frame=frame) - - -def get_first_frame_padded(collection): - """Return first frame as padded number from `clique.Collection`""" - start_frame = next(iter(collection.indexes)) - return get_frame_padded(start_frame, padding=collection.padding) - - -def bulk_write(writes): - """Convenience function to bulk write into active project database""" - project = legacy_io.Session["AVALON_PROJECT"] - return legacy_io._database[project].bulk_write(writes) - - class IntegrateAssetNew(pyblish.api.InstancePlugin): - """Register publish in the database and transfer files to destinations. + """Resolve any dependency issues - Steps: - 1) Register the subset and version - 2) Transfer the representation files to the destination - 3) Register the representation + This plug-in resolves any paths which, if not updated might break + the published file. - Requires: - instance.data['representations'] - must be a list and each member - must be a dictionary with following data: - 'files': list of filenames for sequence, string for single file. - Only the filename is allowed, without the folder path. - 'stagingDir': "path/to/folder/with/files" - 'name': representation name (usually the same as extension) - 'ext': file extension - optional data - "frameStart" - "frameEnd" - 'fps' - "data": additional metadata for each representation. + The order of families is important, when working with lookdev you want to + first publish the texture, update the texture paths in the nodes and then + publish the shading network. Same goes for file dependent assets. + + Requirements for instance to be correctly integrated + + instance.data['representations'] - must be a list and each member + must be a dictionary with following data: + 'files': list of filenames for sequence, string for single file. + Only the filename is allowed, without the folder path. + 'stagingDir': "path/to/folder/with/files" + 'name': representation name (usually the same as extension) + 'ext': file extension + optional data + "frameStart" + "frameEnd" + 'fps' + "data": additional metadata for each representation. """ label = "Integrate Asset New" @@ -140,6 +92,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): "source", "matchmove", "image", + "source", "assembly", "fbx", "textures", @@ -156,51 +109,157 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): "usd", "staticMesh", "skeletalMesh", - "usdComposition", - "usdOverride", + "mvLook", + "mvUsd", + "mvUsdComposition", + "mvUsdOverride", "simpleUnrealTexture" ] - exclude_families = ["clip", "render.farm"] - default_template_name = "publish" - - # Representation context keys that should always be written to - # the database even if not used by the destination template + exclude_families = ["render.farm"] db_representation_context_keys = [ "project", "asset", "task", "subset", "version", "representation", - "family", "hierarchy", "username" + "family", "hierarchy", "task", "username" ] + default_template_name = "publish" + + # suffix to denote temporary files, use without '.' 
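+    # (illustrative note, not from the original patch: with TMP_FILE_EXT set
+    # to 'tmp', a file is first uploaded as e.g. "beauty_v001.exr.tmp" and is
+    # only renamed to "beauty_v001.exr" by handle_destination_files(...,
+    # 'finalize') once the database registration has succeeded)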
+    TMP_FILE_EXT = 'tmp'
+
+    # file_url : file_size of all published and uploaded files
+    integrated_file_sizes = {}

     # Attributes set by settings
     template_name_profiles = None
+    subset_grouping_profiles = None

     def process(self, instance):
+        for ef in self.exclude_families:
+            if (
+                instance.data["family"] == ef or
+                    ef in instance.data["families"]):
+                self.log.debug("Excluded family '{}' in '{}' or {}".format(
+                    ef, instance.data["family"], instance.data["families"]))
+                return

-        # Exclude instances that also contain families from exclude families
-        families = set(get_instance_families(instance))
-        exclude = families & set(self.exclude_families)
-        if exclude:
-            self.log.debug("Instance not integrated due to exclude "
-                           "families found: {}".format(", ".join(exclude)))
+        # instance should be published on a farm
+        if instance.data.get("farm"):
             return

-        file_transactions = FileTransaction(log=self.log)
+        # Prepare representations that should be integrated
+        repres = instance.data.get("representations")
+        # Raise error if instance doesn't have any representations
+        if not repres:
+            raise ValueError(
+                "Instance {} has no files to transfer".format(
+                    instance.data["family"]
+                )
+            )
+
+        # Validate type of stored representations
+        if not isinstance(repres, (list, tuple)):
+            raise TypeError(
+                "Instance 'representations' must be a list, got: {0} {1}".format(
+                    str(type(repres)), str(repres)
+                )
+            )
+
+        # Filter out representations tagged for deletion
+        filtered_repres = []
+        for repre in repres:
+            if "delete" in repre.get("tags", []):
+                continue
+            filtered_repres.append(repre)
+
+        # Skip instance if there are no representations left to integrate
+        #   (all of them may have been filtered out above)
+        if not filtered_repres:
+            self.log.warning((
+                "Skipping, there are no representations"
+                " to integrate for instance {}"
+            ).format(instance.data["family"]))
+            return
+
+        self.integrated_file_sizes = {}
         try:
-            self.register(instance, file_transactions)
+            self.register(instance, filtered_repres)
+            self.log.info("Integrated Asset into the database ...")
+            self.log.info("instance.data: {}".format(instance.data))
+            self.handle_destination_files(self.integrated_file_sizes,
+                                          'finalize')
         except Exception:
             # clean destination
-            # todo: preferably we'd also rollback *any* changes to the database
-            file_transactions.rollback()
             self.log.critical("Error when registering", exc_info=True)
+            self.handle_destination_files(self.integrated_file_sizes, 'remove')
             six.reraise(*sys.exc_info())

-        # Finalizing can't rollback safely so no use for moving it to
-        # the try, except.
- file_transactions.finalize() + def register(self, instance, repres): + # Required environment variables + anatomy_data = instance.data["anatomyData"] - def register(self, instance, file_transactions): + legacy_io.install() - instance_stagingdir = instance.data.get("stagingDir") - if not instance_stagingdir: + context = instance.context + + project_entity = instance.data["projectEntity"] + + context_asset_name = None + context_asset_doc = context.data.get("assetEntity") + if context_asset_doc: + context_asset_name = context_asset_doc["name"] + + asset_name = instance.data["asset"] + asset_entity = instance.data.get("assetEntity") + if not asset_entity or asset_entity["name"] != context_asset_name: + asset_entity = legacy_io.find_one({ + "type": "asset", + "name": asset_name, + "parent": project_entity["_id"] + }) + assert asset_entity, ( + "No asset found by the name \"{0}\" in project \"{1}\"" + ).format(asset_name, project_entity["name"]) + + instance.data["assetEntity"] = asset_entity + + # update anatomy data with asset specific keys + # - name should already been set + hierarchy = "" + parents = asset_entity["data"]["parents"] + if parents: + hierarchy = "/".join(parents) + anatomy_data["hierarchy"] = hierarchy + + # Make sure task name in anatomy data is same as on instance.data + asset_tasks = ( + asset_entity.get("data", {}).get("tasks") + ) or {} + task_name = instance.data.get("task") + if task_name: + task_info = asset_tasks.get(task_name) or {} + task_type = task_info.get("type") + + project_task_types = project_entity["config"]["tasks"] + task_code = project_task_types.get(task_type, {}).get("short_name") + anatomy_data["task"] = { + "name": task_name, + "type": task_type, + "short": task_code + } + + elif "task" in anatomy_data: + # Just set 'task_name' variable to context task + task_name = anatomy_data["task"]["name"] + task_type = anatomy_data["task"]["type"] + + else: + task_name = None + task_type = None + + # Fill family in anatomy data + anatomy_data["family"] = instance.data.get("family") + + stagingdir = instance.data.get("stagingDir") + if not stagingdir: self.log.info(( "{0} is missing reference to staging directory." " Will try to get it from representation." @@ -208,515 +267,718 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): else: self.log.debug( - "Establishing staging directory " - "@ {0}".format(instance_stagingdir) + "Establishing staging directory @ {0}".format(stagingdir) ) - # Ensure at least one representation is set up for registering. - repres = instance.data.get("representations") - assert repres, "Instance has no representations data" - assert isinstance(repres, (list, tuple)), ( - "Instance 'representations' must be a list, got: {0} {1}".format( - str(type(repres)), str(repres) - ) + subset = self.get_subset(asset_entity, instance) + instance.data["subsetEntity"] = subset + + version_number = instance.data["version"] + self.log.debug("Next version: v{}".format(version_number)) + + version_data = self.create_version_data(context, instance) + + version_data_instance = instance.data.get('versionData') + if version_data_instance: + version_data.update(version_data_instance) + + # TODO rename method from `create_version` to + # `prepare_version` or similar... 
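+        # create_version only assembles the document in memory; it is written
+        # to the database further below. The result roughly looks like
+        # (illustrative values):
+        #     {"schema": "openpype:version-3.0", "type": "version",
+        #      "parent": subset["_id"], "name": 3, "data": version_data}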
+ version = self.create_version( + subset=subset, + version_number=version_number, + data=version_data ) - template_name = self.get_template_name(instance) + self.log.debug("Creating version ...") - subset, subset_writes = self.prepare_subset(instance) - version, version_writes = self.prepare_version(instance, subset) + new_repre_names_low = [ + _repre["name"].lower() + for _repre in repres + ] + + existing_version = legacy_io.find_one({ + 'type': 'version', + 'parent': subset["_id"], + 'name': version_number + }) + + if existing_version is None: + version_id = legacy_io.insert_one(version).inserted_id + else: + # Check if instance have set `append` mode which cause that + # only replicated representations are set to archive + append_repres = instance.data.get("append", False) + + # Update version data + # TODO query by _id and + legacy_io.update_many({ + 'type': 'version', + 'parent': subset["_id"], + 'name': version_number + }, { + '$set': version + }) + version_id = existing_version['_id'] + + # Find representations of existing version and archive them + current_repres = list(legacy_io.find({ + "type": "representation", + "parent": version_id + })) + bulk_writes = [] + for repre in current_repres: + if append_repres: + # archive only duplicated representations + if repre["name"].lower() not in new_repre_names_low: + continue + # Representation must change type, + # `_id` must be stored to other key and replaced with new + # - that is because new representations should have same ID + repre_id = repre["_id"] + bulk_writes.append(DeleteOne({"_id": repre_id})) + + repre["orig_id"] = repre_id + repre["_id"] = ObjectId() + repre["type"] = "archived_representation" + bulk_writes.append(InsertOne(repre)) + + # bulk updates + if bulk_writes: + project_name = legacy_io.Session["AVALON_PROJECT"] + legacy_io.database[project_name].bulk_write( + bulk_writes + ) + + version = legacy_io.find_one({"_id": version_id}) instance.data["versionEntity"] = version - # Get existing representations (if any) - existing_repres_by_name = { - repres["name"].lower(): repres for repres in legacy_io.find( - { - "parent": version["_id"], - "type": "representation" - }, - # Only care about id and name of existing representations - projection={"_id": True, "name": True} - ) + existing_repres = list(legacy_io.find({ + "parent": version_id, + "type": "archived_representation" + })) + + instance.data['version'] = version['name'] + + intent_value = instance.context.data.get("intent") + if intent_value and isinstance(intent_value, dict): + intent_value = intent_value.get("value") + + if intent_value: + anatomy_data["intent"] = intent_value + + anatomy = instance.context.data['anatomy'] + + # Find the representations to transfer amongst the files + # Each should be a single representation (as such, a single extension) + representations = [] + destination_list = [] + + orig_transfers = [] + if 'transfers' not in instance.data: + instance.data['transfers'] = [] + else: + orig_transfers = list(instance.data['transfers']) + + family = self.main_family_from_instance(instance) + + key_values = { + "families": family, + "tasks": task_name, + "hosts": instance.context.data["hostName"], + "task_types": task_type } - - # Prepare all representations - prepared_representations = [] - for repre in instance.data["representations"]: - - if "delete" in repre.get("tags", []): - self.log.debug("Skipping representation marked for deletion: " - "{}".format(repre)) - continue - - # todo: reduce/simplify what is returned from this function - 
prepared = self.prepare_representation(repre, - template_name, - existing_repres_by_name, - version, - instance_stagingdir, - instance) - - for src, dst in prepared["transfers"]: - # todo: add support for hardlink transfers - file_transactions.add(src, dst) - - prepared_representations.append(prepared) - - if not prepared_representations: - # Even though we check `instance.data["representations"]` earlier - # this could still happen if all representations were tagged with - # "delete" and thus are skipped for integration - raise RuntimeError("No representations prepared to publish.") - - # Each instance can also have pre-defined transfers not explicitly - # part of a representation - like texture resources used by a - # .ma representation. Those destination paths are pre-defined, etc. - # todo: should we move or simplify this logic? - resource_destinations = set() - for src, dst in instance.data.get("transfers", []): - file_transactions.add(src, dst, mode=FileTransaction.MODE_COPY) - resource_destinations.add(os.path.abspath(dst)) - for src, dst in instance.data.get("hardlinks", []): - file_transactions.add(src, dst, mode=FileTransaction.MODE_HARDLINK) - resource_destinations.add(os.path.abspath(dst)) - - # Bulk write to the database - # We write the subset and version to the database before the File - # Transaction to reduce the chances of another publish trying to - # publish to the same version number since that chance can greatly - # increase if the file transaction takes a long time. - bulk_write(subset_writes + version_writes) - self.log.info("Subset {subset[name]} and Version {version[name]} " - "written to database..".format(subset=subset, - version=version)) - - # Process all file transfers of all integrations now - self.log.debug("Integrating source files to destination ...") - file_transactions.process() - self.log.debug("Backed up existing files: " - "{}".format(file_transactions.backups)) - self.log.debug("Transferred files: " - "{}".format(file_transactions.transferred)) - self.log.debug("Retrieving Representation Site Sync information ...") - - # Get the accessible sites for Site Sync - manager = ModulesManager() - sync_server_module = manager.modules_by_name["sync_server"] - sites = sync_server_module.compute_resource_sync_sites( - project_name=instance.data["projectEntity"]["name"] + profile = filter_profiles( + self.template_name_profiles, + key_values, + logger=self.log ) - self.log.debug("Sync Server Sites: {}".format(sites)) - # Compute the resource file infos once (files belonging to the - # version instance instead of an individual representation) so - # we can re-use those file infos per representation - anatomy = instance.context.data["anatomy"] - resource_file_infos = self.get_files_info(resource_destinations, - sites=sites, - anatomy=anatomy) + template_name = "publish" + if profile: + template_name = profile["template_name"] - # Finalize the representations now the published files are integrated - # Get 'files' info for representations and its attached resources - representation_writes = [] - new_repre_names_low = set() - for prepared in prepared_representations: - representation = prepared["representation"] - transfers = prepared["transfers"] - destinations = [dst for src, dst in transfers] + published_representations = {} + for idx, repre in enumerate(repres): + published_files = [] + + # create template data for Anatomy + template_data = copy.deepcopy(anatomy_data) + if intent_value is not None: + template_data["intent"] = intent_value + + resolution_width = 
repre.get("resolutionWidth") + resolution_height = repre.get("resolutionHeight") + fps = instance.data.get("fps") + + if resolution_width: + template_data["resolution_width"] = resolution_width + if resolution_width: + template_data["resolution_height"] = resolution_height + if resolution_width: + template_data["fps"] = fps + + if "originalBasename" in instance.data: + template_data.update({ + "originalBasename": instance.data.get("originalBasename") + }) + + files = repre['files'] + if repre.get('stagingDir'): + stagingdir = repre['stagingDir'] + + if repre.get("outputName"): + template_data["output"] = repre['outputName'] + + template_data["representation"] = repre["name"] + + ext = repre["ext"] + if ext.startswith("."): + self.log.warning(( + "Implementaion warning: <\"{}\">" + " Representation's extension stored under \"ext\" key " + " started with dot (\"{}\")." + ).format(repre["name"], ext)) + ext = ext[1:] + repre["ext"] = ext + template_data["ext"] = ext + + self.log.info(template_name) + template = os.path.normpath( + anatomy.templates[template_name]["path"]) + + sequence_repre = isinstance(files, list) + repre_context = None + if sequence_repre: + self.log.debug( + "files: {}".format(files)) + src_collections, remainder = clique.assemble(files) + self.log.debug( + "src_tail_collections: {}".format(str(src_collections))) + src_collection = src_collections[0] + + # Assert that each member has identical suffix + src_head = src_collection.format("{head}") + src_tail = src_collection.format("{tail}") + + # fix dst_padding + valid_files = [x for x in files if src_collection.match(x)] + padd_len = len( + valid_files[0].replace(src_head, "").replace(src_tail, "") + ) + src_padding_exp = "%0{}d".format(padd_len) + + test_dest_files = list() + for i in [1, 2]: + template_data["representation"] = repre['ext'] + if not repre.get("udim"): + template_data["frame"] = src_padding_exp % i + else: + template_data["udim"] = src_padding_exp % i + + anatomy_filled = anatomy.format(template_data) + template_filled = anatomy_filled[template_name]["path"] + if repre_context is None: + repre_context = template_filled.used_values + test_dest_files.append( + os.path.normpath(template_filled) + ) + if not repre.get("udim"): + template_data["frame"] = repre_context["frame"] + else: + template_data["udim"] = repre_context["udim"] + + self.log.debug( + "test_dest_files: {}".format(str(test_dest_files))) + + dst_collections, remainder = clique.assemble(test_dest_files) + dst_collection = dst_collections[0] + dst_head = dst_collection.format("{head}") + dst_tail = dst_collection.format("{tail}") + + index_frame_start = None + + # TODO use frame padding from right template group + if repre.get("frameStart") is not None: + frame_start_padding = int( + anatomy.templates["render"].get( + "frame_padding", + anatomy.templates["render"].get("padding") + ) + ) + + index_frame_start = int(repre.get("frameStart")) + + # exception for slate workflow + if index_frame_start and "slate" in instance.data["families"]: + index_frame_start -= 1 + + dst_padding_exp = src_padding_exp + dst_start_frame = None + collection_start = list(src_collection.indexes)[0] + for i in src_collection.indexes: + # TODO 1.) do not count padding in each index iteration + # 2.) 
do not count dst_padding from src_padding before + # index_frame_start check + frame_number = i - collection_start + src_padding = src_padding_exp % i + + src_file_name = "{0}{1}{2}".format( + src_head, src_padding, src_tail) + + dst_padding = src_padding_exp % frame_number + + if index_frame_start is not None: + dst_padding_exp = "%0{}d".format(frame_start_padding) + dst_padding = dst_padding_exp % (index_frame_start + frame_number) # noqa: E501 + elif repre.get("udim"): + dst_padding = int(i) + + dst = "{0}{1}{2}".format( + dst_head, + dst_padding, + dst_tail + ) + + self.log.debug("destination: `{}`".format(dst)) + src = os.path.join(stagingdir, src_file_name) + + self.log.debug("source: {}".format(src)) + instance.data["transfers"].append([src, dst]) + + published_files.append(dst) + + # for adding first frame into db + if not dst_start_frame: + dst_start_frame = dst_padding + + # Store used frame value to template data + if repre.get("frame"): + template_data["frame"] = dst_start_frame + + dst = "{0}{1}{2}".format( + dst_head, + dst_start_frame, + dst_tail + ) + repre['published_path'] = dst + + else: + # Single file + # _______ + # | |\ + # | | + # | | + # | | + # |_______| + # + template_data.pop("frame", None) + fname = files + assert not os.path.isabs(fname), ( + "Given file name is a full path" + ) + + template_data["representation"] = repre['ext'] + # Store used frame value to template data + if repre.get("udim"): + template_data["udim"] = repre["udim"][0] + src = os.path.join(stagingdir, fname) + anatomy_filled = anatomy.format(template_data) + template_filled = anatomy_filled[template_name]["path"] + repre_context = template_filled.used_values + dst = os.path.normpath(template_filled) + + instance.data["transfers"].append([src, dst]) + + published_files.append(dst) + repre['published_path'] = dst + self.log.debug("__ dst: {}".format(dst)) + + if not instance.data.get("publishDir"): + instance.data["publishDir"] = ( + anatomy_filled + [template_name] + ["folder"] + ) + if repre.get("udim"): + repre_context["udim"] = repre.get("udim") # store list + + repre["publishedFiles"] = published_files + + for key in self.db_representation_context_keys: + value = template_data.get(key) + if not value: + continue + repre_context[key] = template_data[key] + + # Use previous representation's id if there are any + repre_id = None + repre_name_low = repre["name"].lower() + for _repre in existing_repres: + # NOTE should we check lowered names? + if repre_name_low == _repre["name"]: + repre_id = _repre["orig_id"] + break + + # Create new id if existing representations does not match + if repre_id is None: + repre_id = ObjectId() + + data = repre.get("data") or {} + data.update({'path': dst, 'template': template}) + representation = { + "_id": repre_id, + "schema": "openpype:representation-2.0", + "type": "representation", + "parent": version_id, + "name": repre['name'], + "data": data, + "dependencies": instance.data.get("dependencies", "").split(), + + # Imprint shortcut to context + # for performance reasons. 
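+                # repre_context holds the template keys that were actually
+                # used to build the path, e.g. (illustrative values):
+                #     {"project": {"name": "demo", "code": "dm"},
+                #      "asset": "sh010", "subset": "renderMain", "version": 3,
+                #      "representation": "exr", "frame": "1001", "ext": "exr"}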
+ "context": repre_context + } + + if repre.get("outputName"): + representation["context"]["output"] = repre['outputName'] + + if sequence_repre and repre.get("frameStart") is not None: + representation['context']['frame'] = ( + dst_padding_exp % int(repre.get("frameStart")) + ) + + # any file that should be physically copied is expected in + # 'transfers' or 'hardlinks' + if instance.data.get('transfers', False) or \ + instance.data.get('hardlinks', False): + # could throw exception, will be caught in 'process' + # all integration to DB is being done together lower, + # so no rollback needed + self.log.debug("Integrating source files to destination ...") + self.integrated_file_sizes.update(self.integrate(instance)) + self.log.debug("Integrated files {}". + format(self.integrated_file_sizes)) + + # get 'files' info for representation and all attached resources + self.log.debug("Preparing files information ...") representation["files"] = self.get_files_info( - destinations, sites=sites, anatomy=anatomy - ) + instance, + self.integrated_file_sizes) - # Add the version resource file infos to each representation - representation["files"] += resource_file_infos + self.log.debug("__ representation: {}".format(representation)) + destination_list.append(dst) + self.log.debug("__ destination_list: {}".format(destination_list)) + instance.data['destination_list'] = destination_list + representations.append(representation) + published_representations[repre_id] = { + "representation": representation, + "anatomy_data": template_data, + "published_files": published_files + } + self.log.debug("__ representations: {}".format(representations)) + # reset transfers for next representation + # instance.data['transfers'] is used as a global variable + # in current codebase + instance.data['transfers'] = list(orig_transfers) - # Set up representation for writing to the database. Since - # we *might* be overwriting an existing entry if the version - # already existed we'll use ReplaceOnce with `upsert=True` - representation_writes.append(ReplaceOne( - filter={"_id": representation["_id"]}, - replacement=representation, - upsert=True - )) + # Remove old representations if there are any (before insertion of new) + if existing_repres: + repre_ids_to_remove = [] + for repre in existing_repres: + repre_ids_to_remove.append(repre["_id"]) + legacy_io.delete_many({"_id": {"$in": repre_ids_to_remove}}) - new_repre_names_low.add(representation["name"].lower()) + for rep in instance.data["representations"]: + self.log.debug("__ rep: {}".format(rep)) - # Delete any existing representations that didn't get any new data - # if the instance is not set to append mode - if not instance.data.get("append", False): - delete_names = set() - for name, existing_repres in existing_repres_by_name.items(): - if name not in new_repre_names_low: - # We add the exact representation name because `name` is - # lowercase for name matching only and not in the database - delete_names.add(existing_repres["name"]) - if delete_names: - representation_writes.append(DeleteMany( - filter={ - "parent": version["_id"], - "name": {"$in": list(delete_names)} - } - )) + legacy_io.insert_many(representations) + instance.data["published_representations"] = ( + published_representations + ) + # self.log.debug("Representation: {}".format(representations)) + self.log.info("Registered {} items".format(len(representations))) - # Write representations to the database - bulk_write(representation_writes) + def integrate(self, instance): + """ Move the files. 
- # Backwards compatibility - # todo: can we avoid the need to store this? - instance.data["published_representations"] = { - p["representation"]["_id"]: p for p in prepared_representations - } + Through `instance.data["transfers"]` - self.log.info("Registered {} representations" - "".format(len(prepared_representations))) + Args: + instance: the instance to integrate + Returns: + integrated_file_sizes: dictionary of destination file url and + its size in bytes + """ + # store destination url and size for reporting and rollback + integrated_file_sizes = {} + transfers = list(instance.data.get("transfers", list())) + for src, dest in transfers: + if os.path.normpath(src) != os.path.normpath(dest): + dest = self.get_dest_temp_url(dest) + self.copy_file(src, dest) + # TODO needs to be updated during site implementation + integrated_file_sizes[dest] = os.path.getsize(dest) - def prepare_subset(self, instance): - asset = instance.data.get("assetEntity") + # Produce hardlinked copies + # Note: hardlink can only be produced between two files on the same + # server/disk and editing one of the two will edit both files at once. + # As such it is recommended to only make hardlinks between static files + # to ensure publishes remain safe and non-edited. + hardlinks = instance.data.get("hardlinks", list()) + for src, dest in hardlinks: + dest = self.get_dest_temp_url(dest) + self.log.debug("Hardlinking file ... {} -> {}".format(src, dest)) + if not os.path.exists(dest): + self.hardlink_file(src, dest) + + # TODO needs to be updated during site implementation + integrated_file_sizes[dest] = os.path.getsize(dest) + + return integrated_file_sizes + + def copy_file(self, src, dst): + """ Copy given source to destination + + Arguments: + src (str): the source file which needs to be copied + dst (str): the destination of the sourc file + Returns: + None + """ + src = os.path.normpath(src) + dst = os.path.normpath(dst) + self.log.debug("Copying file ... 
{} -> {}".format(src, dst)) + dirname = os.path.dirname(dst) + try: + os.makedirs(dirname) + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + self.log.critical("An unexpected error occurred.") + six.reraise(*sys.exc_info()) + + # copy file with speedcopy and check if size of files are simetrical + while True: + if not shutil._samefile(src, dst): + copyfile(src, dst) + else: + self.log.critical( + "files are the same {} to {}".format(src, dst) + ) + os.remove(dst) + try: + shutil.copyfile(src, dst) + self.log.debug("Copying files with shutil...") + except OSError as e: + self.log.critical("Cannot copy {} to {}".format(src, dst)) + self.log.critical(e) + six.reraise(*sys.exc_info()) + if str(getsize(src)) in str(getsize(dst)): + break + + def hardlink_file(self, src, dst): + dirname = os.path.dirname(dst) + + try: + os.makedirs(dirname) + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + self.log.critical("An unexpected error occurred.") + six.reraise(*sys.exc_info()) + + create_hard_link(src, dst) + + def get_subset(self, asset, instance): subset_name = instance.data["subset"] - self.log.debug("Subset: {}".format(subset_name)) - - # Get existing subset if it exists subset = legacy_io.find_one({ "type": "subset", "parent": asset["_id"], "name": subset_name }) - # Define subset data - data = { - "families": get_instance_families(instance) - } - - subset_group = instance.data.get("subsetGroup") - if subset_group: - data["subsetGroup"] = subset_group - - bulk_writes = [] if subset is None: - # Create a new subset self.log.info("Subset '%s' not found, creating ..." % subset_name) - subset = { - "_id": ObjectId(), + self.log.debug("families. %s" % instance.data.get('families')) + self.log.debug( + "families. %s" % type(instance.data.get('families'))) + + family = instance.data.get("family") + families = [] + if family: + families.append(family) + + for _family in (instance.data.get("families") or []): + if _family not in families: + families.append(_family) + + _id = legacy_io.insert_one({ "schema": "openpype:subset-3.0", "type": "subset", "name": subset_name, - "data": data, + "data": { + "families": families + }, "parent": asset["_id"] - } - bulk_writes.append(InsertOne(subset)) + }).inserted_id - else: - # Update existing subset data with new data and set in database. - # We also change the found subset in-place so we don't need to - # re-query the subset afterwards - subset["data"].update(data) - bulk_writes.append(UpdateOne( - {"type": "subset", "_id": subset["_id"]}, - {"$set": { - "data": subset["data"] - }} - )) + subset = legacy_io.find_one({"_id": _id}) - self.log.info("Prepared subset: {}".format(subset_name)) - return subset, bulk_writes + # QUESTION Why is changing of group and updating it's + # families in 'get_subset'? + self._set_subset_group(instance, subset["_id"]) - def prepare_version(self, instance, subset): + # Update families on subset. + families = [instance.data["family"]] + families.extend(instance.data.get("families", [])) + legacy_io.update_many( + {"type": "subset", "_id": ObjectId(subset["_id"])}, + {"$set": {"data.families": families}} + ) - version_number = instance.data["version"] + return subset - version = { - "schema": "openpype:version-3.0", - "type": "version", - "parent": subset["_id"], - "name": version_number, - "data": self.create_version_data(instance) + def _set_subset_group(self, instance, subset_id): + """ + Mark subset as belonging to group in DB. 
+ + Uses Settings > Global > Publish plugins > IntegrateAssetNew + + Args: + instance (dict): processed instance + subset_id (str): DB's subset _id + + """ + # Fist look into instance data + subset_group = instance.data.get("subsetGroup") + if not subset_group: + subset_group = self._get_subset_group(instance) + + if subset_group: + legacy_io.update_many({ + 'type': 'subset', + '_id': ObjectId(subset_id) + }, {'$set': {'data.subsetGroup': subset_group}}) + + def _get_subset_group(self, instance): + """Look into subset group profiles set by settings. + + Attribute 'subset_grouping_profiles' is defined by OpenPype settings. + """ + # Skip if 'subset_grouping_profiles' is empty + if not self.subset_grouping_profiles: + return None + + # QUESTION + # - is there a chance that task name is not filled in anatomy + # data? + # - should we use context task in that case? + anatomy_data = instance.data["anatomyData"] + task_name = None + task_type = None + if "task" in anatomy_data: + task_name = anatomy_data["task"]["name"] + task_type = anatomy_data["task"]["type"] + filtering_criteria = { + "families": instance.data["family"], + "hosts": instance.context.data["hostName"], + "tasks": task_name, + "task_types": task_type } + matching_profile = filter_profiles( + self.subset_grouping_profiles, + filtering_criteria + ) + # Skip if there is not matchin profile + if not matching_profile: + return None - existing_version = legacy_io.find_one({ - 'type': 'version', - 'parent': subset["_id"], - 'name': version_number - }, projection={"_id": True}) + filled_template = None + template = matching_profile["template"] + fill_pairs = ( + ("family", filtering_criteria["families"]), + ("task", filtering_criteria["tasks"]), + ("host", filtering_criteria["hosts"]), + ("subset", instance.data["subset"]), + ("renderlayer", instance.data.get("renderlayer")) + ) + fill_pairs = prepare_template_data(fill_pairs) - if existing_version: - self.log.debug("Updating existing version ...") - version["_id"] = existing_version["_id"] - else: - self.log.debug("Creating new version ...") - version["_id"] = ObjectId() - - bulk_writes = [ReplaceOne( - filter={"_id": version["_id"]}, - replacement=version, - upsert=True - )] - - self.log.info("Prepared version: v{0:03d}".format(version["name"])) - - return version, bulk_writes - - def prepare_representation(self, repre, - template_name, - existing_repres_by_name, - version, - instance_stagingdir, - instance): - - # pre-flight validations - if repre["ext"].startswith("."): - raise ValueError("Extension must not start with a dot '.': " - "{}".format(repre["ext"])) - - if repre.get("transfers"): - raise ValueError("Representation is not allowed to have transfers" - "data before integration. 
They are computed in " - "the integrator" - "Got: {}".format(repre["transfers"])) - - # create template data for Anatomy - template_data = copy.deepcopy(instance.data["anatomyData"]) - - # required representation keys - files = repre['files'] - template_data["representation"] = repre["name"] - template_data["ext"] = repre["ext"] - - # optionals - # retrieve additional anatomy data from representation if exists - for key, anatomy_key in { - # Representation Key: Anatomy data key - "resolutionWidth": "resolution_width", - "resolutionHeight": "resolution_height", - "fps": "fps", - "outputName": "output", - "originalBasename": "originalBasename" - }.items(): - # Allow to take value from representation - # if not found also consider instance.data - if key in repre: - value = repre[key] - elif key in instance.data: - value = instance.data[key] - else: - continue - template_data[anatomy_key] = value - - if repre.get('stagingDir'): - stagingdir = repre['stagingDir'] - else: - # Fall back to instance staging dir if not explicitly - # set for representation in the instance - self.log.debug("Representation uses instance staging dir: " - "{}".format(instance_stagingdir)) - stagingdir = instance_stagingdir - if not stagingdir: - raise ValueError("No staging directory set for representation: " - "{}".format(repre)) - - self.log.debug("Anatomy template name: {}".format(template_name)) - anatomy = instance.context.data['anatomy'] - template = os.path.normpath(anatomy.templates[template_name]["path"]) - - is_udim = bool(repre.get("udim")) - is_sequence_representation = isinstance(files, (list, tuple)) - if is_sequence_representation: - # Collection of files (sequence) - assert not any(os.path.isabs(fname) for fname in files), ( - "Given file names contain full paths" + try: + filled_template = StringTemplate.format_strict_template( + template, fill_pairs ) + except (KeyError, TemplateUnsolved): + keys = [] + if fill_pairs: + keys = fill_pairs.keys() - src_collection = assemble(files) + msg = "Subset grouping failed. " \ + "Only {} are expected in Settings".format(','.join(keys)) + self.log.warning(msg) - # If the representation has `frameStart` set it renumbers the - # frame indices of the published collection. It will start from - # that `frameStart` index instead. Thus if that frame start - # differs from the collection we want to shift the destination - # frame indices from the source collection. - destination_indexes = list(src_collection.indexes) - destination_padding = len(get_first_frame_padded(src_collection)) - if repre.get("frameStart") is not None and not is_udim: - index_frame_start = int(repre.get("frameStart")) + return filled_template - render_template = anatomy.templates[template_name] - # todo: should we ALWAYS manage the frame padding even when not - # having `frameStart` set? - frame_start_padding = int( - render_template.get( - "frame_padding", - render_template.get("padding") - ) - ) - - # Shift destination sequence to the start frame - src_start_frame = next(iter(src_collection.indexes)) - shift = index_frame_start - src_start_frame - if shift: - destination_indexes = [ - frame + shift for frame in destination_indexes - ] - destination_padding = frame_start_padding - - # To construct the destination template with anatomy we require - # a Frame or UDIM tile set for the template data. We use the first - # index of the destination for that because that could've shifted - # from the source indexes, etc. 
- first_index_padded = get_frame_padded(frame=destination_indexes[0], - padding=destination_padding) - if is_udim: - # UDIM representations handle ranges in a different manner - template_data["udim"] = first_index_padded - else: - template_data["frame"] = first_index_padded - - # Construct destination collection from template - anatomy_filled = anatomy.format(template_data) - template_filled = anatomy_filled[template_name]["path"] - repre_context = template_filled.used_values - self.log.debug("Template filled: {}".format(str(template_filled))) - dst_collection = assemble([os.path.normpath(template_filled)]) - - # Update the destination indexes and padding - dst_collection.indexes.clear() - dst_collection.indexes.update(set(destination_indexes)) - dst_collection.padding = destination_padding - assert ( - len(src_collection.indexes) == len(dst_collection.indexes) - ), "This is a bug" - - # Multiple file transfers - transfers = [] - for src_file_name, dst in zip(src_collection, dst_collection): - src = os.path.join(stagingdir, src_file_name) - transfers.append((src, dst)) - - else: - # Single file - fname = files - assert not os.path.isabs(fname), ( - "Given file name is a full path" - ) - - # Manage anatomy template data - template_data.pop("frame", None) - if is_udim: - template_data["udim"] = repre["udim"][0] - - # Construct destination filepath from template - anatomy_filled = anatomy.format(template_data) - template_filled = anatomy_filled[template_name]["path"] - repre_context = template_filled.used_values - dst = os.path.normpath(template_filled) - - # Single file transfer - src = os.path.join(stagingdir, fname) - transfers = [(src, dst)] - - # todo: Are we sure the assumption each representation - # ends up in the same folder is valid? - if not instance.data.get("publishDir"): - instance.data["publishDir"] = ( - anatomy_filled - [template_name] - ["folder"] - ) - - for key in self.db_representation_context_keys: - # Also add these values to the context even if not used by the - # destination template - value = template_data.get(key) - if not value: - continue - repre_context[key] = template_data[key] - - # Explicitly store the full list even though template data might - # have a different value because it uses just a single udim tile - if repre.get("udim"): - repre_context["udim"] = repre.get("udim") # store list - - # Use previous representation's id if there is a name match - existing = existing_repres_by_name.get(repre["name"].lower()) - if existing: - repre_id = existing["_id"] - else: - repre_id = ObjectId() - - # Backwards compatibility: - # Store first transferred destination as published path data - # todo: can we remove this? - # todo: We shouldn't change data that makes its way back into - # instance.data[] until we know the publish actually succeeded - # otherwise `published_path` might not actually be valid? - published_path = transfers[0][1] - repre["published_path"] = published_path # Backwards compatibility - - # todo: `repre` is not the actual `representation` entity - # we should simplify/clarify difference between data above - # and the actual representation entity for the database - data = repre.get("data", {}) - data.update({'path': published_path, 'template': template}) - representation = { - "_id": repre_id, - "schema": "openpype:representation-2.0", - "type": "representation", - "parent": version["_id"], - "name": repre['name'], - "data": data, - - # Imprint shortcut to context for performance reasons. 
- "context": repre_context - } - - # todo: simplify/streamline which additional data makes its way into - # the representation context - if repre.get("outputName"): - representation["context"]["output"] = repre['outputName'] - - if is_sequence_representation and repre.get("frameStart") is not None: - representation['context']['frame'] = template_data["frame"] - - return { - "representation": representation, - "anatomy_data": template_data, - "transfers": transfers, - # todo: avoid the need for 'published_files' used by Integrate Hero - # backwards compatibility - "published_files": [transfer[1] for transfer in transfers] - } - - def create_version_data(self, instance): - """Create the data dictionary for the version + def create_version(self, subset, version_number, data=None): + """ Copy given source to destination Args: + subset (dict): the registered subset of the asset + version_number (int): the version number + + Returns: + dict: collection of data to create a version + """ + + return {"schema": "openpype:version-3.0", + "type": "version", + "parent": subset["_id"], + "name": version_number, + "data": data} + + def create_version_data(self, context, instance): + """Create the data collection for the version + + Args: + context: the current context instance: the current instance being published Returns: - dict: the required information for version["data"] + dict: the required information with instance.data as key """ - context = instance.context + families = [] + current_families = instance.data.get("families", list()) + instance_family = instance.data.get("family", None) + + if instance_family is not None: + families.append(instance_family) + families += current_families # create relative source path for DB - if "source" in instance.data: - source = instance.data["source"] - else: + source = instance.data.get("source") + if not source: source = context.data["currentFile"] anatomy = instance.context.data["anatomy"] source = self.get_rootless_path(anatomy, source) - self.log.debug("Source: {}".format(source)) + self.log.debug("Source: {}".format(source)) version_data = { - "families": get_instance_families(instance), + "families": families, "time": context.data["time"], "author": context.data["user"], "source": source, "comment": context.data.get("comment"), "machine": context.data.get("machine"), - "fps": instance.data.get("fps", context.data.get("fps")) + "fps": context.data.get( + "fps", instance.data.get("fps") + ) } - # todo: preferably we wouldn't need this "if dict" etc. logic and - # instead be able to rely what the input value is if it's set. 
- intent_value = context.data.get("intent") + intent_value = instance.context.data.get("intent") if intent_value and isinstance(intent_value, dict): intent_value = intent_value.get("value") @@ -732,58 +994,33 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): if key in instance.data: version_data[key] = instance.data[key] - # Include instance.data[versionData] directly - version_data_instance = instance.data.get('versionData') - if version_data_instance: - version_data.update(version_data_instance) - return version_data - def get_template_name(self, instance): - """Return anatomy template name to use for integration""" - # Define publish template name from profiles - filter_criteria = self.get_profile_filter_criteria(instance) - profile = filter_profiles(self.template_name_profiles, - filter_criteria, - logger=self.log) - if profile: - return profile["template_name"] - else: - return self.default_template_name - - def get_profile_filter_criteria(self, instance): - """Return filter criteria for `filter_profiles`""" - # Anatomy data is pre-filled by Collectors - anatomy_data = instance.data["anatomyData"] - - # Task can be optional in anatomy data - task = anatomy_data.get("task", {}) - - # Return filter criteria - return { - "families": anatomy_data["family"], - "tasks": task.get("name"), - "hosts": anatomy_data["app"], - "task_types": task.get("type") - } + def main_family_from_instance(self, instance): + """Returns main family of entered instance.""" + family = instance.data.get("family") + if not family: + family = instance.data["families"][0] + return family def get_rootless_path(self, anatomy, path): - """Returns, if possible, path without absolute portion from root - (eg. 'c:\' or '/opt/..') - - This information is platform dependent and shouldn't be captured. - Example: - 'c:/projects/MyProject1/Assets/publish...' > - '{root}/MyProject1/Assets...' + """ Returns, if possible, path without absolute portion from host + (eg. 'c:\' or '/opt/..') + This information is host dependent and shouldn't be captured. + Example: + 'c:/projects/MyProject1/Assets/publish...' > + '{root}/MyProject1/Assets...' Args: - anatomy: anatomy part from instance - path: path (absolute) + anatomy: anatomy part from instance + path: path (absolute) Returns: - path: modified path if possible, or unmodified path - + warning logged + path: modified path if possible, or unmodified path + + warning logged """ - success, rootless_path = anatomy.find_root_template_from_path(path) + success, rootless_path = ( + anatomy.find_root_template_from_path(path) + ) if success: path = rootless_path else: @@ -793,40 +1030,269 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): ).format(path)) return path - def get_files_info(self, destinations, sites, anatomy): - """Prepare 'files' info portion for representations. + def get_files_info(self, instance, integrated_file_sizes): + """ Prepare 'files' portion for attached resources and main asset. + Combining records from 'transfers' and 'hardlinks' parts from + instance. + All attached resources should be added, currently without + Context info. 
Arguments: - destinations (list): List of transferred file destinations - sites (list): array of published locations - anatomy: anatomy part from instance + instance: the current instance being published + integrated_file_sizes: dictionary of destination path (absolute) + and its file size Returns: output_resources: array of dictionaries to be added to 'files' key in representation """ - file_infos = [] - for file_path in destinations: - file_info = self.prepare_file_info(file_path, anatomy, sites=sites) - file_infos.append(file_info) - return file_infos + resources = list(instance.data.get("transfers", [])) + resources.extend(list(instance.data.get("hardlinks", []))) - def prepare_file_info(self, path, anatomy, sites): + self.log.debug("get_resource_files_info.resources:{}". + format(resources)) + + output_resources = [] + anatomy = instance.context.data["anatomy"] + for _src, dest in resources: + path = self.get_rootless_path(anatomy, dest) + dest = self.get_dest_temp_url(dest) + file_hash = openpype.api.source_hash(dest) + if self.TMP_FILE_EXT and \ + ',{}'.format(self.TMP_FILE_EXT) in file_hash: + file_hash = file_hash.replace(',{}'.format(self.TMP_FILE_EXT), + '') + + file_info = self.prepare_file_info(path, + integrated_file_sizes[dest], + file_hash, + instance=instance) + output_resources.append(file_info) + + return output_resources + + def get_dest_temp_url(self, dest): + """ Enhance destination path with TMP_FILE_EXT to denote temporary + file. + Temporary files will be renamed after successful registration + into DB and full copy to destination + + Arguments: + dest: destination url of published file (absolute) + Returns: + dest: destination path + '.TMP_FILE_EXT' + """ + if self.TMP_FILE_EXT and '.{}'.format(self.TMP_FILE_EXT) not in dest: + dest += '.{}'.format(self.TMP_FILE_EXT) + return dest + + def prepare_file_info(self, path, size=None, file_hash=None, + sites=None, instance=None): """ Prepare information for one file (asset or resource) Arguments: - path: destination url of published file - anatomy: anatomy part from instance - sites: array of published locations, - [ {'name':'studio', 'created_dt':date} by default - keys expected ['studio', 'site1', 'gdrive1'] - + path: destination url of published file (rootless) + size(optional): size of file in bytes + file_hash(optional): hash of file for synchronization validation + sites(optional): array of published locations, + [ {'name':'studio', 'created_dt':date} by default + keys expected ['studio', 'site1', 'gdrive1'] + instance(dict, optional): to get collected settings Returns: - dict: file info dictionary + rec: dictionary with filled info """ - return { + local_site = 'studio' # default + remote_site = None + always_accesible = [] + sync_project_presets = None + + rec = { "_id": ObjectId(), - "path": self.get_rootless_path(anatomy, path), - "size": os.path.getsize(path), - "hash": openpype.api.source_hash(path), - "sites": sites + "path": path } + if size: + rec["size"] = size + + if file_hash: + rec["hash"] = file_hash + + if sites: + rec["sites"] = sites + else: + system_sync_server_presets = ( + instance.context.data["system_settings"] + ["modules"] + ["sync_server"]) + log.debug("system_sett:: {}".format(system_sync_server_presets)) + + if system_sync_server_presets["enabled"]: + sync_project_presets = ( + instance.context.data["project_settings"] + ["global"] + ["sync_server"]) + + if sync_project_presets and sync_project_presets["enabled"]: + local_site, remote_site = self._get_sites(sync_project_presets) + + 
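+                # e.g. local_site="studio", remote_site="gdrive"
+                # (illustrative names; the actual sites come from the
+                # project's sync server settings)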
always_accesible = sync_project_presets["config"]. \ + get("always_accessible_on", []) + + already_attached_sites = {} + meta = {"name": local_site, "created_dt": datetime.now()} + rec["sites"] = [meta] + already_attached_sites[meta["name"]] = meta["created_dt"] + + if sync_project_presets and sync_project_presets["enabled"]: + if remote_site and \ + remote_site not in already_attached_sites.keys(): + # add remote + meta = {"name": remote_site.strip()} + rec["sites"].append(meta) + already_attached_sites[meta["name"]] = None + + # add alternative sites + rec, already_attached_sites = self._add_alternative_sites( + system_sync_server_presets, already_attached_sites, rec) + + # add skeleton for site where it should be always synced to + for always_on_site in set(always_accesible): + if always_on_site not in already_attached_sites.keys(): + meta = {"name": always_on_site.strip()} + rec["sites"].append(meta) + already_attached_sites[meta["name"]] = None + + log.debug("final sites:: {}".format(rec["sites"])) + + return rec + + def _get_sites(self, sync_project_presets): + """Returns tuple (local_site, remote_site)""" + local_site_id = openpype.api.get_local_site_id() + local_site = sync_project_presets["config"]. \ + get("active_site", "studio").strip() + + if local_site == 'local': + local_site = local_site_id + + remote_site = sync_project_presets["config"].get("remote_site") + + if remote_site == 'local': + remote_site = local_site_id + + return local_site, remote_site + + def _add_alternative_sites(self, + system_sync_server_presets, + already_attached_sites, + rec): + """Loop through all configured sites and add alternatives. + + See SyncServerModule.handle_alternate_site + """ + conf_sites = system_sync_server_presets.get("sites", {}) + + alt_site_pairs = self._get_alt_site_pairs(conf_sites) + + already_attached_keys = list(already_attached_sites.keys()) + for added_site in already_attached_keys: + real_created = already_attached_sites[added_site] + for alt_site in alt_site_pairs.get(added_site, []): + if alt_site in already_attached_sites.keys(): + continue + meta = {"name": alt_site} + # alt site inherits state of 'created_dt' + if real_created: + meta["created_dt"] = real_created + rec["sites"].append(meta) + already_attached_sites[meta["name"]] = real_created + + return rec, already_attached_sites + + def _get_alt_site_pairs(self, conf_sites): + """Returns dict of site and its alternative sites. 
+ + If `site` has alternative site, it means that alt_site has 'site' as + alternative site + Args: + conf_sites (dict) + Returns: + (dict): {'site': [alternative sites]...} + """ + alt_site_pairs = defaultdict(list) + for site_name, site_info in conf_sites.items(): + alt_sites = set(site_info.get("alternative_sites", [])) + alt_site_pairs[site_name].extend(alt_sites) + + for alt_site in alt_sites: + alt_site_pairs[alt_site].append(site_name) + + for site_name, alt_sites in alt_site_pairs.items(): + sites_queue = deque(alt_sites) + while sites_queue: + alt_site = sites_queue.popleft() + + # safety against wrong config + # {"SFTP": {"alternative_site": "SFTP"} + if alt_site == site_name or alt_site not in alt_site_pairs: + continue + + for alt_alt_site in alt_site_pairs[alt_site]: + if ( + alt_alt_site != site_name + and alt_alt_site not in alt_sites + ): + alt_sites.append(alt_alt_site) + sites_queue.append(alt_alt_site) + + return alt_site_pairs + + def handle_destination_files(self, integrated_file_sizes, mode): + """ Clean destination files + Called when error happened during integrating to DB or to disk + OR called to rename uploaded files from temporary name to final to + highlight publishing in progress/broken + Used to clean unwanted files + + Arguments: + integrated_file_sizes: dictionary, file urls as keys, size as value + mode: 'remove' - clean files, + 'finalize' - rename files, + remove TMP_FILE_EXT suffix denoting temp file + """ + if integrated_file_sizes: + for file_url, _file_size in integrated_file_sizes.items(): + if not os.path.exists(file_url): + self.log.debug( + "File {} was not found.".format(file_url) + ) + continue + + try: + if mode == 'remove': + self.log.debug("Removing file {}".format(file_url)) + os.remove(file_url) + if mode == 'finalize': + new_name = re.sub( + r'\.{}$'.format(self.TMP_FILE_EXT), + '', + file_url + ) + + if os.path.exists(new_name): + self.log.debug( + "Overwriting file {} to {}".format( + file_url, new_name + ) + ) + shutil.copy(file_url, new_name) + os.remove(file_url) + else: + self.log.debug( + "Renaming file {} to {}".format( + file_url, new_name + ) + ) + os.rename(file_url, new_name) + except OSError: + self.log.error("Cannot {} file {}".format(mode, file_url), + exc_info=True) + six.reraise(*sys.exc_info())
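Reviewer note (not part of the patch): the transitive expansion done by
`_get_alt_site_pairs` above can be tried standalone. A minimal sketch,
assuming the same "alternative_sites" shape used by the sync server site
settings; the site names here are made up:

    from collections import defaultdict, deque

    def get_alt_site_pairs(conf_sites):
        """Mirror each 'alternative_sites' relation, then expand it."""
        alt_site_pairs = defaultdict(list)
        for site_name, site_info in conf_sites.items():
            alt_sites = set(site_info.get("alternative_sites", []))
            alt_site_pairs[site_name].extend(alt_sites)
            # the relation is symmetric: the alt site points back as well
            for alt_site in alt_sites:
                alt_site_pairs[alt_site].append(site_name)

        # breadth-first expansion so chained alternatives (A-B, B-C)
        # also end up paired with each other (A-C)
        for site_name, alt_sites in alt_site_pairs.items():
            sites_queue = deque(alt_sites)
            while sites_queue:
                alt_site = sites_queue.popleft()
                if alt_site == site_name or alt_site not in alt_site_pairs:
                    continue
                for alt_alt_site in alt_site_pairs[alt_site]:
                    if (
                        alt_alt_site != site_name
                        and alt_alt_site not in alt_sites
                    ):
                        alt_sites.append(alt_alt_site)
                        sites_queue.append(alt_alt_site)
        return alt_site_pairs

    # studio <-> sftp <-> gdrive all become mutual alternatives
    print(dict(get_alt_site_pairs({
        "studio": {"alternative_sites": ["sftp"]},
        "sftp": {"alternative_sites": ["gdrive"]},
        "gdrive": {},
    })))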