Workfiles tool: Refactor workfiles tool (for AYON) (#5550)

* ayon workfiles tool initial commit

* separated models into smaller files

* workfile can be listed and opened

* added browse logic

* added TODO for helper functions

* modified abstract controller

* implemented required methods

* base of save dialog

* added project settings to controller

* set context of side panel on init

* implemented save as dialog

* cleanup expected selection

* unify controller variable name

* base of published workfiles

* working published workfile copy

* added more missing features from workfiles tool

* Changed size policy of buttons to fill space vertically

* added overlay messages

* moved objects to abstraction

* moved 'window.py' to widgets

* small modifications in widgets

* get_workfile_info returns object

* filled docstrings in abstractions

* finishing touches

* backwards compatible work with host

* close window on successfull open

* remove indentation completelly

* added style for overlay label

* added handling of invalid host in controller

* added overlay with message if host is not valid

* added missing feature of disabled save

* use ayon_workfiles in ayon mode

* cleanup

* hound fixes

* use asset doc for 'change_current_context'

* added duplication action

* removed unused attributes and methods

* refresh workarea view on save as finished

* support host integrations without 'HostBase'

* fix 'filepath' fill

* reset item cache on save

* do not handle filepath in prepare workfile

* rename '_create_workfile_doc' > '_create_workfile_info_entity'

* fill comment before formatting

* fix column count by not calling 'clear'

* more explicit name of method

* use 'setHeaderData' to define header labels

* mimic changes from workarea widget in published widget
This commit is contained in:
Jakub Trllo 2023-09-08 13:45:00 +02:00 committed by GitHub
parent db05543e30
commit 076d16a50d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 5610 additions and 8 deletions

View file

@ -1427,6 +1427,10 @@ CreateNextPageOverlay {
background: rgba(0, 0, 0, 127);
}
#OverlayFrameLabel {
font-size: 15pt;
}
#BreadcrumbsPathInput {
padding: 2px;
font-size: 9pt;

View file

@ -0,0 +1,984 @@
import os
from abc import ABCMeta, abstractmethod
import six
from openpype.style import get_default_entity_icon_color
class WorkfileInfo:
"""Information about workarea file with possible additional from database.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
filepath (str): Filepath.
filesize (int): File size.
creation_time (int): Creation time (timestamp).
modification_time (int): Modification time (timestamp).
note (str): Note.
"""
def __init__(
self,
folder_id,
task_id,
filepath,
filesize,
creation_time,
modification_time,
note,
):
self.folder_id = folder_id
self.task_id = task_id
self.filepath = filepath
self.filesize = filesize
self.creation_time = creation_time
self.modification_time = modification_time
self.note = note
def to_data(self):
"""Converts WorkfileInfo item to data.
Returns:
dict[str, Any]: Folder item data.
"""
return {
"folder_id": self.folder_id,
"task_id": self.task_id,
"filepath": self.filepath,
"filesize": self.filesize,
"creation_time": self.creation_time,
"modification_time": self.modification_time,
"note": self.note,
}
@classmethod
def from_data(cls, data):
"""Re-creates WorkfileInfo item from data.
Args:
data (dict[str, Any]): Workfile info item data.
Returns:
WorkfileInfo: Workfile info item.
"""
return cls(**data)
class FolderItem:
"""Item representing folder entity on a server.
Folder can be a child of another folder or a project.
Args:
entity_id (str): Folder id.
parent_id (Union[str, None]): Parent folder id. If 'None' then project
is parent.
name (str): Name of folder.
label (str): Folder label.
icon_name (str): Name of icon from font awesome.
icon_color (str): Hex color string that will be used for icon.
"""
def __init__(
self, entity_id, parent_id, name, label, icon_name, icon_color
):
self.entity_id = entity_id
self.parent_id = parent_id
self.name = name
self.icon_name = icon_name or "fa.folder"
self.icon_color = icon_color or get_default_entity_icon_color()
self.label = label or name
def to_data(self):
"""Converts folder item to data.
Returns:
dict[str, Any]: Folder item data.
"""
return {
"entity_id": self.entity_id,
"parent_id": self.parent_id,
"name": self.name,
"label": self.label,
"icon_name": self.icon_name,
"icon_color": self.icon_color,
}
@classmethod
def from_data(cls, data):
"""Re-creates folder item from data.
Args:
data (dict[str, Any]): Folder item data.
Returns:
FolderItem: Folder item.
"""
return cls(**data)
class TaskItem:
"""Task item representing task entity on a server.
Task is child of a folder.
Task item has label that is used for display in UI. The label is by
default using task name and type.
Args:
task_id (str): Task id.
name (str): Name of task.
task_type (str): Type of task.
parent_id (str): Parent folder id.
icon_name (str): Name of icon from font awesome.
icon_color (str): Hex color string that will be used for icon.
"""
def __init__(
self, task_id, name, task_type, parent_id, icon_name, icon_color
):
self.task_id = task_id
self.name = name
self.task_type = task_type
self.parent_id = parent_id
self.icon_name = icon_name or "fa.male"
self.icon_color = icon_color or get_default_entity_icon_color()
self._label = None
@property
def id(self):
"""Alias for task_id.
Returns:
str: Task id.
"""
return self.task_id
@property
def label(self):
"""Label of task item for UI.
Returns:
str: Label of task item.
"""
if self._label is None:
self._label = "{} ({})".format(self.name, self.task_type)
return self._label
def to_data(self):
"""Converts task item to data.
Returns:
dict[str, Any]: Task item data.
"""
return {
"task_id": self.task_id,
"name": self.name,
"parent_id": self.parent_id,
"task_type": self.task_type,
"icon_name": self.icon_name,
"icon_color": self.icon_color,
}
@classmethod
def from_data(cls, data):
"""Re-create task item from data.
Args:
data (dict[str, Any]): Task item data.
Returns:
TaskItem: Task item.
"""
return cls(**data)
class FileItem:
"""File item that represents a file.
Can be used for both Workarea and Published workfile. Workarea file
will always exist on disk which is not the case for Published workfile.
Args:
dirpath (str): Directory path of file.
filename (str): Filename.
modified (float): Modified timestamp.
representation_id (Optional[str]): Representation id of published
workfile.
filepath (Optional[str]): Prepared filepath.
exists (Optional[bool]): If file exists on disk.
"""
def __init__(
self,
dirpath,
filename,
modified,
representation_id=None,
filepath=None,
exists=None
):
self.filename = filename
self.dirpath = dirpath
self.modified = modified
self.representation_id = representation_id
self._filepath = filepath
self._exists = exists
@property
def filepath(self):
"""Filepath of file.
Returns:
str: Full path to a file.
"""
if self._filepath is None:
self._filepath = os.path.join(self.dirpath, self.filename)
return self._filepath
@property
def exists(self):
"""File is available.
Returns:
bool: If file exists on disk.
"""
if self._exists is None:
self._exists = os.path.exists(self.filepath)
return self._exists
def to_data(self):
"""Converts file item to data.
Returns:
dict[str, Any]: File item data.
"""
return {
"filename": self.filename,
"dirpath": self.dirpath,
"modified": self.modified,
"representation_id": self.representation_id,
"filepath": self.filepath,
"exists": self.exists,
}
@classmethod
def from_data(cls, data):
"""Re-creates file item from data.
Args:
data (dict[str, Any]): File item data.
Returns:
FileItem: File item.
"""
required_keys = {
"filename",
"dirpath",
"modified",
"representation_id"
}
missing_keys = required_keys - set(data.keys())
if missing_keys:
raise KeyError("Missing keys: {}".format(missing_keys))
return cls(**{
key: data[key]
for key in required_keys
})
class WorkareaFilepathResult:
"""Result of workarea file formatting.
Args:
root (str): Root path of workarea.
filename (str): Filename.
exists (bool): True if file exists.
filepath (str): Filepath. If not provided it will be constructed
from root and filename.
"""
def __init__(self, root, filename, exists, filepath=None):
if not filepath and root and filename:
filepath = os.path.join(root, filename)
self.root = root
self.filename = filename
self.exists = exists
self.filepath = filepath
@six.add_metaclass(ABCMeta)
class AbstractWorkfilesCommon(object):
@abstractmethod
def is_host_valid(self):
"""Host is valid for workfiles tool work.
Returns:
bool: True if host is valid.
"""
pass
@abstractmethod
def get_workfile_extensions(self):
"""Get possible workfile extensions.
Defined by host implementation.
Returns:
Iterable[str]: List of extensions.
"""
pass
@abstractmethod
def is_save_enabled(self):
"""Is workfile save enabled.
Returns:
bool: True if save is enabled.
"""
pass
@abstractmethod
def set_save_enabled(self, enabled):
"""Enable or disabled workfile save.
Args:
enabled (bool): Enable save workfile when True.
"""
pass
class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
# Current context
@abstractmethod
def get_host_name(self):
"""Name of host.
Returns:
str: Name of host.
"""
pass
@abstractmethod
def get_current_project_name(self):
"""Project name from current context of host.
Returns:
str: Name of project.
"""
pass
@abstractmethod
def get_current_folder_id(self):
"""Folder id from current context of host.
Returns:
Union[str, None]: Folder id or None if host does not have
any context.
"""
pass
@abstractmethod
def get_current_task_name(self):
"""Task name from current context of host.
Returns:
Union[str, None]: Task name or None if host does not have
any context.
"""
pass
@abstractmethod
def get_current_workfile(self):
"""Current workfile from current context of host.
Returns:
Union[str, None]: Path to workfile or None if host does
not have opened specific file.
"""
pass
@property
@abstractmethod
def project_anatomy(self):
"""Project anatomy for current project.
Returns:
Anatomy: Project anatomy.
"""
pass
@property
@abstractmethod
def project_settings(self):
"""Project settings for current project.
Returns:
dict[str, Any]: Project settings.
"""
pass
@abstractmethod
def get_folder_entity(self, folder_id):
"""Get folder entity by id.
Args:
folder_id (str): Folder id.
Returns:
dict[str, Any]: Folder entity data.
"""
pass
@abstractmethod
def get_task_entity(self, task_id):
"""Get task entity by id.
Args:
task_id (str): Task id.
Returns:
dict[str, Any]: Task entity data.
"""
pass
def emit_event(self, topic, data=None, source=None):
"""Emit event.
Args:
topic (str): Event topic used for callbacks filtering.
data (Optional[dict[str, Any]]): Event data.
source (Optional[str]): Event source.
"""
pass
class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
"""UI controller abstraction that is used for workfiles tool frontend.
Abstraction to provide data for UI and to handle UI events.
Provide access to abstract backend data, like folders and tasks. Cares
about handling of selection, keep information about current UI selection
and have ability to tell what selection should UI show.
Selection is separated into 2 parts, first is what UI elements tell
about selection, and second is what UI should show as selected.
"""
@abstractmethod
def register_event_callback(self, topic, callback):
"""Register event callback.
Listen for events with given topic.
Args:
topic (str): Name of topic.
callback (Callable): Callback that will be called when event
is triggered.
"""
pass
# Host information
@abstractmethod
def get_workfile_extensions(self):
"""Each host can define extensions that can be used for workfile.
Returns:
List[str]: File extensions that can be used as workfile for
current host.
"""
pass
# Selection information
@abstractmethod
def get_selected_folder_id(self):
"""Currently selected folder id.
Returns:
Union[str, None]: Folder id or None if no folder is selected.
"""
pass
@abstractmethod
def set_selected_folder(self, folder_id):
"""Change selected folder.
This deselects currently selected task.
Args:
folder_id (Union[str, None]): Folder id or None if no folder
is selected.
"""
pass
@abstractmethod
def get_selected_task_id(self):
"""Currently selected task id.
Returns:
Union[str, None]: Task id or None if no folder is selected.
"""
pass
@abstractmethod
def get_selected_task_name(self):
"""Currently selected task name.
Returns:
Union[str, None]: Task name or None if no folder is selected.
"""
pass
@abstractmethod
def set_selected_task(self, folder_id, task_id, task_name):
"""Change selected task.
Args:
folder_id (Union[str, None]): Folder id or None if no folder
is selected.
task_id (Union[str, None]): Task id or None if no task
is selected.
task_name (Union[str, None]): Task name or None if no task
is selected.
"""
pass
@abstractmethod
def get_selected_workfile_path(self):
"""Currently selected workarea workile.
Returns:
Union[str, None]: Selected workfile path.
"""
pass
@abstractmethod
def set_selected_workfile_path(self, path):
"""Change selected workfile path.
Args:
path (Union[str, None]): Selected workfile path.
"""
pass
@abstractmethod
def get_selected_representation_id(self):
"""Currently selected workfile representation id.
Returns:
Union[str, None]: Representation id or None if no representation
is selected.
"""
pass
@abstractmethod
def set_selected_representation_id(self, representation_id):
"""Change selected representation.
Args:
representation_id (Union[str, None]): Selected workfile
representation id.
"""
pass
def get_selected_context(self):
"""Obtain selected context.
Returns:
dict[str, Union[str, None]]: Selected context.
"""
return {
"folder_id": self.get_selected_folder_id(),
"task_id": self.get_selected_task_id(),
"task_name": self.get_selected_task_name(),
"workfile_path": self.get_selected_workfile_path(),
"representation_id": self.get_selected_representation_id(),
}
# Expected selection
# - expected selection is used to restore selection after refresh
# or when current context should be used
@abstractmethod
def set_expected_selection(
self,
folder_id,
task_name,
workfile_name=None,
representation_id=None
):
"""Define what should be selected in UI.
Expected selection provide a way to define/change selection of
sequential UI elements. For example, if folder and task should be
selected a task element should wait until folder element has selected
folder.
Triggers 'expected_selection.changed' event.
Args:
folder_id (str): Folder id.
task_name (str): Task name.
workfile_name (Optional[str]): Workfile name. Used for workarea
files UI element.
representation_id (Optional[str]): Representation id. Used for
published filed UI element.
"""
pass
@abstractmethod
def get_expected_selection_data(self):
"""Data of expected selection.
TODOs:
Return defined object instead of dict.
Returns:
dict[str, Any]: Expected selection data.
"""
pass
@abstractmethod
def expected_folder_selected(self, folder_id):
"""Expected folder was selected in UI.
Args:
folder_id (str): Folder id which was selected.
"""
pass
@abstractmethod
def expected_task_selected(self, folder_id, task_name):
"""Expected task was selected in UI.
Args:
folder_id (str): Folder id under which task is.
task_name (str): Task name which was selected.
"""
pass
@abstractmethod
def expected_representation_selected(self, representation_id):
"""Expected representation was selected in UI.
Args:
representation_id (str): Representation id which was selected.
"""
pass
@abstractmethod
def expected_workfile_selected(self, workfile_path):
"""Expected workfile was selected in UI.
Args:
workfile_path (str): Workfile path which was selected.
"""
pass
@abstractmethod
def go_to_current_context(self):
"""Set expected selection to current context."""
pass
# Model functions
@abstractmethod
def get_folder_items(self, sender):
"""Folder items to visualize project hierarchy.
This function may trigger events 'folders.refresh.started' and
'folders.refresh.finished' which will contain 'sender' value in data.
That may help to avoid re-refresh of folder items in UI elements.
Args:
sender (str): Who requested folder items.
Returns:
list[FolderItem]: Minimum possible information needed
for visualisation of folder hierarchy.
"""
pass
@abstractmethod
def get_task_items(self, folder_id, sender):
"""Task items.
This function may trigger events 'tasks.refresh.started' and
'tasks.refresh.finished' which will contain 'sender' value in data.
That may help to avoid re-refresh of task items in UI elements.
Args:
folder_id (str): Folder ID for which are tasks requested.
sender (str): Who requested folder items.
Returns:
list[TaskItem]: Minimum possible information needed
for visualisation of tasks.
"""
pass
@abstractmethod
def has_unsaved_changes(self):
"""Has host unsaved change in currently running session.
Returns:
bool: Has unsaved changes.
"""
pass
@abstractmethod
def get_workarea_dir_by_context(self, folder_id, task_id):
"""Get workarea directory by context.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
Returns:
str: Workarea directory.
"""
pass
@abstractmethod
def get_workarea_file_items(self, folder_id, task_id):
"""Get workarea file items.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
Returns:
list[FileItem]: List of workarea file items.
"""
pass
@abstractmethod
def get_workarea_save_as_data(self, folder_id, task_id):
"""Prepare data for Save As operation.
Todos:
Return defined object instead of dict.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
Returns:
dict[str, Any]: Data for Save As operation.
"""
pass
@abstractmethod
def fill_workarea_filepath(
self,
folder_id,
task_id,
extension,
use_last_version,
version,
comment,
):
"""Calculate workfile path for passed context.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
extension (str): File extension.
use_last_version (bool): Use last version.
version (int): Version used if 'use_last_version' if 'False'.
comment (str): User's comment (subversion).
Returns:
WorkareaFilepathResult: Result of the operation.
"""
pass
@abstractmethod
def get_published_file_items(self, folder_id, task_id):
"""Get published file items.
Args:
folder_id (str): Folder id.
task_id (Union[str, None]): Task id.
Returns:
list[FileItem]: List of published file items.
"""
pass
@abstractmethod
def get_workfile_info(self, folder_id, task_id, filepath):
"""Workfile info from database.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
filepath (str): Workfile path.
Returns:
Union[WorkfileInfo, None]: Workfile info or None if was passed
invalid context.
"""
pass
@abstractmethod
def save_workfile_info(self, folder_id, task_id, filepath, note):
"""Save workfile info to database.
At this moment the only information which can be saved about
workfile is 'note'.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
filepath (str): Workfile path.
note (str): Note.
"""
pass
# General commands
@abstractmethod
def refresh(self):
"""Refresh everything, models, ui etc.
Triggers 'controller.refresh.started' event at the beginning and
'controller.refresh.finished' at the end.
"""
pass
# Controller actions
@abstractmethod
def open_workfile(self, filepath):
"""Open a workfile.
Args:
filepath (str): Workfile path.
"""
pass
@abstractmethod
def save_current_workfile(self):
"""Save state of current workfile."""
pass
@abstractmethod
def save_as_workfile(
self,
folder_id,
task_id,
workdir,
filename,
template_key,
):
"""Save current state of workfile to workarea.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
workdir (str): Workarea directory.
filename (str): Workarea filename.
template_key (str): Template key used to get the workdir
and filename.
"""
pass
@abstractmethod
def copy_workfile_representation(
self,
representation_id,
representation_filepath,
folder_id,
task_id,
workdir,
filename,
template_key,
):
"""Action to copy published workfile representation to workarea.
Triggers 'copy_representation.started' event on start and
'copy_representation.finished' event with '{"failed": bool}'.
Args:
representation_id (str): Representation id.
representation_filepath (str): Path to representation file.
folder_id (str): Folder id.
task_id (str): Task id.
workdir (str): Workarea directory.
filename (str): Workarea filename.
template_key (str): Template key.
"""
pass
@abstractmethod
def duplicate_workfile(self, src_filepath, workdir, filename):
"""Duplicate workfile.
Workfiles is not opened when done.
Args:
src_filepath (str): Source workfile path.
workdir (str): Destination workdir.
filename (str): Destination filename.
"""
pass

