Move as_completed_stop_and_raise_on_error to file_transaction.py

This commit is contained in:
Roy Nieterau 2025-02-05 11:51:32 +01:00
parent ef850b556f
commit 9c67bf1990
3 changed files with 47 additions and 53 deletions

View file

@ -1,8 +1,10 @@
import concurrent.futures
import os
import logging
import sys
import errno
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Optional
from ayon_core.lib import create_hard_link
@ -12,8 +14,6 @@ if sys.platform == "win32":
else:
from shutil import copyfile
from .threadpool import as_completed_stop_and_raise_on_error
class DuplicateDestinationError(ValueError):
"""Error raised when transfer destination already exists in queue.
@ -226,3 +226,46 @@ class FileTransaction:
return os.stat(src) == os.stat(dst)
return src == dst
def as_completed_stop_and_raise_on_error(
executor: ThreadPoolExecutor,
futures: List[Future],
logger: Optional[logging.Logger] = None):
"""For the ThreadPoolExecutor shutdown and cancel futures as soon one of
the workers raises an error as they complete.
The ThreadPoolExecutor only cancels pending futures on exception but will
still complete those that are running - each which also themselves could
fail. We log all exceptions, but re-raise the last exception only.
"""
if logger is None:
logger = logging.getLogger(__name__)
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
# As soon as an error occurs, stop executing more futures.
# Running workers however, will still complete so we also want
# to log those errors if any occurred on them.
executor.shutdown(wait=True, cancel_futures=True)
break
else:
# Futures are completed, no exceptions occurred
return
# An exception occurred in at least one future. Get exceptions from
# all futures that are done and ended up failing until that point.
exceptions = []
for future in futures:
if not future.cancelled() and future.done():
exception = future.exception()
if exception:
exceptions.append(exception)
# Log any exceptions that occurred in all workers
for exception in exceptions:
logger.error("Error occurred in worker", exc_info=exception)
# Raise the last exception
raise exceptions[-1]

View file

@ -1,49 +0,0 @@
import logging
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor, Future
from typing import List, Optional
log = logging.getLogger(__name__)
def as_completed_stop_and_raise_on_error(
executor: ThreadPoolExecutor,
futures: List[Future],
logger: Optional[logging.Logger] = None):
"""For the ThreadPoolExecutor shutdown and cancel futures as soon one of
the workers raises an error as they complete.
The ThreadPoolExecutor only cancels pending futures on exception but will
still complete those that are running - each which also themselves could
fail. We log all exceptions, but re-raise the last exception only.
"""
if logger is None:
logger = log
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
# As soon as an error occurs, stop executing more futures.
# Running workers however, will still complete so we also want
# to log those errors if any occurred on them.
executor.shutdown(wait=True, cancel_futures=True)
break
else:
# Futures are completed, no exceptions occurred
return
# An exception occurred in at least one future. Get exceptions from
# all futures that are done and ended up failing until that point.
exceptions = []
for future in futures:
if not future.cancelled() and future.done():
exception = future.exception()
if exception:
exceptions.append(exception)
# Log any exceptions that occurred in all workers
for exception in exceptions:
logger.error("Error occurred in worker", exc_info=exception)
# Raise the last exception
raise exceptions[-1]

View file

@ -21,7 +21,7 @@ from ayon_api.operations import (
from ayon_api.utils import create_entity_id
from ayon_core.lib import create_hard_link, source_hash
from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error
from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error
from ayon_core.pipeline.publish import (
get_publish_template_name,
OptionalPyblishPluginMixin,