Skip to content

Commit d685943

Browse files
author
Sergio García Prado
committed
ISSUE #336
* Minor improvement
1 parent 6cb4ed7 commit d685943

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

packages/plugins/minos-broker-kafka/minos/plugins/kafka/subscriber.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,10 @@ def admin_client(self):
169169
async def _receive(self) -> BrokerMessage:
170170
try:
171171
record = await self.client.getone()
172-
except ConsumerStoppedError:
173-
raise StopAsyncIteration
172+
except ConsumerStoppedError as exc:
173+
if self.already_destroyed:
174+
raise StopAsyncIteration
175+
raise exc
174176
bytes_ = record.value
175177
message = BrokerMessage.from_avro_bytes(bytes_)
176178
return message

packages/plugins/minos-broker-kafka/tests/test_kafka/test_subscriber.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,12 +207,20 @@ async def test_receive(self):
207207
self.assertEqual(messages[0], await subscriber.receive())
208208
self.assertEqual(messages[1], await subscriber.receive())
209209

210+
async def test_receive_already_stopped_raises(self):
211+
subscriber = KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"})
212+
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
213+
subscriber.client.getone = get_mock
214+
215+
with self.assertRaises(StopAsyncIteration):
216+
await subscriber.receive()
217+
210218
async def test_receive_stopped(self):
211219
async with KafkaBrokerSubscriber.from_config(CONFIG_FILE_PATH, topics={"foo", "bar"}) as subscriber:
212220
get_mock = AsyncMock(side_effect=ConsumerStoppedError)
213221
subscriber.client.getone = get_mock
214222

215-
with self.assertRaises(StopAsyncIteration):
223+
with self.assertRaises(ConsumerStoppedError):
216224
await subscriber.receive()
217225

218226

0 commit comments

Comments
 (0)