From 2c102722627fad9949c950b70b47ddc7abf0bbf0 Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 02:45:36 +0000 Subject: [PATCH 01/21] Make concurrent copying of files and change hero integration to use speedcopy on windows --- client/ayon_core/lib/file_transaction.py | 66 ++++++++++--------- .../plugins/publish/integrate_hero_version.py | 20 ++++-- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index a502403958..7c330eb9b2 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -11,6 +11,7 @@ if sys.platform == "win32": else: from shutil import copyfile +import concurrent.futures class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -109,41 +110,46 @@ class FileTransaction: self._transfers[dst] = (src, opts) def process(self): - # Backup any existing files - for dst, (src, _) in self._transfers.items(): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) - path_same = self._same_paths(src, dst) - if path_same or not os.path.exists(dst): - continue + with concurrent.futures.ThreadPoolExecutor() as executor: + backup_futures = [] + for dst, (src, _) in self._transfers.items(): + backup_futures.append(executor.submit(self.backup_file, dst, src)) + concurrent.futures.wait(backup_futures) - # Backup original file - # todo: add timestamp or uuid to ensure unique - backup = dst + ".bak" - self._backup_to_original[backup] = dst + transfer_futures = [] + for dst, (src, opts) in self._transfers.items(): + transfer_futures.append(executor.submit(self.transfer_file, dst, src, opts)) + concurrent.futures.wait(transfer_futures) + + def backup_file(self, dst, src): + self.log.debug("Checking file ... {} -> {}".format(src, dst)) + path_same = self._same_paths(src, dst) + if path_same or not os.path.exists(dst): + return + + # Backup original file + backup = dst + ".bak" + self._backup_to_original[backup] = dst + self.log.debug("Backup existing file: {} -> {}".format(dst, backup)) + os.rename(dst, backup) + + def transfer_file(self, dst, src, opts): + path_same = self._same_paths(src, dst) + if path_same: self.log.debug( - "Backup existing file: {} -> {}".format(dst, backup)) - os.rename(dst, backup) + "Source and destination are same files {} -> {}".format(src, dst)) + return - # Copy the files to transfer - for dst, (src, opts) in self._transfers.items(): - path_same = self._same_paths(src, dst) - if path_same: - self.log.debug( - "Source and destination are same files {} -> {}".format( - src, dst)) - continue + self._create_folder_for_file(dst) - self._create_folder_for_file(dst) + if opts["mode"] == self.MODE_COPY: + self.log.debug("Copying file ... {} -> {}".format(src, dst)) + copyfile(src, dst) + elif opts["mode"] == self.MODE_HARDLINK: + self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) + create_hard_link(src, dst) - if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) - copyfile(src, dst) - elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format( - src, dst)) - create_hard_link(src, dst) - - self._transferred.append(dst) + self._transferred.append(dst) def finalize(self): # Delete any backed up files diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 2163596864..92becb6e01 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -2,6 +2,14 @@ import os import copy import errno import shutil +import sys +# this is needed until speedcopy for linux is fixed +if sys.platform == "win32": + from speedcopy import copyfile +else: + from shutil import copyfile + +import concurrent.futures import clique import pyblish.api @@ -415,11 +423,11 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - for src_path, dst_path in src_to_dst_file_paths: - self.copy_file(src_path, dst_path) - - for src_path, dst_path in other_file_paths_mapping: - self.copy_file(src_path, dst_path) + with concurrent.futures.ThreadPoolExecutor() as executor: + file_futures = [] + for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping: + file_futures.append(executor.submit(self.copy_file, src_path, dst_path)) + concurrent.futures.wait(file_futures) # Update prepared representation etity data with files # and integrate it to server. @@ -648,7 +656,7 @@ class IntegrateHeroVersion( src_path, dst_path )) - shutil.copy(src_path, dst_path) + copyfile(src_path, dst_path) def version_from_representations(self, project_name, repres): for repre in repres: From 4a8c79285dce2d1e497aa30301ff430882255296 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:23 +0000 Subject: [PATCH 02/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 7c330eb9b2..08fcd66984 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -130,7 +130,7 @@ class FileTransaction: # Backup original file backup = dst + ".bak" self._backup_to_original[backup] = dst - self.log.debug("Backup existing file: {} -> {}".format(dst, backup)) + self.log.debug(f"Backup existing file: {dst} -> {backup}") os.rename(dst, backup) def transfer_file(self, dst, src, opts): From 4bca62dcfe041665051b1342be0fc8c0a9bd0081 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:30 +0000 Subject: [PATCH 03/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 08fcd66984..138861760f 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -143,7 +143,7 @@ class FileTransaction: self._create_folder_for_file(dst) if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) + self.log.debug(f"Copying file ... {src} -> {dst}") copyfile(src, dst) elif opts["mode"] == self.MODE_HARDLINK: self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) From 8dae70ab590789d03f3b5afe482a79cd0c1449a4 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:35 +0000 Subject: [PATCH 04/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 138861760f..8f119ed2ae 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -146,7 +146,7 @@ class FileTransaction: self.log.debug(f"Copying file ... {src} -> {dst}") copyfile(src, dst) elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) + self.log.debug(f"Hardlinking file ... {src} -> {dst}") create_hard_link(src, dst) self._transferred.append(dst) From b2e84b2b0a6f397b402084d95fa70b607bf39d0e Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:23:43 +0000 Subject: [PATCH 05/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 8f119ed2ae..e19f095920 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -137,7 +137,7 @@ class FileTransaction: path_same = self._same_paths(src, dst) if path_same: self.log.debug( - "Source and destination are same files {} -> {}".format(src, dst)) + f"Source and destination are same files {src} -> {dst}") return self._create_folder_for_file(dst) From 724e206900c57ec90daed0171571f9a52566d458 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:24:08 +0000 Subject: [PATCH 06/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 1 - 1 file changed, 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index e19f095920..2d1c7726eb 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -11,7 +11,6 @@ if sys.platform == "win32": else: from shutil import copyfile -import concurrent.futures class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. From 8721824bd9d889b97eb91ff60668c6bd64638072 Mon Sep 17 00:00:00 2001 From: r42-chun <73248638+r42-chun@users.noreply.github.com> Date: Tue, 4 Feb 2025 10:24:14 +0000 Subject: [PATCH 07/21] Update client/ayon_core/lib/file_transaction.py Co-authored-by: Roy Nieterau --- client/ayon_core/lib/file_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 2d1c7726eb..5c55f56b7d 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -121,7 +121,7 @@ class FileTransaction: concurrent.futures.wait(transfer_futures) def backup_file(self, dst, src): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) + self.log.debug(f"Checking file ... {src} -> {dst}") path_same = self._same_paths(src, dst) if path_same or not os.path.exists(dst): return From 236f72832cb313c959678acc05090a006b188fde Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 13:19:00 +0000 Subject: [PATCH 08/21] Add underscores to bacup and transfer file functions --- client/ayon_core/lib/file_transaction.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 5c55f56b7d..f271f3081c 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -112,15 +112,15 @@ class FileTransaction: with concurrent.futures.ThreadPoolExecutor() as executor: backup_futures = [] for dst, (src, _) in self._transfers.items(): - backup_futures.append(executor.submit(self.backup_file, dst, src)) + backup_futures.append(executor.submit(self._backup_file, dst, src)) concurrent.futures.wait(backup_futures) transfer_futures = [] for dst, (src, opts) in self._transfers.items(): - transfer_futures.append(executor.submit(self.transfer_file, dst, src, opts)) + transfer_futures.append(executor.submit(self._transfer_file, dst, src, opts)) concurrent.futures.wait(transfer_futures) - def backup_file(self, dst, src): + def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") path_same = self._same_paths(src, dst) if path_same or not os.path.exists(dst): @@ -132,7 +132,7 @@ class FileTransaction: self.log.debug(f"Backup existing file: {dst} -> {backup}") os.rename(dst, backup) - def transfer_file(self, dst, src, opts): + def _transfer_file(self, dst, src, opts): path_same = self._same_paths(src, dst) if path_same: self.log.debug( From 284ad5316be44940857f9c27c3357eae085b89a7 Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 13:59:06 +0000 Subject: [PATCH 09/21] Added some error handling --- client/ayon_core/lib/file_transaction.py | 35 +++++++++++++++++------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index f271f3081c..83a371967f 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -108,17 +108,32 @@ class FileTransaction: self._transfers[dst] = (src, opts) - def process(self): - with concurrent.futures.ThreadPoolExecutor() as executor: - backup_futures = [] - for dst, (src, _) in self._transfers.items(): - backup_futures.append(executor.submit(self._backup_file, dst, src)) - concurrent.futures.wait(backup_futures) - transfer_futures = [] - for dst, (src, opts) in self._transfers.items(): - transfer_futures.append(executor.submit(self._transfer_file, dst, src, opts)) - concurrent.futures.wait(transfer_futures) + def _process_futures(self, futures): + """Wait for futures and raise exceptions if any task fails.""" + try: + for future in concurrent.futures.as_completed(futures): + future.result() # If an exception occurs, it will be raised here + except Exception as e: + print(f"File Transaction task failed with error: {e}", file=sys.stderr) + raise + + def process(self): + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor: + # Submit backup tasks + backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in + self._transfers.items()] + self._process_futures(backup_futures) + + # Submit transfer tasks + transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in + self._transfers.items()] + self._process_futures(transfer_futures) + + except Exception as e: + print(f"File Transaction Failed: {e}", file=sys.stderr) + sys.exit(1) def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") From 8c9c69efa78e99cc0fc6eceeff7a918736ab211e Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 02:35:35 +0100 Subject: [PATCH 10/21] Add validation for the ThreadPoolExecutor calls, raise on error --- client/ayon_core/lib/file_transaction.py | 45 ++++++++--------- client/ayon_core/lib/threadpool.py | 49 +++++++++++++++++++ .../plugins/publish/integrate_hero_version.py | 18 ++++--- 3 files changed, 81 insertions(+), 31 deletions(-) create mode 100644 client/ayon_core/lib/threadpool.py diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 83a371967f..e82599b2fb 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -2,6 +2,7 @@ import os import logging import sys import errno +from concurrent.futures import ThreadPoolExecutor from ayon_core.lib import create_hard_link @@ -11,6 +12,11 @@ if sys.platform == "win32": else: from shutil import copyfile +from .threadpool import as_completed_stop_and_raise_on_error + + +log = logging.getLogger(__name__) + class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -108,32 +114,23 @@ class FileTransaction: self._transfers[dst] = (src, opts) - - def _process_futures(self, futures): - """Wait for futures and raise exceptions if any task fails.""" - try: - for future in concurrent.futures.as_completed(futures): - future.result() # If an exception occurs, it will be raised here - except Exception as e: - print(f"File Transaction task failed with error: {e}", file=sys.stderr) - raise - def process(self): - try: - with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor: - # Submit backup tasks - backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in - self._transfers.items()] - self._process_futures(backup_futures) + with ThreadPoolExecutor(max_workers=8) as executor: + # Submit backup tasks + backup_futures = [ + executor.submit(self._backup_file, dst, src) + for dst, (src, _) in self._transfers.items() + ] + as_completed_stop_and_raise_on_error( + executor, backup_futures, logger=self.log) - # Submit transfer tasks - transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in - self._transfers.items()] - self._process_futures(transfer_futures) - - except Exception as e: - print(f"File Transaction Failed: {e}", file=sys.stderr) - sys.exit(1) + # Submit transfer tasks + transfer_futures = [ + executor.submit(self._transfer_file, dst, src, opts) + for dst, (src, opts) in self._transfers.items() + ] + as_completed_stop_and_raise_on_error( + executor, transfer_futures, logger=self.log) def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}") diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py new file mode 100644 index 0000000000..b1b2476342 --- /dev/null +++ b/client/ayon_core/lib/threadpool.py @@ -0,0 +1,49 @@ +import logging +import concurrent.futures +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional + +log = logging.getLogger(__name__) + + +def as_completed_stop_and_raise_on_error( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions, but re-raise the last exception only. + """ + if logger is None: + logger = log + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers however, will still complete so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 92becb6e01..78a2796d35 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -1,16 +1,16 @@ import os import copy import errno +import itertools import shutil import sys +from concurrent.futures import ThreadPoolExecutor # this is needed until speedcopy for linux is fixed if sys.platform == "win32": from speedcopy import copyfile else: from shutil import copyfile -import concurrent.futures - import clique import pyblish.api import ayon_api @@ -21,6 +21,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash +from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, @@ -423,11 +424,14 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - with concurrent.futures.ThreadPoolExecutor() as executor: - file_futures = [] - for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping: - file_futures.append(executor.submit(self.copy_file, src_path, dst_path)) - concurrent.futures.wait(file_futures) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [ + executor.submit(self.copy_file, src_path, dst_path) + for src_path, dst_path + in itertools.chain(src_to_dst_file_paths, + other_file_paths_mapping) + ] + as_completed_stop_and_raise_on_error(executor, futures) # Update prepared representation etity data with files # and integrate it to server. From 42760fbffe26523aa8dc45ae8ba616dba375bcfb Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 02:39:11 +0100 Subject: [PATCH 11/21] Remove redundant log instance --- client/ayon_core/lib/file_transaction.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index e82599b2fb..06ba07c148 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -15,9 +15,6 @@ else: from .threadpool import as_completed_stop_and_raise_on_error -log = logging.getLogger(__name__) - - class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. From 9c67bf1990d26fdcde5e3566c058aadfdda6bf8e Mon Sep 17 00:00:00 2001 From: Roy Nieterau Date: Wed, 5 Feb 2025 11:51:32 +0100 Subject: [PATCH 12/21] Move `as_completed_stop_and_raise_on_error` to `file_transaction.py` --- client/ayon_core/lib/file_transaction.py | 49 +++++++++++++++++-- client/ayon_core/lib/threadpool.py | 49 ------------------- .../plugins/publish/integrate_hero_version.py | 2 +- 3 files changed, 47 insertions(+), 53 deletions(-) delete mode 100644 client/ayon_core/lib/threadpool.py diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index 06ba07c148..bf206b535c 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,8 +1,10 @@ +import concurrent.futures import os import logging import sys import errno -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional from ayon_core.lib import create_hard_link @@ -12,8 +14,6 @@ if sys.platform == "win32": else: from shutil import copyfile -from .threadpool import as_completed_stop_and_raise_on_error - class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -226,3 +226,46 @@ class FileTransaction: return os.stat(src) == os.stat(dst) return src == dst + + +def as_completed_stop_and_raise_on_error( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions, but re-raise the last exception only. + """ + if logger is None: + logger = logging.getLogger(__name__) + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers however, will still complete so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/lib/threadpool.py b/client/ayon_core/lib/threadpool.py deleted file mode 100644 index b1b2476342..0000000000 --- a/client/ayon_core/lib/threadpool.py +++ /dev/null @@ -1,49 +0,0 @@ -import logging -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor, Future -from typing import List, Optional - -log = logging.getLogger(__name__) - - -def as_completed_stop_and_raise_on_error( - executor: ThreadPoolExecutor, - futures: List[Future], - logger: Optional[logging.Logger] = None): - """For the ThreadPoolExecutor shutdown and cancel futures as soon one of - the workers raises an error as they complete. - - The ThreadPoolExecutor only cancels pending futures on exception but will - still complete those that are running - each which also themselves could - fail. We log all exceptions, but re-raise the last exception only. - """ - if logger is None: - logger = log - - for future in concurrent.futures.as_completed(futures): - exception = future.exception() - if exception: - # As soon as an error occurs, stop executing more futures. - # Running workers however, will still complete so we also want - # to log those errors if any occurred on them. - executor.shutdown(wait=True, cancel_futures=True) - break - else: - # Futures are completed, no exceptions occurred - return - - # An exception occurred in at least one future. Get exceptions from - # all futures that are done and ended up failing until that point. - exceptions = [] - for future in futures: - if not future.cancelled() and future.done(): - exception = future.exception() - if exception: - exceptions.append(exception) - - # Log any exceptions that occurred in all workers - for exception in exceptions: - logger.error("Error occurred in worker", exc_info=exception) - - # Raise the last exception - raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 78a2796d35..69864cce8a 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -21,7 +21,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.threadpool import as_completed_stop_and_raise_on_error +from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, From 0d7ced6fc660facf17b3afd8136a10d25a85fb0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Samohel?= <33513211+antirotor@users.noreply.github.com> Date: Fri, 9 May 2025 16:44:55 +0200 Subject: [PATCH 13/21] Update client/ayon_core/plugins/publish/integrate_hero_version.py Co-authored-by: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> --- client/ayon_core/plugins/publish/integrate_hero_version.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 69864cce8a..1315c114c3 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -427,9 +427,9 @@ class IntegrateHeroVersion( with ThreadPoolExecutor(max_workers=8) as executor: futures = [ executor.submit(self.copy_file, src_path, dst_path) - for src_path, dst_path - in itertools.chain(src_to_dst_file_paths, - other_file_paths_mapping) + for src_path, dst_path in itertools.chain( + src_to_dst_file_paths, other_file_paths_mapping + ) ] as_completed_stop_and_raise_on_error(executor, futures) From ee7d045413a59ac6c9be922fee1c979d345e087d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Samohel?= Date: Fri, 9 May 2025 17:15:38 +0200 Subject: [PATCH 14/21] :recycle: refactor function name and remove speedcopy fallback --- client/ayon_core/lib/file_transaction.py | 17 ++++++----------- .../plugins/publish/integrate_hero_version.py | 12 ++++-------- 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index bf206b535c..d720ff8d30 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,18 +1,13 @@ import concurrent.futures import os import logging -import sys import errno from concurrent.futures import ThreadPoolExecutor, Future from typing import List, Optional from ayon_core.lib import create_hard_link -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile +from speedcopy import copyfile class DuplicateDestinationError(ValueError): @@ -118,7 +113,7 @@ class FileTransaction: executor.submit(self._backup_file, dst, src) for dst, (src, _) in self._transfers.items() ] - as_completed_stop_and_raise_on_error( + wait_for_future_errors( executor, backup_futures, logger=self.log) # Submit transfer tasks @@ -126,7 +121,7 @@ class FileTransaction: executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in self._transfers.items() ] - as_completed_stop_and_raise_on_error( + wait_for_future_errors( executor, transfer_futures, logger=self.log) def _backup_file(self, dst, src): @@ -228,7 +223,7 @@ class FileTransaction: return src == dst -def as_completed_stop_and_raise_on_error( +def wait_for_future_errors( executor: ThreadPoolExecutor, futures: List[Future], logger: Optional[logging.Logger] = None): @@ -237,7 +232,7 @@ def as_completed_stop_and_raise_on_error( The ThreadPoolExecutor only cancels pending futures on exception but will still complete those that are running - each which also themselves could - fail. We log all exceptions, but re-raise the last exception only. + fail. We log all exceptions but re-raise the last exception only. """ if logger is None: logger = logging.getLogger(__name__) @@ -246,7 +241,7 @@ def as_completed_stop_and_raise_on_error( exception = future.exception() if exception: # As soon as an error occurs, stop executing more futures. - # Running workers however, will still complete so we also want + # Running workers, however, will still be complete, so we also want # to log those errors if any occurred on them. executor.shutdown(wait=True, cancel_futures=True) break diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 1315c114c3..43f93da293 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -3,13 +3,9 @@ import copy import errno import itertools import shutil -import sys from concurrent.futures import ThreadPoolExecutor -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile + +from speedcopy import copyfile import clique import pyblish.api @@ -21,7 +17,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash -from ayon_core.lib.file_transaction import as_completed_stop_and_raise_on_error +from ayon_core.lib.file_transaction import wait_for_future_errors from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, @@ -431,7 +427,7 @@ class IntegrateHeroVersion( src_to_dst_file_paths, other_file_paths_mapping ) ] - as_completed_stop_and_raise_on_error(executor, futures) + wait_for_future_errors(executor, futures) # Update prepared representation etity data with files # and integrate it to server. From 7a5e7a96f1aef43463c86957f93ecfee8319d602 Mon Sep 17 00:00:00 2001 From: Jakub Jezek Date: Fri, 16 May 2025 17:50:26 +0200 Subject: [PATCH 15/21] Fixes frame string formatting for transcoding Ensures the frame string used for transcoding includes the correct padding, based on the collection's padding attribute. This resolves issues where the output file sequence name was not correctly formatted, leading to transcoding failures. --- client/ayon_core/plugins/publish/extract_color_transcode.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/client/ayon_core/plugins/publish/extract_color_transcode.py b/client/ayon_core/plugins/publish/extract_color_transcode.py index 6cf30857a4..1e86b91484 100644 --- a/client/ayon_core/plugins/publish/extract_color_transcode.py +++ b/client/ayon_core/plugins/publish/extract_color_transcode.py @@ -283,7 +283,11 @@ class ExtractOIIOTranscode(publish.Extractor): if collection.holes().indexes: return files_to_convert - frame_str = "{}-{}#".format(frames[0], frames[-1]) + # Get the padding from the collection + # This is the number of digits used in the frame numbers + padding = collection.padding + + frame_str = "{}-{}%0{}d".format(frames[0], frames[-1], padding) file_name = "{}{}{}".format(collection.head, frame_str, collection.tail) From 0e49ada807336dacdb88c85df5d83c3e695e3afb Mon Sep 17 00:00:00 2001 From: Ynbot Date: Wed, 21 May 2025 07:25:28 +0000 Subject: [PATCH 16/21] [Automated] Add generated package files from main --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 4fd7bde336..533862fa9a 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.2.0+dev" +__version__ = "1.3.0" diff --git a/package.py b/package.py index 601d703857..7406e70aec 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.2.0+dev" +version = "1.3.0" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index c7e2bb5000..fda22073ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.2.0+dev" +version = "1.3.0" description = "" authors = ["Ynput Team "] readme = "README.md" From 3df127ec3b57d2619fc66db79e6f5f035c815a76 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Wed, 21 May 2025 07:26:06 +0000 Subject: [PATCH 17/21] [Automated] Update version in package.py for develop --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 533862fa9a..64842b5976 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.0" +__version__ = "1.3.0+dev" diff --git a/package.py b/package.py index 7406e70aec..32fedd859b 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.0" +version = "1.3.0+dev" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index fda22073ff..4034d6c0c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.0" +version = "1.3.0+dev" description = "" authors = ["Ynput Team "] readme = "README.md" From c020f821dca71138d48225d8cd32fa51bd8b1835 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Wed, 21 May 2025 07:27:00 +0000 Subject: [PATCH 18/21] chore(): update bug report / version --- .github/ISSUE_TEMPLATE/bug_report.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index c1e18faf55..9dbe32b018 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -35,6 +35,7 @@ body: label: Version description: What version are you running? Look to AYON Tray options: + - 1.3.0 - 1.2.0 - 1.1.9 - 1.1.8 From 46f6198ba36c23448c16f7133982753e16b58a55 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Thu, 22 May 2025 12:31:33 +0000 Subject: [PATCH 19/21] [Automated] Add generated package files from main --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 64842b5976..1b2f0defaa 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.0+dev" +__version__ = "1.3.1" diff --git a/package.py b/package.py index 32fedd859b..8161d63b1c 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.0+dev" +version = "1.3.1" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index 4034d6c0c6..79158e1010 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.0+dev" +version = "1.3.1" description = "" authors = ["Ynput Team "] readme = "README.md" From 6379f6726271ce848bc21c130bf28e30f8cdb302 Mon Sep 17 00:00:00 2001 From: Ynbot Date: Thu, 22 May 2025 12:32:10 +0000 Subject: [PATCH 20/21] [Automated] Update version in package.py for develop --- client/ayon_core/version.py | 2 +- package.py | 2 +- pyproject.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 1b2f0defaa..9c43e80bf1 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.3.1" +__version__ = "1.3.1+dev" diff --git a/package.py b/package.py index 8161d63b1c..47e3b39083 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.3.1" +version = "1.3.1+dev" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index 79158e1010..f919a9589b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.3.1" +version = "1.3.1+dev" description = "" authors = ["Ynput Team "] readme = "README.md" From 1fe653c8b7323a1e84d48e6dd5684543b8b7793c Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Thu, 22 May 2025 12:33:06 +0000 Subject: [PATCH 21/21] chore(): update bug report / version --- .github/ISSUE_TEMPLATE/bug_report.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index 9dbe32b018..f71c6e2c29 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -35,6 +35,7 @@ body: label: Version description: What version are you running? Look to AYON Tray options: + - 1.3.1 - 1.3.0 - 1.2.0 - 1.1.9