@@ -19,6 +19,7 @@ class AioPikaBroker(AsyncBroker):
1919
2020 def __init__ ( # noqa: WPS211
2121 self ,
22+ url : Optional [str ] = None ,
2223 result_backend : Optional [AsyncResultBackend [_T ]] = None ,
2324 task_id_generator : Optional [Callable [[], str ]] = None ,
2425 qos : int = 10 ,
@@ -29,13 +30,39 @@ def __init__( # noqa: WPS211
2930 queue_name : str = "taskiq" ,
3031 declare_exchange : bool = True ,
3132 routing_key : str = "#" ,
32- * connection_args : Any ,
33+ exchange_type : ExchangeType = ExchangeType . TOPIC ,
3334 ** connection_kwargs : Any ,
3435 ) -> None :
36+ """
37+ Construct a new broker.
38+
39+ :param url: url to rabbitmq. If None,
40+ the default "amqp://guest:guest@localhost:5672" is used.
41+ :param result_backend: custom result backend.
42+
43+ :param task_id_generator: custom task_id genertaor.
44+ :param qos: number of messages that worker can prefetch.
45+ :param loop: specific even loop.
46+ :param max_channel_pool_size: maximum number of channels for each connection.
47+ :param max_connection_pool_size: maximum number of connections in pool.
48+ :param exchange_name: name of exchange that used to send messages.
49+ :param queue_name: queue that used to get incoming messages.
50+ :param declare_exchange: whether you want to declare new exchange
51+ if it doesn't exist.
52+ :param routing_key: that used to bind that queue to the exchange.
53+ :param exchange_type: type of the exchange.
54+ Used only if `declare_exchange` is True.
55+ :param connection_kwargs: additional keyword arguments,
56+ for connect_robust method of aio-pika.
57+ """
3558 super ().__init__ (result_backend , task_id_generator )
3659
3760 async def _get_rmq_connection () -> AbstractRobustConnection :
38- return await connect_robust (* connection_args , ** connection_kwargs )
61+ return await connect_robust (
62+ url ,
63+ loop = loop ,
64+ ** connection_kwargs ,
65+ )
3966
4067 self ._connection_pool : Pool [AbstractRobustConnection ] = Pool (
4168 _get_rmq_connection ,
@@ -54,6 +81,7 @@ async def get_channel() -> AbstractChannel:
5481 )
5582
5683 self ._exchange_name = exchange_name
84+ self ._exchange_type = exchange_type
5785 self ._qos = qos
5886 self ._declare_exchange = declare_exchange
5987 self ._queue_name = queue_name
@@ -65,7 +93,7 @@ async def startup(self) -> None:
6593 if self ._declare_exchange :
6694 exchange = await channel .declare_exchange (
6795 self ._exchange_name ,
68- type = ExchangeType . TOPIC ,
96+ type = self . _exchange_type ,
6997 )
7098 else :
7199 exchange = await channel .get_exchange (self ._exchange_name , ensure = False )
0 commit comments