Skip to content

Commit 9dda834

Browse files
committed
Test KafkaClient ensures frame entirely read by header/body decode
We want to use KafkaClient to declare what response apiVersion we expect the proxy to respond with. The kafka-client code we depend on to read the messages will sometimes be backwards compatible, for example if we try to read a v1 or v2 ApiVersions response message, then it will successfully read the v0 portion of the message, leaving some additional fields like throttleTimeMs unread. By KIP-511 the contract is we will send a v0 response, not a v0 compatible response, so I think it's good for the test client to require that all bytes are read at the response apiVersion we declare we expect. Signed-off-by: Robert Young <[email protected]>
1 parent ff365d5 commit 9dda834

File tree

2 files changed

+18
-0
lines changed

2 files changed

+18
-0
lines changed

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ else if (LOGGER.isDebugEnabled()) {
6767
ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor);
6868
log().trace("{}: Body: {}", ctx, body);
6969
frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body);
70+
if (in.readableBytes() != 0) {
71+
throw new RuntimeException("Unread bytes remaining in frame, potentially response api version differs from expectation");
72+
}
7073
correlation.responseFuture().complete(new SequencedResponse(frame, i++));
7174
return frame;
7275
}

kroxylicious-integration-test-support/src/test/java/io/kroxylicious/test/client/KafkaClientTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ void testClientCanHandleResponseApiVersionDifferentFromRequestApiVersion() {
103103
}
104104
}
105105

106+
@Test
107+
void unreadBytesAfterFrameDecodeThrowsException() {
108+
ApiVersionsResponseData message = new ApiVersionsResponseData();
109+
message.setErrorCode(Errors.UNSUPPORTED_VERSION.code());
110+
message.setThrottleTimeMs(22);
111+
ResponsePayload v0Payload = new ResponsePayload(ApiKeys.API_VERSIONS, (short) 1, message);
112+
try (var mockServer = MockServer.startOnRandomPort(v0Payload);
113+
var kafkaClient = new KafkaClient("localhost", mockServer.port())) {
114+
CompletableFuture<Response> future = kafkaClient.get(new Request(ApiKeys.API_VERSIONS, (short) 3, "client", new ApiVersionsRequestData(), (short) 0));
115+
// fails to decode v0 response because client encodes v1, which is backwards compatible but emits additional bytes. We want to be as sure as we can be that it was a v0 response.
116+
assertThat(future).failsWithin(10, TimeUnit.SECONDS).withThrowableThat()
117+
.withMessageContaining("Unread bytes remaining in frame, potentially response api version differs from expectation");
118+
}
119+
}
120+
106121
@Test
107122
void unexpectedResponseFormatTriggersFailure() {
108123
ApiVersionsResponseData message = new ApiVersionsResponseData();

0 commit comments

Comments
 (0)