♻️ add docstrings and hints

This commit is contained in:
Ondřej Samohel 2025-08-26 15:13:52 +02:00
parent 89d0777baf
commit 941d4aee9e
No known key found for this signature in database
GPG key ID: 02376E18990A97C6

View file

@ -1,10 +1,12 @@
"""Plugin to create hero version from selected context."""
from __future__ import annotations
import os
import copy
import shutil
import errno
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Optional
from speedcopy import copyfile
import clique
@ -20,7 +22,17 @@ from ayon_core.pipeline.publish import get_publish_template_name
from ayon_core.pipeline.template_data import get_template_data
def prepare_changes(old_entity, new_entity):
def prepare_changes(old_entity: dict, new_entity: dict) -> dict:
"""Prepare changes dict for update entity operation.
Args:
old_entity (dict): Existing entity data from database.
new_entity (dict): New entity data to compare against old.
Returns:
dict: Changes to apply to old entity to make it like new entity.
"""
changes = {}
for key in set(new_entity.keys()):
if key == "attrib":
@ -48,19 +60,21 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
icon = "star"
color = "#ffd700"
ignored_representation_names = []
ignored_representation_names: list[str] = []
db_representation_context_keys = [
"project", "folder", "asset", "hierarchy", "task", "product",
"subset", "family", "representation", "username", "user", "output"
]
use_hardlinks = False
def message(self, text):
@staticmethod
def message(text: str) -> None:
"""Show message box with text."""
msgBox = QtWidgets.QMessageBox()
msgBox.setText(text)
msgBox.setStyleSheet(style.load_stylesheet())
msgBox.setWindowFlags(
msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint
msgBox.windowFlags() | QtCore.Qt.WindowType.FramelessWindowHint
)
msgBox.exec_()
@ -143,8 +157,31 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
self.message(
f"Failed to create hero version:\n{chr(10).join(errors)}")
def create_hero_version(self, instance_data, anatomy, context):
"""Create hero version from instance data."""
def create_hero_version(
self,
instance_data: dict[str, Any],
anatomy: Anatomy,
context: dict[str, Any]) -> None:
"""Create hero version from instance data.
Args:
instance_data (dict): Instance data with keys:
- productName (str): Name of the product.
- productType (str): Type of the product.
- anatomyData (dict): Anatomy data for templates.
- publishDir (str): Directory where the product is published.
- published_representations (dict): Published representations.
- versionEntity (dict, optional): Source version entity.
anatomy (Anatomy): Anatomy object for the project.
context (dict): Context data with keys:
- hostName (str): Name of the host application.
- project_settings (dict): Project settings.
Raises:
RuntimeError: If any required data is missing or an error occurs
during the hero version creation process.
"""
published_repres = instance_data.get("published_representations")
if not published_repres:
raise RuntimeError("No published representations found.")
@ -158,7 +195,6 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
instance_data.get("anatomyData", {}).get("task", {}).get("type"),
project_settings=context.get("project_settings", {}),
hero=True,
logger=None
)
hero_template = anatomy.get_template_item(
"hero", template_key, "path", default=None
@ -197,12 +233,12 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
raise RuntimeError("Version 0 cannot have hero version.")
all_copied_files = []
transfers = instance_data.get("transfers", list())
transfers = instance_data.get("transfers", [])
for _src, dst in transfers:
dst = os.path.normpath(dst)
if dst not in all_copied_files:
all_copied_files.append(dst)
hardlinks = instance_data.get("hardlinks", list())
hardlinks = instance_data.get("hardlinks", [])
for _src, dst in hardlinks:
dst = os.path.normpath(dst)
if dst not in all_copied_files:
@ -267,7 +303,6 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
instance_data["heroVersionEntity"] = new_hero_version
old_repres_to_replace = {}
old_repres_to_delete = {}
for repre_info in published_repres.values():
repre = repre_info["representation"]
repre_name_low = repre["name"].lower()
@ -275,12 +310,10 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
old_repres_to_replace[repre_name_low] = (
old_repres_by_name.pop(repre_name_low)
)
if old_repres_by_name:
old_repres_to_delete = old_repres_by_name
old_repres_to_delete = old_repres_by_name or {}
backup_hero_publish_dir = None
if os.path.exists(hero_publish_dir):
base_backup_dir = hero_publish_dir + ".BACKUP"
base_backup_dir = f"{hero_publish_dir}.BACKUP"
max_idx = 10
# Find the first available backup directory name
for idx in range(max_idx + 1):
@ -298,10 +331,11 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
try:
os.rename(hero_publish_dir, backup_hero_publish_dir)
except PermissionError:
except PermissionError as e:
raise AssertionError(
"Could not create hero version because it is "
"not possible to replace current hero files.")
"not possible to replace current hero files."
) from e
try:
src_to_dst_file_paths = []
@ -445,14 +479,41 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
os.rename(backup_hero_publish_dir, hero_publish_dir)
raise
def get_files_info(self, filepaths, anatomy):
def get_files_info(
self, filepaths: list[str], anatomy: Anatomy) -> list[dict]:
"""Get list of file info dictionaries for given file paths.
Args:
filepaths (list[str]): List of absolute file paths.
anatomy (Anatomy): Anatomy object for the project.
Returns:
list[dict]: List of file info dictionaries.
"""
file_infos = []
for filepath in filepaths:
file_info = self.prepare_file_info(filepath, anatomy)
file_infos.append(file_info)
return file_infos
def prepare_file_info(self, path, anatomy):
def prepare_file_info(self, path: str, anatomy: Anatomy) -> dict:
"""Prepare file info dictionary for given path.
Args:
path (str): Absolute file path.
anatomy (Anatomy): Anatomy object for the project.
Returns:
dict: File info dictionary with keys:
- id (str): Unique identifier for the file.
- name (str): Base name of the file.
- path (str): Rootless file path.
- size (int): Size of the file in bytes.
- hash (str): Hash of the file content.
- hash_type (str): Type of the hash used.
"""
return {
"id": create_entity_id(),
"name": os.path.basename(path),
@ -462,7 +523,22 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
"hash_type": "op3",
}
def get_publish_dir(self, instance_data, anatomy, template_key):
@staticmethod
def get_publish_dir(
instance_data: dict,
anatomy: Anatomy,
template_key: str) -> str:
"""Get publish directory from instance data and anatomy.
Args:
instance_data (dict): Instance data with "anatomyData" key.
anatomy (Anatomy): Anatomy object for the project.
template_key (str): Template key for the hero template.
Returns:
str: Normalized publish directory path.
"""
template_data = copy.deepcopy(instance_data.get("anatomyData", {}))
if "originalBasename" in instance_data:
template_data["originalBasename"] = (
@ -473,13 +549,34 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
)
return os.path.normpath(template_obj.format_strict(template_data))
def get_rootless_path(self, anatomy, path):
@staticmethod
def get_rootless_path(anatomy: Anatomy, path: str) -> str:
"""Get rootless path from absolute path.
Args:
anatomy (Anatomy): Anatomy object for the project.
path (str): Absolute file path.
Returns:
str: Rootless file path if root found, else original path.
"""
success, rootless_path = anatomy.find_root_template_from_path(path)
if success:
path = rootless_path
return path
def copy_file(self, src_path, dst_path):
def copy_file(self, src_path: str, dst_path: str) -> None:
"""Copy file from src to dst with creating directories.
Args:
src_path (str): Source file path.
dst_path (str): Destination file path.
Raises:
OSError: If copying or linking fails.
"""
dirname = os.path.dirname(dst_path)
try:
os.makedirs(dirname)
@ -495,23 +592,39 @@ class CreateHeroVersion(load.ProductLoaderPlugin):
raise
copyfile(src_path, dst_path)
def version_from_representations(self, project_name, repres):
@staticmethod
def version_from_representations(
project_name: str, repres: dict) -> Optional[dict[str, Any]]:
"""Find version from representations.
Args:
project_name (str): Name of the project.
repres (dict): Dictionary of representations info.
Returns:
Optional[dict]: Version entity if found, else None.
"""
for repre_info in repres.values():
version = ayon_api.get_version_by_id(
project_name, repre_info["representation"]["versionId"]
)
if version:
return version
return None
def current_hero_ents(self, project_name, version):
@staticmethod
def current_hero_ents(
project_name: str,
version: dict[str, Any]) -> tuple[Any, list[dict[str, Any]]]:
hero_version = ayon_api.get_hero_version_by_product_id(
project_name, version["productId"]
)
if not hero_version:
return (None, [])
return None, []
hero_repres = list(
ayon_api.get_representations(
project_name, version_ids={hero_version["id"]}
)
)
return (hero_version, hero_repres)
return hero_version, hero_repres