Merge branch 'develop' into 989-ay-7315_extract-review-and-oiio-transcode-failing-to-transcode-media-blocking-publishes-2

This commit is contained in:
Roy Nieterau 2025-06-23 13:32:34 +02:00 committed by GitHub
commit 4629a09036
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
47 changed files with 6781 additions and 417 deletions

1
tests/__init__.py Normal file
View file

@ -0,0 +1 @@
"""Tests."""

View file

@ -0,0 +1 @@
"""Tests for the representation traits."""

View file

@ -0,0 +1,25 @@
"""Metadata traits."""
from typing import ClassVar
from ayon_core.pipeline.traits import TraitBase
class NewTestTrait(TraitBase):
"""New Test trait model.
This model represents a tagged trait.
Attributes:
name (str): Trait name.
description (str): Trait description.
id (str): id should be namespaced trait name with version
"""
name: ClassVar[str] = "New Test Trait"
description: ClassVar[str] = (
"This test trait is used for testing updating."
)
id: ClassVar[str] = "ayon.test.NewTestTrait.v999"
__all__ = ["NewTestTrait"]

View file

@ -0,0 +1,184 @@
"""Tests for the content traits."""
from __future__ import annotations
import re
from pathlib import Path
import pytest
from ayon_core.pipeline.traits import (
Bundle,
FileLocation,
FileLocations,
FrameRanged,
Image,
MimeType,
PixelBased,
Planar,
Representation,
Sequence,
)
from ayon_core.pipeline.traits.trait import TraitValidationError
def test_bundles() -> None:
"""Test bundle trait."""
diffuse_texture = [
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
FileLocation(
file_path=Path("/path/to/diffuse.jpg"),
file_size=1024,
file_hash=None),
MimeType(mime_type="image/jpeg"),
]
bump_texture = [
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
FileLocation(
file_path=Path("/path/to/bump.tif"),
file_size=1024,
file_hash=None),
MimeType(mime_type="image/tiff"),
]
bundle = Bundle(items=[diffuse_texture, bump_texture])
representation = Representation(name="test_bundle", traits=[bundle])
if representation.contains_trait(trait=Bundle):
assert representation.get_trait(trait=Bundle).items == [
diffuse_texture, bump_texture
]
for item in representation.get_trait(trait=Bundle).items:
sub_representation = Representation(name="test", traits=item)
assert sub_representation.contains_trait(trait=Image)
sub: MimeType = sub_representation.get_trait(trait=MimeType)
assert sub.mime_type in {
"image/jpeg", "image/tiff"
}
def test_file_locations_validation() -> None:
"""Test FileLocations trait validation."""
file_locations_list = [
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1051)
]
representation = Representation(name="test", traits=[
FileLocations(file_paths=file_locations_list),
Sequence(frame_padding=4),
])
file_locations_trait: FileLocations = FileLocations(
file_paths=file_locations_list)
# this should be valid trait
file_locations_trait.validate_trait(representation)
# add valid FrameRanged trait
frameranged_trait = FrameRanged(
frame_start=1001,
frame_end=1050,
frames_per_second="25"
)
representation.add_trait(frameranged_trait)
# it should still validate fine
file_locations_trait.validate_trait(representation)
# create empty file locations trait
empty_file_locations_trait = FileLocations(file_paths=[])
representation = Representation(name="test", traits=[
empty_file_locations_trait
])
with pytest.raises(TraitValidationError):
empty_file_locations_trait.validate_trait(representation)
# create valid file locations trait but with not matching
# frame range trait
representation = Representation(name="test", traits=[
FileLocations(file_paths=file_locations_list),
Sequence(frame_padding=4),
])
invalid_sequence_trait = FrameRanged(
frame_start=1001,
frame_end=1051,
frames_per_second="25"
)
representation.add_trait(invalid_sequence_trait)
with pytest.raises(TraitValidationError):
file_locations_trait.validate_trait(representation)
# invalid representation with multiple file locations but
# unrelated to either Sequence or Bundle traits
representation = Representation(name="test", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path("/path/to/file_foo.exr"),
file_size=1024,
file_hash=None,
),
FileLocation(
file_path=Path("/path/to/anotherfile.obj"),
file_size=1234,
file_hash=None,
)
])
])
with pytest.raises(TraitValidationError):
representation.validate()
def test_get_file_location_from_frame() -> None:
"""Test get_file_location_from_frame method."""
file_locations_list = [
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1051)
]
file_locations_trait: FileLocations = FileLocations(
file_paths=file_locations_list)
assert file_locations_trait.get_file_location_for_frame(frame=1001) == \
file_locations_list[0]
assert file_locations_trait.get_file_location_for_frame(frame=1050) == \
file_locations_list[-1]
assert file_locations_trait.get_file_location_for_frame(frame=1100) is None
# test with custom regex
sequence = Sequence(
frame_padding=4,
frame_regex=re.compile(r"boo_(?P<index>(?P<padding>0*)\d+)\.exr"))
file_locations_list = [
FileLocation(
file_path=Path(f"/path/to/boo_{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1051)
]
file_locations_trait = FileLocations(
file_paths=file_locations_list)
assert file_locations_trait.get_file_location_for_frame(
frame=1001, sequence_trait=sequence) == \
file_locations_list[0]

View file

@ -0,0 +1,248 @@
"""Tests for the time related traits."""
from __future__ import annotations
import re
from pathlib import Path
import pytest
from ayon_core.pipeline.traits import (
FileLocation,
FileLocations,
FrameRanged,
Handles,
Representation,
Sequence,
)
from ayon_core.pipeline.traits.trait import TraitValidationError
def test_sequence_validations() -> None:
"""Test Sequence trait validation."""
file_locations_list = [
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1010 + 1) # because range is zero based
]
file_locations_list += [
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1015, 1020 + 1)
]
file_locations_list += [
FileLocation
(
file_path=Path("/path/to/file.1100.exr"),
file_size=1024,
file_hash=None,
)
]
representation = Representation(name="test_1", traits=[
FileLocations(file_paths=file_locations_list),
FrameRanged(
frame_start=1001,
frame_end=1100, frames_per_second="25"),
Sequence(
frame_padding=4,
frame_spec="1001-1010,1015-1020,1100")
])
representation.get_trait(Sequence).validate_trait(representation)
# here we set handles and set them as inclusive, so this should pass
representation = Representation(name="test_2", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1100 + 1) # because range is zero based
]),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=True
),
FrameRanged(
frame_start=1001,
frame_end=1100, frames_per_second="25"),
Sequence(frame_padding=4)
])
representation.validate()
# do the same but set handles as exclusive
representation = Representation(name="test_3", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(996, 1105 + 1) # because range is zero based
]),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=False
),
FrameRanged(
frame_start=1001,
frame_end=1100, frames_per_second="25"),
Sequence(frame_padding=4)
])
representation.validate()
# invalid representation with file range not extended for handles
representation = Representation(name="test_4", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1050 + 1) # because range is zero based
]),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=False
),
FrameRanged(
frame_start=1001,
frame_end=1050, frames_per_second="25"),
Sequence(frame_padding=4)
])
with pytest.raises(TraitValidationError):
representation.validate()
# invalid representation with frame spec not matching the files
del representation
representation = Representation(name="test_5", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1050 + 1) # because range is zero based
]),
FrameRanged(
frame_start=1001,
frame_end=1050, frames_per_second="25"),
Sequence(frame_padding=4, frame_spec="1001-1010,1012-2000")
])
with pytest.raises(TraitValidationError):
representation.validate()
representation = Representation(name="test_6", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1050 + 1) # because range is zero based
]),
Sequence(frame_padding=4, frame_spec="1-1010,1012-1050"),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=False
)
])
with pytest.raises(TraitValidationError):
representation.validate()
representation = Representation(name="test_6", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(996, 1050 + 1) # because range is zero based
]),
Sequence(frame_padding=4, frame_spec="1001-1010,1012-2000"),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=False
)
])
with pytest.raises(TraitValidationError):
representation.validate()
representation = Representation(name="test_7", traits=[
FileLocations(file_paths=[
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(996, 1050 + 1) # because range is zero based
]),
Sequence(
frame_padding=4,
frame_regex=re.compile(
r"img\.(?P<index>(?P<padding>0*)\d{4})\.png$")),
Handles(
frame_start_handle=5,
frame_end_handle=5,
inclusive=False
)
])
representation.validate()
def test_list_spec_to_frames() -> None:
"""Test converting list specification to frames."""
assert Sequence.list_spec_to_frames("1-10,20-30,55") == [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 55
]
assert Sequence.list_spec_to_frames("1,2,3,4,5") == [
1, 2, 3, 4, 5
]
assert Sequence.list_spec_to_frames("1-10") == [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
]
test_list = list(range(1001, 1011))
test_list += list(range(1012, 2001))
assert Sequence.list_spec_to_frames("1001-1010,1012-2000") == test_list
assert Sequence.list_spec_to_frames("1") == [1]
with pytest.raises(
ValueError,
match=r"Invalid frame number in the list: .*"):
Sequence.list_spec_to_frames("a")
def test_sequence_get_frame_padding() -> None:
"""Test getting frame padding from FileLocations trait."""
file_locations_list = [
FileLocation(
file_path=Path(f"/path/to/file.{frame}.exr"),
file_size=1024,
file_hash=None,
)
for frame in range(1001, 1051)
]
representation = Representation(name="test", traits=[
FileLocations(file_paths=file_locations_list)
])
assert Sequence.get_frame_padding(
file_locations=representation.get_trait(FileLocations)) == 4

