diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml index c1e18faf55..f71c6e2c29 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -35,6 +35,8 @@ body: label: Version description: What version are you running? Look to AYON Tray options: + - 1.3.1 + - 1.3.0 - 1.2.0 - 1.1.9 - 1.1.8 diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index a502403958..d720ff8d30 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -1,15 +1,13 @@ +import concurrent.futures import os import logging -import sys import errno +from concurrent.futures import ThreadPoolExecutor, Future +from typing import List, Optional from ayon_core.lib import create_hard_link -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile +from speedcopy import copyfile class DuplicateDestinationError(ValueError): @@ -109,41 +107,52 @@ class FileTransaction: self._transfers[dst] = (src, opts) def process(self): - # Backup any existing files - for dst, (src, _) in self._transfers.items(): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) - path_same = self._same_paths(src, dst) - if path_same or not os.path.exists(dst): - continue + with ThreadPoolExecutor(max_workers=8) as executor: + # Submit backup tasks + backup_futures = [ + executor.submit(self._backup_file, dst, src) + for dst, (src, _) in self._transfers.items() + ] + wait_for_future_errors( + executor, backup_futures, logger=self.log) - # Backup original file - # todo: add timestamp or uuid to ensure unique - backup = dst + ".bak" - self._backup_to_original[backup] = dst + # Submit transfer tasks + transfer_futures = [ + executor.submit(self._transfer_file, dst, src, opts) + for dst, (src, opts) in self._transfers.items() + ] + wait_for_future_errors( + executor, transfer_futures, logger=self.log) + + def _backup_file(self, dst, src): + self.log.debug(f"Checking file ... {src} -> {dst}") + path_same = self._same_paths(src, dst) + if path_same or not os.path.exists(dst): + return + + # Backup original file + backup = dst + ".bak" + self._backup_to_original[backup] = dst + self.log.debug(f"Backup existing file: {dst} -> {backup}") + os.rename(dst, backup) + + def _transfer_file(self, dst, src, opts): + path_same = self._same_paths(src, dst) + if path_same: self.log.debug( - "Backup existing file: {} -> {}".format(dst, backup)) - os.rename(dst, backup) + f"Source and destination are same files {src} -> {dst}") + return - # Copy the files to transfer - for dst, (src, opts) in self._transfers.items(): - path_same = self._same_paths(src, dst) - if path_same: - self.log.debug( - "Source and destination are same files {} -> {}".format( - src, dst)) - continue + self._create_folder_for_file(dst) - self._create_folder_for_file(dst) + if opts["mode"] == self.MODE_COPY: + self.log.debug(f"Copying file ... {src} -> {dst}") + copyfile(src, dst) + elif opts["mode"] == self.MODE_HARDLINK: + self.log.debug(f"Hardlinking file ... {src} -> {dst}") + create_hard_link(src, dst) - if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) - copyfile(src, dst) - elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format( - src, dst)) - create_hard_link(src, dst) - - self._transferred.append(dst) + self._transferred.append(dst) def finalize(self): # Delete any backed up files @@ -212,3 +221,46 @@ class FileTransaction: return os.stat(src) == os.stat(dst) return src == dst + + +def wait_for_future_errors( + executor: ThreadPoolExecutor, + futures: List[Future], + logger: Optional[logging.Logger] = None): + """For the ThreadPoolExecutor shutdown and cancel futures as soon one of + the workers raises an error as they complete. + + The ThreadPoolExecutor only cancels pending futures on exception but will + still complete those that are running - each which also themselves could + fail. We log all exceptions but re-raise the last exception only. + """ + if logger is None: + logger = logging.getLogger(__name__) + + for future in concurrent.futures.as_completed(futures): + exception = future.exception() + if exception: + # As soon as an error occurs, stop executing more futures. + # Running workers, however, will still be complete, so we also want + # to log those errors if any occurred on them. + executor.shutdown(wait=True, cancel_futures=True) + break + else: + # Futures are completed, no exceptions occurred + return + + # An exception occurred in at least one future. Get exceptions from + # all futures that are done and ended up failing until that point. + exceptions = [] + for future in futures: + if not future.cancelled() and future.done(): + exception = future.exception() + if exception: + exceptions.append(exception) + + # Log any exceptions that occurred in all workers + for exception in exceptions: + logger.error("Error occurred in worker", exc_info=exception) + + # Raise the last exception + raise exceptions[-1] diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 2163596864..43f93da293 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -1,7 +1,11 @@ import os import copy import errno +import itertools import shutil +from concurrent.futures import ThreadPoolExecutor + +from speedcopy import copyfile import clique import pyblish.api @@ -13,6 +17,7 @@ from ayon_api.operations import ( from ayon_api.utils import create_entity_id from ayon_core.lib import create_hard_link, source_hash +from ayon_core.lib.file_transaction import wait_for_future_errors from ayon_core.pipeline.publish import ( get_publish_template_name, OptionalPyblishPluginMixin, @@ -415,11 +420,14 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - for src_path, dst_path in src_to_dst_file_paths: - self.copy_file(src_path, dst_path) - - for src_path, dst_path in other_file_paths_mapping: - self.copy_file(src_path, dst_path) + with ThreadPoolExecutor(max_workers=8) as executor: + futures = [ + executor.submit(self.copy_file, src_path, dst_path) + for src_path, dst_path in itertools.chain( + src_to_dst_file_paths, other_file_paths_mapping + ) + ] + wait_for_future_errors(executor, futures) # Update prepared representation etity data with files # and integrate it to server. @@ -648,7 +656,7 @@ class IntegrateHeroVersion( src_path, dst_path )) - shutil.copy(src_path, dst_path) + copyfile(src_path, dst_path) def version_from_representations(self, project_name, repres): for repre in repres: diff --git a/client/ayon_core/version.py b/client/ayon_core/version.py index 4fd7bde336..9c43e80bf1 100644 --- a/client/ayon_core/version.py +++ b/client/ayon_core/version.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- """Package declaring AYON addon 'core' version.""" -__version__ = "1.2.0+dev" +__version__ = "1.3.1+dev" diff --git a/package.py b/package.py index 601d703857..47e3b39083 100644 --- a/package.py +++ b/package.py @@ -1,6 +1,6 @@ name = "core" title = "Core" -version = "1.2.0+dev" +version = "1.3.1+dev" client_dir = "ayon_core" diff --git a/pyproject.toml b/pyproject.toml index c7e2bb5000..f919a9589b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ [tool.poetry] name = "ayon-core" -version = "1.2.0+dev" +version = "1.3.1+dev" description = "" authors = ["Ynput Team "] readme = "README.md"