Merge branch 'develop' into enhancement/per-project-bundle

This commit is contained in:
Jakub Trllo 2025-07-28 11:20:22 +02:00 committed by GitHub
commit 948cd33549
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 4550 additions and 1676 deletions

View file

@ -35,6 +35,7 @@ body:
label: Version
description: What version are you running? Look to AYON Tray
options:
- 1.5.0
- 1.4.1
- 1.4.0
- 1.3.2

View file

@ -1,9 +1,12 @@
from .constants import ContextChangeReason
from .host import (
HostBase,
)
from .interfaces import (
IWorkfileHost,
WorkfileInfo,
PublishedWorkfileInfo,
ILoadHost,
IPublishHost,
INewPublisher,
@ -13,9 +16,13 @@ from .dirmap import HostDirmap
__all__ = (
"ContextChangeReason",
"HostBase",
"IWorkfileHost",
"WorkfileInfo",
"PublishedWorkfileInfo",
"ILoadHost",
"IPublishHost",
"INewPublisher",

View file

@ -0,0 +1,15 @@
from enum import Enum
class StrEnum(str, Enum):
"""A string-based Enum class that allows for string comparison."""
def __str__(self) -> str:
return self.value
class ContextChangeReason(StrEnum):
"""Reasons for context change in the host."""
undefined = "undefined"
workfile_open = "workfile.opened"
workfile_save = "workfile.saved"

View file

@ -1,10 +1,37 @@
from __future__ import annotations
import os
import logging
import contextlib
from abc import ABC, abstractproperty
from abc import ABC, abstractmethod
from dataclasses import dataclass
import typing
from typing import Optional, Any
# NOTE can't import 'typing' because of issues in Maya 2020
# - shiboken crashes on 'typing' module import
import ayon_api
from ayon_core.lib import emit_event
from .constants import ContextChangeReason
if typing.TYPE_CHECKING:
from ayon_core.pipeline import Anatomy
from typing import TypedDict
class HostContextData(TypedDict):
project_name: str
folder_path: Optional[str]
task_name: Optional[str]
@dataclass
class ContextChangeData:
project_entity: dict[str, Any]
folder_entity: dict[str, Any]
task_entity: dict[str, Any]
reason: ContextChangeReason
anatomy: Anatomy
class HostBase(ABC):
@ -92,8 +119,9 @@ class HostBase(ABC):
self._log = logging.getLogger(self.__class__.__name__)
return self._log
@abstractproperty
def name(self):
@property
@abstractmethod
def name(self) -> str:
"""Host name."""
pass
@ -106,7 +134,7 @@ class HostBase(ABC):
return os.environ.get("AYON_PROJECT_NAME")
def get_current_folder_path(self):
def get_current_folder_path(self) -> Optional[str]:
"""
Returns:
Union[str, None]: Current asset name.
@ -114,7 +142,7 @@ class HostBase(ABC):
return os.environ.get("AYON_FOLDER_PATH")
def get_current_task_name(self):
def get_current_task_name(self) -> Optional[str]:
"""
Returns:
Union[str, None]: Current task name.
@ -122,7 +150,7 @@ class HostBase(ABC):
return os.environ.get("AYON_TASK_NAME")
def get_current_context(self):
def get_current_context(self) -> "HostContextData":
"""Get current context information.
This method should be used to get current context of host. Usage of
@ -141,6 +169,75 @@ class HostBase(ABC):
"task_name": self.get_current_task_name()
}
def set_current_context(
self,
folder_entity: dict[str, Any],
task_entity: dict[str, Any],
*,
reason: ContextChangeReason = ContextChangeReason.undefined,
project_entity: Optional[dict[str, Any]] = None,
anatomy: Optional[Anatomy] = None,
) -> "HostContextData":
"""Set current context information.
This method should be used to set current context of host. Usage of
this method can be crucial for host implementations in DCCs where
can be opened multiple workfiles at one moment and change of context
can't be caught properly.
Notes:
This method should not care about change of workdir and expect any
of the arguments.
Args:
folder_entity (Optional[dict[str, Any]]): Folder entity.
task_entity (Optional[dict[str, Any]]): Task entity.
reason (ContextChangeReason): Reason for context change.
project_entity (Optional[dict[str, Any]]): Project entity data.
anatomy (Optional[Anatomy]): Anatomy instance for the project.
Returns:
dict[str, Optional[str]]: Context information with project name,
folder path and task name.
"""
from ayon_core.pipeline import Anatomy
folder_path = folder_entity["path"]
task_name = task_entity["name"]
context = self.get_current_context()
# Don't do anything if context did not change
if (
context["folder_path"] == folder_path
and context["task_name"] == task_name
):
return context
project_name = self.get_current_project_name()
if project_entity is None:
project_entity = ayon_api.get_project(project_name)
if anatomy is None:
anatomy = Anatomy(project_name, project_entity=project_entity)
context_change_data = ContextChangeData(
project_entity,
folder_entity,
task_entity,
reason,
anatomy,
)
self._before_context_change(context_change_data)
self._set_current_context(context_change_data)
self._after_context_change(context_change_data)
return self._emit_context_change_event(
project_name,
folder_path,
task_name,
)
def get_context_title(self):
"""Context title shown for UI purposes.
@ -187,3 +284,91 @@ class HostBase(ABC):
yield
finally:
pass
def _emit_context_change_event(
self,
project_name: str,
folder_path: Optional[str],
task_name: Optional[str],
) -> "HostContextData":
"""Emit context change event.
Args:
project_name (str): Name of the project.
folder_path (Optional[str]): Path of the folder.
task_name (Optional[str]): Name of the task.
Returns:
HostContextData: Data send to context change event.
"""
data = {
"project_name": project_name,
"folder_path": folder_path,
"task_name": task_name,
}
emit_event("taskChanged", data)
return data
def _set_current_context(
self, context_change_data: ContextChangeData
) -> None:
"""Method that changes the context in host.
Can be overriden for hosts that do need different handling of context
than using environment variables.
Args:
context_change_data (ContextChangeData): Context change related
data.
"""
project_name = self.get_current_project_name()
folder_path = None
task_name = None
if context_change_data.folder_entity:
folder_path = context_change_data.folder_entity["path"]
if context_change_data.task_entity:
task_name = context_change_data.task_entity["name"]
envs = {
"AYON_PROJECT_NAME": project_name,
"AYON_FOLDER_PATH": folder_path,
"AYON_TASK_NAME": task_name,
}
# Update the Session and environments. Pop from environments all
# keys with value set to None.
for key, value in envs.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
def _before_context_change(self, context_change_data: ContextChangeData):
"""Before context is changed.
This method is called before the context is changed in the host.
Can be overridden to implement host specific logic.
Args:
context_change_data (ContextChangeData): Object with information
about context change.
"""
pass
def _after_context_change(self, context_change_data: ContextChangeData):
"""After context is changed.
This method is called after the context is changed in the host.
Can be overridden to implement host specific logic.
Args:
context_change_data (ContextChangeData): Object with information
about context change.
"""
pass

View file

@ -0,0 +1,66 @@
from .exceptions import MissingMethodsError
from .workfiles import (
IWorkfileHost,
WorkfileInfo,
PublishedWorkfileInfo,
OpenWorkfileOptionalData,
ListWorkfilesOptionalData,
ListPublishedWorkfilesOptionalData,
SaveWorkfileOptionalData,
CopyWorkfileOptionalData,
CopyPublishedWorkfileOptionalData,
get_open_workfile_context,
get_list_workfiles_context,
get_list_published_workfiles_context,
get_save_workfile_context,
get_copy_workfile_context,
get_copy_repre_workfile_context,
OpenWorkfileContext,
ListWorkfilesContext,
ListPublishedWorkfilesContext,
SaveWorkfileContext,
CopyWorkfileContext,
CopyPublishedWorkfileContext,
)
from .interfaces import (
IPublishHost,
INewPublisher,
ILoadHost,
)
__all__ = (
"MissingMethodsError",
"IWorkfileHost",
"WorkfileInfo",
"PublishedWorkfileInfo",
"OpenWorkfileOptionalData",
"ListWorkfilesOptionalData",
"ListPublishedWorkfilesOptionalData",
"SaveWorkfileOptionalData",
"CopyWorkfileOptionalData",
"CopyPublishedWorkfileOptionalData",
"get_open_workfile_context",
"get_list_workfiles_context",
"get_list_published_workfiles_context",
"get_save_workfile_context",
"get_copy_workfile_context",
"get_copy_repre_workfile_context",
"OpenWorkfileContext",
"ListWorkfilesContext",
"ListPublishedWorkfilesContext",
"SaveWorkfileContext",
"CopyWorkfileContext",
"CopyPublishedWorkfileContext",
"IPublishHost",
"INewPublisher",
"ILoadHost",
)

View file

@ -0,0 +1,15 @@
class MissingMethodsError(ValueError):
"""Exception when host miss some required methods for a specific workflow.
Args:
host (HostBase): Host implementation where are missing methods.
missing_methods (list[str]): List of missing methods.
"""
def __init__(self, host, missing_methods):
joined_missing = ", ".join(
['"{}"'.format(item) for item in missing_methods]
)
super().__init__(
f"Host \"{host.name}\" miss methods {joined_missing}"
)

View file

@ -1,28 +1,6 @@
from abc import ABC, abstractmethod
from abc import abstractmethod
class MissingMethodsError(ValueError):
"""Exception when host miss some required methods for specific workflow.
Args:
host (HostBase): Host implementation where are missing methods.
missing_methods (list[str]): List of missing methods.
"""
def __init__(self, host, missing_methods):
joined_missing = ", ".join(
['"{}"'.format(item) for item in missing_methods]
)
host_name = getattr(host, "name", None)
if not host_name:
try:
host_name = host.__file__.replace("\\", "/").split("/")[-3]
except Exception:
host_name = str(host)
message = (
"Host \"{}\" miss methods {}".format(host_name, joined_missing)
)
super(MissingMethodsError, self).__init__(message)
from .exceptions import MissingMethodsError
class ILoadHost:
@ -105,181 +83,6 @@ class ILoadHost:
return self.get_containers()
class IWorkfileHost(ABC):
"""Implementation requirements to be able use workfile utils and tool."""
@staticmethod
def get_missing_workfile_methods(host):
"""Look for missing methods on "old type" host implementation.
Method is used for validation of implemented functions related to
workfiles. Checks only existence of methods.
Args:
Union[ModuleType, HostBase]: Object of host where to look for
required methods.
Returns:
list[str]: Missing method implementations for workfiles workflow.
"""
if isinstance(host, IWorkfileHost):
return []
required = [
"open_file",
"save_file",
"current_file",
"has_unsaved_changes",
"file_extensions",
"work_root",
]
missing = []
for name in required:
if not hasattr(host, name):
missing.append(name)
return missing
@staticmethod
def validate_workfile_methods(host):
"""Validate methods of "old type" host for workfiles workflow.
Args:
Union[ModuleType, HostBase]: Object of host to validate.
Raises:
MissingMethodsError: If there are missing methods on host
implementation.
"""
missing = IWorkfileHost.get_missing_workfile_methods(host)
if missing:
raise MissingMethodsError(host, missing)
@abstractmethod
def get_workfile_extensions(self):
"""Extensions that can be used as save.
Questions:
This could potentially use 'HostDefinition'.
"""
return []
@abstractmethod
def save_workfile(self, dst_path=None):
"""Save currently opened scene.
Args:
dst_path (str): Where the current scene should be saved. Or use
current path if 'None' is passed.
"""
pass
@abstractmethod
def open_workfile(self, filepath):
"""Open passed filepath in the host.
Args:
filepath (str): Path to workfile.
"""
pass
@abstractmethod
def get_current_workfile(self):
"""Retrieve path to current opened file.
Returns:
str: Path to file which is currently opened.
None: If nothing is opened.
"""
return None
def workfile_has_unsaved_changes(self):
"""Currently opened scene is saved.
Not all hosts can know if current scene is saved because the API of
DCC does not support it.
Returns:
bool: True if scene is saved and False if has unsaved
modifications.
None: Can't tell if workfiles has modifications.
"""
return None
def work_root(self, session):
"""Modify workdir per host.
Default implementation keeps workdir untouched.
Warnings:
We must handle this modification with more sophisticated way
because this can't be called out of DCC so opening of last workfile
(calculated before DCC is launched) is complicated. Also breaking
defined work template is not a good idea.
Only place where it's really used and can make sense is Maya. There
workspace.mel can modify subfolders where to look for maya files.
Args:
session (dict): Session context data.
Returns:
str: Path to new workdir.
"""
return session["AYON_WORKDIR"]
# --- Deprecated method names ---
def file_extensions(self):
"""Deprecated variant of 'get_workfile_extensions'.
Todo:
Remove when all usages are replaced.
"""
return self.get_workfile_extensions()
def save_file(self, dst_path=None):
"""Deprecated variant of 'save_workfile'.
Todo:
Remove when all usages are replaced.
"""
self.save_workfile(dst_path)
def open_file(self, filepath):
"""Deprecated variant of 'open_workfile'.
Todo:
Remove when all usages are replaced.
"""
return self.open_workfile(filepath)
def current_file(self):
"""Deprecated variant of 'get_current_workfile'.
Todo:
Remove when all usages are replaced.
"""
return self.get_current_workfile()
def has_unsaved_changes(self):
"""Deprecated variant of 'workfile_has_unsaved_changes'.
Todo:
Remove when all usages are replaced.
"""
return self.workfile_has_unsaved_changes()
class IPublishHost:
"""Functions related to new creation system in new publisher.

File diff suppressed because it is too large Load diff

View file

@ -15,6 +15,10 @@ import ayon_api
_PLACEHOLDER = object()
class _Cache:
username = None
def _get_ayon_appdirs(*args):
return os.path.join(
platformdirs.user_data_dir("AYON", "Ynput"),
@ -591,10 +595,26 @@ def get_local_site_id():
def get_ayon_username():
"""AYON username used for templates and publishing.
Uses curet ayon api username.
Uses current ayon api username.
Returns:
str: Username.
"""
return ayon_api.get_user()["name"]
# Look for username in the connection stack
# - this is used when service is working as other user
# (e.g. in background sync)
# TODO @iLLiCiTiT - do not use private attribute of 'ServerAPI', rather
# use public method to get username from connection stack.
con = ayon_api.get_server_api_connection()
user_stack = getattr(con, "_as_user_stack", None)
if user_stack is not None:
username = user_stack.username
if username is not None:
return username
# Cache the username to avoid multiple API calls
# - it is not expected that user would change
if _Cache.username is None:
_Cache.username = ayon_api.get_user()["name"]
return _Cache.username

View file

@ -3,6 +3,7 @@ import re
import copy
import numbers
import warnings
import platform
from string import Formatter
import typing
from typing import List, Dict, Any, Set
@ -12,6 +13,7 @@ if typing.TYPE_CHECKING:
SUB_DICT_PATTERN = re.compile(r"([^\[\]]+)")
OPTIONAL_PATTERN = re.compile(r"(<.*?[^{0]*>)[^0-9]*?")
_IS_WINDOWS = platform.system().lower() == "windows"
class TemplateUnsolved(Exception):
@ -277,8 +279,11 @@ class TemplateResult(str):
"""Convert to normalized path."""
cls = self.__class__
path = str(self)
if _IS_WINDOWS:
path = path.replace("\\", "/")
return cls(
os.path.normpath(self.replace("\\", "/")),
os.path.normpath(path),
self.template,
self.solved,
self.used_values,

View file

@ -6,6 +6,7 @@ from .exceptions import (
AnatomyTemplateUnsolved,
)
from .anatomy import Anatomy
from .templates import AnatomyTemplateResult, AnatomyStringTemplate
__all__ = (
@ -16,4 +17,7 @@ __all__ = (
"AnatomyTemplateUnsolved",
"Anatomy",
"AnatomyTemplateResult",
"AnatomyStringTemplate",
)

View file

@ -1,6 +1,7 @@
import os
import re
import copy
import platform
import collections
import numbers
@ -15,6 +16,7 @@ from .exceptions import (
AnatomyTemplateUnsolved,
)
_IS_WINDOWS = platform.system().lower() == "windows"
_PLACEHOLDER = object()
@ -526,6 +528,14 @@ class AnatomyTemplates:
root_key = "{" + root_key + "}"
output = output.replace(str(used_value), root_key)
# Make sure rootless path is with forward slashes
if _IS_WINDOWS:
output.replace("\\", "/")
# Make sure there are no double slashes
while "//" in output:
output = output.replace("//", "/")
return output
def format(self, data, strict=True):

View file

@ -1,9 +1,12 @@
"""Core pipeline functionality"""
from __future__ import annotations
import os
import logging
import platform
import uuid
import warnings
from typing import Optional, Any
import ayon_api
import pyblish.api
@ -14,8 +17,6 @@ from ayon_core.host import HostBase
from ayon_core.lib import (
is_in_tests,
initialize_ayon_connection,
emit_event,
version_up
)
from ayon_core.addon import load_addons, AddonsManager
from ayon_core.settings import get_project_settings
@ -23,13 +24,7 @@ from ayon_core.settings import get_project_settings
from .publish.lib import filter_pyblish_plugins
from .anatomy import Anatomy
from .template_data import get_template_data_with_names
from .workfile import (
get_workdir,
get_custom_workfile_template_by_string_context,
get_workfile_template_key_from_context,
get_last_workfile,
MissingWorkdirError,
)
from .workfile import get_custom_workfile_template_by_string_context
from . import (
register_loader_plugin_path,
register_inventory_action_path,
@ -75,7 +70,7 @@ def _get_addons_manager():
def register_root(path):
"""Register currently active root"""
"""DEPRECATED Register currently active root."""
log.info("Registering root: %s" % path)
_registered_root["_"] = path
@ -94,18 +89,29 @@ def registered_root():
Returns:
dict[str, str]: Root paths.
"""
"""
warnings.warn(
"Used deprecated function 'registered_root'. Please use 'Anatomy'"
" to get roots.",
DeprecationWarning,
stacklevel=2,
)
return _registered_root["_"]
def install_host(host):
def install_host(host: HostBase) -> None:
"""Install `host` into the running Python session.
Args:
host (HostBase): A host interface object.
"""
if not isinstance(host, HostBase):
log.error(
f"Host must be a subclass of 'HostBase', got '{type(host)}'."
)
global _is_installed
_is_installed = True
@ -183,7 +189,7 @@ def install_ayon_plugins(project_name=None, host_name=None):
register_inventory_action_path(INVENTORY_PATH)
if host_name is None:
host_name = os.environ.get("AYON_HOST_NAME")
host_name = get_current_host_name()
addons_manager = _get_addons_manager()
publish_plugin_dirs = addons_manager.collect_publish_plugin_paths(
@ -366,6 +372,24 @@ def get_current_task_name():
return get_global_context()["task_name"]
def get_current_project_settings() -> dict[str, Any]:
"""Project settings for the current context project.
Returns:
dict[str, Any]: Project settings for the current context project.
Raises:
ValueError: If current project is not set.
"""
project_name = get_current_project_name()
if not project_name:
raise ValueError(
"Current project is not set. Can't get project settings."
)
return get_project_settings(project_name)
def get_current_project_entity(fields=None):
"""Helper function to get project document based on global Session.
@ -505,66 +529,64 @@ def get_current_context_custom_workfile_template(project_settings=None):
)
def change_current_context(folder_entity, task_entity, template_key=None):
_PLACEHOLDER = object()
def change_current_context(
folder_entity: dict[str, Any],
task_entity: dict[str, Any],
*,
template_key: Optional[str] = _PLACEHOLDER,
reason: Optional[str] = None,
project_entity: Optional[dict[str, Any]] = None,
anatomy: Optional[Anatomy] = None,
) -> dict[str, str]:
"""Update active Session to a new task work area.
This updates the live Session to a different task under folder.
This updates the live Session to a different task under a folder.
Notes:
* This function does a lot of things related to workfiles which
extends arguments options a lot.
* We might want to implement 'set_current_context' on host integration
instead. But `AYON_WORKDIR`, which is related to 'IWorkfileHost',
would not be available in that case which might break some
logic.
Args:
folder_entity (Dict[str, Any]): Folder entity to set.
task_entity (Dict[str, Any]): Task entity to set.
template_key (Union[str, None]): Prepared template key to be used for
workfile template in Anatomy.
template_key (Optional[str]): DEPRECATED: Prepared template key to
be used for workfile template in Anatomy.
reason (Optional[str]): Reason for changing context.
anatomy (Optional[Anatomy]): Anatomy object used for workdir
calculation.
project_entity (Optional[dict[str, Any]]): Project entity used for
workdir calculation.
Returns:
Dict[str, str]: The changed key, values in the current Session.
"""
dict[str, str]: New context data.
project_name = get_current_project_name()
workdir = None
folder_path = None
task_name = None
if folder_entity:
folder_path = folder_entity["path"]
if task_entity:
task_name = task_entity["name"]
project_entity = ayon_api.get_project(project_name)
host_name = get_current_host_name()
workdir = get_workdir(
project_entity,
folder_entity,
task_entity,
host_name,
template_key=template_key
"""
if template_key is not _PLACEHOLDER:
warnings.warn(
(
"Used deprecated argument 'template_key' in"
" 'change_current_context'."
" It is not necessary to pass it in anymore."
),
DeprecationWarning,
stacklevel=2,
)
envs = {
"AYON_PROJECT_NAME": project_name,
"AYON_FOLDER_PATH": folder_path,
"AYON_TASK_NAME": task_name,
"AYON_WORKDIR": workdir,
}
# Update the Session and environments. Pop from environments all keys with
# value set to None.
for key, value in envs.items():
if value is None:
os.environ.pop(key, None)
else:
os.environ[key] = value
data = envs.copy()
# Convert env keys to human readable keys
data["project_name"] = project_name
data["folder_path"] = folder_path
data["task_name"] = task_name
data["workdir_path"] = workdir
# Emit session change
emit_event("taskChanged", data)
return data
host = registered_host()
return host.set_current_context(
folder_entity,
task_entity,
reason=reason,
project_entity=project_entity,
anatomy=anatomy,
)
def get_process_id():
@ -583,53 +605,16 @@ def get_process_id():
def version_up_current_workfile():
"""Function to increment and save workfile
"""DEPRECATED Function to increment and save workfile.
Please use 'save_next_version' from 'ayon_core.pipeline.workfile' instead.
"""
host = registered_host()
project_name = get_current_project_name()
folder_path = get_current_folder_path()
task_name = get_current_task_name()
host_name = get_current_host_name()
template_key = get_workfile_template_key_from_context(
project_name,
folder_path,
task_name,
host_name,
warnings.warn(
"Used deprecated 'version_up_current_workfile' please use"
" 'save_next_version' from 'ayon_core.pipeline.workfile' instead.",
DeprecationWarning,
stacklevel=2,
)
anatomy = Anatomy(project_name)
data = get_template_data_with_names(
project_name, folder_path, task_name, host_name
)
data["root"] = anatomy.roots
work_template = anatomy.get_template_item("work", template_key)
# Define saving file extension
extensions = host.get_workfile_extensions()
current_file = host.get_current_workfile()
if current_file:
extensions = [os.path.splitext(current_file)[-1]]
work_root = work_template["directory"].format_strict(data)
file_template = work_template["file"].template
last_workfile_path = get_last_workfile(
work_root, file_template, data, extensions, True
)
# `get_last_workfile` will return the first expected file version
# if no files exist yet. In that case, if they do not exist we will
# want to save v001
new_workfile_path = last_workfile_path
if os.path.exists(new_workfile_path):
new_workfile_path = version_up(new_workfile_path)
# Raise an error if the parent folder doesn't exist as `host.save_workfile`
# is not supposed/able to create missing folders.
parent_folder = os.path.dirname(new_workfile_path)
if not os.path.exists(parent_folder):
raise MissingWorkdirError(
f"Work area directory '{parent_folder}' does not exist.")
host.save_workfile(new_workfile_path)
from ayon_core.pipeline.workfile import save_next_version
save_next_version()

View file

@ -720,11 +720,13 @@ def get_representation_path(representation, root=None):
str: fullpath of the representation
"""
if root is None:
from ayon_core.pipeline import registered_root
from ayon_core.pipeline import get_current_project_name, Anatomy
root = registered_root()
anatomy = Anatomy(get_current_project_name())
return get_representation_path_with_anatomy(
representation, anatomy
)
def path_from_representation():
try:
@ -772,7 +774,7 @@ def get_representation_path(representation, root=None):
dir_path, file_name = os.path.split(path)
if not os.path.exists(dir_path):
return
return None
base_name, ext = os.path.splitext(file_name)
file_name_items = None
@ -782,7 +784,7 @@ def get_representation_path(representation, root=None):
file_name_items = base_name.split("%")
if not file_name_items:
return
return None
filename_start = file_name_items[0]

View file

@ -4,6 +4,8 @@ from .path_resolving import (
get_workdir_with_workdir_data,
get_workdir,
get_last_workfile_with_version_from_paths,
get_last_workfile_from_paths,
get_last_workfile_with_version,
get_last_workfile,
@ -11,12 +13,21 @@ from .path_resolving import (
get_custom_workfile_template_by_string_context,
create_workdir_extra_folders,
get_comments_from_workfile_paths,
)
from .utils import (
should_use_last_workfile_on_launch,
should_open_workfiles_tool_on_launch,
MissingWorkdirError,
save_workfile_info,
save_current_workfile_to,
save_workfile_with_current_context,
save_next_version,
copy_workfile_to_context,
find_workfile_rootless_path,
)
from .build_workfile import BuildWorkfile
@ -37,18 +48,29 @@ __all__ = (
"get_workdir_with_workdir_data",
"get_workdir",
"get_last_workfile_with_version_from_paths",
"get_last_workfile_from_paths",
"get_last_workfile_with_version",
"get_last_workfile",
"find_workfile_rootless_path",
"get_custom_workfile_template",
"get_custom_workfile_template_by_string_context",
"create_workdir_extra_folders",
"get_comments_from_workfile_paths",
"should_use_last_workfile_on_launch",
"should_open_workfiles_tool_on_launch",
"MissingWorkdirError",
"save_workfile_info",
"save_current_workfile_to",
"save_workfile_with_current_context",
"save_next_version",
"copy_workfile_to_context",
"BuildWorkfile",
"discover_workfile_build_plugins",

View file

@ -1,8 +1,12 @@
from __future__ import annotations
import os
import re
import copy
import platform
import warnings
import typing
from typing import Optional, Dict, Any
from dataclasses import dataclass
import ayon_api
@ -15,6 +19,9 @@ from ayon_core.lib import (
from ayon_core.pipeline import version_start, Anatomy
from ayon_core.pipeline.template_data import get_template_data
if typing.TYPE_CHECKING:
from ayon_core.pipeline.anatomy import AnatomyTemplateResult
def get_workfile_template_key_from_context(
project_name: str,
@ -111,7 +118,7 @@ def get_workdir_with_workdir_data(
anatomy=None,
template_key=None,
project_settings=None
):
) -> "AnatomyTemplateResult":
"""Fill workdir path from entered data and project's anatomy.
It is possible to pass only project's name instead of project's anatomy but
@ -130,9 +137,9 @@ def get_workdir_with_workdir_data(
if 'template_key' is not passed.
Returns:
TemplateResult: Workdir path.
"""
AnatomyTemplateResult: Workdir path.
"""
if not anatomy:
anatomy = Anatomy(project_name)
@ -147,7 +154,7 @@ def get_workdir_with_workdir_data(
template_obj = anatomy.get_template_item(
"work", template_key, "directory"
)
# Output is TemplateResult object which contain useful data
# Output is AnatomyTemplateResult object which contain useful data
output = template_obj.format_strict(workdir_data)
if output:
return output.normalized()
@ -155,14 +162,14 @@ def get_workdir_with_workdir_data(
def get_workdir(
project_entity,
folder_entity,
task_entity,
host_name,
project_entity: dict[str, Any],
folder_entity: dict[str, Any],
task_entity: dict[str, Any],
host_name: str,
anatomy=None,
template_key=None,
project_settings=None
):
) -> "AnatomyTemplateResult":
"""Fill workdir path from entered data and project's anatomy.
Args:
@ -174,8 +181,8 @@ def get_workdir(
is stored under `AYON_HOST_NAME` key.
anatomy (Anatomy): Optional argument. Anatomy object is created using
project name from `project_entity`. It is preferred to pass this
argument as initialization of a new Anatomy object may be time
consuming.
argument as initialization of a new Anatomy object may be
time-consuming.
template_key (str): Key of work templates in anatomy templates. Default
value is defined in `get_workdir_with_workdir_data`.
project_settings(Dict[str, Any]): Prepared project settings for
@ -183,9 +190,9 @@ def get_workdir(
if 'template_key' is not passed.
Returns:
TemplateResult: Workdir path.
"""
AnatomyTemplateResult: Workdir path.
"""
if not anatomy:
anatomy = Anatomy(
project_entity["name"], project_entity=project_entity
@ -197,7 +204,7 @@ def get_workdir(
task_entity,
host_name,
)
# Output is TemplateResult object which contain useful data
# Output is AnatomyTemplateResult object which contain useful data
return get_workdir_with_workdir_data(
workdir_data,
anatomy.project_name,
@ -207,12 +214,141 @@ def get_workdir(
)
def get_last_workfile_with_version(
workdir, file_template, fill_data, extensions
):
@dataclass
class WorkfileParsedData:
version: Optional[int] = None
comment: Optional[str] = None
ext: Optional[str] = None
class WorkfileDataParser:
"""Parse dynamic data from existing filenames based on template.
Args:
file_template (str): Workfile file template.
data (dict[str, Any]): Data to fill the template with.
"""
def __init__(
self,
file_template: str,
data: dict[str, Any],
):
data = copy.deepcopy(data)
file_template = str(file_template)
# Use placeholders that will never be in the filename
ext_replacement = "CIextID"
version_replacement = "CIversionID"
comment_replacement = "CIcommentID"
data["version"] = version_replacement
data["comment"] = comment_replacement
for pattern, replacement in (
# Replace `.{ext}` with `{ext}` so we are sure dot is not
# at the end
(r"\.?{ext}", ext_replacement),
):
file_template = re.sub(pattern, replacement, file_template)
file_template = StringTemplate(file_template)
# Prepare template that does contain 'comment'
comment_template = re.escape(str(file_template.format_strict(data)))
# Prepare template that does not contain 'comment'
# - comment is usually marked as optional and in that case the regex
# to find the comment is different based on the filename
# - if filename contains comment then 'comment_template' will match
# - if filename does not contain comment then 'file_template' will
# match
data.pop("comment")
file_template = re.escape(str(file_template.format_strict(data)))
for src, replacement in (
(ext_replacement, r"(?P<ext>\..*)"),
(version_replacement, r"(?P<version>[0-9]+)"),
(comment_replacement, r"(?P<comment>.+?)"),
):
comment_template = comment_template.replace(src, replacement)
file_template = file_template.replace(src, replacement)
kwargs = {}
if platform.system().lower() == "windows":
kwargs["flags"] = re.IGNORECASE
# Match from beginning to end of string to be safe
self._comment_template = re.compile(f"^{comment_template}$", **kwargs)
self._file_template = re.compile(f"^{file_template}$", **kwargs)
def parse_data(self, filename: str) -> WorkfileParsedData:
"""Parse the dynamic data from a filename."""
match = self._comment_template.match(filename)
if not match:
match = self._file_template.match(filename)
if not match:
return WorkfileParsedData()
kwargs = match.groupdict()
version = kwargs.get("version")
if version is not None:
kwargs["version"] = int(version)
return WorkfileParsedData(**kwargs)
def parse_dynamic_data_from_workfile(
filename: str,
file_template: str,
template_data: dict[str, Any],
) -> WorkfileParsedData:
"""Parse dynamic data from a workfile filename.
Dynamic data are 'version', 'comment' and 'ext'.
Args:
filename (str): Workfile filename.
file_template (str): Workfile file template.
template_data (dict[str, Any]): Data to fill the template with.
Returns:
WorkfileParsedData: Dynamic data parsed from the filename.
"""
parser = WorkfileDataParser(file_template, template_data)
return parser.parse_data(filename)
def parse_dynamic_data_from_workfiles(
filenames: list[str],
file_template: str,
template_data: dict[str, Any],
) -> dict[str, WorkfileParsedData]:
"""Parse dynamic data from a workfiles filenames.
Dynamic data are 'version', 'comment' and 'ext'.
Args:
filenames (list[str]): Workfiles filenames.
file_template (str): Workfile file template.
template_data (dict[str, Any]): Data to fill the template with.
Returns:
dict[str, WorkfileParsedData]: Dynamic data parsed from the filenames
by filename.
"""
parser = WorkfileDataParser(file_template, template_data)
return {
filename: parser.parse_data(filename)
for filename in filenames
}
def get_last_workfile_with_version_from_paths(
filepaths: list[str],
file_template: str,
template_data: dict[str, Any],
extensions: set[str],
) -> tuple[Optional[str], Optional[int]]:
"""Return last workfile version.
Usign workfile template and it's filling data find most possible last
Using the workfile template and its template data find most possible last
version of workfile which was created for the context.
Functionality is fully based on knowing which keys are optional or what
@ -222,50 +358,43 @@ def get_last_workfile_with_version(
last workfile.
Args:
workdir (str): Path to dir where workfiles are stored.
filepaths (list[str]): Workfile paths.
file_template (str): Template of file name.
fill_data (Dict[str, Any]): Data for filling template.
extensions (Iterable[str]): All allowed file extensions of workfile.
template_data (Dict[str, Any]): Data for filling template.
extensions (set[str]): All allowed file extensions of workfile.
Returns:
Tuple[Union[str, None], Union[int, None]]: Last workfile with version
tuple[Optional[str], Optional[int]]: Last workfile with version
if there is any workfile otherwise None for both.
"""
if not os.path.exists(workdir):
"""
if not filepaths:
return None, None
dotted_extensions = set()
for ext in extensions:
if not ext.startswith("."):
ext = ".{}".format(ext)
dotted_extensions.add(ext)
# Fast match on extension
filenames = [
filename
for filename in os.listdir(workdir)
if os.path.splitext(filename)[-1] in dotted_extensions
]
ext = f".{ext}"
dotted_extensions.add(re.escape(ext))
# Build template without optionals, version to digits only regex
# and comment to any definable value.
# Escape extensions dot for regex
regex_exts = [
"\\" + ext
for ext in dotted_extensions
]
ext_expression = "(?:" + "|".join(regex_exts) + ")"
ext_expression = "(?:" + "|".join(dotted_extensions) + ")"
for pattern, replacement in (
# Replace `.{ext}` with `{ext}` so we are sure dot is not at the end
(r"\.?{ext}", ext_expression),
# Replace optional keys with optional content regex
(r"<.*?>", r".*?"),
# Replace `{version}` with group regex
(r"{version.*?}", r"([0-9]+)"),
(r"{comment.*?}", r".+?"),
):
file_template = re.sub(pattern, replacement, file_template)
# Replace `.{ext}` with `{ext}` so we are sure there is not dot at the end
file_template = re.sub(r"\.?{ext}", ext_expression, file_template)
# Replace optional keys with optional content regex
file_template = re.sub(r"<.*?>", r".*?", file_template)
# Replace `{version}` with group regex
file_template = re.sub(r"{version.*?}", r"([0-9]+)", file_template)
file_template = re.sub(r"{comment.*?}", r".+?", file_template)
file_template = StringTemplate.format_strict_template(
file_template, fill_data
file_template, template_data
)
# Match with ignore case on Windows due to the Windows
@ -278,64 +407,189 @@ def get_last_workfile_with_version(
# Get highest version among existing matching files
version = None
output_filenames = []
for filename in sorted(filenames):
output_filepaths = []
for filepath in sorted(filepaths):
filename = os.path.basename(filepath)
match = re.match(file_template, filename, **kwargs)
if not match:
continue
if not match.groups():
output_filenames.append(filename)
output_filepaths.append(filename)
continue
file_version = int(match.group(1))
if version is None or file_version > version:
output_filenames[:] = []
output_filepaths.clear()
version = file_version
if file_version == version:
output_filenames.append(filename)
output_filepaths.append(filepath)
output_filename = None
if output_filenames:
if len(output_filenames) == 1:
output_filename = output_filenames[0]
else:
last_time = None
for _output_filename in output_filenames:
full_path = os.path.join(workdir, _output_filename)
mod_time = os.path.getmtime(full_path)
if last_time is None or last_time < mod_time:
output_filename = _output_filename
last_time = mod_time
# Use file modification time to use most recent file if there are
# multiple workfiles with the same version
output_filepath = None
last_time = None
for _output_filepath in output_filepaths:
mod_time = None
if os.path.exists(_output_filepath):
mod_time = os.path.getmtime(_output_filepath)
if (
last_time is None
or (mod_time is not None and last_time < mod_time)
):
output_filepath = _output_filepath
last_time = mod_time
return output_filename, version
return output_filepath, version
def get_last_workfile(
workdir, file_template, fill_data, extensions, full_path=False
):
"""Return last workfile filename.
def get_last_workfile_from_paths(
filepaths: list[str],
file_template: str,
template_data: dict[str, Any],
extensions: set[str],
) -> Optional[str]:
"""Return the last workfile filename.
Returns file with version 1 if there is not workfile yet.
Returns the file with version 1 if there is not workfile yet.
Args:
filepaths (list[str]): Paths to workfiles.
file_template (str): Template of file name.
template_data (dict[str, Any]): Data for filling template.
extensions (set[str]): All allowed file extensions of workfile.
Returns:
Optional[str]: Last workfile path.
"""
filepath, _version = get_last_workfile_with_version_from_paths(
filepaths, file_template, template_data, extensions
)
return filepath
def _filter_dir_files_by_ext(
dirpath: str,
extensions: set[str],
) -> tuple[list[str], set[str]]:
"""Filter files by extensions.
Args:
dirpath (str): List of file paths.
extensions (set[str]): Set of file extensions.
Returns:
tuple[list[str], set[str]]: Filtered list of file paths.
"""
dotted_extensions = set()
for ext in extensions:
if not ext.startswith("."):
ext = f".{ext}"
dotted_extensions.add(ext)
if not os.path.exists(dirpath):
return [], dotted_extensions
filtered_paths = [
os.path.join(dirpath, filename)
for filename in os.listdir(dirpath)
if os.path.splitext(filename)[-1] in dotted_extensions
]
return filtered_paths, dotted_extensions
def get_last_workfile_with_version(
workdir: str,
file_template: str,
template_data: dict[str, Any],
extensions: set[str],
) -> tuple[Optional[str], Optional[int]]:
"""Return last workfile version.
Using the workfile template and its filling data to find the most possible
last version of workfile which was created for the context.
Functionality is fully based on knowing which keys are optional or what
values are expected as value.
The last modified file is used if more files can be considered as
last workfile.
Args:
workdir (str): Path to dir where workfiles are stored.
file_template (str): Template of file name.
fill_data (Dict[str, Any]): Data for filling template.
extensions (Iterable[str]): All allowed file extensions of workfile.
full_path (Optional[bool]): Full path to file is returned if
set to True.
template_data (dict[str, Any]): Data for filling template.
extensions (set[str]): All allowed file extensions of workfile.
Returns:
str: Last or first workfile as filename of full path to filename.
tuple[Optional[str], Optional[int]]: Last workfile with version
if there is any workfile otherwise None for both.
"""
filename, _version = get_last_workfile_with_version(
workdir, file_template, fill_data, extensions
if not os.path.exists(workdir):
return None, None
filepaths, dotted_extensions = _filter_dir_files_by_ext(
workdir, extensions
)
if filename is None:
data = copy.deepcopy(fill_data)
return get_last_workfile_with_version_from_paths(
filepaths,
file_template,
template_data,
dotted_extensions,
)
def get_last_workfile(
workdir: str,
file_template: str,
template_data: dict[str, Any],
extensions: set[str],
full_path: bool = False,
) -> str:
"""Return last the workfile filename.
Returns first file name/path if there are not workfiles yet.
Args:
workdir (str): Path to dir where workfiles are stored.
file_template (str): Template of file name.
template_data (Dict[str, Any]): Data for filling template.
extensions (Iterable[str]): All allowed file extensions of workfile.
full_path (bool): Return full path to the file or only filename.
Returns:
str: Last or first workfile file name or path based on
'full_path' value.
"""
# TODO (iLLiCiTiT): Remove the argument 'full_path' and return only full
# path. As far as I can tell it is always called with 'full_path' set
# to 'True'.
# - it has to be 2 step operation, first warn about having it 'False', and
# then warn about having it filled.
if full_path is False:
warnings.warn(
"Argument 'full_path' will be removed and will return"
" only full path in future.",
DeprecationWarning,
)
filepaths, dotted_extensions = _filter_dir_files_by_ext(
workdir, extensions
)
filepath = get_last_workfile_from_paths(
filepaths,
file_template,
template_data,
dotted_extensions
)
if filepath is None:
data = copy.deepcopy(template_data)
data["version"] = version_start.get_versioning_start(
data["project"]["name"],
data["app"],
@ -344,15 +598,15 @@ def get_last_workfile(
product_type="workfile"
)
data.pop("comment", None)
if not data.get("ext"):
data["ext"] = extensions[0]
if data.get("ext") is None:
data["ext"] = next(iter(extensions), "")
data["ext"] = data["ext"].lstrip(".")
filename = StringTemplate.format_strict_template(file_template, data)
filepath = os.path.join(workdir, filename)
if full_path:
return os.path.normpath(os.path.join(workdir, filename))
return filename
return os.path.normpath(filepath)
return os.path.basename(filepath)
def get_custom_workfile_template(
@ -389,11 +643,10 @@ def get_custom_workfile_template(
project_settings(Dict[str, Any]): Preloaded project settings.
Returns:
str: Path to template or None if none of profiles match current
context. Existence of formatted path is not validated.
None: If no profile is matching context.
"""
Optional[str]: Path to template or None if none of profiles match
current context. Existence of formatted path is not validated.
"""
log = Logger.get_logger("CustomWorkfileResolve")
project_name = project_entity["name"]
@ -562,3 +815,112 @@ def create_workdir_extra_folders(
fullpath = os.path.join(workdir, subfolder)
if not os.path.exists(fullpath):
os.makedirs(fullpath)
class CommentMatcher:
"""Use anatomy and work file data to parse comments from filenames.
Args:
extensions (set[str]): Set of extensions.
file_template (StringTemplate): Workfile file template.
data (dict[str, Any]): Data to fill the template with.
"""
def __init__(
self,
extensions: set[str],
file_template: StringTemplate,
data: dict[str, Any]
):
warnings.warn(
"Class 'CommentMatcher' is deprecated. Please"
" use 'parse_dynamic_data_from_workfiles' instead.",
DeprecationWarning,
stacklevel=2,
)
self._fname_regex = None
if "{comment}" not in file_template:
# Don't look for comment if template doesn't allow it
return
# Create a regex group for extensions
any_extension = "(?:{})".format(
"|".join(re.escape(ext.lstrip(".")) for ext in extensions)
)
# Use placeholders that will never be in the filename
temp_data = copy.deepcopy(data)
temp_data["comment"] = "<<comment>>"
temp_data["version"] = "<<version>>"
temp_data["ext"] = "<<ext>>"
fname_pattern = re.escape(
file_template.format_strict(temp_data)
)
# Replace comment and version with something we can match with regex
replacements = (
("<<comment>>", r"(?P<comment>.+)"),
("<<version>>", r"[0-9]+"),
("<<ext>>", any_extension),
)
for src, dest in replacements:
fname_pattern = fname_pattern.replace(re.escape(src), dest)
# Match from beginning to end of string to be safe
self._fname_regex = re.compile(f"^{fname_pattern}$")
def parse_comment(self, filename: str) -> Optional[str]:
"""Parse the {comment} part from a filename."""
if self._fname_regex:
match = self._fname_regex.match(filename)
if match:
return match.group("comment")
return None
def get_comments_from_workfile_paths(
filepaths: list[str],
extensions: set[str],
file_template: StringTemplate,
template_data: dict[str, Any],
current_filename: Optional[str] = None,
) -> tuple[list[str], str]:
"""DEPRECATED Collect comments from workfile filenames.
Based on 'current_filename' is also returned "current comment".
Args:
filepaths (list[str]): List of filepaths to parse.
extensions (set[str]): Set of file extensions.
file_template (StringTemplate): Workfile file template.
template_data (dict[str, Any]): Data to fill the template with.
current_filename (str): Filename to check for the current comment.
Returns:
tuple[list[str], str]: List of comments and the current comment.
"""
warnings.warn(
"Function 'get_comments_from_workfile_paths' is deprecated. Please"
" use 'parse_dynamic_data_from_workfiles' instead.",
DeprecationWarning,
stacklevel=2,
)
current_comment = ""
if not filepaths:
return [], current_comment
matcher = CommentMatcher(extensions, file_template, template_data)
comment_hints = set()
for filepath in filepaths:
filename = os.path.basename(filepath)
comment = matcher.parse_comment(filename)
if comment:
comment_hints.add(comment)
if filename == current_filename:
current_comment = comment
return list(comment_hints), current_comment

View file

@ -1,5 +1,30 @@
from ayon_core.lib import filter_profiles
from __future__ import annotations
import os
import platform
import uuid
import typing
from typing import Optional, Any
import ayon_api
from ayon_api.operations import OperationsSession
from ayon_core.lib import filter_profiles, get_ayon_username
from ayon_core.settings import get_project_settings
from ayon_core.host.interfaces import (
SaveWorkfileOptionalData,
ListWorkfilesOptionalData,
CopyWorkfileOptionalData,
)
from ayon_core.pipeline.version_start import get_versioning_start
from ayon_core.pipeline.template_data import get_template_data
from .path_resolving import (
get_workdir,
get_workfile_template_key,
)
if typing.TYPE_CHECKING:
from ayon_core.pipeline import Anatomy
class MissingWorkdirError(Exception):
@ -7,14 +32,61 @@ class MissingWorkdirError(Exception):
pass
def get_workfiles_info(
workfile_path: str,
project_name: str,
task_id: str,
*,
anatomy: Optional["Anatomy"] = None,
workfile_entities: Optional[list[dict[str, Any]]] = None,
) -> Optional[dict[str, Any]]:
"""Find workfile info entity for a workfile path.
Args:
workfile_path (str): Workfile path.
project_name (str): The name of the project.
task_id (str): Task id under which is workfile created.
anatomy (Optional[Anatomy]): Project anatomy used to get roots.
workfile_entities (Optional[list[dict[str, Any]]]): Pre-fetched
workfile entities related to the task.
Returns:
Optional[dict[str, Any]]: Workfile info entity if found, otherwise
`None`.
"""
if anatomy is None:
anatomy = Anatomy(project_name)
if workfile_entities is None:
workfile_entities = list(ayon_api.get_workfiles_info(
project_name,
task_ids=[task_id],
))
if platform.system().lower() == "windows":
workfile_path = workfile_path.replace("\\", "/")
workfile_path = workfile_path.lower()
for workfile_entity in workfile_entities:
path = workfile_entity["path"]
filled_path = anatomy.fill_root(path)
if platform.system().lower() == "windows":
filled_path = filled_path.replace("\\", "/")
filled_path = filled_path.lower()
if filled_path == workfile_path:
return workfile_entity
return None
def should_use_last_workfile_on_launch(
project_name,
host_name,
task_name,
task_type,
default_output=False,
project_settings=None,
):
project_name: str,
host_name: str,
task_name: str,
task_type: str,
default_output: bool = False,
project_settings: Optional[dict[str, Any]] = None,
) -> bool:
"""Define if host should start last version workfile if possible.
Default output is `False`. Can be overridden with environment variable
@ -124,3 +196,608 @@ def should_open_workfiles_tool_on_launch(
if output is None:
return default_output
return output
def save_workfile_info(
project_name: str,
task_id: str,
rootless_path: str,
host_name: str,
version: Optional[int] = None,
comment: Optional[str] = None,
description: Optional[str] = None,
username: Optional[str] = None,
workfile_entities: Optional[list[dict[str, Any]]] = None,
) -> dict[str, Any]:
"""Save workfile info entity for a workfile path.
Args:
project_name (str): The name of the project.
task_id (str): Task id under which is workfile created.
rootless_path (str): Rootless path of the workfile.
host_name (str): Name of host which is saving the workfile.
version (Optional[int]): Workfile version.
comment (Optional[str]): Workfile comment.
description (Optional[str]): Workfile description.
username (Optional[str]): Username of user who saves the workfile.
If not provided, current user is used.
workfile_entities (Optional[list[dict[str, Any]]]): Pre-fetched
workfile entities related to task.
Returns:
dict[str, Any]: Workfile info entity.
"""
if workfile_entities is None:
workfile_entities = list(ayon_api.get_workfiles_info(
project_name,
task_ids=[task_id],
))
workfile_entity = next(
(
_ent
for _ent in workfile_entities
if _ent["path"] == rootless_path
),
None
)
if username is None:
username = get_ayon_username()
if not workfile_entity:
return _create_workfile_info_entity(
project_name,
task_id,
host_name,
rootless_path,
username,
version,
comment,
description,
)
data = {
key: value
for key, value in (
("host_name", host_name),
("version", version),
("comment", comment),
)
if value is not None
}
old_data = workfile_entity["data"]
changed_data = {}
for key, value in data.items():
if key not in old_data or old_data[key] != value:
changed_data[key] = value
update_data = {}
if changed_data:
update_data["data"] = changed_data
old_description = workfile_entity["attrib"].get("description")
if description is not None and old_description != description:
update_data["attrib"] = {"description": description}
workfile_entity["attrib"]["description"] = description
# Automatically fix 'createdBy' and 'updatedBy' fields
# NOTE both fields were not automatically filled by server
# until 1.1.3 release.
if workfile_entity.get("createdBy") is None:
update_data["createdBy"] = username
workfile_entity["createdBy"] = username
if workfile_entity.get("updatedBy") != username:
update_data["updatedBy"] = username
workfile_entity["updatedBy"] = username
if not update_data:
return workfile_entity
session = OperationsSession()
session.update_entity(
project_name,
"workfile",
workfile_entity["id"],
update_data,
)
session.commit()
return workfile_entity
def save_current_workfile_to(
workfile_path: str,
folder_path: str,
task_name: str,
*,
version: Optional[int] = None,
comment: Optional[str] = None,
description: Optional[str] = None,
prepared_data: Optional[SaveWorkfileOptionalData] = None,
) -> None:
"""Save current workfile to new location or context.
Args:
workfile_path (str): Destination workfile path.
folder_path (str): Target folder path.
task_name (str): Target task name.
version (Optional[int]): Workfile version.
comment (optional[str]): Workfile comment.
description (Optional[str]): Workfile description.
prepared_data (Optional[SaveWorkfileOptionalData]): Prepared data
for speed enhancements.
"""
from ayon_core.pipeline.context_tools import registered_host
host = registered_host()
context = host.get_current_context()
project_name = context["project_name"]
folder_entity = ayon_api.get_folder_by_path(
project_name, folder_path
)
task_entity = ayon_api.get_task_by_name(
project_name, folder_entity["id"], task_name
)
host.save_workfile_with_context(
workfile_path,
folder_entity,
task_entity,
version=version,
comment=comment,
description=description,
prepared_data=prepared_data,
)
def save_workfile_with_current_context(
workfile_path: str,
*,
version: Optional[int] = None,
comment: Optional[str] = None,
description: Optional[str] = None,
prepared_data: Optional[SaveWorkfileOptionalData] = None,
) -> None:
"""Save current workfile to new location using current context.
Helper function to save workfile using current context. Calls
'save_current_workfile_to' at the end.
Args:
workfile_path (str): Destination workfile path.
version (Optional[int]): Workfile version.
comment (optional[str]): Workfile comment.
description (Optional[str]): Workfile description.
prepared_data (Optional[SaveWorkfileOptionalData]): Prepared data
for speed enhancements.
"""
from ayon_core.pipeline.context_tools import registered_host
host = registered_host()
context = host.get_current_context()
project_name = context["project_name"]
folder_path = context["folder_path"]
task_name = context["task_name"]
folder_entity = task_entity = None
if folder_path:
folder_entity = ayon_api.get_folder_by_path(project_name, folder_path)
if folder_entity and task_name:
task_entity = ayon_api.get_task_by_name(
project_name, folder_entity["id"], task_name
)
host.save_workfile_with_context(
workfile_path,
folder_entity,
task_entity,
version=version,
comment=comment,
description=description,
prepared_data=prepared_data,
)
def save_next_version(
version: Optional[int] = None,
comment: Optional[str] = None,
description: Optional[str] = None,
*,
prepared_data: Optional[SaveWorkfileOptionalData] = None,
) -> None:
"""Save workfile using current context, version and comment.
Helper function to save a workfile using the current context. Last
workfile version + 1 is used if is not passed in.
Args:
version (Optional[int]): Workfile version that will be used. Last
version + 1 is used if is not passed in.
comment (optional[str]): Workfile comment. Pass '""' to clear comment.
The current workfile comment is used if it is not passed.
description (Optional[str]): Workfile description.
prepared_data (Optional[SaveWorkfileOptionalData]): Prepared data
for speed enhancements.
"""
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.context_tools import registered_host
host = registered_host()
current_path = host.get_current_workfile()
if not current_path:
current_path = None
else:
current_path = os.path.normpath(current_path)
context = host.get_current_context()
project_name = context["project_name"]
folder_path = context["folder_path"]
task_name = context["task_name"]
if prepared_data is None:
prepared_data = SaveWorkfileOptionalData()
project_entity = prepared_data.project_entity
anatomy = prepared_data.anatomy
project_settings = prepared_data.project_settings
if project_entity is None:
project_entity = ayon_api.get_project(project_name)
prepared_data.project_entity = project_entity
if project_settings is None:
project_settings = get_project_settings(project_name)
prepared_data.project_settings = project_settings
if anatomy is None:
anatomy = Anatomy(project_name, project_entity=project_entity)
prepared_data.anatomy = anatomy
folder_entity = ayon_api.get_folder_by_path(project_name, folder_path)
task_entity = ayon_api.get_task_by_name(
project_name, folder_entity["id"], task_name
)
template_key = get_workfile_template_key(
project_name,
task_entity["taskType"],
host.name,
project_settings=project_settings
)
file_template = anatomy.get_template_item("work", template_key, "file")
template_data = get_template_data(
project_entity,
folder_entity,
task_entity,
host.name,
project_settings,
)
workdir = get_workdir(
project_entity,
folder_entity,
task_entity,
host.name,
anatomy=anatomy,
template_key=template_key,
project_settings=project_settings,
)
rootless_dir = workdir.rootless
last_workfile = None
current_workfile = None
if version is None or comment is None:
workfiles = host.list_workfiles(
project_name, folder_entity, task_entity,
prepared_data=ListWorkfilesOptionalData(
project_entity=project_entity,
anatomy=anatomy,
project_settings=project_settings,
template_key=template_key,
)
)
for workfile in workfiles:
if current_workfile is None and workfile.filepath == current_path:
current_workfile = workfile
if workfile.version is None:
continue
if (
last_workfile is None
or last_workfile.version < workfile.version
):
last_workfile = workfile
if version is None and last_workfile is not None:
version = last_workfile.version + 1
if version is None:
version = get_versioning_start(
project_name,
host.name,
task_name=task_entity["name"],
task_type=task_entity["taskType"],
product_type="workfile"
)
# Re-use comment from the current workfile if is not passed in
if comment is None and current_workfile is not None:
comment = current_workfile.comment
template_data["version"] = version
if comment:
template_data["comment"] = comment
# Resolve extension
# - Don't fill any if the host does not have defined any -> e.g. if host
# uses directory instead of a file.
# 1. Use the current file extension.
# 2. Use the last known workfile extension.
# 3. Use the first extensions from 'get_workfile_extensions'.
ext = None
workfile_extensions = host.get_workfile_extensions()
if workfile_extensions:
if current_path:
ext = os.path.splitext(current_path)[1]
elif last_workfile is not None:
ext = os.path.splitext(last_workfile.filepath)[1]
else:
ext = next(iter(workfile_extensions))
ext = ext.lstrip(".")
if ext:
template_data["ext"] = ext
filename = file_template.format_strict(template_data)
workfile_path = os.path.join(workdir, filename)
rootless_path = f"{rootless_dir}/{filename}"
if platform.system().lower() == "windows":
rootless_path = rootless_path.replace("\\", "/")
prepared_data.rootless_path = rootless_path
host.save_workfile_with_context(
workfile_path,
folder_entity,
task_entity,
version=version,
comment=comment,
description=description,
prepared_data=prepared_data,
)
def copy_workfile_to_context(
src_workfile_path: str,
folder_entity: dict[str, Any],
task_entity: dict[str, Any],
*,
version: Optional[int] = None,
comment: Optional[str] = None,
description: Optional[str] = None,
open_workfile: bool = True,
prepared_data: Optional[CopyWorkfileOptionalData] = None,
) -> None:
"""Copy workfile to a context.
Copy workfile to a specified folder and task. Destination path is
calculated based on passed information.
Args:
src_workfile_path (str): Source workfile path.
folder_entity (dict[str, Any]): Target folder entity.
task_entity (dict[str, Any]): Target task entity.
version (Optional[int]): Workfile version. Use next version if not
passed.
comment (optional[str]): Workfile comment.
description (Optional[str]): Workfile description.
prepared_data (Optional[CopyWorkfileOptionalData]): Prepared data
for speed enhancements. Rootless path is calculated in this
function.
"""
from ayon_core.pipeline import Anatomy
from ayon_core.pipeline.context_tools import registered_host
host = registered_host()
project_name = host.get_current_project_name()
anatomy = prepared_data.anatomy
if anatomy is None:
if prepared_data.project_entity is None:
prepared_data.project_entity = ayon_api.get_project(
project_name
)
anatomy = Anatomy(
project_name, project_entity=prepared_data.project_entity
)
prepared_data.anatomy = anatomy
project_settings = prepared_data.project_settings
if project_settings is None:
project_settings = get_project_settings(project_name)
prepared_data.project_settings = project_settings
if version is None:
list_prepared_data = None
if prepared_data is not None:
list_prepared_data = ListWorkfilesOptionalData(
project_entity=prepared_data.project_entity,
anatomy=prepared_data.anatomy,
project_settings=prepared_data.project_settings,
workfile_entities=prepared_data.workfile_entities,
)
workfiles = host.list_workfiles(
project_name,
folder_entity,
task_entity,
prepared_data=list_prepared_data
)
if workfiles:
version = max(
workfile.version
for workfile in workfiles
) + 1
else:
version = get_versioning_start(
project_name,
host.name,
task_name=task_entity["name"],
task_type=task_entity["taskType"],
product_type="workfile"
)
task_type = task_entity["taskType"]
template_key = get_workfile_template_key(
project_name,
task_type,
host.name,
project_settings=prepared_data.project_settings
)
template_data = get_template_data(
prepared_data.project_entity,
folder_entity,
task_entity,
host.name,
prepared_data.project_settings,
)
template_data["version"] = version
if comment:
template_data["comment"] = comment
workfile_extensions = host.get_workfile_extensions()
if workfile_extensions:
ext = os.path.splitext(src_workfile_path)[1].lstrip(".")
template_data["ext"] = ext
workfile_template = anatomy.get_template_item(
"work", template_key, "path"
)
workfile_path = workfile_template.format_strict(template_data)
prepared_data.rootless_path = workfile_path.rootless
host.copy_workfile(
src_workfile_path,
workfile_path,
folder_entity,
task_entity,
version=version,
comment=comment,
description=description,
open_workfile=open_workfile,
prepared_data=prepared_data,
)
def find_workfile_rootless_path(
workfile_path: str,
project_name: str,
folder_entity: dict[str, Any],
task_entity: dict[str, Any],
host_name: str,
*,
project_entity: Optional[dict[str, Any]] = None,
project_settings: Optional[dict[str, Any]] = None,
anatomy: Optional["Anatomy"] = None,
) -> str:
"""Find rootless workfile path."""
if anatomy is None:
from ayon_core.pipeline import Anatomy
anatomy = Anatomy(project_name, project_entity=project_entity)
task_type = task_entity["taskType"]
template_key = get_workfile_template_key(
project_name,
task_type,
host_name,
project_settings=project_settings
)
dir_template = anatomy.get_template_item(
"work", template_key, "directory"
)
result = dir_template.format({"root": anatomy.roots})
used_root = result.used_values.get("root")
rootless_path = str(workfile_path)
if platform.system().lower() == "windows":
rootless_path = rootless_path.replace("\\", "/")
root_key = root_value = None
if used_root is not None:
root_key, root_value = next(iter(used_root.items()))
if platform.system().lower() == "windows":
root_value = root_value.replace("\\", "/")
if root_value and rootless_path.startswith(root_value):
rootless_path = rootless_path[len(root_value):].lstrip("/")
rootless_path = f"{{root[{root_key}]}}/{rootless_path}"
else:
success, result = anatomy.find_root_template_from_path(rootless_path)
if success:
rootless_path = result
return rootless_path
def _create_workfile_info_entity(
project_name: str,
task_id: str,
host_name: str,
rootless_path: str,
username: str,
version: Optional[int],
comment: Optional[str],
description: Optional[str],
) -> dict[str, Any]:
"""Create workfile entity data.
Args:
project_name (str): Project name.
task_id (str): Task id.
host_name (str): Host name.
rootless_path (str): Rootless workfile path.
username (str): Username.
version (Optional[int]): Workfile version.
comment (Optional[str]): Workfile comment.
description (Optional[str]): Workfile description.
Returns:
dict[str, Any]: Created workfile entity data.
"""
extension = os.path.splitext(rootless_path)[1]
attrib = {}
for key, value in (
("extension", extension),
("description", description),
):
if value is not None:
attrib[key] = value
data = {
"host_name": host_name,
"version": version,
"comment": comment,
}
workfile_info = {
"id": uuid.uuid4().hex,
"path": rootless_path,
"taskId": task_id,
"attrib": attrib,
"data": data,
# TODO remove 'createdBy' and 'updatedBy' fields when server is
# or above 1.1.3 .
"createdBy": username,
"updatedBy": username,
}
session = OperationsSession()
session.create_entity(
project_name, "workfile", workfile_info
)
session.commit()
return workfile_info

View file

@ -631,7 +631,7 @@ class AbstractTemplateBuilder(ABC):
"""Open template file with registered host."""
template_preset = self.get_template_preset()
template_path = template_preset["path"]
self.host.open_file(template_path)
self.host.open_workfile(template_path)
@abstractmethod
def import_template(self, template_path):

View file

@ -1,7 +1,9 @@
import ayon_api
import ayon_api.utils
from ayon_core.host import ILoadHost
from ayon_core.pipeline import registered_host
import pyblish.api
@ -27,16 +29,23 @@ class CollectSceneLoadedVersions(pyblish.api.ContextPlugin):
def process(self, context):
host = registered_host()
if host is None:
self.log.warn("No registered host.")
self.log.warning("No registered host.")
return
if not hasattr(host, "ls"):
host_name = host.__name__
self.log.warn("Host %r doesn't have ls() implemented." % host_name)
if not isinstance(host, ILoadHost):
host_name = host.name
self.log.warning(
f"Host {host_name} does not implement ILoadHost. "
"Skipping querying of loaded versions in scene."
)
return
containers = list(host.get_containers())
if not containers:
# Opt out early if there are no containers
self.log.debug("No loaded containers found in scene.")
return
loaded_versions = []
containers = list(host.ls())
repre_ids = {
container["representation"]
for container in containers
@ -61,6 +70,7 @@ class CollectSceneLoadedVersions(pyblish.api.ContextPlugin):
# QUESTION should we add same representation id when loaded multiple
# times?
loaded_versions = []
for con in containers:
repre_id = con["representation"]
repre_entity = repre_entities_by_id.get(repre_id)
@ -80,4 +90,5 @@ class CollectSceneLoadedVersions(pyblish.api.ContextPlugin):
}
loaded_versions.append(version)
self.log.debug(f"Collected {len(loaded_versions)} loaded versions.")
context.data["loadedVersions"] = loaded_versions

View file

@ -203,15 +203,21 @@ class ExtractReview(pyblish.api.InstancePlugin):
def _get_outputs_for_instance(self, instance):
host_name = instance.context.data["hostName"]
product_type = instance.data["productType"]
task_type = None
task_entity = instance.data.get("taskEntity")
if task_entity:
task_type = task_entity["taskType"]
self.log.debug("Host: \"{}\"".format(host_name))
self.log.debug("Product type: \"{}\"".format(product_type))
self.log.debug("Task type: \"{}\"".format(task_type))
profile = filter_profiles(
self.profiles,
{
"hosts": host_name,
"product_types": product_type,
"task_types": task_type
},
logger=self.log)
if not profile:

View file

@ -105,7 +105,7 @@ class IntegrateInputLinksAYON(pyblish.api.ContextPlugin):
created links by its type
"""
if workfile_instance is None:
self.log.warn("No workfile in this publish session.")
self.log.warning("No workfile in this publish session.")
return
workfile_version_id = workfile_instance.data["versionEntity"]["id"]

View file

@ -34,7 +34,11 @@ class ValidateProductUniqueness(pyblish.api.ContextPlugin):
for instance in context:
# Ignore disabled instances
if not instance.data.get('publish', True):
if not instance.data.get("publish", True):
continue
# Ignore instances not marked to integrate
if not instance.data.get("integrate", True):
continue
# Ignore instance without folder data

View file

@ -4,6 +4,7 @@ import logging
import collections
import copy
import time
import warnings
from urllib.parse import urlencode
import ayon_api
@ -224,17 +225,22 @@ def get_project_environments(project_name, project_settings=None):
def get_current_project_settings():
"""Project settings for current context project.
"""DEPRECATE Project settings for current context project.
Function requires access to pipeline context which is in
'ayon_core.pipeline'.
Returns:
dict[str, Any]: Project settings for current context project.
Project name should be stored in environment variable `AYON_PROJECT_NAME`.
This function should be used only in host context where environment
variable must be set and should not happen that any part of process will
change the value of the environment variable.
"""
project_name = os.environ.get("AYON_PROJECT_NAME")
if not project_name:
raise ValueError(
"Missing context project in environment"
" variable `AYON_PROJECT_NAME`."
)
return get_project_settings(project_name)
warnings.warn(
"Used deprecated function 'get_current_project_settings' in"
" 'ayon_core.settings'. The function was moved to"
" 'ayon_core.pipeline.context_tools'.",
DeprecationWarning,
stacklevel=2
)
from ayon_core.pipeline.context_tools import get_current_project_settings
return get_current_project_settings()

View file

@ -399,7 +399,11 @@ class ActionsModel:
return cache.get_data()
try:
response = ayon_api.post("actions/list", **request_data)
# 'variant' query is supported since AYON backend 1.10.4
query = urlencode({"variant": self._variant})
response = ayon_api.post(
f"actions/list?{query}", **request_data
)
response.raise_for_status()
except Exception:
self.log.warning("Failed to collect webactions.", exc_info=True)

View file

@ -4,76 +4,6 @@ from abc import ABC, abstractmethod
from ayon_core.style import get_default_entity_icon_color
class WorkfileInfo:
"""Information about workarea file with possible additional from database.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
filepath (str): Filepath.
filesize (int): File size.
creation_time (float): Creation time (timestamp).
modification_time (float): Modification time (timestamp).
created_by (Union[str, none]): User who created the file.
updated_by (Union[str, none]): User who last updated the file.
note (str): Note.
"""
def __init__(
self,
folder_id,
task_id,
filepath,
filesize,
creation_time,
modification_time,
created_by,
updated_by,
note,
):
self.folder_id = folder_id
self.task_id = task_id
self.filepath = filepath
self.filesize = filesize
self.creation_time = creation_time
self.modification_time = modification_time
self.created_by = created_by
self.updated_by = updated_by
self.note = note
def to_data(self):
"""Converts WorkfileInfo item to data.
Returns:
dict[str, Any]: Folder item data.
"""
return {
"folder_id": self.folder_id,
"task_id": self.task_id,
"filepath": self.filepath,
"filesize": self.filesize,
"creation_time": self.creation_time,
"modification_time": self.modification_time,
"created_by": self.created_by,
"updated_by": self.updated_by,
"note": self.note,
}
@classmethod
def from_data(cls, data):
"""Re-creates WorkfileInfo item from data.
Args:
data (dict[str, Any]): Workfile info item data.
Returns:
WorkfileInfo: Workfile info item.
"""
return cls(**data)
class FolderItem:
"""Item representing folder entity on a server.
@ -87,8 +17,8 @@ class FolderItem:
label (str): Folder label.
icon_name (str): Name of icon from font awesome.
icon_color (str): Hex color string that will be used for icon.
"""
"""
def __init__(
self, entity_id, parent_id, name, label, icon_name, icon_color
):
@ -104,8 +34,8 @@ class FolderItem:
Returns:
dict[str, Any]: Folder item data.
"""
"""
return {
"entity_id": self.entity_id,
"parent_id": self.parent_id,
@ -124,8 +54,8 @@ class FolderItem:
Returns:
FolderItem: Folder item.
"""
"""
return cls(**data)
@ -144,8 +74,8 @@ class TaskItem:
parent_id (str): Parent folder id.
icon_name (str): Name of icon from font awesome.
icon_color (str): Hex color string that will be used for icon.
"""
"""
def __init__(
self, task_id, name, task_type, parent_id, icon_name, icon_color
):
@ -163,8 +93,8 @@ class TaskItem:
Returns:
str: Task id.
"""
"""
return self.task_id
@property
@ -173,8 +103,8 @@ class TaskItem:
Returns:
str: Label of task item.
"""
"""
if self._label is None:
self._label = "{} ({})".format(self.name, self.task_type)
return self._label
@ -184,8 +114,8 @@ class TaskItem:
Returns:
dict[str, Any]: Task item data.
"""
"""
return {
"task_id": self.task_id,
"name": self.name,
@ -204,116 +134,11 @@ class TaskItem:
Returns:
TaskItem: Task item.
"""
"""
return cls(**data)
class FileItem:
"""File item that represents a file.
Can be used for both Workarea and Published workfile. Workarea file
will always exist on disk which is not the case for Published workfile.
Args:
dirpath (str): Directory path of file.
filename (str): Filename.
modified (float): Modified timestamp.
created_by (Optional[str]): Username.
representation_id (Optional[str]): Representation id of published
workfile.
filepath (Optional[str]): Prepared filepath.
exists (Optional[bool]): If file exists on disk.
"""
def __init__(
self,
dirpath,
filename,
modified,
created_by=None,
updated_by=None,
representation_id=None,
filepath=None,
exists=None
):
self.filename = filename
self.dirpath = dirpath
self.modified = modified
self.created_by = created_by
self.updated_by = updated_by
self.representation_id = representation_id
self._filepath = filepath
self._exists = exists
@property
def filepath(self):
"""Filepath of file.
Returns:
str: Full path to a file.
"""
if self._filepath is None:
self._filepath = os.path.join(self.dirpath, self.filename)
return self._filepath
@property
def exists(self):
"""File is available.
Returns:
bool: If file exists on disk.
"""
if self._exists is None:
self._exists = os.path.exists(self.filepath)
return self._exists
def to_data(self):
"""Converts file item to data.
Returns:
dict[str, Any]: File item data.
"""
return {
"filename": self.filename,
"dirpath": self.dirpath,
"modified": self.modified,
"created_by": self.created_by,
"representation_id": self.representation_id,
"filepath": self.filepath,
"exists": self.exists,
}
@classmethod
def from_data(cls, data):
"""Re-creates file item from data.
Args:
data (dict[str, Any]): File item data.
Returns:
FileItem: File item.
"""
required_keys = {
"filename",
"dirpath",
"modified",
"representation_id"
}
missing_keys = required_keys - set(data.keys())
if missing_keys:
raise KeyError("Missing keys: {}".format(missing_keys))
return cls(**{
key: data[key]
for key in required_keys
})
class WorkareaFilepathResult:
"""Result of workarea file formatting.
@ -323,8 +148,8 @@ class WorkareaFilepathResult:
exists (bool): True if file exists.
filepath (str): Filepath. If not provided it will be constructed
from root and filename.
"""
"""
def __init__(self, root, filename, exists, filepath=None):
if not filepath and root and filename:
filepath = os.path.join(root, filename)
@ -341,8 +166,8 @@ class AbstractWorkfilesCommon(ABC):
Returns:
bool: True if host is valid.
"""
"""
pass
@abstractmethod
@ -353,8 +178,8 @@ class AbstractWorkfilesCommon(ABC):
Returns:
Iterable[str]: List of extensions.
"""
"""
pass
@abstractmethod
@ -363,8 +188,8 @@ class AbstractWorkfilesCommon(ABC):
Returns:
bool: True if save is enabled.
"""
"""
pass
@abstractmethod
@ -373,8 +198,8 @@ class AbstractWorkfilesCommon(ABC):
Args:
enabled (bool): Enable save workfile when True.
"""
"""
pass
@ -386,6 +211,7 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
str: Name of host.
"""
pass
@ -395,8 +221,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
str: Name of project.
"""
"""
pass
@abstractmethod
@ -406,8 +232,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Folder id or None if host does not have
any context.
"""
"""
pass
@abstractmethod
@ -417,8 +243,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Task name or None if host does not have
any context.
"""
"""
pass
@abstractmethod
@ -428,8 +254,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Path to workfile or None if host does
not have opened specific file.
"""
"""
pass
@property
@ -439,8 +265,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
Anatomy: Project anatomy.
"""
"""
pass
@property
@ -450,8 +276,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Project settings.
"""
"""
pass
@abstractmethod
@ -463,8 +289,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Project entity data.
"""
"""
pass
@abstractmethod
@ -477,8 +303,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Folder entity data.
"""
"""
pass
@abstractmethod
@ -491,10 +317,24 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Task entity data.
"""
"""
pass
@abstractmethod
def get_workfile_entities(self, task_id: str):
"""Workfile entities for given task.
Args:
task_id (str): Task id.
Returns:
list[dict[str, Any]]: List of workfile entities.
"""
pass
@abstractmethod
def emit_event(self, topic, data=None, source=None):
"""Emit event.
@ -502,8 +342,8 @@ class AbstractWorkfilesBackend(AbstractWorkfilesCommon):
topic (str): Event topic used for callbacks filtering.
data (Optional[dict[str, Any]]): Event data.
source (Optional[str]): Event source.
"""
"""
pass
@ -530,8 +370,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
topic (str): Name of topic.
callback (Callable): Callback that will be called when event
is triggered.
"""
"""
pass
@abstractmethod
@ -592,8 +432,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
List[str]: File extensions that can be used as workfile for
current host.
"""
"""
pass
# Selection information
@ -603,8 +443,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Folder id or None if no folder is selected.
"""
"""
pass
@abstractmethod
@ -616,8 +456,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Args:
folder_id (Union[str, None]): Folder id or None if no folder
is selected.
"""
"""
pass
@abstractmethod
@ -626,8 +466,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Task id or None if no folder is selected.
"""
"""
pass
@abstractmethod
@ -649,8 +489,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
is selected.
task_name (Union[str, None]): Task name or None if no task
is selected.
"""
"""
pass
@abstractmethod
@ -659,18 +499,22 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Selected workfile path.
"""
"""
pass
@abstractmethod
def set_selected_workfile_path(self, path):
def set_selected_workfile_path(
self, rootless_path, path, workfile_entity_id
):
"""Change selected workfile path.
Args:
rootless_path (Union[str, None]): Selected workfile rootless path.
path (Union[str, None]): Selected workfile path.
"""
workfile_entity_id (Union[str, None]): Workfile entity id.
"""
pass
@abstractmethod
@ -680,8 +524,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
Union[str, None]: Representation id or None if no representation
is selected.
"""
"""
pass
@abstractmethod
@ -691,8 +535,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Args:
representation_id (Union[str, None]): Selected workfile
representation id.
"""
"""
pass
def get_selected_context(self):
@ -700,8 +544,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
dict[str, Union[str, None]]: Selected context.
"""
"""
return {
"folder_id": self.get_selected_folder_id(),
"task_id": self.get_selected_task_id(),
@ -737,8 +581,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
files UI element.
representation_id (Optional[str]): Representation id. Used for
published filed UI element.
"""
"""
pass
@abstractmethod
@ -750,8 +594,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Expected selection data.
"""
"""
pass
@abstractmethod
@ -760,8 +604,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Args:
folder_id (str): Folder id which was selected.
"""
"""
pass
@abstractmethod
@ -771,8 +615,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Args:
folder_id (str): Folder id under which task is.
task_name (str): Task name which was selected.
"""
"""
pass
@abstractmethod
@ -785,8 +629,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
folder_id (str): Folder id under which representation is.
task_name (str): Task name under which representation is.
representation_id (str): Representation id which was selected.
"""
"""
pass
@abstractmethod
@ -797,8 +641,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
folder_id (str): Folder id under which workfile is.
task_name (str): Task name under which workfile is.
workfile_name (str): Workfile filename which was selected.
"""
"""
pass
@abstractmethod
@ -823,8 +667,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
list[FolderItem]: Minimum possible information needed
for visualisation of folder hierarchy.
"""
"""
pass
@abstractmethod
@ -843,8 +687,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
list[TaskItem]: Minimum possible information needed
for visualisation of tasks.
"""
"""
pass
@abstractmethod
@ -853,8 +697,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
bool: Has unsaved changes.
"""
"""
pass
@abstractmethod
@ -867,8 +711,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
str: Workarea directory.
"""
"""
pass
@abstractmethod
@ -881,9 +725,9 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
sender (Optional[str]): Who requested workarea file items.
Returns:
list[FileItem]: List of workarea file items.
"""
list[WorkfileInfo]: List of workarea file items.
"""
pass
@abstractmethod
@ -899,8 +743,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
dict[str, Any]: Data for Save As operation.
"""
"""
pass
@abstractmethod
@ -925,12 +769,12 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Returns:
WorkareaFilepathResult: Result of the operation.
"""
"""
pass
@abstractmethod
def get_published_file_items(self, folder_id, task_id):
def get_published_file_items(self, folder_id: str, task_id: str):
"""Get published file items.
Args:
@ -938,44 +782,52 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
task_id (Union[str, None]): Task id.
Returns:
list[FileItem]: List of published file items.
"""
list[PublishedWorkfileInfo]: List of published file items.
"""
pass
@abstractmethod
def get_workfile_info(self, folder_id, task_name, filepath):
def get_workfile_info(self, folder_id, task_id, rootless_path):
"""Workfile info from database.
Args:
folder_id (str): Folder id.
task_name (str): Task id.
filepath (str): Workfile path.
task_id (str): Task id.
rootless_path (str): Workfile path.
Returns:
Union[WorkfileInfo, None]: Workfile info or None if was passed
Optional[WorkfileInfo]: Workfile info or None if was passed
invalid context.
"""
"""
pass
@abstractmethod
def save_workfile_info(self, folder_id, task_name, filepath, note):
def save_workfile_info(
self,
task_id,
rootless_path,
version=None,
comment=None,
description=None,
):
"""Save workfile info to database.
At this moment the only information which can be saved about
workfile is 'note'.
workfile is 'description'.
When 'note' is 'None' it is only validated if workfile info exists,
and if not then creates one with empty note.
If value of 'version', 'comment' or 'description' is 'None' it is not
added/updated to entity.
Args:
folder_id (str): Folder id.
task_name (str): Task id.
filepath (str): Workfile path.
note (Union[str, None]): Note.
"""
task_id (str): Task id.
rootless_path (str): Rootless workfile path.
version (Optional[int]): Version of workfile.
comment (Optional[str]): User's comment (subversion).
description (Optional[str]): Workfile description.
"""
pass
# General commands
@ -985,8 +837,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
Triggers 'controller.reset.started' event at the beginning and
'controller.reset.finished' at the end.
"""
"""
pass
# Controller actions
@ -998,8 +850,8 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
folder_id (str): Folder id.
task_id (str): Task id.
filepath (str): Workfile path.
"""
"""
pass
@abstractmethod
@ -1013,22 +865,27 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
self,
folder_id,
task_id,
rootless_workdir,
workdir,
filename,
template_key,
artist_note,
version,
comment,
description,
):
"""Save current state of workfile to workarea.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
workdir (str): Workarea directory.
rootless_workdir (str): Workarea directory.
filename (str): Workarea filename.
template_key (str): Template key used to get the workdir
and filename.
"""
version (Optional[int]): Version of workfile.
comment (Optional[str]): User's comment (subversion).
description (Optional[str]): Workfile description.
"""
pass
@abstractmethod
@ -1040,8 +897,10 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
task_id,
workdir,
filename,
template_key,
artist_note,
rootless_workdir,
version,
comment,
description,
):
"""Action to copy published workfile representation to workarea.
@ -1055,23 +914,40 @@ class AbstractWorkfilesFrontend(AbstractWorkfilesCommon):
task_id (str): Task id.
workdir (str): Workarea directory.
filename (str): Workarea filename.
template_key (str): Template key.
artist_note (str): Artist note.
"""
rootless_workdir (str): Rootless workdir.
version (int): Workfile version.
comment (str): User's comment (subversion).
description (str): Description note.
"""
pass
@abstractmethod
def duplicate_workfile(self, src_filepath, workdir, filename, artist_note):
def duplicate_workfile(
self,
folder_id,
task_id,
src_filepath,
rootless_workdir,
workdir,
filename,
description,
version,
comment
):
"""Duplicate workfile.
Workfiles is not opened when done.
Args:
folder_id (str): Folder id.
task_id (str): Task id.
src_filepath (str): Source workfile path.
rootless_workdir (str): Rootless workdir.
workdir (str): Destination workdir.
filename (str): Destination filename.
artist_note (str): Artist note.
version (int): Workfile version.
comment (str): User's comment (subversion).
description (str): Workfile description.
"""
pass

View file

@ -1,19 +1,13 @@
import os
import shutil
import ayon_api
from ayon_core.host import IWorkfileHost
from ayon_core.lib import Logger, emit_event
from ayon_core.lib import Logger
from ayon_core.lib.events import QueuedEventSystem
from ayon_core.settings import get_project_settings
from ayon_core.pipeline import Anatomy, registered_host
from ayon_core.pipeline.context_tools import (
change_current_context,
get_current_host_name,
get_global_context,
)
from ayon_core.pipeline.workfile import create_workdir_extra_folders
from ayon_core.pipeline.context_tools import get_global_context
from ayon_core.tools.common_models import (
HierarchyModel,
@ -140,12 +134,7 @@ class BaseWorkfileController(
if host is None:
host = registered_host()
host_is_valid = False
if host is not None:
missing_methods = (
IWorkfileHost.get_missing_workfile_methods(host)
)
host_is_valid = len(missing_methods) == 0
host_is_valid = isinstance(host, IWorkfileHost)
self._host = host
self._host_is_valid = host_is_valid
@ -182,7 +171,7 @@ class BaseWorkfileController(
return UsersModel(self)
def _create_workfiles_model(self):
return WorkfilesModel(self)
return WorkfilesModel(self._host, self)
def _create_expected_selection_obj(self):
return WorkfilesToolExpectedSelection(self)
@ -293,28 +282,14 @@ class BaseWorkfileController(
# Host information
def get_workfile_extensions(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.get_workfile_extensions()
return host.file_extensions()
return self._host.get_workfile_extensions()
def has_unsaved_changes(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.workfile_has_unsaved_changes()
return host.has_unsaved_changes()
return self._host.workfile_has_unsaved_changes()
# Current context
def get_host_name(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.name
return get_current_host_name()
def _get_host_current_context(self):
if hasattr(self._host, "get_current_context"):
return self._host.get_current_context()
return get_global_context()
return self._host.name
def get_current_project_name(self):
return self._current_project_name
@ -326,10 +301,7 @@ class BaseWorkfileController(
return self._current_task_name
def get_current_workfile(self):
host = self._host
if isinstance(host, IWorkfileHost):
return host.get_current_workfile()
return host.current_file()
return self._workfiles_model.get_current_workfile()
# Selection information
def get_selected_folder_id(self):
@ -350,8 +322,12 @@ class BaseWorkfileController(
def get_selected_workfile_path(self):
return self._selection_model.get_selected_workfile_path()
def set_selected_workfile_path(self, path):
self._selection_model.set_selected_workfile_path(path)
def set_selected_workfile_path(
self, rootless_path, path, workfile_entity_id
):
self._selection_model.set_selected_workfile_path(
rootless_path, path, workfile_entity_id
)
def get_selected_representation_id(self):
return self._selection_model.get_selected_representation_id()
@ -424,7 +400,7 @@ class BaseWorkfileController(
def get_workarea_file_items(self, folder_id, task_name, sender=None):
task_id = self._get_task_id(folder_id, task_name)
return self._workfiles_model.get_workarea_file_items(
folder_id, task_id, task_name
folder_id, task_id
)
def get_workarea_save_as_data(self, folder_id, task_id):
@ -450,28 +426,34 @@ class BaseWorkfileController(
)
def get_published_file_items(self, folder_id, task_id):
task_name = None
if task_id:
task = self.get_task_entity(
self.get_current_project_name(), task_id
)
task_name = task.get("name")
return self._workfiles_model.get_published_file_items(
folder_id, task_name)
folder_id, task_id
)
def get_workfile_info(self, folder_id, task_name, filepath):
task_id = self._get_task_id(folder_id, task_name)
def get_workfile_info(self, folder_id, task_id, rootless_path):
return self._workfiles_model.get_workfile_info(
folder_id, task_id, filepath
folder_id, task_id, rootless_path
)
def save_workfile_info(self, folder_id, task_name, filepath, note):
task_id = self._get_task_id(folder_id, task_name)
def save_workfile_info(
self,
task_id,
rootless_path,
version=None,
comment=None,
description=None,
):
self._workfiles_model.save_workfile_info(
folder_id, task_id, filepath, note
task_id,
rootless_path,
version,
comment,
description,
)
def get_workfile_entities(self, task_id):
return self._workfiles_model.get_workfile_entities(task_id)
def reset(self):
if not self._host_is_valid:
self._emit_event("controller.reset.started")
@ -509,6 +491,7 @@ class BaseWorkfileController(
self._projects_model.reset()
self._hierarchy_model.reset()
self._workfiles_model.reset()
if not expected_folder_id:
expected_folder_id = folder_id
@ -528,53 +511,31 @@ class BaseWorkfileController(
# Controller actions
def open_workfile(self, folder_id, task_id, filepath):
self._emit_event("open_workfile.started")
failed = False
try:
self._open_workfile(folder_id, task_id, filepath)
except Exception:
failed = True
self.log.warning("Open of workfile failed", exc_info=True)
self._emit_event(
"open_workfile.finished",
{"failed": failed},
)
self._workfiles_model.open_workfile(folder_id, task_id, filepath)
def save_current_workfile(self):
current_file = self.get_current_workfile()
self._host_save_workfile(current_file)
self._workfiles_model.save_current_workfile()
def save_as_workfile(
self,
folder_id,
task_id,
rootless_workdir,
workdir,
filename,
template_key,
artist_note,
version,
comment,
description,
):
self._emit_event("save_as.started")
failed = False
try:
self._save_as_workfile(
folder_id,
task_id,
workdir,
filename,
template_key,
artist_note=artist_note,
)
except Exception:
failed = True
self.log.warning("Save as failed", exc_info=True)
self._emit_event(
"save_as.finished",
{"failed": failed},
self._workfiles_model.save_as_workfile(
folder_id,
task_id,
rootless_workdir,
workdir,
filename,
version,
comment,
description,
)
def copy_workfile_representation(
@ -585,64 +546,48 @@ class BaseWorkfileController(
task_id,
workdir,
filename,
template_key,
artist_note,
rootless_workdir,
version,
comment,
description,
):
self._emit_event("copy_representation.started")
failed = False
try:
self._save_as_workfile(
folder_id,
task_id,
workdir,
filename,
template_key,
artist_note,
src_filepath=representation_filepath
)
except Exception:
failed = True
self.log.warning(
"Copy of workfile representation failed", exc_info=True
)
self._emit_event(
"copy_representation.finished",
{"failed": failed},
self._workfiles_model.copy_workfile_representation(
representation_id,
representation_filepath,
folder_id,
task_id,
workdir,
filename,
rootless_workdir,
version,
comment,
description,
)
def duplicate_workfile(self, src_filepath, workdir, filename, artist_note):
self._emit_event("workfile_duplicate.started")
failed = False
try:
dst_filepath = os.path.join(workdir, filename)
shutil.copy(src_filepath, dst_filepath)
except Exception:
failed = True
self.log.warning("Duplication of workfile failed", exc_info=True)
self._emit_event(
"workfile_duplicate.finished",
{"failed": failed},
def duplicate_workfile(
self,
folder_id,
task_id,
src_filepath,
rootless_workdir,
workdir,
filename,
version,
comment,
description
):
self._workfiles_model.duplicate_workfile(
folder_id,
task_id,
src_filepath,
rootless_workdir,
workdir,
filename,
version,
comment,
description,
)
# Helper host methods that resolve 'IWorkfileHost' interface
def _host_open_workfile(self, filepath):
host = self._host
if isinstance(host, IWorkfileHost):
host.open_workfile(filepath)
else:
host.open_file(filepath)
def _host_save_workfile(self, filepath):
host = self._host
if isinstance(host, IWorkfileHost):
host.save_workfile(filepath)
else:
host.save_file(filepath)
def _emit_event(self, topic, data=None):
self.emit_event(topic, data, "controller")
@ -657,6 +602,11 @@ class BaseWorkfileController(
return None
return task_item.id
def _get_host_current_context(self):
if hasattr(self._host, "get_current_context"):
return self._host.get_current_context()
return get_global_context()
# Expected selection
# - expected selection is used to restore selection after refresh
# or when current context should be used
@ -665,123 +615,3 @@ class BaseWorkfileController(
"expected_selection_changed",
self._expected_selection.get_expected_selection_data(),
)
def _get_event_context_data(
self, project_name, folder_id, task_id, folder=None, task=None
):
if folder is None:
folder = self.get_folder_entity(project_name, folder_id)
if task is None:
task = self.get_task_entity(project_name, task_id)
return {
"project_name": project_name,
"folder_id": folder_id,
"folder_path": folder["path"],
"task_id": task_id,
"task_name": task["name"],
"host_name": self.get_host_name(),
}
def _open_workfile(self, folder_id, task_id, filepath):
project_name = self.get_current_project_name()
event_data = self._get_event_context_data(
project_name, folder_id, task_id
)
event_data["filepath"] = filepath
emit_event("workfile.open.before", event_data, source="workfiles.tool")
# Change context
task_name = event_data["task_name"]
if (
folder_id != self.get_current_folder_id()
or task_name != self.get_current_task_name()
):
self._change_current_context(project_name, folder_id, task_id)
self._host_open_workfile(filepath)
emit_event("workfile.open.after", event_data, source="workfiles.tool")
def _save_as_workfile(
self,
folder_id: str,
task_id: str,
workdir: str,
filename: str,
template_key: str,
artist_note: str,
src_filepath=None,
):
# Trigger before save event
project_name = self.get_current_project_name()
folder = self.get_folder_entity(project_name, folder_id)
task = self.get_task_entity(project_name, task_id)
task_name = task["name"]
# QUESTION should the data be different for 'before' and 'after'?
event_data = self._get_event_context_data(
project_name, folder_id, task_id, folder, task
)
event_data.update({
"filename": filename,
"workdir_path": workdir,
})
emit_event("workfile.save.before", event_data, source="workfiles.tool")
# Create workfiles root folder
if not os.path.exists(workdir):
self.log.debug("Initializing work directory: %s", workdir)
os.makedirs(workdir)
# Change context
if (
folder_id != self.get_current_folder_id()
or task_name != self.get_current_task_name()
):
self._change_current_context(
project_name, folder_id, task_id, template_key
)
# Save workfile
dst_filepath = os.path.join(workdir, filename)
if src_filepath:
shutil.copyfile(src_filepath, dst_filepath)
self._host_open_workfile(dst_filepath)
else:
self._host_save_workfile(dst_filepath)
# Make sure workfile info exists
if not artist_note:
artist_note = None
self.save_workfile_info(
folder_id, task_name, dst_filepath, note=artist_note
)
# Create extra folders
create_workdir_extra_folders(
workdir,
self.get_host_name(),
task["taskType"],
task_name,
project_name
)
# Trigger after save events
emit_event("workfile.save.after", event_data, source="workfiles.tool")
def _change_current_context(
self, project_name, folder_id, task_id, template_key=None
):
# Change current context
folder_entity = self.get_folder_entity(project_name, folder_id)
task_entity = self.get_task_entity(project_name, task_id)
change_current_context(
folder_entity,
task_entity,
template_key=template_key
)
self._current_folder_id = folder_entity["id"]
self._current_folder_path = folder_entity["path"]
self._current_task_name = task_entity["name"]

View file

@ -62,7 +62,9 @@ class SelectionModel(object):
def get_selected_workfile_path(self):
return self._workfile_path
def set_selected_workfile_path(self, path):
def set_selected_workfile_path(
self, rootless_path, path, workfile_entity_id
):
if path == self._workfile_path:
return
@ -72,9 +74,11 @@ class SelectionModel(object):
{
"project_name": self._controller.get_current_project_name(),
"path": path,
"rootless_path": rootless_path,
"folder_id": self._folder_id,
"task_name": self._task_name,
"task_id": self._task_id,
"workfile_entity_id": workfile_entity_id,
},
self.event_source
)

File diff suppressed because it is too large Load diff

View file

@ -200,6 +200,9 @@ class FilesWidget(QtWidgets.QWidget):
self._open_workfile(folder_id, task_id, path)
def _on_current_open_requests(self):
# TODO validate if item under mouse is enabled
# - this uses selected item, but that does not have to be the one
# under mouse
self._on_workarea_open_clicked()
def _on_duplicate_request(self):
@ -210,11 +213,18 @@ class FilesWidget(QtWidgets.QWidget):
result = self._exec_save_as_dialog()
if result is None:
return
folder_id = self._selected_folder_id
task_id = self._selected_task_id
self._controller.duplicate_workfile(
folder_id,
task_id,
filepath,
result["rootless_workdir"],
result["workdir"],
result["filename"],
artist_note=result["artist_note"]
version=result["version"],
comment=result["comment"],
description=result["description"]
)
def _on_workarea_browse_clicked(self):
@ -259,10 +269,12 @@ class FilesWidget(QtWidgets.QWidget):
self._controller.save_as_workfile(
result["folder_id"],
result["task_id"],
result["rootless_workdir"],
result["workdir"],
result["filename"],
result["template_key"],
artist_note=result["artist_note"]
version=result["version"],
comment=result["comment"],
description=result["description"]
)
def _on_workarea_path_changed(self, event):
@ -314,12 +326,16 @@ class FilesWidget(QtWidgets.QWidget):
result["task_id"],
result["workdir"],
result["filename"],
result["template_key"],
artist_note=result["artist_note"]
result["rootless_workdir"],
version=result["version"],
comment=result["comment"],
description=result["description"],
)
def _on_save_as_request(self):
self._on_published_save_clicked()
# Make sure the save is enabled
if self._is_save_enabled and self._valid_selected_context:
self._on_published_save_clicked()
def _set_select_contex_mode(self, enabled):
if self._select_context_mode is enabled:

View file

@ -1,3 +1,5 @@
import os
import qtawesome
from qtpy import QtWidgets, QtCore, QtGui
@ -205,24 +207,25 @@ class PublishedFilesModel(QtGui.QStandardItemModel):
new_items.append(item)
item.setColumnCount(self.columnCount())
item.setData(self._file_icon, QtCore.Qt.DecorationRole)
item.setData(file_item.filename, QtCore.Qt.DisplayRole)
item.setData(repre_id, REPRE_ID_ROLE)
if file_item.exists:
if file_item.available:
flags = QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable
else:
flags = QtCore.Qt.NoItemFlags
author = file_item.created_by
author = file_item.author
user_item = user_items_by_name.get(author)
if user_item is not None and user_item.full_name:
author = user_item.full_name
item.setFlags(flags)
filename = os.path.basename(file_item.filepath)
item.setFlags(flags)
item.setData(filename, QtCore.Qt.DisplayRole)
item.setData(file_item.filepath, FILEPATH_ROLE)
item.setData(author, AUTHOR_ROLE)
item.setData(file_item.modified, DATE_MODIFIED_ROLE)
item.setData(file_item.file_modified, DATE_MODIFIED_ROLE)
self._items_by_id[repre_id] = item

View file

@ -1,3 +1,5 @@
import os
import qtawesome
from qtpy import QtWidgets, QtCore, QtGui
@ -10,8 +12,10 @@ from ayon_core.tools.utils.delegates import PrettyTimeDelegate
FILENAME_ROLE = QtCore.Qt.UserRole + 1
FILEPATH_ROLE = QtCore.Qt.UserRole + 2
AUTHOR_ROLE = QtCore.Qt.UserRole + 3
DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 4
ROOTLESS_PATH_ROLE = QtCore.Qt.UserRole + 3
AUTHOR_ROLE = QtCore.Qt.UserRole + 4
DATE_MODIFIED_ROLE = QtCore.Qt.UserRole + 5
WORKFILE_ENTITY_ID_ROLE = QtCore.Qt.UserRole + 6
class WorkAreaFilesModel(QtGui.QStandardItemModel):
@ -198,7 +202,7 @@ class WorkAreaFilesModel(QtGui.QStandardItemModel):
items_to_remove = set(self._items_by_filename.keys())
new_items = []
for file_item in file_items:
filename = file_item.filename
filename = os.path.basename(file_item.filepath)
if filename in self._items_by_filename:
items_to_remove.discard(filename)
item = self._items_by_filename[filename]
@ -206,23 +210,28 @@ class WorkAreaFilesModel(QtGui.QStandardItemModel):
item = QtGui.QStandardItem()
new_items.append(item)
item.setColumnCount(self.columnCount())
item.setFlags(
QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsSelectable
)
item.setData(self._file_icon, QtCore.Qt.DecorationRole)
item.setData(file_item.filename, QtCore.Qt.DisplayRole)
item.setData(file_item.filename, FILENAME_ROLE)
item.setData(filename, QtCore.Qt.DisplayRole)
item.setData(filename, FILENAME_ROLE)
flags = QtCore.Qt.ItemIsSelectable
if file_item.available:
flags |= QtCore.Qt.ItemIsEnabled
item.setFlags(flags)
updated_by = file_item.updated_by
user_item = user_items_by_name.get(updated_by)
if user_item is not None and user_item.full_name:
updated_by = user_item.full_name
item.setData(
file_item.workfile_entity_id, WORKFILE_ENTITY_ID_ROLE
)
item.setData(file_item.filepath, FILEPATH_ROLE)
item.setData(file_item.rootless_path, ROOTLESS_PATH_ROLE)
item.setData(file_item.file_modified, DATE_MODIFIED_ROLE)
item.setData(updated_by, AUTHOR_ROLE)
item.setData(file_item.modified, DATE_MODIFIED_ROLE)
self._items_by_filename[file_item.filename] = item
self._items_by_filename[filename] = item
if new_items:
root_item.appendRows(new_items)
@ -354,14 +363,18 @@ class WorkAreaFilesWidget(QtWidgets.QWidget):
def _get_selected_info(self):
selection_model = self._view.selectionModel()
filepath = None
filename = None
workfile_entity_id = filename = rootless_path = filepath = None
for index in selection_model.selectedIndexes():
filepath = index.data(FILEPATH_ROLE)
rootless_path = index.data(ROOTLESS_PATH_ROLE)
filename = index.data(FILENAME_ROLE)
workfile_entity_id = index.data(WORKFILE_ENTITY_ID_ROLE)
return {
"filepath": filepath,
"rootless_path": rootless_path,
"filename": filename,
"workfile_entity_id": workfile_entity_id,
}
def get_selected_path(self):
@ -374,8 +387,12 @@ class WorkAreaFilesWidget(QtWidgets.QWidget):
return self._get_selected_info()["filepath"]
def _on_selection_change(self):
filepath = self.get_selected_path()
self._controller.set_selected_workfile_path(filepath)
info = self._get_selected_info()
self._controller.set_selected_workfile_path(
info["rootless_path"],
info["filepath"],
info["workfile_entity_id"],
)
def _on_mouse_double_click(self, event):
if event.button() == QtCore.Qt.LeftButton:
@ -430,19 +447,25 @@ class WorkAreaFilesWidget(QtWidgets.QWidget):
)
def _on_model_refresh(self):
if (
not self._change_selection_on_refresh
or self._proxy_model.rowCount() < 1
):
if not self._change_selection_on_refresh:
return
# Find the row with latest date modified
indexes = [
self._proxy_model.index(idx, 0)
for idx in range(self._proxy_model.rowCount())
]
filtered_indexes = [
index
for index in indexes
if self._proxy_model.flags(index) & QtCore.Qt.ItemIsEnabled
]
if not filtered_indexes:
return
latest_index = max(
(
self._proxy_model.index(idx, 0)
for idx in range(self._proxy_model.rowCount())
),
key=lambda model_index: model_index.data(DATE_MODIFIED_ROLE)
filtered_indexes,
key=lambda model_index: model_index.data(DATE_MODIFIED_ROLE) or 0
)
# Select row of latest modified

View file

@ -108,6 +108,7 @@ class SaveAsDialog(QtWidgets.QDialog):
self._ext_value = None
self._filename = None
self._workdir = None
self._rootless_workdir = None
self._result = None
@ -144,8 +145,8 @@ class SaveAsDialog(QtWidgets.QDialog):
version_layout.addWidget(last_version_check)
# Artist note widget
artist_note_input = PlaceholderPlainTextEdit(inputs_widget)
artist_note_input.setPlaceholderText(
description_input = PlaceholderPlainTextEdit(inputs_widget)
description_input.setPlaceholderText(
"Provide a note about this workfile.")
# Preview widget
@ -166,7 +167,7 @@ class SaveAsDialog(QtWidgets.QDialog):
subversion_label = QtWidgets.QLabel("Subversion:", inputs_widget)
extension_label = QtWidgets.QLabel("Extension:", inputs_widget)
preview_label = QtWidgets.QLabel("Preview:", inputs_widget)
artist_note_label = QtWidgets.QLabel("Artist Note:", inputs_widget)
description_label = QtWidgets.QLabel("Artist Note:", inputs_widget)
# Build inputs
inputs_layout = QtWidgets.QGridLayout(inputs_widget)
@ -178,8 +179,8 @@ class SaveAsDialog(QtWidgets.QDialog):
inputs_layout.addWidget(extension_combobox, 2, 1)
inputs_layout.addWidget(preview_label, 3, 0)
inputs_layout.addWidget(preview_widget, 3, 1)
inputs_layout.addWidget(artist_note_label, 4, 0, 1, 2)
inputs_layout.addWidget(artist_note_input, 5, 0, 1, 2)
inputs_layout.addWidget(description_label, 4, 0, 1, 2)
inputs_layout.addWidget(description_input, 5, 0, 1, 2)
# Build layout
main_layout = QtWidgets.QVBoxLayout(self)
@ -214,13 +215,13 @@ class SaveAsDialog(QtWidgets.QDialog):
self._extension_combobox = extension_combobox
self._subversion_input = subversion_input
self._preview_widget = preview_widget
self._artist_note_input = artist_note_input
self._description_input = description_input
self._version_label = version_label
self._subversion_label = subversion_label
self._extension_label = extension_label
self._preview_label = preview_label
self._artist_note_label = artist_note_label
self._description_label = description_label
# Post init setup
@ -255,6 +256,7 @@ class SaveAsDialog(QtWidgets.QDialog):
self._folder_id = folder_id
self._task_id = task_id
self._workdir = data["workdir"]
self._rootless_workdir = data["rootless_workdir"]
self._comment_value = data["comment"]
self._ext_value = data["ext"]
self._template_key = data["template_key"]
@ -329,10 +331,13 @@ class SaveAsDialog(QtWidgets.QDialog):
self._result = {
"filename": self._filename,
"workdir": self._workdir,
"rootless_workdir": self._rootless_workdir,
"folder_id": self._folder_id,
"task_id": self._task_id,
"template_key": self._template_key,
"artist_note": self._artist_note_input.toPlainText(),
"version": self._version_value,
"comment": self._comment_value,
"description": self._description_input.toPlainText(),
}
self.close()

View file

@ -4,6 +4,8 @@ from qtpy import QtWidgets, QtCore
def file_size_to_string(file_size):
if not file_size:
return "N/A"
size = 0
size_ending_mapping = {
"KB": 1024 ** 1,
@ -43,44 +45,47 @@ class SidePanelWidget(QtWidgets.QWidget):
details_input = QtWidgets.QPlainTextEdit(self)
details_input.setReadOnly(True)
artist_note_widget = QtWidgets.QWidget(self)
note_label = QtWidgets.QLabel("Artist note", artist_note_widget)
note_input = QtWidgets.QPlainTextEdit(artist_note_widget)
btn_note_save = QtWidgets.QPushButton("Save note", artist_note_widget)
description_widget = QtWidgets.QWidget(self)
description_label = QtWidgets.QLabel("Artist note", description_widget)
description_input = QtWidgets.QPlainTextEdit(description_widget)
btn_description_save = QtWidgets.QPushButton(
"Save note", description_widget
)
artist_note_layout = QtWidgets.QVBoxLayout(artist_note_widget)
artist_note_layout.setContentsMargins(0, 0, 0, 0)
artist_note_layout.addWidget(note_label, 0)
artist_note_layout.addWidget(note_input, 1)
artist_note_layout.addWidget(
btn_note_save, 0, alignment=QtCore.Qt.AlignRight
description_layout = QtWidgets.QVBoxLayout(description_widget)
description_layout.setContentsMargins(0, 0, 0, 0)
description_layout.addWidget(description_label, 0)
description_layout.addWidget(description_input, 1)
description_layout.addWidget(
btn_description_save, 0, alignment=QtCore.Qt.AlignRight
)
main_layout = QtWidgets.QVBoxLayout(self)
main_layout.setContentsMargins(0, 0, 0, 0)
main_layout.addWidget(details_label, 0)
main_layout.addWidget(details_input, 1)
main_layout.addWidget(artist_note_widget, 1)
main_layout.addWidget(description_widget, 1)
note_input.textChanged.connect(self._on_note_change)
btn_note_save.clicked.connect(self._on_save_click)
description_input.textChanged.connect(self._on_description_change)
btn_description_save.clicked.connect(self._on_save_click)
controller.register_event_callback(
"selection.workarea.changed", self._on_selection_change
)
self._details_input = details_input
self._artist_note_widget = artist_note_widget
self._note_input = note_input
self._btn_note_save = btn_note_save
self._description_widget = description_widget
self._description_input = description_input
self._btn_description_save = btn_description_save
self._folder_id = None
self._task_name = None
self._task_id = None
self._filepath = None
self._orig_note = ""
self._rootless_path = None
self._orig_description = ""
self._controller = controller
self._set_context(None, None, None)
self._set_context(None, None, None, None)
def set_published_mode(self, published_mode):
"""Change published mode.
@ -89,64 +94,69 @@ class SidePanelWidget(QtWidgets.QWidget):
published_mode (bool): Published mode enabled.
"""
self._artist_note_widget.setVisible(not published_mode)
self._description_widget.setVisible(not published_mode)
def _on_selection_change(self, event):
folder_id = event["folder_id"]
task_name = event["task_name"]
task_id = event["task_id"]
filepath = event["path"]
rootless_path = event["rootless_path"]
self._set_context(folder_id, task_name, filepath)
self._set_context(folder_id, task_id, rootless_path, filepath)
def _on_note_change(self):
text = self._note_input.toPlainText()
self._btn_note_save.setEnabled(self._orig_note != text)
def _on_description_change(self):
text = self._description_input.toPlainText()
self._btn_description_save.setEnabled(self._orig_description != text)
def _on_save_click(self):
note = self._note_input.toPlainText()
description = self._description_input.toPlainText()
self._controller.save_workfile_info(
self._folder_id,
self._task_name,
self._filepath,
note
self._task_id,
self._rootless_path,
description=description,
)
self._orig_note = note
self._btn_note_save.setEnabled(False)
self._orig_description = description
self._btn_description_save.setEnabled(False)
def _set_context(self, folder_id, task_name, filepath):
def _set_context(self, folder_id, task_id, rootless_path, filepath):
workfile_info = None
# Check if folder, task and file are selected
if bool(folder_id) and bool(task_name) and bool(filepath):
if folder_id and task_id and rootless_path:
workfile_info = self._controller.get_workfile_info(
folder_id, task_name, filepath
folder_id, task_id, rootless_path
)
enabled = workfile_info is not None
self._details_input.setEnabled(enabled)
self._note_input.setEnabled(enabled)
self._btn_note_save.setEnabled(enabled)
self._description_input.setEnabled(enabled)
self._btn_description_save.setEnabled(enabled)
self._folder_id = folder_id
self._task_name = task_name
self._task_id = task_id
self._filepath = filepath
self._rootless_path = rootless_path
# Disable inputs and remove texts if any required arguments are
# missing
if not enabled:
self._orig_note = ""
self._orig_description = ""
self._details_input.setPlainText("")
self._note_input.setPlainText("")
self._description_input.setPlainText("")
return
note = workfile_info.note
size_value = file_size_to_string(workfile_info.filesize)
description = workfile_info.description
size_value = file_size_to_string(workfile_info.file_size)
# Append html string
datetime_format = "%b %d %Y %H:%M:%S"
creation_time = datetime.datetime.fromtimestamp(
workfile_info.creation_time)
modification_time = datetime.datetime.fromtimestamp(
workfile_info.modification_time)
file_created = workfile_info.file_created
modification_time = workfile_info.file_modified
if file_created:
file_created = datetime.datetime.fromtimestamp(file_created)
if modification_time:
modification_time = datetime.datetime.fromtimestamp(
modification_time)
user_items_by_name = self._controller.get_user_items_by_name()
@ -156,33 +166,38 @@ class SidePanelWidget(QtWidgets.QWidget):
return user_item.full_name
return username
created_lines = [
creation_time.strftime(datetime_format)
]
created_lines = []
if workfile_info.created_by:
created_lines.insert(
0, convert_username(workfile_info.created_by)
created_lines.append(
convert_username(workfile_info.created_by)
)
if file_created:
created_lines.append(file_created.strftime(datetime_format))
modified_lines = [
modification_time.strftime(datetime_format)
]
if created_lines:
created_lines.insert(0, "<b>Created:</b>")
modified_lines = []
if workfile_info.updated_by:
modified_lines.insert(
0, convert_username(workfile_info.updated_by)
modified_lines.append(
convert_username(workfile_info.updated_by)
)
if modification_time:
modified_lines.append(
modification_time.strftime(datetime_format)
)
if modified_lines:
modified_lines.insert(0, "<b>Modified:</b>")
lines = (
"<b>Size:</b>",
size_value,
"<b>Created:</b>",
"<br/>".join(created_lines),
"<b>Modified:</b>",
"<br/>".join(modified_lines),
)
self._orig_note = note
self._note_input.setPlainText(note)
self._orig_description = description
self._description_input.setPlainText(description)
# Set as empty string
self._details_input.setPlainText("")
self._details_input.appendHtml("<br>".join(lines))
self._details_input.appendHtml("<br/>".join(lines))

View file

@ -1,3 +1,3 @@
# -*- coding: utf-8 -*-
"""Package declaring AYON addon 'core' version."""
__version__ = "1.4.1+dev"
__version__ = "1.5.0+dev"

View file

@ -1,6 +1,6 @@
name = "core"
title = "Core"
version = "1.4.1+dev"
version = "1.5.0+dev"
client_dir = "ayon_core"

View file

@ -5,7 +5,7 @@
[tool.poetry]
name = "ayon-core"
version = "1.4.1+dev"
version = "1.5.0+dev"
description = ""
authors = ["Ynput Team <team@ynput.io>"]
readme = "README.md"
@ -19,6 +19,7 @@ python = ">=3.9.1,<3.10"
pytest = "^8.0"
pytest-print = "^1.0"
ayon-python-api = "^1.0"
arrow = "0.17.0"
# linting dependencies
ruff = "^0.11.7"
pre-commit = "^4"

View file

@ -747,6 +747,11 @@ class ExtractReviewProfileModel(BaseSettingsModel):
hosts: list[str] = SettingsField(
default_factory=list, title="Host names"
)
task_types: list[str] = SettingsField(
default_factory=list,
title="Task Types",
enum_resolver=task_types_enum,
)
outputs: list[ExtractReviewOutputDefModel] = SettingsField(
default_factory=list, title="Output Definitions"
)
@ -1348,6 +1353,7 @@ DEFAULT_PUBLISH_VALUES = {
{
"product_types": [],
"hosts": [],
"task_types": [],
"outputs": [
{
"name": "png",