View file

@ -0,0 +1,405 @@
"""Tests for the representation traits."""
from __future__ import annotations
from pathlib import Path
import pytest
from ayon_core.pipeline.traits import (
Bundle,
FileLocation,
Image,
MimeType,
Overscan,
PixelBased,
Planar,
Representation,
TraitBase,
)
REPRESENTATION_DATA: dict = {
FileLocation.id: {
"file_path": Path("/path/to/file"),
"file_size": 1024,
"file_hash": None,
# "persistent": True,
},
Image.id: {},
PixelBased.id: {
"display_window_width": 1920,
"display_window_height": 1080,
"pixel_aspect_ratio": 1.0,
# "persistent": True,
},
Planar.id: {
"planar_configuration": "RGB",
# "persistent": True,
},
}
class UpgradedImage(Image):
"""Upgraded image class."""
id = "ayon.2d.Image.v2"
@classmethod
def upgrade(cls, data: dict) -> UpgradedImage: # noqa: ARG003
"""Upgrade the trait.
Returns:
UpgradedImage: Upgraded image instance.
"""
return cls()
class InvalidTrait:
"""Invalid trait class."""
foo = "bar"
@pytest.fixture
def representation() -> Representation:
"""Return a traits data instance."""
return Representation(name="test", traits=[
FileLocation(**REPRESENTATION_DATA[FileLocation.id]),
Image(),
PixelBased(**REPRESENTATION_DATA[PixelBased.id]),
Planar(**REPRESENTATION_DATA[Planar.id]),
])
def test_representation_errors(representation: Representation) -> None:
"""Test errors in representation."""
with pytest.raises(ValueError,
match=r"Invalid trait .* - ID is required."):
representation.add_trait(InvalidTrait())
with pytest.raises(ValueError,
match=f"Trait with ID {Image.id} already exists."):
representation.add_trait(Image())
with pytest.raises(ValueError,
match=r"Trait with ID .* not found."):
representation.remove_trait_by_id("foo")
def test_representation_traits(representation: Representation) -> None:
"""Test setting and getting traits."""
assert representation.get_trait_by_id(
"ayon.2d.PixelBased").get_version() == 1
assert len(representation) == len(REPRESENTATION_DATA)
assert representation.get_trait_by_id(FileLocation.id)
assert representation.get_trait_by_id(Image.id)
assert representation.get_trait_by_id(trait_id="ayon.2d.Image.v1")
assert representation.get_trait_by_id(PixelBased.id)
assert representation.get_trait_by_id(trait_id="ayon.2d.PixelBased.v1")
assert representation.get_trait_by_id(Planar.id)
assert representation.get_trait_by_id(trait_id="ayon.2d.Planar.v1")
assert representation.get_trait(FileLocation)
assert representation.get_trait(Image)
assert representation.get_trait(PixelBased)
assert representation.get_trait(Planar)
assert issubclass(
type(representation.get_trait(FileLocation)), TraitBase)
assert representation.get_trait(FileLocation) == \
representation.get_trait_by_id(FileLocation.id)
assert representation.get_trait(Image) == \
representation.get_trait_by_id(Image.id)
assert representation.get_trait(PixelBased) == \
representation.get_trait_by_id(PixelBased.id)
assert representation.get_trait(Planar) == \
representation.get_trait_by_id(Planar.id)
assert representation.get_trait_by_id(
"ayon.2d.PixelBased.v1").display_window_width == \
REPRESENTATION_DATA[PixelBased.id]["display_window_width"]
assert representation.get_trait(
trait=PixelBased).display_window_height == \
REPRESENTATION_DATA[PixelBased.id]["display_window_height"]
repre_dict = {
FileLocation.id: FileLocation(**REPRESENTATION_DATA[FileLocation.id]),
Image.id: Image(),
PixelBased.id: PixelBased(**REPRESENTATION_DATA[PixelBased.id]),
Planar.id: Planar(**REPRESENTATION_DATA[Planar.id]),
}
assert representation.get_traits() == repre_dict
assert representation.get_traits_by_ids(
trait_ids=[FileLocation.id, Image.id, PixelBased.id, Planar.id]) == \
repre_dict
assert representation.get_traits(
[FileLocation, Image, PixelBased, Planar]) == \
repre_dict
assert representation.has_traits() is True
empty_representation: Representation = Representation(
name="test", traits=[])
assert empty_representation.has_traits() is False
assert representation.contains_trait(trait=FileLocation) is True
assert representation.contains_traits([Image, FileLocation]) is True
assert representation.contains_trait_by_id(FileLocation.id) is True
assert representation.contains_traits_by_id(
trait_ids=[FileLocation.id, Image.id]) is True
assert representation.contains_trait(trait=Bundle) is False
assert representation.contains_traits([Image, Bundle]) is False
assert representation.contains_trait_by_id(Bundle.id) is False
assert representation.contains_traits_by_id(
trait_ids=[FileLocation.id, Bundle.id]) is False
def test_trait_removing(representation: Representation) -> None:
"""Test removing traits."""
assert representation.contains_trait_by_id("nonexistent") is False
with pytest.raises(
ValueError, match=r"Trait with ID nonexistent not found."):
representation.remove_trait_by_id("nonexistent")
assert representation.contains_trait(trait=FileLocation) is True
representation.remove_trait(trait=FileLocation)
assert representation.contains_trait(trait=FileLocation) is False
assert representation.contains_trait_by_id(Image.id) is True
representation.remove_trait_by_id(Image.id)
assert representation.contains_trait_by_id(Image.id) is False
assert representation.contains_traits([PixelBased, Planar]) is True
representation.remove_traits([Planar, PixelBased])
assert representation.contains_traits([PixelBased, Planar]) is False
assert representation.has_traits() is False
with pytest.raises(
ValueError, match=f"Trait with ID {Image.id} not found."):
representation.remove_trait(Image)
def test_representation_dict_properties(
representation: Representation) -> None:
"""Test representation as dictionary."""
representation = Representation(name="test")
representation[Image.id] = Image()
assert Image.id in representation
image = representation[Image.id]
assert image == Image()
for trait_id, trait in representation.items():
assert trait_id == Image.id
assert trait == Image()
def test_getting_traits_data(representation: Representation) -> None:
"""Test getting a batch of traits."""
result = representation.get_traits_by_ids(
trait_ids=[FileLocation.id, Image.id, PixelBased.id, Planar.id])
assert result == {
"ayon.2d.Image.v1": Image(),
"ayon.2d.PixelBased.v1": PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
"ayon.2d.Planar.v1": Planar(planar_configuration="RGB"),
"ayon.content.FileLocation.v1": FileLocation(
file_path=Path("/path/to/file"),
file_size=1024,
file_hash=None)
}
def test_traits_data_to_dict(representation: Representation) -> None:
"""Test converting traits data to dictionary."""
result = representation.traits_as_dict()
assert result == REPRESENTATION_DATA
def test_get_version_from_id() -> None:
"""Test getting version from trait ID."""
assert Image().get_version() == 1
class TestOverscan(Overscan):
id = "ayon.2d.Overscan.v2"
assert TestOverscan(
left=0,
right=0,
top=0,
bottom=0
).get_version() == 2
class TestMimeType(MimeType):
id = "ayon.content.MimeType"
assert TestMimeType(mime_type="foo/bar").get_version() is None
def test_get_versionless_id() -> None:
"""Test getting versionless trait ID."""
assert Image().get_versionless_id() == "ayon.2d.Image"
class TestOverscan(Overscan):
id = "ayon.2d.Overscan.v2"
assert TestOverscan(
left=0,
right=0,
top=0,
bottom=0
).get_versionless_id() == "ayon.2d.Overscan"
class TestMimeType(MimeType):
id = "ayon.content.MimeType"
assert TestMimeType(mime_type="foo/bar").get_versionless_id() == \
"ayon.content.MimeType"
def test_from_dict() -> None:
"""Test creating representation from dictionary."""
traits_data = {
"ayon.content.FileLocation.v1": {
"file_path": "/path/to/file",
"file_size": 1024,
"file_hash": None,
},
"ayon.2d.Image.v1": {},
}
representation = Representation.from_dict(
"test", trait_data=traits_data)
assert len(representation) == 2
assert representation.get_trait_by_id("ayon.content.FileLocation.v1")
assert representation.get_trait_by_id("ayon.2d.Image.v1")
traits_data = {
"ayon.content.FileLocation.v999": {
"file_path": "/path/to/file",
"file_size": 1024,
"file_hash": None,
},
}
with pytest.raises(ValueError, match=r"Trait model with ID .* not found."):
representation = Representation.from_dict(
"test", trait_data=traits_data)
traits_data = {
"ayon.content.FileLocation": {
"file_path": "/path/to/file",
"file_size": 1024,
"file_hash": None,
},
}
representation = Representation.from_dict(
"test", trait_data=traits_data)
assert len(representation) == 1
assert representation.get_trait_by_id("ayon.content.FileLocation.v1")
# this won't work right now because we would need to somewhat mock
# the import
"""
from .lib import NewTestTrait
traits_data = {
"ayon.test.NewTestTrait.v1": {},
}
representation = Representation.from_dict(
"test", trait_data=traits_data)
"""
def test_representation_equality() -> None:
"""Test representation equality."""
# rep_a and rep_b are equal
rep_a = Representation(name="test", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
])
rep_b = Representation(name="test", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
])
# rep_c has different value for planar_configuration then rep_a and rep_b
rep_c = Representation(name="test", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGBA"),
])
rep_d = Representation(name="test", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
])
rep_e = Representation(name="foo", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
])
rep_f = Representation(name="foo", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Planar(planar_configuration="RGBA"),
])
# let's assume ids are the same (because ids are randomly generated)
rep_b.representation_id = rep_d.representation_id = rep_a.representation_id
rep_c.representation_id = rep_e.representation_id = rep_a.representation_id
rep_f.representation_id = rep_a.representation_id
assert rep_a == rep_b
# because of the trait value difference
assert rep_a != rep_c
# because of the type difference
assert rep_a != "foo"
# because of the trait count difference
assert rep_a != rep_d
# because of the name difference
assert rep_d != rep_e
# because of the trait difference
assert rep_d != rep_f
def test_get_repre_by_name():
"""Test getting representation by name."""
rep_a = Representation(name="test_a", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
])
rep_b = Representation(name="test_b", traits=[
FileLocation(file_path=Path("/path/to/file"), file_size=1024),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
Planar(planar_configuration="RGB"),
])
representations = [rep_a, rep_b]
_ = next(rep for rep in representations if rep.name == "test_a")

View file

@ -0,0 +1,63 @@
"""Tests for the 2d related traits."""
from __future__ import annotations
from pathlib import Path
from ayon_core.pipeline.traits import (
UDIM,
FileLocation,
FileLocations,
Representation,
)
def test_get_file_location_for_udim() -> None:
"""Test get_file_location_for_udim."""
file_locations_list = [
FileLocation(
file_path=Path("/path/to/file.1001.exr"),
file_size=1024,
file_hash=None,
),
FileLocation(
file_path=Path("/path/to/file.1002.exr"),
file_size=1024,
file_hash=None,
),
FileLocation(
file_path=Path("/path/to/file.1003.exr"),
file_size=1024,
file_hash=None,
),
]
representation = Representation(name="test_1", traits=[
FileLocations(file_paths=file_locations_list),
UDIM(udim=[1001, 1002, 1003]),
])
udim_trait = representation.get_trait(UDIM)
assert udim_trait.get_file_location_for_udim(
file_locations=representation.get_trait(FileLocations),
udim=1001
) == file_locations_list[0]
def test_get_udim_from_file_location() -> None:
"""Test get_udim_from_file_location."""
file_location_1 = FileLocation(
file_path=Path("/path/to/file.1001.exr"),
file_size=1024,
file_hash=None,
)
file_location_2 = FileLocation(
file_path=Path("/path/to/file.xxxxx.exr"),
file_size=1024,
file_hash=None,
)
assert UDIM(udim=[1001]).get_udim_from_file_location(
file_location_1) == 1001
assert UDIM(udim=[1001]).get_udim_from_file_location(
file_location_2) is None

