diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index f271f3081c..83a371967f 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -108,17 +108,32 @@ class FileTransaction: self._transfers[dst] = (src, opts) - def process(self): - with concurrent.futures.ThreadPoolExecutor() as executor: - backup_futures = [] - for dst, (src, _) in self._transfers.items(): - backup_futures.append(executor.submit(self._backup_file, dst, src)) - concurrent.futures.wait(backup_futures) - transfer_futures = [] - for dst, (src, opts) in self._transfers.items(): - transfer_futures.append(executor.submit(self._transfer_file, dst, src, opts)) - concurrent.futures.wait(transfer_futures) + def _process_futures(self, futures): + """Wait for futures and raise exceptions if any task fails.""" + try: + for future in concurrent.futures.as_completed(futures): + future.result() # If an exception occurs, it will be raised here + except Exception as e: + print(f"File Transaction task failed with error: {e}", file=sys.stderr) + raise + + def process(self): + try: + with concurrent.futures.ThreadPoolExecutor(max_workers=min(8, len(self._transfers))) as executor: + # Submit backup tasks + backup_futures = [executor.submit(self._backup_file, dst, src) for dst, (src, _) in + self._transfers.items()] + self._process_futures(backup_futures) + + # Submit transfer tasks + transfer_futures = [executor.submit(self._transfer_file, dst, src, opts) for dst, (src, opts) in + self._transfers.items()] + self._process_futures(transfer_futures) + + except Exception as e: + print(f"File Transaction Failed: {e}", file=sys.stderr) + sys.exit(1) def _backup_file(self, dst, src): self.log.debug(f"Checking file ... {src} -> {dst}")