diff --git a/client/ayon_core/plugins/load/delete_old_versions.py b/client/ayon_core/plugins/load/delete_old_versions.py deleted file mode 100644 index 3a42ccba7e..0000000000 --- a/client/ayon_core/plugins/load/delete_old_versions.py +++ /dev/null @@ -1,477 +0,0 @@ -import collections -import os -import uuid -from typing import List, Dict, Any - -import clique -import ayon_api -from ayon_api.operations import OperationsSession -import qargparse -from qtpy import QtWidgets, QtCore - -from ayon_core import style -from ayon_core.lib import format_file_size -from ayon_core.pipeline import load, Anatomy -from ayon_core.pipeline.load import ( - get_representation_path_with_anatomy, - InvalidRepresentationContext, -) - - -class DeleteOldVersions(load.ProductLoaderPlugin): - """Deletes specific number of old version""" - - is_multiple_contexts_compatible = True - sequence_splitter = "__sequence_splitter__" - - representations = ["*"] - product_types = {"*"} - tool_names = ["library_loader"] - - label = "Delete Old Versions" - order = 35 - icon = "trash" - color = "#d8d8d8" - - options = [ - qargparse.Integer( - "versions_to_keep", default=2, min=0, help="Versions to keep:" - ), - qargparse.Boolean( - "remove_publish_folder", help="Remove publish folder:" - ) - ] - - requires_confirmation = True - - def delete_whole_dir_paths(self, dir_paths, delete=True): - size = 0 - - for dir_path in dir_paths: - # Delete all files and folders in dir path - for root, dirs, files in os.walk(dir_path, topdown=False): - for name in files: - file_path = os.path.join(root, name) - size += os.path.getsize(file_path) - if delete: - os.remove(file_path) - self.log.debug("Removed file: {}".format(file_path)) - - for name in dirs: - if delete: - os.rmdir(os.path.join(root, name)) - - if not delete: - continue - - # Delete even the folder and it's parents folders if they are empty - while True: - if not os.path.exists(dir_path): - dir_path = os.path.dirname(dir_path) - continue - - if len(os.listdir(dir_path)) != 0: - break - - os.rmdir(os.path.join(dir_path)) - - return size - - def path_from_representation(self, representation, anatomy): - try: - context = representation["context"] - except KeyError: - return (None, None) - - try: - path = get_representation_path_with_anatomy( - representation, anatomy - ) - except InvalidRepresentationContext: - return (None, None) - - sequence_path = None - if "frame" in context: - context["frame"] = self.sequence_splitter - sequence_path = get_representation_path_with_anatomy( - representation, anatomy - ) - - if sequence_path: - sequence_path = sequence_path.normalized() - - return (path.normalized(), sequence_path) - - def delete_only_repre_files(self, dir_paths, file_paths, delete=True): - size = 0 - - for dir_id, dir_path in dir_paths.items(): - dir_files = os.listdir(dir_path) - collections, remainders = clique.assemble(dir_files) - for file_path, seq_path in file_paths[dir_id]: - file_path_base = os.path.split(file_path)[1] - # Just remove file if `frame` key was not in context or - # filled path is in remainders (single file sequence) - if not seq_path or file_path_base in remainders: - if not os.path.exists(file_path): - self.log.debug( - "File was not found: {}".format(file_path) - ) - continue - - size += os.path.getsize(file_path) - - if delete: - os.remove(file_path) - self.log.debug("Removed file: {}".format(file_path)) - - if file_path_base in remainders: - remainders.remove(file_path_base) - continue - - seq_path_base = os.path.split(seq_path)[1] - head, tail = seq_path_base.split(self.sequence_splitter) - - final_col = None - for collection in collections: - if head != collection.head or tail != collection.tail: - continue - final_col = collection - break - - if final_col is not None: - # Fill full path to head - final_col.head = os.path.join(dir_path, final_col.head) - for _file_path in final_col: - if os.path.exists(_file_path): - - size += os.path.getsize(_file_path) - - if delete: - os.remove(_file_path) - self.log.debug( - "Removed file: {}".format(_file_path) - ) - - _seq_path = final_col.format("{head}{padding}{tail}") - self.log.debug("Removed files: {}".format(_seq_path)) - collections.remove(final_col) - - elif os.path.exists(file_path): - size += os.path.getsize(file_path) - - if delete: - os.remove(file_path) - self.log.debug("Removed file: {}".format(file_path)) - else: - self.log.debug( - "File was not found: {}".format(file_path) - ) - - # Delete as much as possible parent folders - if not delete: - return size - - for dir_path in dir_paths.values(): - while True: - if not os.path.exists(dir_path): - dir_path = os.path.dirname(dir_path) - continue - - if len(os.listdir(dir_path)) != 0: - break - - self.log.debug("Removed folder: {}".format(dir_path)) - os.rmdir(dir_path) - - return size - - def message(self, text): - msgBox = QtWidgets.QMessageBox() - msgBox.setText(text) - msgBox.setStyleSheet(style.load_stylesheet()) - msgBox.setWindowFlags( - msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint - ) - msgBox.exec_() - - def _confirm_delete(self, - contexts: List[Dict[str, Any]], - versions_to_keep: int) -> bool: - """Prompt user for a deletion confirmation""" - - contexts_list = "\n".join(sorted( - "- {folder[name]} > {product[name]}".format_map(context) - for context in contexts - )) - num_contexts = len(contexts) - s = "s" if num_contexts > 1 else "" - text = ( - "Are you sure you want to delete versions?\n\n" - f"This will keep only the last {versions_to_keep} " - f"versions for the {num_contexts} selected product{s}." - ) - informative_text = "Warning: This will delete files from disk" - detailed_text = ( - f"Keep only {versions_to_keep} versions for:\n{contexts_list}" - ) - - messagebox = QtWidgets.QMessageBox() - messagebox.setIcon(QtWidgets.QMessageBox.Warning) - messagebox.setWindowTitle("Delete Old Versions") - messagebox.setText(text) - messagebox.setInformativeText(informative_text) - messagebox.setDetailedText(detailed_text) - messagebox.setStandardButtons( - QtWidgets.QMessageBox.Yes - | QtWidgets.QMessageBox.Cancel - ) - messagebox.setDefaultButton(QtWidgets.QMessageBox.Cancel) - messagebox.setStyleSheet(style.load_stylesheet()) - messagebox.setAttribute(QtCore.Qt.WA_DeleteOnClose, True) - return messagebox.exec_() == QtWidgets.QMessageBox.Yes - - def get_data(self, context, versions_count): - product_entity = context["product"] - folder_entity = context["folder"] - project_name = context["project"]["name"] - anatomy = Anatomy(project_name, project_entity=context["project"]) - - version_fields = ayon_api.get_default_fields_for_type("version") - version_fields.add("tags") - versions = list(ayon_api.get_versions( - project_name, - product_ids=[product_entity["id"]], - active=None, - hero=False, - fields=version_fields - )) - self.log.debug( - "Version Number ({})".format(len(versions)) - ) - versions_by_parent = collections.defaultdict(list) - for ent in versions: - versions_by_parent[ent["productId"]].append(ent) - - def sort_func(ent): - return int(ent["version"]) - - all_last_versions = [] - for _parent_id, _versions in versions_by_parent.items(): - for idx, version in enumerate( - sorted(_versions, key=sort_func, reverse=True) - ): - if idx >= versions_count: - break - all_last_versions.append(version) - - self.log.debug("Collected versions ({})".format(len(versions))) - - # Filter latest versions - for version in all_last_versions: - versions.remove(version) - - # Update versions_by_parent without filtered versions - versions_by_parent = collections.defaultdict(list) - for ent in versions: - versions_by_parent[ent["productId"]].append(ent) - - # Filter already deleted versions - versions_to_pop = [] - for version in versions: - if "deleted" in version["tags"]: - versions_to_pop.append(version) - - for version in versions_to_pop: - msg = "Folder: \"{}\" | Product: \"{}\" | Version: \"{}\"".format( - folder_entity["path"], - product_entity["name"], - version["version"] - ) - self.log.debug(( - "Skipping version. Already tagged as inactive. < {} >" - ).format(msg)) - versions.remove(version) - - version_ids = [ent["id"] for ent in versions] - - self.log.debug( - "Filtered versions to delete ({})".format(len(version_ids)) - ) - - if not version_ids: - msg = "Skipping processing. Nothing to delete on {}/{}".format( - folder_entity["path"], product_entity["name"] - ) - self.log.info(msg) - print(msg) - return - - repres = list(ayon_api.get_representations( - project_name, version_ids=version_ids - )) - - self.log.debug( - "Collected representations to remove ({})".format(len(repres)) - ) - - dir_paths = {} - file_paths_by_dir = collections.defaultdict(list) - for repre in repres: - file_path, seq_path = self.path_from_representation( - repre, anatomy - ) - if file_path is None: - self.log.debug(( - "Could not format path for represenation \"{}\"" - ).format(str(repre))) - continue - - dir_path = os.path.dirname(file_path) - dir_id = None - for _dir_id, _dir_path in dir_paths.items(): - if _dir_path == dir_path: - dir_id = _dir_id - break - - if dir_id is None: - dir_id = uuid.uuid4() - dir_paths[dir_id] = dir_path - - file_paths_by_dir[dir_id].append([file_path, seq_path]) - - dir_ids_to_pop = [] - for dir_id, dir_path in dir_paths.items(): - if os.path.exists(dir_path): - continue - - dir_ids_to_pop.append(dir_id) - - # Pop dirs from both dictionaries - for dir_id in dir_ids_to_pop: - dir_paths.pop(dir_id) - paths = file_paths_by_dir.pop(dir_id) - # TODO report of missing directories? - paths_msg = ", ".join([ - "'{}'".format(path[0].replace("\\", "/")) for path in paths - ]) - self.log.debug(( - "Folder does not exist. Deleting its files skipped: {}" - ).format(paths_msg)) - - return { - "dir_paths": dir_paths, - "file_paths_by_dir": file_paths_by_dir, - "versions": versions, - "folder": folder_entity, - "product": product_entity, - "archive_product": versions_count == 0 - } - - def main(self, project_name, data, remove_publish_folder): - # Size of files. - size = 0 - if not data: - return size - - if remove_publish_folder: - size = self.delete_whole_dir_paths(data["dir_paths"].values()) - else: - size = self.delete_only_repre_files( - data["dir_paths"], data["file_paths_by_dir"] - ) - - op_session = OperationsSession() - for version in data["versions"]: - orig_version_tags = version["tags"] - version_tags = list(orig_version_tags) - changes = {} - if "deleted" not in version_tags: - version_tags.append("deleted") - changes["tags"] = version_tags - - if version["active"]: - changes["active"] = False - - if not changes: - continue - op_session.update_entity( - project_name, "version", version["id"], changes - ) - - op_session.commit() - - return size - - def load(self, contexts, name=None, namespace=None, options=None): - - # Get user options - versions_to_keep = 2 - remove_publish_folder = False - if options: - versions_to_keep = options.get( - "versions_to_keep", versions_to_keep - ) - remove_publish_folder = options.get( - "remove_publish_folder", remove_publish_folder - ) - - # Because we do not want this run by accident we will add an extra - # user confirmation - if ( - self.requires_confirmation - and not self._confirm_delete(contexts, versions_to_keep) - ): - return - - try: - size = 0 - for count, context in enumerate(contexts): - data = self.get_data(context, versions_to_keep) - if not data: - continue - project_name = context["project"]["name"] - size += self.main(project_name, data, remove_publish_folder) - print("Progressing {}/{}".format(count + 1, len(contexts))) - - msg = "Total size of files: {}".format(format_file_size(size)) - self.log.info(msg) - self.message(msg) - - except Exception: - self.log.error("Failed to delete versions.", exc_info=True) - - -class CalculateOldVersions(DeleteOldVersions): - """Calculate file size of old versions""" - label = "Calculate Old Versions" - order = 30 - tool_names = ["library_loader"] - - options = [ - qargparse.Integer( - "versions_to_keep", default=2, min=0, help="Versions to keep:" - ), - qargparse.Boolean( - "remove_publish_folder", help="Remove publish folder:" - ) - ] - - requires_confirmation = False - - def main(self, project_name, data, remove_publish_folder): - size = 0 - - if not data: - return size - - if remove_publish_folder: - size = self.delete_whole_dir_paths( - data["dir_paths"].values(), delete=False - ) - else: - size = self.delete_only_repre_files( - data["dir_paths"], data["file_paths_by_dir"], delete=False - ) - - return size diff --git a/client/ayon_core/plugins/loader/delete_old_versions.py b/client/ayon_core/plugins/loader/delete_old_versions.py new file mode 100644 index 0000000000..31b0ff4bdf --- /dev/null +++ b/client/ayon_core/plugins/loader/delete_old_versions.py @@ -0,0 +1,393 @@ +from __future__ import annotations + +import os +import collections +import json +import shutil +from typing import Optional, Any + +import clique +from ayon_api.operations import OperationsSession +from qtpy import QtWidgets, QtCore + +from ayon_core import style +from ayon_core.lib import ( + format_file_size, + AbstractAttrDef, + NumberDef, + BoolDef, + TextDef, + UILabelDef, +) +from ayon_core.pipeline import Anatomy +from ayon_core.pipeline.actions import ( + LoaderSelectedType, + LoaderActionPlugin, + LoaderActionItem, + LoaderActionSelection, + LoaderActionResult, + LoaderActionForm, +) + + +class DeleteOldVersions(LoaderActionPlugin): + """Deletes specific number of old version""" + + is_multiple_contexts_compatible = True + sequence_splitter = "__sequence_splitter__" + + requires_confirmation = True + + def get_action_items( + self, selection: LoaderActionSelection + ) -> list[LoaderActionItem]: + # Do not show in hosts + if self.host_name is not None: + return [] + + versions = None + if selection.selected_type == LoaderSelectedType.version: + versions = selection.entities.get_versions( + selection.selected_ids + ) + + if not versions: + return [] + + product_ids = { + version["productId"] + for version in versions + } + + return [ + LoaderActionItem( + identifier="delete-versions", + label="Delete Versions", + order=35, + entity_ids=product_ids, + entity_type="product", + icon={ + "type": "material-symbols", + "name": "delete", + "color": "#d8d8d8", + } + ), + LoaderActionItem( + identifier="calculate-versions-size", + label="Calculate Versions size", + order=30, + entity_ids=product_ids, + entity_type="product", + icon={ + "type": "material-symbols", + "name": "auto_delete", + "color": "#d8d8d8", + } + ) + ] + + def execute_action( + self, + identifier: str, + entity_ids: set[str], + entity_type: str, + selection: LoaderActionSelection, + form_values: dict[str, Any], + ) -> Optional[LoaderActionResult]: + step = form_values.get("step") + versions_to_keep = form_values.get("versions_to_keep") + remove_publish_folder = form_values.get("remove_publish_folder") + if step is None: + return self._first_step( + identifier, + versions_to_keep, + remove_publish_folder, + ) + + if versions_to_keep is None: + versions_to_keep = 2 + if remove_publish_folder is None: + remove_publish_folder = False + + if step == "prepare-data": + return self._prepare_data_step( + identifier, + versions_to_keep, + remove_publish_folder, + entity_ids, + selection, + ) + + if step == "delete-versions": + return self._delete_versions_step( + selection.project_name, form_values + ) + return None + + def _first_step( + self, + identifier: str, + versions_to_keep: Optional[int], + remove_publish_folder: Optional[bool], + ) -> LoaderActionResult: + fields: list[AbstractAttrDef] = [ + TextDef( + "step", + visible=False, + ), + NumberDef( + "versions_to_keep", + label="Versions to keep", + minimum=0, + default=2, + ), + ] + if identifier == "delete-versions": + fields.append( + BoolDef( + "remove_publish_folder", + label="Remove publish folder", + default=False, + ) + ) + + form_values = { + key: value + for key, value in ( + ("remove_publish_folder", remove_publish_folder), + ("versions_to_keep", versions_to_keep), + ) + if value is not None + } + form_values["step"] = "prepare-data" + return LoaderActionResult( + form=LoaderActionForm( + title="Delete Old Versions", + fields=fields, + ), + form_values=form_values + ) + + def _prepare_data_step( + self, + identifier: str, + versions_to_keep: int, + remove_publish_folder: bool, + entity_ids: set[str], + selection: LoaderActionSelection, + ): + versions_by_product_id = collections.defaultdict(list) + for version in selection.entities.get_products_versions(entity_ids): + # Keep hero version + if versions_to_keep != 0 and version["version"] < 0: + continue + versions_by_product_id[version["productId"]].append(version) + + versions_to_delete = [] + for product_id, versions in versions_by_product_id.items(): + if versions_to_keep == 0: + versions_to_delete.extend(versions) + continue + + if len(versions) <= versions_to_keep: + continue + + versions.sort(key=lambda v: v["version"]) + for _ in range(versions_to_keep): + if not versions: + break + versions.pop(-1) + versions_to_delete.extend(versions) + + self.log.debug( + f"Collected versions to delete ({len(versions_to_delete)})" + ) + + version_ids = { + version["id"] + for version in versions_to_delete + } + if not version_ids: + return LoaderActionResult( + message="Skipping. Nothing to delete.", + success=False, + ) + + project = selection.entities.get_project() + anatomy = Anatomy(project["name"], project_entity=project) + + repres = selection.entities.get_versions_representations(version_ids) + + self.log.debug( + f"Collected representations to remove ({len(repres)})" + ) + + filepaths_by_repre_id = {} + repre_ids_by_version_id = { + version_id: [] + for version_id in version_ids + } + for repre in repres: + repre_ids_by_version_id[repre["versionId"]].append(repre["id"]) + filepaths_by_repre_id[repre["id"]] = [ + anatomy.fill_root(repre_file["path"]) + for repre_file in repre["files"] + ] + + size = 0 + for filepaths in filepaths_by_repre_id.values(): + for filepath in filepaths: + if os.path.exists(filepath): + size += os.path.getsize(filepath) + + if identifier == "calculate-versions-size": + return LoaderActionResult( + message="Calculated size", + success=True, + form=LoaderActionForm( + title="Calculated versions size", + fields=[ + UILabelDef( + f"Total size of files: {format_file_size(size)}" + ), + ], + submit_label=None, + cancel_label="Close", + ), + ) + + form, form_values = self._get_delete_form( + size, + remove_publish_folder, + list(version_ids), + repre_ids_by_version_id, + filepaths_by_repre_id, + ) + return LoaderActionResult( + form=form, + form_values=form_values + ) + + def _delete_versions_step( + self, project_name: str, form_values: dict[str, Any] + ) -> LoaderActionResult: + delete_data = json.loads(form_values["delete_data"]) + remove_publish_folder = form_values["remove_publish_folder"] + if form_values["delete_value"].lower() != "delete": + size = delete_data["size"] + form, form_values = self._get_delete_form( + size, + remove_publish_folder, + delete_data["version_ids"], + delete_data["repre_ids_by_version_id"], + delete_data["filepaths_by_repre_id"], + True, + ) + return LoaderActionResult( + form=form, + form_values=form_values, + ) + + version_ids = delete_data["version_ids"] + repre_ids_by_version_id = delete_data["repre_ids_by_version_id"] + filepaths_by_repre_id = delete_data["filepaths_by_repre_id"] + op_session = OperationsSession() + total_versions = len(version_ids) + try: + for version_idx, version_id in enumerate(version_ids): + self.log.info( + f"Progressing version {version_idx + 1}/{total_versions}" + ) + for repre_id in repre_ids_by_version_id[version_id]: + for filepath in filepaths_by_repre_id[repre_id]: + publish_folder = os.path.dirname(filepath) + if remove_publish_folder: + if os.path.exists(publish_folder): + shutil.rmtree(publish_folder, ignore_errors=True) + continue + + if os.path.exists(filepath): + os.remove(filepath) + + op_session.delete_entity( + project_name, "representation", repre_id + ) + op_session.delete_entity( + project_name, "version", version_id + ) + self.log.info("All done") + + except Exception: + self.log.error("Failed to delete versions.", exc_info=True) + return LoaderActionResult( + message="Failed to delete versions.", + success=False, + ) + + finally: + op_session.commit() + + return LoaderActionResult( + message="Deleted versions", + success=True, + ) + + def _get_delete_form( + self, + size: int, + remove_publish_folder: bool, + version_ids: list[str], + repre_ids_by_version_id: dict[str, list[str]], + filepaths_by_repre_id: dict[str, list[str]], + repeated: bool = False, + ) -> tuple[LoaderActionForm, dict[str, Any]]: + versions_len = len(repre_ids_by_version_id) + fields = [ + UILabelDef( + f"Going to delete {versions_len} versions
" + f"- total size of files: {format_file_size(size)}
" + ), + UILabelDef("Are you sure you want to continue?"), + TextDef( + "delete_value", + placeholder="Type 'delete' to confirm...", + ), + ] + if repeated: + fields.append(UILabelDef( + "*Please fill in '**delete**' to confirm deletion.*" + )) + fields.extend([ + TextDef( + "delete_data", + visible=False, + ), + TextDef( + "step", + visible=False, + ), + BoolDef( + "remove_publish_folder", + label="Remove publish folder", + default=False, + visible=False, + ) + ]) + + form = LoaderActionForm( + title="Delete versions", + submit_label="Delete", + cancel_label="Close", + fields=fields, + ) + form_values = { + "delete_data": json.dumps({ + "size": size, + "version_ids": version_ids, + "repre_ids_by_version_id": repre_ids_by_version_id, + "filepaths_by_repre_id": filepaths_by_repre_id, + }), + "step": "delete-versions", + "remove_publish_folder": remove_publish_folder, + } + return form, form_values