File tree Expand file tree Collapse file tree 2 files changed +14
-1
lines changed
packages/plugins/minos-broker-kafka Expand file tree Collapse file tree 2 files changed +14
-1
lines changed Original file line number Diff line number Diff line change 2222
2323from aiokafka import (
2424 AIOKafkaConsumer ,
25+ ConsumerStoppedError ,
2526)
2627from cached_property import (
2728 cached_property ,
@@ -166,7 +167,10 @@ def admin_client(self):
166167 return KafkaAdminClient (bootstrap_servers = f"{ self .host } :{ self .port } " )
167168
168169 async def _receive (self ) -> BrokerMessage :
169- record = await self .client .getone ()
170+ try :
171+ record = await self .client .getone ()
172+ except ConsumerStoppedError :
173+ raise StopAsyncIteration
170174 bytes_ = record .value
171175 message = BrokerMessage .from_avro_bytes (bytes_ )
172176 return message
Original file line number Diff line number Diff line change 1010
1111from aiokafka import (
1212 AIOKafkaConsumer ,
13+ ConsumerStoppedError ,
1314)
1415from kafka import (
1516 KafkaAdminClient ,
@@ -206,6 +207,14 @@ async def test_receive(self):
206207 self .assertEqual (messages [0 ], await subscriber .receive ())
207208 self .assertEqual (messages [1 ], await subscriber .receive ())
208209
210+ async def test_receive_stopped (self ):
211+ async with KafkaBrokerSubscriber .from_config (CONFIG_FILE_PATH , topics = {"foo" , "bar" }) as subscriber :
212+ get_mock = AsyncMock (side_effect = ConsumerStoppedError )
213+ subscriber .client .getone = get_mock
214+
215+ with self .assertRaises (StopAsyncIteration ):
216+ await subscriber .receive ()
217+
209218
210219class TestKafkaBrokerSubscriberBuilder (unittest .TestCase ):
211220 def setUp (self ) -> None :
You can’t perform that action at this time.
0 commit comments