Skip to content

Commit 66631f5

Browse files
author
David Romaschenko
committed
feat: added support for both versions
1 parent 137492a commit 66631f5

File tree

4 files changed

+20
-8
lines changed

4 files changed

+20
-8
lines changed

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ packages = [{ include = "taskiq_aio_kafka" }]
2727
python = "^3.9"
2828
taskiq = "^0"
2929
aiokafka = "^0.10.0"
30-
pydantic = "^2.7.4"
3130
kafka-python = "^2.0.2"
3231

3332
[tool.poetry.group.dev.dependencies]

taskiq_aio_kafka/broker.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
66
from kafka.admin import KafkaAdminClient, NewTopic
7+
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
8+
from kafka.partitioner.default import DefaultPartitioner
79
from taskiq import AsyncResultBackend, BrokerMessage
810
from taskiq.abc.broker import AsyncBroker
11+
from taskiq.compat import model_dump
912

1013
from taskiq_aio_kafka.exceptions import WrongAioKafkaBrokerParametersError
1114
from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters
@@ -146,19 +149,31 @@ async def startup(self) -> None:
146149
new_topics=[self._kafka_topic],
147150
validate_only=False,
148151
)
152+
153+
partitioner = self._aiokafka_producer_params.partitioner or DefaultPartitioner()
154+
producer_kwargs = model_dump(self._aiokafka_producer_params)
155+
producer_kwargs["partitioner"] = partitioner
149156
self._aiokafka_producer = AIOKafkaProducer(
150157
bootstrap_servers=self._bootstrap_servers,
151158
loop=self._loop,
152-
**self._aiokafka_producer_params.model_dump(),
159+
**producer_kwargs,
153160
)
154161
await self._aiokafka_producer.start()
155162

156163
if self.is_worker_process:
164+
partition_assignment_strategy = (
165+
self._aiokafka_consumer_params.partition_assignment_strategy
166+
or (RoundRobinPartitionAssignor,)
167+
)
168+
consumer_kwargs = model_dump(self._aiokafka_consumer_params)
169+
consumer_kwargs["partition_assignment_strategy"] = (
170+
partition_assignment_strategy
171+
)
157172
self._aiokafka_consumer = AIOKafkaConsumer(
158173
self._kafka_topic.name,
159174
bootstrap_servers=self._bootstrap_servers,
160175
loop=self._loop,
161-
**self._aiokafka_consumer_params.model_dump(),
176+
**consumer_kwargs,
162177
)
163178

164179
await self._aiokafka_consumer.start()

taskiq_aio_kafka/models.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from typing import Any, Callable, Optional, Union
22

33
from aiokafka import __version__
4-
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
5-
from kafka.partitioner.default import DefaultPartitioner
64
from pydantic import BaseModel
75

86

@@ -18,7 +16,7 @@ class KafkaProducerParameters(BaseModel):
1816
value_serializer: Optional[Callable[..., bytes]] = None
1917
compression_type: Optional[str] = None
2018
max_batch_size: int = 16384
21-
partitioner: Callable[..., Any] = DefaultPartitioner()
19+
partitioner: Optional[Callable[..., Any]] = None
2220
max_request_size: int = 1048576
2321
linger_ms: int = 0
2422
send_backoff_ms: int = 100
@@ -55,7 +53,7 @@ class KafkaConsumerParameters(BaseModel):
5553
auto_commit_interval_ms: int = 5000
5654
check_crcs: bool = True
5755
metadata_max_age_ms: int = 5 * 60 * 1000
58-
partition_assignment_strategy: Any = (RoundRobinPartitionAssignor,)
56+
partition_assignment_strategy: Any = None
5957
max_poll_interval_ms: int = 300000
6058
rebalance_timeout_ms: Optional[int] = None
6159
session_timeout_ms: int = 10000

0 commit comments

Comments (0)