diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 06ba07c148..bf206b535c 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,8 +1,10 @@ +import concurrent.futures import os import logging import sys import errno -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional from ayon_core.lib import create_hard_link @@ -12,8 +14,6 @@ if sys.platform == "win32": else: from shutil import copyfile -from .threadpool import as_completed_stop_and_raise_on_error - class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -226,3 +226,46 @@ class FileTransaction: return os.stat(src) == os.stat(dst) return src == dst + + +def as_completed_stop_and_raise_on_error( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions, but re-raise the last exception only. + """ + if logger is None: + logger = logging.getLogger(__name__) + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers however, will still complete so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py deleted file mode 100644 index b1b2476342..0000000000 --- a/client/ayon_core/lib/threadpool.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor, Future -from typing import List, Optional - -log = logging.getLogger(__name__) - - -def as_completed_stop_and_raise_on_error( - executor: ThreadPoolExecutor, - futures: List[Future], - logger: Optional[logging.Logger] = None): - """For the ThreadPoolExecutor shutdown and cancel futures as soon one of - the workers raises an error as they complete. - - The ThreadPoolExecutor only cancels pending futures on exception but will - still complete those that are running - each which also themselves could - fail. We log all exceptions, but re-raise the last exception only. - """ - if logger is None: - logger = log - - for future in concurrent.futures.as_completed(futures): - exception = future.exception() - if exception: - # As soon as an error occurs, stop executing more futures. - # Running workers however, will still complete so we also want - # to log those errors if any occurred on them. - executor.shutdown(wait=True, cancel_futures=True) - break - else: - # Futures are completed, no exceptions occurred - return - - # An exception occurred in at least one future. Get exceptions from - # all futures that are done and ended up failing until that point. - exceptions = [] - for future in futures: - if not future.cancelled() and future.done(): - exception = future.exception() - if exception: - exceptions.append(exception) - - # Log any exceptions that occurred in all workers - for exception in exceptions: - logger.error("Error occurred in worker", exc_info=exception) - - # Raise the last exception - raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 78a2796d35..69864cce8a 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -21,7 +21,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error +from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin,