Skip to content

Commit 62c1083

Browse files
committed
@matusdrobuliak66 review: added option to define the queue name
1 parent 70fb780 commit 62c1083

File tree

3 files changed

+45
-17
lines changed

3 files changed

+45
-17
lines changed

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS: Final[NonNegativeInt] = 15
3636

3737
_DELAYED_EXCHANGE_NAME: Final[ExchangeName] = ExchangeName("delayed_{exchange_name}")
38+
_DELAYED_QUEUE_NAME: Final[ExchangeName] = ExchangeName("delayed_{queue_name}")
3839

3940

4041
def _get_x_death_count(message: aio_pika.abc.AbstractIncomingMessage) -> int:
@@ -225,9 +226,8 @@ async def subscribe(
225226
queue = await declare_queue(
226227
channel,
227228
self.client_name,
228-
exchange_name,
229+
non_exclusive_queue_name or exchange_name,
229230
exclusive_queue=exclusive_queue,
230-
non_exclusive_queue_name=non_exclusive_queue_name,
231231
message_ttl=message_ttl,
232232
arguments={"x-dead-letter-exchange": delayed_exchange_name},
233233
)
@@ -241,13 +241,15 @@ async def subscribe(
241241
delayed_exchange = await channel.declare_exchange(
242242
delayed_exchange_name, aio_pika.ExchangeType.FANOUT, durable=True
243243
)
244+
delayed_queue_name = _DELAYED_QUEUE_NAME.format(
245+
queue_name=non_exclusive_queue_name or exchange_name
246+
)
244247

245248
delayed_queue = await declare_queue(
246249
channel,
247250
self.client_name,
248-
delayed_exchange_name,
251+
delayed_queue_name,
249252
exclusive_queue=exclusive_queue,
250-
non_exclusive_queue_name=non_exclusive_queue_name,
251253
message_ttl=int(unexpected_error_retry_delay_s * 1000),
252254
arguments={"x-dead-letter-exchange": exchange.name},
253255
)
@@ -276,7 +278,6 @@ async def add_topics(
276278
self.client_name,
277279
exchange_name,
278280
exclusive_queue=True,
279-
non_exclusive_queue_name=None,
280281
arguments={
281282
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
282283
exchange_name=exchange_name
@@ -302,7 +303,6 @@ async def remove_topics(
302303
self.client_name,
303304
exchange_name,
304305
exclusive_queue=True,
305-
non_exclusive_queue_name=None,
306306
arguments={
307307
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
308308
exchange_name=exchange_name

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from tenacity.wait import wait_fixed
1414

1515
from ..logging_utils import log_context
16+
from ._models import QueueName
1617

1718
_logger = logging.getLogger(__file__)
1819

@@ -65,10 +66,9 @@ def get_rabbitmq_client_unique_name(base_name: str) -> str:
6566
async def declare_queue(
6667
channel: aio_pika.RobustChannel,
6768
client_name: str,
68-
exchange_name: str,
69+
queue_name: QueueName,
6970
*,
7071
exclusive_queue: bool,
71-
non_exclusive_queue_name: str | None,
7272
arguments: dict[str, Any] | None = None,
7373
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
7474
) -> aio_pika.abc.AbstractRobustQueue:
@@ -79,11 +79,11 @@ async def declare_queue(
7979
"durable": True,
8080
"exclusive": exclusive_queue,
8181
"arguments": default_arguments,
82-
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive",
82+
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{queue_name}_exclusive",
8383
}
8484
if not exclusive_queue:
8585
# NOTE: setting a name will ensure multiple instance will take their data here
86-
queue_parameters |= {"name": non_exclusive_queue_name or exchange_name}
86+
queue_parameters |= {"name": queue_name}
8787

8888
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
8989
# most likely someone changed the signature of the queues (parameters etc...)

packages/service-library/tests/rabbitmq/test_rabbitmq.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77

88
import asyncio
9-
from collections.abc import Awaitable, Callable
9+
from collections.abc import AsyncIterator, Awaitable, Callable
1010
from dataclasses import dataclass
1111
from typing import Any, Final
1212
from unittest import mock
@@ -464,9 +464,7 @@ def _raise_once_then_true(*args, **kwargs):
464464
if _raise_once_then_true.calls == 1:
465465
msg = "this is a test!"
466466
raise KeyError(msg)
467-
if _raise_once_then_true.calls == 2:
468-
return False
469-
return True
467+
return _raise_once_then_true.calls != 2
470468

471469
exchange_name = random_exchange_name()
472470
_raise_once_then_true.calls = 0
@@ -476,13 +474,31 @@ def _raise_once_then_true(*args, **kwargs):
476474
await _assert_message_received(mocked_message_parser, 3, message)
477475

478476

477+
@pytest.fixture
478+
async def ensure_queue_deletion(
479+
create_rabbitmq_client: Callable[[str], RabbitMQClient]
480+
) -> AsyncIterator[Callable[[QueueName], None]]:
481+
created_queues = set()
482+
483+
def _(queue_name: QueueName) -> None:
484+
created_queues.add(queue_name)
485+
486+
yield _
487+
488+
client = create_rabbitmq_client("ensure_queue_deletion")
489+
await asyncio.gather(*(client.unsubscribe(q) for q in created_queues))
490+
491+
492+
@pytest.mark.parametrize("defined_queue_name", [None, "pytest-queue"])
479493
@pytest.mark.parametrize("num_subs", [10])
480494
async def test_pub_sub_with_non_exclusive_queue(
481495
create_rabbitmq_client: Callable[[str], RabbitMQClient],
482496
random_exchange_name: Callable[[], str],
483497
mocker: MockerFixture,
484498
random_rabbit_message: Callable[..., PytestRabbitMessage],
485499
num_subs: int,
500+
defined_queue_name: QueueName | None,
501+
ensure_queue_deletion: Callable[[QueueName], None],
486502
):
487503
consumers = (create_rabbitmq_client(f"consumer_{n}") for n in range(num_subs))
488504
mocked_message_parsers = [
@@ -492,13 +508,25 @@ async def test_pub_sub_with_non_exclusive_queue(
492508
publisher = create_rabbitmq_client("publisher")
493509
message = random_rabbit_message()
494510
exchange_name = random_exchange_name()
495-
await asyncio.gather(
511+
list_queue_name_consumer_mappings = await asyncio.gather(
496512
*(
497-
consumer.subscribe(exchange_name, parser, exclusive_queue=False)
513+
consumer.subscribe(
514+
exchange_name,
515+
parser,
516+
exclusive_queue=False,
517+
non_exclusive_queue_name=defined_queue_name,
518+
)
498519
for consumer, parser in zip(consumers, mocked_message_parsers, strict=True)
499520
)
500521
)
501-
522+
for queue_name, _ in list_queue_name_consumer_mappings:
523+
assert (
524+
queue_name == exchange_name
525+
if defined_queue_name is None
526+
else defined_queue_name
527+
)
528+
ensure_queue_deletion(queue_name)
529+
ensure_queue_deletion(f"delayed_{queue_name}")
502530
await publisher.publish(exchange_name, message)
503531
# only one consumer should have gotten the message here and the others not
504532
async for attempt in AsyncRetrying(

0 commit comments

Comments
 (0)