Skip to content

Commit a60bfc3

Browse files
committed
changed syntax of subscribe
1 parent fe04c2e commit a60bfc3

File tree

4 files changed

+77
-45
lines changed

4 files changed

+77
-45
lines changed

packages/service-library/src/servicelib/rabbitmq/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,16 @@
88
RPCNotInitializedError,
99
RPCServerError,
1010
)
11-
from ._models import ConsumerTag, QueueName
11+
from ._models import ConsumerTag, ExchangeName, QueueName
1212
from ._rpc_router import RPCRouter
1313
from ._utils import is_rabbitmq_responsive, wait_till_rabbitmq_responsive
1414

1515
__all__: tuple[str, ...] = (
1616
"BIND_TO_ALL_TOPICS",
17+
"ConsumerTag",
18+
"ExchangeName",
1719
"is_rabbitmq_responsive",
20+
"QueueName",
1821
"RabbitMQClient",
1922
"RabbitMQRPCClient",
2023
"RemoteMethodNotRegisteredError",
@@ -24,8 +27,6 @@
2427
"RPCRouter",
2528
"RPCServerError",
2629
"wait_till_rabbitmq_responsive",
27-
"QueueName",
28-
"ConsumerTag",
2930
)
3031

3132
# nopycln: file

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,14 @@
1010

1111
from ..logging_utils import log_catch, log_context
1212
from ._client_base import RabbitMQClientBase
13-
from ._models import ConsumerTag, MessageHandler, QueueName, RabbitMessage
13+
from ._models import (
14+
ConsumerTag,
15+
ExchangeName,
16+
MessageHandler,
17+
QueueName,
18+
RabbitMessage,
19+
TopicName,
20+
)
1421
from ._utils import (
1522
RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
1623
declare_queue,
@@ -27,7 +34,7 @@
2734
_DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S: Final[float] = 1
2835
_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15
2936

30-
_DELAYED_EXCHANGE_NAME: Final[str] = "delayed_{exchange_name}"
37+
_DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}")
3138

3239

3340
def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
@@ -139,12 +146,14 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
139146
channel.close_callbacks.add(self._channel_close_callback)
140147
return channel
141148

142-
async def _get_consumer_tag(self, exchange_name) -> str:
143-
return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}"
149+
async def _create_consumer_tag(self, exchange_name) -> ConsumerTag:
150+
return ConsumerTag(
151+
f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}_{uuid4()}"
152+
)
144153

