11import asyncio
22import pickle # noqa: S403
33from logging import getLogger
4- from typing import AsyncGenerator , Callable , List , Optional , Set , TypeVar , Union
4+ from typing import Any , AsyncGenerator , Callable , List , Optional , Set , TypeVar , Union
55
66from aiokafka import AIOKafkaConsumer , AIOKafkaProducer
77from kafka .admin import KafkaAdminClient , NewTopic
88from taskiq import AsyncResultBackend , BrokerMessage
99from taskiq .abc .broker import AsyncBroker
1010
1111from taskiq_aio_kafka .exceptions import WrongAioKafkaBrokerParametersError
12+ from taskiq_aio_kafka .models import KafkaConsumerParameters , KafkaProducerParameters
1213
1314_T = TypeVar ("_T" ) # noqa: WPS111
1415
@@ -86,21 +87,12 @@ def __init__( # noqa: WPS211
8687 replication_factor = 1 ,
8788 )
8889
89- self ._aiokafka_producer : AIOKafkaProducer = (
90- aiokafka_producer
91- or AIOKafkaProducer (
92- bootstrap_servers = self ._bootstrap_servers ,
93- loop = self ._loop ,
94- )
90+ self ._aiokafka_producer_params : KafkaProducerParameters = (
91+ KafkaProducerParameters ()
9592 )
9693
97- self ._aiokafka_consumer : AIOKafkaConsumer = (
98- aiokafka_consumer
99- or AIOKafkaConsumer (
100- self ._kafka_topic .name ,
101- bootstrap_servers = self ._bootstrap_servers ,
102- loop = self ._loop ,
103- )
94+ self ._aiokafka_consumer_params : KafkaConsumerParameters = (
95+ KafkaConsumerParameters ()
10496 )
10597
10698 self ._kafka_admin_client : KafkaAdminClient = (
@@ -118,6 +110,30 @@ def __init__( # noqa: WPS211
118110 self ._is_producer_started = False
119111 self ._is_consumer_started = False
120112
113+ def configure_producer (self , ** producer_parameters : Any ) -> None :
114+ """Configure kafka producer.
115+
116+ You can pass here any configuration parameters
117+ accepted by the kafka producer.
118+
119+ :param producer_parameters: producer parameters kwargs.
120+ """
121+ self ._aiokafka_producer_params = KafkaProducerParameters (
122+ ** producer_parameters ,
123+ )
124+
125+ def configure_consumer (self , ** consumer_parameters : Any ) -> None :
126+ """Configure kafka consumer.
127+
128+ You can pass here any configuration parameters
129+ accepted by the kafka consumer.
130+
131+ :param consumer_parameters: consumer parameters kwargs.
132+ """
133+ self ._aiokafka_consumer_params = KafkaConsumerParameters (
134+ ** consumer_parameters ,
135+ )
136+
121137 async def startup (self ) -> None :
122138 """Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
123139
@@ -127,18 +143,29 @@ async def startup(self) -> None:
127143 if there are no producer and consumer passed.
128144 """
129145 await super ().startup ()
130-
131- is_topic_available : bool = bool (
132- self ._kafka_admin_client .describe_topics ([self ._kafka_topic .name ]),
146+ available_condition : bool = (
147+ self ._kafka_topic .name not in self ._kafka_admin_client .list_topics ()
133148 )
134- if not is_topic_available :
149+ if available_condition :
135150 self ._kafka_admin_client .create_topics (
136151 new_topics = [self ._kafka_topic ],
137152 validate_only = False ,
138153 )
139-
154+ self ._aiokafka_producer = AIOKafkaProducer (
155+ bootstrap_servers = self ._bootstrap_servers ,
156+ loop = self ._loop ,
157+ ** self ._aiokafka_producer_params .dict (),
158+ )
140159 await self ._aiokafka_producer .start ()
160+
141161 if self .is_worker_process :
162+ self ._aiokafka_consumer = AIOKafkaConsumer (
163+ self ._kafka_topic .name ,
164+ bootstrap_servers = self ._bootstrap_servers ,
165+ loop = self ._loop ,
166+ ** self ._aiokafka_consumer_params .dict (),
167+ )
168+
142169 await self ._aiokafka_consumer .start ()
143170 self ._is_consumer_started = True
144171
@@ -148,10 +175,10 @@ async def shutdown(self) -> None:
148175 """Close all connections on shutdown."""
149176 await super ().shutdown ()
150177
151- if self ._aiokafka_producer :
178+ if self ._is_producer_started :
152179 await self ._aiokafka_producer .stop ()
153180
154- if self ._aiokafka_consumer :
181+ if self ._is_consumer_started :
155182 await self ._aiokafka_consumer .stop ()
156183
157184 topic_delete_condition : bool = all (
@@ -183,12 +210,11 @@ async def kick(self, message: BrokerMessage) -> None:
183210 if not self ._is_producer_started :
184211 raise ValueError ("Please run startup before kicking." )
185212
186- kafka_message : bytes = pickle .dumps (message )
187213 topic_name : str = self ._kafka_topic .name
188214
189- await self ._aiokafka_producer .send (
215+ await self ._aiokafka_producer .send ( # type: ignore
190216 topic = topic_name ,
191- value = kafka_message ,
217+ value = message . message ,
192218 )
193219
194220 async def listen (
@@ -205,5 +231,5 @@ async def listen(
205231 if not self ._is_consumer_started :
206232 raise ValueError ("Please run startup before listening." )
207233
208- async for raw_kafka_message in self ._aiokafka_consumer :
234+ async for raw_kafka_message in self ._aiokafka_consumer : # type: ignore
209235 yield raw_kafka_message .value
0 commit comments