Skip to content

Commit 7e3b4ca

Browse files
committed
Change broker and tests
1 parent a8fbf68 commit 7e3b4ca

File tree

5 files changed

+72
-163
lines changed

5 files changed

+72
-163
lines changed

poetry.lock

Lines changed: 5 additions & 57 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

taskiq_aio_kafka/broker.py

Lines changed: 56 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -68,18 +68,6 @@ def __init__( # noqa: WPS211
6868
"""
6969
super().__init__(result_backend, task_id_generator)
7070

71-
self._bootstrap_servers: Optional[Union[str, List[str]]] = bootstrap_servers
72-
self._kafka_topic: Optional[NewTopic] = kafka_topic
73-
self._aiokafka_producer: Optional[AIOKafkaProducer] = aiokafka_producer
74-
self._aiokafka_consumer: Optional[AIOKafkaConsumer] = aiokafka_consumer
75-
self._kafka_admin_client: Optional[KafkaAdminClient] = kafka_admin_client
76-
self._loop: Optional[asyncio.AbstractEventLoop] = loop
77-
78-
self._default_kafka_topic: str = "taskiq_topic"
79-
self._delete_topic_on_shutdown: bool = delete_topic_on_shutdown
80-
81-
self._delay_kick_tasks: Set[asyncio.Task[None]] = set()
82-
8371
if (aiokafka_producer or aiokafka_consumer) and not bootstrap_servers:
8472
raise WrongAioKafkaBrokerParametersError(
8573
(
@@ -88,52 +76,67 @@ def __init__( # noqa: WPS211
8876
),
8977
)
9078

91-
async def startup(self) -> None: # noqa: C901
92-
"""Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
93-
94-
We will have 2 topics for default and high priority.
79+
self._bootstrap_servers: Optional[Union[str, List[str]]] = bootstrap_servers
9580

96-
Also we need to create AIOKafkaProducer and AIOKafkaConsumer
97-
if there are no producer and consumer passed.
98-
"""
99-
await super().startup()
81+
self._kafka_topic: NewTopic = kafka_topic or NewTopic(
82+
name="taskiq_topic",
83+
num_partitions=1,
84+
replication_factor=1,
85+
)
10086

101-
if not self._aiokafka_producer:
102-
self._aiokafka_producer = AIOKafkaProducer(
87+
self._aiokafka_producer: AIOKafkaProducer = (
88+
aiokafka_producer
89+
or AIOKafkaProducer(
10390
bootstrap_servers=self._bootstrap_servers,
10491
loop=self._loop,
10592
)
93+
)
10694

107-
if not self._kafka_topic:
108-
self._kafka_topic: NewTopic = NewTopic( # type: ignore
109-
name=self._default_kafka_topic,
110-
num_partitions=1,
111-
replication_factor=1,
95+
self._aiokafka_consumer: AIOKafkaConsumer = (
96+
aiokafka_consumer
97+
or AIOKafkaConsumer(
98+
self._kafka_topic.name,
99+
bootstrap_servers=self._bootstrap_servers,
100+
loop=self._loop,
112101
)
102+
)
113103

114-
if not self._kafka_admin_client:
115-
self._kafka_admin_client: KafkaAdminClient = ( # type: ignore
116-
KafkaAdminClient(
117-
bootstrap_servers=self._bootstrap_servers,
118-
client_id="kafka-python-taskiq",
119-
)
104+
self._kafka_admin_client: KafkaAdminClient = (
105+
kafka_admin_client
106+
or KafkaAdminClient(
107+
bootstrap_servers=self._bootstrap_servers,
108+
client_id="kafka-python-taskiq",
120109
)
110+
)
111+
self._loop: Optional[asyncio.AbstractEventLoop] = loop
112+
113+
self._delete_topic_on_shutdown: bool = delete_topic_on_shutdown
114+
115+
self._delay_kick_tasks: Set[asyncio.Task[None]] = set()
116+
117+
self._is_started = False
118+
119+
async def startup(self) -> None: # noqa: C901
120+
"""Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
121+
122+
We will have 2 topics for default and high priority.
123+
124+
Also we need to create AIOKafkaProducer and AIOKafkaConsumer
125+
if there are no producer and consumer passed.
126+
"""
127+
await super().startup()
121128

122129
if self._kafka_topic.name not in self._kafka_admin_client.list_topics():
123130
self._kafka_admin_client.create_topics(
124131
new_topics=[self._kafka_topic],
125132
validate_only=False,
126133
)
127134

128-
if not self._aiokafka_consumer:
129-
self._aiokafka_consumer = AIOKafkaConsumer(
130-
self._kafka_topic.name,
131-
bootstrap_servers=self._bootstrap_servers,
132-
loop=self._loop,
133-
)
134-
135135
await self._aiokafka_producer.start()
136-
await self._aiokafka_consumer.start()
136+
if self.is_worker_process:
137+
await self._aiokafka_consumer.start()
138+
139+
self._is_started = True
137140

138141
async def shutdown(self) -> None:
139142
"""Close all connections on shutdown."""
@@ -171,34 +174,20 @@ async def kick(self, message: BrokerMessage) -> None:
171174
:raises ValueError: if startup wasn't called.
172175
:param message: message to send.
173176
"""
174-
if not self._aiokafka_producer:
175-
raise ValueError("Specify aiokafka_producer or run startup before kicking.")
177+
if not self._is_started:
178+
raise ValueError("Please run startup before kicking.")
176179

177180
kafka_message: bytes = pickle.dumps(message)
178-
topic_name: str = self._kafka_topic.name # type: ignore
181+
topic_name: str = self._kafka_topic.name
179182

180-
delay = parse_val(int, message.labels.get("delay"))
181-
if delay is None:
182-
await self._aiokafka_producer.send( # type: ignore
183-
topic=topic_name,
184-
value=kafka_message,
185-
)
186-
else:
187-
delay_kick_task = asyncio.create_task(
188-
self._delay_kick(
189-
kafka_message=kafka_message,
190-
topic_name=topic_name,
191-
delay=delay,
192-
),
193-
)
194-
self._delay_kick_tasks.add(delay_kick_task)
195-
delay_kick_task.add_done_callback(
196-
self._delay_kick_tasks.discard,
197-
)
183+
await self._aiokafka_producer.send(
184+
topic=topic_name,
185+
value=kafka_message,
186+
)
198187

199188
async def listen(
200189
self,
201-
) -> AsyncGenerator[BrokerMessage, None]:
190+
) -> AsyncGenerator[bytes, None]:
202191
"""Listen to topic.
203192
204193
This function starts listen to topic and
@@ -207,36 +196,8 @@ async def listen(
207196
:yields: parsed broker message.
208197
:raises ValueError: if no aiokafka_consumer or startup wasn't called.
209198
"""
210-
if not self._aiokafka_consumer:
211-
raise ValueError("Specify aiokafka_consumer or run startup before kicking.")
212-
async for raw_kafka_message in self._aiokafka_consumer:
213-
try:
214-
broker_message: BrokerMessage = pickle.loads( # noqa: S301
215-
raw_kafka_message.value,
216-
)
217-
except (TypeError, ValueError) as exc:
218-
logger.warning(
219-
"Cannot parse message from Kafka %s",
220-
exc,
221-
exc_info=True,
222-
)
223-
continue
224-
yield broker_message
199+
if not self._is_started:
200+
raise ValueError("Please run startup before listening.")
225201

