Skip to content

Commit 9f3ee00

Browse files
committed
Move AIOKafkaProducer and AIOKafkaConsumer initialization to startup
1 parent a8f6548 commit 9f3ee00

File tree

1 file changed

+17
-17
lines changed

1 file changed

+17
-17
lines changed

taskiq_aio_kafka/broker.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -86,22 +86,9 @@ def __init__( # noqa: WPS211
8686
replication_factor=1,
8787
)
8888

89-
self._aiokafka_producer: AIOKafkaProducer = (
90-
aiokafka_producer
91-
or AIOKafkaProducer(
92-
bootstrap_servers=self._bootstrap_servers,
93-
loop=self._loop,
94-
)
95-
)
89+
self._aiokafka_producer: Optional[AIOKafkaProducer] = aiokafka_producer
9690

97-
self._aiokafka_consumer: AIOKafkaConsumer = (
98-
aiokafka_consumer
99-
or AIOKafkaConsumer(
100-
self._kafka_topic.name,
101-
bootstrap_servers=self._bootstrap_servers,
102-
loop=self._loop,
103-
)
104-
)
91+
self._aiokafka_consumer: Optional[AIOKafkaConsumer] = aiokafka_consumer
10592

10693
self._kafka_admin_client: KafkaAdminClient = (
10794
kafka_admin_client
@@ -137,8 +124,21 @@ async def startup(self) -> None:
137124
validate_only=False,
138125
)
139126

127+
if not self._aiokafka_producer:
128+
self._aiokafka_producer = AIOKafkaProducer(
129+
bootstrap_servers=self._bootstrap_servers,
130+
loop=self._loop,
131+
)
140132
await self._aiokafka_producer.start()
133+
141134
if self.is_worker_process:
135+
if not self._aiokafka_consumer:
136+
self._aiokafka_consumer = AIOKafkaConsumer(
137+
self._kafka_topic.name,
138+
bootstrap_servers=self._bootstrap_servers,
139+
loop=self._loop,
140+
)
141+
142142
await self._aiokafka_consumer.start()
143143
self._is_consumer_started = True
144144

@@ -185,7 +185,7 @@ async def kick(self, message: BrokerMessage) -> None:
185185

186186
topic_name: str = self._kafka_topic.name
187187

188-
await self._aiokafka_producer.send(
188+
await self._aiokafka_producer.send( # type: ignore
189189
topic=topic_name,
190190
value=message.message,
191191
)
@@ -204,5 +204,5 @@ async def listen(
204204
if not self._is_consumer_started:
205205
raise ValueError("Please run startup before listening.")
206206

207-
async for raw_kafka_message in self._aiokafka_consumer:
207+
async for raw_kafka_message in self._aiokafka_consumer: # type: ignore
208208
yield raw_kafka_message.value

0 commit comments

Comments
 (0)