Skip to content

Commit c07a5a9

Browse files
committed
Add support for quorum queues and max_attempts_at_message
1 parent b6a8be3 commit c07a5a9

File tree

4 files changed

+261
-24
lines changed

4 files changed

+261
-24
lines changed

README.md

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,32 @@ async def main():
116116

117117
```
118118

119+
## Queue Types and Message Reliability
120+
121+
AioPikaBroker supports both classic and quorum queues. Quorum queues are a more modern queue type in RabbitMQ that provides better reliability and data safety guarantees.
122+
123+
```python
124+
from taskiq_aio_pika import AioPikaBroker, QueueType
125+
126+
broker = AioPikaBroker(
127+
queue_type=QueueType.QUORUM, # Use quorum queues for better reliability
128+
max_attempts_at_message=3 # Limit redelivery attempts
129+
)
130+
```
131+
132+
### Message Redelivery Control
133+
134+
When message processing fails due to consumer crashes (e.g. due to an OOM condition resulting in a SIGKILL), network issues, or other infrastructure problems, before the consumer has had the chance to acknowledge, positively or negatively, the message (and schedule a retry via taskiq's retry middleware), RabbitMQ will requeue the message to the front of the queue and it will be redelivered. With quorum queues, you can control how many times such a message will be redelivered:
135+
136+
- Set `max_attempts_at_message` to limit delivery attempts.
137+
- Set `max_attempts_at_message=None` for unlimited attempts.
138+
- This operates at the message delivery level, not application retry level. For application-level retries in case of exceptions that can be caught (e.g., temporary API failures), use taskiq's retry middleware instead.
139+
- After max attempts, the message is logged and discarded.
140+
- `max_attempts_at_message` requires using quorum queues (`queue_type=QueueType.QUORUM`).
141+
142+
This is particularly useful for preventing infinite loops of redeliveries of messages that consistently cause the consumer to crash ([poison messages](https://www.rabbitmq.com/docs/quorum-queues#poison-message-handling)) and can cause the queue to backup.
143+
144+
119145
## Configuration
120146

121147
AioPikaBroker parameters:
@@ -125,13 +151,12 @@ AioPikaBroker parameters:
125151
* `exchange_name` - name of exchange that used to send messages.
126152
* `exchange_type` - type of the exchange. Used only if `declare_exchange` is True.
127153
* `queue_name` - queue that used to get incoming messages.
154+
* `queue_type` - type of RabbitMQ queue to use: `classic` or `quorum`. defaults to `classic`.
128155
* `routing_key` - that used to bind that queue to the exchange.
129156
* `declare_exchange` - whether you want to declare new exchange if it doesn't exist.
130157
* `max_priority` - maximum priority for messages.
131-
* `delay_queue_name` - custom delay queue name.
132-
This queue is used to deliver messages with delays.
133-
* `dead_letter_queue_name` - custom dead letter queue name.
134-
This queue is used to receive negatively acknowledged messages from the main queue.
158+
* `delay_queue_name` - custom delay queue name. This queue is used to deliver messages with delays.
159+
* `dead_letter_queue_name` - custom dead letter queue name. This queue is used to receive negatively acknowledged messages from the main queue.
135160
* `qos` - number of messages that worker can prefetch.
136-
* `declare_queues` - whether you want to declare queues even on
137-
client side. May be useful for message persistence.
161+
* `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistence.
162+
* `max_attempts_at_message` - maximum number of attempts at processing the same message. requires the queue type to be set to `QueueType.QUORUM`. defaults to `20` for quorum queues and to `None` for classic queues. is not the same as task retries. pass `None` for unlimited attempts.

taskiq_aio_pika/broker.py

Lines changed: 103 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,40 @@
11
import asyncio
2+
import copy
23
from datetime import timedelta
4+
from enum import Enum
35
from logging import getLogger
4-
from typing import Any, AsyncGenerator, Callable, Dict, Optional, TypeVar
6+
from typing import (
7+
Any,
8+
AsyncGenerator,
9+
Callable,
10+
Dict,
11+
Literal,
12+
Optional,
13+
TypeVar,
14+
Union,
15+
)
516

617
from aio_pika import DeliveryMode, ExchangeType, Message, connect_robust
718
from aio_pika.abc import AbstractChannel, AbstractQueue, AbstractRobustConnection
8-
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage
19+
from taskiq import (
20+
AsyncBroker,
21+
AsyncResultBackend,
22+
BrokerMessage,
23+
)
24+
from taskiq.message import AckableNackableWrappedMessageWithMetadata, MessageMetadata
925

