More refactoring + draft (untested) implementation for separating File Transaction logic

This commit is contained in:
Roy Nieterau 2022-03-17 11:49:12 +01:00
parent d88ed919e6
commit ae1a9ff4cf

View file

@ -1,12 +1,10 @@
import os
from os.path import getsize
import logging
import sys
import copy
import clique
import errno
import six
import re
from pymongo import DeleteOne, InsertOne, UpdateOne
import pyblish.api
@ -14,7 +12,6 @@ from avalon import io
from avalon.api import format_template_with_optional_keys
import openpype.api
from datetime import datetime
# from pype.modules import ModulesManager
from openpype.lib.profiles_filtering import filter_profiles
from openpype.lib import (
prepare_template_data,
@ -41,6 +38,160 @@ def get_first_frame_padded(collection):
return get_frame_padded(start_frame, padding=collection.padding)
class FileTransaction(object):
"""
The file transaction is a three step process.
1) Rename any existing files to a "temporary backup" during `process()`
2) Copy the files to final destination during `process()`
3) Remove any backed up files (*no rollback possible!) during `finalize()`
Step 3 is done during `finalize()`. If not called the .bak files will
remain on disk.
These steps try to ensure that we don't overwrite half of any existing
files e.g. if they are currently in use.
Note:
A regular filesystem is *not* a transactional file system and even
though this implementation tries to produce a 'safe copy' with a
potential rollback do keep in mind that it's inherently unsafe due
to how filesystem works and a myriad of things could happen during
the transaction that break the logic. A file storage could go down,
permissions could be changed, other machines could be moving or writing
files. A lot can happen.
Warning:
Any folders created during the transfer will not be removed.
"""
MODE_COPY = 0
MODE_HARDLINK = 1
def __init__(self, log=None):
if log is None:
log = logging.getLogger("FileTransaction")
self.log = log
# The transfer queue
# todo: make this an actual FIFO queue?
self._transfers = {}
# Destination file paths that a file was transferred to
self._transferred = []
# Backup file location mapping to original locations
self._backup_to_original = {}
def add(self, src, dst, mode=MODE_COPY):
"""Add a new file to transfer queue"""
opts = {"mode": mode}
src = os.path.normpath(src)
dst = os.path.normpath(dst)
if dst in self._transfers:
queued_src = self._transfers[dst][0]
if src == queued_src:
self.log.debug("File transfer was already "
"in queue: {} -> {}".format(src, dst))
return
else:
self.log.warning("File transfer in queue overwritten")
self._transfers[dst] = (src, opts)
def process(self):
# Backup any existing files
for dst in self._transfers.keys():
if os.path.exists(dst):
# Backup original file
# todo: add timestamp or uuid to ensure unique
backup = dst + ".bak"
self._backup_to_original[backup] = dst
self.log.debug("Backup existing file: "
"{} -> {}".format(dst, backup))
os.rename(dst, backup)
# Copy the files to transfer
for dst, (src, opts) in self._transfers.items():
self._create_folder_for_file(dst)
if opts["mode"] == self.MODE_COPY:
self.log.debug("Copying file ... {} -> {}".format(src, dst))
copyfile(src, dst)
elif opts["mode"] == self.MODE_HARDLINK:
self.log.debug("Hardlinking file ... {} -> {}".format(src, dst))
create_hard_link(src, dst)
self._transferred.append(dst)
def finalize(self):
# Delete any backed up files
for backup in self._backup_to_original.keys():
try:
os.remove(backup)
except OSError:
self.log.error("Failed to remove backup file: "
"{}".format(backup),
exc_info=True)
def rollback(self):
errors = 0
# Rollback any transferred files
for path in self._transferred:
try:
os.remove(path)
except OSError:
errors += 1
self.log.error("Failed to rollback created file: "
"{}".format(path),
exc_info=True)
# Rollback the backups
for backup, original in self._backup_to_original.items():
try:
os.rename(backup, original)
except OSError:
errors +=1
self.log.error("Failed to restore original file: "
"{} -> {}".format(backup, original),
exc_info=True)
if errors:
self.log.error("{} errors occurred during "
"rollback.".format(errors), exc_info=True)
six.reraise(*sys.exc_info())
@property
def transferred(self):
"""Return the processed transfers destination paths"""
return list(self._transferred)
@property
def backups(self):
"""Return the backup file paths"""
return list(self._backup_to_original.keys())
def _create_folder_for_file(self, path):
dirname = os.path.dirname(path)
try:
os.makedirs(dirname)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
self.log.critical("An unexpected error occurred.")
six.reraise(*sys.exc_info())
class IntegrateAssetNew(pyblish.api.InstancePlugin):
"""Resolve any dependency issues
@ -122,18 +273,11 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
]
default_template_name = "publish"
# suffix to denote temporary files, use without '.'
TMP_FILE_EXT = 'tmp'
# file_url : file_size of all published and uploaded files
destinations = list()
# Attributes set by settings
template_name_profiles = None
subset_grouping_profiles = None
def process(self, instance):
self.destinations = []
# Exclude instances that also contain families from exclude families
families = set(
@ -143,17 +287,20 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
if families & set(self.exclude_families):
return
file_transactions = FileTransaction(log=self.log)
try:
self.register(instance)
self.log.info("Integrated Asset in to the database ...")
self.handle_destination_files(self.destinations,
'finalize')
self.register(instance, file_transactions)
except Exception:
# clean destination
# todo: rollback any registered entities? (or how safe are we?)
file_transactions.rollback()
self.log.critical("Error when registering", exc_info=True)
self.handle_destination_files(self.destinations, 'remove')
six.reraise(*sys.exc_info())
# Finalizing can't be rollbacked safely so no use for moving it to
# the try, except.
file_transactions.finalize()
def prepare_anatomy(self, instance):
"""Prepare anatomy data used to define representation destinations"""
@ -244,7 +391,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
return template_name, anatomy_data
def register(self, instance):
def register(self, instance, file_transactions):
instance_stagingdir = instance.data.get("stagingDir")
if not instance_stagingdir:
@ -272,9 +419,8 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
version = self.register_version(instance, subset)
instance.data["versionEntity"] = version
instance.data['version'] = version['name']
existing_repres = list(io.find({
archived_repres = list(io.find({
"parent": version["_id"],
"type": "archived_representation"
}))
@ -294,19 +440,47 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
prepared = self.prepare_representation(repre,
anatomy_data,
template_name,
existing_repres,
archived_repres,
version,
instance_stagingdir,
instance)
representation = prepared["representation"]
# todo: register the file transfers correctly
for src, dst in representation["transfers"]:
file_transactions.add(src, dst,
mode=file_transactions.MODE_COPY)
for src, dst in representation["hardlinks"]:
file_transactions.add(src, dst,
mode=file_transactions.MODE_HARDLINK)
# todo: simplify this?
representation = prepared["representation"]
representations.append(representation)
published_representations[representation["_id"]] = prepared
# could throw exception, will be caught in 'process'
# all integration to DB is being done together lower,
# so no rollback needed
self.log.debug("Integrating source files to destination ...")
file_transactions.process()
self.log.debug("Backup files "
"{}".format(file_transactions.backups))
self.log.debug("Integrated files "
"{}".format(file_transactions.transferred))
# todo: fix get file info for transferred files per representation
# currently it'd set all files for all representations
# get 'files' info for representation and all attached resources
integrated_files = file_transactions.transferred
self.log.debug("Preparing files information ...")
representation["files"] = self.get_files_info(
instance,
integrated_files
)
# Remove old representations if there are any (before insertion of new)
if existing_repres:
repre_ids_to_remove = [repre["_id"] for repre in existing_repres]
if archived_repres:
repre_ids_to_remove = [repre["_id"] for repre in archived_repres]
io.delete_many({"_id": {"$in": repre_ids_to_remove}})
# Write the new representations to the database
@ -395,7 +569,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
def prepare_representation(self, repre,
anatomy_data,
template_name,
existing_repres,
archived_repres,
version,
instance_stagingdir,
instance):
@ -439,11 +613,13 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
self.log.debug("Representation uses instance staging dir: "
"{}".format(instance_stagingdir))
stagingdir = instance_stagingdir
if not stagingdir:
raise ValueError("No staging directory set for representation: "
"{}".format(repre))
self.log.debug("Anatomy template name: {}".format(template_name))
anatomy = instance.context.data['anatomy']
template = os.path.normpath(
anatomy.templates[template_name]["path"])
template = os.path.normpath(anatomy.templates[template_name]["path"])
is_sequence_representation = isinstance(files, (list, tuple))
if is_sequence_representation:
@ -566,24 +742,21 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
continue
repre_context[key] = template_data[key]
# Use previous representation's id if there are any
repre_id = None
repre_name_lower = repre["name"].lower()
for _existing_repre in existing_repres:
# NOTE should we check lowered names?
if repre_name_lower == _existing_repre["name"].lower():
repre_id = _existing_repre["orig_id"]
break
# Define representation id
repre_id = io.ObjectId()
# Create new id if existing representations does not match
if repre_id is None:
repre_id = io.ObjectId()
# Use previous representation's id if there is a name match
repre_name_lower = repre["name"].lower()
for _archived_repres in archived_repres:
if repre_name_lower == _archived_repres["name"].lower():
repre_id = _archived_repres["orig_id"]
break
# todo: `repre` is not the actual `representation` entity
# we should simplify/clarify difference between data above
# and the actual representation entity for the database
data = repre.get("data") or {}
data.update({'path': dst, 'template': template})
data.update({'path': repre["published_path"], 'template': template})
representation = {
"_id": repre_id,
"schema": "openpype:representation-2.0",
@ -597,34 +770,14 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"context": repre_context
}
# todo: simplify/streamline which additional data makes its way into
# the representation context
if repre.get("outputName"):
representation["context"]["output"] = repre['outputName']
if is_sequence_representation and repre.get("frameStart") is not None:
representation['context']['frame'] = template_data["frame"]
# any file that should be physically copied is expected in
# 'transfers' or 'hardlinks'
integrated_files = []
if instance.data.get('transfers', False) or \
instance.data.get('hardlinks', False):
# could throw exception, will be caught in 'process'
# all integration to DB is being done together lower,
# so no rollback needed
# todo: separate the actual integrating of the files onto its own
# taking just a list of transfers as inputs (potentially
# with copy mode flag, like hardlink/copy, etc.)
self.log.debug("Integrating source files to destination ...")
integrated_files = self.integrate(instance)
self.log.debug("Integrated files {}".format(integrated_files))
# get 'files' info for representation and all attached resources
self.log.debug("Preparing files information ...")
representation["files"] = self.get_files_info(
instance,
integrated_files
)
return {
"representation": representation,
"anatomy_data": template_data,
@ -633,84 +786,6 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"published_files": [transfer[1] for transfer in repre["transfers"]]
}
def integrate(self, instance):
""" Move the files.
Through `instance.data["transfers"]`
Args:
instance: the instance to integrate
Returns:
list: destination full paths of integrated files
"""
# store destinations for potential rollback and measuring sizes
destinations = []
transfers = list(instance.data.get("transfers", list()))
for src, dest in transfers:
src = os.path.normpath(src)
dest = os.path.normpath(dest)
if src != dest:
dest = self.get_dest_temp_url(dest)
self.copy_file(src, dest)
destinations.append(dest)
# Produce hardlinked copies
hardlinks = instance.data.get("hardlinks", list())
for src, dest in hardlinks:
dest = self.get_dest_temp_url(dest)
if not os.path.exists(dest):
self.hardlink_file(src, dest)
destinations.append(dest)
return destinations
def _create_folder_for_file(self, path):
dirname = os.path.dirname(path)
try:
os.makedirs(dirname)
except OSError as e:
if e.errno == errno.EEXIST:
pass
else:
self.log.critical("An unexpected error occurred.")
six.reraise(*sys.exc_info())
def copy_file(self, src, dst):
"""Copy source filepath to destination filepath
Arguments:
src (str): the source file which needs to be copied
dst (str): the destination filepath
Returns:
None
"""
self._create_folder_for_file(dst)
self.log.debug("Copying file ... {} -> {}".format(src, dst))
copyfile(src, dst)
def hardlink_file(self, src, dst):
"""Hardlink source filepath to destination filepath.
Note:
Hardlink can only be produced between two files on the same
server/disk and editing one of the two will edit both files at
once. As such it is recommended to only make hardlinks between
static files to ensure publishes remain safe and non-edited.
Arguments:
src (str): the source file which needs to be hardlinked
dst (str): the destination filepath
Returns:
None
"""
self._create_folder_for_file(dst)
self.log.debug("Hardlinking file ... {} -> {}".format(src, dst))
create_hard_link(src, dst)
def _get_instance_families(self, instance):
"""Get all families of the instance"""
# todo: move this to lib?
@ -727,7 +802,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
def register_subset(self, instance):
# todo: rely less on self.prepare_anatomy to create this value
asset = instance.data.get("assetEntity") # <- from prepare_anatomy :(
asset = instance.data.get("assetEntity") # stored by prepare_anatomy
subset_name = instance.data["subset"]
subset = io.find_one({
"type": "subset",
@ -957,25 +1032,6 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
return output_resources
def get_dest_temp_url(self, dest):
""" Enhance destination path with TMP_FILE_EXT to denote temporary
file.
Temporary files will be renamed after successful registration
into DB and full copy to destination
Arguments:
dest: destination url of published file (absolute)
Returns:
dest: destination path + '.TMP_FILE_EXT'
"""
if self.TMP_FILE_EXT and '.{}'.format(self.TMP_FILE_EXT) not in dest:
dest += '.{}'.format(self.TMP_FILE_EXT)
return dest
def get_dest_final_url(self, temp_file_url):
"""Temporary destination file url to final destination file url"""
return re.sub(r'\.{}$'.format(self.TMP_FILE_EXT), '', temp_file_url)
def prepare_file_info(self, path, anatomy, sites):
""" Prepare information for one file (asset or resource)
@ -991,11 +1047,6 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"""
file_hash = openpype.api.source_hash(path)
# todo: Avoid this logic
# Strip the temporary file extension from the file hash
if self.TMP_FILE_EXT and ',{}'.format(self.TMP_FILE_EXT) in file_hash:
file_hash = file_hash.replace(',{}'.format(self.TMP_FILE_EXT), '')
return {
"_id": io.ObjectId(),
"path": self.get_rootless_path(anatomy, path),
@ -1004,6 +1055,7 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
"sites": sites
}
# region sync sites
def compute_resource_sync_sites(self, instance):
"""Get available resource sync sites"""
# Sync server logic
@ -1101,47 +1153,4 @@ class IntegrateAssetNew(pyblish.api.InstancePlugin):
already_attached_sites[meta["name"]] = real_created
return alternative_sites
def handle_destination_files(self, destinations, mode):
""" Clean destination files
Called when error happened during integrating to DB or to disk
OR called to rename uploaded files from temporary name to final to
highlight publishing in progress/broken
Used to clean unwanted files
Arguments:
destinations (list): file paths
mode: 'remove' - clean files,
'finalize' - rename files,
remove TMP_FILE_EXT suffix denoting temp file
"""
if not destinations:
return
for file_url in destinations:
if not os.path.exists(file_url):
self.log.debug(
"File {} was not found.".format(file_url)
)
continue
try:
if mode == 'remove':
self.log.debug("Removing file {}".format(file_url))
os.remove(file_url)
if mode == 'finalize':
new_name = self.get_dest_final_url(file_url)
if os.path.exists(new_name):
self.log.debug("Removing existing "
"file: {}".format(new_name))
os.remove(new_name)
self.log.debug(
"Renaming file {} to {}".format(file_url, new_name)
)
os.rename(file_url, new_name)
except OSError:
self.log.error("Cannot {} file {}".format(mode, file_url),
exc_info=True)
six.reraise(*sys.exc_info())
# endregion