diff --git a/README.md b/README.md index 706835f..99e4bdb 100644 --- a/README.md +++ b/README.md @@ -135,3 +135,26 @@ AioPikaBroker parameters: * `qos` - number of messages that worker can prefetch. * `declare_queues` - whether you want to declare queues even on client side. May be useful for message persistance. +* `declare_queues_kwargs` - see [Custom Queue Arguments](#custom-queue-arguments) for more details. + +## Custom Queue Arguments + +You can pass custom arguments to the underlying RabbitMQ queue declaration by using the `declare_queues_kwargs` parameter of `AioPikaBroker`. If you want to set specific queue arguments (such as RabbitMQ extensions or custom behaviors), provide them in the `arguments` dictionary inside `declare_queues_kwargs`. + +These arguments will be merged with the default arguments used by the broker (such as dead-lettering and priority settings). + +**Example:** + +```python +broker = AioPikaBroker( + declare_queues_kwargs={ + "arguments": { + "x-message-ttl": 60000, # Set message TTL to 60 seconds + "x-queue-type": "quorum", # Use quorum queue type + } + } +) +``` + +This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults. + diff --git a/taskiq_aio_pika/broker.py b/taskiq_aio_pika/broker.py index 35422a9..a3f3939 100644 --- a/taskiq_aio_pika/broker.py +++ b/taskiq_aio_pika/broker.py @@ -187,7 +187,7 @@ async def declare_queues( self._dead_letter_queue_name, **self._declare_queues_kwargs, ) - args: "Dict[str, Any]" = { + args: Dict[str, Any] = { "x-dead-letter-exchange": "", "x-dead-letter-routing-key": self._dead_letter_queue_name, } @@ -195,8 +195,13 @@ async def declare_queues( args["x-max-priority"] = self._max_priority queue = await channel.declare_queue( self._queue_name, - arguments=args, - **self._declare_queues_kwargs, + **{ + **self._declare_queues_kwargs, + "arguments": { + **self._declare_queues_kwargs.get("arguments", {}), + **args, + }, + }, ) if self._delayed_message_exchange_plugin: await queue.bind( @@ -206,11 +211,14 @@ async def declare_queues( else: await channel.declare_queue( self._delay_queue_name, - arguments={ - "x-dead-letter-exchange": "", - "x-dead-letter-routing-key": self._queue_name, + **{ + **self._declare_queues_kwargs, + "arguments": { + **self._declare_queues_kwargs.get("arguments", {}), + "x-dead-letter-exchange": "", + "x-dead-letter-routing-key": self._queue_name, + }, }, - **self._declare_queues_kwargs, ) await queue.bind(