1026
_T = TypeVar("_T")
1127

1228
logger = getLogger("taskiq.aio_pika_broker")
1329

1430

31+
class QueueType(Enum):
32+
"""Type of RabbitMQ queue."""
33+
34+
CLASSIC = "classic"
35+
QUORUM = "quorum"
36+
37+
1538
def parse_val(
1639
parse_func: Callable[[str], _T],
1740
target: Optional[str] = None,
@@ -35,7 +58,7 @@ def parse_val(
3558
class AioPikaBroker(AsyncBroker):
3659
"""Broker that works with RabbitMQ."""
3760

38-
def __init__(
61+
def __init__( # noqa: PLR0912
3962
self,
4063
url: Optional[str] = None,
4164
result_backend: Optional[AsyncResultBackend[_T]] = None,
@@ -44,6 +67,7 @@ def __init__(
4467
loop: Optional[asyncio.AbstractEventLoop] = None,
4568
exchange_name: str = "taskiq",
4669
queue_name: str = "taskiq",
70+
queue_type: QueueType = QueueType.CLASSIC,
4771
dead_letter_queue_name: Optional[str] = None,
4872
delay_queue_name: Optional[str] = None,
4973
declare_exchange: bool = True,
@@ -54,6 +78,7 @@ def __init__(
5478
delayed_message_exchange_plugin: bool = False,
5579
declare_exchange_kwargs: Optional[Dict[Any, Any]] = None,
5680
declare_queues_kwargs: Optional[Dict[Any, Any]] = None,
81+
max_attempts_at_message: Union[Optional[int], Literal["default"]] = "default",
5782
**connection_kwargs: Any,
5883
) -> None:
5984
"""
@@ -62,12 +87,13 @@ def __init__(
6287
:param url: url to rabbitmq. If None,
6388
the default "amqp://guest:guest@localhost:5672" is used.
6489
:param result_backend: custom result backend.
65-
6690
:param task_id_generator: custom task_id generator.
6791
:param qos: number of messages that worker can prefetch.
6892
:param loop: specific event loop.
6993
:param exchange_name: name of exchange that used to send messages.
7094
:param queue_name: queue that used to get incoming messages.
95+
:param queue_type: type of RabbitMQ queue to use: `classic` or `quorum`.
96+
defaults to `classic`.
7197
:param dead_letter_queue_name: custom name for dead-letter queue.
7298
by default it set to {queue_name}.dead_letter.
7399
:param delay_queue_name: custom name for queue that used to
@@ -86,6 +112,11 @@ def __init__(
86112
:param declare_queues_kwargs: additional from AbstractChannel.declare_queue
87113
:param connection_kwargs: additional keyword arguments,
88114
for connect_robust method of aio-pika.
115+
:param max_attempts_at_message: maximum number of attempts at processing
116+
the same message. requires the queue type to be set to `QueueType.QUORUM`.
117+
defaults to `20` for quorum queues and to `None` for classic queues.
118+
is not the same as task retries. pass `None` for unlimited attempts.
119+
:raises ValueError: if inappropriate arguments were passed.
89120
"""
90121
super().__init__(result_backend, task_id_generator)
91122

@@ -104,6 +135,52 @@ def __init__(
104135
self._max_priority = max_priority
105136
self._delayed_message_exchange_plugin = delayed_message_exchange_plugin
106137

138+
if self._declare_queues_kwargs.get("arguments", {}).get(
139+
"x-queue-type",
140+
) or self._declare_queues_kwargs.get("arguments", {}).get("x-delivery-limit"):
141+
raise ValueError(
142+
"Use the `queue_type` and `max_attempts_at_message` parameters of "
143+
"`AioPikaBroker.__init__` instead of `x-queue-type` and "
144+
"`x-delivery-limit`",
145+
)
146+
if queue_type == QueueType.QUORUM:
147+
self._declare_queues_kwargs.setdefault("arguments", {})[
148+
"x-queue-type"
149+
] = "quorum"
150+
self._declare_queues_kwargs["durable"] = True
151+
else:
152+
self._declare_queues_kwargs.setdefault("arguments", {})[
153+
"x-queue-type"
154+
] = "classic"
155+
156+
if queue_type != QueueType.QUORUM and max_attempts_at_message not in (
157+
"default",
158+
None,
159+
):
160+
raise ValueError(
161+
"`max_attempts_at_message` requires `queue_type` to be set to "
162+
"`QueueType.QUORUM`.",
163+
)
164+
165+
if max_attempts_at_message == "default":
166+
if queue_type == QueueType.QUORUM:
167+
self.max_attempts_at_message = 20
168+
else:
169+
self.max_attempts_at_message = None
170+
else:
171+
self.max_attempts_at_message = max_attempts_at_message
172+
173+
if queue_type == QueueType.QUORUM:
174+
if self.max_attempts_at_message is None:
175+
# no limit
176+
self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = "-1"
177+
else:
178+
# the final attempt will be handled in `taskiq.Receiver`
179+
# to generate visible logs
180+
self._declare_queues_kwargs["arguments"]["x-delivery-limit"] = (
181+
self.max_attempts_at_message + 1
182+
)
183+
107184
self._dead_letter_queue_name = f"{queue_name}.dead_letter"
108185
if dead_letter_queue_name:
109186
self._dead_letter_queue_name = dead_letter_queue_name
@@ -183,9 +260,15 @@ async def declare_queues(
183260
:param channel: channel to used for declaration.
184261
:return: main queue instance.
185262
"""
263+
declare_queues_kwargs_ex_arguments = copy.copy(self._declare_queues_kwargs)
264+
declare_queue_arguments = declare_queues_kwargs_ex_arguments.pop(
265+
"arguments",
266+
{},
267+
)
186268
await channel.declare_queue(
187269
self._dead_letter_queue_name,
188-
**self._declare_queues_kwargs,
270+
**declare_queues_kwargs_ex_arguments,
271+
arguments=declare_queue_arguments,
189272
)
190273
args: "Dict[str, Any]" = {
191274
"x-dead-letter-exchange": "",
@@ -195,8 +278,8 @@ async def declare_queues(
195278
args["x-max-priority"] = self._max_priority
196279
queue = await channel.declare_queue(
197280
self._queue_name,
198-
arguments=args,
199-
**self._declare_queues_kwargs,
281+
arguments=args | declare_queue_arguments,
282+
**declare_queues_kwargs_ex_arguments,
200283
)
201284
if self._delayed_message_exchange_plugin:
202285
await queue.bind(
@@ -209,8 +292,9 @@ async def declare_queues(
209292
arguments={
210293
"x-dead-letter-exchange": "",
211294
"x-dead-letter-routing-key": self._queue_name,
212-
},
213-
**self._declare_queues_kwargs,
295+
}
296+
| declare_queue_arguments,
297+
**declare_queues_kwargs_ex_arguments,
214298
)
215299

216300
await queue.bind(
@@ -275,7 +359,9 @@ async def kick(self, message: BrokerMessage) -> None:
275359
routing_key=self._delay_queue_name,
276360
)
277361

278-
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
362+
async def listen(
363+
self,
364+
) -> AsyncGenerator[AckableNackableWrappedMessageWithMetadata, None]:
279365
"""
280366
Listen to queue.
281367
@@ -291,7 +377,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
291377
queue = await self.declare_queues(self.read_channel)
292378
async with queue.iterator() as iterator:
293379
async for message in iterator:
294-
yield AckableMessage(
295-
data=message.body,
380+
delivery_count: Optional[int] = message.headers.get("x-delivery-count") # type: ignore[assignment]
381+
yield AckableNackableWrappedMessageWithMetadata(
382+
message=message.body,
383+
metadata=MessageMetadata(
384+
delivery_count=delivery_count,
385+
),
296386
ack=message.ack,
387+
nack=message.nack,
297388
)

tests/conftest.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from contextlib import suppress
23
from typing import AsyncGenerator
34
from uuid import uuid4
45

@@ -229,3 +230,13 @@ async def broker_with_delayed_message_plugin(
229230
if_empty=False,
230231
if_unused=False,
231232
)
233+
234+
235+
@pytest.fixture(autouse=True, scope="function")
236+
async def cleanup_rabbitmq(test_channel: Channel) -> AsyncGenerator[None, None]:
237+
yield
238+
239+
for queue_name in ["taskiq", "taskiq.dead_letter", "taskiq.delay"]:
240+
with suppress(Exception):
241+
queue = await test_channel.get_queue(queue_name, ensure=False)
242+
await queue.delete(if_unused=False, if_empty=False)

0 commit comments

Comments
 (0)