226-
async def _delay_kick(
227-
self,
228-
kafka_message: bytes,
229-
topic_name: str,
230-
delay: float,
231-
) -> None:
232-
"""Send message to the topic after delay time.
233-
234-
:param kafka_message: message to send.
235-
:param topic_name: name of the topic.
236-
:param delay: delay before kicking in seconds.
237-
"""
238-
await asyncio.sleep(delay=delay)
239-
await self._aiokafka_producer.send( # type: ignore
240-
topic=topic_name,
241-
value=kafka_message,
242-
)
202+
async for raw_kafka_message in self._aiokafka_consumer:
203+
yield raw_kafka_message.value

taskiq_aio_kafka/exceptions.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
class BaseTaskiqKafkaError(Exception):
2-
"""Base class for all TaskiqKafka errors."""
1+
from taskiq.exceptions import BrokerError
32

43

5-
class WrongAioKafkaBrokerParametersError(BaseTaskiqKafkaError):
4+
class WrongAioKafkaBrokerParametersError(BrokerError):
65
"""Error if Producer or Consumer is specified but no bootstrap_servers."""

tests/conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ async def broker_without_arguments(
103103
broker = AioKafkaBroker(
104104
bootstrap_servers=kafka_url,
105105
)
106-
broker._default_kafka_topic = base_topic_name # noqa: WPS437
107106
broker.is_worker_process = True
108107

109108
await broker.startup()

tests/test_broker.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from taskiq_aio_kafka.broker import AioKafkaBroker
1111

1212

13-
async def get_first_task(broker: AioKafkaBroker) -> BrokerMessage: # type: ignore
13+
async def get_first_task(broker: AioKafkaBroker) -> bytes: # type: ignore
1414
"""Get first message from the topic.
1515
1616
:param broker: async message broker.
@@ -36,7 +36,7 @@ async def test_kick_success(broker: AioKafkaBroker) -> None:
3636
message_to_send = BrokerMessage(
3737
task_id=task_id,
3838
task_name=task_name,
39-
message="my_msg",
39+
message=pickle.dumps("my_msg"),
4040
labels={
4141
"label1": "val1",
4242
},
@@ -91,7 +91,7 @@ async def test_listen(
9191
"""
9292
task_id: str = uuid4().hex
9393
task_name: str = uuid4().hex
94-
message: str = uuid4().hex
94+
message: bytes = pickle.dumps(uuid4().hex)
9595
labels: Dict[str, str] = {"test_label": "123"}
9696

9797
await test_kafka_producer.send(
@@ -106,9 +106,11 @@ async def test_listen(
106106
),
107107
)
108108

109-
broker_message: BrokerMessage = await asyncio.wait_for(
110-
get_first_task(broker),
111-
timeout=1,
109+
broker_message: BrokerMessage = pickle.loads( # noqa: S301
110+
await asyncio.wait_for(
111+
get_first_task(broker),
112+
timeout=1,
113+
),
112114
)
113115

114116
assert broker_message.message == message
@@ -132,7 +134,7 @@ async def test_delayed_message(
132134
broker_msg = BrokerMessage(
133135
task_id="1",
134136
task_name="name",
135-
message="message",
137+
message=pickle.dumps("message"),
136138
labels={"delay": "3"},
137139
)
138140

0 commit comments

Comments
 (0)