From 91c3f479239779da46cae5349b06b2408df83efd Mon Sep 17 00:00:00 2001 From: "petr.kalis" Date: Thu, 11 Jun 2020 18:32:36 +0200 Subject: [PATCH] Added creation of temporary files in destination first Fix - integrated_file_sizes got carried over Fix - create orig_transfers to fall back to original transfers Fix - updated get_files_info logic - using transfers and hardlinks Fix - added rootless_path into representation Fix - updated file handling if errors --- pype/plugins/global/publish/integrate_new.py | 182 +++++++++++-------- 1 file changed, 110 insertions(+), 72 deletions(-) diff --git a/pype/plugins/global/publish/integrate_new.py b/pype/plugins/global/publish/integrate_new.py index 740de4d930..3f906e3455 100644 --- a/pype/plugins/global/publish/integrate_new.py +++ b/pype/plugins/global/publish/integrate_new.py @@ -91,28 +91,30 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): default_template_name = "publish" template_name_profiles = None - integrated_file_sizes = {} # file_url : file_size + integrated_file_sizes = {} # file_url : file_size of all published and uploaded files + + TMP_FILE_EXT = 'tmp' # suffix to denote temporary files, use without '.' def process(self, instance): - + self.integrated_file_sizes = {} if [ef for ef in self.exclude_families if instance.data["family"] in ef]: return self.log.info("IntegrateAssetNew.process:") import json - self.log.info("instance: {}".format(json.dumps(instance.__dict__, default=str))) + self.log.debug("instance: {}".format(json.dumps(instance.__dict__, default=str))) try: self.register(instance) self.log.info("Integrated Asset in to the database ...") self.log.info("instance.data: {}".format(instance.data)) - except Exception: + self.handle_destination_files(self.integrated_file_sizes, instance, 'finalize') + except Exception as e: # clean destination - # !TODO fix exceptions.WindowsError - e = sys.exc_info()[0] - self.log.critical("Error when registering {}".format(e)) - self.clean_destination_files(self.integrated_file_sizes) + self.log.critical("Error when registering", exc_info=True) + self.handle_destination_files(self.integrated_file_sizes, instance, 'remove') + raise def register(self, instance): # Required environment variables @@ -273,13 +275,20 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): representations = [] destination_list = [] + orig_transfers = [] if 'transfers' not in instance.data: instance.data['transfers'] = [] + else: + orig_transfers = list(instance.data['transfers']) template_name = self.template_name_from_instance(instance) published_representations = {} for idx, repre in enumerate(instance.data["representations"]): + # reset transfers for next representation + # instance.data['transfers'] is used as a global variable in current codebase + instance.data['transfers'] = list(orig_transfers) + published_files = [] # create template data for Anatomy @@ -482,17 +491,22 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): dst_padding_exp % int(repre.get("frameStart")) ) - if instance.data.get('transfer', True): + # any file that should be physically copied is expected in 'transfers' or 'hardlinks' + # both have same interface [[source_url, destination_url], [source_url...]] + if instance.data.get('transfers', False) or instance.data.get('hardlinks', False): # could throw exception, will be caught in 'process' # all integration to DB is being done together lower, so no rollback needed - self.log.info("Integrating source files to destination ...") - self.integrated_file_sizes = self.integrate(instance) + self.log.debug("Integrating source files to destination ...") + self.integrated_file_sizes.update(self.integrate(instance)) self.log.debug("Integrated files {}".format(self.integrated_file_sizes)) - #TODO instance.data["transfers"].remove([src, dst]) # array needs to be changed to tuple + + import random + if random.choice([True, False, True, True]): + raise Exception("Monkey attack!!!") # get 'files' information for representation and all attached resources self.log.debug("Preparing files information ..") - representation["files"] = self.get_files_info(dst, instance, self.integrated_file_sizes) + representation["files"] = self.get_files_info(instance, self.integrated_file_sizes) self.log.debug("__ representation: {}".format(representation)) destination_list.append(dst) @@ -535,20 +549,13 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): integrated_file_sizes: dictionary of destination file url and its size in bytes """ integrated_file_sizes = {} # store destination url and size for reporting and rollback - transfers = instance.data.get("transfers", list()) - + transfers = list(instance.data.get("transfers", list())) for src, dest in transfers: if os.path.normpath(src) != os.path.normpath(dest): + dest = self.get_dest_temp_url(dest) self.copy_file(src, dest) - - transfers = instance.data.get("transfers", list()) - for src, dest in transfers: - self.copy_file(src, dest) - # TODO needs to be updated during site implementation - integrated_file_sizes[dest] = os.path.getsize(dest) - # already copied, delete from transfers to limit double copy TODO double check - instance.data.get("transfers", list()).remove([src, dest]) - + # TODO needs to be updated during site implementation + integrated_file_sizes[dest] = os.path.getsize(dest) # Produce hardlinked copies # Note: hardlink can only be produced between two files on the same @@ -557,6 +564,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): # to ensure publishes remain safe and non-edited. hardlinks = instance.data.get("hardlinks", list()) for src, dest in hardlinks: + dest = self.get_dest_temp_url(dest) self.log.debug("Hardlinking file .. {} -> {}".format(src, dest)) if not os.path.exists(dest): self.hardlink_file(src, dest) @@ -692,16 +700,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): else: source = context.data["currentFile"] anatomy = instance.context.data["anatomy"] - success, rootless_path = ( - anatomy.find_root_template_from_path(source) - ) - if success: - source = rootless_path - else: - self.log.warning(( - "Could not find root path for remapping \"{}\"." - " This may cause issues on farm." - ).format(source)) + source = self.get_rootless_path(anatomy, source) self.log.debug("Source: {}".format(source)) version_data = { @@ -801,55 +800,75 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): return template_name - def get_files_info(self, dst, instance, integrated_file_sizes): - """ Prepare 'files' portion of representation to store all attached files (representatio, textures, json etc.) - This information is used in synchronization of mentioned files - Args: - dst: destination path for representation - instance: the instance to integrate - Returns: - files: list of dictionaries with representation and all resources + def get_rootless_path(self, anatomy, path): + """ Returns, if possible, path without absolute portion from host (eg. 'c:\' or '/opt/..') + This information is host dependent and shouldn't be captured. + Example: + 'c:/projects/MyProject1/Assets/publish...' > '{root}/MyProject1/Assets...' + + Args: + anatomy: anatomy part from instance + path: path (absolute) + Returns: + path: modified path if possible, or unmodified path + warning logged """ - self.log.debug("get_files_info:") - # add asset and all resources to 'files' - files = [] - # file info for representation - should be always - rec = self.prepare_file_info(dst, integrated_file_sizes[dst], 'temphash') # TODO - files.append(rec) + success, rootless_path = ( + anatomy.find_root_template_from_path(path) + ) + if success: + path = rootless_path + else: + self.log.warning(( + "Could not find root path for remapping \"{}\"." + " This may cause issues on farm." + ).format(path)) + return path - # file info for resources - resource_files = self.get_resource_files_info(instance, integrated_file_sizes) - if resource_files: # do not append empty list - files.extend(resource_files) - - return files - - def get_resource_files_info(self, instance, integrated_file_sizes): - """ Prepare 'files' portion for attached resources, - could be empty if no resources (like textures) present + def get_files_info(self, instance, integrated_file_sizes): + """ Prepare 'files' portion for attached resources and main asset. + Combining records from 'transfers' and 'hardlinks' parts from instance. + All attached resources should be added, currently without Context info. Arguments: instance: the current instance being published + integrated_file_sizes: dictionary of destination path (absolute) and its file size Returns: output_resources: array of dictionaries to be added to 'files' key in representation """ - # TODO check if sourceHashes is viable or transfers (they are updated during loop though) - resources = instance.data.get("sourceHashes", {}) - self.log.debug("get_resource_files_info: {}".format(resources)) + resources = list(instance.data.get("transfers", [])) + resources.extend(list(instance.data.get("hardlinks", []))) + + self.log.debug("get_resource_files_info.resources: {}".format(resources)) + output_resources = [] - for resource_info, resource_path in resources.items(): + anatomy = instance.context.data["anatomy"] + for src, dest in resources: # TODO - hash or use self.integrated_file_size - file_name,file_time,file_size,file_args = resource_info.split("|") - output_resources.append(self.prepare_file_info(resource_path, file_size, 'temphash')) + path = self.get_rootless_path(anatomy, dest) + dest = self.get_dest_temp_url(dest) + output_resources.append(self.prepare_file_info(path, integrated_file_sizes[dest], 'temphash')) return output_resources + def get_dest_temp_url(self, dest): + """ Enhance destination path with TMP_FILE_EXT to denote temporary file. + Temporary files will be renamed after successful registration into DB and full copy to destination + + Arguments: + dest: destination url of published file (absolute) + Returns: + dest: destination path + '.TMP_FILE_EXT' + """ + if self.TMP_FILE_EXT and '.{}'.format(self.TMP_FILE_EXT) not in dest: + dest += '.{}'.format(self.TMP_FILE_EXT) + return dest + def prepare_file_info(self, path, size = None, hash = None, sites = None): """ Prepare information for one file (asset or resource) Arguments: - path: destination url of published file + path: destination url of published file (rootless) size(optional): size of file in bytes hash(optional): hash of file for synchronization validation sites(optional): array of published locations, ['studio'] by default, @@ -873,25 +892,44 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin): else: rec["sites"] = ["studio"] - self.log.debug("prepare_file_info: {}".format(rec)) - return rec - def clean_destination_files(self, integrated_file_sizes): + def handle_destination_files(self, integrated_file_sizes, instance, mode): """ Clean destination files Called when error happened during integrating to DB or to disk Used to clean unwanted files Arguments: integrated_file_sizes: disctionary of uploaded files, file urls as keys + instance: processed instance - for publish directories + mode: 'remove' - clean files,'finalize' - rename files, remove TMP_FILE_EXT suffix denoting temp file """ if integrated_file_sizes: for file_url, file_size in integrated_file_sizes.items(): try: - self.log.debug("Removing file...{}".format(file_url)) - os.remove(file_url) # needs to be changed to Factory when sites implemented + if mode == 'remove': + self.log.debug("Removing file...{}".format(file_url)) + os.remove(file_url) # needs to be changed to Factory when sites implemented + if mode == 'finalize': + self.log.debug("Renaming file...{}".format(file_url)) + import re + os.rename(file_url, re.sub('\.{}$'.format(self.TMP_FILE_EXT), '', file_url)) # needs to be changed to Factory when sites implemented + except FileNotFoundError: pass # file not there, nothing to delete except OSError as e: - self.log.critical("Cannot remove file {}".format(file_url)) - self.log.critical(e) \ No newline at end of file + self.log.critical("Cannot {} file {}".format(mode, file_url), exc_info=True) + raise + + if mode == 'remove': + try: + publishDir = instance.data.get('publishDir', '') + resourcesDir = instance.data.get('resourcesDir', '') + if resourcesDir: + os.remove(resourcesDir) + if publishDir: + os.remove(publishDir) + except OSError as e: + self.log.critical("Cannot remove destination directory {} or {}".format(publishDir, resourcesDir), exc_info=True) + raise +