convert delete old versions actions

This commit is contained in:
Jakub Trllo 2025-08-25 12:44:16 +02:00
parent c2cdd4130e
commit 270d7cbff9
2 changed files with 393 additions and 477 deletions

View file

@ -1,477 +0,0 @@
import collections
import os
import uuid
from typing import List, Dict, Any
import clique
import ayon_api
from ayon_api.operations import OperationsSession
import qargparse
from qtpy import QtWidgets, QtCore
from ayon_core import style
from ayon_core.lib import format_file_size
from ayon_core.pipeline import load, Anatomy
from ayon_core.pipeline.load import (
get_representation_path_with_anatomy,
InvalidRepresentationContext,
)
class DeleteOldVersions(load.ProductLoaderPlugin):
"""Deletes specific number of old version"""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
representations = ["*"]
product_types = {"*"}
tool_names = ["library_loader"]
label = "Delete Old Versions"
order = 35
icon = "trash"
color = "#d8d8d8"
options = [
qargparse.Integer(
"versions_to_keep", default=2, min=0, help="Versions to keep:"
),
qargparse.Boolean(
"remove_publish_folder", help="Remove publish folder:"
)
]
requires_confirmation = True
def delete_whole_dir_paths(self, dir_paths, delete=True):
size = 0
for dir_path in dir_paths:
# Delete all files and folders in dir path
for root, dirs, files in os.walk(dir_path, topdown=False):
for name in files:
file_path = os.path.join(root, name)
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
for name in dirs:
if delete:
os.rmdir(os.path.join(root, name))
if not delete:
continue
# Delete even the folder and it's parents folders if they are empty
while True:
if not os.path.exists(dir_path):
dir_path = os.path.dirname(dir_path)
continue
if len(os.listdir(dir_path)) != 0:
break
os.rmdir(os.path.join(dir_path))
return size
def path_from_representation(self, representation, anatomy):
try:
context = representation["context"]
except KeyError:
return (None, None)
try:
path = get_representation_path_with_anatomy(
representation, anatomy
)
except InvalidRepresentationContext:
return (None, None)
sequence_path = None
if "frame" in context:
context["frame"] = self.sequence_splitter
sequence_path = get_representation_path_with_anatomy(
representation, anatomy
)
if sequence_path:
sequence_path = sequence_path.normalized()
return (path.normalized(), sequence_path)
def delete_only_repre_files(self, dir_paths, file_paths, delete=True):
size = 0
for dir_id, dir_path in dir_paths.items():
dir_files = os.listdir(dir_path)
collections, remainders = clique.assemble(dir_files)
for file_path, seq_path in file_paths[dir_id]:
file_path_base = os.path.split(file_path)[1]
# Just remove file if `frame` key was not in context or
# filled path is in remainders (single file sequence)
if not seq_path or file_path_base in remainders:
if not os.path.exists(file_path):
self.log.debug(
"File was not found: {}".format(file_path)
)
continue
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
if file_path_base in remainders:
remainders.remove(file_path_base)
continue
seq_path_base = os.path.split(seq_path)[1]
head, tail = seq_path_base.split(self.sequence_splitter)
final_col = None
for collection in collections:
if head != collection.head or tail != collection.tail:
continue
final_col = collection
break
if final_col is not None:
# Fill full path to head
final_col.head = os.path.join(dir_path, final_col.head)
for _file_path in final_col:
if os.path.exists(_file_path):
size += os.path.getsize(_file_path)
if delete:
os.remove(_file_path)
self.log.debug(
"Removed file: {}".format(_file_path)
)
_seq_path = final_col.format("{head}{padding}{tail}")
self.log.debug("Removed files: {}".format(_seq_path))
collections.remove(final_col)
elif os.path.exists(file_path):
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
else:
self.log.debug(
"File was not found: {}".format(file_path)
)
# Delete as much as possible parent folders
if not delete:
return size
for dir_path in dir_paths.values():
while True:
if not os.path.exists(dir_path):
dir_path = os.path.dirname(dir_path)
continue
if len(os.listdir(dir_path)) != 0:
break
self.log.debug("Removed folder: {}".format(dir_path))
os.rmdir(dir_path)
return size
def message(self, text):
msgBox = QtWidgets.QMessageBox()
msgBox.setText(text)
msgBox.setStyleSheet(style.load_stylesheet())
msgBox.setWindowFlags(
msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint
)
msgBox.exec_()
def _confirm_delete(self,
contexts: List[Dict[str, Any]],
versions_to_keep: int) -> bool:
"""Prompt user for a deletion confirmation"""
contexts_list = "\n".join(sorted(
"- {folder[name]} > {product[name]}".format_map(context)
for context in contexts
))
num_contexts = len(contexts)
s = "s" if num_contexts > 1 else ""
text = (
"Are you sure you want to delete versions?\n\n"
f"This will keep only the last {versions_to_keep} "
f"versions for the {num_contexts} selected product{s}."
)
informative_text = "Warning: This will delete files from disk"
detailed_text = (
f"Keep only {versions_to_keep} versions for:\n{contexts_list}"
)
messagebox = QtWidgets.QMessageBox()
messagebox.setIcon(QtWidgets.QMessageBox.Warning)
messagebox.setWindowTitle("Delete Old Versions")
messagebox.setText(text)
messagebox.setInformativeText(informative_text)
messagebox.setDetailedText(detailed_text)
messagebox.setStandardButtons(
QtWidgets.QMessageBox.Yes
| QtWidgets.QMessageBox.Cancel
)
messagebox.setDefaultButton(QtWidgets.QMessageBox.Cancel)
messagebox.setStyleSheet(style.load_stylesheet())
messagebox.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
return messagebox.exec_() == QtWidgets.QMessageBox.Yes
def get_data(self, context, versions_count):
product_entity = context["product"]
folder_entity = context["folder"]
project_name = context["project"]["name"]
anatomy = Anatomy(project_name, project_entity=context["project"])
version_fields = ayon_api.get_default_fields_for_type("version")
version_fields.add("tags")
versions = list(ayon_api.get_versions(
project_name,
product_ids=[product_entity["id"]],
active=None,
hero=False,
fields=version_fields
))
self.log.debug(
"Version Number ({})".format(len(versions))
)
versions_by_parent = collections.defaultdict(list)
for ent in versions:
versions_by_parent[ent["productId"]].append(ent)
def sort_func(ent):
return int(ent["version"])
all_last_versions = []
for _parent_id, _versions in versions_by_parent.items():
for idx, version in enumerate(
sorted(_versions, key=sort_func, reverse=True)
):
if idx >= versions_count:
break
all_last_versions.append(version)
self.log.debug("Collected versions ({})".format(len(versions)))
# Filter latest versions
for version in all_last_versions:
versions.remove(version)
# Update versions_by_parent without filtered versions
versions_by_parent = collections.defaultdict(list)
for ent in versions:
versions_by_parent[ent["productId"]].append(ent)
# Filter already deleted versions
versions_to_pop = []
for version in versions:
if "deleted" in version["tags"]:
versions_to_pop.append(version)
for version in versions_to_pop:
msg = "Folder: \"{}\" | Product: \"{}\" | Version: \"{}\"".format(
folder_entity["path"],
product_entity["name"],
version["version"]
)
self.log.debug((
"Skipping version. Already tagged as inactive. < {} >"
).format(msg))
versions.remove(version)
version_ids = [ent["id"] for ent in versions]
self.log.debug(
"Filtered versions to delete ({})".format(len(version_ids))
)
if not version_ids:
msg = "Skipping processing. Nothing to delete on {}/{}".format(
folder_entity["path"], product_entity["name"]
)
self.log.info(msg)
print(msg)
return
repres = list(ayon_api.get_representations(
project_name, version_ids=version_ids
))
self.log.debug(
"Collected representations to remove ({})".format(len(repres))
)
dir_paths = {}
file_paths_by_dir = collections.defaultdict(list)
for repre in repres:
file_path, seq_path = self.path_from_representation(
repre, anatomy
)
if file_path is None:
self.log.debug((
"Could not format path for represenation \"{}\""
).format(str(repre)))
continue
dir_path = os.path.dirname(file_path)
dir_id = None
for _dir_id, _dir_path in dir_paths.items():
if _dir_path == dir_path:
dir_id = _dir_id
break
if dir_id is None:
dir_id = uuid.uuid4()
dir_paths[dir_id] = dir_path
file_paths_by_dir[dir_id].append([file_path, seq_path])
dir_ids_to_pop = []
for dir_id, dir_path in dir_paths.items():
if os.path.exists(dir_path):
continue
dir_ids_to_pop.append(dir_id)
# Pop dirs from both dictionaries
for dir_id in dir_ids_to_pop:
dir_paths.pop(dir_id)
paths = file_paths_by_dir.pop(dir_id)
# TODO report of missing directories?
paths_msg = ", ".join([
"'{}'".format(path[0].replace("\\", "/")) for path in paths
])
self.log.debug((
"Folder does not exist. Deleting its files skipped: {}"
).format(paths_msg))
return {
"dir_paths": dir_paths,
"file_paths_by_dir": file_paths_by_dir,
"versions": versions,
"folder": folder_entity,
"product": product_entity,
"archive_product": versions_count == 0
}
def main(self, project_name, data, remove_publish_folder):
# Size of files.
size = 0
if not data:
return size
if remove_publish_folder:
size = self.delete_whole_dir_paths(data["dir_paths"].values())
else:
size = self.delete_only_repre_files(
data["dir_paths"], data["file_paths_by_dir"]
)
op_session = OperationsSession()
for version in data["versions"]:
orig_version_tags = version["tags"]
version_tags = list(orig_version_tags)
changes = {}
if "deleted" not in version_tags:
version_tags.append("deleted")
changes["tags"] = version_tags
if version["active"]:
changes["active"] = False
if not changes:
continue
op_session.update_entity(
project_name, "version", version["id"], changes
)
op_session.commit()
return size
def load(self, contexts, name=None, namespace=None, options=None):
# Get user options
versions_to_keep = 2
remove_publish_folder = False
if options:
versions_to_keep = options.get(
"versions_to_keep", versions_to_keep
)
remove_publish_folder = options.get(
"remove_publish_folder", remove_publish_folder
)
# Because we do not want this run by accident we will add an extra
# user confirmation
if (
self.requires_confirmation
and not self._confirm_delete(contexts, versions_to_keep)
):
return
try:
size = 0
for count, context in enumerate(contexts):
data = self.get_data(context, versions_to_keep)
if not data:
continue
project_name = context["project"]["name"]
size += self.main(project_name, data, remove_publish_folder)
print("Progressing {}/{}".format(count + 1, len(contexts)))
msg = "Total size of files: {}".format(format_file_size(size))
self.log.info(msg)
self.message(msg)
except Exception:
self.log.error("Failed to delete versions.", exc_info=True)
class CalculateOldVersions(DeleteOldVersions):
"""Calculate file size of old versions"""
label = "Calculate Old Versions"
order = 30
tool_names = ["library_loader"]
options = [
qargparse.Integer(
"versions_to_keep", default=2, min=0, help="Versions to keep:"
),
qargparse.Boolean(
"remove_publish_folder", help="Remove publish folder:"
)
]
requires_confirmation = False
def main(self, project_name, data, remove_publish_folder):
size = 0
if not data:
return size
if remove_publish_folder:
size = self.delete_whole_dir_paths(
data["dir_paths"].values(), delete=False
)
else:
size = self.delete_only_repre_files(
data["dir_paths"], data["file_paths_by_dir"], delete=False
)
return size

