diff --git a/openpype/lib/file_transaction.py b/openpype/lib/file_transaction.py new file mode 100644 index 0000000000..57592e297f --- /dev/null +++ b/openpype/lib/file_transaction.py @@ -0,0 +1,171 @@ +import os +import logging +import sys +import errno +import six + +from openpype.lib import create_hard_link + +# this is needed until speedcopy for linux is fixed +if sys.platform == "win32": + from speedcopy import copyfile +else: + from shutil import copyfile + + +class FileTransaction(object): + """ + + The file transaction is a three step process. + + 1) Rename any existing files to a "temporary backup" during `process()` + 2) Copy the files to final destination during `process()` + 3) Remove any backed up files (*no rollback possible!) during `finalize()` + + Step 3 is done during `finalize()`. If not called the .bak files will + remain on disk. + + These steps try to ensure that we don't overwrite half of any existing + files e.g. if they are currently in use. + + Note: + A regular filesystem is *not* a transactional file system and even + though this implementation tries to produce a 'safe copy' with a + potential rollback do keep in mind that it's inherently unsafe due + to how filesystem works and a myriad of things could happen during + the transaction that break the logic. A file storage could go down, + permissions could be changed, other machines could be moving or writing + files. A lot can happen. + + Warning: + Any folders created during the transfer will not be removed. + + """ + + MODE_COPY = 0 + MODE_HARDLINK = 1 + + def __init__(self, log=None): + + if log is None: + log = logging.getLogger("FileTransaction") + + self.log = log + + # The transfer queue + # todo: make this an actual FIFO queue? + self._transfers = {} + + # Destination file paths that a file was transferred to + self._transferred = [] + + # Backup file location mapping to original locations + self._backup_to_original = {} + + def add(self, src, dst, mode=MODE_COPY): + """Add a new file to transfer queue""" + opts = {"mode": mode} + + src = os.path.normpath(src) + dst = os.path.normpath(dst) + + if dst in self._transfers: + queued_src = self._transfers[dst][0] + if src == queued_src: + self.log.debug("File transfer was already " + "in queue: {} -> {}".format(src, dst)) + return + else: + self.log.warning("File transfer in queue replaced..") + self.log.debug("Removed from queue: " + "{} -> {}".format(queued_src, dst)) + self.log.debug("Added to queue: {} -> {}".format(src, dst)) + + self._transfers[dst] = (src, opts) + + def process(self): + + # Backup any existing files + for dst in self._transfers.keys(): + if os.path.exists(dst): + # Backup original file + # todo: add timestamp or uuid to ensure unique + backup = dst + ".bak" + self._backup_to_original[backup] = dst + self.log.debug("Backup existing file: " + "{} -> {}".format(dst, backup)) + os.rename(dst, backup) + + # Copy the files to transfer + for dst, (src, opts) in self._transfers.items(): + self._create_folder_for_file(dst) + + if opts["mode"] == self.MODE_COPY: + self.log.debug("Copying file ... {} -> {}".format(src, dst)) + copyfile(src, dst) + elif opts["mode"] == self.MODE_HARDLINK: + self.log.debug("Hardlinking file ... {} -> {}".format(src, + dst)) + create_hard_link(src, dst) + + self._transferred.append(dst) + + def finalize(self): + # Delete any backed up files + for backup in self._backup_to_original.keys(): + try: + os.remove(backup) + except OSError: + self.log.error("Failed to remove backup file: " + "{}".format(backup), + exc_info=True) + + def rollback(self): + + errors = 0 + + # Rollback any transferred files + for path in self._transferred: + try: + os.remove(path) + except OSError: + errors += 1 + self.log.error("Failed to rollback created file: " + "{}".format(path), + exc_info=True) + + # Rollback the backups + for backup, original in self._backup_to_original.items(): + try: + os.rename(backup, original) + except OSError: + errors += 1 + self.log.error("Failed to restore original file: " + "{} -> {}".format(backup, original), + exc_info=True) + + if errors: + self.log.error("{} errors occurred during " + "rollback.".format(errors), exc_info=True) + six.reraise(*sys.exc_info()) + + @property + def transferred(self): + """Return the processed transfers destination paths""" + return list(self._transferred) + + @property + def backups(self): + """Return the backup file paths""" + return list(self._backup_to_original.keys()) + + def _create_folder_for_file(self, path): + dirname = os.path.dirname(path) + try: + os.makedirs(dirname) + except OSError as e: + if e.errno == errno.EEXIST: + pass + else: + self.log.critical("An unexpected error occurred.") + six.reraise(*sys.exc_info()) diff --git a/openpype/plugins/publish/integrate_new.py b/openpype/plugins/publish/integrate_new.py index 138a4fcc06..92976e6151 100644 --- a/openpype/plugins/publish/integrate_new.py +++ b/openpype/plugins/publish/integrate_new.py @@ -3,7 +3,6 @@ import logging import sys import copy import clique -import errno import six from bson.objectid import ObjectId @@ -13,19 +12,13 @@ from avalon import io import openpype.api from datetime import datetime from openpype.lib.profiles_filtering import filter_profiles +from openpype.lib.file_transaction import FileTransaction from openpype.lib import ( prepare_template_data, - create_hard_link, StringTemplate, TemplateUnsolved ) -# this is needed until speedcopy for linux is fixed -if sys.platform == "win32": - from speedcopy import copyfile -else: - from shutil import copyfile - log = logging.getLogger(__name__) @@ -40,164 +33,6 @@ def get_first_frame_padded(collection): return get_frame_padded(start_frame, padding=collection.padding) -class FileTransaction(object): - """ - - The file transaction is a three step process. - - 1) Rename any existing files to a "temporary backup" during `process()` - 2) Copy the files to final destination during `process()` - 3) Remove any backed up files (*no rollback possible!) during `finalize()` - - Step 3 is done during `finalize()`. If not called the .bak files will - remain on disk. - - These steps try to ensure that we don't overwrite half of any existing - files e.g. if they are currently in use. - - Note: - A regular filesystem is *not* a transactional file system and even - though this implementation tries to produce a 'safe copy' with a - potential rollback do keep in mind that it's inherently unsafe due - to how filesystem works and a myriad of things could happen during - the transaction that break the logic. A file storage could go down, - permissions could be changed, other machines could be moving or writing - files. A lot can happen. - - Warning: - Any folders created during the transfer will not be removed. - - """ - - MODE_COPY = 0 - MODE_HARDLINK = 1 - - def __init__(self, log=None): - - if log is None: - log = logging.getLogger("FileTransaction") - - self.log = log - - # The transfer queue - # todo: make this an actual FIFO queue? - self._transfers = {} - - # Destination file paths that a file was transferred to - self._transferred = [] - - # Backup file location mapping to original locations - self._backup_to_original = {} - - def add(self, src, dst, mode=MODE_COPY): - """Add a new file to transfer queue""" - opts = {"mode": mode} - - src = os.path.normpath(src) - dst = os.path.normpath(dst) - - if dst in self._transfers: - queued_src = self._transfers[dst][0] - if src == queued_src: - self.log.debug("File transfer was already " - "in queue: {} -> {}".format(src, dst)) - return - else: - self.log.warning("File transfer in queue replaced..") - self.log.debug("Removed from queue: " - "{} -> {}".format(queued_src, dst)) - self.log.debug("Added to queue: {} -> {}".format(src, dst)) - - self._transfers[dst] = (src, opts) - - def process(self): - - # Backup any existing files - for dst in self._transfers.keys(): - if os.path.exists(dst): - # Backup original file - # todo: add timestamp or uuid to ensure unique - backup = dst + ".bak" - self._backup_to_original[backup] = dst - self.log.debug("Backup existing file: " - "{} -> {}".format(dst, backup)) - os.rename(dst, backup) - - # Copy the files to transfer - for dst, (src, opts) in self._transfers.items(): - self._create_folder_for_file(dst) - - if opts["mode"] == self.MODE_COPY: - self.log.debug("Copying file ... {} -> {}".format(src, dst)) - copyfile(src, dst) - elif opts["mode"] == self.MODE_HARDLINK: - self.log.debug("Hardlinking file ... {} -> {}".format(src, - dst)) - create_hard_link(src, dst) - - self._transferred.append(dst) - - def finalize(self): - # Delete any backed up files - for backup in self._backup_to_original.keys(): - try: - os.remove(backup) - except OSError: - self.log.error("Failed to remove backup file: " - "{}".format(backup), - exc_info=True) - - def rollback(self): - - errors = 0 - - # Rollback any transferred files - for path in self._transferred: - try: - os.remove(path) - except OSError: - errors += 1 - self.log.error("Failed to rollback created file: " - "{}".format(path), - exc_info=True) - - # Rollback the backups - for backup, original in self._backup_to_original.items(): - try: - os.rename(backup, original) - except OSError: - errors += 1 - self.log.error("Failed to restore original file: " - "{} -> {}".format(backup, original), - exc_info=True) - - if errors: - self.log.error("{} errors occurred during " - "rollback.".format(errors), exc_info=True) - six.reraise(*sys.exc_info()) - - @property - def transferred(self): - """Return the processed transfers destination paths""" - return list(self._transferred) - - @property - def backups(self): - """Return the backup file paths""" - return list(self._backup_to_original.keys()) - - def _create_folder_for_file(self, path): - dirname = os.path.dirname(path) - try: - os.makedirs(dirname) - except OSError as e: - if e.errno == errno.EEXIST: - pass - else: - self.log.critical("An unexpected error occurred.") - six.reraise(*sys.exc_info()) - - class IntegrateAssetNew(pyblish.api.InstancePlugin): """Resolve any dependency issues