Skip to content

Commit 1563754

Browse files
author
Sergio García Prado
committed
ISSUE #?
* Raise a `StopAsyncIteration` when consumer is stopped.
1 parent 7e2d88b commit 1563754

File tree

2 files changed

+4
-13
lines changed

2 files changed

+4
-13
lines changed

packages/plugins/minos-broker-kafka/minos/plugins/kafka/subscriber.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,10 +166,9 @@ def admin_client(self):
166166
async def _receive(self) -> BrokerMessage:
167167
try:
168168
record = await self.client.getone()
169-
except ConsumerStoppedError as exc:
170-
if self.already_destroyed:
171-
raise StopAsyncIteration
172-
raise exc
169+
except ConsumerStoppedError:
170+
raise StopAsyncIteration
171+
173172
bytes_ = record.value
174173
message = BrokerMessage.from_avro_bytes(bytes_)
175174
return message

packages/plugins/minos-broker-kafka/tests/test_kafka/test_subscriber.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,20 +201,12 @@ async def test_receive(self):
201201
self.assertEqual(messages[0], await subscriber.receive())
202202
self.assertEqual(messages[1], await subscriber.receive())
203203

204-
async def test_receive_already_stopped_raises(self):
205-
subscriber = KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"})
206-
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
207-
subscriber.client.getone = get_mock
208-
209-
with self.assertRaises(StopAsyncIteration):
210-
await subscriber.receive()
211-
212204
async def test_receive_stopped(self):
213205
async with KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"}) as subscriber:
214206
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
215207
subscriber.client.getone = get_mock
216208

217-
with self.assertRaises(ConsumerStoppedError):
209+
with self.assertRaises(StopAsyncIteration):
218210
await subscriber.receive()
219211

220212

0 commit comments

Comments
 (0)