1111import org .apache .kafka .common .message .ApiVersionsRequestData ;
1212import org .apache .kafka .common .message .ApiVersionsResponseData ;
1313import org .apache .kafka .common .protocol .ApiKeys ;
14+ import org .apache .kafka .common .protocol .Errors ;
1415import org .junit .jupiter .api .Test ;
1516import org .junit .jupiter .api .extension .ExtendWith ;
1617
2627import io .kroxylicious .test .tester .MockServerKroxyliciousTester ;
2728
2829import static io .kroxylicious .test .tester .KroxyliciousTesters .mockKafkaKroxyliciousTester ;
30+ import static org .assertj .core .api .Assertions .assertThat ;
2931import static org .junit .jupiter .api .Assertions .assertEquals ;
3032import static org .junit .jupiter .api .Assertions .assertTrue ;
3133
@@ -48,7 +50,8 @@ void shouldOfferTheMinimumHighestSupportedVersionWhenBrokerIsAheadOfKroxylicious
4850 var client = tester .simpleTestClient ()) {
4951 givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), (short ) (ApiKeys .METADATA .latestVersion () + 1 ));
5052 Response response = whenGetApiVersionsFromKroxylicious (client );
51- assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), ApiKeys .METADATA .latestVersion ());
53+ assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), ApiKeys .METADATA .latestVersion (),
54+ Errors .NONE .code ());
5255 }
5356 }
5457
@@ -61,18 +64,37 @@ void shouldOfferTheMinimumHighestSupportedVersionWhenBrokerIsAheadOfKroxylicious
6164 var client = tester .simpleTestClient ()) {
6265 givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), ApiKeys .METADATA .latestVersion (true ));
6366 Response response = whenGetApiVersionsFromKroxylicious (client );
64- assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), overriddenVersion );
67+ assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), overriddenVersion , Errors . NONE . code () );
6568 }
6669 }
6770
6871 @ Test
6972 void shouldOfferTheMinimumHighestSupportedVersionWhenKroxyliciousIsAheadOfBroker () {
7073 try (var tester = mockKafkaKroxyliciousTester (KroxyliciousConfigUtils ::proxy );
7174 var client = tester .simpleTestClient ()) {
72- givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), (short ) (ApiKeys .METADATA .latestVersion () - 1 ));
75+ short expectedApiVersion = (short ) (ApiKeys .METADATA .latestVersion () - 1 );
76+ givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), expectedApiVersion );
7377 Response response = whenGetApiVersionsFromKroxylicious (client );
7478 assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (),
75- (short ) (ApiKeys .METADATA .latestVersion () - 1 ));
79+ expectedApiVersion , Errors .NONE .code ());
80+ }
81+ }
82+
83+ /**
84+ * By the Kafka protocol, when the client ApiVersions request is ahead of the upstream, the server will respond
85+ * with a v0 response with error code 35 as per KIP-511. If the upstream responds this way, the proxy should
86+ * forward v0 response body bytes to the client.
87+ */
88+ @ Test
89+ void shouldHandleVersionZeroErrorResponseWhenKroxyliciousIsAheadOfBroker () {
90+ try (var tester = mockKafkaKroxyliciousTester (KroxyliciousConfigUtils ::proxy );
91+ var client = tester .simpleTestClient ()) {
92+ short brokerMaxVersion = (short ) (ApiKeys .API_VERSIONS .latestVersion () - 1 );
93+ givenMockRespondsWithDowngradedV0ApiVersionsResponse (tester , ApiKeys .API_VERSIONS , ApiKeys .API_VERSIONS .oldestVersion (), brokerMaxVersion );
94+ Response response = whenGetApiVersionsFromKroxylicious (client );
95+
96+ assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .API_VERSIONS , ApiKeys .API_VERSIONS .oldestVersion (),
97+ brokerMaxVersion , Errors .UNSUPPORTED_VERSION .code ());
7698 }
7799 }
78100
@@ -83,7 +105,7 @@ void shouldOfferTheMaximumLowestSupportedVersionWhenBrokerIsAheadOfKroxylicious(
83105 short brokerOldestVersion = (short ) (ApiKeys .METADATA .oldestVersion () + 1 );
84106 givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , brokerOldestVersion , ApiKeys .METADATA .latestVersion ());
85107 Response response = whenGetApiVersionsFromKroxylicious (client );
86- assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , brokerOldestVersion , ApiKeys .METADATA .latestVersion ());
108+ assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , brokerOldestVersion , ApiKeys .METADATA .latestVersion (), Errors . NONE . code () );
87109 }
88110 }
89111
@@ -94,7 +116,8 @@ void shouldOfferTheMaximumLowestSupportedVersionWhenKroxyliciousIsAheadOfBroker(
94116 short brokerOldestVersion = (short ) (ApiKeys .METADATA .oldestVersion () - 1 );
95117 givenMockRespondsWithApiVersionsForApiKey (tester , ApiKeys .METADATA , brokerOldestVersion , ApiKeys .METADATA .latestVersion ());
96118 Response response = whenGetApiVersionsFromKroxylicious (client );
97- assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), ApiKeys .METADATA .latestVersion ());
119+ assertKroxyliciousResponseOffersApiVersionsForApiKey (response , ApiKeys .METADATA , ApiKeys .METADATA .oldestVersion (), ApiKeys .METADATA .latestVersion (),
120+ Errors .NONE .code ());
98121 }
99122 }
100123
@@ -150,20 +173,32 @@ private static void givenMockRespondsWithApiVersionsForApiKey(MockServerKroxylic
150173 tester .addMockResponseForApiKey (new ResponsePayload (ApiKeys .API_VERSIONS , (short ) 3 , mockResponse ));
151174 }
152175
176+ private static void givenMockRespondsWithDowngradedV0ApiVersionsResponse (MockServerKroxyliciousTester tester , ApiKeys keys , short minVersion , short maxVersion ) {
177+ ApiVersionsResponseData mockResponse = new ApiVersionsResponseData ();
178+ ApiVersionsResponseData .ApiVersion version = new ApiVersionsResponseData .ApiVersion ();
179+ version .setApiKey (keys .id ).setMinVersion (minVersion ).setMaxVersion (maxVersion );
180+ mockResponse .apiKeys ().add (version );
181+ mockResponse .setErrorCode (Errors .UNSUPPORTED_VERSION .code ());
182+ tester .addMockResponseForApiKey (new ResponsePayload (ApiKeys .API_VERSIONS , (short ) 3 , mockResponse ));
183+ }
184+
153185 private static Response whenGetApiVersionsFromKroxylicious (KafkaClient client ) {
154186 return client .getSync (new Request (ApiKeys .API_VERSIONS , (short ) 3 , "client" , new ApiVersionsRequestData ()));
155187 }
156188
157- private static void assertKroxyliciousResponseOffersApiVersionsForApiKey (Response response , ApiKeys apiKeys , short minVersion , short maxVersion ) {
189+ private static void assertKroxyliciousResponseOffersApiVersionsForApiKey (Response response , ApiKeys apiKeys , short minVersion , short maxVersion , short expected ) {
158190 ResponsePayload payload = response .payload ();
159191 assertEquals (ApiKeys .API_VERSIONS , payload .apiKeys ());
160192 assertEquals ((short ) 3 , payload .apiVersion ());
161193 ApiVersionsResponseData message = (ApiVersionsResponseData ) payload .message ();
162- assertEquals (1 , message .apiKeys ().size ());
163- ApiVersionsResponseData .ApiVersion singletonVersion = message .apiKeys ().iterator ().next ();
164- assertEquals (apiKeys .id , singletonVersion .apiKey ());
165- assertEquals (minVersion , singletonVersion .minVersion ());
166- assertEquals (maxVersion , singletonVersion .maxVersion ());
194+ assertThat (message .errorCode ()).isEqualTo (expected );
195+ assertThat (message .apiKeys ())
196+ .singleElement ()
197+ .satisfies (apiVersion -> {
198+ assertThat (apiVersion .apiKey ()).isEqualTo (apiKeys .id );
199+ assertThat (apiVersion .minVersion ()).isEqualTo (minVersion );
200+ assertThat (apiVersion .maxVersion ()).isEqualTo (maxVersion );
201+ });
167202 }
168203
169204}
0 commit comments