145154
async def subscribe(
146155
self,
147-
exchange_name: str,
156+
exchange_name: ExchangeName,
148157
message_handler: MessageHandler,
149158
*,
150159
exclusive_queue: bool = True,
@@ -239,20 +248,19 @@ async def subscribe(
239248
)
240249
await delayed_queue.bind(delayed_exchange)
241250

242-
consumer_tag = await self._get_consumer_tag(exchange_name)
251+
consumer_tag = await self._create_consumer_tag(exchange_name)
243252
await queue.consume(
244253
partial(_on_message, message_handler, unexpected_error_max_attempts),
245254
exclusive=exclusive_queue,
246255
consumer_tag=consumer_tag,
247256
)
248-
output: str = queue.name
249-
return output, consumer_tag
257+
return queue.name, consumer_tag
250258

251259
async def add_topics(
252260
self,
253-
exchange_name: str,
261+
exchange_name: ExchangeName,
254262
*,
255-
topics: list[str],
263+
topics: list[TopicName],
256264
) -> None:
257265
assert self._channel_pool # nosec
258266

@@ -276,9 +284,9 @@ async def add_topics(
276284

277285
async def remove_topics(
278286
self,
279-
exchange_name: str,
287+
exchange_name: ExchangeName,
280288
*,
281-
topics: list[str],
289+
topics: list[TopicName],
282290
) -> None:
283291
assert self._channel_pool # nosec
284292
async with self._channel_pool.acquire() as channel:
@@ -305,14 +313,20 @@ async def unsubscribe(
305313
) -> None:
306314
"""This will delete the queue if there are no consumers left"""
307315
assert self._connection_pool # nosec
308-
if not self._connection_pool.is_closed:
309-
assert self._channel_pool # nosec
310-
async with self._channel_pool.acquire() as channel:
311-
queue = await channel.get_queue(queue_name)
312-
# NOTE: we force delete here
313-
await queue.delete(if_unused=False, if_empty=False)
316+
if self._connection_pool.is_closed:
317+
_logger.warning(
318+
"Connection to RabbitMQ is already closed, skipping unsubscribe from queue..."
319+
)
320+
return
321+
assert self._channel_pool # nosec
322+
async with self._channel_pool.acquire() as channel:
323+
queue = await channel.get_queue(queue_name)
324+
# NOTE: we force delete here
325+
await queue.delete(if_unused=False, if_empty=False)
314326

315-
async def publish(self, exchange_name: str, message: RabbitMessage) -> None:
327+
async def publish(
328+
self, exchange_name: ExchangeName, message: RabbitMessage
329+
) -> None:
316330
"""publish message in the exchange exchange_name.
317331
specifying a topic will use a TOPIC type of RabbitMQ Exchange instead of FANOUT
318332
@@ -342,9 +356,13 @@ async def unsubscribe_consumer(
342356
) -> None:
343357
"""This will only remove the consumers without deleting the queue"""
344358
assert self._connection_pool # nosec
345-
if not self._connection_pool.is_closed:
346-
assert self._channel_pool # nosec
347-
async with self._channel_pool.acquire() as channel:
348-
assert isinstance(channel, aio_pika.RobustChannel) # nosec
349-
queue = await channel.get_queue(queue_name)
350-
await queue.cancel(consumer_tag)
359+
if self._connection_pool.is_closed:
360+
_logger.warning(
361+
"Connection to RabbitMQ is already closed, skipping unsubscribe consumers from queue..."
362+
)
363+
return
364+
assert self._channel_pool # nosec
365+
async with self._channel_pool.acquire() as channel:
366+
assert isinstance(channel, aio_pika.RobustChannel) # nosec
367+
queue = await channel.get_queue(queue_name)
368+
await queue.cancel(consumer_tag)

packages/service-library/src/servicelib/rabbitmq/_models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
MessageHandler = Callable[[Any], Awaitable[bool]]
1313

14+
ExchangeName: TypeAlias = str
1415
QueueName: TypeAlias = str
1516
ConsumerTag: TypeAlias = str
17+
TopicName: TypeAlias = str
1618

1719

1820
class RabbitMessage(Protocol):

packages/service-library/tests/rabbitmq/test_rabbitmq.py

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@
1515
import pytest
1616
from faker import Faker
1717
from pytest_mock.plugin import MockerFixture
18-
from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, RabbitMQClient, _client
18+
from servicelib.rabbitmq import (
19+
BIND_TO_ALL_TOPICS,
20+
ConsumerTag,
21+
QueueName,
22+
RabbitMQClient,
23+
_client,
24+
)
1925
from servicelib.rabbitmq._client import _DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS
2026
from settings_library.rabbit import RabbitSettings
2127
from tenacity.asyncio import AsyncRetrying
@@ -325,7 +331,7 @@ async def test_publish_with_no_registered_subscriber(
325331
ttl_s: float = 0.1
326332
topics_count: int = 1 if topics is None else len(topics)
327333

328-
async def _publish_random_message():
334+
async def _publish_random_message() -> None:
329335
if topics is None:
330336
message = random_rabbit_message()
331337
await publisher.publish(exchange_name, message)
@@ -335,8 +341,8 @@ async def _publish_random_message():
335341
message = random_rabbit_message(topic=topic)
336342
await publisher.publish(exchange_name, message)
337343

338-
async def _subscribe_consumer_to_queue():
339-
await consumer.subscribe(
344+
async def _subscribe_consumer_to_queue() -> tuple[QueueName, ConsumerTag]:
345+
return await consumer.subscribe(
340346
exchange_name,
341347
mocked_message_parser,
342348
topics=topics,
@@ -346,27 +352,30 @@ async def _subscribe_consumer_to_queue():
346352
unexpected_error_retry_delay_s=ttl_s,
347353
)
348354

349-
async def _unsubscribe_consumer():
350-
await consumer.unsubscribe_consumer(exchange_name)
355+
async def _unsubscribe_consumer(
356+
queue_name: QueueName, consumer_tag: ConsumerTag
357+
) -> None:
358+
await consumer.unsubscribe_consumer(queue_name, consumer_tag)
351359

352360
# CASE 1 (subscribe immediately after publishing message)
353361

354-
await _subscribe_consumer_to_queue()
355-
await _unsubscribe_consumer()
362+
consumer_1 = await _subscribe_consumer_to_queue()
363+
await _unsubscribe_consumer(*consumer_1)
356364
await _publish_random_message()
357365
# reconnect immediately
358-
await _subscribe_consumer_to_queue()
366+
consumer_2 = await _subscribe_consumer_to_queue()
359367
# expected to receive a message (one per topic)
360368
await _assert_wait_for_messages(on_message_spy, 1 * topics_count)
361369

362370
# CASE 2 (no subscriber attached when publishing)
363371
on_message_spy.reset_mock()
364372

365-
await _unsubscribe_consumer()
373+
await _unsubscribe_consumer(*consumer_2)
366374
await _publish_random_message()
367375
# wait for message to expire (will be dropped)
368376
await asyncio.sleep(ttl_s * 2)
369-
await _subscribe_consumer_to_queue()
377+
_consumer_3 = await _subscribe_consumer_to_queue()
378+
370379
# wait for a message to be possibly delivered
371380
await asyncio.sleep(ttl_s * 2)
372381
# nothing changed from before
@@ -604,7 +613,7 @@ async def test_rabbit_pub_sub_bind_and_unbind_topics(
604613
)
605614

606615
# we should get no messages since no one was subscribed
607-
queue_name = await consumer.subscribe(
616+
queue_name, consumer_tag = await consumer.subscribe(
608617
exchange_name, mocked_message_parser, topics=[]
609618
)
610619
await _assert_message_received(mocked_message_parser, 0)
@@ -666,7 +675,7 @@ async def test_rabbit_adding_topics_to_a_fanout_exchange(
666675
message = random_rabbit_message()
667676
publisher = create_rabbitmq_client("publisher")
668677
consumer = create_rabbitmq_client("consumer")
669-
queue_name = await consumer.subscribe(exchange_name, mocked_message_parser)
678+
queue_name, _ = await consumer.subscribe(exchange_name, mocked_message_parser)
670679
await publisher.publish(exchange_name, message)
671680
await _assert_message_received(mocked_message_parser, 1, message)
672681
mocked_message_parser.reset_mock()
@@ -709,10 +718,12 @@ async def test_unsubscribe_consumer(
709718
):
710719
exchange_name = f"{random_exchange_name()}"
711720
client = create_rabbitmq_client("consumer")
712-
await client.subscribe(exchange_name, mocked_message_parser, exclusive_queue=False)
721+
queue_name, consumer_tag = await client.subscribe(
722+
exchange_name, mocked_message_parser, exclusive_queue=False
723+
)
713724
# Unsubsribe just a consumer, the queue will be still there
714-
await client.unsubscribe_consumer(exchange_name)
725+
await client.unsubscribe_consumer(queue_name, consumer_tag)
715726
# Unsubsribe the queue
716-
await client.unsubscribe(exchange_name)
727+
await client.unsubscribe(queue_name)
717728
with pytest.raises(aio_pika.exceptions.ChannelNotFoundEntity):
718-
await client.unsubscribe(exchange_name)
729+
await client.unsubscribe(queue_name)

0 commit comments

Comments
 (0)