@@ -139,15 +139,14 @@ private static final class RespEntry {
139139 }
140140 }
141141
142- private static List < ShareFetchResponseData .ShareFetchableTopicResponse > respList (RespEntry ... entries ) {
143- HashMap <TopicIdPartition , ShareFetchResponseData .ShareFetchableTopicResponse > map = new HashMap <>();
142+ private static LinkedHashMap < TopicIdPartition , ShareFetchResponseData .PartitionData > buildResponseData (RespEntry ... entries ) {
143+ LinkedHashMap <TopicIdPartition , ShareFetchResponseData .PartitionData > topicIdPartitionToPartition = new LinkedHashMap <>();
144144 for (RespEntry entry : entries ) {
145- ShareFetchResponseData .ShareFetchableTopicResponse response = map .computeIfAbsent (entry .part , topicIdPartition ->
146- new ShareFetchResponseData .ShareFetchableTopicResponse ().setTopicId (topicIdPartition .topicId ()));
147- response .partitions ().add (new ShareFetchResponseData .PartitionData ()
148- .setPartitionIndex (entry .part .partition ()));
145+ ShareFetchResponseData .PartitionData partitionData = new ShareFetchResponseData .PartitionData ()
146+ .setPartitionIndex (entry .part .partition ());
147+ topicIdPartitionToPartition .put (entry .part , partitionData );
149148 }
150- return new ArrayList <>( map . values ()) ;
149+ return topicIdPartitionToPartition ;
151150 }
152151
153152 @ Test
@@ -170,13 +169,11 @@ public void testShareSession() {
170169 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
171170 assertEquals (memberId .toString (), requestData1 .memberId ());
172171
173- ShareFetchResponse resp = new ShareFetchResponse (
174- new ShareFetchResponseData ()
175- .setErrorCode (Errors .NONE .code ())
176- .setThrottleTimeMs (0 )
177- .setResponses (respList (
178- new RespEntry ("foo" , 0 , fooId ),
179- new RespEntry ("foo" , 1 , fooId ))));
172+ ShareFetchResponse resp = ShareFetchResponse .of (Errors .NONE ,
173+ 0 ,
174+ buildResponseData (new RespEntry ("foo" , 0 , fooId ), new RespEntry ("foo" , 1 , fooId )),
175+ List .of (),
176+ 0 );
180177 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
181178
182179 // Test a fetch request which adds one partition
@@ -194,18 +191,15 @@ public void testShareSession() {
194191 expectedToSend2 .add (new TopicIdPartition (barId , 0 , "bar" ));
195192 assertListEquals (expectedToSend2 , reqFetchList (requestData2 , topicNames ));
196193
197- ShareFetchResponse resp2 = new ShareFetchResponse (
198- new ShareFetchResponseData ()
199- .setErrorCode (Errors .NONE .code ())
200- .setThrottleTimeMs (0 )
201- .setResponses (respList (
202- new RespEntry ("foo" , 1 , fooId ))));
194+ ShareFetchResponse resp2 = ShareFetchResponse .of (Errors .NONE ,
195+ 0 ,
196+ buildResponseData (new RespEntry ("foo" , 1 , fooId )),
197+ List .of (),
198+ 0 );
203199 handler .handleResponse (resp2 , ApiKeys .SHARE_FETCH .latestVersion (true ));
204200
205201 // A top-level error code will reset the session epoch
206- ShareFetchResponse resp3 = new ShareFetchResponse (
207- new ShareFetchResponseData ()
208- .setErrorCode (Errors .INVALID_SHARE_SESSION_EPOCH .code ()));
202+ ShareFetchResponse resp3 = ShareFetchResponse .of (Errors .INVALID_SHARE_SESSION_EPOCH , 0 , new LinkedHashMap <>(), List .of (), 0 );
209203 handler .handleResponse (resp3 , ApiKeys .SHARE_FETCH .latestVersion (true ));
210204
211205 ShareFetchRequestData requestData4 = handler .newShareFetchBuilder (groupId , fetchConfig ).build ().data ();
@@ -251,14 +245,14 @@ public void testPartitionRemoval() {
251245 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
252246 assertEquals (memberId .toString (), requestData1 .memberId ());
253247
254- ShareFetchResponse resp = new ShareFetchResponse (
255- new ShareFetchResponseData ()
256- . setErrorCode ( Errors . NONE . code ())
257- . setThrottleTimeMs ( 0 )
258- . setResponses ( respList (
259- new RespEntry ("foo " , 0 , fooId ),
260- new RespEntry ( "foo" , 1 , fooId ),
261- new RespEntry ( "bar" , 0 , barId ))) );
248+ ShareFetchResponse resp = ShareFetchResponse . of ( Errors . NONE ,
249+ 0 ,
250+ buildResponseData (
251+ new RespEntry ( "foo" , 0 , fooId ),
252+ new RespEntry ( "foo" , 1 , fooId ),
253+ new RespEntry ("bar " , 0 , barId ) ),
254+ List . of ( ),
255+ 0 );
262256 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
263257
264258 // Test a fetch request which removes two partitions
@@ -275,9 +269,7 @@ public void testPartitionRemoval() {
275269 assertListEquals (expectedToForget2 , reqForgetList (requestData2 , topicNames ));
276270
277271 // A top-level error code will reset the session epoch
278- ShareFetchResponse resp2 = new ShareFetchResponse (
279- new ShareFetchResponseData ()
280- .setErrorCode (Errors .INVALID_SHARE_SESSION_EPOCH .code ()));
272+ ShareFetchResponse resp2 = ShareFetchResponse .of (Errors .INVALID_SHARE_SESSION_EPOCH , 0 , new LinkedHashMap <>(), List .of (), 0 );
281273 handler .handleResponse (resp2 , ApiKeys .SHARE_FETCH .latestVersion (true ));
282274
283275 handler .addPartitionToFetch (foo1 , null );
@@ -309,12 +301,11 @@ public void testTopicIdReplaced() {
309301 expectedToSend1 .add (new TopicIdPartition (topicId1 , 0 , "foo" ));
310302 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
311303
312- ShareFetchResponse resp = new ShareFetchResponse (
313- new ShareFetchResponseData ()
314- .setErrorCode (Errors .NONE .code ())
315- .setThrottleTimeMs (0 )
316- .setResponses (respList (
317- new RespEntry ("foo" , 0 , topicId1 ))));
304+ ShareFetchResponse resp = ShareFetchResponse .of (Errors .NONE ,
305+ 0 ,
306+ buildResponseData (new RespEntry ("foo" , 0 , topicId1 )),
307+ List .of (),
308+ 0 );
318309 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
319310
320311 // Try to add a new topic ID
@@ -354,12 +345,11 @@ public void testPartitionForgottenOnAcknowledgeOnly() {
354345 expectedToSend1 .add (new TopicIdPartition (topicId , 0 , "foo" ));
355346 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
356347
357- ShareFetchResponse resp = new ShareFetchResponse (
358- new ShareFetchResponseData ()
359- .setErrorCode (Errors .NONE .code ())
360- .setThrottleTimeMs (0 )
361- .setResponses (respList (
362- new RespEntry ("foo" , 0 , topicId ))));
348+ ShareFetchResponse resp = ShareFetchResponse .of (Errors .NONE ,
349+ 0 ,
350+ buildResponseData (new RespEntry ("foo" , 0 , topicId )),
351+ List .of (),
352+ 0 );
363353 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
364354
365355 // Remove the topic from the session by setting acknowledgements only - this is not asking to fetch records
@@ -390,12 +380,11 @@ public void testForgottenPartitions() {
390380 expectedToSend1 .add (new TopicIdPartition (topicId , 0 , "foo" ));
391381 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
392382
393- ShareFetchResponse resp = new ShareFetchResponse (
394- new ShareFetchResponseData ()
395- .setErrorCode (Errors .NONE .code ())
396- .setThrottleTimeMs (0 )
397- .setResponses (respList (
398- new RespEntry ("foo" , 0 , topicId ))));
383+ ShareFetchResponse resp = ShareFetchResponse .of (Errors .NONE ,
384+ 0 ,
385+ buildResponseData (new RespEntry ("foo" , 0 , topicId )),
386+ List .of (),
387+ 0 );
399388 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
400389
401390 // Remove the topic from the session
@@ -424,23 +413,18 @@ public void testAddNewIdAfterTopicRemovedFromSession() {
424413 expectedToSend1 .add (new TopicIdPartition (topicId , 0 , "foo" ));
425414 assertListEquals (expectedToSend1 , reqFetchList (requestData1 , topicNames ));
426415
427- ShareFetchResponse resp = new ShareFetchResponse (
428- new ShareFetchResponseData ()
429- .setErrorCode (Errors .NONE .code ())
430- .setThrottleTimeMs (0 )
431- .setResponses (respList (
432- new RespEntry ("foo" , 0 , topicId ))));
416+ ShareFetchResponse resp = ShareFetchResponse .of (Errors .NONE ,
417+ 0 ,
418+ buildResponseData (new RespEntry ("foo" , 0 , topicId )),
419+ List .of (),
420+ 0 );
433421 handler .handleResponse (resp , ApiKeys .SHARE_FETCH .latestVersion (true ));
434422
435423 // Remove the partition from the session
436424 ShareFetchRequestData requestData2 = handler .newShareFetchBuilder (groupId , fetchConfig ).build ().data ();
437425 assertTrue (handler .sessionPartitionMap ().isEmpty ());
438426 assertTrue (requestData2 .topics ().isEmpty ());
439- ShareFetchResponse resp2 = new ShareFetchResponse (
440- new ShareFetchResponseData ()
441- .setErrorCode (Errors .NONE .code ())
442- .setThrottleTimeMs (0 )
443- .setResponses (respList ()));
427+ ShareFetchResponse resp2 = ShareFetchResponse .of (Errors .NONE , 0 , new LinkedHashMap <>(), List .of (), 0 );
444428 handler .handleResponse (resp2 , ApiKeys .SHARE_FETCH .latestVersion (true ));
445429
446430 // After the topic is removed, add a recreated topic with a new ID
0 commit comments