mirror of
https://github.com/ynput/ayon-core.git
synced 2025-12-24 12:54:40 +01:00
Chore: Queued event system (#5514)
* implemented queued event system * implemented basic tests
This commit is contained in:
parent
c157f74b49
commit
e56d3530cb
2 changed files with 171 additions and 2 deletions
83
tests/unit/openpype/lib/test_event_system.py
Normal file
83
tests/unit/openpype/lib/test_event_system.py
Normal file
|
|
@ -0,0 +1,83 @@
|
|||
from openpype.lib.events import EventSystem, QueuedEventSystem
|
||||
|
||||
|
||||
def test_default_event_system():
|
||||
output = []
|
||||
expected_output = [3, 2, 1]
|
||||
event_system = EventSystem()
|
||||
|
||||
def callback_1():
|
||||
event_system.emit("topic.2", {}, None)
|
||||
output.append(1)
|
||||
|
||||
def callback_2():
|
||||
event_system.emit("topic.3", {}, None)
|
||||
output.append(2)
|
||||
|
||||
def callback_3():
|
||||
output.append(3)
|
||||
|
||||
event_system.add_callback("topic.1", callback_1)
|
||||
event_system.add_callback("topic.2", callback_2)
|
||||
event_system.add_callback("topic.3", callback_3)
|
||||
|
||||
event_system.emit("topic.1", {}, None)
|
||||
|
||||
assert output == expected_output, (
|
||||
"Callbacks were not called in correct order")
|
||||
|
||||
|
||||
def test_base_event_system_queue():
|
||||
output = []
|
||||
expected_output = [1, 2, 3]
|
||||
event_system = QueuedEventSystem()
|
||||
|
||||
def callback_1():
|
||||
event_system.emit("topic.2", {}, None)
|
||||
output.append(1)
|
||||
|
||||
def callback_2():
|
||||
event_system.emit("topic.3", {}, None)
|
||||
output.append(2)
|
||||
|
||||
def callback_3():
|
||||
output.append(3)
|
||||
|
||||
event_system.add_callback("topic.1", callback_1)
|
||||
event_system.add_callback("topic.2", callback_2)
|
||||
event_system.add_callback("topic.3", callback_3)
|
||||
|
||||
event_system.emit("topic.1", {}, None)
|
||||
|
||||
assert output == expected_output, (
|
||||
"Callbacks were not called in correct order")
|
||||
|
||||
|
||||
def test_manual_event_system_queue():
|
||||
output = []
|
||||
expected_output = [1, 2, 3]
|
||||
event_system = QueuedEventSystem(auto_execute=False)
|
||||
|
||||
def callback_1():
|
||||
event_system.emit("topic.2", {}, None)
|
||||
output.append(1)
|
||||
|
||||
def callback_2():
|
||||
event_system.emit("topic.3", {}, None)
|
||||
output.append(2)
|
||||
|
||||
def callback_3():
|
||||
output.append(3)
|
||||
|
||||
event_system.add_callback("topic.1", callback_1)
|
||||
event_system.add_callback("topic.2", callback_2)
|
||||
event_system.add_callback("topic.3", callback_3)
|
||||
|
||||
event_system.emit("topic.1", {}, None)
|
||||
|
||||
while True:
|
||||
if event_system.process_next_event() is None:
|
||||
break
|
||||
|
||||
assert output == expected_output, (
|
||||
"Callbacks were not called in correct order")
|
||||
Loading…
Add table
Add a link
Reference in a new issue