Skip to content

Commit c709cf2

Browse files
committed
- Fixed issue with empty queues.
issue #283
1 parent 71ac284 commit c709cf2

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

packages/plugins/minos-broker-rabbitmq/minos/plugins/rabbitmq/subscriber.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from aio_pika import (
1414
connect,
1515
)
16+
from aio_pika.exceptions import QueueEmpty
1617

1718
from minos.common import (
1819
MinosConfig, Config,
@@ -55,7 +56,7 @@ def _from_config(cls, config: MinosConfig, **kwargs) -> RabbitMQBrokerSubscriber
5556

5657
async def _setup(self) -> None:
5758
await super()._setup()
58-
self.connection = await connect(f"amqp://guest:guest@{self.broker_host}/")
59+
self.connection = await connect(f"amqp://guest:guest@{self.broker_host}:{self.broker_port}/")
5960

6061
async def _destroy(self) -> None:
6162
await self.connection.close()
@@ -65,8 +66,11 @@ async def _receive(self) -> BrokerMessage:
6566
async with self.connection:
6667
channel = await self.connection.channel()
6768
queue = await channel.declare_queue(topic)
68-
message = await queue.get(fail=False)
69-
return BrokerMessage.from_avro_bytes(message.body if message.body else None)
69+
try:
70+
message = await queue.get()
71+
return BrokerMessage.from_avro_bytes(message.body)
72+
except QueueEmpty:
73+
pass
7074

7175

7276
class RabbitMQBrokerSubscriberBuilder(BrokerSubscriberBuilder):

packages/plugins/minos-broker-rabbitmq/tests/test_rabbitmq/test_integration.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
class IntegrationTests(IsolatedAsyncioTestCase):
20-
async def test_integration(self):
20+
async def test_one_topic(self):
2121
message = BrokerMessageV1("foo", BrokerMessageV1Payload("bar"))
2222

2323
async with RabbitMQBrokerPublisher.from_config(CONFIG_FILE_PATH) as publisher:
@@ -27,3 +27,8 @@ async def test_integration(self):
2727
observed = await subscriber.receive()
2828

2929
self.assertEqual(message.content, observed.content)
30+
31+
async def test_empty_topic(self):
32+
async with RabbitMQBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"empty_topic"}) as subscriber:
33+
observed = await subscriber.receive()
34+

0 commit comments

Comments
 (0)