Skip to content

Allow passing extra params to declare exchange and declare queues (rebased on develop) #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions taskiq_aio_pika/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__( # noqa: WPS211
exchange_type: ExchangeType = ExchangeType.TOPIC,
max_priority: Optional[int] = None,
delayed_message_exchange_plugin: bool = False,
declare_exchange_kwargs: Optional[Dict] = None,
declare_queues_kwargs: Optional[Dict] = None,
**connection_kwargs: Any,
) -> None:
"""
Expand Down Expand Up @@ -80,6 +82,8 @@ def __init__( # noqa: WPS211
:param max_priority: maximum priority value for messages.
:param delayed_message_exchange_plugin: turn on or disable
delayed-message-exchange rabbitmq plugin.
:param declare_exchange_kwargs additional from AbstractChannel.declare_exchange
:param declare_queues_kwargs additional from AbstractChannel.declare_queue
:param connection_kwargs: additional keyword arguments,
for connect_robust method of aio-pika.
"""
Expand All @@ -92,7 +96,9 @@ def __init__( # noqa: WPS211
self._exchange_type = exchange_type
self._qos = qos
self._declare_exchange = declare_exchange
self._declare_exchange_kwargs = declare_exchange_kwargs or {}
self._declare_queues = declare_queues
self._declare_queues_kwargs = declare_queues_kwargs or {}
self._queue_name = queue_name
self._routing_key = routing_key
self._max_priority = max_priority
Expand Down Expand Up @@ -135,6 +141,7 @@ async def startup(self) -> None: # noqa: WPS217
await self.write_channel.declare_exchange(
self._exchange_name,
type=self._exchange_type,
**self._declare_exchange_kwargs,
)

if self._delayed_message_exchange_plugin:
Expand Down Expand Up @@ -178,6 +185,7 @@ async def declare_queues(
"""
await channel.declare_queue(
self._dead_letter_queue_name,
**self._declare_queues_kwargs,
)
args: "Dict[str, Any]" = {
"x-dead-letter-exchange": "",
Expand All @@ -188,6 +196,7 @@ async def declare_queues(
queue = await channel.declare_queue(
self._queue_name,
arguments=args,
**self._declare_queues_kwargs,
)
if self._delayed_message_exchange_plugin:
await queue.bind(
Expand All @@ -201,6 +210,7 @@ async def declare_queues(
"x-dead-letter-exchange": "",
"x-dead-letter-routing-key": self._queue_name,
},
**self._declare_queues_kwargs,
)

await queue.bind(
Expand Down
Loading