
Commit c179375

Add new classes that describe parameters for the Kafka producer and consumer, because AIOKafkaProducer and AIOKafkaConsumer must be initialized in an async method or with an explicitly passed loop parameter.
1 parent 9f3ee00 commit c179375
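
For context, a minimal sketch of the constraint this commit works around (the broker address and topic name are placeholder values, not from this repo): creating the client inside a coroutine lets aiokafka bind to the already-running event loop, so no explicit `loop=` argument is needed.

```python
import asyncio

from aiokafka import AIOKafkaProducer


async def main() -> None:
    # Constructed inside a coroutine: aiokafka picks up the running
    # event loop, so construction is deferred until startup time.
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # "demo-topic" is a placeholder topic name.
        await producer.send_and_wait("demo-topic", b"payload")
    finally:
        await producer.stop()


asyncio.run(main())
```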

6 files changed: +130 additions, −24 deletions

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -27,6 +27,7 @@ packages = [{ include = "taskiq_aio_kafka" }]
 python = "^3.7"
 taskiq = "^0"
 aiokafka = "^0.8.0"
+pydantic = "^1.10.7"

 [tool.poetry.group.dev.dependencies]
 pytest = "^7.1.2"
```

taskiq_aio_kafka/broker.py

Lines changed: 47 additions & 20 deletions
```diff
@@ -1,14 +1,15 @@
 import asyncio
 import pickle  # noqa: S403
 from logging import getLogger
-from typing import AsyncGenerator, Callable, List, Optional, Set, TypeVar, Union
+from typing import Any, AsyncGenerator, Callable, List, Optional, Set, TypeVar, Union

 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
 from kafka.admin import KafkaAdminClient, NewTopic
 from taskiq import AsyncResultBackend, BrokerMessage
 from taskiq.abc.broker import AsyncBroker

 from taskiq_aio_kafka.exceptions import WrongAioKafkaBrokerParametersError
+from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters

 _T = TypeVar("_T")  # noqa: WPS111

@@ -86,9 +87,13 @@ def __init__(  # noqa: WPS211
             replication_factor=1,
         )

-        self._aiokafka_producer: Optional[AIOKafkaProducer] = aiokafka_producer
+        self._aiokafka_producer_params: KafkaProducerParameters = (
+            KafkaProducerParameters()
+        )

-        self._aiokafka_consumer: Optional[AIOKafkaConsumer] = aiokafka_consumer
+        self._aiokafka_consumer_params: KafkaConsumerParameters = (
+            KafkaConsumerParameters()
+        )

         self._kafka_admin_client: KafkaAdminClient = (
             kafka_admin_client
@@ -105,6 +110,30 @@ def __init__(  # noqa: WPS211
         self._is_producer_started = False
         self._is_consumer_started = False

+    def configure_producer(self, **producer_parameters: Any) -> None:
+        """Configure kafka producer.
+
+        You can pass here any configuration parameters
+        accepted by the kafka producer.
+
+        :param producer_parameters: producer parameters kwargs.
+        """
+        self._aiokafka_producer_params = KafkaProducerParameters(
+            **producer_parameters,
+        )
+
+    def configure_consumer(self, **consumer_parameters: Any) -> None:
+        """Configure kafka consumer.
+
+        You can pass here any configuration parameters
+        accepted by the kafka consumer.
+
+        :param consumer_parameters: consumer parameters kwargs.
+        """
+        self._aiokafka_consumer_params = KafkaConsumerParameters(
+            **consumer_parameters,
+        )
+
     async def startup(self) -> None:
         """Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.

@@ -114,30 +143,28 @@ async def startup(self) -> None:
             if there are no producer and consumer passed.
         """
         await super().startup()
-
-        is_topic_available: bool = bool(
-            self._kafka_admin_client.describe_topics([self._kafka_topic.name]),
+        available_condition: bool = (
+            self._kafka_topic.name not in self._kafka_admin_client.list_topics()
         )
-        if not is_topic_available:
+        if available_condition:
             self._kafka_admin_client.create_topics(
                 new_topics=[self._kafka_topic],
                 validate_only=False,
             )
+        self._aiokafka_producer = AIOKafkaProducer(
+            bootstrap_servers=self._bootstrap_servers,
+            loop=self._loop,
+            **self._aiokafka_producer_params.dict(),
+        )
+        await self._aiokafka_producer.start()

-        if not self._aiokafka_producer:
-            self._aiokafka_producer = AIOKafkaProducer(
+        if self.is_worker_process:
+            self._aiokafka_consumer = AIOKafkaConsumer(
+                self._kafka_topic.name,
                 bootstrap_servers=self._bootstrap_servers,
                 loop=self._loop,
+                **self._aiokafka_consumer_params.dict(),
             )
-            await self._aiokafka_producer.start()
-
-        if self.is_worker_process:
-            if not self._aiokafka_consumer:
-                self._aiokafka_consumer = AIOKafkaConsumer(
-                    self._kafka_topic.name,
-                    bootstrap_servers=self._bootstrap_servers,
-                    loop=self._loop,
-                )

             await self._aiokafka_consumer.start()
             self._is_consumer_started = True

@@ -148,10 +175,10 @@ async def shutdown(self) -> None:
         """Close all connections on shutdown."""
         await super().shutdown()

-        if self._aiokafka_producer:
+        if self._is_producer_started:
             await self._aiokafka_producer.stop()

-        if self._aiokafka_consumer:
+        if self._is_consumer_started:
             await self._aiokafka_consumer.stop()

         topic_delete_condition: bool = all(
```
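
A hedged usage sketch of the new `configure_producer`/`configure_consumer` hooks. The import path and constructor argument are assumptions based on this diff and the test fixtures, not confirmed by this page; the kwarg values are illustrative only.

```python
from taskiq_aio_kafka import AioKafkaBroker  # assumed import path

# Assumed constructor: the tests pass a Kafka URL as the first argument.
broker = AioKafkaBroker("localhost:9092")

# Any kwargs accepted by KafkaProducerParameters / KafkaConsumerParameters.
broker.configure_producer(acks="all", linger_ms=5)
broker.configure_consumer(group_id="taskiq", auto_offset_reset="earliest")

# Nothing connects yet: the stored parameters are unpacked with .dict()
# into AIOKafkaProducer / AIOKafkaConsumer inside broker.startup().
```

This is the point of the commit: configuration is captured synchronously, while the actual clients are built later inside the async `startup()`, where a running event loop is guaranteed.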

taskiq_aio_kafka/models.py

Lines changed: 76 additions & 0 deletions
```diff
@@ -0,0 +1,76 @@
+from typing import Any, Callable, Optional, Union
+
+from aiokafka import __version__
+from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
+from kafka.partitioner.default import DefaultPartitioner
+from pydantic import BaseModel
+
+
+class KafkaProducerParameters(BaseModel):
+    """Parameters to kafka producer."""
+
+    client_id: Optional[str] = None
+    metadata_max_age_ms: int = 300000
+    request_timeout_ms: int = 40000
+    api_version: str = "auto"
+    acks: Optional[Union[str, int]] = 1
+    key_serializer: Optional[Callable[..., bytes]] = None
+    value_serializer: Optional[Callable[..., bytes]] = None
+    compression_type: Optional[str] = None
+    max_batch_size: int = 16384
+    partitioner: Callable[..., Any] = DefaultPartitioner()
+    max_request_size: int = 1048576
+    linger_ms: int = 0
+    send_backoff_ms: int = 100
+    retry_backoff_ms: int = 100
+    security_protocol: str = "PLAINTEXT"
+    ssl_context: Optional[Any] = None
+    connections_max_idle_ms: int = 540000
+    enable_idempotence: bool = False
+    transactional_id: Optional[Any] = None
+    transaction_timeout_ms: int = 60000
+    sasl_mechanism: str = "PLAIN"
+    sasl_plain_password: Optional[str] = None
+    sasl_plain_username: Optional[str] = None
+    sasl_kerberos_service_name: Any = "kafka"
+    sasl_kerberos_domain_name: Any = None
+    sasl_oauth_token_provider: Any = None
+
+
+class KafkaConsumerParameters(BaseModel):
+    """Parameters to kafka consumer."""
+
+    client_id: str = "aiokafka-" + __version__  # noqa: WPS336
+    group_id: Optional[str] = None
+    key_deserializer: Optional[Callable[..., Any]] = None
+    value_deserializer: Optional[Callable[..., Any]] = None
+    fetch_max_wait_ms: int = 500
+    fetch_max_bytes: int = 52428800
+    fetch_min_bytes: int = 1
+    max_partition_fetch_bytes: int = 1 * 1024 * 1024  # noqa: WPS345
+    request_timeout_ms: int = 40 * 1000  # noqa: WPS432
+    retry_backoff_ms: int = 100
+    auto_offset_reset: str = "latest"
+    enable_auto_commit: bool = True
+    auto_commit_interval_ms: int = 5000
+    check_crcs: bool = True
+    metadata_max_age_ms: int = 5 * 60 * 1000
+    partition_assignment_strategy: Any = (RoundRobinPartitionAssignor,)
+    max_poll_interval_ms: int = 300000
+    rebalance_timeout_ms: Optional[int] = None
+    session_timeout_ms: int = 10000
+    heartbeat_interval_ms: int = 3000
+    consumer_timeout_ms: int = 200
+    max_poll_records: Optional[int] = None
+    ssl_context: Optional[Any] = None
+    security_protocol: str = "PLAINTEXT"
+    api_version: str = "auto"
+    exclude_internal_topics: bool = True
+    connections_max_idle_ms: int = 540000
+    isolation_level: str = "read_uncommitted"
+    sasl_mechanism: str = "PLAIN"
+    sasl_plain_password: Optional[str] = None
+    sasl_plain_username: Optional[str] = None
+    sasl_kerberos_service_name: Optional[str] = "kafka"
+    sasl_kerberos_domain_name: Optional[str] = None
+    sasl_oauth_token_provider: Optional[str] = None
```
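
To make the role of these models concrete, a small sketch (field values are illustrative): each model pins aiokafka's defaults as pydantic fields, validates any overrides, and `.dict()` turns the result back into constructor kwargs, exactly as `startup()` does in the broker diff above.

```python
from taskiq_aio_kafka.models import KafkaConsumerParameters

# Override only what you need; the remaining fields keep the
# aiokafka defaults baked into the model.
params = KafkaConsumerParameters(group_id="taskiq", enable_auto_commit=False)

kwargs = params.dict()  # pydantic v1 serialization, as used in startup()
assert kwargs["enable_auto_commit"] is False
assert kwargs["auto_offset_reset"] == "latest"  # untouched default
# startup() then splats these kwargs: AIOKafkaConsumer(topic, **kwargs).
```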

tests/conftest.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -115,6 +115,7 @@ async def broker(
     kafka_url: str,
     test_kafka_producer: AIOKafkaProducer,
     test_kafka_consumer: AIOKafkaConsumer,
+    base_topic: NewTopic,
 ) -> AsyncGenerator[AioKafkaBroker, None]:
     """Yield new broker instance.

@@ -125,6 +126,7 @@ async def broker(
     :param kafka_url: url to kafka.
     :param test_kafka_producer: custom AIOKafkaProducer.
     :param test_kafka_consumer: custom AIOKafkaConsumer.
+    :param base_topic: base topic.

     :yields: broker.
     """
@@ -133,6 +135,7 @@ async def broker(
         aiokafka_producer=test_kafka_producer,
         aiokafka_consumer=test_kafka_consumer,
         delete_topic_on_shutdown=True,
+        kafka_topic=base_topic,
     )
     broker.is_worker_process = True
```
tests/test_broker.py

Lines changed: 2 additions & 3 deletions
```diff
@@ -80,7 +80,6 @@ async def test_startup(
 async def test_listen(
     broker: AioKafkaBroker,
     test_kafka_producer: AIOKafkaProducer,
-    base_topic_name: str,
 ) -> None:
     """Test that message are read correctly.

@@ -89,8 +88,8 @@ async def test_listen(
     :param broker: current broker.
     :param test_kafka_producer: AIOKafkaProducer.
-    :param base_topic_name: topic name.
     """
+    await test_kafka_producer.start()
    task_id: str = uuid4().hex
     task_name: str = uuid4().hex
     message: bytes = pickle.dumps(uuid4().hex)

@@ -104,7 +103,7 @@ async def test_listen(
     )

     await test_kafka_producer.send(
-        topic=base_topic_name,
+        topic=broker._kafka_topic.name,
         value=message_to_send.message,
     )
```
