Add validation for the ThreadPoolExecutor calls, raise on error

Roy Nieterau 2025-02-05 02:35:35 +01:00
parent 284ad5316b
commit 8c9c69efa7
3 changed files with 81 additions and 31 deletions
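In short, the diff below replaces the previous future handling (as_completed() plus result(), or a plain wait()) with a shared helper that cancels pending work as soon as any worker fails and then re-raises. A minimal sketch of that fail-fast pattern, with a hypothetical work() task (not the actual AYON code; cancel_futures requires Python 3.9+):

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def work(index):
    # Hypothetical task: the third submission fails.
    if index == 3:
        raise RuntimeError(f"task {index} failed")
    return index

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(work, i) for i in range(10)]
    for future in concurrent.futures.as_completed(futures):
        error = future.exception()
        if error:
            # Stop scheduling pending futures; already running ones finish.
            executor.shutdown(wait=True, cancel_futures=True)
            raise error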

View file

@@ -2,6 +2,7 @@ import os
import logging
import sys
import errno
from concurrent.futures import ThreadPoolExecutor
from ayon_core.lib import create_hard_link
@@ -11,6 +12,11 @@ if sys.platform == "win32":
else:
from shutil import copyfile
from .threadpool import as_completed_stop_and_raise_on_error
log = logging.getLogger(__name__)
class DuplicateDestinationError(ValueError):
"""Error raised when transfer destination already exists in queue.
@@ -108,32 +114,23 @@ class FileTransaction:
self._transfers[dst] = (src, opts)
def _process_futures(self, futures):
"""Wait for futures and raise exceptions if any task fails."""
try:
for future in concurrent.futures.as_completed(futures):
future.result() # If an exception occurs, it will be raised here
except Exception as e:
print(f"File Transaction task failed with error: {e}", file=sys.stderr)
raise
def process(self):
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor:
# Submit backup tasks
backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in
self._transfers.items()]
self._process_futures(backup_futures)
with ThreadPoolExecutor(max_workers=8) as executor:
# Submit backup tasks
backup_futures = [
executor.submit(self._backup_file, dst, src)
for dst, (src, _) in self._transfers.items()
]
as_completed_stop_and_raise_on_error(
executor, backup_futures, logger=self.log)
# Submit transfer tasks
transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in
self._transfers.items()]
self._process_futures(transfer_futures)
except Exception as e:
print(f"File Transaction Failed: {e}", file=sys.stderr)
sys.exit(1)
# Submit transfer tasks
transfer_futures = [
executor.submit(self._transfer_file, dst, src, opts)
for dst, (src, opts) in self._transfers.items()
]
as_completed_stop_and_raise_on_error(
executor, transfer_futures, logger=self.log)
def _backup_file(self, dst, src):
self.log.debug(f"Checking file ... {src} -> {dst}")
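The notable behavioral change in process() above is that failures now propagate: instead of printing the error and calling sys.exit(1), the worker exception is re-raised via as_completed_stop_and_raise_on_error. A rough sketch of what that means for calling code; the FileTransaction import path, constructor and add() call are assumptions for illustration, only process() appears in this diff:

import logging

from ayon_core.lib.file_transaction import FileTransaction  # assumed module path

log = logging.getLogger(__name__)

transaction = FileTransaction(log=log)           # assumed constructor signature
transaction.add("/tmp/src.exr", "/tmp/dst.exr")  # assumed API for queueing a transfer
try:
    transaction.process()
except Exception:
    # Previously a failed transfer ended the host process via sys.exit(1);
    # now the worker exception reaches the caller, which can handle or re-raise it.
    log.error("File transaction failed", exc_info=True)
    raise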

View file

@@ -0,0 +1,49 @@
import logging
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Optional
log = logging.getLogger(__name__)
def as_completed_stop_and_raise_on_error(
executor: ThreadPoolExecutor,
futures: List[Future],
logger: Optional[logging.Logger] = None):
"""For the ThreadPoolExecutor shutdown and cancel futures as soon one of
the workers raises an error as they complete.
The ThreadPoolExecutor only cancels pending futures on exception but will
still complete those that are running - each which also themselves could
fail. We log all exceptions, but re-raise the last exception only.
"""
if logger is None:
logger = log
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
# As soon as an error occurs, stop executing more futures.
# Running workers, however, will still complete, so we also want
# to log their errors if any occurred.
executor.shutdown(wait=True, cancel_futures=True)
break
else:
# Futures are completed, no exceptions occurred
return
# An exception occurred in at least one future. Get exceptions from
# all futures that are done and ended up failing until that point.
exceptions = []
for future in futures:
if not future.cancelled() and future.done():
exception = future.exception()
if exception:
exceptions.append(exception)
# Log any exceptions that occurred in all workers
for exception in exceptions:
logger.error("Error occurred in worker", exc_info=exception)
# Raise the last exception
raise exceptions[-1]
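A minimal usage sketch for the helper above, with a hypothetical copy_one() worker and placeholder paths; the import path matches the one used by the publish plugin further below:

import shutil
from concurrent.futures import ThreadPoolExecutor

from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error

def copy_one(src, dst):
    # Hypothetical worker; any exception it raises is collected by the helper.
    shutil.copyfile(src, dst)

pairs = [("a.txt", "b.txt"), ("c.txt", "d.txt")]  # placeholder paths
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(copy_one, src, dst) for src, dst in pairs]
    # Logs every worker exception, cancels pending futures after the first
    # failure and re-raises the last exception; returns None when all succeed.
    as_completed_stop_and_raise_on_error(executor, futures)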

View file

@@ -1,16 +1,16 @@
import os
import copy
import errno
import itertools
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor
# this is needed until speedcopy for linux is fixed
if sys.platform == "win32":
from speedcopy import copyfile
else:
from shutil import copyfile
import concurrent.futures
import clique
import pyblish.api
import ayon_api
@@ -21,6 +21,7 @@ from ayon_api.operations import (
from ayon_api.utils import create_entity_id
from ayon_core.lib import create_hard_link, source_hash
from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error
from ayon_core.pipeline.publish import (
get_publish_template_name,
OptionalPyblishPluginMixin,
@@ -423,11 +424,14 @@ class IntegrateHeroVersion(
# Copy(hardlink) paths of source and destination files
# TODO should we *only* create hardlinks?
# TODO should we keep files for deletion until this is successful?
with concurrent.futures.ThreadPoolExecutor() as executor:
file_futures = []
for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping:
file_futures.append(executor.submit(self.copy_file, src_path, dst_path))
concurrent.futures.wait(file_futures)
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [
executor.submit(self.copy_file, src_path, dst_path)
for src_path, dst_path
in itertools.chain(src_to_dst_file_paths,
other_file_paths_mapping)
]
as_completed_stop_and_raise_on_error(executor, futures)
# Update prepared representation entity data with files
# and integrate it to the server.
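The practical difference from the removed concurrent.futures.wait(file_futures) call is error visibility: wait() blocks until all futures finish but never re-raises their exceptions, so a failed copy could previously go unnoticed. A small illustration of that gap, using a hypothetical failing task:

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def failing_copy():
    # Hypothetical stand-in for a copy that fails.
    raise IOError("copy failed")

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(failing_copy)]
    done, not_done = concurrent.futures.wait(futures)
    # No exception is raised here; the error only sits on the future object.
    assert done.pop().exception() is not None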