Skip to content

Commit fda2423

Browse files
committed
Consumer.seek_to_committed now returns mapping of committed offsets
1 parent 1c59f57 commit fda2423

File tree

1 file changed

+7
-0
lines changed

1 file changed

+7
-0
lines changed

aiokafka/consumer/consumer.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,6 +825,10 @@ async def seek_to_committed(self, *partitions):
825825
*partitions: Optionally provide specific TopicPartitions, otherwise
826826
default to all assigned partitions.
827827
828+
Returns:
829+
dict: ``{TopicPartition: offset}`` mapping
830+
of the currently committed offsets.
831+
828832
Raises:
829833
IllegalStateError: If any partition is not currently assigned
830834
IllegalOperation: If used with ``group_id == None``
@@ -848,11 +852,14 @@ async def seek_to_committed(self, *partitions):
848852
raise IllegalStateError(
849853
"Partitions {} are not assigned".format(not_assigned))
850854

855+
committed_offsets = {}
851856
for tp in partitions:
852857
offset = await self.committed(tp)
858+
committed_offsets[tp] = offset
853859
log.debug("Seeking to committed of partition %s %s", tp, offset)
854860
if offset and offset > 0:
855861
self._fetcher.seek_to(tp, offset)
862+
return committed_offsets
856863

857864
async def offsets_for_times(self, timestamps):
858865
"""

0 commit comments

Comments
 (0)