Merge pull request #1622 from ynput/bugfix/902-ay-3875_ayon-integrate-hero-for-review

Integrate Hero: Use FileTransaction in integrate plugin
This commit is contained in:
Jakub Trllo 2025-12-22 13:29:11 +01:00 committed by GitHub
commit f9bbab9944
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,11 +1,8 @@
import os import os
import sys
import copy import copy
import errno
import itertools import itertools
import shutil import shutil
from concurrent.futures import ThreadPoolExecutor
from speedcopy import copyfile
import clique import clique
import pyblish.api import pyblish.api
@ -16,11 +13,15 @@ from ayon_api.operations import (
) )
from ayon_api.utils import create_entity_id from ayon_api.utils import create_entity_id
from ayon_core.lib import create_hard_link, source_hash from ayon_core.lib import source_hash
from ayon_core.lib.file_transaction import wait_for_future_errors from ayon_core.lib.file_transaction import (
FileTransaction,
DuplicateDestinationError,
)
from ayon_core.pipeline.publish import ( from ayon_core.pipeline.publish import (
get_publish_template_name, get_publish_template_name,
OptionalPyblishPluginMixin, OptionalPyblishPluginMixin,
KnownPublishError,
) )
@ -421,19 +422,40 @@ class IntegrateHeroVersion(
(repre_entity, dst_paths) (repre_entity, dst_paths)
) )
self.path_checks = [] file_transactions = FileTransaction(
log=self.log,
# Enforce unique transfers
allow_queue_replacements=False
)
mode = FileTransaction.MODE_COPY
if self.use_hardlinks:
mode = FileTransaction.MODE_LINK
# Copy(hardlink) paths of source and destination files try:
# TODO should we *only* create hardlinks? for src_path, dst_path in itertools.chain(
# TODO should we keep files for deletion until this is successful? src_to_dst_file_paths,
with ThreadPoolExecutor(max_workers=8) as executor: other_file_paths_mapping
futures = [ ):
executor.submit(self.copy_file, src_path, dst_path) file_transactions.add(src_path, dst_path, mode=mode)
for src_path, dst_path in itertools.chain(
src_to_dst_file_paths, other_file_paths_mapping self.log.debug("Integrating source files to destination ...")
) file_transactions.process()
]
wait_for_future_errors(executor, futures) except DuplicateDestinationError as exc:
# Raise DuplicateDestinationError as KnownPublishError
# and rollback the transactions
file_transactions.rollback()
raise KnownPublishError(exc).with_traceback(sys.exc_info()[2])
except Exception as exc:
# Rollback the transactions
file_transactions.rollback()
self.log.critical("Error when copying files", exc_info=True)
raise exc
# Finalizing can't rollback safely so no use for moving it to
# the try, except.
file_transactions.finalize()
# Update prepared representation etity data with files # Update prepared representation etity data with files
# and integrate it to server. # and integrate it to server.
@ -622,48 +644,6 @@ class IntegrateHeroVersion(
).format(path)) ).format(path))
return path return path
def copy_file(self, src_path, dst_path):
# TODO check drives if are the same to check if cas hardlink
dirname = os.path.dirname(dst_path)
try:
os.makedirs(dirname)
self.log.debug("Folder(s) created: \"{}\"".format(dirname))
except OSError as exc:
if exc.errno != errno.EEXIST:
self.log.error("An unexpected error occurred.", exc_info=True)
raise
self.log.debug("Folder already exists: \"{}\"".format(dirname))
if self.use_hardlinks:
# First try hardlink and copy if paths are cross drive
self.log.debug("Hardlinking file \"{}\" to \"{}\"".format(
src_path, dst_path
))
try:
create_hard_link(src_path, dst_path)
# Return when successful
return
except OSError as exc:
# re-raise exception if different than
# EXDEV - cross drive path
# EINVAL - wrong format, must be NTFS
self.log.debug(
"Hardlink failed with errno:'{}'".format(exc.errno))
if exc.errno not in [errno.EXDEV, errno.EINVAL]:
raise
self.log.debug(
"Hardlinking failed, falling back to regular copy...")
self.log.debug("Copying file \"{}\" to \"{}\"".format(
src_path, dst_path
))
copyfile(src_path, dst_path)
def version_from_representations(self, project_name, repres): def version_from_representations(self, project_name, repres):
for repre in repres: for repre in repres:
version = ayon_api.get_version_by_id( version = ayon_api.get_version_by_id(