From 7485d99cf6d3457b67c97496a98d48bd899cb977 Mon Sep 17 00:00:00 2001 From: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Date: Fri, 19 Dec 2025 11:23:37 +0100 Subject: [PATCH] use FileTransaction in integrate hero --- .../plugins/publish/integrate_hero_version.py | 101 +++++++----------- 1 file changed, 41 insertions(+), 60 deletions(-) diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 03b9dddf3a..99fdfc94f5 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -1,11 +1,8 @@ import os +import sys import copy -import errno import itertools import shutil -from concurrent.futures import ThreadPoolExecutor - -from speedcopy import copyfile import clique import pyblish.api @@ -16,11 +13,15 @@ from ayon_api.operations import ( ) from ayon_api.utils import create_entity_id -from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.file_transaction import wait_for_future_errors +from ayon_core.lib import source_hash +from ayon_core.lib.file_transaction import ( + FileTransaction, + DuplicateDestinationError, +) from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, + KnownPublishError, ) @@ -421,19 +422,41 @@ class IntegrateHeroVersion( (repre_entity, dst_paths) ) - self.path_checks = [] + file_transactions = FileTransaction( + log=self.log, + # Enforce unique transfers + allow_queue_replacements=False + ) + mode = FileTransaction.MODE_COPY + if self.use_hardlinks: + mode = FileTransaction.MODE_LINK - # Copy(hardlink) paths of source and destination files - # TODO should we *only* create hardlinks? - # TODO should we keep files for deletion until this is successful? - with ThreadPoolExecutor(max_workers=8) as executor: - futures = [ - executor.submit(self.copy_file, src_path, dst_path) - for src_path, dst_path in itertools.chain( - src_to_dst_file_paths, other_file_paths_mapping - ) - ] - wait_for_future_errors(executor, futures) + try: + for src_path, dst_path in itertools.chain( + src_to_dst_file_paths, + other_file_paths_mapping + ): + file_transactions.add(src_path, dst_path, mode=mode) + + self.log.debug("Integrating source files to destination ...") + file_transactions.process() + + except DuplicateDestinationError as exc: + # Raise DuplicateDestinationError as KnownPublishError + # and rollback the transactions + file_transactions.rollback() + raise KnownPublishError(exc).with_traceback(sys.exc_info()[2]) + + except Exception as exc: + # clean destination + # todo: preferably we'd also rollback *any* changes to the database + file_transactions.rollback() + self.log.critical("Error when copying files", exc_info=True) + raise exc + + # Finalizing can't rollback safely so no use for moving it to + # the try, except. + file_transactions.finalize() # Update prepared representation etity data with files # and integrate it to server. @@ -622,48 +645,6 @@ class IntegrateHeroVersion( ).format(path)) return path - def copy_file(self, src_path, dst_path): - # TODO check drives if are the same to check if cas hardlink - dirname = os.path.dirname(dst_path) - - try: - os.makedirs(dirname) - self.log.debug("Folder(s) created: \"{}\"".format(dirname)) - except OSError as exc: - if exc.errno != errno.EEXIST: - self.log.error("An unexpected error occurred.", exc_info=True) - raise - - self.log.debug("Folder already exists: \"{}\"".format(dirname)) - - if self.use_hardlinks: - # First try hardlink and copy if paths are cross drive - self.log.debug("Hardlinking file \"{}\" to \"{}\"".format( - src_path, dst_path - )) - try: - create_hard_link(src_path, dst_path) - # Return when successful - return - - except OSError as exc: - # re-raise exception if different than - # EXDEV - cross drive path - # EINVAL - wrong format, must be NTFS - self.log.debug( - "Hardlink failed with errno:'{}'".format(exc.errno)) - if exc.errno not in [errno.EXDEV, errno.EINVAL]: - raise - - self.log.debug( - "Hardlinking failed, falling back to regular copy...") - - self.log.debug("Copying file \"{}\" to \"{}\"".format( - src_path, dst_path - )) - - copyfile(src_path, dst_path) - def version_from_representations(self, project_name, repres): for repre in repres: version = ayon_api.get_version_by_id(