Skip to content

Commit 7d0b30e

Browse files
author
Sergio García Prado
committed
ISSUE #283
* Improve `RabbitMQBrokerPublisher` and `RabbitMQBrokerSubscriber`.
1 parent 9a05678 commit 7d0b30e

File tree

6 files changed

+113
-44
lines changed

6 files changed

+113
-44
lines changed

packages/plugins/minos-broker-rabbitmq/minos/plugins/rabbitmq/publisher.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
)
44

55
import logging
6+
from typing import (
7+
Optional,
8+
)
69

710
from aio_pika import (
811
Message,
@@ -25,23 +28,30 @@
2528
class RabbitMQBrokerPublisher(BrokerPublisher):
2629
"""RabbitMQ Broker Publisher class."""
2730

28-
def __init__(self, *args, host: str, port: int, **kwargs):
31+
def __init__(self, *args, host: Optional[str] = None, port: Optional[int] = None, **kwargs):
2932
super().__init__(*args, **kwargs)
33+
34+
if host is None:
35+
host = "localhost"
36+
if port is None:
37+
port = 5672
38+
3039
self.host = host
3140
self.port = port
3241

42+
self.connection = None
43+
self.channel = None
44+
3345
async def _setup(self) -> None:
3446
await super()._setup()
3547
self.connection = await connect(f"amqp://guest:guest@{self.host}:{self.port}/")
48+
self.channel = await self.connection.channel()
3649

3750
async def _destroy(self) -> None:
3851
await self.connection.close()
3952

4053
async def _send(self, message: BrokerMessage) -> None:
41-
async with self.connection:
42-
channel = await self.connection.channel()
43-
queue = await channel.declare_queue(message.topic)
44-
await channel.default_exchange.publish(Message(message.avro_bytes), routing_key=queue.name)
54+
await self.channel.default_exchange.publish(Message(message.avro_bytes), routing_key=message.topic)
4555

4656

4757
class RabbitMQBrokerPublisherBuilder(BrokerPublisherBuilder[RabbitMQBrokerPublisher], RabbitMQBrokerBuilderMixin):

packages/plugins/minos-broker-rabbitmq/minos/plugins/rabbitmq/subscriber.py

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,28 @@
33
)
44

55
import logging
6+
from asyncio import (
7+
CancelledError,
8+
Queue,
9+
TimeoutError,
10+
create_task,
11+
gather,
12+
wait_for,
13+
)
614
from collections.abc import (
715
Iterable,
816
)
17+
from contextlib import (
18+
suppress,
19+
)
920
from typing import (
21+
NoReturn,
1022
Optional,
1123
)
1224

1325
from aio_pika import (
1426
connect,
1527
)
16-
from aio_pika.exceptions import (
17-
QueueEmpty,
18-
)
1928

2029
from minos.networks import (
2130
BrokerMessage,
@@ -36,36 +45,58 @@ class RabbitMQBrokerSubscriber(BrokerSubscriber):
3645
def __init__(
3746
self,
3847
topics: Iterable[str],
39-
host: str,
40-
port: int,
41-
group_id: Optional[str] = None,
42-
remove_topics_on_destroy: bool = False,
48+
host: Optional[str] = None,
49+
port: Optional[int] = None,
4350
**kwargs,
4451
):
4552
super().__init__(topics, **kwargs)
53+
54+
if host is None:
55+
host = "localhost"
56+
if port is None:
57+
port = 5672
58+
4659
self.host = host
4760
self.port = port
48-
self.group_id = group_id
4961

50-
self.remove_topics_on_destroy = remove_topics_on_destroy
62+
self.connection = None
63+
64+
self._run_task = None
65+
self._queue: Queue[bytes] = Queue(maxsize=1)
5166

5267
async def _setup(self) -> None:
5368
await super()._setup()
5469
self.connection = await connect(f"amqp://guest:guest@{self.host}:{self.port}/")
70+
await self._start_task()
5571

5672
async def _destroy(self) -> None:
73+
await self._stop_task()
5774
await self.connection.close()
5875

76+
async def _start_task(self):
77+
if self._run_task is None:
78+
self._run_task = create_task(self._run())
79+
80+
async def _stop_task(self):
81+
if self._run_task is not None:
82+
self._run_task.cancel()
83+
with suppress(TimeoutError, CancelledError):
84+
await wait_for(self._run_task, 0.5)
85+
self._run_task = None
86+
87+
async def _run(self) -> NoReturn:
88+
await gather(*(self._run_one(topic) for topic in self.topics))
89+
90+
async def _run_one(self, topic: str) -> None:
91+
channel = await self.connection.channel()
92+
queue = await channel.declare_queue(topic)
93+
async for message in queue.iterator():
94+
await self._queue.put(message.body)
95+
5996
async def _receive(self) -> BrokerMessage:
60-
for topic in self.topics:
61-
async with self.connection:
62-
channel = await self.connection.channel()
63-
queue = await channel.declare_queue(topic)
64-
try:
65-
message = await queue.get()
66-
return BrokerMessage.from_avro_bytes(message.body)
67-
except QueueEmpty:
68-
pass
97+
bytes_ = await self._queue.get()
98+
message = BrokerMessage.from_avro_bytes(bytes_)
99+
return message
69100

70101

71102
class RabbitMQBrokerSubscriberBuilder(BrokerSubscriberBuilder[RabbitMQBrokerSubscriber], RabbitMQBrokerBuilderMixin):

packages/plugins/minos-broker-rabbitmq/tests/test_rabbitmq/test_integration.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
from asyncio import (
2+
TimeoutError,
3+
wait_for,
4+
)
15
from unittest import (
26
IsolatedAsyncioTestCase,
37
)
@@ -29,4 +33,5 @@ async def test_one_topic(self):
2933

3034
async def test_empty_topic(self):
3135
async with RabbitMQBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"empty_topic"}) as subscriber:
32-
await subscriber.receive()
36+
with self.assertRaises(TimeoutError):
37+
await wait_for(subscriber.receive(), 0.1)

