From fc968045a07555b0ad7a4858c84aea8babf3b7ea Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 23 Jul 2025 14:03:25 +1200 Subject: [PATCH 1/3] Test ApiVersions v0 upstream response is forwarded * Makes the test KafkaClient less smart, it must be told the expected response apiVersion, rather than implementing a fallback if it cannot understand the response. This means that our test that expects to successfully decode a v0 ApiVersions response would fail if it received a message encoded with another apiVersion. * Fixed the MockServer constructor, it wasn't wiring up the response apiVersion correctly. * Fixed a problem where an error at decode time in the test client resulted in a timeout. If we've obtained a correlation, then we can complete the response future exceptionally. This makes the test fail faster if the message can't be decoded at the expected version. Signed-off-by: Robert Young --- .../java/io/kroxylicious/test/Request.java | 10 ++++-- .../test/client/CorrelationManager.java | 19 ++++++----- .../kroxylicious/test/client/KafkaClient.java | 2 +- .../test/codec/DecodedRequestFrame.java | 12 ++++++- .../test/codec/KafkaRequestDecoder.java | 2 +- .../test/codec/KafkaRequestEncoder.java | 2 +- .../test/codec/KafkaResponseDecoder.java | 33 +++++++++++-------- .../kroxylicious/test/codec/RequestFrame.java | 4 +++ .../kroxylicious/test/server/MockHandler.java | 10 +++--- .../kroxylicious/test/server/MockServer.java | 2 +- .../src/main/templates/BodyDecoder.ftl | 22 +------------ .../test/client/CorrelationManagerTest.java | 10 +++--- .../test/client/KafkaClientTest.java | 17 ++++++++-- .../io/kroxylicious/proxy/ApiVersionsIT.java | 11 ++++--- 14 files changed, 91 insertions(+), 65 deletions(-) diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/Request.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/Request.java index 2ee07b1835..d69cc21205 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/Request.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/Request.java @@ -12,6 +12,12 @@ public record Request(ApiKeys apiKeys, short apiVersion, String clientIdHeader, - ApiMessage message) { - + ApiMessage message, + short responseApiVersion) { + public Request(ApiKeys apiKeys, + short apiVersion, + String clientIdHeader, + ApiMessage message) { + this(apiKeys, apiVersion, clientIdHeader, message, apiVersion); + } } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/CorrelationManager.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/CorrelationManager.java index 556ae0fc35..996139ee31 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/CorrelationManager.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/CorrelationManager.java @@ -33,15 +33,16 @@ public CorrelationManager() { /** * Allocate and return a correlation id for an outgoing request to the broker. * - * @param apiKey The API key. - * @param apiVersion The API version. - * @param correlationId The request's correlation id. + * @param apiKey The API key. + * @param apiVersion The API version. + * @param correlationId The request's correlation id. * @param responseFuture The future to complete with the response + * @param responseApiVersion */ public void putBrokerRequest(short apiKey, short apiVersion, - int correlationId, CompletableFuture responseFuture) { - Correlation existing = this.brokerRequests.put(correlationId, new Correlation(apiKey, apiVersion, responseFuture)); + int correlationId, CompletableFuture responseFuture, short responseApiVersion) { + Correlation existing = this.brokerRequests.put(correlationId, new Correlation(apiKey, apiVersion, responseFuture, responseApiVersion)); if (existing != null) { LOGGER.error("Duplicate upstream correlation id {}", correlationId); } @@ -72,13 +73,15 @@ public void onChannelClose() { */ public record Correlation(short apiKey, short apiVersion, - CompletableFuture responseFuture) { + CompletableFuture responseFuture, + short responseApiVersion) { @Override public String toString() { return "Correlation(" + "apiKey=" + ApiKeys.forId(apiKey) + ", apiVersion=" + apiVersion + + ", responseApiVersion=" + responseApiVersion + ')'; } @@ -91,12 +94,12 @@ public boolean equals(Object o) { return false; } Correlation that = (Correlation) o; - return apiKey == that.apiKey && apiVersion == that.apiVersion; + return apiKey == that.apiKey && apiVersion == that.apiVersion && responseApiVersion == that.responseApiVersion; } @Override public int hashCode() { - return Objects.hash(apiKey, apiVersion); + return Objects.hash(apiKey, apiVersion, responseApiVersion); } } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/KafkaClient.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/KafkaClient.java index af9e742eec..305db5299c 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/KafkaClient.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/KafkaClient.java @@ -93,7 +93,7 @@ private static DecodedRequestFrame toApiRequest(Request request) { var header = new RequestHeaderData().setRequestApiKey(messageType.apiKey()).setRequestApiVersion(request.apiVersion()); header.setClientId(request.clientIdHeader()); header.setCorrelationId(correlationId.incrementAndGet()); - return new DecodedRequestFrame<>(header.requestApiVersion(), header.correlationId(), header, request.message()); + return new DecodedRequestFrame<>(header.requestApiVersion(), header.correlationId(), header, request.message(), request.responseApiVersion()); } // TODO return a Response class with jsonObject() and frame() methods diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/DecodedRequestFrame.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/DecodedRequestFrame.java index 4046513837..bf07c86156 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/DecodedRequestFrame.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/DecodedRequestFrame.java @@ -22,19 +22,24 @@ public class DecodedRequestFrame implements RequestFrame { private final CompletableFuture responseFuture = new CompletableFuture<>(); + private final short responseApiVersion; /** * Create a decoded request frame + * * @param apiVersion apiVersion * @param correlationId correlationId * @param header header * @param body body + * @param responseApiVersion */ public DecodedRequestFrame(short apiVersion, int correlationId, RequestHeaderData header, - B body) { + B body, + short responseApiVersion) { super(apiVersion, correlationId, header, body); + this.responseApiVersion = responseApiVersion; } @Override @@ -54,4 +59,9 @@ public CompletableFuture getResponseFuture() { public boolean hasResponse() { return !(body instanceof ProduceRequest pr && pr.acks() == 0); } + + @Override + public short responseApiVersion() { + return responseApiVersion; + } } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestDecoder.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestDecoder.java index 0b99767333..2c527b971b 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestDecoder.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestDecoder.java @@ -71,7 +71,7 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, final log().trace("{}: body {}", ctx, body); } - frame = new DecodedRequestFrame<>(apiVersion, correlationId, header, body); + frame = new DecodedRequestFrame<>(apiVersion, correlationId, header, body, apiVersion); if (log().isTraceEnabled()) { log().trace("{}: frame {}", ctx, frame); } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestEncoder.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestEncoder.java index 676d93bfd3..a21d1714de 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestEncoder.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestEncoder.java @@ -39,7 +39,7 @@ protected Logger log() { protected void encode(ChannelHandlerContext ctx, RequestFrame frame, ByteBuf out) throws Exception { super.encode(ctx, frame, out); if (frame.hasResponse()) { - correlationManager.putBrokerRequest(frame.apiKey().id, frame.apiVersion(), frame.correlationId(), frame.getResponseFuture()); + correlationManager.putBrokerRequest(frame.apiKey().id, frame.apiVersion(), frame.correlationId(), frame.getResponseFuture(), frame.responseApiVersion()); } } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java index e73a0c6288..73aab9b9a6 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java @@ -55,20 +55,25 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, int l else if (LOGGER.isDebugEnabled()) { LOGGER.debug("{}: Recovered correlation {} for upstream correlation id {}", ctx, correlation, correlationId); } - - final DecodedResponseFrame frame; - ApiKeys apiKey = ApiKeys.forId(correlation.apiKey()); - short apiVersion = correlation.apiVersion(); - var accessor = new ByteBufAccessorImpl(in); - short headerVersion = apiKey.responseHeaderVersion(apiVersion); - log().trace("{}: Header version: {}", ctx, headerVersion); - ResponseHeaderData header = readHeader(headerVersion, accessor); - log().trace("{}: Header: {}", ctx, header); - ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor); - log().trace("{}: Body: {}", ctx, body); - frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body); - correlation.responseFuture().complete(new SequencedResponse(frame, i++)); - return frame; + try { + final DecodedResponseFrame frame; + ApiKeys apiKey = ApiKeys.forId(correlation.apiKey()); + short apiVersion = correlation.responseApiVersion(); + var accessor = new ByteBufAccessorImpl(in); + short headerVersion = apiKey.responseHeaderVersion(apiVersion); + log().trace("{}: Header version: {}", ctx, headerVersion); + ResponseHeaderData header = readHeader(headerVersion, accessor); + log().trace("{}: Header: {}", ctx, header); + ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor); + log().trace("{}: Body: {}", ctx, body); + frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body); + correlation.responseFuture().complete(new SequencedResponse(frame, i++)); + return frame; + } + catch (Exception e) { + correlation.responseFuture().completeExceptionally(e); + throw e; + } } private ResponseHeaderData readHeader(short headerVersion, Readable accessor) { diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/RequestFrame.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/RequestFrame.java index 2981a133f1..de92f93b95 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/RequestFrame.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/RequestFrame.java @@ -36,4 +36,8 @@ default boolean hasResponse() { */ short apiVersion(); + default short responseApiVersion() { + return apiVersion(); + } + } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockHandler.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockHandler.java index 6806820a9b..653563e956 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockHandler.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockHandler.java @@ -25,6 +25,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.kroxylicious.test.Request; +import io.kroxylicious.test.ResponsePayload; import io.kroxylicious.test.codec.DecodedRequestFrame; /** @@ -51,11 +52,12 @@ private record ConditionalMockResponse(Matcher matcher, Action action, /** * Create mockhandler with initial message to serve - * @param message message to respond with, nullable + * @param payload payload to respond with, nullable */ - public MockHandler(ApiMessage message) { - if (message != null) { - setMockResponseForApiKey(ApiKeys.forId(message.apiKey()), message); + public MockHandler(ResponsePayload payload) { + if (payload != null && payload.message() != null) { + ApiMessage message = payload.message(); + setMockResponseForApiKey(ApiKeys.forId(message.apiKey()), message, payload.responseApiVersion()); } } diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java index 21cdcb6abf..ecd47737da 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java @@ -125,7 +125,7 @@ public int start(int port, ResponsePayload response, SslContext serverSslContext final EventGroupConfig eventGroupConfig = EventGroupConfig.create(); bossGroup = eventGroupConfig.newBossGroup(); workerGroup = eventGroupConfig.newWorkerGroup(); - serverHandler = new MockHandler(response == null ? null : response.message()); + serverHandler = new MockHandler(response); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(eventGroupConfig.serverChannelClass()) diff --git a/kroxylicious-integration-test-support/src/main/templates/BodyDecoder.ftl b/kroxylicious-integration-test-support/src/main/templates/BodyDecoder.ftl index 1efb85c112..d84c5bcc77 100644 --- a/kroxylicious-integration-test-support/src/main/templates/BodyDecoder.ftl +++ b/kroxylicious-integration-test-support/src/main/templates/BodyDecoder.ftl @@ -73,27 +73,7 @@ public class BodyDecoder { return switch (apiKey) { <#list messageSpecs as messageSpec> <#if messageSpec.type?lower_case == 'response'> - <#if messageSpec.name == 'ApiVersionsResponse'> - case ${retrieveApiKey(messageSpec)} -> { - // KIP-511 when the client receives an unsupported version for the ApiVersionResponse, it fails back to version 0 - // Use the same algorithm as https://github.com/apache/kafka/blob/a41c10fd49841381b5207c184a385622094ed440/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java#L90-L106 - int prev = accessor.readerIndex(); - try { - yield new ${messageSpec.name}Data(accessor, apiVersion); - } - catch (RuntimeException e) { - accessor.readerIndex(prev); - if (apiVersion != 0) { - yield new ${messageSpec.name}Data(accessor, (short) 0); - } - else { - throw e; - } - } - } - <#else> - case ${retrieveApiKey(messageSpec)} -> new ${messageSpec.name}Data(accessor, apiVersion); - + case ${retrieveApiKey(messageSpec)} -> new ${messageSpec.name}Data(accessor, apiVersion); default -> throw new IllegalArgumentException("Unsupported RPC " + apiKey); diff --git a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/CorrelationManagerTest.java b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/CorrelationManagerTest.java index 3072077ea9..7806e3a154 100644 --- a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/CorrelationManagerTest.java +++ b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/CorrelationManagerTest.java @@ -21,8 +21,8 @@ void testPendingFuturesCompletedExceptionallyOnChannelClose() { CorrelationManager correlationManager = new CorrelationManager(); CompletableFuture responseFuture = new CompletableFuture<>(); CompletableFuture responseFuture2 = new CompletableFuture<>(); - correlationManager.putBrokerRequest((short) 1, (short) 1, 1, responseFuture); - correlationManager.putBrokerRequest((short) 1, (short) 1, 2, responseFuture2); + correlationManager.putBrokerRequest((short) 1, (short) 1, 1, responseFuture, (short) 1); + correlationManager.putBrokerRequest((short) 1, (short) 1, 2, responseFuture2, (short) 1); // when correlationManager.onChannelClose(); @@ -37,8 +37,8 @@ void testNullFutureToleratedOnChannelClose() { // given CorrelationManager correlationManager = new CorrelationManager(); CompletableFuture responseFuture = new CompletableFuture<>(); - correlationManager.putBrokerRequest((short) 1, (short) 1, 1, responseFuture); - correlationManager.putBrokerRequest((short) 1, (short) 1, 2, null); + correlationManager.putBrokerRequest((short) 1, (short) 1, 1, responseFuture, (short) 1); + correlationManager.putBrokerRequest((short) 1, (short) 1, 2, null, (short) 1); // when correlationManager.onChannelClose(); @@ -53,7 +53,7 @@ void testCorrelationRetrievableOnceOnly() { int correlationId = 1; CorrelationManager correlationManager = new CorrelationManager(); CompletableFuture responseFuture = new CompletableFuture<>(); - correlationManager.putBrokerRequest((short) 1, (short) 1, correlationId, responseFuture); + correlationManager.putBrokerRequest((short) 1, (short) 1, correlationId, responseFuture, (short) 1); CorrelationManager.Correlation brokerCorrelation = correlationManager.getBrokerCorrelation(correlationId); assertThat(brokerCorrelation).isNotNull(); diff --git a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java index 88eb164ffa..333a55d0e0 100644 --- a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java +++ b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java @@ -88,13 +88,13 @@ void testClientCanSendOpaqueFrame() { // brokers can respond with a v0 response if they do not support the ApiVersions request version, see KIP-511 @Test - void testClientCanTolerateV0ApiVersionsResponseToHigherRequestVersion() { + void testClientCanHandleResponseApiVersionDifferentFromRequestApiVersion() { ApiVersionsResponseData message = new ApiVersionsResponseData(); message.setErrorCode(Errors.UNSUPPORTED_VERSION.code()); ResponsePayload v0Payload = new ResponsePayload(ApiKeys.API_VERSIONS, (short) 0, message); try (var mockServer = MockServer.startOnRandomPort(v0Payload); var kafkaClient = new KafkaClient("localhost", mockServer.port())) { - CompletableFuture future = kafkaClient.get(new Request(ApiKeys.API_VERSIONS, (short) 0, "client", new ApiVersionsRequestData())); + CompletableFuture future = kafkaClient.get(new Request(ApiKeys.API_VERSIONS, (short) 3, "client", new ApiVersionsRequestData(), (short) 0)); assertThat(future).succeedsWithin(10, TimeUnit.SECONDS).satisfies(response -> { assertThat(response.payload().message()).isInstanceOfSatisfying(ApiVersionsResponseData.class, apiVersionsRequestData -> { assertThat(apiVersionsRequestData.errorCode()).isEqualTo(Errors.UNSUPPORTED_VERSION.code()); @@ -103,6 +103,19 @@ void testClientCanTolerateV0ApiVersionsResponseToHigherRequestVersion() { } } + @Test + void unexpectedResponseFormatTriggersFailure() { + ApiVersionsResponseData message = new ApiVersionsResponseData(); + message.setErrorCode(Errors.UNSUPPORTED_VERSION.code()); + ResponsePayload v0Payload = new ResponsePayload(ApiKeys.API_VERSIONS, (short) 0, message); + try (var mockServer = MockServer.startOnRandomPort(v0Payload); + var kafkaClient = new KafkaClient("localhost", mockServer.port())) { + CompletableFuture future = kafkaClient.get(new Request(ApiKeys.API_VERSIONS, (short) 3, "client", new ApiVersionsRequestData())); + // fails to decode v0 response because client expects v3 + assertThat(future).failsWithin(10, TimeUnit.SECONDS).withThrowableThat().withMessageContaining("non-nullable field apiKeys was serialized as null"); + } + } + @Test void shouldWorkWithTls() throws Exception { shouldWorkWithTls(SslContextBuilder.forClient() diff --git a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsIT.java b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsIT.java index 6e9343d303..419958d182 100644 --- a/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsIT.java +++ b/kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsIT.java @@ -91,7 +91,7 @@ void shouldHandleVersionZeroErrorResponseWhenKroxyliciousIsAheadOfBroker() { var client = tester.simpleTestClient()) { short brokerMaxVersion = (short) (ApiKeys.API_VERSIONS.latestVersion() - 1); givenMockRespondsWithDowngradedV0ApiVersionsResponse(tester, ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.oldestVersion(), brokerMaxVersion); - Response response = whenGetApiVersionsFromKroxylicious(client); + Response response = whenGetApiVersionsFromKroxylicious(client, (short) 3, (short) 0); assertKroxyliciousResponseOffersApiVersionsForApiKey(response, ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.oldestVersion(), brokerMaxVersion, Errors.UNSUPPORTED_VERSION.code()); @@ -179,17 +179,20 @@ private static void givenMockRespondsWithDowngradedV0ApiVersionsResponse(MockSer version.setApiKey(keys.id).setMinVersion(minVersion).setMaxVersion(maxVersion); mockResponse.apiKeys().add(version); mockResponse.setErrorCode(Errors.UNSUPPORTED_VERSION.code()); - tester.addMockResponseForApiKey(new ResponsePayload(ApiKeys.API_VERSIONS, (short) 3, mockResponse)); + tester.addMockResponseForApiKey(new ResponsePayload(ApiKeys.API_VERSIONS, (short) 0, mockResponse)); } private static Response whenGetApiVersionsFromKroxylicious(KafkaClient client) { - return client.getSync(new Request(ApiKeys.API_VERSIONS, (short) 3, "client", new ApiVersionsRequestData())); + return whenGetApiVersionsFromKroxylicious(client, (short) 3, (short) 3); + } + + private static Response whenGetApiVersionsFromKroxylicious(KafkaClient client, short requestApiVersion, short responseApiVersion) { + return client.getSync(new Request(ApiKeys.API_VERSIONS, requestApiVersion, "client", new ApiVersionsRequestData(), responseApiVersion)); } private static void assertKroxyliciousResponseOffersApiVersionsForApiKey(Response response, ApiKeys apiKeys, short minVersion, short maxVersion, short expected) { ResponsePayload payload = response.payload(); assertEquals(ApiKeys.API_VERSIONS, payload.apiKeys()); - assertEquals((short) 3, payload.apiVersion()); ApiVersionsResponseData message = (ApiVersionsResponseData) payload.message(); assertThat(message.errorCode()).isEqualTo(expected); assertThat(message.apiKeys()) From ff365d5496273571347ecd0726ba15f8e8ff5a68 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 23 Jul 2025 14:34:30 +1200 Subject: [PATCH 2/3] Add coverage of undecodable ApiVersions responses Signed-off-by: Robert Young --- .../internal/codec/ResponseDecoderTest.java | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/ResponseDecoderTest.java b/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/ResponseDecoderTest.java index f5effecf3a..5359c8f563 100644 --- a/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/ResponseDecoderTest.java +++ b/kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/ResponseDecoderTest.java @@ -5,6 +5,9 @@ */ package io.kroxylicious.proxy.internal.codec; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -30,6 +33,7 @@ import static io.kroxylicious.proxy.internal.codec.ByteBufs.writeByteBuf; import static io.kroxylicious.proxy.model.VirtualClusterModel.DEFAULT_SOCKET_FRAME_MAX_SIZE_BYTES; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -182,4 +186,41 @@ void supportsFallbackToApiResponseV0() { .extracting("body") .isEqualTo(new ApiVersionsResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code())); } + + @Test + void throwsOnUndecodableV0ApiVersionsResponse() throws IOException { + int upstreamCorrelationId = mgr.putBrokerRequest(ApiKeys.API_VERSIONS.id, (short) 0, 52, true, null, null, true); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + dataOutputStream.writeInt(4); // framesize + dataOutputStream.writeInt(upstreamCorrelationId); + dataOutputStream.close(); + // given + ByteBuf buffer = Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()); + List objects = new ArrayList<>(); + + // when + assertThatThrownBy(() -> { + responseDecoder.decode(null, buffer, objects); + }).isInstanceOf(IndexOutOfBoundsException.class); + + } + + @Test + void throwsOnUndecodableNonV0ApiVersionsResponse() throws IOException { + int upstreamCorrelationId = mgr.putBrokerRequest(ApiKeys.API_VERSIONS.id, (short) 3, 52, true, null, null, true); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + dataOutputStream.writeInt(4); // framesize + dataOutputStream.writeInt(upstreamCorrelationId); + dataOutputStream.close(); + // given + ByteBuf buffer = Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()); + List objects = new ArrayList<>(); + + // then + assertThatThrownBy(() -> { + responseDecoder.decode(null, buffer, objects); + }).isInstanceOf(IndexOutOfBoundsException.class); + } } \ No newline at end of file From 9dda83474422e03f29207f3c0b5dea3fdc814974 Mon Sep 17 00:00:00 2001 From: Robert Young Date: Wed, 23 Jul 2025 15:49:16 +1200 Subject: [PATCH 3/3] Test KafkaClient ensures frame entirely read by header/body decode We want to use KafkaClient to declare what response apiVersion we expect the proxy to respond with. The kafka-client code we depend on to read the messages will sometimes be backwards compatible, for example if we try to read a v1 or v2 ApiVersions response message, then it will successfully read the v0 portion of the message, leaving some additional fields like throttleTimeMs unread. By KIP-511 the contract is we will send a v0 response, not a v0 compatible response, so I think it's good for the test client to require that all bytes are read at the response apiVersion we declare we expect. Signed-off-by: Robert Young --- .../test/codec/KafkaResponseDecoder.java | 3 +++ .../kroxylicious/test/client/KafkaClientTest.java | 15 +++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java index 73aab9b9a6..1abecf2b2b 100644 --- a/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java +++ b/kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java @@ -67,6 +67,9 @@ else if (LOGGER.isDebugEnabled()) { ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor); log().trace("{}: Body: {}", ctx, body); frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body); + if (in.readableBytes() != 0) { + throw new RuntimeException("Unread bytes remaining in frame, potentially response api version differs from expectation"); + } correlation.responseFuture().complete(new SequencedResponse(frame, i++)); return frame; } diff --git a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java index 333a55d0e0..28bba6f8e0 100644 --- a/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java +++ b/kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java @@ -103,6 +103,21 @@ void testClientCanHandleResponseApiVersionDifferentFromRequestApiVersion() { } } + @Test + void unreadBytesAfterFrameDecodeThrowsException() { + ApiVersionsResponseData message = new ApiVersionsResponseData(); + message.setErrorCode(Errors.UNSUPPORTED_VERSION.code()); + message.setThrottleTimeMs(22); + ResponsePayload v0Payload = new ResponsePayload(ApiKeys.API_VERSIONS, (short) 1, message); + try (var mockServer = MockServer.startOnRandomPort(v0Payload); + var kafkaClient = new KafkaClient("localhost", mockServer.port())) { + CompletableFuture future = kafkaClient.get(new Request(ApiKeys.API_VERSIONS, (short) 3, "client", new ApiVersionsRequestData(), (short) 0)); + // fails to decode v0 response because client encodes v1, which is backwards compatible but emits additional bytes. We want to be as sure as we can be that it was a v0 response. + assertThat(future).failsWithin(10, TimeUnit.SECONDS).withThrowableThat() + .withMessageContaining("Unread bytes remaining in frame, potentially response api version differs from expectation"); + } + } + @Test void unexpectedResponseFormatTriggersFailure() { ApiVersionsResponseData message = new ApiVersionsResponseData();