View file

@ -0,0 +1,642 @@
import os
import shutil
import ayon_api
from openpype.client import get_asset_by_id
from openpype.host import IWorkfileHost
from openpype.lib import Logger, emit_event
from openpype.lib.events import QueuedEventSystem
from openpype.settings import get_project_settings
from openpype.pipeline import Anatomy, registered_host
from openpype.pipeline.context_tools import (
change_current_context,
get_current_host_name,
get_global_context,
)
from openpype.pipeline.workfile import create_workdir_extra_folders
from .abstract import (
AbstractWorkfilesFrontend,
AbstractWorkfilesBackend,
)
from .models import SelectionModel, EntitiesModel, WorkfilesModel
class ExpectedSelection:
def __init__(self):
self._folder_id = None
self._task_name = None
self._workfile_name = None
self._representation_id = None
self._folder_selected = True
self._task_selected = True
self._workfile_name_selected = True
self._representation_id_selected = True
def set_expected_selection(
self,
folder_id,
task_name,
workfile_name=None,
representation_id=None
):
self._folder_id = folder_id
self._task_name = task_name
self._workfile_name = workfile_name
self._representation_id = representation_id
self._folder_selected = False
self._task_selected = False
self._workfile_name_selected = workfile_name is None
self._representation_id_selected = representation_id is None
def get_expected_selection_data(self):
return {
"folder_id": self._folder_id,
"task_name": self._task_name,
"workfile_name": self._workfile_name,
"representation_id": self._representation_id,
"folder_selected": self._folder_selected,
"task_selected": self._task_selected,
"workfile_name_selected": self._workfile_name_selected,
"representation_id_selected": self._representation_id_selected,
}
def is_expected_folder_selected(self, folder_id):
return folder_id == self._folder_id and self._folder_selected
def is_expected_task_selected(self, folder_id, task_name):
if not self.is_expected_folder_selected(folder_id):
return False
return task_name == self._task_name and self._task_selected
def expected_folder_selected(self, folder_id):
if folder_id != self._folder_id:
return False
self._folder_selected = True
return True
def expected_task_selected(self, folder_id, task_name):
if not self.is_expected_folder_selected(folder_id):
return False
if task_name != self._task_name:
return False
self._task_selected = True
return True
def expected_workfile_selected(self, folder_id, task_name, workfile_name):
if not self.is_expected_task_selected(folder_id, task_name):
return False
if workfile_name != self._workfile_name:
return False
self._workfile_name_selected = True
return True
def expected_representation_selected(
self, folder_id, task_name, representation_id
):
if not self.is_expected_task_selected(folder_id, task_name):
return False
if representation_id != self._representation_id:
return False
self._representation_id_selected = True
return True
class BaseWorkfileController(
AbstractWorkfilesFrontend, AbstractWorkfilesBackend
):
def __init__(self, host=None):
if host is None:
host = registered_host()
host_is_valid = False
if host is not None:
missing_methods = (
IWorkfileHost.get_missing_workfile_methods(host)
)
host_is_valid = len(missing_methods) == 0
self._host = host
self._host_is_valid = host_is_valid
self._project_anatomy = None
self._project_settings = None
self._event_system = None
self._log = None
self._current_project_name = None
self._current_folder_name = None
self._current_folder_id = None
self._current_task_name = None
self._save_is_enabled = True
# Expected selected folder and task
self._expected_selection = self._create_expected_selection_obj()
self._selection_model = self._create_selection_model()
self._entities_model = self._create_entities_model()
self._workfiles_model = self._create_workfiles_model()
@property
def log(self):
if self._log is None:
self._log = Logger.get_logger("WorkfilesUI")
return self._log
def is_host_valid(self):
return self._host_is_valid
def _create_expected_selection_obj(self):
return ExpectedSelection()
def _create_selection_model(self):
return SelectionModel(self)
def _create_entities_model(self):
return EntitiesModel(self)
def _create_workfiles_model(self):
return WorkfilesModel(self)
@property
def event_system(self):
"""Inner event system for workfiles tool controller.
Is used for communication with UI. Event system is created on demand.
Returns:
QueuedEventSystem: Event system which can trigger callbacks
for topics.
"""
if self._event_system is None:
self._event_system = QueuedEventSystem()
return self._event_system
# ----------------------------------------------------
# Implementation of methods required for backend logic
# ----------------------------------------------------
@property
def project_settings(self):
if self._project_settings is None:
self._project_settings = get_project_settings(
self.get_current_project_name())
return self._project_settings
@property
def project_anatomy(self):
if self._project_anatomy is None:
self._project_anatomy = Anatomy(self.get_current_project_name())
return self._project_anatomy
def get_folder_entity(self, folder_id):
return self._entities_model.get_folder_entity(folder_id)
def get_task_entity(self, task_id):
return self._entities_model.get_task_entity(task_id)
# ---------------------------------
# Implementation of abstract methods
# ---------------------------------
def emit_event(self, topic, data=None, source=None):
"""Use implemented event system to trigger event."""
if data is None:
data = {}
self.event_system.emit(topic, data, source)
def register_event_callback(self, topic, callback):
self.event_system.add_callback(topic, callback)
def is_save_enabled(self):
"""Is workfile save enabled.
Returns:
bool: True if save is enabled.
"""
return self._save_is_enabled
def set_save_enabled(self, enabled):
"""Enable or disabled workfile save.
Args:
enabled (bool): Enable save workfile when True.
"""
if self._save_is_enabled == enabled:
return
self._save_is_enabled = enabled
self._emit_event(
"workfile_save_enable.changed",
{"enabled": enabled}
)
# Host information
def get_workfile_extensions(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.get_workfile_extensions()
return host.file_extensions()
def has_unsaved_changes(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.workfile_has_unsaved_changes()
return host.has_unsaved_changes()
# Current context
def get_host_name(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.name
return get_current_host_name()
def _get_host_current_context(self):
if hasattr(self._host, "get_current_context"):
return self._host.get_current_context()
return get_global_context()
def get_current_project_name(self):
return self._current_project_name
def get_current_folder_id(self):
return self._current_folder_id
def get_current_task_name(self):
return self._current_task_name
def get_current_workfile(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.get_current_workfile()
return host.current_file()
# Selection information
def get_selected_folder_id(self):
return self._selection_model.get_selected_folder_id()
def set_selected_folder(self, folder_id):
self._selection_model.set_selected_folder(folder_id)
def get_selected_task_id(self):
return self._selection_model.get_selected_task_id()
def get_selected_task_name(self):
return self._selection_model.get_selected_task_name()
def set_selected_task(self, folder_id, task_id, task_name):
return self._selection_model.set_selected_task(
folder_id, task_id, task_name)
def get_selected_workfile_path(self):
return self._selection_model.get_selected_workfile_path()
def set_selected_workfile_path(self, path):
self._selection_model.set_selected_workfile_path(path)
def get_selected_representation_id(self):
return self._selection_model.get_selected_representation_id()
def set_selected_representation_id(self, representation_id):
self._selection_model.set_selected_representation_id(
representation_id)
def set_expected_selection(
self,
folder_id,
task_name,
workfile_name=None,
representation_id=None
):
self._expected_selection.set_expected_selection(
folder_id, task_name, workfile_name, representation_id
)
self._trigger_expected_selection_changed()
def expected_folder_selected(self, folder_id):
if self._expected_selection.expected_folder_selected(folder_id):
self._trigger_expected_selection_changed()
def expected_task_selected(self, folder_id, task_name):
if self._expected_selection.expected_task_selected(
folder_id, task_name
):
self._trigger_expected_selection_changed()
def expected_workfile_selected(self, folder_id, task_name, workfile_name):
if self._expected_selection.expected_workfile_selected(
folder_id, task_name, workfile_name
):
self._trigger_expected_selection_changed()
def expected_representation_selected(
self, folder_id, task_name, representation_id
):
if self._expected_selection.expected_representation_selected(
folder_id, task_name, representation_id
):
self._trigger_expected_selection_changed()
def get_expected_selection_data(self):
return self._expected_selection.get_expected_selection_data()
def go_to_current_context(self):
self.set_expected_selection(
self._current_folder_id, self._current_task_name
)
# Model functions
def get_folder_items(self, sender):
return self._entities_model.get_folder_items(sender)
def get_task_items(self, folder_id, sender):
return self._entities_model.get_tasks_items(folder_id, sender)
def get_workarea_dir_by_context(self, folder_id, task_id):
return self._workfiles_model.get_workarea_dir_by_context(
folder_id, task_id)
def get_workarea_file_items(self, folder_id, task_id):
return self._workfiles_model.get_workarea_file_items(
folder_id, task_id)
def get_workarea_save_as_data(self, folder_id, task_id):
return self._workfiles_model.get_workarea_save_as_data(
folder_id, task_id)
def fill_workarea_filepath(
self,
folder_id,
task_id,
extension,
use_last_version,
version,
comment,
):
return self._workfiles_model.fill_workarea_filepath(
folder_id,
task_id,
extension,
use_last_version,
version,
comment,
)
def get_published_file_items(self, folder_id, task_id):
task_name = None
if task_id:
task = self.get_task_entity(task_id)
task_name = task.get("name")
return self._workfiles_model.get_published_file_items(
folder_id, task_name)
def get_workfile_info(self, folder_id, task_id, filepath):
return self._workfiles_model.get_workfile_info(
folder_id, task_id, filepath
)
def save_workfile_info(self, folder_id, task_id, filepath, note):
self._workfiles_model.save_workfile_info(
folder_id, task_id, filepath, note
)
def refresh(self):
if not self._host_is_valid:
self._emit_event("controller.refresh.started")
self._emit_event("controller.refresh.finished")
return
expected_folder_id = self.get_selected_folder_id()
expected_task_name = self.get_selected_task_name()
self._emit_event("controller.refresh.started")
context = self._get_host_current_context()
project_name = context["project_name"]
folder_name = context["asset_name"]
task_name = context["task_name"]
folder_id = None
if folder_name:
folder = ayon_api.get_folder_by_name(project_name, folder_name)
if folder:
folder_id = folder["id"]
self._project_settings = None
self._project_anatomy = None
self._current_project_name = project_name
self._current_folder_name = folder_name
self._current_folder_id = folder_id
self._current_task_name = task_name
if not expected_folder_id:
expected_folder_id = folder_id
expected_task_name = task_name
self._expected_selection.set_expected_selection(
expected_folder_id, expected_task_name
)
self._entities_model.refresh()
self._emit_event("controller.refresh.finished")
# Controller actions
def open_workfile(self, filepath):
self._emit_event("open_workfile.started")
failed = False
try:
self._host_open_workfile(filepath)
except Exception:
failed = True
self.log.warning("Open of workfile failed", exc_info=True)
self._emit_event(
"open_workfile.finished",
{"failed": failed},
)
def save_current_workfile(self):
current_file = self.get_current_workfile()
self._host_save_workfile(current_file)
def save_as_workfile(
self,
folder_id,
task_id,
workdir,
filename,
template_key,
):
self._emit_event("save_as.started")
failed = False
try:
self._save_as_workfile(
folder_id,
task_id,
workdir,
filename,
template_key,
)
except Exception:
failed = True
self.log.warning("Save as failed", exc_info=True)
self._emit_event(
"save_as.finished",
{"failed": failed},
)
def copy_workfile_representation(
self,
representation_id,
representation_filepath,
folder_id,
task_id,
workdir,
filename,
template_key,
):
self._emit_event("copy_representation.started")
failed = False
try:
self._save_as_workfile(
folder_id,
task_id,
workdir,
filename,
template_key,
)
except Exception:
failed = True
self.log.warning(
"Copy of workfile representation failed", exc_info=True
)
self._emit_event(
"copy_representation.finished",
{"failed": failed},
)
def duplicate_workfile(self, src_filepath, workdir, filename):
self._emit_event("workfile_duplicate.started")
failed = False
try:
dst_filepath = os.path.join(workdir, filename)
shutil.copy(src_filepath, dst_filepath)
except Exception:
failed = True
self.log.warning("Duplication of workfile failed", exc_info=True)
self._emit_event(
"workfile_duplicate.finished",
{"failed": failed},
)
# Helper host methods that resolve 'IWorkfileHost' interface
def _host_open_workfile(self, filepath):
host = self._host
if isinstance(host, IWorkfileHost):
host.open_workfile(filepath)
else:
host.open_file(filepath)
def _host_save_workfile(self, filepath):
host = self._host
if isinstance(host, IWorkfileHost):
host.save_workfile(filepath)
else:
host.save_file(filepath)
def _emit_event(self, topic, data=None):
self.emit_event(topic, data, "controller")
# Expected selection
# - expected selection is used to restore selection after refresh
# or when current context should be used
def _trigger_expected_selection_changed(self):
self._emit_event(
"expected_selection_changed",
self._expected_selection.get_expected_selection_data(),
)
def _save_as_workfile(
self,
folder_id,
task_id,
workdir,
filename,
template_key,
src_filepath=None,
):
# Trigger before save event
project_name = self.get_current_project_name()
folder = self.get_folder_entity(folder_id)
task = self.get_task_entity(task_id)
task_name = task["name"]
# QUESTION should the data be different for 'before' and 'after'?
# NOTE keys should be OpenPype compatible
event_data = {
"project_name": project_name,
"folder_id": folder_id,
"asset_id": folder_id,
"asset_name": folder["name"],
"task_id": task_id,
"task_name": task_name,
"host_name": self.get_host_name(),
"filename": filename,
"workdir_path": workdir,
}
emit_event("workfile.save.before", event_data, source="workfiles.tool")
# Create workfiles root folder
if not os.path.exists(workdir):
self.log.debug("Initializing work directory: %s", workdir)
os.makedirs(workdir)
# Change context
if (
folder_id != self.get_current_folder_id()
or task_name != self.get_current_task_name()
):
# Use OpenPype asset-like object
asset_doc = get_asset_by_id(project_name, folder["id"])
change_current_context(
asset_doc,
task["name"],
template_key=template_key
)
# Save workfile
dst_filepath = os.path.join(workdir, filename)
if src_filepath:
shutil.copyfile(src_filepath, dst_filepath)
self._host_open_workfile(dst_filepath)
else:
self._host_save_workfile(dst_filepath)
# Create extra folders
create_workdir_extra_folders(
workdir,
self.get_host_name(),
task["taskType"],
task_name,
project_name
)
# Trigger after save events
emit_event("workfile.save.after", event_data, source="workfiles.tool")
self.refresh()

View file

@ -0,0 +1,10 @@
from .hierarchy import EntitiesModel
from .selection import SelectionModel
from .workfiles import WorkfilesModel
__all__ = (
"SelectionModel",
"EntitiesModel",
"WorkfilesModel",
)

View file

@ -0,0 +1,225 @@
"""Hierarchy model that handles folders and tasks.
The model can be extracted for common usage. In that case it will be required
to add more handling of project name changes.
"""
import time
import collections
import contextlib
import ayon_api
from openpype.tools.ayon_workfiles.abstract import (
FolderItem,
TaskItem,
)
def _get_task_items_from_tasks(tasks):
"""
Returns:
TaskItem: Task item.
"""
output = []
for task in tasks:
folder_id = task["folderId"]
output.append(TaskItem(
task["id"],
task["name"],
task["type"],
folder_id,
None,
None
))
return output
def _get_folder_item_from_hierarchy_item(item):
return FolderItem(
item["id"],
item["parentId"],
item["name"],
item["label"],
None,
None,
)
class CacheItem:
def __init__(self, lifetime=120):
self._lifetime = lifetime
self._last_update = None
self._data = None
@property
def is_valid(self):
if self._last_update is None:
return False
return (time.time() - self._last_update) < self._lifetime
def set_invalid(self, data=None):
self._last_update = None
self._data = data
def get_data(self):
return self._data
def update_data(self, data):
self._data = data
self._last_update = time.time()
class EntitiesModel(object):
event_source = "entities.model"
def __init__(self, controller):
folders_cache = CacheItem()
folders_cache.set_invalid({})
self._folders_cache = folders_cache
self._tasks_cache = {}
self._folders_by_id = {}
self._tasks_by_id = {}
self._folders_refreshing = False
self._tasks_refreshing = set()
self._controller = controller
def reset(self):
self._folders_cache.set_invalid({})
self._tasks_cache = {}
self._folders_by_id = {}
self._tasks_by_id = {}
def refresh(self):
self._refresh_folders_cache()
def get_folder_items(self, sender):
if not self._folders_cache.is_valid:
self._refresh_folders_cache(sender)
return self._folders_cache.get_data()
def get_tasks_items(self, folder_id, sender):
if not folder_id:
return []
task_cache = self._tasks_cache.get(folder_id)
if task_cache is None or not task_cache.is_valid:
self._refresh_tasks_cache(folder_id, sender)
task_cache = self._tasks_cache.get(folder_id)
return task_cache.get_data()
def get_folder_entity(self, folder_id):
if folder_id not in self._folders_by_id:
entity = None
if folder_id:
project_name = self._controller.get_current_project_name()
entity = ayon_api.get_folder_by_id(project_name, folder_id)
self._folders_by_id[folder_id] = entity
return self._folders_by_id[folder_id]
def get_task_entity(self, task_id):
if task_id not in self._tasks_by_id:
entity = None
if task_id:
project_name = self._controller.get_current_project_name()
entity = ayon_api.get_task_by_id(project_name, task_id)
self._tasks_by_id[task_id] = entity
return self._tasks_by_id[task_id]
@contextlib.contextmanager
def _folder_refresh_event_manager(self, project_name, sender):
self._folders_refreshing = True
self._controller.emit_event(
"folders.refresh.started",
{"project_name": project_name, "sender": sender},
self.event_source
)
try:
yield
finally:
self._controller.emit_event(
"folders.refresh.finished",
{"project_name": project_name, "sender": sender},
self.event_source
)
self._folders_refreshing = False
@contextlib.contextmanager
def _task_refresh_event_manager(
self, project_name, folder_id, sender
):
self._tasks_refreshing.add(folder_id)
self._controller.emit_event(
"tasks.refresh.started",
{
"project_name": project_name,
"folder_id": folder_id,
"sender": sender,
},
self.event_source
)
try:
yield
finally:
self._controller.emit_event(
"tasks.refresh.finished",
{
"project_name": project_name,
"folder_id": folder_id,
"sender": sender,
},
self.event_source
)
self._tasks_refreshing.discard(folder_id)
def _refresh_folders_cache(self, sender=None):
if self._folders_refreshing:
return
project_name = self._controller.get_current_project_name()
with self._folder_refresh_event_manager(project_name, sender):
folder_items = self._query_folders(project_name)
self._folders_cache.update_data(folder_items)
def _query_folders(self, project_name):
hierarchy = ayon_api.get_folders_hierarchy(project_name)
folder_items = {}
hierachy_queue = collections.deque(hierarchy["hierarchy"])
while hierachy_queue:
item = hierachy_queue.popleft()
folder_item = _get_folder_item_from_hierarchy_item(item)
folder_items[folder_item.entity_id] = folder_item
hierachy_queue.extend(item["children"] or [])
return folder_items
def _refresh_tasks_cache(self, folder_id, sender=None):
if folder_id in self._tasks_refreshing:
return
project_name = self._controller.get_current_project_name()
with self._task_refresh_event_manager(
project_name, folder_id, sender
):
cache_item = self._tasks_cache.get(folder_id)
if cache_item is None:
cache_item = CacheItem()
self._tasks_cache[folder_id] = cache_item
task_items = self._query_tasks(project_name, folder_id)
cache_item.update_data(task_items)
def _query_tasks(self, project_name, folder_id):
tasks = list(ayon_api.get_tasks(
project_name,
folder_ids=[folder_id],
fields={"id", "name", "label", "folderId", "type"}
))
return _get_task_items_from_tasks(tasks)

View file

@ -0,0 +1,91 @@
class SelectionModel(object):
"""Model handling selection changes.
Triggering events:
- "selection.folder.changed"
- "selection.task.changed"
- "workarea.selection.changed"
- "selection.representation.changed"
"""
event_source = "selection.model"
def __init__(self, controller):
self._controller = controller
self._folder_id = None
self._task_name = None
self._task_id = None
self._workfile_path = None
self._representation_id = None
def get_selected_folder_id(self):
return self._folder_id
def set_selected_folder(self, folder_id):
if folder_id == self._folder_id:
return
self._folder_id = folder_id
self._controller.emit_event(
"selection.folder.changed",
{"folder_id": folder_id},
self.event_source
)
def get_selected_task_name(self):
return self._task_name
def get_selected_task_id(self):
return self._task_id
def set_selected_task(self, folder_id, task_id, task_name):
if folder_id != self._folder_id:
self.set_selected_folder(folder_id)
if task_id == self._task_id:
return
self._task_name = task_name
self._task_id = task_id
self._controller.emit_event(
"selection.task.changed",
{
"folder_id": folder_id,
"task_name": task_name,
"task_id": task_id
},
self.event_source
)
def get_selected_workfile_path(self):
return self._workfile_path
def set_selected_workfile_path(self, path):
if path == self._workfile_path:
return
self._workfile_path = path
self._controller.emit_event(
"workarea.selection.changed",
{
"path": path,
"folder_id": self._folder_id,
"task_name": self._task_name,
"task_id": self._task_id,
},
self.event_source
)
def get_selected_representation_id(self):
return self._representation_id
def set_selected_representation_id(self, representation_id):
if representation_id == self._representation_id:
return
self._representation_id = representation_id
self._controller.emit_event(
"selection.representation.changed",
{"representation_id": representation_id},
self.event_source
)

View file

@ -0,0 +1,711 @@
import os
import re
import copy
import arrow
import ayon_api
from ayon_api.operations import OperationsSession
from openpype.client import get_project
from openpype.client.operations import (
prepare_workfile_info_update_data,
)
from openpype.pipeline.template_data import (
get_template_data,
)
from openpype.pipeline.workfile import (
get_workdir_with_workdir_data,
get_workfile_template_key,
get_last_workfile_with_version,
)
from openpype.pipeline.version_start import get_versioning_start
from openpype.tools.ayon_workfiles.abstract import (
WorkareaFilepathResult,
FileItem,
WorkfileInfo,
)
def get_folder_template_data(folder):
if not folder:
return {}
parts = folder["path"].split("/")
parts.pop(-1)
hierarchy = "/".join(parts)
return {
"asset": folder["name"],
"folder": {
"name": folder["name"],
"type": folder["folderType"],
"path": folder["path"],
},
"hierarchy": hierarchy,
}
def get_task_template_data(task):
if not task:
return {}
return {
"task": {
"name": task["name"],
"type": task["taskType"]
}
}
class CommentMatcher(object):
"""Use anatomy and work file data to parse comments from filenames"""
def __init__(self, extensions, file_template, data):
self.fname_regex = None
if "{comment}" not in file_template:
# Don't look for comment if template doesn't allow it
return
# Create a regex group for extensions
any_extension = "(?:{})".format(
"|".join(re.escape(ext.lstrip(".")) for ext in extensions)
)
# Use placeholders that will never be in the filename
temp_data = copy.deepcopy(data)
temp_data["comment"] = "<<comment>>"
temp_data["version"] = "<<version>>"
temp_data["ext"] = "<<ext>>"
fname_pattern = file_template.format_strict(temp_data)
fname_pattern = re.escape(fname_pattern)
# Replace comment and version with something we can match with regex
replacements = {
"<<comment>>": "(.+)",
"<<version>>": "[0-9]+",
"<<ext>>": any_extension,
}
for src, dest in replacements.items():
fname_pattern = fname_pattern.replace(re.escape(src), dest)
# Match from beginning to end of string to be safe
fname_pattern = "^{}$".format(fname_pattern)
self.fname_regex = re.compile(fname_pattern)
def parse_comment(self, filepath):
"""Parse the {comment} part from a filename"""
if not self.fname_regex:
return
fname = os.path.basename(filepath)
match = self.fname_regex.match(fname)
if match:
return match.group(1)
class WorkareaModel:
"""Workfiles model looking for workfiles in workare folder.
Workarea folder is usually task and host specific, defined by
anatomy templates. Is looking for files with extensions defined
by host integration.
"""
def __init__(self, controller):
self._controller = controller
extensions = None
if controller.is_host_valid():
extensions = controller.get_workfile_extensions()
self._extensions = extensions
self._base_data = None
self._fill_data_by_folder_id = {}
self._task_data_by_folder_id = {}
self._workdir_by_context = {}
@property
def project_name(self):
return self._controller.get_current_project_name()
def reset(self):
self._base_data = None
self._fill_data_by_folder_id = {}
self._task_data_by_folder_id = {}
def _get_base_data(self):
if self._base_data is None:
base_data = get_template_data(get_project(self.project_name))
base_data["app"] = self._controller.get_host_name()
self._base_data = base_data
return copy.deepcopy(self._base_data)
def _get_folder_data(self, folder_id):
fill_data = self._fill_data_by_folder_id.get(folder_id)
if fill_data is None:
folder = self._controller.get_folder_entity(folder_id)
fill_data = get_folder_template_data(folder)
self._fill_data_by_folder_id[folder_id] = fill_data
return copy.deepcopy(fill_data)
def _get_task_data(self, folder_id, task_id):
task_data = self._task_data_by_folder_id.setdefault(folder_id, {})
if task_id not in task_data:
task = self._controller.get_task_entity(task_id)
if task:
task_data[task_id] = get_task_template_data(task)
return copy.deepcopy(task_data[task_id])
def _prepare_fill_data(self, folder_id, task_id):
if not folder_id or not task_id:
return {}
base_data = self._get_base_data()
folder_data = self._get_folder_data(folder_id)
task_data = self._get_task_data(folder_id, task_id)
base_data.update(folder_data)
base_data.update(task_data)
return base_data
def get_workarea_dir_by_context(self, folder_id, task_id):
if not folder_id or not task_id:
return None
folder_mapping = self._workdir_by_context.setdefault(folder_id, {})
workdir = folder_mapping.get(task_id)
if workdir is not None:
return workdir
workdir_data = self._prepare_fill_data(folder_id, task_id)
workdir = get_workdir_with_workdir_data(
workdir_data,
self.project_name,
anatomy=self._controller.project_anatomy,
)
folder_mapping[task_id] = workdir
return workdir
def get_file_items(self, folder_id, task_id):
items = []
if not folder_id or not task_id:
return items
workdir = self.get_workarea_dir_by_context(folder_id, task_id)
if not os.path.exists(workdir):
return items
for filename in os.listdir(workdir):
filepath = os.path.join(workdir, filename)
if not os.path.isfile(filepath):
continue
ext = os.path.splitext(filename)[1].lower()
if ext not in self._extensions:
continue
modified = os.path.getmtime(filepath)
items.append(
FileItem(workdir, filename, modified)
)
return items
def _get_template_key(self, fill_data):
task_type = fill_data.get("task", {}).get("type")
# TODO cache
return get_workfile_template_key(
task_type,
self._controller.get_host_name(),
project_name=self.project_name
)
def _get_last_workfile_version(
self, workdir, file_template, fill_data, extensions
):
version = get_last_workfile_with_version(
workdir, str(file_template), fill_data, extensions
)[1]
if version is None:
task_info = fill_data.get("task", {})
version = get_versioning_start(
self.project_name,
self._controller.get_host_name(),
task_name=task_info.get("name"),
task_type=task_info.get("type"),
family="workfile",
project_settings=self._controller.project_settings,
)
else:
version += 1
return version
def _get_comments_from_root(
self,
file_template,
extensions,
fill_data,
root,
current_filename,
):
current_comment = None
comment_hints = set()
filenames = []
if root and os.path.exists(root):
for filename in os.listdir(root):
path = os.path.join(root, filename)
if not os.path.isfile(path):
continue
ext = os.path.splitext(filename)[-1].lower()
if ext in extensions:
filenames.append(filename)
if not filenames:
return comment_hints, current_comment
matcher = CommentMatcher(extensions, file_template, fill_data)
for filename in filenames:
comment = matcher.parse_comment(filename)
if comment:
comment_hints.add(comment)
if filename == current_filename:
current_comment = comment
return list(comment_hints), current_comment
def _get_workdir(self, anatomy, template_key, fill_data):
template_info = anatomy.templates_obj[template_key]
directory_template = template_info["folder"]
return directory_template.format_strict(fill_data).normalized()
def get_workarea_save_as_data(self, folder_id, task_id):
folder = None
task = None
if folder_id:
folder = self._controller.get_folder_entity(folder_id)
if task_id:
task = self._controller.get_task_entity(task_id)
if not folder or not task:
return {
"template_key": None,
"template_has_version": None,
"template_has_comment": None,
"ext": None,
"workdir": None,
"comment": None,
"comment_hints": None,
"last_version": None,
"extensions": None,
}
anatomy = self._controller.project_anatomy
fill_data = self._prepare_fill_data(folder_id, task_id)
template_key = self._get_template_key(fill_data)
current_workfile = self._controller.get_current_workfile()
current_filename = None
current_ext = None
if current_workfile:
current_filename = os.path.basename(current_workfile)
current_ext = os.path.splitext(current_filename)[1].lower()
extensions = self._extensions
if not current_ext and extensions:
current_ext = tuple(extensions)[0]
workdir = self._get_workdir(anatomy, template_key, fill_data)
template_info = anatomy.templates_obj[template_key]
file_template = template_info["file"]
comment_hints, comment = self._get_comments_from_root(
file_template,
extensions,
fill_data,
workdir,
current_filename,
)
last_version = self._get_last_workfile_version(
workdir, file_template, fill_data, extensions)
str_file_template = str(file_template)
template_has_version = "{version" in str_file_template
template_has_comment = "{comment" in str_file_template
return {
"template_key": template_key,
"template_has_version": template_has_version,
"template_has_comment": template_has_comment,
"ext": current_ext,
"workdir": workdir,
"comment": comment,
"comment_hints": comment_hints,
"last_version": last_version,
"extensions": extensions,
}
def fill_workarea_filepath(
self,
folder_id,
task_id,
extension,
use_last_version,
version,
comment,
):
anatomy = self._controller.project_anatomy
fill_data = self._prepare_fill_data(folder_id, task_id)
template_key = self._get_template_key(fill_data)
workdir = self._get_workdir(anatomy, template_key, fill_data)
template_info = anatomy.templates_obj[template_key]
file_template = template_info["file"]
if use_last_version:
version = self._get_last_workfile_version(
workdir, file_template, fill_data, self._extensions
)
fill_data["version"] = version
fill_data["ext"] = extension.lstrip(".")
if comment:
fill_data["comment"] = comment
filename = file_template.format(fill_data)
if not filename.solved:
filename = None
exists = False
if filename:
filepath = os.path.join(workdir, filename)
exists = os.path.exists(filepath)
return WorkareaFilepathResult(
workdir,
filename,
exists
)
class WorkfileEntitiesModel:
"""Workfile entities model.
Args:
control (AbstractWorkfileController): Controller object.
"""
def __init__(self, controller):
self._controller = controller
self._cache = {}
self._items = {}
def _get_workfile_info_identifier(
self, folder_id, task_id, rootless_path
):
return "_".join([folder_id, task_id, rootless_path])
def _get_rootless_path(self, filepath):
anatomy = self._controller.project_anatomy
workdir, filename = os.path.split(filepath)
success, rootless_dir = anatomy.find_root_template_from_path(workdir)
return "/".join([
os.path.normpath(rootless_dir).replace("\\", "/"),
filename
])
def _prepare_workfile_info_item(
self, folder_id, task_id, workfile_info, filepath
):
note = ""
if workfile_info:
note = workfile_info["attrib"].get("description") or ""
filestat = os.stat(filepath)
return WorkfileInfo(
folder_id,
task_id,
filepath,
filesize=filestat.st_size,
creation_time=filestat.st_ctime,
modification_time=filestat.st_mtime,
note=note
)
def _get_workfile_info(self, folder_id, task_id, identifier):
workfile_info = self._cache.get(identifier)
if workfile_info is not None:
return workfile_info
for workfile_info in ayon_api.get_workfiles_info(
self._controller.get_current_project_name(),
task_ids=[task_id],
fields=["id", "path", "attrib"],
):
workfile_identifier = self._get_workfile_info_identifier(
folder_id, task_id, workfile_info["path"]
)
self._cache[workfile_identifier] = workfile_info
return self._cache.get(identifier)
def get_workfile_info(
self, folder_id, task_id, filepath, rootless_path=None
):
if not folder_id or not task_id or not filepath:
return None
if rootless_path is None:
rootless_path = self._get_rootless_path(filepath)
identifier = self._get_workfile_info_identifier(
folder_id, task_id, rootless_path)
item = self._items.get(identifier)
if item is None:
workfile_info = self._get_workfile_info(
folder_id, task_id, identifier
)
item = self._prepare_workfile_info_item(
folder_id, task_id, workfile_info, filepath
)
self._items[identifier] = item
return item
def save_workfile_info(self, folder_id, task_id, filepath, note):
rootless_path = self._get_rootless_path(filepath)
identifier = self._get_workfile_info_identifier(
folder_id, task_id, rootless_path
)
workfile_info = self._get_workfile_info(
folder_id, task_id, identifier
)
if not workfile_info:
self._cache[identifier] = self._create_workfile_info_entity(
task_id, rootless_path, note)
self._items.pop(identifier, None)
return
new_workfile_info = copy.deepcopy(workfile_info)
attrib = new_workfile_info.setdefault("attrib", {})
attrib["description"] = note
update_data = prepare_workfile_info_update_data(
workfile_info, new_workfile_info
)
self._cache[identifier] = new_workfile_info
self._items.pop(identifier, None)
if not update_data:
return
project_name = self._controller.get_current_project_name()
session = OperationsSession()
session.update_entity(
project_name, "workfile", workfile_info["id"], update_data
)
session.commit()
def _create_workfile_info_entity(self, task_id, rootless_path, note):
extension = os.path.splitext(rootless_path)[1]
project_name = self._controller.get_current_project_name()
workfile_info = {
"path": rootless_path,
"taskId": task_id,
"attrib": {
"extension": extension,
"description": note
}
}
session = OperationsSession()
session.create_entity(project_name, "workfile", workfile_info)
session.commit()
return workfile_info
class PublishWorkfilesModel:
"""Model for handling of published workfiles.
Todos:
Cache workfiles products and representations for some time.
Note Representations won't change. Only what can change are
versions.
"""
def __init__(self, controller):
self._controller = controller
self._cached_extensions = None
self._cached_repre_extensions = None
@property
def _extensions(self):
if self._cached_extensions is None:
exts = self._controller.get_workfile_extensions() or []
self._cached_extensions = exts
return self._cached_extensions
@property
def _repre_extensions(self):
if self._cached_repre_extensions is None:
self._cached_repre_extensions = {
ext.lstrip(".") for ext in self._extensions
}
return self._cached_repre_extensions
def _file_item_from_representation(
self, repre_entity, project_anatomy, task_name=None
):
if task_name is not None:
task_info = repre_entity["context"].get("task")
if not task_info or task_info["name"] != task_name:
return None
# Filter by extension
extensions = self._repre_extensions
workfile_path = None
for repre_file in repre_entity["files"]:
ext = (
os.path.splitext(repre_file["name"])[1]
.lower()
.lstrip(".")
)
if ext in extensions:
workfile_path = repre_file["path"]
break
if not workfile_path:
return None
try:
workfile_path = workfile_path.format(
root=project_anatomy.roots)
except Exception as exc:
print("Failed to format workfile path: {}".format(exc))
dirpath, filename = os.path.split(workfile_path)
created_at = arrow.get(repre_entity["createdAt"])
return FileItem(
dirpath,
filename,
created_at.float_timestamp,
repre_entity["id"]
)
def get_file_items(self, folder_id, task_name):
# TODO refactor to use less server API calls
project_name = self._controller.get_current_project_name()
# Get subset docs of asset
product_entities = ayon_api.get_products(
project_name,
folder_ids=[folder_id],
product_types=["workfile"],
fields=["id", "name"]
)
output = []
product_ids = {product["id"] for product in product_entities}
if not product_ids:
return output
# Get version docs of subsets with their families
version_entities = ayon_api.get_versions(
project_name,
product_ids=product_ids,
fields=["id", "productId"]
)
version_ids = {version["id"] for version in version_entities}
if not version_ids:
return output
# Query representations of filtered versions and add filter for
# extension
repre_entities = ayon_api.get_representations(
project_name,
version_ids=version_ids
)
project_anatomy = self._controller.project_anatomy
# Filter queried representations by task name if task is set
file_items = []
for repre_entity in repre_entities:
file_item = self._file_item_from_representation(
repre_entity, project_anatomy, task_name
)
if file_item is not None:
file_items.append(file_item)
return file_items
class WorkfilesModel:
"""Workfiles model."""
def __init__(self, controller):
self._controller = controller
self._entities_model = WorkfileEntitiesModel(controller)
self._workarea_model = WorkareaModel(controller)
self._published_model = PublishWorkfilesModel(controller)
def get_workfile_info(self, folder_id, task_id, filepath):
return self._entities_model.get_workfile_info(
folder_id, task_id, filepath
)
def save_workfile_info(self, folder_id, task_id, filepath, note):
self._entities_model.save_workfile_info(
folder_id, task_id, filepath, note
)
def get_workarea_dir_by_context(self, folder_id, task_id):
"""Workarea dir for passed context.
The directory path is based on project anatomy templates.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
Returns:
Union[str, None]: Workarea dir path or None for invalid context.
"""
return self._workarea_model.get_workarea_dir_by_context(
folder_id, task_id)
def get_workarea_file_items(self, folder_id, task_id):
"""Workfile items for passed context from workarea.
Args:
folder_id (Union[str, None]): Folder id.
task_id (Union[str, None]): Task id.
Returns:
list[FileItem]: List of file items matching workarea of passed
context.
"""
return self._workarea_model.get_file_items(folder_id, task_id)
def get_workarea_save_as_data(self, folder_id, task_id):
return self._workarea_model.get_workarea_save_as_data(
folder_id, task_id)
def fill_workarea_filepath(self, *args, **kwargs):
return self._workarea_model.fill_workarea_filepath(
*args, **kwargs
)
def get_published_file_items(self, folder_id, task_name):
"""Published workfiles for passed context.
Args:
folder_id (str): Folder id.
task_name (str): Task name.
Returns:
list[FileItem]: List of files for published workfiles.
"""
return self._published_model.get_file_items(folder_id, task_name)

View file

@ -0,0 +1,6 @@
from .window import WorkfilesToolWindow
__all__ = (
"WorkfilesToolWindow",
)

View file

@ -0,0 +1,7 @@
from qtpy import QtCore
ITEM_ID_ROLE = QtCore.Qt.UserRole + 1
PARENT_ID_ROLE = QtCore.Qt.UserRole + 2
ITEM_NAME_ROLE = QtCore.Qt.UserRole + 3
TASK_TYPE_ROLE = QtCore.Qt.UserRole + 4

View file

@ -0,0 +1,398 @@
import os
import qtpy
from qtpy import QtWidgets, QtCore
from .save_as_dialog import SaveAsDialog
from .files_widget_workarea import WorkAreaFilesWidget
from .files_widget_published import PublishedFilesWidget
class FilesWidget(QtWidgets.QWidget):
"""A widget displaying files that allows to save and open files.
Args:
controller (AbstractWorkfilesFrontend): The control object.
parent (QtWidgets.QWidget): The parent widget.
"""
def __init__(self, controller, parent):
super(FilesWidget, self).__init__(parent)
files_widget = QtWidgets.QStackedWidget(self)
workarea_widget = WorkAreaFilesWidget(controller, files_widget)
published_widget = PublishedFilesWidget(controller, files_widget)
files_widget.addWidget(workarea_widget)
files_widget.addWidget(published_widget)
btns_widget = QtWidgets.QWidget(self)
workarea_btns_widget = QtWidgets.QWidget(btns_widget)
workarea_btn_open = QtWidgets.QPushButton(
"Open", workarea_btns_widget)
workarea_btn_browse = QtWidgets.QPushButton(
"Browse", workarea_btns_widget)
workarea_btn_save = QtWidgets.QPushButton(
"Save As", workarea_btns_widget)
workarea_btns_layout = QtWidgets.QHBoxLayout(workarea_btns_widget)
workarea_btns_layout.setContentsMargins(0, 0, 0, 0)
workarea_btns_layout.addWidget(workarea_btn_open, 1)
workarea_btns_layout.addWidget(workarea_btn_browse, 1)
workarea_btns_layout.addWidget(workarea_btn_save, 1)
published_btns_widget = QtWidgets.QWidget(btns_widget)
published_btn_copy_n_open = QtWidgets.QPushButton(
"Copy && Open", published_btns_widget
)
published_btn_change_context = QtWidgets.QPushButton(
"Choose different context", published_btns_widget
)
published_btn_cancel = QtWidgets.QPushButton(
"Cancel", published_btns_widget
)
published_btns_layout = QtWidgets.QHBoxLayout(published_btns_widget)
published_btns_layout.setContentsMargins(0, 0, 0, 0)
published_btns_layout.addWidget(published_btn_copy_n_open, 1)
published_btns_layout.addWidget(published_btn_change_context, 1)
published_btns_layout.addWidget(published_btn_cancel, 1)
btns_layout = QtWidgets.QVBoxLayout(btns_widget)
btns_layout.setContentsMargins(0, 0, 0, 0)
btns_layout.addWidget(workarea_btns_widget, 1)
btns_layout.addWidget(published_btns_widget, 1)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(files_widget, 1)
main_layout.addWidget(btns_widget, 0)
controller.register_event_callback(
"workarea.selection.changed",
self._on_workarea_path_changed
)
controller.register_event_callback(
"selection.representation.changed",
self._on_published_repre_changed
)
controller.register_event_callback(
"selection.task.changed",
self._on_task_changed
)
controller.register_event_callback(
"copy_representation.finished",
self._on_copy_representation_finished,
)
controller.register_event_callback(
"workfile_save_enable.changed",
self._on_workfile_save_enabled_change,
)
workarea_widget.open_current_requested.connect(
self._on_current_open_requests)
workarea_widget.duplicate_requested.connect(
self._on_duplicate_request)
workarea_btn_open.clicked.connect(self._on_workarea_open_clicked)
workarea_btn_browse.clicked.connect(self._on_workarea_browse_clicked)
workarea_btn_save.clicked.connect(self._on_workarea_save_clicked)
published_widget.save_as_requested.connect(self._on_save_as_request)
published_btn_copy_n_open.clicked.connect(
self._on_published_save_clicked)
published_btn_change_context.clicked.connect(
self._on_published_change_context_clicked)
published_btn_cancel.clicked.connect(
self._on_published_cancel_clicked)
self._selected_folder_id = None
self._selected_tak_name = None
self._pre_select_folder_id = None
self._pre_select_task_name = None
self._select_context_mode = False
self._valid_selected_context = False
self._valid_representation_id = False
self._tmp_text_filter = None
self._is_save_enabled = True
self._controller = controller
self._files_widget = files_widget
self._workarea_widget = workarea_widget
self._published_widget = published_widget
self._workarea_btns_widget = workarea_btns_widget
self._published_btns_widget = published_btns_widget
self._workarea_btn_open = workarea_btn_open
self._workarea_btn_browse = workarea_btn_browse
self._workarea_btn_save = workarea_btn_save
self._published_widget = published_widget
self._published_btn_copy_n_open = published_btn_copy_n_open
self._published_btn_change_context = published_btn_change_context
self._published_btn_cancel = published_btn_cancel
# Initial setup
workarea_btn_open.setEnabled(False)
published_btn_copy_n_open.setEnabled(False)
published_btn_change_context.setEnabled(False)
published_btn_cancel.setVisible(False)
def set_published_mode(self, published_mode):
# Make sure context selection is disabled
self._set_select_contex_mode(False)
# Change current widget
self._files_widget.setCurrentWidget((
self._published_widget
if published_mode
else self._workarea_widget
))
# Pass the mode to the widgets, so they can start/stop handle events
self._workarea_widget.set_published_mode(published_mode)
self._published_widget.set_published_mode(published_mode)
# Change available buttons
self._workarea_btns_widget.setVisible(not published_mode)
self._published_btns_widget.setVisible(published_mode)
def set_text_filter(self, text_filter):
if self._select_context_mode:
self._tmp_text_filter = text_filter
return
self._workarea_widget.set_text_filter(text_filter)
self._published_widget.set_text_filter(text_filter)
def _exec_save_as_dialog(self):
"""Show SaveAs dialog using currently selected context.
Returns:
Union[dict[str, Any], None]: Result of the dialog.
"""
dialog = SaveAsDialog(self._controller, self)
dialog.update_context()
dialog.exec_()
return dialog.get_result()
# -------------------------------------------------------------
# Workarea workfiles
# -------------------------------------------------------------
def _open_workfile(self, filepath):
if self._controller.has_unsaved_changes():
result = self._save_changes_prompt()
if result is None:
return
if result:
self._controller.save_current_workfile()
self._controller.open_workfile(filepath)
def _on_workarea_open_clicked(self):
path = self._workarea_widget.get_selected_path()
if path:
self._open_workfile(path)
def _on_current_open_requests(self):
self._on_workarea_open_clicked()
def _on_duplicate_request(self):
filepath = self._workarea_widget.get_selected_path()
if filepath is None:
return
result = self._exec_save_as_dialog()
if result is None:
return
self._controller.duplicate_workfile(
filepath,
result["workdir"],
result["filename"]
)
def _on_workarea_browse_clicked(self):
extnsions = self._controller.get_workfile_extensions()
ext_filter = "Work File (*{0})".format(
" *".join(extnsions)
)
dir_key = "directory"
if qtpy.API in ("pyside", "pyside2", "pyside6"):
dir_key = "dir"
selected_context = self._controller.get_selected_context()
workfile_root = self._controller.get_workarea_dir_by_context(
selected_context["folder_id"], selected_context["task_id"]
)
# Find existing directory of workfile root
# - Qt will use 'cwd' instead, if path does not exist, which may lead
# to igniter directory
while workfile_root:
if os.path.exists(workfile_root):
break
workfile_root = os.path.dirname(workfile_root)
kwargs = {
"caption": "Work Files",
"filter": ext_filter,
dir_key: workfile_root
}
filepath = QtWidgets.QFileDialog.getOpenFileName(**kwargs)[0]
if filepath:
self._open_workfile(filepath)
def _on_workarea_save_clicked(self):
result = self._exec_save_as_dialog()
if result is None:
return
self._controller.save_as_workfile(
result["folder_id"],
result["task_id"],
result["workdir"],
result["filename"],
result["template_key"],
)
def _on_workarea_path_changed(self, event):
valid_path = event["path"] is not None
self._workarea_btn_open.setEnabled(valid_path)
# -------------------------------------------------------------
# Published workfiles
# -------------------------------------------------------------
def _update_published_btns_state(self):
enabled = (
self._valid_representation_id
and self._valid_selected_context
and self._is_save_enabled
)
self._published_btn_copy_n_open.setEnabled(enabled)
self._published_btn_change_context.setEnabled(enabled)
def _update_workarea_btns_state(self):
enabled = self._is_save_enabled
self._workarea_btn_save.setEnabled(enabled)
def _on_published_repre_changed(self, event):
self._valid_representation_id = event["representation_id"] is not None
self._update_published_btns_state()
def _on_task_changed(self, event):
self._selected_folder_id = event["folder_id"]
self._selected_tak_name = event["task_name"]
self._valid_selected_context = (
self._selected_folder_id is not None
and self._selected_tak_name is not None
)
self._update_published_btns_state()
def _on_published_save_clicked(self):
result = self._exec_save_as_dialog()
if result is None:
return
repre_info = self._published_widget.get_selected_repre_info()
self._controller.copy_workfile_representation(
repre_info["representation_id"],
repre_info["filepath"],
result["folder_id"],
result["task_id"],
result["workdir"],
result["filename"],
result["template_key"],
)
def _on_save_as_request(self):
self._on_published_save_clicked()
def _set_select_contex_mode(self, enabled):
if self._select_context_mode is enabled:
return
if enabled:
self._pre_select_folder_id = self._selected_folder_id
self._pre_select_task_name = self._selected_tak_name
else:
self._pre_select_folder_id = None
self._pre_select_task_name = None
self._select_context_mode = enabled
self._published_btn_cancel.setVisible(enabled)
self._published_btn_change_context.setVisible(not enabled)
self._published_widget.set_select_context_mode(enabled)
if not enabled and self._tmp_text_filter is not None:
self.set_text_filter(self._tmp_text_filter)
self._tmp_text_filter = None
def _on_published_change_context_clicked(self):
self._set_select_contex_mode(True)
def _should_set_pre_select_context(self):
if self._pre_select_folder_id is None:
return False
if self._pre_select_folder_id != self._selected_folder_id:
return True
if self._pre_select_task_name is None:
return False
return self._pre_select_task_name != self._selected_tak_name
def _on_published_cancel_clicked(self):
folder_id = self._pre_select_folder_id
task_name = self._pre_select_task_name
representation_id = self._published_widget.get_selected_repre_id()
should_change_selection = self._should_set_pre_select_context()
self._set_select_contex_mode(False)
if should_change_selection:
self._controller.set_expected_selection(
folder_id, task_name, representation_id=representation_id
)
def _on_copy_representation_finished(self, event):
"""Callback for when copy representation is finished.
Make sure that select context mode is disabled when representation
copy is finished.
Args:
event (Event): Event object.
"""
if not event["failed"]:
self._set_select_contex_mode(False)
def _on_workfile_save_enabled_change(self, event):
enabled = event["enabled"]
self._is_save_enabled = enabled
self._update_published_btns_state()
self._update_workarea_btns_state()
def _save_changes_prompt(self):
"""Ask user if wants to save changes to current file.
Returns:
Union[bool, None]: True if user wants to save changes, False if
user does not want to save changes, None if user cancels
operation.
"""
messagebox = QtWidgets.QMessageBox(parent=self)
messagebox.setWindowFlags(
messagebox.windowFlags() | QtCore.Qt.FramelessWindowHint
)
messagebox.setIcon(QtWidgets.QMessageBox.Warning)
messagebox.setWindowTitle("Unsaved Changes!")
messagebox.setText(
"There are unsaved changes to the current file."
"\nDo you want to save the changes?"
)
messagebox.setStandardButtons(
QtWidgets.QMessageBox.Yes
| QtWidgets.QMessageBox.No
| QtWidgets.QMessageBox.Cancel
)
result = messagebox.exec_()
if result == QtWidgets.QMessageBox.Yes:
return True
if result == QtWidgets.QMessageBox.No:
return False
return None

View file

@ -0,0 +1,378 @@
import qtawesome
from qtpy import QtWidgets, QtCore, QtGui
from openpype.style import (
get_default_entity_icon_color,
get_disabled_entity_icon_color,
)
from openpype.tools.utils.delegates import PrettyTimeDelegate
from .utils import TreeView, BaseOverlayFrame
REPRE_ID_ROLE = QtCore.Qt.UserRole + 1
FILEPATH_ROLE = QtCore.Qt.UserRole + 2
DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 3
class PublishedFilesModel(QtGui.QStandardItemModel):
"""A model for displaying files.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
def __init__(self, controller):
super(PublishedFilesModel, self).__init__()
self.setColumnCount(2)
self.setHeaderData(0, QtCore.Qt.Horizontal, "Name")
self.setHeaderData(1, QtCore.Qt.Horizontal, "Date Modified")
controller.register_event_callback(
"selection.task.changed",
self._on_task_changed
)
controller.register_event_callback(
"selection.folder.changed",
self._on_folder_changed
)
self._file_icon = qtawesome.icon(
"fa.file-o",
color=get_default_entity_icon_color()
)
self._controller = controller
self._items_by_id = {}
self._missing_context_item = None
self._missing_context_used = False
self._empty_root_item = None
self._empty_item_used = False
self._published_mode = False
self._context_select_mode = False
self._last_folder_id = None
self._last_task_id = None
self._add_empty_item()
def _clear_items(self):
self._remove_missing_context_item()
self._remove_empty_item()
if self._items_by_id:
root = self.invisibleRootItem()
root.removeRows(0, root.rowCount())
self._items_by_id = {}
def set_published_mode(self, published_mode):
if self._published_mode == published_mode:
return
self._published_mode = published_mode
if published_mode:
self._fill_items()
elif self._context_select_mode:
self.set_select_context_mode(False)
def set_select_context_mode(self, select_mode):
if self._context_select_mode is select_mode:
return
self._context_select_mode = select_mode
if not select_mode and self._published_mode:
self._fill_items()
def get_index_by_representation_id(self, representation_id):
item = self._items_by_id.get(representation_id)
if item is None:
return QtCore.QModelIndex()
return self.indexFromItem(item)
def _get_missing_context_item(self):
if self._missing_context_item is None:
message = "Select folder"
item = QtGui.QStandardItem(message)
icon = qtawesome.icon(
"fa.times",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
item.setFlags(QtCore.Qt.NoItemFlags)
item.setColumnCount(self.columnCount())
self._missing_context_item = item
return self._missing_context_item
def _add_missing_context_item(self):
if self._missing_context_used:
return
self._clear_items()
root_item = self.invisibleRootItem()
root_item.appendRow(self._get_missing_context_item())
self._missing_context_used = True
def _remove_missing_context_item(self):
if not self._missing_context_used:
return
root_item = self.invisibleRootItem()
root_item.takeRow(self._missing_context_item.row())
self._missing_context_used = False
def _get_empty_root_item(self):
if self._empty_root_item is None:
message = "Didn't find any published workfiles."
item = QtGui.QStandardItem(message)
icon = qtawesome.icon(
"fa.times",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
item.setFlags(QtCore.Qt.NoItemFlags)
item.setColumnCount(self.columnCount())
self._empty_root_item = item
return self._empty_root_item
def _add_empty_item(self):
if self._empty_item_used:
return
self._clear_items()
root_item = self.invisibleRootItem()
root_item.appendRow(self._get_empty_root_item())
self._empty_item_used = True
def _remove_empty_item(self):
if not self._empty_item_used:
return
root_item = self.invisibleRootItem()
root_item.takeRow(self._empty_root_item.row())
self._empty_item_used = False
def _on_folder_changed(self, event):
self._last_folder_id = event["folder_id"]
self._last_task_id = None
if self._context_select_mode:
return
if self._published_mode:
self._fill_items()
def _on_task_changed(self, event):
self._last_folder_id = event["folder_id"]
self._last_task_id = event["task_id"]
if self._context_select_mode:
return
if self._published_mode:
self._fill_items()
def _fill_items(self):
folder_id = self._last_folder_id
task_id = self._last_task_id
if not folder_id:
self._add_missing_context_item()
return
file_items = self._controller.get_published_file_items(
folder_id, task_id
)
root_item = self.invisibleRootItem()
if not file_items:
self._add_empty_item()
return
self._remove_empty_item()
self._remove_missing_context_item()
items_to_remove = set(self._items_by_id.keys())
new_items = []
for file_item in file_items:
repre_id = file_item.representation_id
if repre_id in self._items_by_id:
items_to_remove.discard(repre_id)
item = self._items_by_id[repre_id]
else:
item = QtGui.QStandardItem()
new_items.append(item)
item.setColumnCount(self.columnCount())
item.setData(self._file_icon, QtCore.Qt.DecorationRole)
item.setData(file_item.filename, QtCore.Qt.DisplayRole)
item.setData(repre_id, REPRE_ID_ROLE)
if file_item.exists:
flags = QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable
else:
flags = QtCore.Qt.NoItemFlags
item.setFlags(flags)
item.setData(file_item.filepath, FILEPATH_ROLE)
item.setData(file_item.modified, DATE_MODIFIED_ROLE)
self._items_by_id[repre_id] = item
if new_items:
root_item.appendRows(new_items)
for repre_id in items_to_remove:
item = self._items_by_id.pop(repre_id)
root_item.removeRow(item.row())
if root_item.rowCount() == 0:
self._add_empty_item()
def flags(self, index):
# Use flags of first column for all columns
if index.column() != 0:
index = self.index(index.row(), 0, index.parent())
return super(PublishedFilesModel, self).flags(index)
def data(self, index, role=None):
if role is None:
role = QtCore.Qt.DisplayRole
# Handle roles for first column
if index.column() == 1:
if role == QtCore.Qt.DecorationRole:
return None
if role in (QtCore.Qt.DisplayRole, QtCore.Qt.EditRole):
role = DATE_MODIFIED_ROLE
index = self.index(index.row(), 0, index.parent())
return super(PublishedFilesModel, self).data(index, role)
class SelectContextOverlay(BaseOverlayFrame):
"""Overlay for files view when user should select context.
Todos:
The look of this overlay should be improved, it is "not nice" now.
"""
def __init__(self, parent):
super(SelectContextOverlay, self).__init__(parent)
label_widget = QtWidgets.QLabel(
"Please choose context on the left<br/>&lt",
self
)
label_widget.setAlignment(QtCore.Qt.AlignCenter)
label_widget.setObjectName("OverlayFrameLabel")
layout = QtWidgets.QHBoxLayout(self)
layout.addWidget(label_widget, 1, QtCore.Qt.AlignCenter)
label_widget.setAttribute(QtCore.Qt.WA_TranslucentBackground)
class PublishedFilesWidget(QtWidgets.QWidget):
"""Published workfiles widget.
Args:
controller (AbstractWorkfilesFrontend): The control object.
parent (QtWidgets.QWidget): The parent widget.
"""
selection_changed = QtCore.Signal()
save_as_requested = QtCore.Signal()
def __init__(self, controller, parent):
super(PublishedFilesWidget, self).__init__(parent)
view = TreeView(self)
view.setSortingEnabled(True)
view.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
# Smaller indentation
view.setIndentation(0)
model = PublishedFilesModel(controller)
proxy_model = QtCore.QSortFilterProxyModel()
proxy_model.setSourceModel(model)
proxy_model.setSortCaseSensitivity(QtCore.Qt.CaseInsensitive)
proxy_model.setDynamicSortFilter(True)
view.setModel(proxy_model)
time_delegate = PrettyTimeDelegate()
view.setItemDelegateForColumn(1, time_delegate)
# Default to a wider first filename column it is what we mostly care
# about and the date modified is relatively small anyway.
view.setColumnWidth(0, 330)
select_overlay = SelectContextOverlay(view)
select_overlay.setVisible(False)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(view, 1)
selection_model = view.selectionModel()
selection_model.selectionChanged.connect(self._on_selection_change)
view.double_clicked_left.connect(self._on_left_double_click)
controller.register_event_callback(
"expected_selection_changed",
self._on_expected_selection_change
)
self._view = view
self._select_overlay = select_overlay
self._model = model
self._proxy_model = proxy_model
self._time_delegate = time_delegate
self._controller = controller
def set_published_mode(self, published_mode):
self._model.set_published_mode(published_mode)
def set_select_context_mode(self, select_mode):
self._model.set_select_context_mode(select_mode)
self._select_overlay.setVisible(select_mode)
def set_text_filter(self, text_filter):
self._proxy_model.setFilterFixedString(text_filter)
def get_selected_repre_info(self):
selection_model = self._view.selectionModel()
representation_id = None
filepath = None
for index in selection_model.selectedIndexes():
representation_id = index.data(REPRE_ID_ROLE)
filepath = index.data(FILEPATH_ROLE)
return {
"representation_id": representation_id,
"filepath": filepath,
}
def get_selected_repre_id(self):
return self.get_selected_repre_info()["representation_id"]
def _on_selection_change(self):
repre_id = self.get_selected_repre_id()
self._controller.set_selected_representation_id(repre_id)
def _on_left_double_click(self):
self.save_as_requested.emit()
def _on_expected_selection_change(self, event):
if (
event["representation_id_selected"]
or not event["folder_selected"]
or (event["task_name"] and not event["task_selected"])
):
return
representation_id = event["representation_id"]
selected_repre_id = self.get_selected_repre_id()
if (
representation_id is not None
and representation_id != selected_repre_id
):
index = self._model.get_index_by_representation_id(
representation_id)
if index.isValid():
proxy_index = self._proxy_model.mapFromSource(index)
self._view.setCurrentIndex(proxy_index)
self._controller.expected_representation_selected(
event["folder_id"], event["task_name"], representation_id
)

View file

@ -0,0 +1,380 @@
import qtawesome
from qtpy import QtWidgets, QtCore, QtGui
from openpype.style import (
get_default_entity_icon_color,
get_disabled_entity_icon_color,
)
from openpype.tools.utils.delegates import PrettyTimeDelegate
from .utils import TreeView
FILENAME_ROLE = QtCore.Qt.UserRole + 1
FILEPATH_ROLE = QtCore.Qt.UserRole + 2
DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 3
class WorkAreaFilesModel(QtGui.QStandardItemModel):
"""A model for workare workfiles.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
def __init__(self, controller):
super(WorkAreaFilesModel, self).__init__()
self.setColumnCount(2)
self.setHeaderData(0, QtCore.Qt.Horizontal, "Name")
self.setHeaderData(1, QtCore.Qt.Horizontal, "Date Modified")
controller.register_event_callback(
"selection.task.changed",
self._on_task_changed
)
controller.register_event_callback(
"workfile_duplicate.finished",
self._on_duplicate_finished
)
controller.register_event_callback(
"save_as.finished",
self._on_save_as_finished
)
self._file_icon = qtawesome.icon(
"fa.file-o",
color=get_default_entity_icon_color()
)
self._controller = controller
self._items_by_filename = {}
self._missing_context_item = None
self._missing_context_used = False
self._empty_root_item = None
self._empty_item_used = False
self._published_mode = False
self._selected_folder_id = None
self._selected_task_id = None
self._add_missing_context_item()
def get_index_by_filename(self, filename):
item = self._items_by_filename.get(filename)
if item is None:
return QtCore.QModelIndex()
return self.indexFromItem(item)
def _get_missing_context_item(self):
if self._missing_context_item is None:
message = "Select folder and task"
item = QtGui.QStandardItem(message)
icon = qtawesome.icon(
"fa.times",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
item.setFlags(QtCore.Qt.NoItemFlags)
item.setColumnCount(self.columnCount())
self._missing_context_item = item
return self._missing_context_item
def _clear_items(self):
self._remove_missing_context_item()
self._remove_empty_item()
if self._items_by_filename:
root = self.invisibleRootItem()
root.removeRows(0, root.rowCount())
self._items_by_filename = {}
def _add_missing_context_item(self):
if self._missing_context_used:
return
self._clear_items()
root_item = self.invisibleRootItem()
root_item.appendRow(self._get_missing_context_item())
self._missing_context_used = True
def _remove_missing_context_item(self):
if not self._missing_context_used:
return
root_item = self.invisibleRootItem()
root_item.takeRow(self._missing_context_item.row())
self._missing_context_used = False
def _get_empty_root_item(self):
if self._empty_root_item is None:
message = "Work Area is empty.."
item = QtGui.QStandardItem(message)
icon = qtawesome.icon(
"fa.exclamation-circle",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
item.setFlags(QtCore.Qt.NoItemFlags)
item.setColumnCount(self.columnCount())
self._empty_root_item = item
return self._empty_root_item
def _add_empty_item(self):
if self._empty_item_used:
return
self._clear_items()
root_item = self.invisibleRootItem()
root_item.appendRow(self._get_empty_root_item())
self._empty_item_used = True
def _remove_empty_item(self):
if not self._empty_item_used:
return
root_item = self.invisibleRootItem()
root_item.takeRow(self._empty_root_item.row())
self._empty_item_used = False
def _on_task_changed(self, event):
self._selected_folder_id = event["folder_id"]
self._selected_task_id = event["task_id"]
if not self._published_mode:
self._fill_items()
def _on_duplicate_finished(self, event):
if event["failed"]:
return
if not self._published_mode:
self._fill_items()
def _on_save_as_finished(self, event):
if event["failed"]:
return
if not self._published_mode:
self._fill_items()
def _fill_items(self):
folder_id = self._selected_folder_id
task_id = self._selected_task_id
if not folder_id or not task_id:
self._add_missing_context_item()
return
file_items = self._controller.get_workarea_file_items(
folder_id, task_id
)
root_item = self.invisibleRootItem()
if not file_items:
self._add_empty_item()
return
self._remove_empty_item()
self._remove_missing_context_item()
items_to_remove = set(self._items_by_filename.keys())
new_items = []
for file_item in file_items:
filename = file_item.filename
if filename in self._items_by_filename:
items_to_remove.discard(filename)
item = self._items_by_filename[filename]
else:
item = QtGui.QStandardItem()
new_items.append(item)
item.setColumnCount(self.columnCount())
item.setFlags(
QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable
)
item.setData(self._file_icon, QtCore.Qt.DecorationRole)
item.setData(file_item.filename, QtCore.Qt.DisplayRole)
item.setData(file_item.filename, FILENAME_ROLE)
item.setData(file_item.filepath, FILEPATH_ROLE)
item.setData(file_item.modified, DATE_MODIFIED_ROLE)
self._items_by_filename[file_item.filename] = item
if new_items:
root_item.appendRows(new_items)
for filename in items_to_remove:
item = self._items_by_filename.pop(filename)
root_item.removeRow(item.row())
if root_item.rowCount() == 0:
self._add_empty_item()
def flags(self, index):
# Use flags of first column for all columns
if index.column() != 0:
index = self.index(index.row(), 0, index.parent())
return super(WorkAreaFilesModel, self).flags(index)
def data(self, index, role=None):
if role is None:
role = QtCore.Qt.DisplayRole
# Handle roles for first column
if index.column() == 1:
if role == QtCore.Qt.DecorationRole:
return None
if role in (QtCore.Qt.DisplayRole, QtCore.Qt.EditRole):
role = DATE_MODIFIED_ROLE
index = self.index(index.row(), 0, index.parent())
return super(WorkAreaFilesModel, self).data(index, role)
def set_published_mode(self, published_mode):
if self._published_mode == published_mode:
return
self._published_mode = published_mode
if not published_mode:
self._fill_items()
class WorkAreaFilesWidget(QtWidgets.QWidget):
"""Workarea files widget.
Args:
controller (AbstractWorkfilesFrontend): The control object.
parent (QtWidgets.QWidget): The parent widget.
"""
selection_changed = QtCore.Signal()
open_current_requested = QtCore.Signal()
duplicate_requested = QtCore.Signal()
def __init__(self, controller, parent):
super(WorkAreaFilesWidget, self).__init__(parent)
view = TreeView(self)
view.setSortingEnabled(True)
view.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
# Smaller indentation
view.setIndentation(0)
model = WorkAreaFilesModel(controller)
proxy_model = QtCore.QSortFilterProxyModel()
proxy_model.setSourceModel(model)
proxy_model.setSortCaseSensitivity(QtCore.Qt.CaseInsensitive)
proxy_model.setDynamicSortFilter(True)
view.setModel(proxy_model)
time_delegate = PrettyTimeDelegate()
view.setItemDelegateForColumn(1, time_delegate)
# Default to a wider first filename column it is what we mostly care
# about and the date modified is relatively small anyway.
view.setColumnWidth(0, 330)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(view, 1)
selection_model = view.selectionModel()
selection_model.selectionChanged.connect(self._on_selection_change)
view.double_clicked_left.connect(self._on_left_double_click)
view.customContextMenuRequested.connect(self._on_context_menu)
controller.register_event_callback(
"expected_selection_changed",
self._on_expected_selection_change
)
self._view = view
self._model = model
self._proxy_model = proxy_model
self._time_delegate = time_delegate
self._controller = controller
self._published_mode = False
def set_published_mode(self, published_mode):
"""Set the published mode.
Widget should ignore most of events when in published mode is enabled.
Args:
published_mode (bool): The published mode.
"""
self._model.set_published_mode(published_mode)
self._published_mode = published_mode
def set_text_filter(self, text_filter):
"""Set the text filter.
Args:
text_filter (str): The text filter.
"""
self._proxy_model.setFilterFixedString(text_filter)
def _get_selected_info(self):
selection_model = self._view.selectionModel()
filepath = None
filename = None
for index in selection_model.selectedIndexes():
filepath = index.data(FILEPATH_ROLE)
filename = index.data(FILENAME_ROLE)
return {
"filepath": filepath,
"filename": filename,
}
def get_selected_path(self):
"""Selected filepath.
Returns:
Union[str, None]: The selected filepath or None if nothing is
selected.
"""
return self._get_selected_info()["filepath"]
def _on_selection_change(self):
filepath = self.get_selected_path()
self._controller.set_selected_workfile_path(filepath)
def _on_left_double_click(self):
self.open_current_requested.emit()
def _on_context_menu(self, point):
index = self._view.indexAt(point)
if not index.isValid():
return
if not index.flags() & QtCore.Qt.ItemIsEnabled:
return
menu = QtWidgets.QMenu(self)
# Duplicate
action = QtWidgets.QAction("Duplicate", menu)
tip = "Duplicate selected file."
action.setToolTip(tip)
action.setStatusTip(tip)
action.triggered.connect(self._on_duplicate_pressed)
menu.addAction(action)
# Show the context action menu
global_point = self._view.mapToGlobal(point)
_ = menu.exec_(global_point)
def _on_duplicate_pressed(self):
self.duplicate_requested.emit()
def _on_expected_selection_change(self, event):
if event["workfile_name_selected"]:
return
workfile_name = event["workfile_name"]
if (
workfile_name is not None
and workfile_name != self._get_selected_info()["filename"]
):
index = self._model.get_index_by_filename(workfile_name)
if index.isValid():
proxy_index = self._proxy_model.mapFromSource(index)
self._view.setCurrentIndex(proxy_index)
self._controller.expected_workfile_selected(
event["folder_id"], event["task_name"], workfile_name
)

View file

@ -0,0 +1,324 @@
import uuid
import collections
import qtawesome
from qtpy import QtWidgets, QtGui, QtCore
from openpype.tools.utils import (
RecursiveSortFilterProxyModel,
DeselectableTreeView,
)
from .constants import ITEM_ID_ROLE, ITEM_NAME_ROLE
SENDER_NAME = "qt_folders_model"
class FoldersRefreshThread(QtCore.QThread):
"""Thread for refreshing folders.
Call controller to get folders and emit signal when finished.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
refresh_finished = QtCore.Signal(str)
def __init__(self, controller):
super(FoldersRefreshThread, self).__init__()
self._id = uuid.uuid4().hex
self._controller = controller
self._result = None
@property
def id(self):
"""Thread id.
Returns:
str: Unique id of the thread.
"""
return self._id
def run(self):
self._result = self._controller.get_folder_items(SENDER_NAME)
self.refresh_finished.emit(self.id)
def get_result(self):
return self._result
class FoldersModel(QtGui.QStandardItemModel):
"""Folders model which cares about refresh of folders.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
refreshed = QtCore.Signal()
def __init__(self, controller):
super(FoldersModel, self).__init__()
self._controller = controller
self._items_by_id = {}
self._parent_id_by_id = {}
self._refresh_threads = {}
self._current_refresh_thread = None
self._has_content = False
self._is_refreshing = False
@property
def is_refreshing(self):
"""Model is refreshing.
Returns:
bool: True if model is refreshing.
"""
return self._is_refreshing
@property
def has_content(self):
"""Has at least one folder.
Returns:
bool: True if model has at least one folder.
"""
return self._has_content
def clear(self):
self._items_by_id = {}
self._parent_id_by_id = {}
self._has_content = False
super(FoldersModel, self).clear()
def get_index_by_id(self, item_id):
"""Get index by folder id.
Returns:
QtCore.QModelIndex: Index of the folder. Can be invalid if folder
is not available.
"""
item = self._items_by_id.get(item_id)
if item is None:
return QtCore.QModelIndex()
return self.indexFromItem(item)
def refresh(self):
"""Refresh folders items.
Refresh start thread because it can cause that controller can
start query from database if folders are not cached.
"""
self._is_refreshing = True
thread = FoldersRefreshThread(self._controller)
self._current_refresh_thread = thread.id
self._refresh_threads[thread.id] = thread
thread.refresh_finished.connect(self._on_refresh_thread)
thread.start()
def _on_refresh_thread(self, thread_id):
"""Callback when refresh thread is finished.
Technically can be running multiple refresh threads at the same time,
to avoid using values from wrong thread, we check if thread id is
current refresh thread id.
Folders are stored by id.
Args:
thread_id (str): Thread id.
"""
thread = self._refresh_threads.pop(thread_id)
if thread_id != self._current_refresh_thread:
return
folder_items_by_id = thread.get_result()
if not folder_items_by_id:
if folder_items_by_id is not None:
self.clear()
self._is_refreshing = False
return
self._has_content = True
folder_ids = set(folder_items_by_id)
ids_to_remove = set(self._items_by_id) - folder_ids
folder_items_by_parent = collections.defaultdict(list)
for folder_item in folder_items_by_id.values():
folder_items_by_parent[folder_item.parent_id].append(folder_item)
hierarchy_queue = collections.deque()
hierarchy_queue.append(None)
while hierarchy_queue:
parent_id = hierarchy_queue.popleft()
folder_items = folder_items_by_parent[parent_id]
if parent_id is None:
parent_item = self.invisibleRootItem()
else:
parent_item = self._items_by_id[parent_id]
new_items = []
for folder_item in folder_items:
item_id = folder_item.entity_id
item = self._items_by_id.get(item_id)
if item is None:
is_new = True
item = QtGui.QStandardItem()
item.setEditable(False)
else:
is_new = self._parent_id_by_id[item_id] != parent_id
icon = qtawesome.icon(
folder_item.icon_name,
color=folder_item.icon_color,
)
item.setData(item_id, ITEM_ID_ROLE)
item.setData(folder_item.name, ITEM_NAME_ROLE)
item.setData(folder_item.label, QtCore.Qt.DisplayRole)
item.setData(icon, QtCore.Qt.DecorationRole)
if is_new:
new_items.append(item)
self._items_by_id[item_id] = item
self._parent_id_by_id[item_id] = parent_id
hierarchy_queue.append(item_id)
if new_items:
parent_item.appendRows(new_items)
for item_id in ids_to_remove:
item = self._items_by_id[item_id]
parent_id = self._parent_id_by_id[item_id]
if parent_id is None:
parent_item = self.invisibleRootItem()
else:
parent_item = self._items_by_id[parent_id]
parent_item.takeChild(item.row())
for item_id in ids_to_remove:
self._items_by_id.pop(item_id)
self._parent_id_by_id.pop(item_id)
self._is_refreshing = False
self.refreshed.emit()
class FoldersWidget(QtWidgets.QWidget):
"""Folders widget.
Widget that handles folders view, model and selection.
Args:
controller (AbstractWorkfilesFrontend): The control object.
parent (QtWidgets.QWidget): The parent widget.
"""
def __init__(self, controller, parent):
super(FoldersWidget, self).__init__(parent)
folders_view = DeselectableTreeView(self)
folders_view.setHeaderHidden(True)
folders_model = FoldersModel(controller)
folders_proxy_model = RecursiveSortFilterProxyModel()
folders_proxy_model.setSourceModel(folders_model)
folders_view.setModel(folders_proxy_model)
main_layout = QtWidgets.QHBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(folders_view, 1)
controller.register_event_callback(
"folders.refresh.finished",
self._on_folders_refresh_finished
)
controller.register_event_callback(
"controller.refresh.finished",
self._on_controller_refresh
)
controller.register_event_callback(
"expected_selection_changed",
self._on_expected_selection_change
)
selection_model = folders_view.selectionModel()
selection_model.selectionChanged.connect(self._on_selection_change)
folders_model.refreshed.connect(self._on_model_refresh)
self._controller = controller
self._folders_view = folders_view
self._folders_model = folders_model
self._folders_proxy_model = folders_proxy_model
self._expected_selection = None
def set_name_filer(self, name):
self._folders_proxy_model.setFilterFixedString(name)
def _clear(self):
self._folders_model.clear()
def _on_folders_refresh_finished(self, event):
if event["sender"] != SENDER_NAME:
self._folders_model.refresh()
def _on_controller_refresh(self):
self._update_expected_selection()
def _update_expected_selection(self, expected_data=None):
if expected_data is None:
expected_data = self._controller.get_expected_selection_data()
# We're done
if expected_data["folder_selected"]:
return
folder_id = expected_data["folder_id"]
self._expected_selection = folder_id
if not self._folders_model.is_refreshing:
self._set_expected_selection()
def _set_expected_selection(self):
folder_id = self._expected_selection
self._expected_selection = None
if (
folder_id is not None
and folder_id != self._get_selected_item_id()
):
index = self._folders_model.get_index_by_id(folder_id)
if index.isValid():
proxy_index = self._folders_proxy_model.mapFromSource(index)
self._folders_view.setCurrentIndex(proxy_index)
self._controller.expected_folder_selected(folder_id)
def _on_model_refresh(self):
if self._expected_selection:
self._set_expected_selection()
self._folders_proxy_model.sort(0)
def _on_expected_selection_change(self, event):
self._update_expected_selection(event.data)
def _get_selected_item_id(self):
selection_model = self._folders_view.selectionModel()
for index in selection_model.selectedIndexes():
item_id = index.data(ITEM_ID_ROLE)
if item_id is not None:
return item_id
return None
def _on_selection_change(self):
item_id = self._get_selected_item_id()
self._controller.set_selected_folder(item_id)

View file

@ -0,0 +1,351 @@
from qtpy import QtWidgets, QtCore
from openpype.tools.utils import PlaceholderLineEdit
class SubversionLineEdit(QtWidgets.QWidget):
"""QLineEdit with QPushButton for drop down selection of list of strings"""
text_changed = QtCore.Signal(str)
def __init__(self, *args, **kwargs):
super(SubversionLineEdit, self).__init__(*args, **kwargs)
input_field = PlaceholderLineEdit(self)
menu_btn = QtWidgets.QPushButton(self)
menu_btn.setFixedWidth(18)
menu = QtWidgets.QMenu(self)
menu_btn.setMenu(menu)
layout = QtWidgets.QHBoxLayout(self)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(3)
layout.addWidget(input_field, 1)
layout.addWidget(menu_btn, 0)
input_field.textChanged.connect(self.text_changed)
self.setFocusProxy(input_field)
self._input_field = input_field
self._menu_btn = menu_btn
self._menu = menu
def set_placeholder(self, placeholder):
self._input_field.setPlaceholderText(placeholder)
def set_text(self, text):
self._input_field.setText(text)
def set_values(self, values):
self._update(values)
def _on_button_clicked(self):
self._menu.exec_()
def _on_action_clicked(self, action):
self._input_field.setText(action.text())
def _update(self, values):
"""Create optional predefined subset names
Args:
default_names(list): all predefined names
Returns:
None
"""
menu = self._menu
button = self._menu_btn
state = any(values)
button.setEnabled(state)
if state is False:
return
# Include an empty string
values = [""] + sorted(values)
# Get and destroy the action group
group = button.findChild(QtWidgets.QActionGroup)
if group:
group.deleteLater()
# Build new action group
group = QtWidgets.QActionGroup(button)
for name in values:
action = group.addAction(name)
menu.addAction(action)
group.triggered.connect(self._on_action_clicked)
class SaveAsDialog(QtWidgets.QDialog):
"""Save as dialog to define a unique filename inside workdir.
The filename is calculated in controller where UI sends values from
dialog inputs.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
def __init__(self, controller, parent):
super(SaveAsDialog, self).__init__(parent=parent)
self.setWindowFlags(self.windowFlags() | QtCore.Qt.FramelessWindowHint)
self._controller = controller
self._folder_id = None
self._task_id = None
self._last_version = None
self._template_key = None
self._comment_value = None
self._version_value = None
self._ext_value = None
self._filename = None
self._workdir = None
self._result = None
# Btns widget
btns_widget = QtWidgets.QWidget(self)
btn_ok = QtWidgets.QPushButton("Ok", btns_widget)
btn_cancel = QtWidgets.QPushButton("Cancel", btns_widget)
btns_layout = QtWidgets.QHBoxLayout(btns_widget)
btns_layout.addWidget(btn_ok)
btns_layout.addWidget(btn_cancel)
# Inputs widget
inputs_widget = QtWidgets.QWidget(self)
# Version widget
version_widget = QtWidgets.QWidget(inputs_widget)
# Version number input
version_input = QtWidgets.QSpinBox(version_widget)
version_input.setMinimum(1)
version_input.setMaximum(9999)
# Last version checkbox
last_version_check = QtWidgets.QCheckBox(
"Next Available Version", version_widget
)
last_version_check.setChecked(True)
version_layout = QtWidgets.QHBoxLayout(version_widget)
version_layout.setContentsMargins(0, 0, 0, 0)
version_layout.addWidget(version_input)
version_layout.addWidget(last_version_check)
# Preview widget
preview_widget = QtWidgets.QLabel("Preview filename", inputs_widget)
preview_widget.setWordWrap(True)
# Subversion input
subversion_input = SubversionLineEdit(inputs_widget)
subversion_input.set_placeholder("Will be part of filename.")
# Extensions combobox
extension_combobox = QtWidgets.QComboBox(inputs_widget)
# Add styled delegate to use stylesheets
extension_delegate = QtWidgets.QStyledItemDelegate()
extension_combobox.setItemDelegate(extension_delegate)
version_label = QtWidgets.QLabel("Version:", inputs_widget)
subversion_label = QtWidgets.QLabel("Subversion:", inputs_widget)
extension_label = QtWidgets.QLabel("Extension:", inputs_widget)
preview_label = QtWidgets.QLabel("Preview:", inputs_widget)
# Build inputs
inputs_layout = QtWidgets.QGridLayout(inputs_widget)
inputs_layout.addWidget(version_label, 0, 0)
inputs_layout.addWidget(version_widget, 0, 1)
inputs_layout.addWidget(subversion_label, 1, 0)
inputs_layout.addWidget(subversion_input, 1, 1)
inputs_layout.addWidget(extension_label, 2, 0)
inputs_layout.addWidget(extension_combobox, 2, 1)
inputs_layout.addWidget(preview_label, 3, 0)
inputs_layout.addWidget(preview_widget, 3, 1)
# Build layout
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.addWidget(inputs_widget)
main_layout.addWidget(btns_widget)
# Signal callback registration
version_input.valueChanged.connect(self._on_version_spinbox_change)
last_version_check.stateChanged.connect(
self._on_version_checkbox_change
)
subversion_input.text_changed.connect(self._on_comment_change)
extension_combobox.currentIndexChanged.connect(
self._on_extension_change)
btn_ok.pressed.connect(self._on_ok_pressed)
btn_cancel.pressed.connect(self._on_cancel_pressed)
# Store objects
self._inputs_layout = inputs_layout
self._btn_ok = btn_ok
self._btn_cancel = btn_cancel
self._version_widget = version_widget
self._version_input = version_input
self._last_version_check = last_version_check
self._extension_delegate = extension_delegate
self._extension_combobox = extension_combobox
self._subversion_input = subversion_input
self._preview_widget = preview_widget
self._version_label = version_label
self._subversion_label = subversion_label
self._extension_label = extension_label
self._preview_label = preview_label
# Post init setup
# Allow "Enter" key to accept the save.
btn_ok.setDefault(True)
# Disable version input if last version is checked
version_input.setEnabled(not last_version_check.isChecked())
# Force default focus to comment, some hosts didn't automatically
# apply focus to this line edit (e.g. Houdini)
subversion_input.setFocus()
def get_result(self):
return self._result
def update_context(self):
# Add version only if template contains version key
# - since the version can be padded with "{version:0>4}" we only search
# for "{version".
selected_context = self._controller.get_selected_context()
folder_id = selected_context["folder_id"]
task_id = selected_context["task_id"]
data = self._controller.get_workarea_save_as_data(folder_id, task_id)
last_version = data["last_version"]
comment = data["comment"]
comment_hints = data["comment_hints"]
template_has_version = data["template_has_version"]
template_has_comment = data["template_has_comment"]
self._folder_id = folder_id
self._task_id = task_id
self._workdir = data["workdir"]
self._comment_value = data["comment"]
self._ext_value = data["ext"]
self._template_key = data["template_key"]
self._last_version = data["last_version"]
self._extension_combobox.clear()
self._extension_combobox.addItems(data["extensions"])
self._version_input.setValue(last_version)
vw_idx = self._inputs_layout.indexOf(self._version_widget)
self._version_label.setVisible(template_has_version)
self._version_widget.setVisible(template_has_version)
if template_has_version:
if vw_idx == -1:
self._inputs_layout.addWidget(self._version_label, 0, 0)
self._inputs_layout.addWidget(self._version_widget, 0, 1)
elif vw_idx != -1:
self._inputs_layout.takeAt(vw_idx)
self._inputs_layout.takeAt(
self._inputs_layout.indexOf(self._version_label)
)
cw_idx = self._inputs_layout.indexOf(self._subversion_input)
self._subversion_label.setVisible(template_has_comment)
self._subversion_input.setVisible(template_has_comment)
if template_has_comment:
if cw_idx == -1:
self._inputs_layout.addWidget(self._subversion_label, 1, 0)
self._inputs_layout.addWidget(self._subversion_input, 1, 1)
elif cw_idx != -1:
self._inputs_layout.takeAt(cw_idx)
self._inputs_layout.takeAt(
self._inputs_layout.indexOf(self._subversion_label)
)
if template_has_comment:
self._subversion_input.set_text(comment or "")
self._subversion_input.set_values(comment_hints)
self._update_filename()
def _on_version_spinbox_change(self, value):
if value == self._version_value:
return
self._version_value = value
if not self._last_version_check.isChecked():
self._update_filename()
def _on_version_checkbox_change(self):
use_last_version = self._last_version_check.isChecked()
self._version_input.setEnabled(not use_last_version)
if use_last_version:
self._version_input.blockSignals(True)
self._version_input.setValue(self._last_version)
self._version_input.blockSignals(False)
self._update_filename()
def _on_comment_change(self, text):
if self._comment_value == text:
return
self._comment_value = text
self._update_filename()
def _on_extension_change(self):
ext = self._extension_combobox.currentText()
if ext == self._ext_value:
return
self._ext_value = ext
self._update_filename()
def _on_ok_pressed(self):
self._result = {
"filename": self._filename,
"workdir": self._workdir,
"folder_id": self._folder_id,
"task_id": self._task_id,
"template_key": self._template_key,
}
self.close()
def _on_cancel_pressed(self):
self.close()
def _update_filename(self):
result = self._controller.fill_workarea_filepath(
self._folder_id,
self._task_id,
self._ext_value,
self._last_version_check.isChecked(),
self._version_value,
self._comment_value,
)
self._filename = result.filename
self._btn_ok.setEnabled(not result.exists)
if result.exists:
self._preview_widget.setText((
"<font color='red'>Cannot create \"{}\" because file exists!"
"</font>"
).format(result.filename))
else:
self._preview_widget.setText(
"<font color='green'>{}</font>".format(result.filename)
)

View file

@ -0,0 +1,163 @@
import datetime
from qtpy import QtWidgets, QtCore
def file_size_to_string(file_size):
size = 0
size_ending_mapping = {
"KB": 1024 ** 1,
"MB": 1024 ** 2,
"GB": 1024 ** 3
}
ending = "B"
for _ending, _size in size_ending_mapping.items():
if file_size < _size:
break
size = file_size / _size
ending = _ending
return "{:.2f} {}".format(size, ending)
class SidePanelWidget(QtWidgets.QWidget):
"""Details about selected workfile.
Todos:
At this moment only shows created and modified date of file
or its size.
Args:
controller (AbstractWorkfilesFrontend): The control object.
parent (QtWidgets.QWidget): The parent widget.
"""
published_workfile_message = (
"<b>INFO</b>: Opened published workfiles will be stored in"
" temp directory on your machine. Current temp size: <b>{}</b>."
)
def __init__(self, controller, parent):
super(SidePanelWidget, self).__init__(parent)
details_label = QtWidgets.QLabel("Details", self)
details_input = QtWidgets.QPlainTextEdit(self)
details_input.setReadOnly(True)
artist_note_widget = QtWidgets.QWidget(self)
note_label = QtWidgets.QLabel("Artist note", artist_note_widget)
note_input = QtWidgets.QPlainTextEdit(artist_note_widget)
btn_note_save = QtWidgets.QPushButton("Save note", artist_note_widget)
artist_note_layout = QtWidgets.QVBoxLayout(artist_note_widget)
artist_note_layout.setContentsMargins(0, 0, 0, 0)
artist_note_layout.addWidget(note_label, 0)
artist_note_layout.addWidget(note_input, 1)
artist_note_layout.addWidget(
btn_note_save, 0, alignment=QtCore.Qt.AlignRight
)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(details_label, 0)
main_layout.addWidget(details_input, 1)
main_layout.addWidget(artist_note_widget, 1)
note_input.textChanged.connect(self._on_note_change)
btn_note_save.clicked.connect(self._on_save_click)
controller.register_event_callback(
"workarea.selection.changed", self._on_selection_change
)
self._details_input = details_input
self._artist_note_widget = artist_note_widget
self._note_input = note_input
self._btn_note_save = btn_note_save
self._folder_id = None
self._task_id = None
self._filepath = None
self._orig_note = ""
self._controller = controller
self._set_context(None, None, None)
def set_published_mode(self, published_mode):
"""Change published mode.
Args:
published_mode (bool): Published mode enabled.
"""
self._artist_note_widget.setVisible(not published_mode)
def _on_selection_change(self, event):
folder_id = event["folder_id"]
task_id = event["task_id"]
filepath = event["path"]
self._set_context(folder_id, task_id, filepath)
def _on_note_change(self):
text = self._note_input.toPlainText()
self._btn_note_save.setEnabled(self._orig_note != text)
def _on_save_click(self):
note = self._note_input.toPlainText()
self._controller.save_workfile_info(
self._folder_id,
self._task_id,
self._filepath,
note
)
self._orig_note = note
self._btn_note_save.setEnabled(False)
def _set_context(self, folder_id, task_id, filepath):
workfile_info = None
# Check if folder, task and file are selected
if bool(folder_id) and bool(task_id) and bool(filepath):
workfile_info = self._controller.get_workfile_info(
folder_id, task_id, filepath
)
enabled = workfile_info is not None
self._details_input.setEnabled(enabled)
self._note_input.setEnabled(enabled)
self._btn_note_save.setEnabled(enabled)
self._folder_id = folder_id
self._task_id = task_id
self._filepath = filepath
# Disable inputs and remove texts if any required arguments are
# missing
if not enabled:
self._orig_note = ""
self._details_input.setPlainText("")
self._note_input.setPlainText("")
return
note = workfile_info.note
size_value = file_size_to_string(workfile_info.filesize)
# Append html string
datetime_format = "%b %d %Y %H:%M:%S"
creation_time = datetime.datetime.fromtimestamp(
workfile_info.creation_time)
modification_time = datetime.datetime.fromtimestamp(
workfile_info.modification_time)
lines = (
"<b>Size:</b>",
size_value,
"<b>Created:</b>",
creation_time.strftime(datetime_format),
"<b>Modified:</b>",
modification_time.strftime(datetime_format)
)
self._orig_note = note
self._note_input.setPlainText(note)
# Set as empty string
self._details_input.setPlainText("")
self._details_input.appendHtml("<br>".join(lines))

View file

@ -0,0 +1,420 @@
import uuid
import qtawesome
from qtpy import QtWidgets, QtGui, QtCore
from openpype.style import get_disabled_entity_icon_color
from openpype.tools.utils import DeselectableTreeView
from .constants import (
ITEM_NAME_ROLE,
ITEM_ID_ROLE,
PARENT_ID_ROLE,
)
SENDER_NAME = "qt_tasks_model"
class RefreshThread(QtCore.QThread):
"""Thread for refreshing tasks.
Call controller to get tasks and emit signal when finished.
Args:
controller (AbstractWorkfilesFrontend): The control object.
folder_id (str): Folder id.
"""
refresh_finished = QtCore.Signal(str)
def __init__(self, controller, folder_id):
super(RefreshThread, self).__init__()
self._id = uuid.uuid4().hex
self._controller = controller
self._folder_id = folder_id
self._result = None
@property
def id(self):
return self._id
def run(self):
self._result = self._controller.get_task_items(
self._folder_id, SENDER_NAME)
self.refresh_finished.emit(self.id)
def get_result(self):
return self._result
class TasksModel(QtGui.QStandardItemModel):
"""Tasks model which cares about refresh of tasks by folder id.
Args:
controller (AbstractWorkfilesFrontend): The control object.
"""
refreshed = QtCore.Signal()
def __init__(self, controller):
super(TasksModel, self).__init__()
self._controller = controller
self._items_by_name = {}
self._has_content = False
self._is_refreshing = False
self._invalid_selection_item_used = False
self._invalid_selection_item = None
self._empty_tasks_item_used = False
self._empty_tasks_item = None
self._last_folder_id = None
self._refresh_threads = {}
self._current_refresh_thread = None
# Initial state
self._add_invalid_selection_item()
def clear(self):
self._items_by_name = {}
self._has_content = False
self._remove_invalid_items()
super(TasksModel, self).clear()
def refresh(self, folder_id):
"""Refresh tasks for folder.
Args:
folder_id (Union[str, None]): Folder id.
"""
self._refresh(folder_id)
def get_index_by_name(self, task_name):
"""Find item by name and return its index.
Returns:
QtCore.QModelIndex: Index of item. Is invalid if task is not
found by name.
"""
item = self._items_by_name.get(task_name)
if item is None:
return QtCore.QModelIndex()
return self.indexFromItem(item)
def get_last_folder_id(self):
"""Get last refreshed folder id.
Returns:
Union[str, None]: Folder id.
"""
return self._last_folder_id
def _get_invalid_selection_item(self):
if self._invalid_selection_item is None:
item = QtGui.QStandardItem("Select a folder")
item.setFlags(QtCore.Qt.NoItemFlags)
icon = qtawesome.icon(
"fa.times",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
self._invalid_selection_item = item
return self._invalid_selection_item
def _get_empty_task_item(self):
if self._empty_tasks_item is None:
item = QtGui.QStandardItem("No task")
icon = qtawesome.icon(
"fa.exclamation-circle",
color=get_disabled_entity_icon_color()
)
item.setData(icon, QtCore.Qt.DecorationRole)
item.setFlags(QtCore.Qt.NoItemFlags)
self._empty_tasks_item = item
return self._empty_tasks_item
def _add_invalid_item(self, item):
self.clear()
root_item = self.invisibleRootItem()
root_item.appendRow(item)
def _remove_invalid_item(self, item):
root_item = self.invisibleRootItem()
root_item.takeRow(item.row())
def _remove_invalid_items(self):
self._remove_invalid_selection_item()
self._remove_empty_task_item()
def _add_invalid_selection_item(self):
if not self._invalid_selection_item_used:
self._add_invalid_item(self._get_invalid_selection_item())
self._invalid_selection_item_used = True
def _remove_invalid_selection_item(self):
if self._invalid_selection_item:
self._remove_invalid_item(self._get_invalid_selection_item())
self._invalid_selection_item_used = False
def _add_empty_task_item(self):
if not self._empty_tasks_item_used:
self._add_invalid_item(self._get_empty_task_item())
self._empty_tasks_item_used = True
def _remove_empty_task_item(self):
if self._empty_tasks_item_used:
self._remove_invalid_item(self._get_empty_task_item())
self._empty_tasks_item_used = False
def _refresh(self, folder_id):
self._is_refreshing = True
self._last_folder_id = folder_id
if not folder_id:
self._add_invalid_selection_item()
self._current_refresh_thread = None
self._is_refreshing = False
self.refreshed.emit()
return
thread = RefreshThread(self._controller, folder_id)
self._current_refresh_thread = thread.id
self._refresh_threads[thread.id] = thread
thread.refresh_finished.connect(self._on_refresh_thread)
thread.start()
def _on_refresh_thread(self, thread_id):
"""Callback when refresh thread is finished.
Technically can be running multiple refresh threads at the same time,
to avoid using values from wrong thread, we check if thread id is
current refresh thread id.
Tasks are stored by name, so if a folder has same task name as
previously selected folder it keeps the selection.
Args:
thread_id (str): Thread id.
"""
thread = self._refresh_threads.pop(thread_id)
if thread_id != self._current_refresh_thread:
return
task_items = thread.get_result()
# Task items are refreshed
if task_items is None:
return
# No tasks are available on folder
if not task_items:
self._add_empty_task_item()
return
self._remove_invalid_items()
new_items = []
new_names = set()
for task_item in task_items:
name = task_item.name
new_names.add(name)
item = self._items_by_name.get(name)
if item is None:
item = QtGui.QStandardItem()
item.setEditable(False)
new_items.append(item)
self._items_by_name[name] = item
# TODO cache locally
icon = qtawesome.icon(
task_item.icon_name,
color=task_item.icon_color,
)
item.setData(task_item.label, QtCore.Qt.DisplayRole)
item.setData(name, ITEM_NAME_ROLE)
item.setData(task_item.id, ITEM_ID_ROLE)
item.setData(task_item.parent_id, PARENT_ID_ROLE)
item.setData(icon, QtCore.Qt.DecorationRole)
root_item = self.invisibleRootItem()
for name in set(self._items_by_name) - new_names:
item = self._items_by_name.pop(name)
root_item.removeRow(item.row())
if new_items:
root_item.appendRows(new_items)
self._has_content = root_item.rowCount() > 0
self._is_refreshing = False
self.refreshed.emit()
@property
def is_refreshing(self):
"""Model is refreshing.
Returns:
bool: Model is refreshing
"""
return self._is_refreshing
@property
def has_content(self):
"""Model has content.
Returns:
bools: Have at least one task.
"""
return self._has_content
def headerData(self, section, orientation, role):
# Show nice labels in the header
if (
role == QtCore.Qt.DisplayRole
and orientation == QtCore.Qt.Horizontal
):
if section == 0:
return "Tasks"
return super(TasksModel, self).headerData(
section, orientation, role
)
class TasksWidget(QtWidgets.QWidget):
"""Tasks widget.
Widget that handles tasks view, model and selection.
Args:
controller (AbstractWorkfilesFrontend): Workfiles controller.
"""
def __init__(self, controller, parent):
super(TasksWidget, self).__init__(parent)
tasks_view = DeselectableTreeView(self)
tasks_view.setIndentation(0)
tasks_model = TasksModel(controller)
tasks_proxy_model = QtCore.QSortFilterProxyModel()
tasks_proxy_model.setSourceModel(tasks_model)
tasks_view.setModel(tasks_proxy_model)
main_layout = QtWidgets.QHBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(tasks_view, 1)
controller.register_event_callback(
"tasks.refresh.finished",
self._on_tasks_refresh_finished
)
controller.register_event_callback(
"selection.folder.changed",
self._folder_selection_changed
)
controller.register_event_callback(
"expected_selection_changed",
self._on_expected_selection_change
)
selection_model = tasks_view.selectionModel()
selection_model.selectionChanged.connect(self._on_selection_change)
tasks_model.refreshed.connect(self._on_tasks_model_refresh)
self._controller = controller
self._tasks_view = tasks_view
self._tasks_model = tasks_model
self._tasks_proxy_model = tasks_proxy_model
self._selected_folder_id = None
self._expected_selection_data = None
def _clear(self):
self._tasks_model.clear()
def _on_tasks_refresh_finished(self, event):
"""Tasks were refreshed in controller.
Ignore if refresh was triggered by tasks model, or refreshed folder is
not the same as currently selected folder.
Args:
event (Event): Event object.
"""
# Refresh only if current folder id is the same
if (
event["sender"] == SENDER_NAME
or event["folder_id"] != self._selected_folder_id
):
return
self._tasks_model.refresh(self._selected_folder_id)
def _folder_selection_changed(self, event):
self._selected_folder_id = event["folder_id"]
self._tasks_model.refresh(self._selected_folder_id)
def _on_tasks_model_refresh(self):
if not self._set_expected_selection():
self._on_selection_change()
self._tasks_proxy_model.sort(0)
def _set_expected_selection(self):
if self._expected_selection_data is None:
return False
folder_id = self._expected_selection_data["folder_id"]
task_name = self._expected_selection_data["task_name"]
self._expected_selection_data = None
model_folder_id = self._tasks_model.get_last_folder_id()
if folder_id != model_folder_id:
return False
if task_name is not None:
index = self._tasks_model.get_index_by_name(task_name)
if index.isValid():
proxy_index = self._tasks_proxy_model.mapFromSource(index)
self._tasks_view.setCurrentIndex(proxy_index)
self._controller.expected_task_selected(folder_id, task_name)
return True
def _on_expected_selection_change(self, event):
if event["task_selected"] or not event["folder_selected"]:
return
model_folder_id = self._tasks_model.get_last_folder_id()
folder_id = event["folder_id"]
self._expected_selection_data = {
"task_name": event["task_name"],
"folder_id": folder_id,
}
if folder_id != model_folder_id or self._tasks_model.is_refreshing:
return
self._set_expected_selection()
def _get_selected_item_ids(self):
selection_model = self._tasks_view.selectionModel()
for index in selection_model.selectedIndexes():
task_id = index.data(ITEM_ID_ROLE)
task_name = index.data(ITEM_NAME_ROLE)
parent_id = index.data(PARENT_ID_ROLE)
if task_name is not None:
return parent_id, task_id, task_name
return self._selected_folder_id, None, None
def _on_selection_change(self):
# Don't trigger task change during refresh
# - a task was deselected if that happens
# - can cause crash triggered during tasks refreshing
if self._tasks_model.is_refreshing:
return
parent_id, task_id, task_name = self._get_selected_item_ids()
self._controller.set_selected_task(parent_id, task_id, task_name)

View file

@ -0,0 +1,94 @@
from qtpy import QtWidgets, QtCore
from openpype.tools.flickcharm import FlickCharm
class TreeView(QtWidgets.QTreeView):
"""Ultimate TreeView with flick charm and double click signals.
Tree view have deselectable mode, which allows to deselect items by
clicking on item area without any items.
Todos:
Add to tools utils.
"""
double_clicked_left = QtCore.Signal()
double_clicked_right = QtCore.Signal()
def __init__(self, *args, **kwargs):
super(TreeView, self).__init__(*args, **kwargs)
self._deselectable = False
self._flick_charm_activated = False
self._flick_charm = FlickCharm(parent=self)
self._before_flick_scroll_mode = None
def is_deselectable(self):
return self._deselectable
def set_deselectable(self, deselectable):
self._deselectable = deselectable
deselectable = property(is_deselectable, set_deselectable)
def mousePressEvent(self, event):
if self._deselectable:
index = self.indexAt(event.pos())
if not index.isValid():
# clear the selection
self.clearSelection()
# clear the current index
self.setCurrentIndex(QtCore.QModelIndex())
super(TreeView, self).mousePressEvent(event)
def mouseDoubleClickEvent(self, event):
if event.button() == QtCore.Qt.LeftButton:
self.double_clicked_left.emit()
elif event.button() == QtCore.Qt.RightButton:
self.double_clicked_right.emit()
return super(TreeView, self).mouseDoubleClickEvent(event)
def activate_flick_charm(self):
if self._flick_charm_activated:
return
self._flick_charm_activated = True
self._before_flick_scroll_mode = self.verticalScrollMode()
self._flick_charm.activateOn(self)
self.setVerticalScrollMode(QtWidgets.QAbstractItemView.ScrollPerPixel)
def deactivate_flick_charm(self):
if not self._flick_charm_activated:
return
self._flick_charm_activated = False
self._flick_charm.deactivateFrom(self)
if self._before_flick_scroll_mode is not None:
self.setVerticalScrollMode(self._before_flick_scroll_mode)
class BaseOverlayFrame(QtWidgets.QFrame):
"""Base frame for overlay widgets.
Has implemented automated resize and event filtering.
"""
def __init__(self, parent):
super(BaseOverlayFrame, self).__init__(parent)
self.setObjectName("OverlayFrame")
self._parent = parent
def setVisible(self, visible):
super(BaseOverlayFrame, self).setVisible(visible)
if visible:
self._parent.installEventFilter(self)
self.resize(self._parent.size())
else:
self._parent.removeEventFilter(self)
def eventFilter(self, obj, event):
if event.type() == QtCore.QEvent.Resize:
self.resize(obj.size())
return super(BaseOverlayFrame, self).eventFilter(obj, event)

View file

@ -0,0 +1,400 @@
from qtpy import QtCore, QtWidgets, QtGui
from openpype import style, resources
from openpype.tools.utils import (
PlaceholderLineEdit,
MessageOverlayObject,
)
from openpype.tools.utils.lib import get_qta_icon_by_name_and_color
from openpype.tools.ayon_workfiles.control import BaseWorkfileController
from .side_panel import SidePanelWidget
from .folders_widget import FoldersWidget
from .tasks_widget import TasksWidget
from .files_widget import FilesWidget
from .utils import BaseOverlayFrame
# TODO move to utils
# from openpype.tools.utils.lib import (
# get_refresh_icon, get_go_to_current_icon)
def get_refresh_icon():
return get_qta_icon_by_name_and_color(
"fa.refresh", style.get_default_tools_icon_color()
)
def get_go_to_current_icon():
return get_qta_icon_by_name_and_color(
"fa.arrow-down", style.get_default_tools_icon_color()
)
class InvalidHostOverlay(BaseOverlayFrame):
def __init__(self, parent):
super(InvalidHostOverlay, self).__init__(parent)
label_widget = QtWidgets.QLabel(
(
"Workfiles tool is not supported in this host/DCCs."
"<br/><br/>This may be caused by a bug."
" Please contact your TD for more information."
),
self
)
label_widget.setAlignment(QtCore.Qt.AlignCenter)
label_widget.setObjectName("OverlayFrameLabel")
layout = QtWidgets.QVBoxLayout(self)
layout.addStretch(2)
layout.addWidget(label_widget, 0, QtCore.Qt.AlignCenter)
layout.addStretch(3)
label_widget.setAttribute(QtCore.Qt.WA_TranslucentBackground)
class WorkfilesToolWindow(QtWidgets.QWidget):
"""WorkFiles Window.
Main windows of workfiles tool.
Args:
controller (AbstractWorkfilesFrontend): Frontend controller.
parent (Optional[QtWidgets.QWidget]): Parent widget.
"""
title = "Work Files"
def __init__(self, controller=None, parent=None):
super(WorkfilesToolWindow, self).__init__(parent=parent)
if controller is None:
controller = BaseWorkfileController()
self.setWindowTitle(self.title)
icon = QtGui.QIcon(resources.get_openpype_icon_filepath())
self.setWindowIcon(icon)
flags = self.windowFlags() | QtCore.Qt.Window
self.setWindowFlags(flags)
self._default_window_flags = flags
self._folder_widget = None
self._folder_filter_input = None
self._files_widget = None
self._first_show = True
self._controller_refreshed = False
self._context_to_set = None
# Host validation should happen only once
self._host_is_valid = None
self._controller = controller
# Create pages widget and set it as central widget
pages_widget = QtWidgets.QStackedWidget(self)
home_page_widget = QtWidgets.QWidget(pages_widget)
home_body_widget = QtWidgets.QWidget(home_page_widget)
col_1_widget = self._create_col_1_widget(controller, parent)
tasks_widget = TasksWidget(controller, home_body_widget)
col_3_widget = self._create_col_3_widget(controller, home_body_widget)
side_panel = SidePanelWidget(controller, home_body_widget)
pages_widget.addWidget(home_page_widget)
# Build home
home_page_layout = QtWidgets.QVBoxLayout(home_page_widget)
home_page_layout.addWidget(home_body_widget)
# Build home - body
body_layout = QtWidgets.QVBoxLayout(home_body_widget)
split_widget = QtWidgets.QSplitter(home_body_widget)
split_widget.addWidget(col_1_widget)
split_widget.addWidget(tasks_widget)
split_widget.addWidget(col_3_widget)
split_widget.addWidget(side_panel)
split_widget.setSizes([255, 160, 455, 175])
body_layout.addWidget(split_widget)
main_layout = QtWidgets.QHBoxLayout(self)
main_layout.addWidget(pages_widget, 1)
overlay_messages_widget = MessageOverlayObject(self)
overlay_invalid_host = InvalidHostOverlay(self)
overlay_invalid_host.setVisible(False)
first_show_timer = QtCore.QTimer()
first_show_timer.setSingleShot(True)
first_show_timer.setInterval(50)
first_show_timer.timeout.connect(self._on_first_show)
controller.register_event_callback(
"save_as.finished",
self._on_save_as_finished,
)
controller.register_event_callback(
"copy_representation.finished",
self._on_copy_representation_finished,
)
controller.register_event_callback(
"workfile_duplicate.finished",
self._on_duplicate_finished
)
controller.register_event_callback(
"open_workfile.finished",
self._on_open_finished
)
controller.register_event_callback(
"controller.refresh.started",
self._on_controller_refresh_started,
)
controller.register_event_callback(
"controller.refresh.finished",
self._on_controller_refresh_finished,
)
self._overlay_messages_widget = overlay_messages_widget
self._overlay_invalid_host = overlay_invalid_host
self._home_page_widget = home_page_widget
self._pages_widget = pages_widget
self._home_body_widget = home_body_widget
self._split_widget = split_widget
self._tasks_widget = tasks_widget
self._side_panel = side_panel
self._first_show_timer = first_show_timer
self._post_init()
def _post_init(self):
self._on_published_checkbox_changed()
# Force focus on the open button by default, required for Houdini.
self._files_widget.setFocus()
self.resize(1200, 600)
def _create_col_1_widget(self, controller, parent):
col_widget = QtWidgets.QWidget(parent)
header_widget = QtWidgets.QWidget(col_widget)
folder_filter_input = PlaceholderLineEdit(header_widget)
folder_filter_input.setPlaceholderText("Filter folders..")
go_to_current_btn = QtWidgets.QPushButton(header_widget)
go_to_current_btn.setIcon(get_go_to_current_icon())
go_to_current_btn_sp = go_to_current_btn.sizePolicy()
go_to_current_btn_sp.setVerticalPolicy(QtWidgets.QSizePolicy.Minimum)
go_to_current_btn.setSizePolicy(go_to_current_btn_sp)
refresh_btn = QtWidgets.QPushButton(header_widget)
refresh_btn.setIcon(get_refresh_icon())
refresh_btn_sp = refresh_btn.sizePolicy()
refresh_btn_sp.setVerticalPolicy(QtWidgets.QSizePolicy.Minimum)
refresh_btn.setSizePolicy(refresh_btn_sp)
folder_widget = FoldersWidget(controller, col_widget)
header_layout = QtWidgets.QHBoxLayout(header_widget)
header_layout.setContentsMargins(0, 0, 0, 0)
header_layout.addWidget(folder_filter_input, 1)
header_layout.addWidget(go_to_current_btn, 0)
header_layout.addWidget(refresh_btn, 0)
col_layout = QtWidgets.QVBoxLayout(col_widget)
col_layout.setContentsMargins(0, 0, 0, 0)
col_layout.addWidget(header_widget, 0)
col_layout.addWidget(folder_widget, 1)
folder_filter_input.textChanged.connect(self._on_folder_filter_change)
go_to_current_btn.clicked.connect(self._on_go_to_current_clicked)
refresh_btn.clicked.connect(self._on_refresh_clicked)
self._folder_filter_input = folder_filter_input
self._folder_widget = folder_widget
return col_widget
def _create_col_3_widget(self, controller, parent):
col_widget = QtWidgets.QWidget(parent)
header_widget = QtWidgets.QWidget(col_widget)
files_filter_input = PlaceholderLineEdit(header_widget)
files_filter_input.setPlaceholderText("Filter files..")
published_checkbox = QtWidgets.QCheckBox("Published", header_widget)
published_checkbox.setToolTip("Show published workfiles")
header_layout = QtWidgets.QHBoxLayout(header_widget)
header_layout.setContentsMargins(0, 0, 0, 0)
header_layout.addWidget(files_filter_input, 1)
header_layout.addWidget(published_checkbox, 0)
files_widget = FilesWidget(controller, col_widget)
col_layout = QtWidgets.QVBoxLayout(col_widget)
col_layout.setContentsMargins(0, 0, 0, 0)
col_layout.addWidget(header_widget, 0)
col_layout.addWidget(files_widget, 1)
files_filter_input.textChanged.connect(
self._on_file_text_filter_change)
published_checkbox.stateChanged.connect(
self._on_published_checkbox_changed
)
self._files_filter_input = files_filter_input
self._published_checkbox = published_checkbox
self._files_widget = files_widget
return col_widget
def set_window_on_top(self, on_top):
"""Set window on top of other windows.
Args:
on_top (bool): Show on top of other windows.
"""
flags = self._default_window_flags
if on_top:
flags |= QtCore.Qt.WindowStaysOnTopHint
if self.windowFlags() != flags:
self.setWindowFlags(flags)
def ensure_visible(self, use_context=True, save=True, on_top=False):
"""Ensure the window is visible.
This method expects arguments for compatibility with previous variant
of Workfiles tool.
Args:
use_context (Optional[bool]): DEPRECATED: This argument is
ignored.
save (Optional[bool]): Allow to save workfiles.
on_top (Optional[bool]): Show on top of other windows.
"""
save = True if save is None else save
on_top = False if on_top is None else on_top
is_visible = self.isVisible()
self._controller.set_save_enabled(save)
self.set_window_on_top(on_top)
self.show()
self.raise_()
self.activateWindow()
if is_visible:
self.refresh()
def refresh(self):
"""Trigger refresh of workfiles tool controller."""
self._controller.refresh()
def showEvent(self, event):
super(WorkfilesToolWindow, self).showEvent(event)
if self._first_show:
self._first_show = False
self._first_show_timer.start()
self.setStyleSheet(style.load_stylesheet())
def keyPressEvent(self, event):
"""Custom keyPressEvent.
Override keyPressEvent to do nothing so that Maya's panels won't
take focus when pressing "SHIFT" whilst mouse is over viewport or
outliner. This way users don't accidentally perform Maya commands
whilst trying to name an instance.
"""
pass
def _on_first_show(self):
if not self._controller_refreshed:
self.refresh()
def _on_file_text_filter_change(self, text):
self._files_widget.set_text_filter(text)
def _on_published_checkbox_changed(self):
"""Publish mode changed.
Tell children widgets about it so they can handle the mode.
"""
published_mode = self._published_checkbox.isChecked()
self._files_widget.set_published_mode(published_mode)
self._side_panel.set_published_mode(published_mode)
def _on_folder_filter_change(self, text):
self._folder_widget.set_name_filer(text)
def _on_go_to_current_clicked(self):
self._controller.go_to_current_context()
def _on_refresh_clicked(self):
self.refresh()
def _on_controller_refresh_started(self):
self._controller_refreshed = True
def _on_controller_refresh_finished(self):
if self._host_is_valid is None:
self._host_is_valid = self._controller.is_host_valid()
self._overlay_invalid_host.setVisible(not self._host_is_valid)
if not self._host_is_valid:
return
def _on_save_as_finished(self, event):
if event["failed"]:
self._overlay_messages_widget.add_message(
"Failed to save workfile",
"error",
)
else:
self._overlay_messages_widget.add_message(
"Workfile saved"
)
def _on_copy_representation_finished(self, event):
if event["failed"]:
self._overlay_messages_widget.add_message(
"Failed to copy published workfile",
"error",
)
else:
self._overlay_messages_widget.add_message(
"Publish workfile saved"
)
def _on_duplicate_finished(self, event):
if event["failed"]:
self._overlay_messages_widget.add_message(
"Failed to duplicate workfile",
"error",
)
else:
self._overlay_messages_widget.add_message(
"Workfile duplicated"
)
def _on_open_finished(self, event):
if event["failed"]:
self._overlay_messages_widget.add_message(
"Failed to open workfile",
"error",
)
else:
self.close()

View file

@ -6,6 +6,8 @@ use singleton approach with global functions (using helper anyway).
import os
import pyblish.api
from openpype import AYON_SERVER_ENABLED
from openpype.host import IWorkfileHost, ILoadHost
from openpype.lib import Logger
from openpype.pipeline import (
@ -46,17 +48,29 @@ class HostToolsHelper:
self._log = Logger.get_logger(self.__class__.__name__)
return self._log
def _init_ayon_workfiles_tool(self, parent):
from openpype.tools.ayon_workfiles.widgets import WorkfilesToolWindow
workfiles_window = WorkfilesToolWindow(parent=parent)
self._workfiles_tool = workfiles_window
def _init_openpype_workfiles_tool(self, parent):
from openpype.tools.workfiles.app import Window
# Host validation
host = registered_host()
IWorkfileHost.validate_workfile_methods(host)
workfiles_window = Window(parent=parent)
self._workfiles_tool = workfiles_window
def get_workfiles_tool(self, parent):
"""Create, cache and return workfiles tool window."""
if self._workfiles_tool is None:
from openpype.tools.workfiles.app import Window
# Host validation
host = registered_host()
IWorkfileHost.validate_workfile_methods(host)
workfiles_window = Window(parent=parent)
self._workfiles_tool = workfiles_window
if AYON_SERVER_ENABLED:
self._init_ayon_workfiles_tool(parent)
else:
self._init_openpype_workfiles_tool(parent)
return self._workfiles_tool