File tree Expand file tree Collapse file tree 1 file changed +7
-0
lines changed Expand file tree Collapse file tree 1 file changed +7
-0
lines changed Original file line number Diff line number Diff line change @@ -825,6 +825,10 @@ async def seek_to_committed(self, *partitions):
825825 *partitions: Optionally provide specific TopicPartitions, otherwise
826826 default to all assigned partitions.
827827
828+ Returns:
829+ dict: ``{TopicPartition: offset}`` mapping
830+ of the currently committed offsets.
831+
828832 Raises:
829833 IllegalStateError: If any partition is not currently assigned
830834 IllegalOperation: If used with ``group_id == None``
@@ -848,11 +852,14 @@ async def seek_to_committed(self, *partitions):
848852 raise IllegalStateError (
849853 "Partitions {} are not assigned" .format (not_assigned ))
850854
855+ committed_offsets = {}
851856 for tp in partitions :
852857 offset = await self .committed (tp )
858+ committed_offsets [tp ] = offset
853859 log .debug ("Seeking to committed of partition %s %s" , tp , offset )
854860 if offset and offset > 0 :
855861 self ._fetcher .seek_to (tp , offset )
862+ return committed_offsets
856863
857864 async def offsets_for_times (self , timestamps ):
858865 """
You can’t perform that action at this time.
0 commit comments