From 4d90d35fc7204e97e4588c9f7969d58e7037630a Mon Sep 17 00:00:00 2001 From: Jakub Trllo <43494761+iLLiCiTiT@users.noreply.github.com> Date: Thu, 13 Nov 2025 15:01:08 +0100 Subject: [PATCH] Extended open file possibilities --- client/ayon_core/plugins/loader/open_file.py | 286 ++++++++++++++++--- 1 file changed, 254 insertions(+), 32 deletions(-) diff --git a/client/ayon_core/plugins/loader/open_file.py b/client/ayon_core/plugins/loader/open_file.py index 9b5a6fec20..80ddf925d3 100644 --- a/client/ayon_core/plugins/loader/open_file.py +++ b/client/ayon_core/plugins/loader/open_file.py @@ -1,8 +1,10 @@ import os import sys import subprocess +import platform import collections -from typing import Optional, Any +import ctypes +from typing import Optional, Any, Callable from ayon_core.pipeline.load import get_representation_path_with_anatomy from ayon_core.pipeline.actions import ( @@ -13,6 +15,232 @@ from ayon_core.pipeline.actions import ( ) +WINDOWS_USER_REG_PATH = ( + r"Software\Microsoft\Windows\CurrentVersion\Explorer\FileExts" + r"\{ext}\UserChoice" +) + + +class _Cache: + supported_exts: set[str] = set() + unsupported_exts: set[str] = set() + + @classmethod + def is_supported(cls, ext: str) -> bool: + return ext in cls.supported_exts + + @classmethod + def already_checked(cls, ext: str) -> bool: + return ( + ext in cls.supported_exts + or ext in cls.unsupported_exts + ) + + @classmethod + def set_ext_support(cls, ext: str, supported: bool) -> None: + if supported: + cls.supported_exts.add(ext) + else: + cls.unsupported_exts.add(ext) + + +def _extension_has_assigned_app_windows(ext: str) -> bool: + import winreg + progid = None + try: + with winreg.OpenKey( + winreg.HKEY_CURRENT_USER, + WINDOWS_USER_REG_PATH.format(ext=ext), + ) as k: + progid, _ = winreg.QueryValueEx(k, "ProgId") + except OSError: + pass + + if progid: + return True + + try: + with winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, ext) as k: + progid = winreg.QueryValueEx(k, None)[0] + except OSError: + pass + return bool(progid) + + +def _linux_find_desktop_file(desktop: str) -> Optional[str]: + for p in ( + os.path.join(os.path.expanduser("~/.local/share/applications"), desktop), + os.path.join("/usr/share/applications", desktop), + os.path.join("/usr/local/share/applications", desktop), + ): + if os.path.isfile(p): + return p + return None + + +def _extension_has_assigned_app_linux(ext: str) -> bool: + import mimetypes + + mime, _ = mimetypes.guess_type(f"file{ext}") + if not mime: + return False + + try: + # xdg-mime query default + desktop = subprocess.check_output( + ["xdg-mime", "query", "default", mime], + text=True + ).strip() or None + except Exception: + desktop = None + + if not desktop: + return False + + desktop_path = _linux_find_desktop_file(desktop) + if not desktop_path: + return False + if desktop_path and os.path.isfile(desktop_path): + return True + return False + + +def _extension_has_assigned_app_macos(ext: str): + # Uses CoreServices/LaunchServices and Uniform Type Identifiers via ctypes. + # Steps: ext -> UTI -> default handler bundle id for role 'all'. + cf = ctypes.cdll.LoadLibrary( + "/System/Library/Frameworks/CoreFoundation.framework/CoreFoundation" + ) + ls = ctypes.cdll.LoadLibrary( + "/System/Library/Frameworks/CoreServices.framework/Frameworks" + "/LaunchServices.framework/LaunchServices" + ) + + # CFType/CFString helpers + CFStringRef = ctypes.c_void_p + CFURLRef = ctypes.c_void_p + CFAllocatorRef = ctypes.c_void_p + CFIndex = ctypes.c_long + UniChar = ctypes.c_ushort + + kCFStringEncodingUTF8 = 0x08000100 + + cf.CFStringCreateWithCString.argtypes = [CFAllocatorRef, ctypes.c_char_p, ctypes.c_uint32] + cf.CFStringCreateWithCString.restype = CFStringRef + + cf.CFStringGetCStringPtr.argtypes = [CFStringRef, ctypes.c_uint32] + cf.CFStringGetCStringPtr.restype = ctypes.c_char_p + + cf.CFStringGetCString.argtypes = [CFStringRef, ctypes.c_char_p, CFIndex, ctypes.c_uint32] + cf.CFStringGetCString.restype = ctypes.c_bool + + cf.CFRelease.argtypes = [ctypes.c_void_p] + cf.CFRelease.restype = None + + try: + UTTypeCreatePreferredIdentifierForTag = ctypes.cdll.LoadLibrary( + "/System/Library/Frameworks/CoreServices.framework/CoreServices" + ).UTTypeCreatePreferredIdentifierForTag + except OSError: + # Fallback path (older systems) + UTTypeCreatePreferredIdentifierForTag = ( + ls.UTTypeCreatePreferredIdentifierForTag + ) + UTTypeCreatePreferredIdentifierForTag.argtypes = [ + CFStringRef, CFStringRef, CFStringRef + ] + UTTypeCreatePreferredIdentifierForTag.restype = CFStringRef + + LSRolesMask = ctypes.c_uint + kLSRolesAll = 0xFFFFFFFF + ls.LSCopyDefaultRoleHandlerForContentType.argtypes = [ + CFStringRef, LSRolesMask + ] + ls.LSCopyDefaultRoleHandlerForContentType.restype = CFStringRef + + def cfstr(py_s: str) -> CFStringRef: + return cf.CFStringCreateWithCString( + None, py_s.encode("utf-8"), kCFStringEncodingUTF8 + ) + + def to_pystr(cf_s: CFStringRef) -> Optional[str]: + if not cf_s: + return None + # Try fast pointer + ptr = cf.CFStringGetCStringPtr(cf_s, kCFStringEncodingUTF8) + if ptr: + return ctypes.cast(ptr, ctypes.c_char_p).value.decode("utf-8") + + # Fallback buffer + buf_size = 1024 + buf = ctypes.create_string_buffer(buf_size) + ok = cf.CFStringGetCString( + cf_s, buf, buf_size, kCFStringEncodingUTF8 + ) + if ok: + return buf.value.decode("utf-8") + return None + + # Convert extension (without dot) to UTI + tag_class = cfstr("public.filename-extension") + tag_value = cfstr(ext.lstrip(".")) + + uti_ref = UTTypeCreatePreferredIdentifierForTag( + tag_class, tag_value, None + ) + uti = to_pystr(uti_ref) + + # Clean up temporary CFStrings + for ref in (tag_class, tag_value): + if ref: + cf.CFRelease(ref) + + bundle_id = None + if uti_ref: + # Get default handler for the UTI + default_bundle_ref = ls.LSCopyDefaultRoleHandlerForContentType( + uti_ref, kLSRolesAll + ) + bundle_id = to_pystr(default_bundle_ref) + if default_bundle_ref: + cf.CFRelease(default_bundle_ref) + cf.CFRelease(uti_ref) + return bundle_id is not None + + +def _filter_supported_exts( + extensions: set[str], test_func: Callable +) -> set[str]: + filtered_exs: set[str] = set() + for ext in extensions: + if not _Cache.already_checked(ext): + r = test_func(ext) + print(ext, r) + _Cache.set_ext_support(ext, r) + if _Cache.is_supported(ext): + filtered_exs.add(ext) + return filtered_exs + + +def filter_supported_exts(extensions: set[str]) -> set[str]: + if not extensions: + return set() + platform_name = platform.system().lower() + if platform_name == "windows": + return _filter_supported_exts( + extensions, _extension_has_assigned_app_windows + ) + if platform_name == "linux": + return _filter_supported_exts( + extensions, _extension_has_assigned_app_linux + ) + if platform_name == "darwin": + return _filter_supported_exts( + extensions, _extension_has_assigned_app_macos + ) + return set() + + def open_file(filepath: str) -> None: """Open file with system default executable""" if sys.platform.startswith("darwin"): @@ -27,8 +255,6 @@ class OpenFileAction(LoaderActionPlugin): """Open Image Sequence or Video with system default""" identifier = "core.open-file" - product_types = {"render2d"} - def get_action_items( self, selection: LoaderActionSelection ) -> list[LoaderActionItem]: @@ -46,37 +272,32 @@ class OpenFileAction(LoaderActionPlugin): if not repres: return [] - repre_ids = {repre["id"] for repre in repres} - versions = selection.entities.get_representations_versions( - repre_ids - ) - product_ids = {version["productId"] for version in versions} - products = selection.entities.get_products(product_ids) - filtered_product_ids = { - product["id"] - for product in products - if product["productType"] in self.product_types - } - if not filtered_product_ids: + repres_by_ext = collections.defaultdict(list) + for repre in repres: + repre_context = repre.get("context") + if not repre_context: + continue + ext = repre_context.get("ext") + if not ext: + path = repre["attrib"].get("path") + if path: + ext = os.path.splitext(path)[1] + + if ext: + ext = ext.lower() + if not ext.startswith("."): + ext = f".{ext}" + repres_by_ext[ext.lower()].append(repre) + + if not repres_by_ext: return [] - versions_by_product_id = collections.defaultdict(list) - for version in versions: - versions_by_product_id[version["productId"]].append(version) - - repres_by_version_ids = collections.defaultdict(list) - for repre in repres: - repres_by_version_ids[repre["versionId"]].append(repre) - - filtered_repres = [] - for product_id in filtered_product_ids: - for version in versions_by_product_id[product_id]: - for repre in repres_by_version_ids[version["id"]]: - filtered_repres.append(repre) + filtered_exts = filter_supported_exts(set(repres_by_ext)) repre_ids_by_name = collections.defaultdict(set) - for repre in filtered_repres: - repre_ids_by_name[repre["name"]].add(repre["id"]) + for ext in filtered_exts: + for repre in repres_by_ext[ext]: + repre_ids_by_name[repre["name"]].add(repre["id"]) return [ LoaderActionItem( @@ -86,8 +307,8 @@ class OpenFileAction(LoaderActionPlugin): data={"representation_ids": list(repre_ids)}, icon={ "type": "material-symbols", - "name": "play_circle", - "color": "#FFA500", + "name": "file_open", + "color": "#ffffff", } ) for repre_name, repre_ids in repre_ids_by_name.items() @@ -122,6 +343,7 @@ class OpenFileAction(LoaderActionPlugin): ) self.log.info(f"Opening: {path}") + open_file(path) return LoaderActionResult(