packages/plugins/minos-broker-rabbitmq/tests/test_rabbitmq/test_publisher.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class TestRabbitMQBrokerPublisher(unittest.IsolatedAsyncioTestCase):
2323
def test_is_subclass(self):
2424
self.assertTrue(issubclass(RabbitMQBrokerPublisher, BrokerPublisher))
2525

26+
def test_constructor(self):
27+
publisher = RabbitMQBrokerPublisher()
28+
self.assertEqual("localhost", publisher.host)
29+
self.assertEqual(5672, publisher.port)
30+
2631
def test_from_config(self):
2732
config = Config(CONFIG_FILE_PATH)
2833
broker_config = config.get_interface_by_name("broker")["common"]
@@ -33,21 +38,14 @@ def test_from_config(self):
3338
self.assertEqual(broker_config["host"], publisher.host)
3439
self.assertEqual(broker_config["port"], publisher.port)
3540

36-
@patch("minos.plugins.rabbitmq.publisher.connect")
37-
async def test_send(self, connect_mock):
38-
message = BrokerMessageV1("foo", BrokerMessageV1Payload("bar"))
39-
40-
async with RabbitMQBrokerPublisher.from_config(CONFIG_FILE_PATH) as publisher:
41-
await publisher.send(message)
42-
43-
self.assertEqual(1, connect_mock.call_count)
44-
45-
@patch("minos.plugins.rabbitmq.publisher.connect")
46-
async def test_destroy(self, destroy_mock):
41+
async def test_send(self):
4742
async with RabbitMQBrokerPublisher.from_config(CONFIG_FILE_PATH) as publisher:
48-
await publisher.destroy()
43+
with patch("aio_pika.Exchange.publish") as mock:
44+
await publisher.send(BrokerMessageV1("foo1", BrokerMessageV1Payload("bar")))
45+
await publisher.send(BrokerMessageV1("foo2", BrokerMessageV1Payload("bar")))
46+
await publisher.send(BrokerMessageV1("foo1", BrokerMessageV1Payload("bar")))
4947

50-
self.assertEqual(1, destroy_mock.call_count)
48+
self.assertEqual(3, mock.call_count)
5149

5250

5351
if __name__ == "__main__":

packages/plugins/minos-broker-rabbitmq/tests/test_rabbitmq/test_subscriber.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from minos.common import (
1010
Config,
11+
ModelType,
1112
)
1213
from minos.networks import (
1314
BrokerSubscriber,
@@ -18,31 +19,39 @@
1819
)
1920
from tests.utils import (
2021
CONFIG_FILE_PATH,
22+
FakeAsyncIterator,
2123
)
2224

23-
_ConsumerMessage = namedtuple("_ConsumerMessage", ["value"])
25+
_Foo = ModelType.build("_Foo", {"bar": str})
26+
_ConsumerMessage = namedtuple("_ConsumerMessage", ["body"])
2427

2528

2629
class TestRabbitMQBrokerSubscriber(unittest.IsolatedAsyncioTestCase):
2730
def test_is_subclass(self):
2831
self.assertTrue(issubclass(RabbitMQBrokerSubscriber, BrokerSubscriber))
2932

33+
def test_constructor(self):
34+
subscriber = RabbitMQBrokerSubscriber({"foo", "bar"})
35+
self.assertEqual({"foo", "bar"}, subscriber.topics)
36+
self.assertEqual("localhost", subscriber.host)
37+
self.assertEqual(5672, subscriber.port)
38+
3039
async def test_from_config(self):
3140
config = Config(CONFIG_FILE_PATH)
3241
broker_config = config.get_interface_by_name("broker")["common"]
3342
async with RabbitMQBrokerSubscriber.from_config(config, topics={"foo", "bar"}) as subscriber:
3443
self.assertEqual(broker_config["host"], subscriber.host)
3544
self.assertEqual(broker_config["port"], subscriber.port)
36-
self.assertEqual(False, subscriber.remove_topics_on_destroy)
3745
self.assertEqual({"foo", "bar"}, subscriber.topics)
3846

39-
@patch("minos.plugins.rabbitmq.subscriber.connect")
40-
@patch("minos.networks.BrokerMessage.from_avro_bytes")
41-
async def test_receive(self, connect_mock, mock_avro):
47+
async def test_receive(self):
48+
message = _ConsumerMessage(_Foo("foobar").avro_bytes)
49+
4250
async with RabbitMQBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"}) as subscriber:
43-
await subscriber.receive()
51+
with patch("aio_pika.Queue.iterator", return_value=FakeAsyncIterator([message])):
52+
observed = await subscriber.receive()
4453

45-
self.assertEqual(1, connect_mock.call_count)
54+
self.assertEqual(_Foo("foobar"), observed)
4655

4756

4857
class TestRabbitMQBrokerSubscriberBuilder(unittest.TestCase):

packages/plugins/minos-broker-rabbitmq/tests/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,19 @@
44

55
BASE_PATH = Path(__file__).parent
66
CONFIG_FILE_PATH = BASE_PATH / "test_config.yml"
7+
8+
9+
class FakeAsyncIterator:
10+
"""For testing purposes."""
11+
12+
def __init__(self, seq):
13+
self.iter = iter(seq)
14+
15+
def __aiter__(self):
16+
return self
17+
18+
async def __anext__(self):
19+
try:
20+
return next(self.iter)
21+
except StopIteration:
22+
raise StopAsyncIteration

0 commit comments

Comments
 (0)