@@ -134,18 +134,27 @@ def test_reset_offsets_if_needed(fetcher, topic, mocker):
134134
135135
136136def test__reset_offsets_async (fetcher , mocker ):
137- tp = TopicPartition ("topic" , 0 )
137+ tp0 = TopicPartition ("topic" , 0 )
138+ tp1 = TopicPartition ("topic" , 1 )
138139 fetcher ._subscriptions .subscribe (topics = ["topic" ])
139- fetcher ._subscriptions .assign_from_subscribed ([tp ])
140- fetcher ._subscriptions .request_offset_reset (tp )
141- fetched_offsets = {tp : OffsetAndTimestamp (1001 , None , - 1 )}
140+ fetcher ._subscriptions .assign_from_subscribed ([tp0 , tp1 ])
141+ fetcher ._subscriptions .request_offset_reset (tp0 )
142+ fetcher ._subscriptions .request_offset_reset (tp1 )
143+ mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , side_effect = [0 , 1 ])
142144 mocker .patch .object (fetcher ._client , 'ready' , return_value = True )
143- mocker .patch .object (fetcher , '_send_list_offsets_request' ,
144- return_value = Future ().success ((fetched_offsets , set ())))
145- mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , return_value = 0 )
146- fetcher ._reset_offsets_async ({tp : OffsetResetStrategy .EARLIEST })
147- assert not fetcher ._subscriptions .assignment [tp ].awaiting_reset
148- assert fetcher ._subscriptions .assignment [tp ].position .offset == 1001
145+ future1 = Future ()
146+ future2 = Future ()
147+ mocker .patch .object (fetcher , '_send_list_offsets_request' , side_effect = [future1 , future2 ])
148+ fetcher ._reset_offsets_async ({
149+ tp0 : OffsetResetStrategy .EARLIEST ,
150+ tp1 : OffsetResetStrategy .EARLIEST ,
151+ })
152+ future1 .success (({tp0 : OffsetAndTimestamp (1001 , None , - 1 )}, set ())),
153+ future2 .success (({tp1 : OffsetAndTimestamp (1002 , None , - 1 )}, set ())),
154+ assert not fetcher ._subscriptions .assignment [tp0 ].awaiting_reset
155+ assert not fetcher ._subscriptions .assignment [tp1 ].awaiting_reset
156+ assert fetcher ._subscriptions .assignment [tp0 ].position .offset == 1001
157+ assert fetcher ._subscriptions .assignment [tp1 ].position .offset == 1002
149158
150159
151160def test__send_list_offsets_requests (fetcher , mocker ):
0 commit comments