# -*- coding: utf-8 -*- """Base class for Pype Modules.""" import os import sys import json import time import inspect import logging import platform import threading import collections import traceback from uuid import uuid4 from abc import ABCMeta, abstractmethod import six from openpype.settings import ( get_system_settings, SYSTEM_SETTINGS_KEY, PROJECT_SETTINGS_KEY, SCHEMA_KEY_SYSTEM_SETTINGS, SCHEMA_KEY_PROJECT_SETTINGS ) from openpype.settings.lib import ( get_studio_system_settings_overrides, load_json_file ) from openpype.lib import ( Logger, import_filepath, import_module_from_dirpath ) from .interfaces import ( OpenPypeInterface, IPluginPaths, IHostAddon, ITrayModule, ITrayService ) # Files that will be always ignored on modules import IGNORED_FILENAMES = ( "__pycache__", ) # Files ignored on modules import from "./openpype/modules" IGNORED_DEFAULT_FILENAMES = ( "__init__.py", "base.py", "interfaces.py", "example_addons", "default_modules", ) # Inherit from `object` for Python 2 hosts class _ModuleClass(object): """Fake module class for storing OpenPype modules. Object of this class can be stored to `sys.modules` and used for storing dynamically imported modules. """ def __init__(self, name): # Call setattr on super class super(_ModuleClass, self).__setattr__("name", name) super(_ModuleClass, self).__setattr__("__name__", name) # Where modules and interfaces are stored super(_ModuleClass, self).__setattr__("__attributes__", dict()) super(_ModuleClass, self).__setattr__("__defaults__", set()) super(_ModuleClass, self).__setattr__("_log", None) def __getattr__(self, attr_name): if attr_name not in self.__attributes__: if attr_name in ("__path__", "__file__"): return None raise AttributeError("'{}' has not attribute '{}'".format( self.name, attr_name )) return self.__attributes__[attr_name] def __iter__(self): for module in self.values(): yield module def __setattr__(self, attr_name, value): if attr_name in self.__attributes__: self.log.warning( "Duplicated name \"{}\" in {}. Overriding.".format( self.name, attr_name ) ) self.__attributes__[attr_name] = value def __setitem__(self, key, value): self.__setattr__(key, value) def __getitem__(self, key): return getattr(self, key) @property def log(self): if self._log is None: super(_ModuleClass, self).__setattr__( "_log", Logger.get_logger(self.name) ) return self._log def get(self, key, default=None): return self.__attributes__.get(key, default) def keys(self): return self.__attributes__.keys() def values(self): return self.__attributes__.values() def items(self): return self.__attributes__.items() class _InterfacesClass(_ModuleClass): """Fake module class for storing OpenPype interfaces. MissingInterface object is returned if interfaces does not exists. - this is because interfaces must be available even if are missing implementation """ def __getattr__(self, attr_name): if attr_name not in self.__attributes__: if attr_name in ("__path__", "__file__"): return None raise AttributeError(( "cannot import name '{}' from 'openpype_interfaces'" ).format(attr_name)) if _LoadCache.interfaces_loaded and attr_name != "log": stack = list(traceback.extract_stack()) stack.pop(-1) self.log.warning(( "Using deprecated import of \"{}\" from 'openpype_interfaces'." " Please switch to use import" " from 'openpype.modules.interfaces'" " (will be removed after 3.16.x).{}" ).format(attr_name, "".join(traceback.format_list(stack)))) return self.__attributes__[attr_name] class _LoadCache: interfaces_lock = threading.Lock() modules_lock = threading.Lock() interfaces_loaded = False modules_loaded = False def get_default_modules_dir(): """Path to default OpenPype modules.""" current_dir = os.path.dirname(os.path.abspath(__file__)) output = [] for folder_name in ("default_modules", ): path = os.path.join(current_dir, folder_name) if os.path.exists(path) and os.path.isdir(path): output.append(path) return output def get_dynamic_modules_dirs(): """Possible paths to OpenPype Addons of Modules. Paths are loaded from studio settings under: `modules -> addon_paths -> {platform name}` Path may contain environment variable as a formatting string. They are not validated or checked their existence. Returns: list: Paths loaded from studio overrides. """ output = [] value = get_studio_system_settings_overrides() for key in ("modules", "addon_paths", platform.system().lower()): if key not in value: return output value = value[key] for path in value: if not path: continue try: path = path.format(**os.environ) except Exception: pass output.append(path) return output def get_module_dirs(): """List of paths where OpenPype modules can be found.""" _dirpaths = [] _dirpaths.extend(get_default_modules_dir()) _dirpaths.extend(get_dynamic_modules_dirs()) dirpaths = [] for path in _dirpaths: if not path: continue normalized = os.path.normpath(path) if normalized not in dirpaths: dirpaths.append(normalized) return dirpaths def load_interfaces(force=False): """Load interfaces from modules into `openpype_interfaces`. Only classes which inherit from `OpenPypeInterface` are loaded and stored. Args: force(bool): Force to load interfaces even if are already loaded. This won't update already loaded and used (cached) interfaces. """ if _LoadCache.interfaces_loaded and not force: return if not _LoadCache.interfaces_lock.locked(): with _LoadCache.interfaces_lock: _load_interfaces() _LoadCache.interfaces_loaded = True else: # If lock is locked wait until is finished while _LoadCache.interfaces_lock.locked(): time.sleep(0.1) def _load_interfaces(): # Key under which will be modules imported in `sys.modules` modules_key = "openpype_interfaces" sys.modules[modules_key] = openpype_interfaces = ( _InterfacesClass(modules_key) ) from . import interfaces for attr_name in dir(interfaces): attr = getattr(interfaces, attr_name) if ( not inspect.isclass(attr) or attr is OpenPypeInterface or not issubclass(attr, OpenPypeInterface) ): continue setattr(openpype_interfaces, attr_name, attr) def load_modules(force=False): """Load OpenPype modules as python modules. Modules does not load only classes (like in Interfaces) because there must be ability to use inner code of module and be able to import it from one defined place. With this it is possible to import module's content from predefined module. Function makes sure that `load_interfaces` was triggered. Modules import has specific order which can't be changed. Args: force(bool): Force to load modules even if are already loaded. This won't update already loaded and used (cached) modules. """ if _LoadCache.modules_loaded and not force: return # First load interfaces # - modules must not be imported before interfaces load_interfaces(force) if not _LoadCache.modules_lock.locked(): with _LoadCache.modules_lock: _load_modules() _LoadCache.modules_loaded = True else: # If lock is locked wait until is finished while _LoadCache.modules_lock.locked(): time.sleep(0.1) def _load_modules(): # Key under which will be modules imported in `sys.modules` modules_key = "openpype_modules" # Change `sys.modules` sys.modules[modules_key] = openpype_modules = _ModuleClass(modules_key) log = Logger.get_logger("ModulesLoader") # Look for OpenPype modules in paths defined with `get_module_dirs` # - dynamically imported OpenPype modules and addons module_dirs = get_module_dirs() # Add current directory at first place # - has small differences in import logic current_dir = os.path.abspath(os.path.dirname(__file__)) hosts_dir = os.path.join(os.path.dirname(current_dir), "hosts") module_dirs.insert(0, hosts_dir) module_dirs.insert(0, current_dir) addons_dir = os.path.join(os.path.dirname(current_dir), "addons") module_dirs.append(addons_dir) processed_paths = set() for dirpath in frozenset(module_dirs): # Skip already processed paths if dirpath in processed_paths: continue processed_paths.add(dirpath) if not os.path.exists(dirpath): log.warning(( "Could not find path when loading OpenPype modules \"{}\"" ).format(dirpath)) continue is_in_current_dir = dirpath == current_dir is_in_host_dir = dirpath == hosts_dir for filename in os.listdir(dirpath): # Ignore filenames if filename in IGNORED_FILENAMES: continue if ( is_in_current_dir and filename in IGNORED_DEFAULT_FILENAMES ): continue fullpath = os.path.join(dirpath, filename) basename, ext = os.path.splitext(filename) # Validations if os.path.isdir(fullpath): # Check existence of init file init_path = os.path.join(fullpath, "__init__.py") if not os.path.exists(init_path): log.debug(( "Module directory does not contain __init__.py" " file {}" ).format(fullpath)) continue elif ext not in (".py", ): continue # TODO add more logic how to define if folder is module or not # - check manifest and content of manifest try: # Don't import dynamically current directory modules if is_in_current_dir: import_str = "openpype.modules.{}".format(basename) new_import_str = "{}.{}".format(modules_key, basename) default_module = __import__(import_str, fromlist=("", )) sys.modules[new_import_str] = default_module setattr(openpype_modules, basename, default_module) elif is_in_host_dir: import_str = "openpype.hosts.{}".format(basename) new_import_str = "{}.{}".format(modules_key, basename) # Until all hosts are converted to be able use them as # modules is this error check needed try: default_module = __import__( import_str, fromlist=("", ) ) sys.modules[new_import_str] = default_module setattr(openpype_modules, basename, default_module) except Exception: log.warning( "Failed to import host folder {}".format(basename), exc_info=True ) elif os.path.isdir(fullpath): import_module_from_dirpath(dirpath, filename, modules_key) else: module = import_filepath(fullpath) setattr(openpype_modules, basename, module) except Exception: if is_in_current_dir: msg = "Failed to import default module '{}'.".format( basename ) else: msg = "Failed to import module '{}'.".format(fullpath) log.error(msg, exc_info=True) @six.add_metaclass(ABCMeta) class OpenPypeModule: """Base class of pype module. Attributes: id (UUID): Module's id. enabled (bool): Is module enabled. name (str): Module name. manager (ModulesManager): Manager that created the module. """ # Disable by default enabled = False _id = None @property @abstractmethod def name(self): """Module's name.""" pass def __init__(self, manager, settings): self.manager = manager self.log = Logger.get_logger(self.name) self.initialize(settings) @property def id(self): if self._id is None: self._id = uuid4() return self._id @abstractmethod def initialize(self, module_settings): """Initialization of module attributes. It is not recommended to override __init__ that's why specific method was implemented. """ pass def connect_with_modules(self, enabled_modules): """Connect with other enabled modules.""" pass def get_global_environments(self): """Get global environments values of module. Environment variables that can be get only from system settings. """ return {} def modify_application_launch_arguments(self, application, env): """Give option to modify launch environments before application launch. Implementation is optional. To change environments modify passed dictionary of environments. Args: application (Application): Application that is launched. env (dict): Current environment variables. """ pass def on_host_install(self, host, host_name, project_name): """Host was installed which gives option to handle in-host logic. It is a good option to register in-host event callbacks which are specific for the module. The module is kept in memory for rest of the process. Arguments may change in future. E.g. 'host_name' should be possible to receive from 'host' object. Args: host (ModuleType): Access to installed/registered host object. host_name (str): Name of host. project_name (str): Project name which is main part of host context. """ pass def cli(self, module_click_group): """Add commands to click group. The best practise is to create click group for whole module which is used to separate commands. class MyPlugin(OpenPypeModule): ... def cli(self, module_click_group): module_click_group.add_command(cli_main) @click.group(, help="") def cli_main(): pass @cli_main.command() def mycommand(): print("my_command") """ pass class OpenPypeAddOn(OpenPypeModule): # Enable Addon by default enabled = True def initialize(self, module_settings): """Initialization is not be required for most of addons.""" pass class ModulesManager: """Manager of Pype modules helps to load and prepare them to work. Args: modules_settings(dict): To be able create module manager with specified data. For settings changes callbacks and testing purposes. """ # Helper attributes for report _report_total_key = "Total" def __init__(self, _system_settings=None): self.log = logging.getLogger(self.__class__.__name__) self._system_settings = _system_settings self.modules = [] self.modules_by_id = {} self.modules_by_name = {} # For report of time consumption self._report = {} self.initialize_modules() self.connect_modules() def __getitem__(self, module_name): return self.modules_by_name[module_name] def get(self, module_name, default=None): """Access module by name. Args: module_name (str): Name of module which should be returned. default (Any): Default output if module is not available. Returns: Union[OpenPypeModule, None]: Module found by name or None. """ return self.modules_by_name.get(module_name, default) def get_enabled_module(self, module_name, default=None): """Fast access to enabled module. If module is available but is not enabled default value is returned. Args: module_name (str): Name of module which should be returned. default (Any): Default output if module is not available or is not enabled. Returns: Union[OpenPypeModule, None]: Enabled module found by name or None. """ module = self.get(module_name) if module is not None and module.enabled: return module return default def initialize_modules(self): """Import and initialize modules.""" # Make sure modules are loaded load_modules() import openpype_modules self.log.debug("*** Pype modules initialization.") # Prepare settings for modules system_settings = getattr(self, "_system_settings", None) if system_settings is None: system_settings = get_system_settings() modules_settings = system_settings["modules"] report = {} time_start = time.time() prev_start_time = time_start module_classes = [] for module in openpype_modules: # Go through globals in `pype.modules` for name in dir(module): modules_item = getattr(module, name, None) # Filter globals that are not classes which inherit from # OpenPypeModule if ( not inspect.isclass(modules_item) or modules_item is OpenPypeModule or modules_item is OpenPypeAddOn or not issubclass(modules_item, OpenPypeModule) ): continue # Check if class is abstract (Developing purpose) if inspect.isabstract(modules_item): # Find abstract attributes by convention on `abc` module not_implemented = [] for attr_name in dir(modules_item): attr = getattr(modules_item, attr_name, None) abs_method = getattr( attr, "__isabstractmethod__", None ) if attr and abs_method: not_implemented.append(attr_name) # Log missing implementations self.log.warning(( "Skipping abstract Class: {}." " Missing implementations: {}" ).format(name, ", ".join(not_implemented))) continue module_classes.append(modules_item) for modules_item in module_classes: try: name = modules_item.__name__ # Try initialize module module = modules_item(self, modules_settings) # Store initialized object self.modules.append(module) self.modules_by_id[module.id] = module self.modules_by_name[module.name] = module enabled_str = "X" if not module.enabled: enabled_str = " " self.log.debug("[{}] {}".format(enabled_str, name)) now = time.time() report[module.__class__.__name__] = now - prev_start_time prev_start_time = now except Exception: self.log.warning( "Initialization of module {} failed.".format(name), exc_info=True ) if self._report is not None: report[self._report_total_key] = time.time() - time_start self._report["Initialization"] = report def connect_modules(self): """Trigger connection with other enabled modules. Modules should handle their interfaces in `connect_with_modules`. """ report = {} time_start = time.time() prev_start_time = time_start enabled_modules = self.get_enabled_modules() self.log.debug("Has {} enabled modules.".format(len(enabled_modules))) for module in enabled_modules: try: module.connect_with_modules(enabled_modules) except Exception: self.log.error( "BUG: Module failed on connection with other modules.", exc_info=True ) now = time.time() report[module.__class__.__name__] = now - prev_start_time prev_start_time = now if self._report is not None: report[self._report_total_key] = time.time() - time_start self._report["Connect modules"] = report def get_enabled_modules(self): """Enabled modules initialized by the manager. Returns: list: Initialized and enabled modules. """ return [ module for module in self.modules if module.enabled ] def collect_global_environments(self): """Helper to collect global environment variabled from modules. Returns: dict: Global environment variables from enabled modules. Raises: AssertionError: Global environment variables must be unique for all modules. """ module_envs = {} for module in self.get_enabled_modules(): # Collect global module's global environments _envs = module.get_global_environments() for key, value in _envs.items(): if key in module_envs: # TODO better error message raise AssertionError( "Duplicated environment key {}".format(key) ) module_envs[key] = value return module_envs def collect_plugin_paths(self): """Helper to collect all plugins from modules inherited IPluginPaths. Unknown keys are logged out. Returns: dict: Output is dictionary with keys "publish", "create", "load" and "actions" each containing list of paths. """ # Output structure output = { "publish": [], "create": [], "load": [], "actions": [] } unknown_keys_by_module = {} for module in self.get_enabled_modules(): # Skip module that do not inherit from `IPluginPaths` if not isinstance(module, IPluginPaths): continue plugin_paths = module.get_plugin_paths() for key, value in plugin_paths.items(): # Filter unknown keys if key not in output: if module.name not in unknown_keys_by_module: unknown_keys_by_module[module.name] = [] unknown_keys_by_module[module.name].append(key) continue # Skip if value is empty if not value: continue # Convert to list if value is not list if not isinstance(value, (list, tuple, set)): value = [value] output[key].extend(value) # Report unknown keys (Developing purposes) if unknown_keys_by_module: expected_keys = ", ".join([ "\"{}\"".format(key) for key in output.keys() ]) msg_template = "Module: \"{}\" - got key {}" msg_items = [] for module_name, keys in unknown_keys_by_module.items(): joined_keys = ", ".join([ "\"{}\"".format(key) for key in keys ]) msg_items.append(msg_template.format(module_name, joined_keys)) self.log.warning(( "Expected keys from `get_plugin_paths` are {}. {}" ).format(expected_keys, " | ".join(msg_items))) return output def _collect_plugin_paths(self, method_name, *args, **kwargs): output = [] for module in self.get_enabled_modules(): # Skip module that do not inherit from `IPluginPaths` if not isinstance(module, IPluginPaths): continue method = getattr(module, method_name) paths = method(*args, **kwargs) if paths: # Convert to list if value is not list if not isinstance(paths, (list, tuple, set)): paths = [paths] output.extend(paths) return output def collect_create_plugin_paths(self, host_name): """Helper to collect creator plugin paths from modules. Args: host_name (str): For which host are creators meant. Returns: list: List of creator plugin paths. """ return self._collect_plugin_paths( "get_create_plugin_paths", host_name ) collect_creator_plugin_paths = collect_create_plugin_paths def collect_load_plugin_paths(self, host_name): """Helper to collect load plugin paths from modules. Args: host_name (str): For which host are load plugins meant. Returns: list: List of load plugin paths. """ return self._collect_plugin_paths( "get_load_plugin_paths", host_name ) def collect_publish_plugin_paths(self, host_name): """Helper to collect load plugin paths from modules. Args: host_name (str): For which host are load plugins meant. Returns: list: List of pyblish plugin paths. """ return self._collect_plugin_paths( "get_publish_plugin_paths", host_name ) def get_host_module(self, host_name): """Find host module by host name. Args: host_name (str): Host name for which is found host module. Returns: OpenPypeModule: Found host module by name. None: There was not found module inheriting IHostAddon which has host name set to passed 'host_name'. """ for module in self.get_enabled_modules(): if ( isinstance(module, IHostAddon) and module.host_name == host_name ): return module return None def get_host_names(self): """List of available host names based on host modules. Returns: Iterable[str]: All available host names based on enabled modules inheriting 'IHostAddon'. """ host_names = { module.host_name for module in self.get_enabled_modules() if isinstance(module, IHostAddon) } return host_names def print_report(self): """Print out report of time spent on modules initialization parts. Reporting is not automated must be implemented for each initialization part separatelly. Reports must be stored to `_report` attribute. Print is skipped if `_report` is empty. Attribute `_report` is dictionary where key is "label" describing the processed part and value is dictionary where key is module's class name and value is time delta of it's processing. It is good idea to add total time delta on processed part under key which is defined in attribute `_report_total_key`. By default has value `"Total"` but use the attribute please. ```javascript { "Initialization": { "FtrackModule": 0.003, ... "Total": 1.003, }, ... } ``` """ if not self._report: return available_col_names = set() for module_names in self._report.values(): available_col_names |= set(module_names.keys()) # Prepare ordered dictionary for columns cols = collections.OrderedDict() # Add module names to first columnt cols["Module name"] = list(sorted( module.__class__.__name__ for module in self.modules if module.__class__.__name__ in available_col_names )) # Add total key (as last module) cols["Module name"].append(self._report_total_key) # Add columns from report for label in self._report.keys(): cols[label] = [] total_module_times = {} for module_name in cols["Module name"]: total_module_times[module_name] = 0 for label, reported in self._report.items(): for module_name in cols["Module name"]: col_time = reported.get(module_name) if col_time is None: cols[label].append("N/A") continue cols[label].append("{:.3f}".format(col_time)) total_module_times[module_name] += col_time # Add to also total column that should sum the row cols[self._report_total_key] = [] for module_name in cols["Module name"]: cols[self._report_total_key].append( "{:.3f}".format(total_module_times[module_name]) ) # Prepare column widths and total row count # - column width is by col_widths = {} total_rows = None for key, values in cols.items(): if total_rows is None: total_rows = 1 + len(values) max_width = len(key) for value in values: value_length = len(value) if value_length > max_width: max_width = value_length col_widths[key] = max_width rows = [] for _idx in range(total_rows): rows.append([]) for key, values in cols.items(): width = col_widths[key] idx = 0 rows[idx].append(key.ljust(width)) for value in values: idx += 1 rows[idx].append(value.ljust(width)) filler_parts = [] for width in col_widths.values(): filler_parts.append(width * "-") filler = "+".join(filler_parts) formatted_rows = [filler] last_row_idx = len(rows) - 1 for idx, row in enumerate(rows): # Add filler before last row if idx == last_row_idx: formatted_rows.append(filler) formatted_rows.append("|".join(row)) # Add filler after first row if idx == 0: formatted_rows.append(filler) # Join rows with newline char and add new line at the end output = "\n".join(formatted_rows) + "\n" print(output) class TrayModulesManager(ModulesManager): # Define order of modules in menu modules_menu_order = ( "user", "ftrack", "kitsu", "muster", "launcher_tool", "avalon", "clockify", "standalonepublish_tool", "traypublish_tool", "log_viewer", "local_settings", "settings" ) def __init__(self): self.log = Logger.get_logger(self.__class__.__name__) self.modules = [] self.modules_by_id = {} self.modules_by_name = {} self._report = {} self.tray_manager = None self.doubleclick_callbacks = {} self.doubleclick_callback = None def add_doubleclick_callback(self, module, callback): """Register doubleclick callbacks on tray icon. Currently there is no way how to determine which is launched. Name of callback can be defined with `doubleclick_callback` attribute. Missing feature how to define default callback. """ callback_name = "_".join([module.name, callback.__name__]) if callback_name not in self.doubleclick_callbacks: self.doubleclick_callbacks[callback_name] = callback if self.doubleclick_callback is None: self.doubleclick_callback = callback_name return self.log.warning(( "Callback with name \"{}\" is already registered." ).format(callback_name)) def initialize(self, tray_manager, tray_menu): self.tray_manager = tray_manager self.initialize_modules() self.tray_init() self.connect_modules() self.tray_menu(tray_menu) def get_enabled_tray_modules(self): output = [] for module in self.modules: if module.enabled and isinstance(module, ITrayModule): output.append(module) return output def restart_tray(self): if self.tray_manager: self.tray_manager.restart() def tray_init(self): report = {} time_start = time.time() prev_start_time = time_start for module in self.get_enabled_tray_modules(): try: module._tray_manager = self.tray_manager module.tray_init() module.tray_initialized = True except Exception: self.log.warning( "Module \"{}\" crashed on `tray_init`.".format( module.name ), exc_info=True ) now = time.time() report[module.__class__.__name__] = now - prev_start_time prev_start_time = now if self._report is not None: report[self._report_total_key] = time.time() - time_start self._report["Tray init"] = report def tray_menu(self, tray_menu): ordered_modules = [] enabled_by_name = { module.name: module for module in self.get_enabled_tray_modules() } for name in self.modules_menu_order: module_by_name = enabled_by_name.pop(name, None) if module_by_name: ordered_modules.append(module_by_name) ordered_modules.extend(enabled_by_name.values()) report = {} time_start = time.time() prev_start_time = time_start for module in ordered_modules: if not module.tray_initialized: continue try: module.tray_menu(tray_menu) except Exception: # Unset initialized mark module.tray_initialized = False self.log.warning( "Module \"{}\" crashed on `tray_menu`.".format( module.name ), exc_info=True ) now = time.time() report[module.__class__.__name__] = now - prev_start_time prev_start_time = now if self._report is not None: report[self._report_total_key] = time.time() - time_start self._report["Tray menu"] = report def start_modules(self): report = {} time_start = time.time() prev_start_time = time_start for module in self.get_enabled_tray_modules(): if not module.tray_initialized: if isinstance(module, ITrayService): module.set_service_failed_icon() continue try: module.tray_start() except Exception: self.log.warning( "Module \"{}\" crashed on `tray_start`.".format( module.name ), exc_info=True ) now = time.time() report[module.__class__.__name__] = now - prev_start_time prev_start_time = now if self._report is not None: report[self._report_total_key] = time.time() - time_start self._report["Modules start"] = report def on_exit(self): for module in self.get_enabled_tray_modules(): if module.tray_initialized: try: module.tray_exit() except Exception: self.log.warning( "Module \"{}\" crashed on `tray_exit`.".format( module.name ), exc_info=True ) def get_module_settings_defs(): """Check loaded addons/modules for existence of their settings definition. Check if OpenPype addon/module as python module has class that inherit from `ModuleSettingsDef` in python module variables (imported in `__init__py`). Returns: list: All valid and not abstract settings definitions from imported openpype addons and modules. """ # Make sure modules are loaded load_modules() import openpype_modules settings_defs = [] log = Logger.get_logger("ModuleSettingsLoad") for raw_module in openpype_modules: for attr_name in dir(raw_module): attr = getattr(raw_module, attr_name) if ( not inspect.isclass(attr) or attr is ModuleSettingsDef or not issubclass(attr, ModuleSettingsDef) ): continue if inspect.isabstract(attr): # Find missing implementations by convention on `abc` module not_implemented = [] for attr_name in dir(attr): attr = getattr(attr, attr_name, None) abs_method = getattr( attr, "__isabstractmethod__", None ) if attr and abs_method: not_implemented.append(attr_name) # Log missing implementations log.warning(( "Skipping abstract Class: {} in module {}." " Missing implementations: {}" ).format( attr_name, raw_module.__name__, ", ".join(not_implemented) )) continue settings_defs.append(attr) return settings_defs @six.add_metaclass(ABCMeta) class BaseModuleSettingsDef: """Definition of settings for OpenPype module or AddOn.""" _id = None @property def id(self): """ID created on initialization. ID should be per created object. Helps to store objects. """ if self._id is None: self._id = uuid4() return self._id @abstractmethod def get_settings_schemas(self, schema_type): """Setting schemas for passed schema type. These are main schemas by dynamic schema keys. If they're using sub schemas or templates they should be loaded with `get_dynamic_schemas`. Returns: dict: Schema by `dynamic_schema` keys. """ pass @abstractmethod def get_dynamic_schemas(self, schema_type): """Settings schemas and templates that can be used anywhere. It is recommended to add prefix specific for addon/module to keys (e.g. "my_addon/real_schema_name"). Returns: dict: Schemas and templates by their keys. """ pass @abstractmethod def get_defaults(self, top_key): """Default values for passed top key. Top keys are (currently) "system_settings" or "project_settings". Should return exactly what was passed with `save_defaults`. Returns: dict: Default values by path to first key in OpenPype defaults. """ pass @abstractmethod def save_defaults(self, top_key, data): """Save default values for passed top key. Top keys are (currently) "system_settings" or "project_settings". Passed data are by path to first key defined in main schemas. """ pass class ModuleSettingsDef(BaseModuleSettingsDef): """Settings definition with separated system and procect settings parts. Reduce conditions that must be checked and adds predefined methods for each case. """ def get_defaults(self, top_key): """Split method into 2 methods by top key.""" if top_key == SYSTEM_SETTINGS_KEY: return self.get_default_system_settings() or {} elif top_key == PROJECT_SETTINGS_KEY: return self.get_default_project_settings() or {} return {} def save_defaults(self, top_key, data): """Split method into 2 methods by top key.""" if top_key == SYSTEM_SETTINGS_KEY: self.save_system_defaults(data) elif top_key == PROJECT_SETTINGS_KEY: self.save_project_defaults(data) def get_settings_schemas(self, schema_type): """Split method into 2 methods by schema type.""" if schema_type == SCHEMA_KEY_SYSTEM_SETTINGS: return self.get_system_settings_schemas() or {} elif schema_type == SCHEMA_KEY_PROJECT_SETTINGS: return self.get_project_settings_schemas() or {} return {} def get_dynamic_schemas(self, schema_type): """Split method into 2 methods by schema type.""" if schema_type == SCHEMA_KEY_SYSTEM_SETTINGS: return self.get_system_dynamic_schemas() or {} elif schema_type == SCHEMA_KEY_PROJECT_SETTINGS: return self.get_project_dynamic_schemas() or {} return {} @abstractmethod def get_system_settings_schemas(self): """Schemas and templates usable in system settings schemas. Returns: dict: Schemas and templates by it's names. Names must be unique across whole OpenPype. """ pass @abstractmethod def get_project_settings_schemas(self): """Schemas and templates usable in project settings schemas. Returns: dict: Schemas and templates by it's names. Names must be unique across whole OpenPype. """ pass @abstractmethod def get_system_dynamic_schemas(self): """System schemas by dynamic schema name. If dynamic schema name is not available in then schema will not used. Returns: dict: Schemas or list of schemas by dynamic schema name. """ pass @abstractmethod def get_project_dynamic_schemas(self): """Project schemas by dynamic schema name. If dynamic schema name is not available in then schema will not used. Returns: dict: Schemas or list of schemas by dynamic schema name. """ pass @abstractmethod def get_default_system_settings(self): """Default system settings values. Returns: dict: Default values by path to first key. """ pass @abstractmethod def get_default_project_settings(self): """Default project settings values. Returns: dict: Default values by path to first key. """ pass @abstractmethod def save_system_defaults(self, data): """Save default system settings values. Passed data are by path to first key defined in main schemas. """ pass @abstractmethod def save_project_defaults(self, data): """Save default project settings values. Passed data are by path to first key defined in main schemas. """ pass class JsonFilesSettingsDef(ModuleSettingsDef): """Preimplemented settings definition using json files and file structure. Expected file structure: ┕ root │ │ # Default values ┝ defaults │ ┝ system_settings.json │ ┕ project_settings.json │ │ # Schemas for `dynamic_template` type ┝ dynamic_schemas │ ┝ system_dynamic_schemas.json │ ┕ project_dynamic_schemas.json │ │ # Schemas that can be used anywhere (enhancement for `dynamic_schemas`) ┕ schemas ┝ system_schemas │ ┝ # Any schema or template files │ ┕ ... ┕ project_schemas ┝ # Any schema or template files ┕ ... Schemas can be loaded with prefix to avoid duplicated schema/template names across all OpenPype addons/modules. Prefix can be defined with class attribute `schema_prefix`. Only think which must be implemented in `get_settings_root_path` which should return directory path to `root` (in structure graph above). """ # Possible way how to define `schemas` prefix schema_prefix = "" @abstractmethod def get_settings_root_path(self): """Directory path where settings and it's schemas are located.""" pass def __init__(self): settings_root_dir = self.get_settings_root_path() defaults_dir = os.path.join( settings_root_dir, "defaults" ) dynamic_schemas_dir = os.path.join( settings_root_dir, "dynamic_schemas" ) schemas_dir = os.path.join( settings_root_dir, "schemas" ) self.system_defaults_filepath = os.path.join( defaults_dir, "system_settings.json" ) self.project_defaults_filepath = os.path.join( defaults_dir, "project_settings.json" ) self.system_dynamic_schemas_filepath = os.path.join( dynamic_schemas_dir, "system_dynamic_schemas.json" ) self.project_dynamic_schemas_filepath = os.path.join( dynamic_schemas_dir, "project_dynamic_schemas.json" ) self.system_schemas_dir = os.path.join( schemas_dir, "system_schemas" ) self.project_schemas_dir = os.path.join( schemas_dir, "project_schemas" ) def _load_json_file_data(self, path): if os.path.exists(path): return load_json_file(path) return {} def get_default_system_settings(self): """Default system settings values. Returns: dict: Default values by path to first key. """ return self._load_json_file_data(self.system_defaults_filepath) def get_default_project_settings(self): """Default project settings values. Returns: dict: Default values by path to first key. """ return self._load_json_file_data(self.project_defaults_filepath) def _save_data_to_filepath(self, path, data): dirpath = os.path.dirname(path) if not os.path.exists(dirpath): os.makedirs(dirpath) with open(path, "w") as file_stream: json.dump(data, file_stream, indent=4) def save_system_defaults(self, data): """Save default system settings values. Passed data are by path to first key defined in main schemas. """ self._save_data_to_filepath(self.system_defaults_filepath, data) def save_project_defaults(self, data): """Save default project settings values. Passed data are by path to first key defined in main schemas. """ self._save_data_to_filepath(self.project_defaults_filepath, data) def get_system_dynamic_schemas(self): """System schemas by dynamic schema name. If dynamic schema name is not available in then schema will not used. Returns: dict: Schemas or list of schemas by dynamic schema name. """ return self._load_json_file_data(self.system_dynamic_schemas_filepath) def get_project_dynamic_schemas(self): """Project schemas by dynamic schema name. If dynamic schema name is not available in then schema will not used. Returns: dict: Schemas or list of schemas by dynamic schema name. """ return self._load_json_file_data(self.project_dynamic_schemas_filepath) def _load_files_from_path(self, path): output = {} if not path or not os.path.exists(path): return output if os.path.isfile(path): filename = os.path.basename(path) basename, ext = os.path.splitext(filename) if ext == ".json": if self.schema_prefix: key = "{}/{}".format(self.schema_prefix, basename) else: key = basename output[key] = self._load_json_file_data(path) return output path = os.path.normpath(path) for root, _, files in os.walk(path, topdown=False): for filename in files: basename, ext = os.path.splitext(filename) if ext != ".json": continue json_path = os.path.join(root, filename) store_key = os.path.join( root.replace(path, ""), basename ).replace("\\", "/") if self.schema_prefix: store_key = "{}/{}".format(self.schema_prefix, store_key) output[store_key] = self._load_json_file_data(json_path) return output def get_system_settings_schemas(self): """Schemas and templates usable in system settings schemas. Returns: dict: Schemas and templates by it's names. Names must be unique across whole OpenPype. """ return self._load_files_from_path(self.system_schemas_dir) def get_project_settings_schemas(self): """Schemas and templates usable in project settings schemas. Returns: dict: Schemas and templates by it's names. Names must be unique across whole OpenPype. """ return self._load_files_from_path(self.project_schemas_dir)