use FileTransaction in integrate hero

This commit is contained in:
Jakub Trllo 2025-12-19 11:23:37 +01:00
parent 92d4da9efa
commit 7485d99cf6

View file

@ -1,11 +1,8 @@
import os
import sys
import copy
import errno
import itertools
import shutil
from concurrent.futures import ThreadPoolExecutor
from speedcopy import copyfile
import clique
import pyblish.api
@ -16,11 +13,15 @@ from ayon_api.operations import (
)
from ayon_api.utils import create_entity_id
from ayon_core.lib import create_hard_link, source_hash
from ayon_core.lib.file_transaction import wait_for_future_errors
from ayon_core.lib import source_hash
from ayon_core.lib.file_transaction import (
FileTransaction,
DuplicateDestinationError,
)
from ayon_core.pipeline.publish import (
get_publish_template_name,
OptionalPyblishPluginMixin,
KnownPublishError,
)
@ -421,19 +422,41 @@ class IntegrateHeroVersion(
(repre_entity, dst_paths)
)
self.path_checks = []
file_transactions = FileTransaction(
log=self.log,
# Enforce unique transfers
allow_queue_replacements=False
)
mode = FileTransaction.MODE_COPY
if self.use_hardlinks:
mode = FileTransaction.MODE_LINK
# Copy(hardlink) paths of source and destination files
# TODO should we *only* create hardlinks?
# TODO should we keep files for deletion until this is successful?
with ThreadPoolExecutor(max_workers=8) as executor:
futures = [
executor.submit(self.copy_file, src_path, dst_path)
for src_path, dst_path in itertools.chain(
src_to_dst_file_paths, other_file_paths_mapping
)
]
wait_for_future_errors(executor, futures)
try:
for src_path, dst_path in itertools.chain(
src_to_dst_file_paths,
other_file_paths_mapping
):
file_transactions.add(src_path, dst_path, mode=mode)
self.log.debug("Integrating source files to destination ...")
file_transactions.process()
except DuplicateDestinationError as exc:
# Raise DuplicateDestinationError as KnownPublishError
# and rollback the transactions
file_transactions.rollback()
raise KnownPublishError(exc).with_traceback(sys.exc_info()[2])
except Exception as exc:
# clean destination
# todo: preferably we'd also rollback *any* changes to the database
file_transactions.rollback()
self.log.critical("Error when copying files", exc_info=True)
raise exc
# Finalizing can't rollback safely so no use for moving it to
# the try, except.
file_transactions.finalize()
# Update prepared representation etity data with files
# and integrate it to server.
@ -622,48 +645,6 @@ class IntegrateHeroVersion(
).format(path))
return path
def copy_file(self, src_path, dst_path):
# TODO check drives if are the same to check if cas hardlink
dirname = os.path.dirname(dst_path)
try:
os.makedirs(dirname)
self.log.debug("Folder(s) created: \"{}\"".format(dirname))
except OSError as exc:
if exc.errno != errno.EEXIST:
self.log.error("An unexpected error occurred.", exc_info=True)
raise
self.log.debug("Folder already exists: \"{}\"".format(dirname))
if self.use_hardlinks:
# First try hardlink and copy if paths are cross drive
self.log.debug("Hardlinking file \"{}\" to \"{}\"".format(
src_path, dst_path
))
try:
create_hard_link(src_path, dst_path)
# Return when successful
return
except OSError as exc:
# re-raise exception if different than
# EXDEV - cross drive path
# EINVAL - wrong format, must be NTFS
self.log.debug(
"Hardlink failed with errno:'{}'".format(exc.errno))
if exc.errno not in [errno.EXDEV, errno.EINVAL]:
raise
self.log.debug(
"Hardlinking failed, falling back to regular copy...")
self.log.debug("Copying file \"{}\" to \"{}\"".format(
src_path, dst_path
))
copyfile(src_path, dst_path)
def version_from_representations(self, project_name, repres):
for repre in repres:
version = ayon_api.get_version_by_id(