-
Notifications
You must be signed in to change notification settings - Fork 203
Open
Description
Hi,
This is my dependency situation:
aio-pika==9.5.7
aiormq==6.9.2And I have the following code:
import aio_pika
async def _process_message(message: aio_pika.abc.AbstractIncomingMessage):
async with message.process():
try:
body = message.body.decode("utf-8")
# DO SOMETHING
await message.ack()
except Exception:
raise # message.process() should reject the message
connection = await aio_pika.connect_robust("[REDACTED]", loop=self.loop)
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
queue = await channel.declare_queue("test", durable=True)
async with queue.iterator(no_ack=True) as queue_iter:
async for message in queue_iter:
await _process_message(message)What I want to achieve is the following:
I would like to acknowledge the message myself only when the processing of the message is complete and not immediately on delivery. I am fine rejecting the message on a raised exception. The problem I have with the code above is that when I ack() the message it fails with: "Can't ack message with "no_ack" flag". Have I misunderstood the no_ack meaning? How can I manually acknowledge the message? I worked with RabbitMQ on other languages and framework and I think this request is sound. Am I correct?
Thanks in advance,
Davide
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels