Merge branch 'develop' into bugfix/oiio-trancode-padding-issue

Jakub Ježek 2025-05-23 09:59:08 +02:00 committed by GitHub
commit c3c8096149
6 changed files with 107 additions and 45 deletions


@@ -35,6 +35,8 @@ body:
       label: Version
       description: What version are you running? Look to AYON Tray
       options:
+        - 1.3.1
+        - 1.3.0
         - 1.2.0
         - 1.1.9
         - 1.1.8


@@ -1,15 +1,13 @@
+import concurrent.futures
 import os
 import logging
-import sys
 import errno
+from concurrent.futures import ThreadPoolExecutor, Future
+from typing import List, Optional
 
 from ayon_core.lib import create_hard_link
-
-# this is needed until speedcopy for linux is fixed
-if sys.platform == "win32":
-    from speedcopy import copyfile
-else:
-    from shutil import copyfile
+from speedcopy import copyfile
 
 
 class DuplicateDestinationError(ValueError):
@@ -109,41 +107,52 @@ class FileTransaction:
         self._transfers[dst] = (src, opts)
 
     def process(self):
-        # Backup any existing files
-        for dst, (src, _) in self._transfers.items():
-            self.log.debug("Checking file ... {} -> {}".format(src, dst))
-            path_same = self._same_paths(src, dst)
-            if path_same or not os.path.exists(dst):
-                continue
-
-            # Backup original file
-            # todo: add timestamp or uuid to ensure unique
-            backup = dst + ".bak"
-            self._backup_to_original[backup] = dst
-            self.log.debug(
-                "Backup existing file: {} -> {}".format(dst, backup))
-            os.rename(dst, backup)
-
-        # Copy the files to transfer
-        for dst, (src, opts) in self._transfers.items():
-            path_same = self._same_paths(src, dst)
-            if path_same:
-                self.log.debug(
-                    "Source and destination are same files {} -> {}".format(
-                        src, dst))
-                continue
-
-            self._create_folder_for_file(dst)
-
-            if opts["mode"] == self.MODE_COPY:
-                self.log.debug("Copying file ... {} -> {}".format(src, dst))
-                copyfile(src, dst)
-            elif opts["mode"] == self.MODE_HARDLINK:
-                self.log.debug("Hardlinking file ... {} -> {}".format(
-                    src, dst))
-                create_hard_link(src, dst)
-
-            self._transferred.append(dst)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            # Submit backup tasks
+            backup_futures = [
+                executor.submit(self._backup_file, dst, src)
+                for dst, (src, _) in self._transfers.items()
+            ]
+            wait_for_future_errors(
+                executor, backup_futures, logger=self.log)
+
+            # Submit transfer tasks
+            transfer_futures = [
+                executor.submit(self._transfer_file, dst, src, opts)
+                for dst, (src, opts) in self._transfers.items()
+            ]
+            wait_for_future_errors(
+                executor, transfer_futures, logger=self.log)
+
+    def _backup_file(self, dst, src):
+        self.log.debug(f"Checking file ... {src} -> {dst}")
+        path_same = self._same_paths(src, dst)
+        if path_same or not os.path.exists(dst):
+            return
+
+        # Backup original file
+        backup = dst + ".bak"
+        self._backup_to_original[backup] = dst
+        self.log.debug(f"Backup existing file: {dst} -> {backup}")
+        os.rename(dst, backup)
+
+    def _transfer_file(self, dst, src, opts):
+        path_same = self._same_paths(src, dst)
+        if path_same:
+            self.log.debug(
+                f"Source and destination are same files {src} -> {dst}")
+            return
+
+        self._create_folder_for_file(dst)
+
+        if opts["mode"] == self.MODE_COPY:
+            self.log.debug(f"Copying file ... {src} -> {dst}")
+            copyfile(src, dst)
+        elif opts["mode"] == self.MODE_HARDLINK:
+            self.log.debug(f"Hardlinking file ... {src} -> {dst}")
+            create_hard_link(src, dst)
+
+        self._transferred.append(dst)
 
     def finalize(self):
         # Delete any backed up files
@@ -212,3 +221,46 @@ class FileTransaction:
             return os.stat(src) == os.stat(dst)
         return src == dst
 
+
+def wait_for_future_errors(
+        executor: ThreadPoolExecutor,
+        futures: List[Future],
+        logger: Optional[logging.Logger] = None):
+    """Shut down the ThreadPoolExecutor and cancel pending futures as soon
+    as one of the workers raises an error.
+
+    The ThreadPoolExecutor only cancels pending futures on exception but will
+    still complete those that are running - each of which could itself also
+    fail. We log all exceptions but re-raise the last exception only.
+    """
+    if logger is None:
+        logger = logging.getLogger(__name__)
+
+    for future in concurrent.futures.as_completed(futures):
+        exception = future.exception()
+        if exception:
+            # As soon as an error occurs, stop executing more futures.
+            # Running workers, however, will still run to completion, so we
+            # also want to log those errors if any occurred on them.
+            executor.shutdown(wait=True, cancel_futures=True)
+            break
+    else:
+        # All futures completed, no exceptions occurred
+        return
+
+    # An exception occurred in at least one future. Get exceptions from
+    # all futures that are done and ended up failing until that point.
+    exceptions = []
+    for future in futures:
+        if not future.cancelled() and future.done():
+            exception = future.exception()
+            if exception:
+                exceptions.append(exception)
+
+    # Log any exceptions that occurred in all workers
+    for exception in exceptions:
+        logger.error("Error occurred in worker", exc_info=exception)
+
+    # Raise the last exception
+    raise exceptions[-1]
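
Aside: the fail-fast contract of wait_for_future_errors is easiest to see
with a standalone driver. Below is a minimal sketch; the failing _task
worker is hypothetical and not part of this commit, and note that the
cancel_futures argument to shutdown() requires Python 3.9+:

    import logging
    from concurrent.futures import ThreadPoolExecutor

    from ayon_core.lib.file_transaction import wait_for_future_errors

    def _task(index):
        # Hypothetical worker: odd-numbered tasks fail.
        if index % 2:
            raise RuntimeError(f"task {index} failed")
        return index

    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(_task, i) for i in range(10)]
        # Logs every exception from workers that already started, cancels
        # the still-pending futures, then re-raises the last exception.
        wait_for_future_errors(
            executor, futures, logger=logging.getLogger(__name__))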


@@ -1,7 +1,11 @@
 import os
 import copy
 import errno
+import itertools
 import shutil
+from concurrent.futures import ThreadPoolExecutor
+
+from speedcopy import copyfile
 
 import clique
 import pyblish.api
@@ -13,6 +17,7 @@ from ayon_api.operations import (
 from ayon_api.utils import create_entity_id
 
 from ayon_core.lib import create_hard_link, source_hash
+from ayon_core.lib.file_transaction import wait_for_future_errors
 from ayon_core.pipeline.publish import (
     get_publish_template_name,
     OptionalPyblishPluginMixin,
@@ -415,11 +420,14 @@ class IntegrateHeroVersion(
         # Copy(hardlink) paths of source and destination files
         # TODO should we *only* create hardlinks?
         # TODO should we keep files for deletion until this is successful?
-        for src_path, dst_path in src_to_dst_file_paths:
-            self.copy_file(src_path, dst_path)
-
-        for src_path, dst_path in other_file_paths_mapping:
-            self.copy_file(src_path, dst_path)
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(self.copy_file, src_path, dst_path)
+                for src_path, dst_path in itertools.chain(
+                    src_to_dst_file_paths, other_file_paths_mapping
+                )
+            ]
+            wait_for_future_errors(executor, futures)
 
         # Update prepared representation entity data with files
         # and integrate it to server.
@@ -648,7 +656,7 @@ class IntegrateHeroVersion(
                 src_path, dst_path
             ))
-            shutil.copy(src_path, dst_path)
+            copyfile(src_path, dst_path)
 
     def version_from_representations(self, project_name, repres):
         for repre in repres:
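
For reference, the same submit-then-wait pattern can be reproduced outside
the plugin; the (src, dst) path pairs below are hypothetical stand-ins for
src_to_dst_file_paths and other_file_paths_mapping, and speedcopy's copyfile
stands in for self.copy_file:

    import itertools
    from concurrent.futures import ThreadPoolExecutor

    from speedcopy import copyfile
    from ayon_core.lib.file_transaction import wait_for_future_errors

    # Hypothetical path mappings, for illustration only.
    src_to_dst_file_paths = [("/tmp/a.exr", "/publish/a.exr")]
    other_file_paths_mapping = [("/tmp/thumb.jpg", "/publish/thumb.jpg")]

    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [
            executor.submit(copyfile, src_path, dst_path)
            for src_path, dst_path in itertools.chain(
                src_to_dst_file_paths, other_file_paths_mapping
            )
        ]
        # Cancels pending copies and re-raises if any copy fails.
        wait_for_future_errors(executor, futures)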


@@ -1,3 +1,3 @@
 # -*- coding: utf-8 -*-
 """Package declaring AYON addon 'core' version."""
-__version__ = "1.2.0+dev"
+__version__ = "1.3.1+dev"


@@ -1,6 +1,6 @@
 name = "core"
 title = "Core"
-version = "1.2.0+dev"
+version = "1.3.1+dev"
 client_dir = "ayon_core"


@@ -5,7 +5,7 @@
 [tool.poetry]
 name = "ayon-core"
-version = "1.2.0+dev"
+version = "1.3.1+dev"
 description = ""
 authors = ["Ynput Team <team@ynput.io>"]
 readme = "README.md"