Merge pull request #1421 from ynput/enhancement/1416-loader-actions

Loader: Actions plugin
This commit is contained in:
Jakub Trllo 2025-11-12 18:37:32 +01:00 committed by GitHub
commit d00e35de84
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 2835 additions and 1229 deletions

View file

@ -185,6 +185,15 @@ class IPluginPaths(AYONInterface):
"""
return self._get_plugin_paths_by_type("inventory")
def get_loader_action_plugin_paths(self) -> list[str]:
"""Receive loader action plugin paths.
Returns:
list[str]: Paths to loader action plugins.
"""
return []
class ITrayAddon(AYONInterface):
"""Addon has special procedures when used in Tray tool.

View file

@ -1,11 +1,4 @@
from enum import Enum
class StrEnum(str, Enum):
"""A string-based Enum class that allows for string comparison."""
def __str__(self) -> str:
return self.value
from ayon_core.lib import StrEnum
class ContextChangeReason(StrEnum):

View file

@ -2,6 +2,7 @@
# flake8: noqa E402
"""AYON lib functions."""
from ._compatibility import StrEnum
from .local_settings import (
IniSettingRegistry,
JSONSettingRegistry,
@ -142,6 +143,8 @@ from .ayon_info import (
terminal = Terminal
__all__ = [
"StrEnum",
"IniSettingRegistry",
"JSONSettingRegistry",
"AYONSecureRegistry",

View file

@ -0,0 +1,8 @@
from enum import Enum
class StrEnum(str, Enum):
"""A string-based Enum class that allows for string comparison."""
def __str__(self) -> str:
return self.value

View file

@ -0,0 +1,62 @@
from .structures import (
ActionForm,
)
from .utils import (
webaction_fields_to_attribute_defs,
)
from .loader import (
LoaderSelectedType,
LoaderActionResult,
LoaderActionItem,
LoaderActionPlugin,
LoaderActionSelection,
LoaderActionsContext,
SelectionEntitiesCache,
LoaderSimpleActionPlugin,
)
from .launcher import (
LauncherAction,
LauncherActionSelection,
discover_launcher_actions,
register_launcher_action,
register_launcher_action_path,
)
from .inventory import (
InventoryAction,
discover_inventory_actions,
register_inventory_action,
register_inventory_action_path,
deregister_inventory_action,
deregister_inventory_action_path,
)
__all__ = (
"ActionForm",
"webaction_fields_to_attribute_defs",
"LoaderSelectedType",
"LoaderActionResult",
"LoaderActionItem",
"LoaderActionPlugin",
"LoaderActionSelection",
"LoaderActionsContext",
"SelectionEntitiesCache",
"LoaderSimpleActionPlugin",
"LauncherAction",
"LauncherActionSelection",
"discover_launcher_actions",
"register_launcher_action",
"register_launcher_action_path",
"InventoryAction",
"discover_inventory_actions",
"register_inventory_action",
"register_inventory_action_path",
"deregister_inventory_action",
"deregister_inventory_action_path",
)

View file

@ -0,0 +1,108 @@
import logging
from ayon_core.pipeline.plugin_discover import (
discover,
register_plugin,
register_plugin_path,
deregister_plugin,
deregister_plugin_path
)
from ayon_core.pipeline.load.utils import get_representation_path_from_context
class InventoryAction:
"""A custom action for the scene inventory tool
If registered the action will be visible in the Right Mouse Button menu
under the submenu "Actions".
"""
label = None
icon = None
color = None
order = 0
log = logging.getLogger("InventoryAction")
log.propagate = True
@staticmethod
def is_compatible(container):
"""Override function in a custom class
This method is specifically used to ensure the action can operate on
the container.
Args:
container(dict): the data of a loaded asset, see host.ls()
Returns:
bool
"""
return bool(container.get("objectName"))
def process(self, containers):
"""Override function in a custom class
This method will receive all containers even those which are
incompatible. It is advised to create a small filter along the lines
of this example:
valid_containers = filter(self.is_compatible(c) for c in containers)
The return value will need to be a True-ish value to trigger
the data_changed signal in order to refresh the view.
You can return a list of container names to trigger GUI to select
treeview items.
You can return a dict to carry extra GUI options. For example:
{
"objectNames": [container names...],
"options": {"mode": "toggle",
"clear": False}
}
Currently workable GUI options are:
- clear (bool): Clear current selection before selecting by action.
Default `True`.
- mode (str): selection mode, use one of these:
"select", "deselect", "toggle". Default is "select".
Args:
containers (list): list of dictionaries
Return:
bool, list or dict
"""
return True
@classmethod
def filepath_from_context(cls, context):
return get_representation_path_from_context(context)
def discover_inventory_actions():
actions = discover(InventoryAction)
filtered_actions = []
for action in actions:
if action is not InventoryAction:
filtered_actions.append(action)
return filtered_actions
def register_inventory_action(plugin):
return register_plugin(InventoryAction, plugin)
def deregister_inventory_action(plugin):
deregister_plugin(InventoryAction, plugin)
def register_inventory_action_path(path):
return register_plugin_path(InventoryAction, path)
def deregister_inventory_action_path(path):
return deregister_plugin_path(InventoryAction, path)

View file

@ -8,12 +8,8 @@ from ayon_core.pipeline.plugin_discover import (
discover,
register_plugin,
register_plugin_path,
deregister_plugin,
deregister_plugin_path
)
from .load.utils import get_representation_path_from_context
class LauncherActionSelection:
"""Object helper to pass selection to actions.
@ -390,79 +386,6 @@ class LauncherAction(object):
pass
class InventoryAction(object):
"""A custom action for the scene inventory tool
If registered the action will be visible in the Right Mouse Button menu
under the submenu "Actions".
"""
label = None
icon = None
color = None
order = 0
log = logging.getLogger("InventoryAction")
log.propagate = True
@staticmethod
def is_compatible(container):
"""Override function in a custom class
This method is specifically used to ensure the action can operate on
the container.
Args:
container(dict): the data of a loaded asset, see host.ls()
Returns:
bool
"""
return bool(container.get("objectName"))
def process(self, containers):
"""Override function in a custom class
This method will receive all containers even those which are
incompatible. It is advised to create a small filter along the lines
of this example:
valid_containers = filter(self.is_compatible(c) for c in containers)
The return value will need to be a True-ish value to trigger
the data_changed signal in order to refresh the view.
You can return a list of container names to trigger GUI to select
treeview items.
You can return a dict to carry extra GUI options. For example:
{
"objectNames": [container names...],
"options": {"mode": "toggle",
"clear": False}
}
Currently workable GUI options are:
- clear (bool): Clear current selection before selecting by action.
Default `True`.
- mode (str): selection mode, use one of these:
"select", "deselect", "toggle". Default is "select".
Args:
containers (list): list of dictionaries
Return:
bool, list or dict
"""
return True
@classmethod
def filepath_from_context(cls, context):
return get_representation_path_from_context(context)
# Launcher action
def discover_launcher_actions():
return discover(LauncherAction)
@ -473,30 +396,3 @@ def register_launcher_action(plugin):
def register_launcher_action_path(path):
return register_plugin_path(LauncherAction, path)
# Inventory action
def discover_inventory_actions():
actions = discover(InventoryAction)
filtered_actions = []
for action in actions:
if action is not InventoryAction:
filtered_actions.append(action)
return filtered_actions
def register_inventory_action(plugin):
return register_plugin(InventoryAction, plugin)
def deregister_inventory_action(plugin):
deregister_plugin(InventoryAction, plugin)
def register_inventory_action_path(path):
return register_plugin_path(InventoryAction, path)
def deregister_inventory_action_path(path):
return deregister_plugin_path(InventoryAction, path)

View file

@ -0,0 +1,864 @@
"""API for actions for loader tool.
Even though the api is meant for the loader tool, the api should be possible
to use in a standalone way out of the loader tool.
To use add actions, make sure your addon does inherit from
'IPluginPaths' and implements 'get_loader_action_plugin_paths' which
returns paths to python files with loader actions.
The plugin is used to collect available actions for the given context and to
execute them. Selection is defined with 'LoaderActionSelection' object
that also contains a cache of entities and project anatomy.
Implementing 'get_action_items' allows the plugin to define what actions
are shown and available for the selection. Because for a single selection
can be shown multiple actions with the same action identifier, the action
items also have 'data' attribute which can be used to store additional
data for the action (they have to be json-serializable).
The action is triggered by calling the 'execute_action' method. Which takes
the action identifier, the selection, the additional data from the action
item and form values from the form if any.
Using 'LoaderActionResult' as the output of 'execute_action' can trigger to
show a message in UI or to show an additional form ('ActionForm')
which would retrigger the action with the values from the form on
submitting. That allows handling of multistep actions.
It is also recommended that the plugin does override the 'identifier'
attribute. The identifier has to be unique across all plugins.
Class name is used by default.
The selection wrapper currently supports the following types of entity types:
- version
- representation
It is planned to add 'folder' and 'task' selection in the future.
NOTE: It is possible to trigger 'execute_action' without ever calling
'get_action_items', that can be handy in automations.
The whole logic is wrapped into 'LoaderActionsContext'. It takes care of
the discovery of plugins and wraps the collection and execution of
action items. Method 'execute_action' on context also requires plugin
identifier.
The flow of the logic is (in the loader tool):
1. User selects entities in the UI.
2. Right-click the selected entities.
3. Use 'LoaderActionsContext' to collect items using 'get_action_items'.
4. Show a menu (with submenus) in the UI.
5. If a user selects an action, the action is triggered using
'execute_action'.
5a. If the action returns 'LoaderActionResult', show a 'message' if it is
filled and show a form dialog if 'form' is filled.
5b. If the user submitted the form, trigger the action again with the
values from the form and repeat from 5a.
"""
from __future__ import annotations
import os
import collections
import copy
import logging
from abc import ABC, abstractmethod
import typing
from typing import Optional, Any, Callable
from dataclasses import dataclass
import ayon_api
from ayon_core import AYON_CORE_ROOT
from ayon_core.lib import StrEnum, Logger
from ayon_core.host import AbstractHost
from ayon_core.addon import AddonsManager, IPluginPaths
from ayon_core.settings import get_studio_settings, get_project_settings
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.plugin_discover import discover_plugins
from .structures import ActionForm
if typing.TYPE_CHECKING:
from typing import Union
DataBaseType = Union[str, int, float, bool]
DataType = dict[str, Union[DataBaseType, list[DataBaseType]]]
_PLACEHOLDER = object()
class LoaderSelectedType(StrEnum):
"""Selected entity type."""
# folder = "folder"
# task = "task"
version = "version"
representation = "representation"
class SelectionEntitiesCache:
"""Cache of entities used as helper in the selection wrapper.
It is possible to get entities based on ids with helper methods to get
entities, their parents or their children's entities.
The goal is to avoid multiple API calls for the same entity in multiple
action plugins.
The cache is based on the selected project. Entities are fetched
if are not in cache yet.
"""
def __init__(
self,
project_name: str,
project_entity: Optional[dict[str, Any]] = None,
folders_by_id: Optional[dict[str, dict[str, Any]]] = None,
tasks_by_id: Optional[dict[str, dict[str, Any]]] = None,
products_by_id: Optional[dict[str, dict[str, Any]]] = None,
versions_by_id: Optional[dict[str, dict[str, Any]]] = None,
representations_by_id: Optional[dict[str, dict[str, Any]]] = None,
task_ids_by_folder_id: Optional[dict[str, set[str]]] = None,
product_ids_by_folder_id: Optional[dict[str, set[str]]] = None,
version_ids_by_product_id: Optional[dict[str, set[str]]] = None,
representation_ids_by_version_id: Optional[dict[str, set[str]]] = None,
):
self._project_name = project_name
self._project_entity = project_entity
self._folders_by_id = folders_by_id or {}
self._tasks_by_id = tasks_by_id or {}
self._products_by_id = products_by_id or {}
self._versions_by_id = versions_by_id or {}
self._representations_by_id = representations_by_id or {}
self._task_ids_by_folder_id = task_ids_by_folder_id or {}
self._product_ids_by_folder_id = product_ids_by_folder_id or {}
self._version_ids_by_product_id = version_ids_by_product_id or {}
self._representation_ids_by_version_id = (
representation_ids_by_version_id or {}
)
def get_project(self) -> dict[str, Any]:
"""Get project entity"""
if self._project_entity is None:
self._project_entity = ayon_api.get_project(self._project_name)
return copy.deepcopy(self._project_entity)
def get_folders(
self, folder_ids: set[str]
) -> list[dict[str, Any]]:
return self._get_entities(
folder_ids,
self._folders_by_id,
"folder_ids",
ayon_api.get_folders,
)
def get_tasks(
self, task_ids: set[str]
) -> list[dict[str, Any]]:
return self._get_entities(
task_ids,
self._tasks_by_id,
"task_ids",
ayon_api.get_tasks,
)
def get_products(
self, product_ids: set[str]
) -> list[dict[str, Any]]:
return self._get_entities(
product_ids,
self._products_by_id,
"product_ids",
ayon_api.get_products,
)
def get_versions(
self, version_ids: set[str]
) -> list[dict[str, Any]]:
return self._get_entities(
version_ids,
self._versions_by_id,
"version_ids",
ayon_api.get_versions,
)
def get_representations(
self, representation_ids: set[str]
) -> list[dict[str, Any]]:
return self._get_entities(
representation_ids,
self._representations_by_id,
"representation_ids",
ayon_api.get_representations,
)
def get_folders_tasks(
self, folder_ids: set[str]
) -> list[dict[str, Any]]:
task_ids = self._fill_parent_children_ids(
folder_ids,
"folderId",
"folder_ids",
self._task_ids_by_folder_id,
ayon_api.get_tasks,
)
return self.get_tasks(task_ids)
def get_folders_products(
self, folder_ids: set[str]
) -> list[dict[str, Any]]:
product_ids = self._get_folders_products_ids(folder_ids)
return self.get_products(product_ids)
def get_tasks_versions(
self, task_ids: set[str]
) -> list[dict[str, Any]]:
folder_ids = {
task["folderId"]
for task in self.get_tasks(task_ids)
}
product_ids = self._get_folders_products_ids(folder_ids)
output = []
for version in self.get_products_versions(product_ids):
task_id = version["taskId"]
if task_id in task_ids:
output.append(version)
return output
def get_products_versions(
self, product_ids: set[str]
) -> list[dict[str, Any]]:
version_ids = self._fill_parent_children_ids(
product_ids,
"productId",
"product_ids",
self._version_ids_by_product_id,
ayon_api.get_versions,
)
return self.get_versions(version_ids)
def get_versions_representations(
self, version_ids: set[str]
) -> list[dict[str, Any]]:
repre_ids = self._fill_parent_children_ids(
version_ids,
"versionId",
"version_ids",
self._representation_ids_by_version_id,
ayon_api.get_representations,
)
return self.get_representations(repre_ids)
def get_tasks_folders(self, task_ids: set[str]) -> list[dict[str, Any]]:
folder_ids = {
task["folderId"]
for task in self.get_tasks(task_ids)
}
return self.get_folders(folder_ids)
def get_products_folders(
self, product_ids: set[str]
) -> list[dict[str, Any]]:
folder_ids = {
product["folderId"]
for product in self.get_products(product_ids)
}
return self.get_folders(folder_ids)
def get_versions_products(
self, version_ids: set[str]
) -> list[dict[str, Any]]:
product_ids = {
version["productId"]
for version in self.get_versions(version_ids)
}
return self.get_products(product_ids)
def get_versions_tasks(
self, version_ids: set[str]
) -> list[dict[str, Any]]:
task_ids = {
version["taskId"]
for version in self.get_versions(version_ids)
if version["taskId"]
}
return self.get_tasks(task_ids)
def get_representations_versions(
self, representation_ids: set[str]
) -> list[dict[str, Any]]:
version_ids = {
repre["versionId"]
for repre in self.get_representations(representation_ids)
}
return self.get_versions(version_ids)
def _get_folders_products_ids(self, folder_ids: set[str]) -> set[str]:
return self._fill_parent_children_ids(
folder_ids,
"folderId",
"folder_ids",
self._product_ids_by_folder_id,
ayon_api.get_products,
)
def _fill_parent_children_ids(
self,
entity_ids: set[str],
parent_key: str,
filter_attr: str,
parent_mapping: dict[str, set[str]],
getter: Callable,
) -> set[str]:
if not entity_ids:
return set()
children_ids = set()
missing_ids = set()
for entity_id in entity_ids:
_children_ids = parent_mapping.get(entity_id)
if _children_ids is None:
missing_ids.add(entity_id)
else:
children_ids.update(_children_ids)
if missing_ids:
entities_by_parent_id = collections.defaultdict(set)
for entity in getter(
self._project_name,
fields={"id", parent_key},
**{filter_attr: missing_ids},
):
child_id = entity["id"]
children_ids.add(child_id)
entities_by_parent_id[entity[parent_key]].add(child_id)
for entity_id in missing_ids:
parent_mapping[entity_id] = entities_by_parent_id[entity_id]
return children_ids
def _get_entities(
self,
entity_ids: set[str],
cache_var: dict[str, Any],
filter_arg: str,
getter: Callable,
) -> list[dict[str, Any]]:
if not entity_ids:
return []
output = []
missing_ids: set[str] = set()
for entity_id in entity_ids:
entity = cache_var.get(entity_id)
if entity_id not in cache_var:
missing_ids.add(entity_id)
cache_var[entity_id] = None
elif entity:
output.append(entity)
if missing_ids:
for entity in getter(
self._project_name,
**{filter_arg: missing_ids}
):
output.append(entity)
cache_var[entity["id"]] = entity
return output
class LoaderActionSelection:
"""Selection of entities for loader actions.
Selection tells action plugins what exactly is selected in the tool and
which ids.
Contains entity cache which can be used to get entities by their ids. Or
to get project settings and anatomy.
"""
def __init__(
self,
project_name: str,
selected_ids: set[str],
selected_type: LoaderSelectedType,
*,
project_anatomy: Optional[Anatomy] = None,
project_settings: Optional[dict[str, Any]] = None,
entities_cache: Optional[SelectionEntitiesCache] = None,
):
self._project_name = project_name
self._selected_ids = selected_ids
self._selected_type = selected_type
self._project_anatomy = project_anatomy
self._project_settings = project_settings
if entities_cache is None:
entities_cache = SelectionEntitiesCache(project_name)
self._entities_cache = entities_cache
def get_entities_cache(self) -> SelectionEntitiesCache:
return self._entities_cache
def get_project_name(self) -> str:
return self._project_name
def get_selected_ids(self) -> set[str]:
return set(self._selected_ids)
def get_selected_type(self) -> str:
return self._selected_type
def get_project_settings(self) -> dict[str, Any]:
if self._project_settings is None:
self._project_settings = get_project_settings(self._project_name)
return copy.deepcopy(self._project_settings)
def get_project_anatomy(self) -> Anatomy:
if self._project_anatomy is None:
self._project_anatomy = Anatomy(
self._project_name,
project_entity=self.get_entities_cache().get_project(),
)
return self._project_anatomy
project_name = property(get_project_name)
selected_ids = property(get_selected_ids)
selected_type = property(get_selected_type)
project_settings = property(get_project_settings)
project_anatomy = property(get_project_anatomy)
entities = property(get_entities_cache)
# --- Helper methods ---
def versions_selected(self) -> bool:
"""Selected entity type is version.
Returns:
bool: True if selected entity type is version.
"""
return self._selected_type == LoaderSelectedType.version
def representations_selected(self) -> bool:
"""Selected entity type is representation.
Returns:
bool: True if selected entity type is representation.
"""
return self._selected_type == LoaderSelectedType.representation
def get_selected_version_entities(self) -> list[dict[str, Any]]:
"""Retrieve selected version entities.
An empty list is returned if 'version' is not the selected
entity type.
Returns:
list[dict[str, Any]]: List of selected version entities.
"""
if self.versions_selected():
return self.entities.get_versions(self.selected_ids)
return []
def get_selected_representation_entities(self) -> list[dict[str, Any]]:
"""Retrieve selected representation entities.
An empty list is returned if 'representation' is not the selected
entity type.
Returns:
list[dict[str, Any]]: List of selected representation entities.
"""
if self.representations_selected():
return self.entities.get_representations(self.selected_ids)
return []
@dataclass
class LoaderActionItem:
"""Item of loader action.
Action plugins return these items as possible actions to run for a given
context.
Because the action item can be related to a specific entity
and not the whole selection, they also have to define the entity type
and ids to be executed on.
Attributes:
label (str): Text shown in UI.
order (int): Order of the action in UI.
group_label (Optional[str]): Label of the group to which the action
belongs.
icon (Optional[dict[str, Any]): Icon definition.
data (Optional[DataType]): Action item data.
identifier (Optional[str]): Identifier of the plugin which
created the action item. Is filled automatically. Is not changed
if is filled -> can lead to different plugin.
"""
label: str
order: int = 0
group_label: Optional[str] = None
icon: Optional[dict[str, Any]] = None
data: Optional[DataType] = None
# Is filled automatically
identifier: str = None
@dataclass
class LoaderActionResult:
"""Result of loader action execution.
Attributes:
message (Optional[str]): Message to show in UI.
success (bool): If the action was successful. Affects color of
the message.
form (Optional[ActionForm]): Form to show in UI.
form_values (Optional[dict[str, Any]]): Values for the form. Can be
used if the same form is re-shown e.g. because a user forgot to
fill a required field.
"""
message: Optional[str] = None
success: bool = True
form: Optional[ActionForm] = None
form_values: Optional[dict[str, Any]] = None
def to_json_data(self) -> dict[str, Any]:
form = self.form
if form is not None:
form = form.to_json_data()
return {
"message": self.message,
"success": self.success,
"form": form,
"form_values": self.form_values,
}
@classmethod
def from_json_data(cls, data: dict[str, Any]) -> "LoaderActionResult":
form = data["form"]
if form is not None:
data["form"] = ActionForm.from_json_data(form)
return LoaderActionResult(**data)
class LoaderActionPlugin(ABC):
"""Plugin for loader actions.
Plugin is responsible for getting action items and executing actions.
"""
_log: Optional[logging.Logger] = None
enabled: bool = True
def __init__(self, context: "LoaderActionsContext") -> None:
self._context = context
self.apply_settings(context.get_studio_settings())
def apply_settings(self, studio_settings: dict[str, Any]) -> None:
"""Apply studio settings to the plugin.
Args:
studio_settings (dict[str, Any]): Studio settings.
"""
pass
@property
def log(self) -> logging.Logger:
if self._log is None:
self._log = Logger.get_logger(self.__class__.__name__)
return self._log
@property
def identifier(self) -> str:
"""Identifier of the plugin.
Returns:
str: Plugin identifier.
"""
return self.__class__.__name__
@property
def host_name(self) -> Optional[str]:
"""Name of the current host."""
return self._context.get_host_name()
@abstractmethod
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
"""Action items for the selection.
Args:
selection (LoaderActionSelection): Selection.
Returns:
list[LoaderActionItem]: Action items.
"""
pass
@abstractmethod
def execute_action(
self,
selection: LoaderActionSelection,
data: Optional[DataType],
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
"""Execute an action.
Args:
selection (LoaderActionSelection): Selection wrapper. Can be used
to get entities or get context of original selection.
data (Optional[DataType]): Additional action item data.
form_values (dict[str, Any]): Attribute values.
Returns:
Optional[LoaderActionResult]: Result of the action execution.
"""
pass
class LoaderActionsContext:
"""Wrapper for loader actions and their logic.
Takes care about the public api of loader actions and internal logic like
discovery and initialization of plugins.
"""
def __init__(
self,
studio_settings: Optional[dict[str, Any]] = None,
addons_manager: Optional[AddonsManager] = None,
host: Optional[AbstractHost] = _PLACEHOLDER,
) -> None:
self._log = Logger.get_logger(self.__class__.__name__)
self._addons_manager = addons_manager
self._host = host
# Attributes that are re-cached on reset
self._studio_settings = studio_settings
self._plugins = None
def reset(
self, studio_settings: Optional[dict[str, Any]] = None
) -> None:
"""Reset context cache.
Reset plugins and studio settings to reload them.
Notes:
Does not reset the cache of AddonsManger because there should not
be a reason to do so.
"""
self._studio_settings = studio_settings
self._plugins = None
def get_addons_manager(self) -> AddonsManager:
if self._addons_manager is None:
self._addons_manager = AddonsManager(
settings=self.get_studio_settings()
)
return self._addons_manager
def get_host(self) -> Optional[AbstractHost]:
"""Get current host integration.
Returns:
Optional[AbstractHost]: Host integration. Can be None if host
integration is not registered -> probably not used in the
host integration process.
"""
if self._host is _PLACEHOLDER:
from ayon_core.pipeline import registered_host
self._host = registered_host()
return self._host
def get_host_name(self) -> Optional[str]:
host = self.get_host()
if host is None:
return None
return host.name
def get_studio_settings(self) -> dict[str, Any]:
if self._studio_settings is None:
self._studio_settings = get_studio_settings()
return copy.deepcopy(self._studio_settings)
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
"""Collect action items from all plugins for given selection.
Args:
selection (LoaderActionSelection): Selection wrapper.
"""
output = []
for plugin_id, plugin in self._get_plugins().items():
try:
for action_item in plugin.get_action_items(selection):
if action_item.identifier is None:
action_item.identifier = plugin_id
output.append(action_item)
except Exception:
self._log.warning(
"Failed to get action items for"
f" plugin '{plugin.identifier}'",
exc_info=True,
)
return output
def execute_action(
self,
identifier: str,
selection: LoaderActionSelection,
data: Optional[DataType],
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
"""Trigger action execution.
Args:
identifier (str): Identifier of the plugin.
selection (LoaderActionSelection): Selection wrapper. Can be used
to get what is selected in UI and to get access to entity
cache.
data (Optional[DataType]): Additional action item data.
form_values (dict[str, Any]): Form values related to action.
Usually filled if action returned response with form.
"""
plugins_by_id = self._get_plugins()
plugin = plugins_by_id[identifier]
return plugin.execute_action(
selection,
data,
form_values,
)
def _get_plugins(self) -> dict[str, LoaderActionPlugin]:
if self._plugins is None:
addons_manager = self.get_addons_manager()
all_paths = [
os.path.join(AYON_CORE_ROOT, "plugins", "loader")
]
for addon in addons_manager.addons:
if not isinstance(addon, IPluginPaths):
continue
paths = addon.get_loader_action_plugin_paths()
if paths:
all_paths.extend(paths)
result = discover_plugins(LoaderActionPlugin, all_paths)
result.log_report()
plugins = {}
for cls in result.plugins:
try:
plugin = cls(self)
if not plugin.enabled:
continue
plugin_id = plugin.identifier
if plugin_id not in plugins:
plugins[plugin_id] = plugin
continue
self._log.warning(
f"Duplicated plugins identifier found '{plugin_id}'."
)
except Exception:
self._log.warning(
f"Failed to initialize plugin '{cls.__name__}'",
exc_info=True,
)
self._plugins = plugins
return self._plugins
class LoaderSimpleActionPlugin(LoaderActionPlugin):
"""Simple action plugin.
This action will show exactly one action item defined by attributes
on the class.
Attributes:
label: Label of the action item.
order: Order of the action item.
group_label: Label of the group to which the action belongs.
icon: Icon definition shown next to label.
"""
label: Optional[str] = None
order: int = 0
group_label: Optional[str] = None
icon: Optional[dict[str, Any]] = None
@abstractmethod
def is_compatible(self, selection: LoaderActionSelection) -> bool:
"""Check if plugin is compatible with selection.
Args:
selection (LoaderActionSelection): Selection information.
Returns:
bool: True if plugin is compatible with selection.
"""
pass
@abstractmethod
def execute_simple_action(
self,
selection: LoaderActionSelection,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
"""Process action based on selection.
Args:
selection (LoaderActionSelection): Selection information.
form_values (dict[str, Any]): Values from a form if there are any.
Returns:
Optional[LoaderActionResult]: Result of the action.
"""
pass
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
if self.is_compatible(selection):
label = self.label or self.__class__.__name__
return [
LoaderActionItem(
label=label,
order=self.order,
group_label=self.group_label,
icon=self.icon,
)
]
return []
def execute_action(
self,
selection: LoaderActionSelection,
data: Optional[DataType],
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
return self.execute_simple_action(selection, form_values)

View file

@ -0,0 +1,60 @@
from dataclasses import dataclass
from typing import Optional, Any
from ayon_core.lib.attribute_definitions import (
AbstractAttrDef,
serialize_attr_defs,
deserialize_attr_defs,
)
@dataclass
class ActionForm:
"""Form for loader action.
If an action needs to collect information from a user before or during of
the action execution, it can return a response with a form. When the
form is submitted, a new execution of the action is triggered.
It is also possible to just show a label message without the submit
button to make sure the user has seen the message.
Attributes:
title (str): Title of the form -> title of the window.
fields (list[AbstractAttrDef]): Fields of the form.
submit_label (Optional[str]): Label of the submit button. Is hidden
if is set to None.
submit_icon (Optional[dict[str, Any]]): Icon definition of the submit
button.
cancel_label (Optional[str]): Label of the cancel button. Is hidden
if is set to None. User can still close the window tho.
cancel_icon (Optional[dict[str, Any]]): Icon definition of the cancel
button.
"""
title: str
fields: list[AbstractAttrDef]
submit_label: Optional[str] = "Submit"
submit_icon: Optional[dict[str, Any]] = None
cancel_label: Optional[str] = "Cancel"
cancel_icon: Optional[dict[str, Any]] = None
def to_json_data(self) -> dict[str, Any]:
fields = self.fields
if fields is not None:
fields = serialize_attr_defs(fields)
return {
"title": self.title,
"fields": fields,
"submit_label": self.submit_label,
"submit_icon": self.submit_icon,
"cancel_label": self.cancel_label,
"cancel_icon": self.cancel_icon,
}
@classmethod
def from_json_data(cls, data: dict[str, Any]) -> "ActionForm":
fields = data["fields"]
if fields is not None:
data["fields"] = deserialize_attr_defs(fields)
return cls(**data)

View file

@ -0,0 +1,100 @@
from __future__ import annotations
import uuid
from typing import Any
from ayon_core.lib.attribute_definitions import (
AbstractAttrDef,
UILabelDef,
BoolDef,
TextDef,
NumberDef,
EnumDef,
HiddenDef,
)
def webaction_fields_to_attribute_defs(
fields: list[dict[str, Any]]
) -> list[AbstractAttrDef]:
"""Helper function to convert fields definition from webactions form.
Convert form fields to attribute definitions to be able to display them
using attribute definitions.
Args:
fields (list[dict[str, Any]]): Fields from webaction form.
Returns:
list[AbstractAttrDef]: Converted attribute definitions.
"""
attr_defs = []
for field in fields:
field_type = field["type"]
attr_def = None
if field_type == "label":
label = field.get("value")
if label is None:
label = field.get("text")
attr_def = UILabelDef(
label, key=uuid.uuid4().hex
)
elif field_type == "boolean":
value = field["value"]
if isinstance(value, str):
value = value.lower() == "true"
attr_def = BoolDef(
field["name"],
default=value,
label=field.get("label"),
)
elif field_type == "text":
attr_def = TextDef(
field["name"],
default=field.get("value"),
label=field.get("label"),
placeholder=field.get("placeholder"),
multiline=field.get("multiline", False),
regex=field.get("regex"),
# syntax=field["syntax"],
)
elif field_type in ("integer", "float"):
value = field.get("value")
if isinstance(value, str):
if field_type == "integer":
value = int(value)
else:
value = float(value)
attr_def = NumberDef(
field["name"],
default=value,
label=field.get("label"),
decimals=0 if field_type == "integer" else 5,
# placeholder=field.get("placeholder"),
minimum=field.get("min"),
maximum=field.get("max"),
)
elif field_type in ("select", "multiselect"):
attr_def = EnumDef(
field["name"],
items=field["options"],
default=field.get("value"),
label=field.get("label"),
multiselection=field_type == "multiselect",
)
elif field_type == "hidden":
attr_def = HiddenDef(
field["name"],
default=field.get("value"),
)
if attr_def is None:
print(f"Unknown config field type: {field_type}")
attr_def = UILabelDef(
f"Unknown field type '{field_type}",
key=uuid.uuid4().hex
)
attr_defs.append(attr_def)
return attr_defs

View file

@ -1,6 +1,9 @@
from __future__ import annotations
import os
import inspect
import traceback
from typing import Optional
from ayon_core.lib import Logger
from ayon_core.lib.python_module_tools import (
@ -96,6 +99,70 @@ class DiscoverResult:
log.info(report)
def discover_plugins(
base_class: type,
paths: Optional[list[str]] = None,
classes: Optional[list[type]] = None,
ignored_classes: Optional[list[type]] = None,
allow_duplicates: bool = True,
):
"""Find and return subclasses of `superclass`
Args:
base_class (type): Class which determines discovered subclasses.
paths (Optional[list[str]]): List of paths to look for plug-ins.
classes (Optional[list[str]]): List of classes to filter.
ignored_classes (list[type]): List of classes that won't be added to
the output plugins.
allow_duplicates (bool): Validate class name duplications.
Returns:
DiscoverResult: Object holding successfully
discovered plugins, ignored plugins, plugins with missing
abstract implementation and duplicated plugin.
"""
ignored_classes = ignored_classes or []
paths = paths or []
classes = classes or []
result = DiscoverResult(base_class)
all_plugins = list(classes)
for path in paths:
modules, crashed = modules_from_path(path)
for (filepath, exc_info) in crashed:
result.crashed_file_paths[filepath] = exc_info
for item in modules:
filepath, module = item
result.add_module(module)
all_plugins.extend(classes_from_module(base_class, module))
if base_class not in ignored_classes:
ignored_classes.append(base_class)
plugin_names = set()
for cls in all_plugins:
if cls in ignored_classes:
result.ignored_plugins.add(cls)
continue
if inspect.isabstract(cls):
result.abstract_plugins.append(cls)
continue
if not allow_duplicates:
class_name = cls.__name__
if class_name in plugin_names:
result.duplicated_plugins.append(cls)
continue
plugin_names.add(class_name)
result.plugins.append(cls)
return result
class PluginDiscoverContext(object):
"""Store and discover registered types nad registered paths to types.
@ -141,58 +208,17 @@ class PluginDiscoverContext(object):
Union[DiscoverResult, list[Any]]: Object holding successfully
discovered plugins, ignored plugins, plugins with missing
abstract implementation and duplicated plugin.
"""
if not ignore_classes:
ignore_classes = []
result = DiscoverResult(superclass)
plugin_names = set()
registered_classes = self._registered_plugins.get(superclass) or []
registered_paths = self._registered_plugin_paths.get(superclass) or []
for cls in registered_classes:
if cls is superclass or cls in ignore_classes:
result.ignored_plugins.add(cls)
continue
if inspect.isabstract(cls):
result.abstract_plugins.append(cls)
continue
class_name = cls.__name__
if class_name in plugin_names:
result.duplicated_plugins.append(cls)
continue
plugin_names.add(class_name)
result.plugins.append(cls)
# Include plug-ins from registered paths
for path in registered_paths:
modules, crashed = modules_from_path(path)
for item in crashed:
filepath, exc_info = item
result.crashed_file_paths[filepath] = exc_info
for item in modules:
filepath, module = item
result.add_module(module)
for cls in classes_from_module(superclass, module):
if cls is superclass or cls in ignore_classes:
result.ignored_plugins.add(cls)
continue
if inspect.isabstract(cls):
result.abstract_plugins.append(cls)
continue
if not allow_duplicates:
class_name = cls.__name__
if class_name in plugin_names:
result.duplicated_plugins.append(cls)
continue
plugin_names.add(class_name)
result.plugins.append(cls)
result = discover_plugins(
superclass,
paths=registered_paths,
classes=registered_classes,
ignored_classes=ignore_classes,
allow_duplicates=allow_duplicates,
)
# Store in memory last result to keep in memory loaded modules
self._last_discovered_results[superclass] = result

View file

@ -1,34 +0,0 @@
from ayon_core.style import get_default_entity_icon_color
from ayon_core.pipeline import load
class CopyFile(load.LoaderPlugin):
"""Copy the published file to be pasted at the desired location"""
representations = {"*"}
product_types = {"*"}
label = "Copy File"
order = 10
icon = "copy"
color = get_default_entity_icon_color()
def load(self, context, name=None, namespace=None, data=None):
path = self.filepath_from_context(context)
self.log.info("Added copy to clipboard: {0}".format(path))
self.copy_file_to_clipboard(path)
@staticmethod
def copy_file_to_clipboard(path):
from qtpy import QtCore, QtWidgets
clipboard = QtWidgets.QApplication.clipboard()
assert clipboard, "Must have running QApplication instance"
# Build mime data for clipboard
data = QtCore.QMimeData()
url = QtCore.QUrl.fromLocalFile(path)
data.setUrls([url])
# Set to Clipboard
clipboard.setMimeData(data)

View file

@ -1,29 +0,0 @@
import os
from ayon_core.pipeline import load
class CopyFilePath(load.LoaderPlugin):
"""Copy published file path to clipboard"""
representations = {"*"}
product_types = {"*"}
label = "Copy File Path"
order = 20
icon = "clipboard"
color = "#999999"
def load(self, context, name=None, namespace=None, data=None):
path = self.filepath_from_context(context)
self.log.info("Added file path to clipboard: {0}".format(path))
self.copy_path_to_clipboard(path)
@staticmethod
def copy_path_to_clipboard(path):
from qtpy import QtWidgets
clipboard = QtWidgets.QApplication.clipboard()
assert clipboard, "Must have running QApplication instance"
# Set to Clipboard
clipboard.setText(os.path.normpath(path))

View file

@ -1,477 +0,0 @@
import collections
import os
import uuid
from typing import List, Dict, Any
import clique
import ayon_api
from ayon_api.operations import OperationsSession
import qargparse
from qtpy import QtWidgets, QtCore
from ayon_core import style
from ayon_core.lib import format_file_size
from ayon_core.pipeline import load, Anatomy
from ayon_core.pipeline.load import (
get_representation_path_with_anatomy,
InvalidRepresentationContext,
)
class DeleteOldVersions(load.ProductLoaderPlugin):
"""Deletes specific number of old version"""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
representations = ["*"]
product_types = {"*"}
tool_names = ["library_loader"]
label = "Delete Old Versions"
order = 35
icon = "trash"
color = "#d8d8d8"
options = [
qargparse.Integer(
"versions_to_keep", default=2, min=0, help="Versions to keep:"
),
qargparse.Boolean(
"remove_publish_folder", help="Remove publish folder:"
)
]
requires_confirmation = True
def delete_whole_dir_paths(self, dir_paths, delete=True):
size = 0
for dir_path in dir_paths:
# Delete all files and folders in dir path
for root, dirs, files in os.walk(dir_path, topdown=False):
for name in files:
file_path = os.path.join(root, name)
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
for name in dirs:
if delete:
os.rmdir(os.path.join(root, name))
if not delete:
continue
# Delete even the folder and it's parents folders if they are empty
while True:
if not os.path.exists(dir_path):
dir_path = os.path.dirname(dir_path)
continue
if len(os.listdir(dir_path)) != 0:
break
os.rmdir(os.path.join(dir_path))
return size
def path_from_representation(self, representation, anatomy):
try:
context = representation["context"]
except KeyError:
return (None, None)
try:
path = get_representation_path_with_anatomy(
representation, anatomy
)
except InvalidRepresentationContext:
return (None, None)
sequence_path = None
if "frame" in context:
context["frame"] = self.sequence_splitter
sequence_path = get_representation_path_with_anatomy(
representation, anatomy
)
if sequence_path:
sequence_path = sequence_path.normalized()
return (path.normalized(), sequence_path)
def delete_only_repre_files(self, dir_paths, file_paths, delete=True):
size = 0
for dir_id, dir_path in dir_paths.items():
dir_files = os.listdir(dir_path)
collections, remainders = clique.assemble(dir_files)
for file_path, seq_path in file_paths[dir_id]:
file_path_base = os.path.split(file_path)[1]
# Just remove file if `frame` key was not in context or
# filled path is in remainders (single file sequence)
if not seq_path or file_path_base in remainders:
if not os.path.exists(file_path):
self.log.debug(
"File was not found: {}".format(file_path)
)
continue
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
if file_path_base in remainders:
remainders.remove(file_path_base)
continue
seq_path_base = os.path.split(seq_path)[1]
head, tail = seq_path_base.split(self.sequence_splitter)
final_col = None
for collection in collections:
if head != collection.head or tail != collection.tail:
continue
final_col = collection
break
if final_col is not None:
# Fill full path to head
final_col.head = os.path.join(dir_path, final_col.head)
for _file_path in final_col:
if os.path.exists(_file_path):
size += os.path.getsize(_file_path)
if delete:
os.remove(_file_path)
self.log.debug(
"Removed file: {}".format(_file_path)
)
_seq_path = final_col.format("{head}{padding}{tail}")
self.log.debug("Removed files: {}".format(_seq_path))
collections.remove(final_col)
elif os.path.exists(file_path):
size += os.path.getsize(file_path)
if delete:
os.remove(file_path)
self.log.debug("Removed file: {}".format(file_path))
else:
self.log.debug(
"File was not found: {}".format(file_path)
)
# Delete as much as possible parent folders
if not delete:
return size
for dir_path in dir_paths.values():
while True:
if not os.path.exists(dir_path):
dir_path = os.path.dirname(dir_path)
continue
if len(os.listdir(dir_path)) != 0:
break
self.log.debug("Removed folder: {}".format(dir_path))
os.rmdir(dir_path)
return size
def message(self, text):
msgBox = QtWidgets.QMessageBox()
msgBox.setText(text)
msgBox.setStyleSheet(style.load_stylesheet())
msgBox.setWindowFlags(
msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint
)
msgBox.exec_()
def _confirm_delete(self,
contexts: List[Dict[str, Any]],
versions_to_keep: int) -> bool:
"""Prompt user for a deletion confirmation"""
contexts_list = "\n".join(sorted(
"- {folder[name]} > {product[name]}".format_map(context)
for context in contexts
))
num_contexts = len(contexts)
s = "s" if num_contexts > 1 else ""
text = (
"Are you sure you want to delete versions?\n\n"
f"This will keep only the last {versions_to_keep} "
f"versions for the {num_contexts} selected product{s}."
)
informative_text = "Warning: This will delete files from disk"
detailed_text = (
f"Keep only {versions_to_keep} versions for:\n{contexts_list}"
)
messagebox = QtWidgets.QMessageBox()
messagebox.setIcon(QtWidgets.QMessageBox.Warning)
messagebox.setWindowTitle("Delete Old Versions")
messagebox.setText(text)
messagebox.setInformativeText(informative_text)
messagebox.setDetailedText(detailed_text)
messagebox.setStandardButtons(
QtWidgets.QMessageBox.Yes
| QtWidgets.QMessageBox.Cancel
)
messagebox.setDefaultButton(QtWidgets.QMessageBox.Cancel)
messagebox.setStyleSheet(style.load_stylesheet())
messagebox.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
return messagebox.exec_() == QtWidgets.QMessageBox.Yes
def get_data(self, context, versions_count):
product_entity = context["product"]
folder_entity = context["folder"]
project_name = context["project"]["name"]
anatomy = Anatomy(project_name, project_entity=context["project"])
version_fields = ayon_api.get_default_fields_for_type("version")
version_fields.add("tags")
versions = list(ayon_api.get_versions(
project_name,
product_ids=[product_entity["id"]],
active=None,
hero=False,
fields=version_fields
))
self.log.debug(
"Version Number ({})".format(len(versions))
)
versions_by_parent = collections.defaultdict(list)
for ent in versions:
versions_by_parent[ent["productId"]].append(ent)
def sort_func(ent):
return int(ent["version"])
all_last_versions = []
for _parent_id, _versions in versions_by_parent.items():
for idx, version in enumerate(
sorted(_versions, key=sort_func, reverse=True)
):
if idx >= versions_count:
break
all_last_versions.append(version)
self.log.debug("Collected versions ({})".format(len(versions)))
# Filter latest versions
for version in all_last_versions:
versions.remove(version)
# Update versions_by_parent without filtered versions
versions_by_parent = collections.defaultdict(list)
for ent in versions:
versions_by_parent[ent["productId"]].append(ent)
# Filter already deleted versions
versions_to_pop = []
for version in versions:
if "deleted" in version["tags"]:
versions_to_pop.append(version)
for version in versions_to_pop:
msg = "Folder: \"{}\" | Product: \"{}\" | Version: \"{}\"".format(
folder_entity["path"],
product_entity["name"],
version["version"]
)
self.log.debug((
"Skipping version. Already tagged as inactive. < {} >"
).format(msg))
versions.remove(version)
version_ids = [ent["id"] for ent in versions]
self.log.debug(
"Filtered versions to delete ({})".format(len(version_ids))
)
if not version_ids:
msg = "Skipping processing. Nothing to delete on {}/{}".format(
folder_entity["path"], product_entity["name"]
)
self.log.info(msg)
print(msg)
return
repres = list(ayon_api.get_representations(
project_name, version_ids=version_ids
))
self.log.debug(
"Collected representations to remove ({})".format(len(repres))
)
dir_paths = {}
file_paths_by_dir = collections.defaultdict(list)
for repre in repres:
file_path, seq_path = self.path_from_representation(
repre, anatomy
)
if file_path is None:
self.log.debug((
"Could not format path for represenation \"{}\""
).format(str(repre)))
continue
dir_path = os.path.dirname(file_path)
dir_id = None
for _dir_id, _dir_path in dir_paths.items():
if _dir_path == dir_path:
dir_id = _dir_id
break
if dir_id is None:
dir_id = uuid.uuid4()
dir_paths[dir_id] = dir_path
file_paths_by_dir[dir_id].append([file_path, seq_path])
dir_ids_to_pop = []
for dir_id, dir_path in dir_paths.items():
if os.path.exists(dir_path):
continue
dir_ids_to_pop.append(dir_id)
# Pop dirs from both dictionaries
for dir_id in dir_ids_to_pop:
dir_paths.pop(dir_id)
paths = file_paths_by_dir.pop(dir_id)
# TODO report of missing directories?
paths_msg = ", ".join([
"'{}'".format(path[0].replace("\\", "/")) for path in paths
])
self.log.debug((
"Folder does not exist. Deleting its files skipped: {}"
).format(paths_msg))
return {
"dir_paths": dir_paths,
"file_paths_by_dir": file_paths_by_dir,
"versions": versions,
"folder": folder_entity,
"product": product_entity,
"archive_product": versions_count == 0
}
def main(self, project_name, data, remove_publish_folder):
# Size of files.
size = 0
if not data:
return size
if remove_publish_folder:
size = self.delete_whole_dir_paths(data["dir_paths"].values())
else:
size = self.delete_only_repre_files(
data["dir_paths"], data["file_paths_by_dir"]
)
op_session = OperationsSession()
for version in data["versions"]:
orig_version_tags = version["tags"]
version_tags = list(orig_version_tags)
changes = {}
if "deleted" not in version_tags:
version_tags.append("deleted")
changes["tags"] = version_tags
if version["active"]:
changes["active"] = False
if not changes:
continue
op_session.update_entity(
project_name, "version", version["id"], changes
)
op_session.commit()
return size
def load(self, contexts, name=None, namespace=None, options=None):
# Get user options
versions_to_keep = 2
remove_publish_folder = False
if options:
versions_to_keep = options.get(
"versions_to_keep", versions_to_keep
)
remove_publish_folder = options.get(
"remove_publish_folder", remove_publish_folder
)
# Because we do not want this run by accident we will add an extra
# user confirmation
if (
self.requires_confirmation
and not self._confirm_delete(contexts, versions_to_keep)
):
return
try:
size = 0
for count, context in enumerate(contexts):
data = self.get_data(context, versions_to_keep)
if not data:
continue
project_name = context["project"]["name"]
size += self.main(project_name, data, remove_publish_folder)
print("Progressing {}/{}".format(count + 1, len(contexts)))
msg = "Total size of files: {}".format(format_file_size(size))
self.log.info(msg)
self.message(msg)
except Exception:
self.log.error("Failed to delete versions.", exc_info=True)
class CalculateOldVersions(DeleteOldVersions):
"""Calculate file size of old versions"""
label = "Calculate Old Versions"
order = 30
tool_names = ["library_loader"]
options = [
qargparse.Integer(
"versions_to_keep", default=2, min=0, help="Versions to keep:"
),
qargparse.Boolean(
"remove_publish_folder", help="Remove publish folder:"
)
]
requires_confirmation = False
def main(self, project_name, data, remove_publish_folder):
size = 0
if not data:
return size
if remove_publish_folder:
size = self.delete_whole_dir_paths(
data["dir_paths"].values(), delete=False
)
else:
size = self.delete_only_repre_files(
data["dir_paths"], data["file_paths_by_dir"], delete=False
)
return size

View file

@ -1,36 +0,0 @@
import sys
import os
import subprocess
from ayon_core.pipeline import load
def open(filepath):
"""Open file with system default executable"""
if sys.platform.startswith('darwin'):
subprocess.call(('open', filepath))
elif os.name == 'nt':
os.startfile(filepath)
elif os.name == 'posix':
subprocess.call(('xdg-open', filepath))
class OpenFile(load.LoaderPlugin):
"""Open Image Sequence or Video with system default"""
product_types = {"render2d"}
representations = {"*"}
label = "Open"
order = -10
icon = "play-circle"
color = "orange"
def load(self, context, name, namespace, data):
path = self.filepath_from_context(context)
if not os.path.exists(path):
raise RuntimeError("File not found: {}".format(path))
self.log.info("Opening : {}".format(path))
open(path)

View file

@ -1,56 +0,0 @@
import os
from ayon_core import AYON_CORE_ROOT
from ayon_core.lib import get_ayon_launcher_args, run_detached_process
from ayon_core.pipeline import load
from ayon_core.pipeline.load import LoadError
class PushToProject(load.ProductLoaderPlugin):
"""Export selected versions to different project"""
is_multiple_contexts_compatible = True
representations = {"*"}
product_types = {"*"}
label = "Push to project"
order = 35
icon = "send"
color = "#d8d8d8"
def load(self, contexts, name=None, namespace=None, options=None):
filtered_contexts = [
context
for context in contexts
if context.get("project") and context.get("version")
]
if not filtered_contexts:
raise LoadError("Nothing to push for your selection")
folder_ids = set(
context["folder"]["id"]
for context in filtered_contexts
)
if len(folder_ids) > 1:
raise LoadError("Please select products from single folder")
push_tool_script_path = os.path.join(
AYON_CORE_ROOT,
"tools",
"push_to_project",
"main.py"
)
project_name = filtered_contexts[0]["project"]["name"]
version_ids = {
context["version"]["id"]
for context in filtered_contexts
}
args = get_ayon_launcher_args(
push_tool_script_path,
"--project", project_name,
"--versions", ",".join(version_ids)
)
run_detached_process(args)

View file

@ -0,0 +1,120 @@
import os
import collections
from typing import Optional, Any
from ayon_core.pipeline.load import get_representation_path_with_anatomy
from ayon_core.pipeline.actions import (
LoaderActionPlugin,
LoaderActionItem,
LoaderActionSelection,
LoaderActionResult,
)
class CopyFileActionPlugin(LoaderActionPlugin):
"""Copy published file path to clipboard"""
identifier = "core.copy-action"
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
repres = []
if selection.selected_type == "representation":
repres = selection.entities.get_representations(
selection.selected_ids
)
if selection.selected_type == "version":
repres = selection.entities.get_versions_representations(
selection.selected_ids
)
output = []
if not repres:
return output
repre_ids_by_name = collections.defaultdict(set)
for repre in repres:
repre_ids_by_name[repre["name"]].add(repre["id"])
for repre_name, repre_ids in repre_ids_by_name.items():
repre_id = next(iter(repre_ids), None)
if not repre_id:
continue
output.append(
LoaderActionItem(
label=repre_name,
group_label="Copy file path",
data={
"representation_id": repre_id,
"action": "copy-path",
},
icon={
"type": "material-symbols",
"name": "content_copy",
"color": "#999999",
}
)
)
output.append(
LoaderActionItem(
label=repre_name,
group_label="Copy file",
data={
"representation_id": repre_id,
"action": "copy-file",
},
icon={
"type": "material-symbols",
"name": "file_copy",
"color": "#999999",
}
)
)
return output
def execute_action(
self,
selection: LoaderActionSelection,
data: dict,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
from qtpy import QtWidgets, QtCore
action = data["action"]
repre_id = data["representation_id"]
repre = next(iter(selection.entities.get_representations({repre_id})))
path = get_representation_path_with_anatomy(
repre, selection.get_project_anatomy()
)
self.log.info(f"Added file path to clipboard: {path}")
clipboard = QtWidgets.QApplication.clipboard()
if not clipboard:
return LoaderActionResult(
"Failed to copy file path to clipboard.",
success=False,
)
if action == "copy-path":
# Set to Clipboard
clipboard.setText(os.path.normpath(path))
return LoaderActionResult(
"Path stored to clipboard...",
success=True,
)
# Build mime data for clipboard
data = QtCore.QMimeData()
url = QtCore.QUrl.fromLocalFile(path)
data.setUrls([url])
# Set to Clipboard
clipboard.setMimeData(data)
return LoaderActionResult(
"File added to clipboard...",
success=True,
)

View file

@ -0,0 +1,388 @@
from __future__ import annotations
import os
import collections
import json
import shutil
from typing import Optional, Any
from ayon_api.operations import OperationsSession
from ayon_core.lib import (
format_file_size,
AbstractAttrDef,
NumberDef,
BoolDef,
TextDef,
UILabelDef,
)
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.actions import (
ActionForm,
LoaderActionPlugin,
LoaderActionItem,
LoaderActionSelection,
LoaderActionResult,
)
class DeleteOldVersions(LoaderActionPlugin):
"""Deletes specific number of old version"""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
requires_confirmation = True
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
# Do not show in hosts
if self.host_name is not None:
return []
versions = selection.get_selected_version_entities()
if not versions:
return []
product_ids = {
version["productId"]
for version in versions
}
return [
LoaderActionItem(
label="Delete Versions",
order=35,
data={
"product_ids": list(product_ids),
"action": "delete-versions",
},
icon={
"type": "material-symbols",
"name": "delete",
"color": "#d8d8d8",
}
),
LoaderActionItem(
label="Calculate Versions size",
order=30,
data={
"product_ids": list(product_ids),
"action": "calculate-versions-size",
},
icon={
"type": "material-symbols",
"name": "auto_delete",
"color": "#d8d8d8",
}
)
]
def execute_action(
self,
selection: LoaderActionSelection,
data: dict[str, Any],
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
step = form_values.get("step")
action = data["action"]
versions_to_keep = form_values.get("versions_to_keep")
remove_publish_folder = form_values.get("remove_publish_folder")
if step is None:
return self._first_step(
action,
versions_to_keep,
remove_publish_folder,
)
if versions_to_keep is None:
versions_to_keep = 2
if remove_publish_folder is None:
remove_publish_folder = False
product_ids = data["product_ids"]
if step == "prepare-data":
return self._prepare_data_step(
action,
versions_to_keep,
remove_publish_folder,
product_ids,
selection,
)
if step == "delete-versions":
return self._delete_versions_step(
selection.project_name, form_values
)
return None
def _first_step(
self,
action: str,
versions_to_keep: Optional[int],
remove_publish_folder: Optional[bool],
) -> LoaderActionResult:
fields: list[AbstractAttrDef] = [
TextDef(
"step",
visible=False,
),
NumberDef(
"versions_to_keep",
label="Versions to keep",
minimum=0,
default=2,
),
]
if action == "delete-versions":
fields.append(
BoolDef(
"remove_publish_folder",
label="Remove publish folder",
default=False,
)
)
form_values = {
key: value
for key, value in (
("remove_publish_folder", remove_publish_folder),
("versions_to_keep", versions_to_keep),
)
if value is not None
}
form_values["step"] = "prepare-data"
return LoaderActionResult(
form=ActionForm(
title="Delete Old Versions",
fields=fields,
),
form_values=form_values
)
def _prepare_data_step(
self,
action: str,
versions_to_keep: int,
remove_publish_folder: bool,
entity_ids: set[str],
selection: LoaderActionSelection,
):
versions_by_product_id = collections.defaultdict(list)
for version in selection.entities.get_products_versions(entity_ids):
# Keep hero version
if versions_to_keep != 0 and version["version"] < 0:
continue
versions_by_product_id[version["productId"]].append(version)
versions_to_delete = []
for product_id, versions in versions_by_product_id.items():
if versions_to_keep == 0:
versions_to_delete.extend(versions)
continue
if len(versions) <= versions_to_keep:
continue
versions.sort(key=lambda v: v["version"])
for _ in range(versions_to_keep):
if not versions:
break
versions.pop(-1)
versions_to_delete.extend(versions)
self.log.debug(
f"Collected versions to delete ({len(versions_to_delete)})"
)
version_ids = {
version["id"]
for version in versions_to_delete
}
if not version_ids:
return LoaderActionResult(
message="Skipping. Nothing to delete.",
success=False,
)
project = selection.entities.get_project()
anatomy = Anatomy(project["name"], project_entity=project)
repres = selection.entities.get_versions_representations(version_ids)
self.log.debug(
f"Collected representations to remove ({len(repres)})"
)
filepaths_by_repre_id = {}
repre_ids_by_version_id = {
version_id: []
for version_id in version_ids
}
for repre in repres:
repre_ids_by_version_id[repre["versionId"]].append(repre["id"])
filepaths_by_repre_id[repre["id"]] = [
anatomy.fill_root(repre_file["path"])
for repre_file in repre["files"]
]
size = 0
for filepaths in filepaths_by_repre_id.values():
for filepath in filepaths:
if os.path.exists(filepath):
size += os.path.getsize(filepath)
if action == "calculate-versions-size":
return LoaderActionResult(
message="Calculated size",
success=True,
form=ActionForm(
title="Calculated versions size",
fields=[
UILabelDef(
f"Total size of files: {format_file_size(size)}"
),
],
submit_label=None,
cancel_label="Close",
),
)
form, form_values = self._get_delete_form(
size,
remove_publish_folder,
list(version_ids),
repre_ids_by_version_id,
filepaths_by_repre_id,
)
return LoaderActionResult(
form=form,
form_values=form_values
)
def _delete_versions_step(
self, project_name: str, form_values: dict[str, Any]
) -> LoaderActionResult:
delete_data = json.loads(form_values["delete_data"])
remove_publish_folder = form_values["remove_publish_folder"]
if form_values["delete_value"].lower() != "delete":
size = delete_data["size"]
form, form_values = self._get_delete_form(
size,
remove_publish_folder,
delete_data["version_ids"],
delete_data["repre_ids_by_version_id"],
delete_data["filepaths_by_repre_id"],
True,
)
return LoaderActionResult(
form=form,
form_values=form_values,
)
version_ids = delete_data["version_ids"]
repre_ids_by_version_id = delete_data["repre_ids_by_version_id"]
filepaths_by_repre_id = delete_data["filepaths_by_repre_id"]
op_session = OperationsSession()
total_versions = len(version_ids)
try:
for version_idx, version_id in enumerate(version_ids):
self.log.info(
f"Progressing version {version_idx + 1}/{total_versions}"
)
for repre_id in repre_ids_by_version_id[version_id]:
for filepath in filepaths_by_repre_id[repre_id]:
publish_folder = os.path.dirname(filepath)
if remove_publish_folder:
if os.path.exists(publish_folder):
shutil.rmtree(
publish_folder, ignore_errors=True
)
continue
if os.path.exists(filepath):
os.remove(filepath)
op_session.delete_entity(
project_name, "representation", repre_id
)
op_session.delete_entity(
project_name, "version", version_id
)
self.log.info("All done")
except Exception:
self.log.error("Failed to delete versions.", exc_info=True)
return LoaderActionResult(
message="Failed to delete versions.",
success=False,
)
finally:
op_session.commit()
return LoaderActionResult(
message="Deleted versions",
success=True,
)
def _get_delete_form(
self,
size: int,
remove_publish_folder: bool,
version_ids: list[str],
repre_ids_by_version_id: dict[str, list[str]],
filepaths_by_repre_id: dict[str, list[str]],
repeated: bool = False,
) -> tuple[ActionForm, dict[str, Any]]:
versions_len = len(repre_ids_by_version_id)
fields = [
UILabelDef(
f"Going to delete {versions_len} versions<br/>"
f"- total size of files: {format_file_size(size)}<br/>"
),
UILabelDef("Are you sure you want to continue?"),
TextDef(
"delete_value",
placeholder="Type 'delete' to confirm...",
),
]
if repeated:
fields.append(UILabelDef(
"*Please fill in '**delete**' to confirm deletion.*"
))
fields.extend([
TextDef(
"delete_data",
visible=False,
),
TextDef(
"step",
visible=False,
),
BoolDef(
"remove_publish_folder",
label="Remove publish folder",
default=False,
visible=False,
)
])
form = ActionForm(
title="Delete versions",
submit_label="Delete",
cancel_label="Close",
fields=fields,
)
form_values = {
"delete_data": json.dumps({
"size": size,
"version_ids": version_ids,
"repre_ids_by_version_id": repre_ids_by_version_id,
"filepaths_by_repre_id": filepaths_by_repre_id,
}),
"step": "delete-versions",
"remove_publish_folder": remove_publish_folder,
}
return form, form_values

View file

@ -1,5 +1,6 @@
import platform
from collections import defaultdict
from typing import Optional, Any
import ayon_api
from qtpy import QtWidgets, QtCore, QtGui
@ -10,7 +11,12 @@ from ayon_core.lib import (
collect_frames,
get_datetime_data,
)
from ayon_core.pipeline import load, Anatomy
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.actions import (
LoaderSimpleActionPlugin,
LoaderActionSelection,
LoaderActionResult,
)
from ayon_core.pipeline.load import get_representation_path_with_anatomy
from ayon_core.pipeline.delivery import (
get_format_dict,
@ -20,43 +26,72 @@ from ayon_core.pipeline.delivery import (
)
class Delivery(load.ProductLoaderPlugin):
"""Export selected versions to folder structure from Template"""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
representations = {"*"}
product_types = {"*"}
tool_names = ["library_loader"]
class DeliveryAction(LoaderSimpleActionPlugin):
identifier = "core.delivery"
label = "Deliver Versions"
order = 35
icon = "upload"
color = "#d8d8d8"
icon = {
"type": "material-symbols",
"name": "upload",
"color": "#d8d8d8",
}
def message(self, text):
msgBox = QtWidgets.QMessageBox()
msgBox.setText(text)
msgBox.setStyleSheet(style.load_stylesheet())
msgBox.setWindowFlags(
msgBox.windowFlags() | QtCore.Qt.FramelessWindowHint
def is_compatible(self, selection: LoaderActionSelection) -> bool:
if self.host_name is not None:
return False
if not selection.selected_ids:
return False
return (
selection.versions_selected()
or selection.representations_selected()
)
msgBox.exec_()
def load(self, contexts, name=None, namespace=None, options=None):
def execute_simple_action(
self,
selection: LoaderActionSelection,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
version_ids = set()
if selection.selected_type == "representation":
versions = selection.entities.get_representations_versions(
selection.selected_ids
)
version_ids = {version["id"] for version in versions}
if selection.selected_type == "version":
version_ids = set(selection.selected_ids)
if not version_ids:
return LoaderActionResult(
message="No versions found in your selection",
success=False,
)
try:
dialog = DeliveryOptionsDialog(contexts, self.log)
# TODO run the tool in subprocess
dialog = DeliveryOptionsDialog(
selection.project_name, version_ids, self.log
)
dialog.exec_()
except Exception:
self.log.error("Failed to deliver versions.", exc_info=True)
return LoaderActionResult()
class DeliveryOptionsDialog(QtWidgets.QDialog):
"""Dialog to select template where to deliver selected representations."""
def __init__(self, contexts, log=None, parent=None):
super(DeliveryOptionsDialog, self).__init__(parent=parent)
def __init__(
self,
project_name,
version_ids,
log=None,
parent=None,
):
super().__init__(parent=parent)
self.setWindowTitle("AYON - Deliver versions")
icon = QtGui.QIcon(resources.get_ayon_icon_filepath())
@ -70,13 +105,12 @@ class DeliveryOptionsDialog(QtWidgets.QDialog):
self.setStyleSheet(style.load_stylesheet())
project_name = contexts[0]["project"]["name"]
self.anatomy = Anatomy(project_name)
self._representations = None
self.log = log
self.currently_uploaded = 0
self._set_representations(project_name, contexts)
self._set_representations(project_name, version_ids)
dropdown = QtWidgets.QComboBox()
self.templates = self._get_templates(self.anatomy)
@ -316,9 +350,7 @@ class DeliveryOptionsDialog(QtWidgets.QDialog):
return templates
def _set_representations(self, project_name, contexts):
version_ids = {context["version"]["id"] for context in contexts}
def _set_representations(self, project_name, version_ids):
repres = list(ayon_api.get_representations(
project_name, version_ids=version_ids
))

View file

@ -2,11 +2,10 @@ import logging
import os
from pathlib import Path
from collections import defaultdict
from typing import Any, Optional
from qtpy import QtWidgets, QtCore, QtGui
from ayon_api import get_representations
from ayon_core.pipeline import load, Anatomy
from ayon_core import resources, style
from ayon_core.lib.transcoding import (
IMAGE_EXTENSIONS,
@ -16,9 +15,16 @@ from ayon_core.lib import (
get_ffprobe_data,
is_oiio_supported,
)
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.load import get_representation_path_with_anatomy
from ayon_core.tools.utils import show_message_dialog
from ayon_core.pipeline.actions import (
LoaderSimpleActionPlugin,
LoaderActionSelection,
LoaderActionResult,
)
OTIO = None
FRAME_SPLITTER = "__frame_splitter__"
@ -30,34 +36,99 @@ def _import_otio():
OTIO = opentimelineio
class ExportOTIO(load.ProductLoaderPlugin):
"""Export selected versions to OpenTimelineIO."""
is_multiple_contexts_compatible = True
sequence_splitter = "__sequence_splitter__"
representations = {"*"}
product_types = {"*"}
tool_names = ["library_loader"]
class ExportOTIO(LoaderSimpleActionPlugin):
identifier = "core.export-otio"
label = "Export OTIO"
group_label = None
order = 35
icon = "save"
color = "#d8d8d8"
icon = {
"type": "material-symbols",
"name": "save",
"color": "#d8d8d8",
}
def load(self, contexts, name=None, namespace=None, options=None):
def is_compatible(
self, selection: LoaderActionSelection
) -> bool:
# Don't show in hosts
if self.host_name is not None:
return False
return selection.versions_selected()
def execute_simple_action(
self,
selection: LoaderActionSelection,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
_import_otio()
version_ids = set(selection.selected_ids)
versions_by_id = {
version["id"]: version
for version in selection.entities.get_versions(version_ids)
}
product_ids = {
version["productId"]
for version in versions_by_id.values()
}
products_by_id = {
product["id"]: product
for product in selection.entities.get_products(product_ids)
}
folder_ids = {
product["folderId"]
for product in products_by_id.values()
}
folder_by_id = {
folder["id"]: folder
for folder in selection.entities.get_folders(folder_ids)
}
repre_entities = selection.entities.get_versions_representations(
version_ids
)
version_path_by_id = {}
for version in versions_by_id.values():
version_id = version["id"]
product_id = version["productId"]
product = products_by_id[product_id]
folder_id = product["folderId"]
folder = folder_by_id[folder_id]
version_path_by_id[version_id] = "/".join([
folder["path"],
product["name"],
version["name"]
])
try:
dialog = ExportOTIOOptionsDialog(contexts, self.log)
# TODO this should probably trigger a subprocess?
dialog = ExportOTIOOptionsDialog(
selection.project_name,
versions_by_id,
repre_entities,
version_path_by_id,
self.log
)
dialog.exec_()
except Exception:
self.log.error("Failed to export OTIO.", exc_info=True)
return LoaderActionResult()
class ExportOTIOOptionsDialog(QtWidgets.QDialog):
"""Dialog to select template where to deliver selected representations."""
def __init__(self, contexts, log=None, parent=None):
def __init__(
self,
project_name,
versions_by_id,
repre_entities,
version_path_by_id,
log=None,
parent=None
):
# Not all hosts have OpenTimelineIO available.
self.log = log
@ -73,30 +144,14 @@ class ExportOTIOOptionsDialog(QtWidgets.QDialog):
| QtCore.Qt.WindowMinimizeButtonHint
)
project_name = contexts[0]["project"]["name"]
versions_by_id = {
context["version"]["id"]: context["version"]
for context in contexts
}
repre_entities = list(get_representations(
project_name, version_ids=set(versions_by_id)
))
version_by_representation_id = {
repre_entity["id"]: versions_by_id[repre_entity["versionId"]]
for repre_entity in repre_entities
}
version_path_by_id = {}
representations_by_version_id = {}
for context in contexts:
version_id = context["version"]["id"]
if version_id in version_path_by_id:
continue
representations_by_version_id[version_id] = []
version_path_by_id[version_id] = "/".join([
context["folder"]["path"],
context["product"]["name"],
context["version"]["name"]
])
representations_by_version_id = {
version_id: []
for version_id in versions_by_id
}
for repre_entity in repre_entities:
representations_by_version_id[repre_entity["versionId"]].append(

View file

@ -0,0 +1,130 @@
import os
import sys
import subprocess
import collections
from typing import Optional, Any
from ayon_core.pipeline.load import get_representation_path_with_anatomy
from ayon_core.pipeline.actions import (
LoaderActionPlugin,
LoaderActionItem,
LoaderActionSelection,
LoaderActionResult,
)
def open_file(filepath: str) -> None:
"""Open file with system default executable"""
if sys.platform.startswith("darwin"):
subprocess.call(("open", filepath))
elif os.name == "nt":
os.startfile(filepath)
elif os.name == "posix":
subprocess.call(("xdg-open", filepath))
class OpenFileAction(LoaderActionPlugin):
"""Open Image Sequence or Video with system default"""
identifier = "core.open-file"
product_types = {"render2d"}
def get_action_items(
self, selection: LoaderActionSelection
) -> list[LoaderActionItem]:
repres = []
if selection.selected_type == "representation":
repres = selection.entities.get_representations(
selection.selected_ids
)
if selection.selected_type == "version":
repres = selection.entities.get_versions_representations(
selection.selected_ids
)
if not repres:
return []
repre_ids = {repre["id"] for repre in repres}
versions = selection.entities.get_representations_versions(
repre_ids
)
product_ids = {version["productId"] for version in versions}
products = selection.entities.get_products(product_ids)
filtered_product_ids = {
product["id"]
for product in products
if product["productType"] in self.product_types
}
if not filtered_product_ids:
return []
versions_by_product_id = collections.defaultdict(list)
for version in versions:
versions_by_product_id[version["productId"]].append(version)
repres_by_version_ids = collections.defaultdict(list)
for repre in repres:
repres_by_version_ids[repre["versionId"]].append(repre)
filtered_repres = []
for product_id in filtered_product_ids:
for version in versions_by_product_id[product_id]:
for repre in repres_by_version_ids[version["id"]]:
filtered_repres.append(repre)
repre_ids_by_name = collections.defaultdict(set)
for repre in filtered_repres:
repre_ids_by_name[repre["name"]].add(repre["id"])
return [
LoaderActionItem(
label=repre_name,
group_label="Open file",
order=-10,
data={"representation_ids": list(repre_ids)},
icon={
"type": "material-symbols",
"name": "play_circle",
"color": "#FFA500",
}
)
for repre_name, repre_ids in repre_ids_by_name.items()
]
def execute_action(
self,
selection: LoaderActionSelection,
data: dict[str, Any],
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
path = None
repre_path = None
repre_ids = data["representation_ids"]
for repre in selection.entities.get_representations(repre_ids):
repre_path = get_representation_path_with_anatomy(
repre, selection.get_project_anatomy()
)
if os.path.exists(repre_path):
path = repre_path
break
if path is None:
if repre_path is None:
return LoaderActionResult(
"Failed to fill representation path...",
success=False,
)
return LoaderActionResult(
"File to open was not found...",
success=False,
)
self.log.info(f"Opening: {path}")
open_file(path)
return LoaderActionResult(
"File was opened...",
success=True,
)

View file

@ -0,0 +1,69 @@
import os
from typing import Optional, Any
from ayon_core import AYON_CORE_ROOT
from ayon_core.lib import get_ayon_launcher_args, run_detached_process
from ayon_core.pipeline.actions import (
LoaderSimpleActionPlugin,
LoaderActionSelection,
LoaderActionResult,
)
class PushToProject(LoaderSimpleActionPlugin):
identifier = "core.push-to-project"
label = "Push to project"
order = 35
icon = {
"type": "material-symbols",
"name": "send",
"color": "#d8d8d8",
}
def is_compatible(
self, selection: LoaderActionSelection
) -> bool:
if not selection.versions_selected():
return False
version_ids = set(selection.selected_ids)
product_ids = {
product["id"]
for product in selection.entities.get_versions_products(
version_ids
)
}
folder_ids = {
folder["id"]
for folder in selection.entities.get_products_folders(
product_ids
)
}
if len(folder_ids) == 1:
return True
return False
def execute_simple_action(
self,
selection: LoaderActionSelection,
form_values: dict[str, Any],
) -> Optional[LoaderActionResult]:
push_tool_script_path = os.path.join(
AYON_CORE_ROOT,
"tools",
"push_to_project",
"main.py"
)
args = get_ayon_launcher_args(
push_tool_script_path,
"--project", selection.project_name,
"--versions", ",".join(selection.selected_ids)
)
run_detached_process(args)
return LoaderActionResult(
message="Push to project tool opened...",
success=True,
)

View file

@ -56,6 +56,7 @@ class AttributeDefinitionsDialog(QtWidgets.QDialog):
btns_layout.addWidget(cancel_btn, 0)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(10, 10, 10, 10)
main_layout.addWidget(attrs_widget, 0)
main_layout.addStretch(1)
main_layout.addWidget(btns_widget, 0)

View file

@ -182,6 +182,7 @@ class AttributeDefinitionsWidget(QtWidgets.QWidget):
layout.deleteLater()
new_layout = QtWidgets.QGridLayout()
new_layout.setContentsMargins(0, 0, 0, 0)
new_layout.setColumnStretch(0, 0)
new_layout.setColumnStretch(1, 1)
self.setLayout(new_layout)
@ -210,12 +211,8 @@ class AttributeDefinitionsWidget(QtWidgets.QWidget):
if not attr_def.visible:
continue
col_num = 0
expand_cols = 2
if attr_def.is_value_def and attr_def.is_label_horizontal:
expand_cols = 1
col_num = 2 - expand_cols
if attr_def.is_value_def and attr_def.label:
label_widget = AttributeDefinitionsLabel(
attr_def.id, attr_def.label, self
@ -233,9 +230,12 @@ class AttributeDefinitionsWidget(QtWidgets.QWidget):
| QtCore.Qt.AlignVCenter
)
layout.addWidget(
label_widget, row, 0, 1, expand_cols
label_widget, row, col_num, 1, 1
)
if not attr_def.is_label_horizontal:
if attr_def.is_label_horizontal:
col_num += 1
expand_cols = 1
else:
row += 1
if attr_def.is_value_def:

View file

@ -1,22 +1,12 @@
import time
import uuid
import collections
from qtpy import QtWidgets, QtCore, QtGui
from ayon_core.lib import Logger
from ayon_core.lib.attribute_definitions import (
UILabelDef,
EnumDef,
TextDef,
BoolDef,
NumberDef,
HiddenDef,
)
from ayon_core.pipeline.actions import webaction_fields_to_attribute_defs
from ayon_core.tools.flickcharm import FlickCharm
from ayon_core.tools.utils import (
get_qt_icon,
)
from ayon_core.tools.utils import get_qt_icon
from ayon_core.tools.attribute_defs import AttributeDefinitionsDialog
from ayon_core.tools.launcher.abstract import WebactionContext
@ -1173,74 +1163,7 @@ class ActionsWidget(QtWidgets.QWidget):
float - 'label', 'value', 'placeholder', 'min', 'max'
"""
attr_defs = []
for config_field in config_fields:
field_type = config_field["type"]
attr_def = None
if field_type == "label":
label = config_field.get("value")
if label is None:
label = config_field.get("text")
attr_def = UILabelDef(
label, key=uuid.uuid4().hex
)
elif field_type == "boolean":
value = config_field["value"]
if isinstance(value, str):
value = value.lower() == "true"
attr_def = BoolDef(
config_field["name"],
default=value,
label=config_field.get("label"),
)
elif field_type == "text":
attr_def = TextDef(
config_field["name"],
default=config_field.get("value"),
label=config_field.get("label"),
placeholder=config_field.get("placeholder"),
multiline=config_field.get("multiline", False),
regex=config_field.get("regex"),
# syntax=config_field["syntax"],
)
elif field_type in ("integer", "float"):
value = config_field.get("value")
if isinstance(value, str):
if field_type == "integer":
value = int(value)
else:
value = float(value)
attr_def = NumberDef(
config_field["name"],
default=value,
label=config_field.get("label"),
decimals=0 if field_type == "integer" else 5,
# placeholder=config_field.get("placeholder"),
minimum=config_field.get("min"),
maximum=config_field.get("max"),
)
elif field_type in ("select", "multiselect"):
attr_def = EnumDef(
config_field["name"],
items=config_field["options"],
default=config_field.get("value"),
label=config_field.get("label"),
multiselection=field_type == "multiselect",
)
elif field_type == "hidden":
attr_def = HiddenDef(
config_field["name"],
default=config_field.get("value"),
)
if attr_def is None:
print(f"Unknown config field type: {field_type}")
attr_def = UILabelDef(
f"Unknown field type '{field_type}",
key=uuid.uuid4().hex
)
attr_defs.append(attr_def)
attr_defs = webaction_fields_to_attribute_defs(config_fields)
dialog = AttributeDefinitionsDialog(
attr_defs,

View file

@ -316,43 +316,34 @@ class ActionItem:
Args:
identifier (str): Action identifier.
label (str): Action label.
icon (dict[str, Any]): Action icon definition.
tooltip (str): Action tooltip.
group_label (Optional[str]): Group label.
icon (Optional[dict[str, Any]]): Action icon definition.
tooltip (Optional[str]): Action tooltip.
order (int): Action order.
data (Optional[dict[str, Any]]): Additional action data.
options (Union[list[AbstractAttrDef], list[qargparse.QArgument]]):
Action options. Note: 'qargparse' is considered as deprecated.
order (int): Action order.
project_name (str): Project name.
folder_ids (list[str]): Folder ids.
product_ids (list[str]): Product ids.
version_ids (list[str]): Version ids.
representation_ids (list[str]): Representation ids.
"""
"""
def __init__(
self,
identifier,
label,
icon,
tooltip,
options,
order,
project_name,
folder_ids,
product_ids,
version_ids,
representation_ids,
identifier: str,
label: str,
group_label: Optional[str],
icon: Optional[dict[str, Any]],
tooltip: Optional[str],
order: int,
data: Optional[dict[str, Any]],
options: Optional[list],
):
self.identifier = identifier
self.label = label
self.group_label = group_label
self.icon = icon
self.tooltip = tooltip
self.options = options
self.data = data
self.order = order
self.project_name = project_name
self.folder_ids = folder_ids
self.product_ids = product_ids
self.version_ids = version_ids
self.representation_ids = representation_ids
self.options = options
def _options_to_data(self):
options = self.options
@ -364,30 +355,26 @@ class ActionItem:
# future development of detached UI tools it would be better to be
# prepared for it.
raise NotImplementedError(
"{}.to_data is not implemented. Use Attribute definitions"
" from 'ayon_core.lib' instead of 'qargparse'.".format(
self.__class__.__name__
)
f"{self.__class__.__name__}.to_data is not implemented."
" Use Attribute definitions from 'ayon_core.lib'"
" instead of 'qargparse'."
)
def to_data(self):
def to_data(self) -> dict[str, Any]:
options = self._options_to_data()
return {
"identifier": self.identifier,
"label": self.label,
"group_label": self.group_label,
"icon": self.icon,
"tooltip": self.tooltip,
"options": options,
"order": self.order,
"project_name": self.project_name,
"folder_ids": self.folder_ids,
"product_ids": self.product_ids,
"version_ids": self.version_ids,
"representation_ids": self.representation_ids,
"data": self.data,
"options": options,
}
@classmethod
def from_data(cls, data):
def from_data(cls, data) -> "ActionItem":
options = data["options"]
if options:
options = deserialize_attr_defs(options)
@ -1005,43 +992,35 @@ class FrontendLoaderController(_BaseLoaderController):
# Load action items
@abstractmethod
def get_versions_action_items(self, project_name, version_ids):
def get_action_items(
self,
project_name: str,
entity_ids: set[str],
entity_type: str,
) -> list[ActionItem]:
"""Action items for versions selection.
Args:
project_name (str): Project name.
version_ids (Iterable[str]): Version ids.
entity_ids (set[str]): Entity ids.
entity_type (str): Entity type.
Returns:
list[ActionItem]: List of action items.
"""
pass
@abstractmethod
def get_representations_action_items(
self, project_name, representation_ids
):
"""Action items for representations selection.
Args:
project_name (str): Project name.
representation_ids (Iterable[str]): Representation ids.
Returns:
list[ActionItem]: List of action items.
"""
pass
@abstractmethod
def trigger_action_item(
self,
identifier,
options,
project_name,
version_ids,
representation_ids
identifier: str,
project_name: str,
selected_ids: set[str],
selected_entity_type: str,
data: Optional[dict[str, Any]],
options: dict[str, Any],
form_values: dict[str, Any],
):
"""Trigger action item.
@ -1059,13 +1038,15 @@ class FrontendLoaderController(_BaseLoaderController):
}
Args:
identifier (str): Action identifier.
options (dict[str, Any]): Action option values from UI.
identifier (sttr): Plugin identifier.
project_name (str): Project name.
version_ids (Iterable[str]): Version ids.
representation_ids (Iterable[str]): Representation ids.
"""
selected_ids (set[str]): Selected entity ids.
selected_entity_type (str): Selected entity type.
data (Optional[dict[str, Any]]): Additional action item data.
options (dict[str, Any]): Action option values from UI.
form_values (dict[str, Any]): Action form values from UI.
"""
pass
@abstractmethod

View file

@ -2,7 +2,7 @@ from __future__ import annotations
import logging
import uuid
from typing import Optional
from typing import Optional, Any
import ayon_api
@ -28,7 +28,8 @@ from ayon_core.tools.common_models import (
from .abstract import (
BackendLoaderController,
FrontendLoaderController,
ProductTypesFilter
ProductTypesFilter,
ActionItem,
)
from .models import (
SelectionModel,
@ -316,45 +317,47 @@ class LoaderController(BackendLoaderController, FrontendLoaderController):
project_name, product_ids, group_name
)
def get_versions_action_items(self, project_name, version_ids):
return self._loader_actions_model.get_versions_action_items(
project_name, version_ids)
def get_representations_action_items(
self, project_name, representation_ids):
action_items = (
self._loader_actions_model.get_representations_action_items(
project_name, representation_ids)
def get_action_items(
self,
project_name: str,
entity_ids: set[str],
entity_type: str,
) -> list[ActionItem]:
action_items = self._loader_actions_model.get_action_items(
project_name, entity_ids, entity_type
)
action_items.extend(self._sitesync_model.get_sitesync_action_items(
project_name, representation_ids)
site_sync_items = self._sitesync_model.get_sitesync_action_items(
project_name, entity_ids, entity_type
)
action_items.extend(site_sync_items)
return action_items
def trigger_action_item(
self,
identifier,
options,
project_name,
version_ids,
representation_ids
identifier: str,
project_name: str,
selected_ids: set[str],
selected_entity_type: str,
data: Optional[dict[str, Any]],
options: dict[str, Any],
form_values: dict[str, Any],
):
if self._sitesync_model.is_sitesync_action(identifier):
self._sitesync_model.trigger_action_item(
identifier,
project_name,
representation_ids
data,
)
return
self._loader_actions_model.trigger_action_item(
identifier,
options,
project_name,
version_ids,
representation_ids
identifier=identifier,
project_name=project_name,
selected_ids=selected_ids,
selected_entity_type=selected_entity_type,
data=data,
options=options,
form_values=form_values,
)
# Selection model wrappers

View file

@ -5,10 +5,16 @@ import traceback
import inspect
import collections
import uuid
from typing import Optional, Callable, Any
import ayon_api
from ayon_core.lib import NestedCacheItem
from ayon_core.lib import NestedCacheItem, Logger
from ayon_core.pipeline.actions import (
LoaderActionsContext,
LoaderActionSelection,
SelectionEntitiesCache,
)
from ayon_core.pipeline.load import (
discover_loader_plugins,
ProductLoaderPlugin,
@ -23,6 +29,7 @@ from ayon_core.pipeline.load import (
from ayon_core.tools.loader.abstract import ActionItem
ACTIONS_MODEL_SENDER = "actions.model"
LOADER_PLUGIN_ID = "__loader_plugin__"
NOT_SET = object()
@ -44,6 +51,7 @@ class LoaderActionsModel:
loaders_cache_lifetime = 30
def __init__(self, controller):
self._log = Logger.get_logger(self.__class__.__name__)
self._controller = controller
self._current_context_project = NOT_SET
self._loaders_by_identifier = NestedCacheItem(
@ -52,6 +60,15 @@ class LoaderActionsModel:
levels=1, lifetime=self.loaders_cache_lifetime)
self._repre_loaders = NestedCacheItem(
levels=1, lifetime=self.loaders_cache_lifetime)
self._loader_actions = LoaderActionsContext()
self._projects_cache = NestedCacheItem(levels=1, lifetime=60)
self._folders_cache = NestedCacheItem(levels=2, lifetime=300)
self._tasks_cache = NestedCacheItem(levels=2, lifetime=300)
self._products_cache = NestedCacheItem(levels=2, lifetime=300)
self._versions_cache = NestedCacheItem(levels=2, lifetime=1200)
self._representations_cache = NestedCacheItem(levels=2, lifetime=1200)
self._repre_parents_cache = NestedCacheItem(levels=2, lifetime=1200)
def reset(self):
"""Reset the model with all cached items."""
@ -60,64 +77,58 @@ class LoaderActionsModel:
self._loaders_by_identifier.reset()
self._product_loaders.reset()
self._repre_loaders.reset()
self._loader_actions.reset()
def get_versions_action_items(self, project_name, version_ids):
"""Get action items for given version ids.
self._folders_cache.reset()
self._tasks_cache.reset()
self._products_cache.reset()
self._versions_cache.reset()
self._representations_cache.reset()
self._repre_parents_cache.reset()
Args:
project_name (str): Project name.
version_ids (Iterable[str]): Version ids.
def get_action_items(
self,
project_name: str,
entity_ids: set[str],
entity_type: str,
) -> list[ActionItem]:
version_context_by_id = {}
repre_context_by_id = {}
if entity_type == "representation":
(
version_context_by_id,
repre_context_by_id
) = self._contexts_for_representations(project_name, entity_ids)
Returns:
list[ActionItem]: List of action items.
"""
if entity_type == "version":
(
version_context_by_id,
repre_context_by_id
) = self._contexts_for_versions(project_name, entity_ids)
(
version_context_by_id,
repre_context_by_id
) = self._contexts_for_versions(
project_name,
version_ids
)
return self._get_action_items_for_contexts(
action_items = self._get_action_items_for_contexts(
project_name,
version_context_by_id,
repre_context_by_id
)
def get_representations_action_items(
self, project_name, representation_ids
):
"""Get action items for given representation ids.
Args:
project_name (str): Project name.
representation_ids (Iterable[str]): Representation ids.
Returns:
list[ActionItem]: List of action items.
"""
(
product_context_by_id,
repre_context_by_id
) = self._contexts_for_representations(
action_items.extend(self._get_loader_action_items(
project_name,
representation_ids
)
return self._get_action_items_for_contexts(
project_name,
product_context_by_id,
repre_context_by_id
)
entity_ids,
entity_type,
version_context_by_id,
repre_context_by_id,
))
return action_items
def trigger_action_item(
self,
identifier,
options,
project_name,
version_ids,
representation_ids
identifier: str,
project_name: str,
selected_ids: set[str],
selected_entity_type: str,
data: Optional[dict[str, Any]],
options: dict[str, Any],
form_values: dict[str, Any],
):
"""Trigger action by identifier.
@ -128,15 +139,21 @@ class LoaderActionsModel:
happened.
Args:
identifier (str): Loader identifier.
options (dict[str, Any]): Loader option values.
identifier (str): Plugin identifier.
project_name (str): Project name.
version_ids (Iterable[str]): Version ids.
representation_ids (Iterable[str]): Representation ids.
"""
selected_ids (set[str]): Selected entity ids.
selected_entity_type (str): Selected entity type.
data (Optional[dict[str, Any]]): Additional action item data.
options (dict[str, Any]): Loader option values.
form_values (dict[str, Any]): Form values.
"""
event_data = {
"identifier": identifier,
"project_name": project_name,
"selected_ids": list(selected_ids),
"selected_entity_type": selected_entity_type,
"data": data,
"id": uuid.uuid4().hex,
}
self._controller.emit_event(
@ -144,24 +161,60 @@ class LoaderActionsModel:
event_data,
ACTIONS_MODEL_SENDER,
)
loader = self._get_loader_by_identifier(project_name, identifier)
if representation_ids is not None:
error_info = self._trigger_representation_loader(
loader,
options,
project_name,
representation_ids,
if identifier != LOADER_PLUGIN_ID:
result = None
crashed = False
try:
result = self._loader_actions.execute_action(
identifier=identifier,
selection=LoaderActionSelection(
project_name,
selected_ids,
selected_entity_type,
),
data=data,
form_values=form_values,
)
except Exception:
crashed = True
self._log.warning(
f"Failed to execute action '{identifier}'",
exc_info=True,
)
event_data["result"] = result
event_data["crashed"] = crashed
self._controller.emit_event(
"loader.action.finished",
event_data,
ACTIONS_MODEL_SENDER,
)
elif version_ids is not None:
return
loader = self._get_loader_by_identifier(
project_name, data["loader"]
)
entity_type = data["entity_type"]
entity_ids = data["entity_ids"]
if entity_type == "version":
error_info = self._trigger_version_loader(
loader,
options,
project_name,
version_ids,
entity_ids,
)
elif entity_type == "representation":
error_info = self._trigger_representation_loader(
loader,
options,
project_name,
entity_ids,
)
else:
raise NotImplementedError(
"Invalid arguments to trigger action item")
f"Invalid entity type '{entity_type}' to trigger action item"
)
event_data["error_info"] = error_info
self._controller.emit_event(
@ -276,28 +329,26 @@ class LoaderActionsModel:
self,
loader,
contexts,
project_name,
folder_ids=None,
product_ids=None,
version_ids=None,
representation_ids=None,
entity_ids,
entity_type,
repre_name=None,
):
label = self._get_action_label(loader)
if repre_name:
label = "{} ({})".format(label, repre_name)
label = f"{label} ({repre_name})"
return ActionItem(
get_loader_identifier(loader),
LOADER_PLUGIN_ID,
data={
"entity_ids": entity_ids,
"entity_type": entity_type,
"loader": get_loader_identifier(loader),
},
label=label,
group_label=None,
icon=self._get_action_icon(loader),
tooltip=self._get_action_tooltip(loader),
options=loader.get_options(contexts),
order=loader.order,
project_name=project_name,
folder_ids=folder_ids,
product_ids=product_ids,
version_ids=version_ids,
representation_ids=representation_ids,
options=loader.get_options(contexts),
)
def _get_loaders(self, project_name):
@ -351,15 +402,6 @@ class LoaderActionsModel:
loaders_by_identifier = loaders_by_identifier_c.get_data()
return loaders_by_identifier.get(identifier)
def _actions_sorter(self, action_item):
"""Sort the Loaders by their order and then their name.
Returns:
tuple[int, str]: Sort keys.
"""
return action_item.order, action_item.label
def _contexts_for_versions(self, project_name, version_ids):
"""Get contexts for given version ids.
@ -385,8 +427,8 @@ class LoaderActionsModel:
if not project_name and not version_ids:
return version_context_by_id, repre_context_by_id
version_entities = ayon_api.get_versions(
project_name, version_ids=version_ids
version_entities = self._get_versions(
project_name, version_ids
)
version_entities_by_id = {}
version_entities_by_product_id = collections.defaultdict(list)
@ -397,18 +439,18 @@ class LoaderActionsModel:
version_entities_by_product_id[product_id].append(version_entity)
_product_ids = set(version_entities_by_product_id.keys())
_product_entities = ayon_api.get_products(
project_name, product_ids=_product_ids
_product_entities = self._get_products(
project_name, _product_ids
)
product_entities_by_id = {p["id"]: p for p in _product_entities}
_folder_ids = {p["folderId"] for p in product_entities_by_id.values()}
_folder_entities = ayon_api.get_folders(
project_name, folder_ids=_folder_ids
_folder_entities = self._get_folders(
project_name, _folder_ids
)
folder_entities_by_id = {f["id"]: f for f in _folder_entities}
project_entity = ayon_api.get_project(project_name)
project_entity = self._get_project(project_name)
for version_id, version_entity in version_entities_by_id.items():
product_id = version_entity["productId"]
@ -422,8 +464,15 @@ class LoaderActionsModel:
"version": version_entity,
}
repre_entities = ayon_api.get_representations(
project_name, version_ids=version_ids)
all_repre_ids = set()
for repre_ids in self._get_repre_ids_by_version_ids(
project_name, version_ids
).values():
all_repre_ids |= repre_ids
repre_entities = self._get_representations(
project_name, all_repre_ids
)
for repre_entity in repre_entities:
version_id = repre_entity["versionId"]
version_entity = version_entities_by_id[version_id]
@ -459,49 +508,54 @@ class LoaderActionsModel:
Returns:
tuple[list[dict[str, Any]], list[dict[str, Any]]]: Version and
representation contexts.
"""
product_context_by_id = {}
"""
version_context_by_id = {}
repre_context_by_id = {}
if not project_name and not repre_ids:
return product_context_by_id, repre_context_by_id
return version_context_by_id, repre_context_by_id
repre_entities = list(ayon_api.get_representations(
project_name, representation_ids=repre_ids
))
repre_entities = self._get_representations(
project_name, repre_ids
)
version_ids = {r["versionId"] for r in repre_entities}
version_entities = ayon_api.get_versions(
project_name, version_ids=version_ids
version_entities = self._get_versions(
project_name, version_ids
)
version_entities_by_id = {
v["id"]: v for v in version_entities
}
product_ids = {v["productId"] for v in version_entities_by_id.values()}
product_entities = ayon_api.get_products(
project_name, product_ids=product_ids
product_entities = self._get_products(
project_name, product_ids
)
product_entities_by_id = {
p["id"]: p for p in product_entities
}
folder_ids = {p["folderId"] for p in product_entities_by_id.values()}
folder_entities = ayon_api.get_folders(
project_name, folder_ids=folder_ids
folder_entities = self._get_folders(
project_name, folder_ids
)
folder_entities_by_id = {
f["id"]: f for f in folder_entities
}
project_entity = ayon_api.get_project(project_name)
project_entity = self._get_project(project_name)
for product_id, product_entity in product_entities_by_id.items():
version_context_by_id = {}
for version_id, version_entity in version_entities_by_id.items():
product_id = version_entity["productId"]
product_entity = product_entities_by_id[product_id]
folder_id = product_entity["folderId"]
folder_entity = folder_entities_by_id[folder_id]
product_context_by_id[product_id] = {
version_context_by_id[version_id] = {
"project": project_entity,
"folder": folder_entity,
"product": product_entity,
"version": version_entity,
}
for repre_entity in repre_entities:
@ -519,7 +573,125 @@ class LoaderActionsModel:
"version": version_entity,
"representation": repre_entity,
}
return product_context_by_id, repre_context_by_id
return version_context_by_id, repre_context_by_id
def _get_project(self, project_name: str) -> dict[str, Any]:
cache = self._projects_cache[project_name]
if not cache.is_valid:
cache.update_data(ayon_api.get_project(project_name))
return cache.get_data()
def _get_folders(
self, project_name: str, folder_ids: set[str]
) -> list[dict[str, Any]]:
"""Get folders by ids."""
return self._get_entities(
project_name,
folder_ids,
self._folders_cache,
ayon_api.get_folders,
"folder_ids",
)
def _get_products(
self, project_name: str, product_ids: set[str]
) -> list[dict[str, Any]]:
"""Get products by ids."""
return self._get_entities(
project_name,
product_ids,
self._products_cache,
ayon_api.get_products,
"product_ids",
)
def _get_versions(
self, project_name: str, version_ids: set[str]
) -> list[dict[str, Any]]:
"""Get versions by ids."""
return self._get_entities(
project_name,
version_ids,
self._versions_cache,
ayon_api.get_versions,
"version_ids",
)
def _get_representations(
self, project_name: str, representation_ids: set[str]
) -> list[dict[str, Any]]:
"""Get representations by ids."""
return self._get_entities(
project_name,
representation_ids,
self._representations_cache,
ayon_api.get_representations,
"representation_ids",
)
def _get_repre_ids_by_version_ids(
self, project_name: str, version_ids: set[str]
) -> dict[str, set[str]]:
output = {}
if not version_ids:
return output
project_cache = self._repre_parents_cache[project_name]
missing_ids = set()
for version_id in version_ids:
cache = project_cache[version_id]
if cache.is_valid:
output[version_id] = cache.get_data()
else:
missing_ids.add(version_id)
if missing_ids:
repre_cache = self._representations_cache[project_name]
repres_by_parent_id = collections.defaultdict(list)
for repre in ayon_api.get_representations(
project_name, version_ids=missing_ids
):
version_id = repre["versionId"]
repre_cache[repre["id"]].update_data(repre)
repres_by_parent_id[version_id].append(repre)
for version_id, repres in repres_by_parent_id.items():
repre_ids = {
repre["id"]
for repre in repres
}
output[version_id] = set(repre_ids)
project_cache[version_id].update_data(repre_ids)
return output
def _get_entities(
self,
project_name: str,
entity_ids: set[str],
cache: NestedCacheItem,
getter: Callable,
filter_arg: str,
) -> list[dict[str, Any]]:
entities = []
if not entity_ids:
return entities
missing_ids = set()
project_cache = cache[project_name]
for entity_id in entity_ids:
entity_cache = project_cache[entity_id]
if entity_cache.is_valid:
entities.append(entity_cache.get_data())
else:
missing_ids.add(entity_id)
if missing_ids:
for entity in getter(project_name, **{filter_arg: missing_ids}):
entities.append(entity)
entity_id = entity["id"]
project_cache[entity_id].update_data(entity)
return entities
def _get_action_items_for_contexts(
self,
@ -557,51 +729,137 @@ class LoaderActionsModel:
if not filtered_repre_contexts:
continue
repre_ids = set()
repre_version_ids = set()
repre_product_ids = set()
repre_folder_ids = set()
for repre_context in filtered_repre_contexts:
repre_ids.add(repre_context["representation"]["id"])
repre_product_ids.add(repre_context["product"]["id"])
repre_version_ids.add(repre_context["version"]["id"])
repre_folder_ids.add(repre_context["folder"]["id"])
repre_ids = {
repre_context["representation"]["id"]
for repre_context in filtered_repre_contexts
}
item = self._create_loader_action_item(
loader,
repre_contexts,
project_name=project_name,
folder_ids=repre_folder_ids,
product_ids=repre_product_ids,
version_ids=repre_version_ids,
representation_ids=repre_ids,
repre_ids,
"representation",
repre_name=repre_name,
)
action_items.append(item)
# Product Loaders.
version_ids = set(version_context_by_id.keys())
product_folder_ids = set()
product_ids = set()
for product_context in version_context_by_id.values():
product_ids.add(product_context["product"]["id"])
product_folder_ids.add(product_context["folder"]["id"])
version_ids = set(version_context_by_id.keys())
version_contexts = list(version_context_by_id.values())
for loader in product_loaders:
item = self._create_loader_action_item(
loader,
version_contexts,
project_name=project_name,
folder_ids=product_folder_ids,
product_ids=product_ids,
version_ids=version_ids,
version_ids,
"version",
)
action_items.append(item)
action_items.sort(key=self._actions_sorter)
return action_items
def _get_loader_action_items(
self,
project_name: str,
entity_ids: set[str],
entity_type: str,
version_context_by_id: dict[str, dict[str, Any]],
repre_context_by_id: dict[str, dict[str, Any]],
) -> list[ActionItem]:
"""
Args:
project_name (str): Project name.
entity_ids (set[str]): Selected entity ids.
entity_type (str): Selected entity type.
version_context_by_id (dict[str, dict[str, Any]]): Version context
by id.
repre_context_by_id (dict[str, dict[str, Any]]): Representation
context by id.
Returns:
list[ActionItem]: List of action items.
"""
entities_cache = self._prepare_entities_cache(
project_name,
entity_type,
version_context_by_id,
repre_context_by_id,
)
selection = LoaderActionSelection(
project_name,
entity_ids,
entity_type,
entities_cache=entities_cache
)
items = []
for action in self._loader_actions.get_action_items(selection):
items.append(ActionItem(
action.identifier,
label=action.label,
group_label=action.group_label,
icon=action.icon,
tooltip=None, # action.tooltip,
order=action.order,
data=action.data,
options=None, # action.options,
))
return items
def _prepare_entities_cache(
self,
project_name: str,
entity_type: str,
version_context_by_id: dict[str, dict[str, Any]],
repre_context_by_id: dict[str, dict[str, Any]],
):
project_entity = None
folders_by_id = {}
products_by_id = {}
versions_by_id = {}
representations_by_id = {}
for context in version_context_by_id.values():
if project_entity is None:
project_entity = context["project"]
folder_entity = context["folder"]
product_entity = context["product"]
version_entity = context["version"]
folders_by_id[folder_entity["id"]] = folder_entity
products_by_id[product_entity["id"]] = product_entity
versions_by_id[version_entity["id"]] = version_entity
for context in repre_context_by_id.values():
repre_entity = context["representation"]
representations_by_id[repre_entity["id"]] = repre_entity
# Mapping has to be for all child entities which is available for
# representations only if version is selected
representation_ids_by_version_id = {}
if entity_type == "version":
representation_ids_by_version_id = {
version_id: set()
for version_id in versions_by_id
}
for context in repre_context_by_id.values():
repre_entity = context["representation"]
v_id = repre_entity["versionId"]
representation_ids_by_version_id[v_id].add(repre_entity["id"])
return SelectionEntitiesCache(
project_name,
project_entity=project_entity,
folders_by_id=folders_by_id,
products_by_id=products_by_id,
versions_by_id=versions_by_id,
representations_by_id=representations_by_id,
representation_ids_by_version_id=representation_ids_by_version_id,
)
def _trigger_version_loader(
self,
loader,
@ -634,12 +892,12 @@ class LoaderActionsModel:
project_name, version_ids=version_ids
))
product_ids = {v["productId"] for v in version_entities}
product_entities = ayon_api.get_products(
project_name, product_ids=product_ids
product_entities = self._get_products(
project_name, product_ids
)
product_entities_by_id = {p["id"]: p for p in product_entities}
folder_ids = {p["folderId"] for p in product_entities_by_id.values()}
folder_entities = ayon_api.get_folders(
folder_entities = self._get_folders(
project_name, folder_ids=folder_ids
)
folder_entities_by_id = {f["id"]: f for f in folder_entities}

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import collections
from typing import Any
from ayon_api import (
get_representations,
@ -246,26 +247,32 @@ class SiteSyncModel:
output[repre_id] = repre_cache.get_data()
return output
def get_sitesync_action_items(self, project_name, representation_ids):
def get_sitesync_action_items(
self, project_name, entity_ids, entity_type
):
"""
Args:
project_name (str): Project name.
representation_ids (Iterable[str]): Representation ids.
entity_ids (set[str]): Selected entity ids.
entity_type (str): Selected entity type.
Returns:
list[ActionItem]: Actions that can be shown in loader.
"""
if entity_type != "representation":
return []
if not self.is_sitesync_enabled(project_name):
return []
repres_status = self.get_representations_sync_status(
project_name, representation_ids
project_name, entity_ids
)
repre_ids_per_identifier = collections.defaultdict(set)
for repre_id in representation_ids:
for repre_id in entity_ids:
repre_status = repres_status[repre_id]
local_status, remote_status = repre_status
@ -293,36 +300,32 @@ class SiteSyncModel:
return action_items
def is_sitesync_action(self, identifier):
def is_sitesync_action(self, identifier: str) -> bool:
"""Should be `identifier` handled by SiteSync.
Args:
identifier (str): Action identifier.
identifier (str): Plugin identifier.
Returns:
bool: Should action be handled by SiteSync.
"""
return identifier in {
UPLOAD_IDENTIFIER,
DOWNLOAD_IDENTIFIER,
REMOVE_IDENTIFIER,
}
"""
return identifier == "sitesync.loader.action"
def trigger_action_item(
self,
identifier,
project_name,
representation_ids
project_name: str,
data: dict[str, Any],
):
"""Resets status for site_name or remove local files.
Args:
identifier (str): Action identifier.
project_name (str): Project name.
representation_ids (Iterable[str]): Representation ids.
"""
data (dict[str, Any]): Action item data.
"""
representation_ids = data["representation_ids"]
action_identifier = data["action_identifier"]
active_site = self.get_active_site(project_name)
remote_site = self.get_remote_site(project_name)
@ -346,17 +349,17 @@ class SiteSyncModel:
for repre_id in representation_ids:
repre_entity = repre_entities_by_id.get(repre_id)
product_type = product_type_by_repre_id[repre_id]
if identifier == DOWNLOAD_IDENTIFIER:
if action_identifier == DOWNLOAD_IDENTIFIER:
self._add_site(
project_name, repre_entity, active_site, product_type
)
elif identifier == UPLOAD_IDENTIFIER:
elif action_identifier == UPLOAD_IDENTIFIER:
self._add_site(
project_name, repre_entity, remote_site, product_type
)
elif identifier == REMOVE_IDENTIFIER:
elif action_identifier == REMOVE_IDENTIFIER:
self._sitesync_addon.remove_site(
project_name,
repre_id,
@ -476,27 +479,27 @@ class SiteSyncModel:
self,
project_name,
representation_ids,
identifier,
action_identifier,
label,
tooltip,
icon_name
):
return ActionItem(
identifier,
label,
"sitesync.loader.action",
label=label,
group_label=None,
icon={
"type": "awesome-font",
"name": icon_name,
"color": "#999999"
},
tooltip=tooltip,
options={},
order=1,
project_name=project_name,
folder_ids=[],
product_ids=[],
version_ids=[],
representation_ids=representation_ids,
data={
"representation_ids": representation_ids,
"action_identifier": action_identifier,
},
options=None,
)
def _add_site(self, project_name, repre_entity, site_name, product_type):

View file

@ -1,6 +1,7 @@
import uuid
from typing import Optional, Any
from qtpy import QtWidgets, QtGui
from qtpy import QtWidgets, QtGui, QtCore
import qtawesome
from ayon_core.lib.attribute_definitions import AbstractAttrDef
@ -11,9 +12,29 @@ from ayon_core.tools.utils.widgets import (
OptionDialog,
)
from ayon_core.tools.utils import get_qt_icon
from ayon_core.tools.loader.abstract import ActionItem
def show_actions_menu(action_items, global_point, one_item_selected, parent):
def _actions_sorter(item: tuple[ActionItem, str, str]):
"""Sort the Loaders by their order and then their name.
Returns:
tuple[int, str]: Sort keys.
"""
action_item, group_label, label = item
if group_label is None:
group_label = label
label = ""
return action_item.order, group_label, label
def show_actions_menu(
action_items: list[ActionItem],
global_point: QtCore.QPoint,
one_item_selected: bool,
parent: QtWidgets.QWidget,
) -> tuple[Optional[ActionItem], Optional[dict[str, Any]]]:
selected_action_item = None
selected_options = None
@ -26,8 +47,16 @@ def show_actions_menu(action_items, global_point, one_item_selected, parent):
menu = OptionalMenu(parent)
action_items_by_id = {}
action_items_with_labels = []
for action_item in action_items:
action_items_with_labels.append(
(action_item, action_item.group_label, action_item.label)
)
group_menu_by_label = {}
action_items_by_id = {}
for item in sorted(action_items_with_labels, key=_actions_sorter):
action_item, _, _ = item
item_id = uuid.uuid4().hex
action_items_by_id[item_id] = action_item
item_options = action_item.options
@ -50,7 +79,18 @@ def show_actions_menu(action_items, global_point, one_item_selected, parent):
action.setData(item_id)
menu.addAction(action)
group_label = action_item.group_label
if group_label:
group_menu = group_menu_by_label.get(group_label)
if group_menu is None:
group_menu = OptionalMenu(group_label, menu)
if icon is not None:
group_menu.setIcon(icon)
menu.addMenu(group_menu)
group_menu_by_label[group_label] = group_menu
group_menu.addAction(action)
else:
menu.addAction(action)
action = menu.exec_(global_point)
if action is not None:

View file

@ -420,8 +420,9 @@ class ProductsWidget(QtWidgets.QWidget):
if version_id is not None:
version_ids.add(version_id)
action_items = self._controller.get_versions_action_items(
project_name, version_ids)
action_items = self._controller.get_action_items(
project_name, version_ids, "version"
)
# Prepare global point where to show the menu
global_point = self._products_view.mapToGlobal(point)
@ -437,11 +438,13 @@ class ProductsWidget(QtWidgets.QWidget):
return
self._controller.trigger_action_item(
action_item.identifier,
options,
action_item.project_name,
version_ids=action_item.version_ids,
representation_ids=action_item.representation_ids,
identifier=action_item.identifier,
project_name=project_name,
selected_ids=version_ids,
selected_entity_type="version",
data=action_item.data,
options=options,
form_values={},
)
def _on_selection_change(self):

View file

@ -384,8 +384,8 @@ class RepresentationsWidget(QtWidgets.QWidget):
def _on_context_menu(self, point):
repre_ids = self._get_selected_repre_ids()
action_items = self._controller.get_representations_action_items(
self._selected_project_name, repre_ids
action_items = self._controller.get_action_items(
self._selected_project_name, repre_ids, "representation"
)
global_point = self._repre_view.mapToGlobal(point)
result = show_actions_menu(
@ -399,9 +399,11 @@ class RepresentationsWidget(QtWidgets.QWidget):
return
self._controller.trigger_action_item(
action_item.identifier,
options,
action_item.project_name,
version_ids=action_item.version_ids,
representation_ids=action_item.representation_ids,
identifier=action_item.identifier,
project_name=self._selected_project_name,
selected_ids=repre_ids,
selected_entity_type="representation",
data=action_item.data,
options=options,
form_values={},
)

View file

@ -1,18 +1,24 @@
from __future__ import annotations
from typing import Optional
from qtpy import QtWidgets, QtCore, QtGui
from ayon_core.resources import get_ayon_icon_filepath
from ayon_core.style import load_stylesheet
from ayon_core.pipeline.actions import LoaderActionResult
from ayon_core.tools.utils import (
MessageOverlayObject,
ErrorMessageBox,
ThumbnailPainterWidget,
RefreshButton,
GoToCurrentButton,
ProjectsCombobox,
get_qt_icon,
FoldersFiltersWidget,
)
from ayon_core.tools.attribute_defs import AttributeDefinitionsDialog
from ayon_core.tools.utils.lib import center_window
from ayon_core.tools.utils import ProjectsCombobox
from ayon_core.tools.common_models import StatusItem
from ayon_core.tools.loader.abstract import ProductTypeItem
from ayon_core.tools.loader.control import LoaderController
@ -141,6 +147,8 @@ class LoaderWindow(QtWidgets.QWidget):
if controller is None:
controller = LoaderController()
overlay_object = MessageOverlayObject(self)
main_splitter = QtWidgets.QSplitter(self)
context_splitter = QtWidgets.QSplitter(main_splitter)
@ -296,6 +304,12 @@ class LoaderWindow(QtWidgets.QWidget):
"controller.reset.finished",
self._on_controller_reset_finish,
)
controller.register_event_callback(
"loader.action.finished",
self._on_loader_action_finished,
)
self._overlay_object = overlay_object
self._group_dialog = ProductGroupDialog(controller, self)
@ -408,6 +422,20 @@ class LoaderWindow(QtWidgets.QWidget):
if self._reset_on_show:
self.refresh()
def _show_toast_message(
self,
message: str,
success: bool = True,
message_id: Optional[str] = None,
):
message_type = None
if not success:
message_type = "error"
self._overlay_object.add_message(
message, message_type, message_id=message_id
)
def _show_group_dialog(self):
project_name = self._projects_combobox.get_selected_project_name()
if not project_name:
@ -508,6 +536,77 @@ class LoaderWindow(QtWidgets.QWidget):
box = LoadErrorMessageBox(error_info, self)
box.show()
def _on_loader_action_finished(self, event):
crashed = event["crashed"]
if crashed:
self._show_toast_message(
"Action failed",
success=False,
)
return
result: Optional[LoaderActionResult] = event["result"]
if result is None:
return
if result.message:
self._show_toast_message(
result.message, result.success
)
if result.form is None:
return
form = result.form
dialog = AttributeDefinitionsDialog(
form.fields,
title=form.title,
parent=self,
)
if result.form_values:
dialog.set_values(result.form_values)
submit_label = form.submit_label
submit_icon = form.submit_icon
cancel_label = form.cancel_label
cancel_icon = form.cancel_icon
if submit_icon:
submit_icon = get_qt_icon(submit_icon)
if cancel_icon:
cancel_icon = get_qt_icon(cancel_icon)
if submit_label:
dialog.set_submit_label(submit_label)
else:
dialog.set_submit_visible(False)
if submit_icon:
dialog.set_submit_icon(submit_icon)
if cancel_label:
dialog.set_cancel_label(cancel_label)
else:
dialog.set_cancel_visible(False)
if cancel_icon:
dialog.set_cancel_icon(cancel_icon)
dialog.setMinimumSize(300, 140)
result = dialog.exec_()
if result != QtWidgets.QDialog.Accepted:
return
form_values = dialog.get_values()
self._controller.trigger_action_item(
identifier=event["identifier"],
project_name=event["project_name"],
selected_ids=event["selected_ids"],
selected_entity_type=event["selected_entity_type"],
options={},
data=event["data"],
form_values=form_values,
)
def _on_project_selection_changed(self, event):
self._selected_project_name = event["project_name"]
self._update_filters()

View file

@ -865,24 +865,26 @@ class OptionalMenu(QtWidgets.QMenu):
def mouseReleaseEvent(self, event):
"""Emit option clicked signal if mouse released on it"""
active = self.actionAt(event.pos())
if active and active.use_option:
if isinstance(active, OptionalAction) and active.use_option:
option = active.widget.option
if option.is_hovered(event.globalPos()):
option.clicked.emit()
super(OptionalMenu, self).mouseReleaseEvent(event)
super().mouseReleaseEvent(event)
def mouseMoveEvent(self, event):
"""Add highlight to active action"""
active = self.actionAt(event.pos())
for action in self.actions():
action.set_highlight(action is active, event.globalPos())
super(OptionalMenu, self).mouseMoveEvent(event)
if isinstance(action, OptionalAction):
action.set_highlight(action is active, event.globalPos())
super().mouseMoveEvent(event)
def leaveEvent(self, event):
"""Remove highlight from all actions"""
for action in self.actions():
action.set_highlight(False)
super(OptionalMenu, self).leaveEvent(event)
if isinstance(action, OptionalAction):
action.set_highlight(False)
super().leaveEvent(event)
class OptionalAction(QtWidgets.QWidgetAction):
@ -894,7 +896,7 @@ class OptionalAction(QtWidgets.QWidgetAction):
"""
def __init__(self, label, icon, use_option, parent):
super(OptionalAction, self).__init__(parent)
super().__init__(parent)
self.label = label
self.icon = icon
self.use_option = use_option
@ -955,7 +957,7 @@ class OptionalActionWidget(QtWidgets.QWidget):
"""Main widget class for `OptionalAction`"""
def __init__(self, label, parent=None):
super(OptionalActionWidget, self).__init__(parent)
super().__init__(parent)
body_widget = QtWidgets.QWidget(self)
body_widget.setObjectName("OptionalActionBody")