From 941d4aee9ea66b758bd337560fb48b6e38a1b1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Samohel?= Date: Tue, 26 Aug 2025 15:13:52 +0200 Subject: [PATCH] :recycle: add docstrings and hints --- .../plugins/load/create_hero_version.py | 165 +++++++++++++++--- 1 file changed, 139 insertions(+), 26 deletions(-) diff --git a/client/ayon_core/plugins/load/create_hero_version.py b/client/ayon_core/plugins/load/create_hero_version.py index adf9d5f669..aef0cf8863 100644 --- a/client/ayon_core/plugins/load/create_hero_version.py +++ b/client/ayon_core/plugins/load/create_hero_version.py @@ -1,10 +1,12 @@ - +"""Plugin to create hero version from selected context.""" +from __future__ import annotations import os import copy import shutil import errno import itertools from concurrent.futures import ThreadPoolExecutor +from typing import Any, Optional from speedcopy import copyfile import clique @@ -20,7 +22,17 @@ from ayon_core.pipeline.publish import get_publish_template_name from ayon_core.pipeline.template_data import get_template_data -def prepare_changes(old_entity, new_entity): +def prepare_changes(old_entity: dict, new_entity: dict) -> dict: + """Prepare changes dict for update entity operation. + + Args: + old_entity (dict): Existing entity data from database. + new_entity (dict): New entity data to compare against old. + + Returns: + dict: Changes to apply to old entity to make it like new entity. + + """ changes = {} for key in set(new_entity.keys()): if key == "attrib": @@ -48,19 +60,21 @@ class CreateHeroVersion(load.ProductLoaderPlugin): icon = "star" color = "#ffd700" - ignored_representation_names = [] + ignored_representation_names: list[str] = [] db_representation_context_keys = [ "project", "folder", "asset", "hierarchy", "task", "product", "subset", "family", "representation", "username", "user", "output" ] use_hardlinks = False - def message(self, text): + @staticmethod + def message(text: str) -> None: + """Show message box with text.""" msgBox = QtWidgets.QMessageBox() msgBox.setText(text) msgBox.setStyleSheet(style.load_stylesheet()) msgBox.setWindowFlags( - msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint + msgBox.windowFlags() | QtCore.Qt.WindowType.FramelessWindowHint ) msgBox.exec_() @@ -143,8 +157,31 @@ class CreateHeroVersion(load.ProductLoaderPlugin): self.message( f"Failed to create hero version:\n{chr(10).join(errors)}") - def create_hero_version(self, instance_data, anatomy, context): - """Create hero version from instance data.""" + def create_hero_version( + self, + instance_data: dict[str, Any], + anatomy: Anatomy, + context: dict[str, Any]) -> None: + """Create hero version from instance data. + + Args: + instance_data (dict): Instance data with keys: + - productName (str): Name of the product. + - productType (str): Type of the product. + - anatomyData (dict): Anatomy data for templates. + - publishDir (str): Directory where the product is published. + - published_representations (dict): Published representations. + - versionEntity (dict, optional): Source version entity. + anatomy (Anatomy): Anatomy object for the project. + context (dict): Context data with keys: + - hostName (str): Name of the host application. + - project_settings (dict): Project settings. + + Raises: + RuntimeError: If any required data is missing or an error occurs + during the hero version creation process. + + """ published_repres = instance_data.get("published_representations") if not published_repres: raise RuntimeError("No published representations found.") @@ -158,7 +195,6 @@ class CreateHeroVersion(load.ProductLoaderPlugin): instance_data.get("anatomyData", {}).get("task", {}).get("type"), project_settings=context.get("project_settings", {}), hero=True, - logger=None ) hero_template = anatomy.get_template_item( "hero", template_key, "path", default=None @@ -197,12 +233,12 @@ class CreateHeroVersion(load.ProductLoaderPlugin): raise RuntimeError("Version 0 cannot have hero version.") all_copied_files = [] - transfers = instance_data.get("transfers", list()) + transfers = instance_data.get("transfers", []) for _src, dst in transfers: dst = os.path.normpath(dst) if dst not in all_copied_files: all_copied_files.append(dst) - hardlinks = instance_data.get("hardlinks", list()) + hardlinks = instance_data.get("hardlinks", []) for _src, dst in hardlinks: dst = os.path.normpath(dst) if dst not in all_copied_files: @@ -267,7 +303,6 @@ class CreateHeroVersion(load.ProductLoaderPlugin): instance_data["heroVersionEntity"] = new_hero_version old_repres_to_replace = {} - old_repres_to_delete = {} for repre_info in published_repres.values(): repre = repre_info["representation"] repre_name_low = repre["name"].lower() @@ -275,12 +310,10 @@ class CreateHeroVersion(load.ProductLoaderPlugin): old_repres_to_replace[repre_name_low] = ( old_repres_by_name.pop(repre_name_low) ) - if old_repres_by_name: - old_repres_to_delete = old_repres_by_name - + old_repres_to_delete = old_repres_by_name or {} backup_hero_publish_dir = None if os.path.exists(hero_publish_dir): - base_backup_dir = hero_publish_dir + ".BACKUP" + base_backup_dir = f"{hero_publish_dir}.BACKUP" max_idx = 10 # Find the first available backup directory name for idx in range(max_idx + 1): @@ -298,10 +331,11 @@ class CreateHeroVersion(load.ProductLoaderPlugin): try: os.rename(hero_publish_dir, backup_hero_publish_dir) - except PermissionError: + except PermissionError as e: raise AssertionError( "Could not create hero version because it is " - "not possible to replace current hero files.") + "not possible to replace current hero files." + ) from e try: src_to_dst_file_paths = [] @@ -445,14 +479,41 @@ class CreateHeroVersion(load.ProductLoaderPlugin): os.rename(backup_hero_publish_dir, hero_publish_dir) raise - def get_files_info(self, filepaths, anatomy): + def get_files_info( + self, filepaths: list[str], anatomy: Anatomy) -> list[dict]: + """Get list of file info dictionaries for given file paths. + + Args: + filepaths (list[str]): List of absolute file paths. + anatomy (Anatomy): Anatomy object for the project. + + Returns: + list[dict]: List of file info dictionaries. + + """ file_infos = [] for filepath in filepaths: file_info = self.prepare_file_info(filepath, anatomy) file_infos.append(file_info) return file_infos - def prepare_file_info(self, path, anatomy): + def prepare_file_info(self, path: str, anatomy: Anatomy) -> dict: + """Prepare file info dictionary for given path. + + Args: + path (str): Absolute file path. + anatomy (Anatomy): Anatomy object for the project. + + Returns: + dict: File info dictionary with keys: + - id (str): Unique identifier for the file. + - name (str): Base name of the file. + - path (str): Rootless file path. + - size (int): Size of the file in bytes. + - hash (str): Hash of the file content. + - hash_type (str): Type of the hash used. + + """ return { "id": create_entity_id(), "name": os.path.basename(path), @@ -462,7 +523,22 @@ class CreateHeroVersion(load.ProductLoaderPlugin): "hash_type": "op3", } - def get_publish_dir(self, instance_data, anatomy, template_key): + @staticmethod + def get_publish_dir( + instance_data: dict, + anatomy: Anatomy, + template_key: str) -> str: + """Get publish directory from instance data and anatomy. + + Args: + instance_data (dict): Instance data with "anatomyData" key. + anatomy (Anatomy): Anatomy object for the project. + template_key (str): Template key for the hero template. + + Returns: + str: Normalized publish directory path. + + """ template_data = copy.deepcopy(instance_data.get("anatomyData", {})) if "originalBasename" in instance_data: template_data["originalBasename"] = ( @@ -473,13 +549,34 @@ class CreateHeroVersion(load.ProductLoaderPlugin): ) return os.path.normpath(template_obj.format_strict(template_data)) - def get_rootless_path(self, anatomy, path): + @staticmethod + def get_rootless_path(anatomy: Anatomy, path: str) -> str: + """Get rootless path from absolute path. + + Args: + anatomy (Anatomy): Anatomy object for the project. + path (str): Absolute file path. + + Returns: + str: Rootless file path if root found, else original path. + + """ success, rootless_path = anatomy.find_root_template_from_path(path) if success: path = rootless_path return path - def copy_file(self, src_path, dst_path): + def copy_file(self, src_path: str, dst_path: str) -> None: + """Copy file from src to dst with creating directories. + + Args: + src_path (str): Source file path. + dst_path (str): Destination file path. + + Raises: + OSError: If copying or linking fails. + + """ dirname = os.path.dirname(dst_path) try: os.makedirs(dirname) @@ -495,23 +592,39 @@ class CreateHeroVersion(load.ProductLoaderPlugin): raise copyfile(src_path, dst_path) - def version_from_representations(self, project_name, repres): + @staticmethod + def version_from_representations( + project_name: str, repres: dict) -> Optional[dict[str, Any]]: + """Find version from representations. + + Args: + project_name (str): Name of the project. + repres (dict): Dictionary of representations info. + + Returns: + Optional[dict]: Version entity if found, else None. + + """ for repre_info in repres.values(): version = ayon_api.get_version_by_id( project_name, repre_info["representation"]["versionId"] ) if version: return version + return None - def current_hero_ents(self, project_name, version): + @staticmethod + def current_hero_ents( + project_name: str, + version: dict[str, Any]) -> tuple[Any, list[dict[str, Any]]]: hero_version = ayon_api.get_hero_version_by_product_id( project_name, version["productId"] ) if not hero_version: - return (None, []) + return None, [] hero_repres = list( ayon_api.get_representations( project_name, version_ids={hero_version["id"]} ) ) - return (hero_version, hero_repres) + return hero_version, hero_repres