|
2 | 2 | import gc |
3 | 3 | import json |
4 | 4 | import time |
| 5 | +import warnings |
5 | 6 | from contextlib import contextmanager |
6 | 7 | from unittest import mock |
7 | 8 |
|
@@ -188,6 +189,36 @@ async def test_consumer_context_manager(self): |
188 | 189 | raise ValueError |
189 | 190 | assert consumer._closed |
190 | 191 |
|
| 192 | + @run_until_complete |
| 193 | + async def test_consumer_context_manager_start_error(self): |
| 194 | + for target, group_id in [ |
| 195 | + ("aiokafka.consumer.consumer.AIOKafkaClient.bootstrap", None), |
| 196 | + ("aiokafka.consumer.consumer.Fetcher.__init__", None), |
| 197 | + ("aiokafka.consumer.consumer.NoGroupCoordinator.__init__", None), |
| 198 | + ( |
| 199 | + "aiokafka.consumer.consumer.GroupCoordinator.__init__", |
| 200 | + f"group-{self.id()}", |
| 201 | + ), |
| 202 | + ]: |
| 203 | + consumer = AIOKafkaConsumer( |
| 204 | + self.topic, |
| 205 | + group_id=group_id, |
| 206 | + bootstrap_servers=self.hosts, |
| 207 | + enable_auto_commit=False, |
| 208 | + auto_offset_reset="earliest", |
| 209 | + ) |
| 210 | + |
| 211 | + # make consumer.start() raise |
| 212 | + with mock.patch(target, side_effect=RuntimeError("error")): |
| 213 | + with self.assertRaises(RuntimeError): |
| 214 | + async with consumer: |
| 215 | + self.fail(f"{target} did not raise") |
| 216 | + |
| 217 | + # should not require calling consumer.close() |
| 218 | + with warnings.catch_warnings(record=True) as w: |
| 219 | + del consumer |
| 220 | + self.assertEqual(len(w), 0, f"{target} got unexpected warnings: {w}") |
| 221 | + |
191 | 222 | @run_until_complete |
192 | 223 | async def test_consumer_api_version(self): |
193 | 224 | await self.send_messages(0, list(range(10))) |
|
0 commit comments