@@ -79,8 +79,17 @@ def test_send_fetches(fetcher, topic, mocker):
7979 ])])
8080 ]
8181
82- mocker .patch .object (fetcher , '_create_fetch_requests' ,
83- return_value = dict (enumerate (fetch_requests )))
82+ def build_fetch_offsets (request ):
83+ fetch_offsets = {}
84+ for topic , partitions in request .topics :
85+ for partition_data in partitions :
86+ partition , offset = partition_data [:2 ]
87+ fetch_offsets [TopicPartition (topic , partition )] = offset
88+ return fetch_offsets
89+
90+ mocker .patch .object (
91+ fetcher , '_create_fetch_requests' ,
92+ return_value = (dict (enumerate (map (lambda r : (r , build_fetch_offsets (r )), fetch_requests )))))
8493
8594 mocker .patch .object (fetcher ._client , 'ready' , return_value = True )
8695 mocker .patch .object (fetcher ._client , 'send' )
@@ -100,8 +109,8 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version):
100109 fetcher ._client ._api_versions = BROKER_API_VERSIONS [api_version ]
101110 mocker .patch .object (fetcher ._client .cluster , "leader_for_partition" , return_value = 0 )
102111 by_node = fetcher ._create_fetch_requests ()
103- requests = by_node .values ()
104- assert set ([r .API_VERSION for r in requests ]) == set ([fetch_version ])
112+ requests_and_offsets = by_node .values ()
113+ assert set ([r .API_VERSION for ( r , _offsets ) in requests_and_offsets ]) == set ([fetch_version ])
105114
106115
107116def test_update_fetch_positions (fetcher , topic , mocker ):
@@ -345,19 +354,15 @@ def test_fetched_records(fetcher, topic, mocker):
345354 assert partial is False
346355
347356
348- @pytest .mark .parametrize (("fetch_request " , "fetch_response" , "num_partitions" ), [
357+ @pytest .mark .parametrize (("fetch_offsets " , "fetch_response" , "num_partitions" ), [
349358 (
350- FetchRequest [0 ](
351- - 1 , 100 , 100 ,
352- [('foo' , [(0 , 0 , 1000 ),])]),
359+ {TopicPartition ('foo' , 0 ): 0 },
353360 FetchResponse [0 ](
354361 [("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
355362 1 ,
356363 ),
357364 (
358- FetchRequest [1 ](
359- - 1 , 100 , 100 ,
360- [('foo' , [(0 , 0 , 1000 ), (1 , 0 , 1000 ),])]),
365+ {TopicPartition ('foo' , 0 ): 0 , TopicPartition ('foo' , 1 ): 0 },
361366 FetchResponse [1 ](
362367 0 ,
363368 [("foo" , [
@@ -367,41 +372,33 @@ def test_fetched_records(fetcher, topic, mocker):
367372 2 ,
368373 ),
369374 (
370- FetchRequest [2 ](
371- - 1 , 100 , 100 ,
372- [('foo' , [(0 , 0 , 1000 ),])]),
375+ {TopicPartition ('foo' , 0 ): 0 },
373376 FetchResponse [2 ](
374377 0 , [("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
375378 1 ,
376379 ),
377380 (
378- FetchRequest [3 ](
379- - 1 , 100 , 100 , 10000 ,
380- [('foo' , [(0 , 0 , 1000 ),])]),
381+ {TopicPartition ('foo' , 0 ): 0 },
381382 FetchResponse [3 ](
382383 0 , [("foo" , [(0 , 0 , 1000 , [(0 , b'xxx' ),])]),]),
383384 1 ,
384385 ),
385386 (
386- FetchRequest [4 ](
387- - 1 , 100 , 100 , 10000 , 0 ,
388- [('foo' , [(0 , 0 , 1000 ),])]),
387+ {TopicPartition ('foo' , 0 ): 0 },
389388 FetchResponse [4 ](
390389 0 , [("foo" , [(0 , 0 , 1000 , 0 , [], [(0 , b'xxx' ),])]),]),
391390 1 ,
392391 ),
393392 (
394393 # This may only be used in broker-broker api calls
395- FetchRequest [5 ](
396- - 1 , 100 , 100 , 10000 , 0 ,
397- [('foo' , [(0 , 0 , 1000 ),])]),
394+ {TopicPartition ('foo' , 0 ): 0 },
398395 FetchResponse [5 ](
399396 0 , [("foo" , [(0 , 0 , 1000 , 0 , 0 , [], [(0 , b'xxx' ),])]),]),
400397 1 ,
401398 ),
402399])
403- def test__handle_fetch_response (fetcher , fetch_request , fetch_response , num_partitions ):
404- fetcher ._handle_fetch_response (fetch_request , time .time (), fetch_response )
400+ def test__handle_fetch_response (fetcher , fetch_offsets , fetch_response , num_partitions ):
401+ fetcher ._handle_fetch_response (0 , fetch_offsets , time .time (), fetch_response )
405402 assert len (fetcher ._completed_fetches ) == num_partitions
406403
407404
0 commit comments