|
4 | 4 | import sys |
5 | 5 | import traceback |
6 | 6 | import warnings |
| 7 | +from typing import Dict, List |
7 | 8 |
|
8 | 9 | from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor |
9 | 10 |
|
|
14 | 15 | ConsumerStoppedError, IllegalOperation, UnsupportedVersionError, |
15 | 16 | IllegalStateError, NoOffsetForPartitionError, RecordTooLargeError |
16 | 17 | ) |
17 | | -from aiokafka.structs import TopicPartition |
| 18 | +from aiokafka.structs import TopicPartition, ConsumerRecord |
18 | 19 | from aiokafka.util import ( |
19 | 20 | commit_structure_validate, get_running_loop |
20 | 21 | ) |
@@ -1104,7 +1105,7 @@ def unsubscribe(self): |
1104 | 1105 | log.info( |
1105 | 1106 | "Unsubscribed all topics or patterns and assigned partitions") |
1106 | 1107 |
|
1107 | | - async def getone(self, *partitions): |
| 1108 | + async def getone(self, *partitions) -> ConsumerRecord: |
1108 | 1109 | """ |
1109 | 1110 | Get one message from Kafka. |
1110 | 1111 | If no new messages prefetched, this method will wait for it. |
@@ -1148,7 +1149,9 @@ async def getone(self, *partitions): |
1148 | 1149 | msg = await self._fetcher.next_record(partitions) |
1149 | 1150 | return msg |
1150 | 1151 |
|
1151 | | - async def getmany(self, *partitions, timeout_ms=0, max_records=None): |
| 1152 | + async def getmany( |
| 1153 | + self, *partitions, timeout_ms=0, max_records=None |
| 1154 | + ) -> Dict[TopicPartition, List[ConsumerRecord]]: |
1152 | 1155 | """Get messages from assigned topics / partitions. |
1153 | 1156 |
|
1154 | 1157 | Prefetched messages are returned in batches by topic-partition. |
@@ -1247,7 +1250,7 @@ def __aiter__(self): |
1247 | 1250 | raise ConsumerStoppedError() |
1248 | 1251 | return self |
1249 | 1252 |
|
1250 | | - async def __anext__(self): |
| 1253 | + async def __anext__(self) -> ConsumerRecord: |
1251 | 1254 | """Asyncio iterator interface for consumer |
1252 | 1255 |
|
1253 | 1256 | Note: |
|
0 commit comments