Skip to content

Commit b050f71

Browse files
author
Alan Christie
committed
feat: A test message queue implementation for protocol buffers
1 parent ca798c8 commit b050f71

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

tests/message_queue.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from contextlib import suppress
2+
from multiprocessing import Event, Process, Queue
3+
from queue import Empty
4+
from typing import Callable
5+
6+
from google.protobuf.message import Message
7+
from informaticsmatters.protobuf.datamanager.pod_message_pb2 import PodMessage
8+
from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
9+
10+
11+
class UnitTestMessageQueue(Process):
12+
"""A simple asynchronous message passer, used by the Validator
13+
(and UnitTestInstanceLauncher) to send ProtocolBuffer messages to the Engine."""
14+
15+
def __init__(self, receiver: Callable[[Message], None]):
16+
super().__init__()
17+
self._stop = Event()
18+
self._queue = Queue()
19+
self._receiver = receiver
20+
21+
def run(self):
22+
while not self._stop.is_set():
23+
with suppress(Empty):
24+
if item := self._queue.get(True, 0.25):
25+
msg = None
26+
# We only support Workflow and Pod messages
27+
# during testing...
28+
if item["class"] == "WorkflowMessage":
29+
msg = WorkflowMessage()
30+
msg.ParseFromString(item["bytes"])
31+
elif item["class"] == "PodMessage":
32+
msg = PodMessage()
33+
msg.ParseFromString(item["bytes"])
34+
assert msg
35+
self._receiver(msg)
36+
37+
def put(self, msg: Message):
38+
"""Puts a protocol buffer message onto the queue."""
39+
self._queue.put({"class": type(msg).__name__, "bytes": msg.SerializeToString()})
40+
41+
def stop(self):
42+
"""A request to stop the process."""
43+
self._stop.set()

tests/test_test_message_queue.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import time
2+
from datetime import datetime, timezone
3+
4+
import pytest
5+
6+
pytestmark = pytest.mark.unit
7+
8+
from google.protobuf.message import Message
9+
from informaticsmatters.protobuf.datamanager.workflow_message_pb2 import WorkflowMessage
10+
11+
from tests.message_queue import UnitTestMessageQueue
12+
13+
14+
class Receiver:
15+
"""A dummy message receiver."""
16+
17+
def handle_msg(self, msg: Message) -> None:
18+
assert msg
19+
print(f"Received message ({type(msg).__name__})")
20+
21+
22+
def test_start_and_stop():
23+
# Arrange
24+
receiver = Receiver()
25+
utmq = UnitTestMessageQueue(receiver=receiver.handle_msg)
26+
utmq.start()
27+
28+
# Act
29+
utmq.stop()
30+
utmq.join()
31+
32+
# Assert
33+
34+
35+
def test_send_messages():
36+
# Arrange
37+
receiver = Receiver()
38+
utmq = UnitTestMessageQueue(receiver=receiver.handle_msg)
39+
utmq.start()
40+
41+
# Act
42+
msg = WorkflowMessage()
43+
msg.timestamp = f"{datetime.now(timezone.utc).isoformat()}Z"
44+
msg.action = "START"
45+
msg.running_workflow = "r-workflow-00000000-0000-0000-0000-000000000000"
46+
utmq.put(msg)
47+
48+
msg = WorkflowMessage()
49+
msg.timestamp = f"{datetime.now(timezone.utc).isoformat()}Z"
50+
msg.action = "STOP"
51+
msg.running_workflow = "r-workflow-00000000-0000-0000-0000-000000000000"
52+
utmq.put(msg)
53+
54+
time.sleep(0.5)
55+
56+
utmq.stop()
57+
utmq.join()
58+
59+
# Assert

0 commit comments

Comments
 (0)