Merge pull request #1120 from r42-chun/unify-and-speed-up-copying

Speed Up Publishing Times
Jakub Trllo 2025-05-22 14:28:26 +02:00 committed by GitHub
commit 633e1e8739
2 changed files with 102 additions and 42 deletions

ayon_core/lib/file_transaction.py

@@ -1,15 +1,13 @@
+import concurrent.futures
 import os
 import logging
 import sys
-import errno
+from concurrent.futures import ThreadPoolExecutor, Future
+from typing import List, Optional
 
 from ayon_core.lib import create_hard_link
 
-# this is needed until speedcopy for linux is fixed
-if sys.platform == "win32":
-    from speedcopy import copyfile
-else:
-    from shutil import copyfile
+from speedcopy import copyfile
 
 
 class DuplicateDestinationError(ValueError):
@@ -109,41 +107,52 @@ class FileTransaction:
         self._transfers[dst] = (src, opts)
 
     def process(self):
-        # Backup any existing files
-        for dst, (src, _) in self._transfers.items():
-            self.log.debug("Checking file ... {} -> {}".format(src, dst))
-            path_same = self._same_paths(src, dst)
-            if path_same or not os.path.exists(dst):
-                continue
-
-            # Backup original file
-            # todo: add timestamp or uuid to ensure unique
-            backup = dst + ".bak"
-            self._backup_to_original[backup] = dst
-            self.log.debug(
-                "Backup existing file: {} -> {}".format(dst, backup))
-            os.rename(dst, backup)
-
-        # Copy the files to transfer
-        for dst, (src, opts) in self._transfers.items():
-            path_same = self._same_paths(src, dst)
-            if path_same:
-                self.log.debug(
-                    "Source and destination are same files {} -> {}".format(
-                        src, dst))
-                continue
-
-            self._create_folder_for_file(dst)
-
-            if opts["mode"] == self.MODE_COPY:
-                self.log.debug("Copying file ... {} -> {}".format(src, dst))
-                copyfile(src, dst)
-            elif opts["mode"] == self.MODE_HARDLINK:
-                self.log.debug("Hardlinking file ... {} -> {}".format(
-                    src, dst))
-                create_hard_link(src, dst)
-
-            self._transferred.append(dst)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            # Submit backup tasks
+            backup_futures = [
+                executor.submit(self._backup_file, dst, src)
+                for dst, (src, _) in self._transfers.items()
+            ]
+            wait_for_future_errors(
+                executor, backup_futures, logger=self.log)
+
+            # Submit transfer tasks
+            transfer_futures = [
+                executor.submit(self._transfer_file, dst, src, opts)
+                for dst, (src, opts) in self._transfers.items()
+            ]
+            wait_for_future_errors(
+                executor, transfer_futures, logger=self.log)
+
+    def _backup_file(self, dst, src):
+        self.log.debug(f"Checking file ... {src} -> {dst}")
+        path_same = self._same_paths(src, dst)
+        if path_same or not os.path.exists(dst):
+            return
+
+        # Backup original file
+        # todo: add timestamp or uuid to ensure unique
+        backup = dst + ".bak"
+        self._backup_to_original[backup] = dst
+        self.log.debug(f"Backup existing file: {dst} -> {backup}")
+        os.rename(dst, backup)
+
+    def _transfer_file(self, dst, src, opts):
+        path_same = self._same_paths(src, dst)
+        if path_same:
+            self.log.debug(
+                f"Source and destination are same files {src} -> {dst}")
+            return
+
+        self._create_folder_for_file(dst)
+
+        if opts["mode"] == self.MODE_COPY:
+            self.log.debug(f"Copying file ... {src} -> {dst}")
+            copyfile(src, dst)
+        elif opts["mode"] == self.MODE_HARDLINK:
+            self.log.debug(f"Hardlinking file ... {src} -> {dst}")
+            create_hard_link(src, dst)
+
+        self._transferred.append(dst)
 
     def finalize(self):
         # Delete any backed up files
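The net effect of this hunk: the serial backup-then-copy loops become two parallel phases driven by one thread pool. For orientation, a hedged usage sketch of the transaction as a whole — the add() name and call shape are assumed from the _transfers bookkeeping above, not confirmed by this diff:

    from ayon_core.lib.file_transaction import FileTransaction

    transaction = FileTransaction()

    # Queue transfers; nothing touches disk until process().
    # (add() is assumed here; the diff only shows the _transfers
    # dict that it would fill.)
    transaction.add("/work/render/beauty.exr", "/publish/beauty.exr")
    transaction.add("/work/render/diffuse.exr", "/publish/diffuse.exr")

    # Phase 1 renames clashing destinations to *.bak in parallel,
    # phase 2 copies/hardlinks all sources in parallel (8 workers
    # each), and either phase aborts on the first worker error.
    transaction.process()

    # Drop the *.bak backups once everything succeeded.
    transaction.finalize()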
@@ -212,3 +221,46 @@ class FileTransaction:
             return os.stat(src) == os.stat(dst)
 
         return src == dst
+
+
+def wait_for_future_errors(
+        executor: ThreadPoolExecutor,
+        futures: List[Future],
+        logger: Optional[logging.Logger] = None):
+    """Shut down the executor and cancel pending futures as soon as one
+    of the workers raises an error.
+
+    Cancelling only affects futures that are still pending; workers that
+    are already running will finish, and each of those could fail, too.
+    All exceptions are logged, but only the last one is re-raised.
+    """
+    if logger is None:
+        logger = logging.getLogger(__name__)
+
+    for future in concurrent.futures.as_completed(futures):
+        exception = future.exception()
+        if exception:
+            # As soon as an error occurs, stop executing more futures.
+            # Workers that are already running will still complete, so
+            # we also want to log any errors that occurred on them.
+            executor.shutdown(wait=True, cancel_futures=True)
+            break
+    else:
+        # All futures completed, no exceptions occurred
+        return
+
+    # An exception occurred in at least one future. Collect the
+    # exceptions from all futures that finished and failed.
+    exceptions = []
+    for future in futures:
+        if not future.cancelled() and future.done():
+            exception = future.exception()
+            if exception:
+                exceptions.append(exception)
+
+    # Log the exceptions that occurred in all workers
+    for exception in exceptions:
+        logger.error("Error occurred in worker", exc_info=exception)
+
+    # Re-raise the last exception
+    raise exceptions[-1]
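Used on its own, the helper gives any batch of submitted jobs fail-fast semantics. A minimal, hypothetical driver — copy_job and its failure are made up for illustration, only the wait_for_future_errors signature comes from the diff:

    import logging
    from concurrent.futures import ThreadPoolExecutor

    from ayon_core.lib.file_transaction import wait_for_future_errors

    logging.basicConfig(level=logging.ERROR)

    def copy_job(index):
        # Hypothetical worker: one job fails, the rest succeed.
        if index == 3:
            raise OSError(f"Disk full while copying file {index}")
        return index

    try:
        with ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(copy_job, i) for i in range(20)]
            # Logs every worker error seen before the shutdown, cancels
            # pending jobs, and re-raises the last logged exception.
            wait_for_future_errors(executor, futures)
    except OSError as exc:
        print(f"Transfer batch failed: {exc}")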

ayon_core/plugins/publish/integrate_hero_version.py

@@ -1,7 +1,11 @@
 import os
 import copy
 import errno
+import itertools
 import shutil
+from concurrent.futures import ThreadPoolExecutor
+
+from speedcopy import copyfile
 
 import clique
 import pyblish.api
@@ -13,6 +17,7 @@ from ayon_api.operations import (
 from ayon_api.utils import create_entity_id
 
 from ayon_core.lib import create_hard_link, source_hash
+from ayon_core.lib.file_transaction import wait_for_future_errors
 from ayon_core.pipeline.publish import (
     get_publish_template_name,
     OptionalPyblishPluginMixin,
@@ -415,11 +420,14 @@ class IntegrateHeroVersion(
         # Copy(hardlink) paths of source and destination files
         # TODO should we *only* create hardlinks?
         # TODO should we keep files for deletion until this is successful?
-        for src_path, dst_path in src_to_dst_file_paths:
-            self.copy_file(src_path, dst_path)
-
-        for src_path, dst_path in other_file_paths_mapping:
-            self.copy_file(src_path, dst_path)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(self.copy_file, src_path, dst_path)
+                for src_path, dst_path in itertools.chain(
+                    src_to_dst_file_paths, other_file_paths_mapping
+                )
+            ]
+            wait_for_future_errors(executor, futures)
 
         # Update prepared representation entity data with files
         # and integrate it to server.
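itertools.chain is what lets a single comprehension submit both mappings as one stream of jobs; a quick illustration with made-up path pairs:

    import itertools

    src_to_dst_file_paths = [("v001/beauty.exr", "hero/beauty.exr")]
    other_file_paths_mapping = [("v001/thumb.jpg", "hero/thumb.jpg")]

    # chain() yields the first iterable, then the second, without
    # building an intermediate list.
    for src_path, dst_path in itertools.chain(
        src_to_dst_file_paths, other_file_paths_mapping
    ):
        print(src_path, "->", dst_path)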
@@ -648,7 +656,7 @@ class IntegrateHeroVersion(
             src_path, dst_path
         ))
-        shutil.copy(src_path, dst_path)
+        copyfile(src_path, dst_path)
 
     def version_from_representations(self, project_name, repres):
         for repre in repres:
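The final hunk swaps shutil.copy for the speedcopy copyfile imported at the top of the file, so both files now copy through the same code path. One behavioral note: shutil.copy also copies permission bits, while copyfile-style functions copy contents only (assuming speedcopy mirrors shutil.copyfile semantics, as its naming suggests). speedcopy's draw is that it can use server-side copy on network shares, which is where the publishing speed-up largely comes from. A minimal sketch of the drop-in call, with hypothetical paths:

    from speedcopy import copyfile

    src_path = "/projects/show/work/hero_v001.exr"
    dst_path = "/projects/show/publish/hero.exr"

    # Same call shape as shutil.copyfile: contents only, no permission
    # bits; on network shares speedcopy can delegate the copy to the
    # file server instead of streaming bytes through the client.
    copyfile(src_path, dst_path)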