From 2c102722627fad9949c950b70b47ddc7abf0bbf0 Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 02:45:36 +0000 Subject: [PATCH 01/20] Make concurrent copying of files and change hero integration to use speedcopy on windows --- client/ayon_core/lib/file_transaction.py | 66 ++++++++++--------- .../plugins/publish/integrate_hero_version.py | 20 ++++-- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index a502403958..7c330eb9b2 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -11,6 +11,7 @@ if sys.platform == "win32": else: from shutil import copyfile +import concurrent.futures class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -109,41 +110,46 @@ class FileTransaction: self._transfers[dst] = (src, opts) def process(self): - # Backup any existing files - for dst, (src, _) in self._transfers.items(): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) - path_same = self._same_paths(src, dst) - if path_same or not os.path.exists(dst): - continue + with concurrent.futures.ThreadPoolExecutor() as executor: + backup_futures = [] + for dst, (src, _) in self._transfers.items(): + backup_futures.append(executor.submit(self.backup_file, dst, src)) + concurrent.futures.wait(backup_futures) - # Backup original file - # todo: add timestamp or uuid to ensure unique - backup = dst + ".bak" - self._backup_to_original[backup] = dst + transfer_futures = [] + for dst, (src, opts) in self._transfers.items(): + transfer_futures.append(executor.submit(self.transfer_file, dst, src, opts)) + concurrent.futures.wait(transfer_futures) + + def backup_file(self, dst, src): + self.log.debug("Checking file ... {} -> {}".format(src, dst)) + path_same = self._same_paths(src, dst) + if path_same or not os.path.exists(dst): + return + + # Backup original file + backup = dst + ".bak" + self._backup_to_original[backup] = dst + self.log.debug("Backup existing file: {} -> {}".format(dst, backup)) + os.rename(dst, backup) + + def transfer_file(self, dst, src, opts): + path_same = self._same_paths(src, dst) + if path_same: self.log.debug( - "Backup existing file: {} -> {}".format(dst, backup)) - os.rename(dst, backup) + "Source and destination are same files {} -> {}".format(src, dst)) + return - # Copy the files to transfer - for dst, (src, opts) in self._transfers.items(): - path_same = self._same_paths(src, dst) - if path_same: - self.log.debug( - "Source and destination are same files {} -> {}".format( - src, dst)) - continue + self._create_folder_for_file(dst) - self._create_folder_for_file(dst) + if opts["mode"] == self.MODE_COPY: + self.log.debug("Copying file ... {} -> {}".format(src, dst)) + copyfile(src, dst) + elif opts["mode"] == self.MODE_HARDLINK: + self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) + create_hard_link(src, dst) - if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) - copyfile(src, dst) - elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format( - src, dst)) - create_hard_link(src, dst) - - self._transferred.append(dst) + self._transferred.append(dst) def finalize(self): # Delete any backed up files diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 2163596864..92becb6e01 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -2,6 +2,14 @@ import os import copy import errno import shutil +import sys +# this is needed until speedcopy for linux is fixed +if sys.platform == "win32": + from speedcopy import copyfile +else: + from shutil import copyfile + +import concurrent.futures import clique import pyblish.api @@ -415,11 +423,11 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - for src_path, dst_path in src_to_dst_file_paths: - self.copy_file(src_path, dst_path) - - for src_path, dst_path in other_file_paths_mapping: - self.copy_file(src_path, dst_path) + with concurrent.futures.ThreadPoolExecutor() as executor: + file_futures = [] + for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping: + file_futures.append(executor.submit(self.copy_file, src_path, dst_path)) + concurrent.futures.wait(file_futures) # Update prepared representation etity data with files # and integrate it to server. @@ -648,7 +656,7 @@ class IntegrateHeroVersion( src_path, dst_path )) - shutil.copy(src_path, dst_path) + copyfile(src_path, dst_path) def version_from_representations(self, project_name, repres): for repre in repres: From 4a8c79285dce2d1e497aa30301ff430882255296 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:23 +0000 Subject: [PATCH 02/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 7c330eb9b2..08fcd66984 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -130,7 +130,7 @@ class FileTransaction: # Backup original file backup = dst + ".bak" self._backup_to_original[backup] = dst - self.log.debug("Backup existing file: {} -> {}".format(dst, backup)) + self.log.debug(f"Backup existing file: {dst} -> {backup}") os.rename(dst, backup) def transfer_file(self, dst, src, opts): From 4bca62dcfe041665051b1342be0fc8c0a9bd0081 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:30 +0000 Subject: [PATCH 03/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 08fcd66984..138861760f 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -143,7 +143,7 @@ class FileTransaction: self._create_folder_for_file(dst) if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) + self.log.debug(f"Copying file ... {src} -> {dst}") copyfile(src, dst) elif opts["mode"] == self.MODE_HARDLINK: self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) From 8dae70ab590789d03f3b5afe482a79cd0c1449a4 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:35 +0000 Subject: [PATCH 04/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 138861760f..8f119ed2ae 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -146,7 +146,7 @@ class FileTransaction: self.log.debug(f"Copying file ... {src} -> {dst}") copyfile(src, dst) elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) + self.log.debug(f"Hardlinking file ... {src} -> {dst}") create_hard_link(src, dst) self._transferred.append(dst) From b2e84b2b0a6f397b402084d95fa70b607bf39d0e Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:43 +0000 Subject: [PATCH 05/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 8f119ed2ae..e19f095920 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -137,7 +137,7 @@ class FileTransaction: path_same = self._same_paths(src, dst) if path_same: self.log.debug( - "Source and destination are same files {} -> {}".format(src, dst)) + f"Source and destination are same files {src} -> {dst}") return self._create_folder_for_file(dst) From 724e206900c57ec90daed0171571f9a52566d458 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:24:08 +0000 Subject: [PATCH 06/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 1 - 1 file changed, 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index e19f095920..2d1c7726eb 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -11,7 +11,6 @@ if sys.platform == "win32": else: from shutil import copyfile -import concurrent.futures class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. From 8721824bd9d889b97eb91ff60668c6bd64638072 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:24:14 +0000 Subject: [PATCH 07/20] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 2d1c7726eb..5c55f56b7d 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -121,7 +121,7 @@ class FileTransaction: concurrent.futures.wait(transfer_futures) def backup_file(self, dst, src): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) + self.log.debug(f"Checking file ... {src} -> {dst}") path_same = self._same_paths(src, dst) if path_same or not os.path.exists(dst): return From 236f72832cb313c959678acc05090a006b188fde Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 13:19:00 +0000 Subject: [PATCH 08/20] Add underscores to bacup and transfer file functions --- client/ayon_core/lib/file_transaction.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 5c55f56b7d..f271f3081c 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -112,15 +112,15 @@ class FileTransaction: with concurrent.futures.ThreadPoolExecutor() as executor: backup_futures = [] for dst, (src, _) in self._transfers.items(): - backup_futures.append(executor.submit(self.backup_file, dst, src)) + backup_futures.append(executor.submit(self._backup_file, dst, src)) concurrent.futures.wait(backup_futures) transfer_futures = [] for dst, (src, opts) in self._transfers.items(): - transfer_futures.append(executor.submit(self.transfer_file, dst, src, opts)) + transfer_futures.append(executor.submit(self._transfer_file, dst, src, opts)) concurrent.futures.wait(transfer_futures) - def backup_file(self, dst, src): + def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") path_same = self._same_paths(src, dst) if path_same or not os.path.exists(dst): @@ -132,7 +132,7 @@ class FileTransaction: self.log.debug(f"Backup existing file: {dst} -> {backup}") os.rename(dst, backup) - def transfer_file(self, dst, src, opts): + def _transfer_file(self, dst, src, opts): path_same = self._same_paths(src, dst) if path_same: self.log.debug( From 284ad5316be44940857f9c27c3357eae085b89a7 Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 13:59:06 +0000 Subject: [PATCH 09/20] Added some error handling --- client/ayon_core/lib/file_transaction.py | 35 +++++++++++++++++------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index f271f3081c..83a371967f 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -108,17 +108,32 @@ class FileTransaction: self._transfers[dst] = (src, opts) - def process(self): - with concurrent.futures.ThreadPoolExecutor() as executor: - backup_futures = [] - for dst, (src, _) in self._transfers.items(): - backup_futures.append(executor.submit(self._backup_file, dst, src)) - concurrent.futures.wait(backup_futures) - transfer_futures = [] - for dst, (src, opts) in self._transfers.items(): - transfer_futures.append(executor.submit(self._transfer_file, dst, src, opts)) - concurrent.futures.wait(transfer_futures) + def _process_futures(self, futures): + """Wait for futures and raise exceptions if any task fails.""" + try: + for future in concurrent.futures.as_completed(futures): + future.result() # If an exception occurs, it will be raised here + except Exception as e: + print(f"File Transaction task failed with error: {e}", file=sys.stderr) + raise + + def process(self): + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor: + # Submit backup tasks + backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in + self._transfers.items()] + self._process_futures(backup_futures) + + # Submit transfer tasks + transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in + self._transfers.items()] + self._process_futures(transfer_futures) + + except Exception as e: + print(f"File Transaction Failed: {e}", file=sys.stderr) + sys.exit(1) def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") From 8c9c69efa78e99cc0fc6eceeff7a918736ab211e Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 02:35:35 +0100 Subject: [PATCH 10/20] Add validation for the ThreadPoolExecutor calls, raise on error --- client/ayon_core/lib/file_transaction.py | 45 ++++++++--------- client/ayon_core/lib/threadpool.py | 49 +++++++++++++++++++ .../plugins/publish/integrate_hero_version.py | 18 ++++--- 3 files changed, 81 insertions(+), 31 deletions(-) create mode 100644 client/ayon_core/lib/threadpool.py diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 83a371967f..e82599b2fb 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -2,6 +2,7 @@ import os import logging import sys import errno +from concurrent.futures import ThreadPoolExecutor from ayon_core.lib import create_hard_link @@ -11,6 +12,11 @@ if sys.platform == "win32": else: from shutil import copyfile +from .threadpool import as_completed_stop_and_raise_on_error + + +log = logging.getLogger(__name__) + class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -108,32 +114,23 @@ class FileTransaction: self._transfers[dst] = (src, opts) - - def _process_futures(self, futures): - """Wait for futures and raise exceptions if any task fails.""" - try: - for future in concurrent.futures.as_completed(futures): - future.result() # If an exception occurs, it will be raised here - except Exception as e: - print(f"File Transaction task failed with error: {e}", file=sys.stderr) - raise - def process(self): - try: - with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor: - # Submit backup tasks - backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in - self._transfers.items()] - self._process_futures(backup_futures) + with ThreadPoolExecutor(max_workers=8) as executor: + # Submit backup tasks + backup_futures = [ + executor.submit(self._backup_file, dst, src) + for dst, (src, _) in self._transfers.items() + ] + as_completed_stop_and_raise_on_error( + executor, backup_futures, logger=self.log) - # Submit transfer tasks - transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in - self._transfers.items()] - self._process_futures(transfer_futures) - - except Exception as e: - print(f"File Transaction Failed: {e}", file=sys.stderr) - sys.exit(1) + # Submit transfer tasks + transfer_futures = [ + executor.submit(self._transfer_file, dst, src, opts) + for dst, (src, opts) in self._transfers.items() + ] + as_completed_stop_and_raise_on_error( + executor, transfer_futures, logger=self.log) def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py new file mode 100644 index 0000000000..b1b2476342 --- /dev/null +++ b/client/ayon_core/lib/threadpool.py @@ -0,0 +1,49 @@ +import logging +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional + +log = logging.getLogger(__name__) + + +def as_completed_stop_and_raise_on_error( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions, but re-raise the last exception only. + """ + if logger is None: + logger = log + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers however, will still complete so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 92becb6e01..78a2796d35 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -1,16 +1,16 @@ import os import copy import errno +import itertools import shutil import sys +from concurrent.futures import ThreadPoolExecutor # this is needed until speedcopy for linux is fixed if sys.platform == "win32": from speedcopy import copyfile else: from shutil import copyfile -import concurrent.futures - import clique import pyblish.api import ayon_api @@ -21,6 +21,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash +from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, @@ -423,11 +424,14 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - with concurrent.futures.ThreadPoolExecutor() as executor: - file_futures = [] - for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping: - file_futures.append(executor.submit(self.copy_file, src_path, dst_path)) - concurrent.futures.wait(file_futures) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [ + executor.submit(self.copy_file, src_path, dst_path) + for src_path, dst_path + in itertools.chain(src_to_dst_file_paths, + other_file_paths_mapping) + ] + as_completed_stop_and_raise_on_error(executor, futures) # Update prepared representation etity data with files # and integrate it to server. From 42760fbffe26523aa8dc45ae8ba616dba375bcfb Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 02:39:11 +0100 Subject: [PATCH 11/20] Remove redundant log instance --- client/ayon_core/lib/file_transaction.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index e82599b2fb..06ba07c148 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -15,9 +15,6 @@ else: from .threadpool import as_completed_stop_and_raise_on_error -log = logging.getLogger(__name__) - - class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. From 9c67bf1990d26fdcde5e3566c058aadfdda6bf8e Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 11:51:32 +0100 Subject: [PATCH 12/20] Move `as_completed_stop_and_raise_on_error` to `file_transaction.py` --- client/ayon_core/lib/file_transaction.py | 49 +++++++++++++++++-- client/ayon_core/lib/threadpool.py | 49 ------------------- .../plugins/publish/integrate_hero_version.py | 2 +- 3 files changed, 47 insertions(+), 53 deletions(-) delete mode 100644 client/ayon_core/lib/threadpool.py diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 06ba07c148..bf206b535c 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,8 +1,10 @@ +import concurrent.futures import os import logging import sys import errno -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional from ayon_core.lib import create_hard_link @@ -12,8 +14,6 @@ if sys.platform == "win32": else: from shutil import copyfile -from .threadpool import as_completed_stop_and_raise_on_error - class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -226,3 +226,46 @@ class FileTransaction: return os.stat(src) == os.stat(dst) return src == dst + + +def as_completed_stop_and_raise_on_error( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions, but re-raise the last exception only. + """ + if logger is None: + logger = logging.getLogger(__name__) + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers however, will still complete so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py deleted file mode 100644 index b1b2476342..0000000000 --- a/client/ayon_core/lib/threadpool.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor, Future -from typing import List, Optional - -log = logging.getLogger(__name__) - - -def as_completed_stop_and_raise_on_error( - executor: ThreadPoolExecutor, - futures: List[Future], - logger: Optional[logging.Logger] = None): - """For the ThreadPoolExecutor shutdown and cancel futures as soon one of - the workers raises an error as they complete. - - The ThreadPoolExecutor only cancels pending futures on exception but will - still complete those that are running - each which also themselves could - fail. We log all exceptions, but re-raise the last exception only. - """ - if logger is None: - logger = log - - for future in concurrent.futures.as_completed(futures): - exception = future.exception() - if exception: - # As soon as an error occurs, stop executing more futures. - # Running workers however, will still complete so we also want - # to log those errors if any occurred on them. - executor.shutdown(wait=True, cancel_futures=True) - break - else: - # Futures are completed, no exceptions occurred - return - - # An exception occurred in at least one future. Get exceptions from - # all futures that are done and ended up failing until that point. - exceptions = [] - for future in futures: - if not future.cancelled() and future.done(): - exception = future.exception() - if exception: - exceptions.append(exception) - - # Log any exceptions that occurred in all workers - for exception in exceptions: - logger.error("Error occurred in worker", exc_info=exception) - - # Raise the last exception - raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 78a2796d35..69864cce8a 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -21,7 +21,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error +from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, From 0d7ced6fc660facf17b3afd8136a10d25a85fb0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Samohel?= <33513211+antirotor@users.noreply.github.com> Date: Fri, 9 May 2025 16:44:55 +0200 Subject: [PATCH 13/20] Update client/ayon_core/plugins/publish/integrate_hero_version.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> --- client/ayon_core/plugins/publish/integrate_hero_version.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 69864cce8a..1315c114c3 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -427,9 +427,9 @@ class IntegrateHeroVersion( with ThreadPoolExecutor(max_workers=8) as executor: futures = [ executor.submit(self.copy_file, src_path, dst_path) - for src_path, dst_path - in itertools.chain(src_to_dst_file_paths, - other_file_paths_mapping) + for src_path, dst_path in itertools.chain( + src_to_dst_file_paths, other_file_paths_mapping + ) ] as_completed_stop_and_raise_on_error(executor, futures) From ee7d045413a59ac6c9be922fee1c979d345e087d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Samohel?= Date: Fri, 9 May 2025 17:15:38 +0200 Subject: [PATCH 14/20] :recycle: refactor function name and remove speedcopy fallback --- client/ayon_core/lib/file_transaction.py | 17 ++++++----------- .../plugins/publish/integrate_hero_version.py | 12 ++++-------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index bf206b535c..d720ff8d30 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,18 +1,13 @@ import concurrent.futures import os import logging -import sys import errno from concurrent.futures import ThreadPoolExecutor, Future from typing import List, Optional from ayon_core.lib import create_hard_link -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile +from speedcopy import copyfile class DuplicateDestinationError(ValueError): @@ -118,7 +113,7 @@ class FileTransaction: executor.submit(self._backup_file, dst, src) for dst, (src, _) in self._transfers.items() ] - as_completed_stop_and_raise_on_error( + wait_for_future_errors( executor, backup_futures, logger=self.log) # Submit transfer tasks @@ -126,7 +121,7 @@ class FileTransaction: executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in self._transfers.items() ] - as_completed_stop_and_raise_on_error( + wait_for_future_errors( executor, transfer_futures, logger=self.log) def _backup_file(self, dst, src): @@ -228,7 +223,7 @@ class FileTransaction: return src == dst -def as_completed_stop_and_raise_on_error( +def wait_for_future_errors( executor: ThreadPoolExecutor, futures: List[Future], logger: Optional[logging.Logger] = None): @@ -237,7 +232,7 @@ def as_completed_stop_and_raise_on_error( The ThreadPoolExecutor only cancels pending futures on exception but will still complete those that are running - each which also themselves could - fail. We log all exceptions, but re-raise the last exception only. + fail. We log all exceptions but re-raise the last exception only. """ if logger is None: logger = logging.getLogger(__name__) @@ -246,7 +241,7 @@ def as_completed_stop_and_raise_on_error( exception = future.exception() if exception: # As soon as an error occurs, stop executing more futures. - # Running workers however, will still complete so we also want + # Running workers, however, will still be complete, so we also want # to log those errors if any occurred on them. executor.shutdown(wait=True, cancel_futures=True) break diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 1315c114c3..43f93da293 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -3,13 +3,9 @@ import copy import errno import itertools import shutil -import sys from concurrent.futures import ThreadPoolExecutor -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile + +from speedcopy import copyfile import clique import pyblish.api @@ -21,7 +17,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error +from ayon_core.lib.file_transaction import wait_for_future_errors from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, @@ -431,7 +427,7 @@ class IntegrateHeroVersion( src_to_dst_file_paths, other_file_paths_mapping ) ] - as_completed_stop_and_raise_on_error(executor, futures) + wait_for_future_errors(executor, futures) # Update prepared representation etity data with files # and integrate it to server. From 0e49ada807336dacdb88c85df5d83c3e695e3afb Mon Sep 17 00:00:00 2001 From: Ynbot Date: Wed, 21 May 2025 07:25:28 +0000 Subject: [PATCH 15/20] [Automated] Add generated package files from main --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 4fd7bde336..533862fa9a 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.2.0+dev" +__version__ = "1.3.0" diff --git a/package.py b/package.py index 601d703857..7406e70aec 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.2.0+dev" +version = "1.3.0" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index c7e2bb5000..fda22073ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.2.0+dev" +version = "1.3.0" description = "" authors = ["Ynput Team "] readme = "README.md" From 3df127ec3b57d2619fc66db79e6f5f035c815a76 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Wed, 21 May 2025 07:26:06 +0000 Subject: [PATCH 16/20] [Automated] Update version in package.py for develop --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 533862fa9a..64842b5976 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.0" +__version__ = "1.3.0+dev" diff --git a/package.py b/package.py index 7406e70aec..32fedd859b 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.0" +version = "1.3.0+dev" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index fda22073ff..4034d6c0c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.0" +version = "1.3.0+dev" description = "" authors = ["Ynput Team "] readme = "README.md" From c020f821dca71138d48225d8cd32fa51bd8b1835 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 21 May 2025 07:27:00 +0000 Subject: [PATCH 17/20] chore(): update bug report / version --- .github/ISSUE_TEMPLATE/bug_report.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index c1e18faf55..9dbe32b018 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -35,6 +35,7 @@ body: label: Version description: What version are you running? Look to AYON Tray options: + - 1.3.0 - 1.2.0 - 1.1.9 - 1.1.8 From 46f6198ba36c23448c16f7133982753e16b58a55 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Thu, 22 May 2025 12:31:33 +0000 Subject: [PATCH 18/20] [Automated] Add generated package files from main --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 64842b5976..1b2f0defaa 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.0+dev" +__version__ = "1.3.1" diff --git a/package.py b/package.py index 32fedd859b..8161d63b1c 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.0+dev" +version = "1.3.1" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index 4034d6c0c6..79158e1010 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.0+dev" +version = "1.3.1" description = "" authors = ["Ynput Team "] readme = "README.md" From 6379f6726271ce848bc21c130bf28e30f8cdb302 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Thu, 22 May 2025 12:32:10 +0000 Subject: [PATCH 19/20] [Automated] Update version in package.py for develop --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 1b2f0defaa..9c43e80bf1 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.1" +__version__ = "1.3.1+dev" diff --git a/package.py b/package.py index 8161d63b1c..47e3b39083 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.1" +version = "1.3.1+dev" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index 79158e1010..f919a9589b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.1" +version = "1.3.1+dev" description = "" authors = ["Ynput Team "] readme = "README.md" From 1fe653c8b7323a1e84d48e6dd5684543b8b7793c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 22 May 2025 12:33:06 +0000 Subject: [PATCH 20/20] chore(): update bug report / version --- .github/ISSUE_TEMPLATE/bug_report.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 9dbe32b018..f71c6e2c29 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -35,6 +35,7 @@ body: label: Version description: What version are you running? Look to AYON Tray options: + - 1.3.1 - 1.3.0 - 1.2.0 - 1.1.9