View file

@ -0,0 +1,451 @@
"""Tests for the representation traits."""
from __future__ import annotations
import base64
import re
import time
from pathlib import Path
from typing import TYPE_CHECKING
import pyblish.api
import pytest
from ayon_core.lib.file_transaction import (
FileTransaction,
)
from ayon_core.pipeline.anatomy import Anatomy
from ayon_core.pipeline.traits import (
Bundle,
FileLocation,
FileLocations,
FrameRanged,
Image,
MimeType,
Persistent,
PixelBased,
Representation,
Sequence,
Transient,
)
from ayon_core.pipeline.version_start import get_versioning_start
# Tagged,
# TemplatePath,
from ayon_core.plugins.publish.integrate_traits import (
IntegrateTraits,
TransferItem,
)
from ayon_core.settings import get_project_settings
from ayon_api.operations import (
OperationsSession,
)
if TYPE_CHECKING:
import pytest_ayon
PNG_FILE_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQAAAAA3bvkkAAAACklEQVR4AWNgAAAAAgABc3UBGAAAAABJRU5ErkJggg==" # noqa: E501
SEQUENCE_LENGTH = 10
CURRENT_TIME = time.time()
@pytest.fixture(scope="session")
def single_file(tmp_path_factory: pytest.TempPathFactory) -> Path:
"""Return a temporary image file."""
filename = tmp_path_factory.mktemp("single") / "img.png"
filename.write_bytes(base64.b64decode(PNG_FILE_B64))
return filename
@pytest.fixture(scope="session")
def sequence_files(tmp_path_factory: pytest.TempPathFactory) -> list[Path]:
"""Return a sequence of temporary image files."""
files = []
dir_name = tmp_path_factory.mktemp("sequence")
for i in range(SEQUENCE_LENGTH):
frame = i + 1
filename = dir_name / f"img.{frame:04d}.png"
filename.write_bytes(base64.b64decode(PNG_FILE_B64))
files.append(filename)
return files
@pytest.fixture
def mock_context(
project: pytest_ayon.ProjectInfo,
single_file: Path,
sequence_files: list[Path]) -> pyblish.api.Context:
"""Return a mock instance.
This is mocking pyblish context for testing. It is using real AYON project
thanks to the ``project`` fixture.
Args:
project (object): The project info. It is `ProjectInfo` object
returned by pytest fixture.
single_file (Path): The path to a single image file.
sequence_files (list[Path]): The paths to a sequence of image files.
"""
anatomy = Anatomy(project.project_name)
context = pyblish.api.Context()
context.data["projectName"] = project.project_name
context.data["hostName"] = "test_host"
context.data["project_settings"] = get_project_settings(
project.project_name)
context.data["anatomy"] = anatomy
context.data["time"] = CURRENT_TIME
context.data["user"] = "test_user"
context.data["machine"] = "test_machine"
context.data["fps"] = 25
instance = context.create_instance("mock_instance")
instance.data["source"] = "test_source"
instance.data["families"] = ["render"]
parents = project.folder_entity["path"].lstrip("/").split("/")
hierarchy = "/".join(parents) if parents else ""
instance.data["anatomyData"] = {
"project": {
"name": project.project_name,
"code": project.project_code
},
"task": {
"name": project.task.name,
"type": "test" # pytest-ayon doesn't return the task type yet
},
"folder": {
"name": project.folder.name,
"type": "test" # pytest-ayon doesn't return the folder type yet
},
"product": {
"name": project.product.name,
"type": "test" # pytest-ayon doesn't return the product type yet
},
"hierarchy": hierarchy,
}
instance.data["folderEntity"] = project.folder_entity
instance.data["productType"] = "test_product"
instance.data["productName"] = project.product.name
instance.data["anatomy"] = anatomy
instance.data["comment"] = "test_comment"
instance.data["integrate"] = True
instance.data["farm"] = False
parents = project.folder_entity["path"].lstrip("/").split("/")
hierarchy = "/".join(parents) if parents else ""
instance.data["hierarchy"] = hierarchy
version_number = get_versioning_start(
context.data["projectName"],
instance.context.data["hostName"],
task_name=project.task.name,
task_type="test",
product_type=instance.data["productType"],
product_name=instance.data["productName"]
)
instance.data["version"] = version_number
file_size = len(base64.b64decode(PNG_FILE_B64))
file_locations = [
FileLocation(
file_path=f,
file_size=file_size)
for f in sequence_files]
instance.data["representations_with_traits"] = [
Representation(name="test_single", traits=[
Persistent(),
FileLocation(
file_path=single_file,
file_size=len(base64.b64decode(PNG_FILE_B64))),
Image(),
MimeType(mime_type="image/png"),
]),
Representation(name="test_sequence", traits=[
Persistent(),
FrameRanged(
frame_start=1,
frame_end=SEQUENCE_LENGTH,
frame_in=0,
frame_out=SEQUENCE_LENGTH - 1,
frames_per_second="25",
),
Sequence(
frame_padding=4,
frame_regex=re.compile(
r"img\.(?P<index>(?P<padding>0*)\d{4})\.png$"),
),
FileLocations(
file_paths=file_locations,
),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
MimeType(mime_type="image/png"),
]),
Representation(name="test_bundle", traits=[
Persistent(),
Bundle(
items=[
[
FileLocation(
file_path=single_file,
file_size=len(base64.b64decode(PNG_FILE_B64))),
Image(),
MimeType(mime_type="image/png"),
],
[
Persistent(),
FrameRanged(
frame_start=1,
frame_end=SEQUENCE_LENGTH,
frame_in=0,
frame_out=SEQUENCE_LENGTH - 1,
frames_per_second="25",
),
Sequence(
frame_padding=4,
frame_regex=re.compile(
r"img\.(?P<index>(?P<padding>0*)\d{4})\.png$"),
),
FileLocations(
file_paths=file_locations,
),
Image(),
PixelBased(
display_window_width=1920,
display_window_height=1080,
pixel_aspect_ratio=1.0),
MimeType(mime_type="image/png"),
],
],
),
]),
]
return context
@pytest.mark.server
def test_get_template_name(mock_context: pyblish.api.Context) -> None:
"""Test get_template_name.
TODO (antirotor): this will always return "default" probably, if
there are no studio overrides. To test this properly, we need
to set up the studio overrides in the test environment.
"""
integrator = IntegrateTraits()
template_name = integrator.get_template_name(
mock_context[0])
assert template_name == "default"
class TestGetSize:
@staticmethod
def get_size(file_path: Path) -> int:
"""Get size of the file.
Args:
file_path (Path): File path.
Returns:
int: Size of the file.
"""
return file_path.stat().st_size
@pytest.mark.parametrize(
"file_path, expected_size",
[
(Path("./test_file_1.txt"), 10), # id: happy_path_small_file
(Path("./test_file_2.txt"), 1024), # id: happy_path_medium_file
(Path("./test_file_3.txt"), 10485760) # id: happy_path_large_file
],
ids=["happy_path_small_file",
"happy_path_medium_file",
"happy_path_large_file"]
)
def test_get_size_happy_path(
self, file_path: Path, expected_size: int, tmp_path: Path):
# Arrange
file_path = tmp_path / file_path
file_path.write_bytes(b"\0" * expected_size)
# Act
size = self.get_size(file_path)
# Assert
assert size == expected_size
@pytest.mark.parametrize(
"file_path, expected_size",
[
(Path("./test_file_empty.txt"), 0) # id: edge_case_empty_file
],
ids=["edge_case_empty_file"]
)
def test_get_size_edge_cases(
self, file_path: Path, expected_size: int, tmp_path: Path):
# Arrange
file_path = tmp_path / file_path
file_path.touch() # Create an empty file
# Act
size = self.get_size(file_path)
# Assert
assert size == expected_size
@pytest.mark.parametrize(
"file_path, expected_exception",
[
(
Path("./non_existent_file.txt"),
FileNotFoundError
), # id: error_file_not_found
(123, TypeError) # id: error_invalid_input_type
],
ids=["error_file_not_found", "error_invalid_input_type"]
)
def test_get_size_error_cases(
self, file_path, expected_exception, tmp_path):
# Act & Assert
with pytest.raises(expected_exception):
file_path = tmp_path / file_path
self.get_size(file_path)
def test_filter_lifecycle() -> None:
"""Test filter_lifecycle."""
integrator = IntegrateTraits()
persistent_representation = Representation(
name="test",
traits=[
Persistent(),
FileLocation(
file_path=Path("test"),
file_size=1234),
Image(),
MimeType(mime_type="image/png"),
])
transient_representation = Representation(
name="test",
traits=[
Transient(),
Image(),
MimeType(mime_type="image/png"),
])
filtered = integrator.filter_lifecycle(
[persistent_representation, transient_representation])
assert len(filtered) == 1
assert filtered[0] == persistent_representation
@pytest.mark.server
def test_prepare_product(
project: pytest_ayon.ProjectInfo,
mock_context: pyblish.api.Context) -> None:
"""Test prepare_product."""
integrator = IntegrateTraits()
op_session = OperationsSession()
product = integrator.prepare_product(mock_context[0], op_session)
assert product == {
"attrib": {},
"data": {
"families": ["default", "render"],
},
"folderId": project.folder_entity["id"],
"name": "renderMain",
"productType": "test_product",
"id": project.product_entity["id"],
}
@pytest.mark.server
def test_prepare_version(
project: pytest_ayon.ProjectInfo,
mock_context: pyblish.api.Context) -> None:
"""Test prepare_version."""
integrator = IntegrateTraits()
op_session = OperationsSession()
product = integrator.prepare_product(mock_context[0], op_session)
version = integrator.prepare_version(
mock_context[0], op_session, product)
assert version == {
"attrib": {
"comment": "test_comment",
"families": ["default", "render"],
"fps": 25,
"machine": "test_machine",
"source": "test_source",
},
"data": {
"author": "test_user",
"time": CURRENT_TIME,
},
"id": project.version_entity["id"],
"productId": project.product_entity["id"],
"version": 1,
}
@pytest.mark.server
def test_get_transfers_from_representation(
mock_context: pyblish.api.Context) -> None:
"""Test get_transfers_from_representation.
This tests getting actual transfers from the representations and
also the legacy files.
Todo: This test will benefit massively from a proper mocking of the
context. We need to parametrize the test with different
representations and test the output of the function.
"""
integrator = IntegrateTraits()
instance = mock_context[0]
representations: list[Representation] = instance.data[
"representations_with_traits"]
transfers = integrator.get_transfers_from_representations(
instance, representations)
assert len(representations) == 3
assert len(transfers) == 22
for transfer in transfers:
assert transfer.checksum == TransferItem.get_checksum(
transfer.source)
file_transactions = FileTransaction(
# Enforce unique transfers
allow_queue_replacements=False)
for transfer in transfers:
file_transactions.add(
transfer.source.as_posix(),
transfer.destination.as_posix(),
mode=FileTransaction.MODE_COPY,
)
file_transactions.process()
for representation in representations:
_ = integrator._get_legacy_files_for_representation( # noqa: SLF001
transfers, representation, anatomy=instance.data["anatomy"])

View file

@ -1,3 +1,4 @@
"""conftest.py: pytest configuration file."""
import sys
from pathlib import Path
@ -5,5 +6,3 @@ client_path = Path(__file__).resolve().parent.parent / "client"
# add client path to sys.path
sys.path.append(str(client_path))
print(f"Added {client_path} to sys.path")