From 2c102722627fad9949c950b70b47ddc7abf0bbf0 Mon Sep 17 00:00:00 2001 From: ChunYou Date: Tue, 4 Feb 2025 02:45:36 +0000 Subject: [PATCH] Make concurrent copying of files and change hero integration to use speedcopy on windows --- client/ayon_core/lib/file_transaction.py | 66 ++++++++++--------- .../plugins/publish/integrate_hero_version.py | 20 ++++-- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/client/ayon_core/lib/file_transaction.py b/client/ayon_core/lib/file_transaction.py index a502403958..7c330eb9b2 100644 --- a/client/ayon_core/lib/file_transaction.py +++ b/client/ayon_core/lib/file_transaction.py @@ -11,6 +11,7 @@ if sys.platform == "win32": else: from shutil import copyfile +import concurrent.futures class DuplicateDestinationError(ValueError): """Error raised when transfer destination already exists in queue. @@ -109,41 +110,46 @@ class FileTransaction: self._transfers[dst] = (src, opts) def process(self): - # Backup any existing files - for dst, (src, _) in self._transfers.items(): - self.log.debug("Checking file ... {} -> {}".format(src, dst)) - path_same = self._same_paths(src, dst) - if path_same or not os.path.exists(dst): - continue + with concurrent.futures.ThreadPoolExecutor() as executor: + backup_futures = [] + for dst, (src, _) in self._transfers.items(): + backup_futures.append(executor.submit(self.backup_file, dst, src)) + concurrent.futures.wait(backup_futures) - # Backup original file - # todo: add timestamp or uuid to ensure unique - backup = dst + ".bak" - self._backup_to_original[backup] = dst + transfer_futures = [] + for dst, (src, opts) in self._transfers.items(): + transfer_futures.append(executor.submit(self.transfer_file, dst, src, opts)) + concurrent.futures.wait(transfer_futures) + + def backup_file(self, dst, src): + self.log.debug("Checking file ... {} -> {}".format(src, dst)) + path_same = self._same_paths(src, dst) + if path_same or not os.path.exists(dst): + return + + # Backup original file + backup = dst + ".bak" + self._backup_to_original[backup] = dst + self.log.debug("Backup existing file: {} -> {}".format(dst, backup)) + os.rename(dst, backup) + + def transfer_file(self, dst, src, opts): + path_same = self._same_paths(src, dst) + if path_same: self.log.debug( - "Backup existing file: {} -> {}".format(dst, backup)) - os.rename(dst, backup) + "Source and destination are same files {} -> {}".format(src, dst)) + return - # Copy the files to transfer - for dst, (src, opts) in self._transfers.items(): - path_same = self._same_paths(src, dst) - if path_same: - self.log.debug( - "Source and destination are same files {} -> {}".format( - src, dst)) - continue + self._create_folder_for_file(dst) - self._create_folder_for_file(dst) + if opts["mode"] == self.MODE_COPY: + self.log.debug("Copying file ... {} -> {}".format(src, dst)) + copyfile(src, dst) + elif opts["mode"] == self.MODE_HARDLINK: + self.log.debug("Hardlinking file ... {} -> {}".format(src, dst)) + create_hard_link(src, dst) - if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) - copyfile(src, dst) - elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format( - src, dst)) - create_hard_link(src, dst) - - self._transferred.append(dst) + self._transferred.append(dst) def finalize(self): # Delete any backed up files diff --git a/client/ayon_core/plugins/publish/integrate_hero_version.py b/client/ayon_core/plugins/publish/integrate_hero_version.py index 2163596864..92becb6e01 100644 --- a/client/ayon_core/plugins/publish/integrate_hero_version.py +++ b/client/ayon_core/plugins/publish/integrate_hero_version.py @@ -2,6 +2,14 @@ import os import copy import errno import shutil +import sys +# this is needed until speedcopy for linux is fixed +if sys.platform == "win32": + from speedcopy import copyfile +else: + from shutil import copyfile + +import concurrent.futures import clique import pyblish.api @@ -415,11 +423,11 @@ class IntegrateHeroVersion( # Copy(hardlink) paths of source and destination files # TODO should we *only* create hardlinks? # TODO should we keep files for deletion until this is successful? - for src_path, dst_path in src_to_dst_file_paths: - self.copy_file(src_path, dst_path) - - for src_path, dst_path in other_file_paths_mapping: - self.copy_file(src_path, dst_path) + with concurrent.futures.ThreadPoolExecutor() as executor: + file_futures = [] + for src_path, dst_path in src_to_dst_file_paths + other_file_paths_mapping: + file_futures.append(executor.submit(self.copy_file, src_path, dst_path)) + concurrent.futures.wait(file_futures) # Update prepared representation etity data with files # and integrate it to server. @@ -648,7 +656,7 @@ class IntegrateHeroVersion( src_path, dst_path )) - shutil.copy(src_path, dst_path) + copyfile(src_path, dst_path) def version_from_representations(self, project_name, repres): for repre in repres: