Skip to content

Commit 4f20f3c

Browse files
committed
cleanup
1 parent 2849646 commit 4f20f3c

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

packages/service-library/src/servicelib/rabbitmq/_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ async def subscribe(
157157
message_handler: MessageHandler,
158158
*,
159159
exclusive_queue: bool = True,
160+
non_exclusive_queue_name: str | None = None,
160161
topics: list[str] | None = None,
161162
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
162163
unexpected_error_retry_delay_s: float = _DEFAULT_UNEXPECTED_ERROR_RETRY_DELAY_S,
@@ -167,6 +168,8 @@ async def subscribe(
167168
receive the incoming messages
168169
- exclusive_queue: False means that only one instance of this application will
169170
reveice the incoming message
171+
- non_exclusive_queue_name: if exclusive_queue is False, then this name will be used. If None
172+
it will use the exchange_name.
170173
171174
NOTE: ``message_ttl` is also a soft timeout: if the handler does not finish processing
172175
the message before this is reached the message will be redelivered!
@@ -192,7 +195,7 @@ async def subscribe(
192195
aio_pika.exceptions.ChannelPreconditionFailed: In case an existing exchange with
193196
different type is used
194197
Returns:
195-
queue name
198+
tuple of queue name and consumer tag mapping
196199
"""
197200

198201
assert self._channel_pool # nosec
@@ -224,6 +227,7 @@ async def subscribe(
224227
self.client_name,
225228
exchange_name,
226229
exclusive_queue=exclusive_queue,
230+
non_exclusive_queue_name=non_exclusive_queue_name,
227231
message_ttl=message_ttl,
228232
arguments={"x-dead-letter-exchange": delayed_exchange_name},
229233
)
@@ -243,6 +247,7 @@ async def subscribe(
243247
self.client_name,
244248
delayed_exchange_name,
245249
exclusive_queue=exclusive_queue,
250+
non_exclusive_queue_name=non_exclusive_queue_name,
246251
message_ttl=int(unexpected_error_retry_delay_s * 1000),
247252
arguments={"x-dead-letter-exchange": exchange.name},
248253
)
@@ -271,6 +276,7 @@ async def add_topics(
271276
self.client_name,
272277
exchange_name,
273278
exclusive_queue=True,
279+
non_exclusive_queue_name=None,
274280
arguments={
275281
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
276282
exchange_name=exchange_name
@@ -296,6 +302,7 @@ async def remove_topics(
296302
self.client_name,
297303
exchange_name,
298304
exclusive_queue=True,
305+
non_exclusive_queue_name=None,
299306
arguments={
300307
"x-dead-letter-exchange": _DELAYED_EXCHANGE_NAME.format(
301308
exchange_name=exchange_name

packages/service-library/src/servicelib/rabbitmq/_utils.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ async def declare_queue(
6868
exchange_name: str,
6969
*,
7070
exclusive_queue: bool,
71+
non_exclusive_queue_name: str | None,
7172
arguments: dict[str, Any] | None = None,
7273
message_ttl: NonNegativeInt = RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_MS,
7374
) -> aio_pika.abc.AbstractRobustQueue:
@@ -78,11 +79,11 @@ async def declare_queue(
7879
"durable": True,
7980
"exclusive": exclusive_queue,
8081
"arguments": default_arguments,
81-
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{exchange_name}_exclusive",
82+
"name": f"{get_rabbitmq_client_unique_name(client_name)}_{non_exclusive_queue_name or exchange_name}_exclusive",
8283
}
8384
if not exclusive_queue:
8485
# NOTE: setting a name will ensure multiple instance will take their data here
85-
queue_parameters |= {"name": exchange_name}
86+
queue_parameters |= {"name": non_exclusive_queue_name or exchange_name}
8687

8788
# NOTE: if below line raises something similar to ``ChannelPreconditionFailed: PRECONDITION_FAILED``
8889
# most likely someone changed the signature of the queues (parameters etc...)

0 commit comments

Comments
 (0)