View file

@ -0,0 +1,393 @@
from __future__ import annotations
import os
import collections
import json
import shutil
from typing import Optional, Any
import clique
from ayon_api.operations import OperationsSession
from qtpy import QtWidgets, QtCore
from ayon_core import style
from ayon_core.lib import (
format_file_size,
AbstractAttrDef,
NumberDef,
BoolDef,
TextDef,
UILabelDef,
)
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.actions import (
LoaderSelectedType,
LoaderActionPlugin,
LoaderActionItem,
LoaderActionSelection,
LoaderActionResult,
LoaderActionForm,
)
class DeleteOldVersions(LoaderActionPlugin):
"""Deletes specific number of old version"""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
requires_confirmation = True
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
# Do not show in hosts
if self.host_name is not None:
return []
versions = None
if selection.selected_type == LoaderSelectedType.version:
versions = selection.entities.get_versions(
selection.selected_ids
)
if not versions:
return []
product_ids = {
version["productId"]
for version in versions
}
return [
LoaderActionItem(
identifier="delete-versions",
label="Delete Versions",
order=35,
entity_ids=product_ids,
entity_type="product",
icon={
"type": "material-symbols",
"name": "delete",
"color": "#d8d8d8",
}
),
LoaderActionItem(
identifier="calculate-versions-size",
label="Calculate Versions size",
order=30,
entity_ids=product_ids,
entity_type="product",
icon={
"type": "material-symbols",
"name": "auto_delete",
"color": "#d8d8d8",
}
)
]
def execute_action(
self,
identifier: str,
entity_ids: set[str],
entity_type: str,
selection: LoaderActionSelection,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
step = form_values.get("step")
versions_to_keep = form_values.get("versions_to_keep")
remove_publish_folder = form_values.get("remove_publish_folder")
if step is None:
return self._first_step(
identifier,
versions_to_keep,
remove_publish_folder,
)
if versions_to_keep is None:
versions_to_keep = 2
if remove_publish_folder is None:
remove_publish_folder = False
if step == "prepare-data":
return self._prepare_data_step(
identifier,
versions_to_keep,
remove_publish_folder,
entity_ids,
selection,
)
if step == "delete-versions":
return self._delete_versions_step(
selection.project_name, form_values
)
return None
def _first_step(
self,
identifier: str,
versions_to_keep: Optional[int],
remove_publish_folder: Optional[bool],
) -> LoaderActionResult:
fields: list[AbstractAttrDef] = [
TextDef(
"step",
visible=False,
),
NumberDef(
"versions_to_keep",
label="Versions to keep",
minimum=0,
default=2,
),
]
if identifier == "delete-versions":
fields.append(
BoolDef(
"remove_publish_folder",
label="Remove publish folder",
default=False,
)
)
form_values = {
key: value
for key, value in (
("remove_publish_folder", remove_publish_folder),
("versions_to_keep", versions_to_keep),
)
if value is not None
}
form_values["step"] = "prepare-data"
return LoaderActionResult(
form=LoaderActionForm(
title="Delete Old Versions",
fields=fields,
),
form_values=form_values
)
def _prepare_data_step(
self,
identifier: str,
versions_to_keep: int,
remove_publish_folder: bool,
entity_ids: set[str],
selection: LoaderActionSelection,
):
versions_by_product_id = collections.defaultdict(list)
for version in selection.entities.get_products_versions(entity_ids):
# Keep hero version
if versions_to_keep != 0 and version["version"] < 0:
continue
versions_by_product_id[version["productId"]].append(version)
versions_to_delete = []
for product_id, versions in versions_by_product_id.items():
if versions_to_keep == 0:
versions_to_delete.extend(versions)
continue
if len(versions) <= versions_to_keep:
continue
versions.sort(key=lambda v: v["version"])
for _ in range(versions_to_keep):
if not versions:
break
versions.pop(-1)
versions_to_delete.extend(versions)
self.log.debug(
f"Collected versions to delete ({len(versions_to_delete)})"
)
version_ids = {
version["id"]
for version in versions_to_delete
}
if not version_ids:
return LoaderActionResult(
message="Skipping. Nothing to delete.",
success=False,
)
project = selection.entities.get_project()
anatomy = Anatomy(project["name"], project_entity=project)
repres = selection.entities.get_versions_representations(version_ids)
self.log.debug(
f"Collected representations to remove ({len(repres)})"
)
filepaths_by_repre_id = {}
repre_ids_by_version_id = {
version_id: []
for version_id in version_ids
}
for repre in repres:
repre_ids_by_version_id[repre["versionId"]].append(repre["id"])
filepaths_by_repre_id[repre["id"]] = [
anatomy.fill_root(repre_file["path"])
for repre_file in repre["files"]
]
size = 0
for filepaths in filepaths_by_repre_id.values():
for filepath in filepaths:
if os.path.exists(filepath):
size += os.path.getsize(filepath)
if identifier == "calculate-versions-size":
return LoaderActionResult(
message="Calculated size",
success=True,
form=LoaderActionForm(
title="Calculated versions size",
fields=[
UILabelDef(
f"Total size of files: {format_file_size(size)}"
),
],
submit_label=None,
cancel_label="Close",
),
)
form, form_values = self._get_delete_form(
size,
remove_publish_folder,
list(version_ids),
repre_ids_by_version_id,
filepaths_by_repre_id,
)
return LoaderActionResult(
form=form,
form_values=form_values
)
def _delete_versions_step(
self, project_name: str, form_values: dict[str, Any]
) -> LoaderActionResult:
delete_data = json.loads(form_values["delete_data"])
remove_publish_folder = form_values["remove_publish_folder"]
if form_values["delete_value"].lower() != "delete":
size = delete_data["size"]
form, form_values = self._get_delete_form(
size,
remove_publish_folder,
delete_data["version_ids"],
delete_data["repre_ids_by_version_id"],
delete_data["filepaths_by_repre_id"],
True,
)
return LoaderActionResult(
form=form,
form_values=form_values,
)
version_ids = delete_data["version_ids"]
repre_ids_by_version_id = delete_data["repre_ids_by_version_id"]
filepaths_by_repre_id = delete_data["filepaths_by_repre_id"]
op_session = OperationsSession()
total_versions = len(version_ids)
try:
for version_idx, version_id in enumerate(version_ids):
self.log.info(
f"Progressing version {version_idx + 1}/{total_versions}"
)
for repre_id in repre_ids_by_version_id[version_id]:
for filepath in filepaths_by_repre_id[repre_id]:
publish_folder = os.path.dirname(filepath)
if remove_publish_folder:
if os.path.exists(publish_folder):
shutil.rmtree(publish_folder, ignore_errors=True)
continue
if os.path.exists(filepath):
os.remove(filepath)
op_session.delete_entity(
project_name, "representation", repre_id
)
op_session.delete_entity(
project_name, "version", version_id
)
self.log.info("All done")
except Exception:
self.log.error("Failed to delete versions.", exc_info=True)
return LoaderActionResult(
message="Failed to delete versions.",
success=False,
)
finally:
op_session.commit()
return LoaderActionResult(
message="Deleted versions",
success=True,
)
def _get_delete_form(
self,
size: int,
remove_publish_folder: bool,
version_ids: list[str],
repre_ids_by_version_id: dict[str, list[str]],
filepaths_by_repre_id: dict[str, list[str]],
repeated: bool = False,
) -> tuple[LoaderActionForm, dict[str, Any]]:
versions_len = len(repre_ids_by_version_id)
fields = [
UILabelDef(
f"Going to delete {versions_len} versions<br/>"
f"- total size of files: {format_file_size(size)}<br/>"
),
UILabelDef("Are you sure you want to continue?"),
TextDef(
"delete_value",
placeholder="Type 'delete' to confirm...",
),
]
if repeated:
fields.append(UILabelDef(
"*Please fill in '**delete**' to confirm deletion.*"
))
fields.extend([
TextDef(
"delete_data",
visible=False,
),
TextDef(
"step",
visible=False,
),
BoolDef(
"remove_publish_folder",
label="Remove publish folder",
default=False,
visible=False,
)
])
form = LoaderActionForm(
title="Delete versions",
submit_label="Delete",
cancel_label="Close",
fields=fields,
)
form_values = {
"delete_data": json.dumps({
"size": size,
"version_ids": version_ids,
"repre_ids_by_version_id": repre_ids_by_version_id,
"filepaths_by_repre_id": filepaths_by_repre_id,
}),
"step": "delete-versions",
"remove_publish_folder": remove_publish_folder,
}
return form, form_values