From 8c9c69efa78e99cc0fc6eceeff7a918736ab211e Mon Sep 17 00:00:00 2001
From: Roy Nieterau
Date: Wed, 5 Feb 2025 02:35:35 +0100
Subject: [PATCH] Add validation for the ThreadPoolExecutor calls, raise on error

---
 client/ayon_core/lib/file_transaction.py      | 45 ++++++++---------
 client/ayon_core/lib/threadpool.py            | 49 +++++++++++++++++++
 .../plugins/publish/integrate_hero_version.py | 18 ++++---
 3 files changed, 81 insertions(+), 31 deletions(-)
 create mode 100644 client/ayon_core/lib/threadpool.py

diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py
index 83a371967f..e82599b2fb 100644
--- a/client/ayon_core/lib/file_transaction.py
+++ b/client/ayon_core/lib/file_transaction.py
@@ -2,6 +2,7 @@ import os
 import logging
 import sys
 import errno
+from concurrent.futures import ThreadPoolExecutor
 
 from ayon_core.lib import create_hard_link
 
@@ -11,6 +12,11 @@ if sys.platform == "win32":
 else:
     from shutil import copyfile
 
+from .threadpool import as_completed_stop_and_raise_on_error
+
+
+log = logging.getLogger(__name__)
+
 
 class DuplicateDestinationError(ValueError):
     """Error raised when transfer destination already exists in queue.
@@ -108,32 +114,23 @@ class FileTransaction:
         self._transfers[dst] = (src, opts)
 
-
-    def _process_futures(self, futures):
-        """Wait for futures and raise exceptions if any task fails."""
-        try:
-            for future in concurrent.futures.as_completed(futures):
-                future.result()  # If an exception occurs, it will be raised here
-        except Exception as e:
-            print(f"File Transaction task failed with error: {e}", file=sys.stderr)
-            raise
-
     def process(self):
-        try:
-            with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor:
-                # Submit backup tasks
-                backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in
-                                  self._transfers.items()]
-                self._process_futures(backup_futures)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            # Submit backup tasks
+            backup_futures = [
+                executor.submit(self._backup_file, dst, src)
+                for dst, (src, _) in self._transfers.items()
+            ]
+            as_completed_stop_and_raise_on_error(
+                executor, backup_futures, logger=self.log)
 
-                # Submit transfer tasks
-                transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in
-                                    self._transfers.items()]
-                self._process_futures(transfer_futures)
-
-        except Exception as e:
-            print(f"File Transaction Failed: {e}", file=sys.stderr)
-            sys.exit(1)
+            # Submit transfer tasks
+            transfer_futures = [
+                executor.submit(self._transfer_file, dst, src, opts)
+                for dst, (src, opts) in self._transfers.items()
+            ]
+            as_completed_stop_and_raise_on_error(
+                executor, transfer_futures, logger=self.log)
 
     def _backup_file(self, dst, src):
         self.log.debug(f"Checking file ... {src} -> {dst}")
diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py
new file mode 100644
index 0000000000..b1b2476342
--- /dev/null
+++ b/client/ayon_core/lib/threadpool.py
@@ -0,0 +1,49 @@
+import logging
+import concurrent.futures
+from concurrent.futures import ThreadPoolExecutor, Future
+from typing import List, Optional
+
+log = logging.getLogger(__name__)
+
+
+def as_completed_stop_and_raise_on_error(
+        executor: ThreadPoolExecutor,
+        futures: List[Future],
+        logger: Optional[logging.Logger] = None):
+    """Shut down the ThreadPoolExecutor and cancel pending futures as soon
+    as one of the workers raises an error while the futures complete.
+
+    The ThreadPoolExecutor only cancels pending futures on exception but will
+    still complete those that are running - each of which could itself also
+    fail. We log all exceptions, but re-raise only the last exception.
+    """
+    if logger is None:
+        logger = log
+
+    for future in concurrent.futures.as_completed(futures):
+        exception = future.exception()
+        if exception:
+            # As soon as an error occurs, stop executing more futures.
+            # Running workers however, will still complete so we also want
+            # to log those errors if any occurred on them.
+            executor.shutdown(wait=True, cancel_futures=True)
+            break
+    else:
+        # Futures are completed, no exceptions occurred
+        return
+
+    # An exception occurred in at least one future. Get exceptions from
+    # all futures that are done and ended up failing until that point.
+    exceptions = []
+    for future in futures:
+        if not future.cancelled() and future.done():
+            exception = future.exception()
+            if exception:
+                exceptions.append(exception)
+
+    # Log any exceptions that occurred in all workers
+    for exception in exceptions:
+        logger.error("Error occurred in worker", exc_info=exception)
+
+    # Raise the last exception
+    raise exceptions[-1]
diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py
index 92becb6e01..78a2796d35 100644
--- a/client/ayon_core/plugins/publish/integrate_hero_version.py
+++ b/client/ayon_core/plugins/publish/integrate_hero_version.py
@@ -1,16 +1,16 @@
 import os
 import copy
 import errno
+import itertools
 import shutil
 import sys
+from concurrent.futures import ThreadPoolExecutor
 
 # this is needed until speedcopy for linux is fixed
 if sys.platform == "win32":
     from speedcopy import copyfile
 else:
     from shutil import copyfile
 
-import concurrent.futures
-
 import clique
 import pyblish.api
 import ayon_api
@@ -21,6 +21,7 @@ from ayon_api.operations import (
 from ayon_api.utils import create_entity_id
 
 from ayon_core.lib import create_hard_link, source_hash
+from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error
 from ayon_core.pipeline.publish import (
     get_publish_template_name,
     OptionalPyblishPluginMixin,
@@ -423,11 +424,14 @@ class IntegrateHeroVersion(
         # Copy(hardlink) paths of source and destination files
         # TODO should we *only* create hardlinks?
         # TODO should we keep files for deletion until this is successful?
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            file_futures = []
-            for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping:
-                file_futures.append(executor.submit(self.copy_file, src_path, dst_path))
-            concurrent.futures.wait(file_futures)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(self.copy_file, src_path, dst_path)
+                for src_path, dst_path
+                in itertools.chain(src_to_dst_file_paths,
+                                   other_file_paths_mapping)
+            ]
+            as_completed_stop_and_raise_on_error(executor, futures)
 
         # Update prepared representation etity data with files
         # and integrate it to server.
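
Usage note: a minimal sketch of how a caller is expected to drive the new
as_completed_stop_and_raise_on_error helper, assuming only the
ayon_core.lib.threadpool module path introduced by this patch; the copy_one
task, its failure condition and the worker/task counts are hypothetical
placeholders rather than part of the patch.

    import logging
    from concurrent.futures import ThreadPoolExecutor

    from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error

    log = logging.getLogger(__name__)


    def copy_one(index):
        # Hypothetical stand-in task; a real caller would copy or hardlink
        # a file here.
        if index == 3:
            raise RuntimeError(f"Task {index} failed")
        return index


    try:
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(copy_one, i) for i in range(10)]
            # Logs every exception raised by workers that already finished,
            # cancels the pending futures and re-raises the last exception,
            # so a failed batch does not continue silently.
            as_completed_stop_and_raise_on_error(executor, futures, logger=log)
    except RuntimeError as exc:
        log.error("Batch aborted: %s", exc)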