Skip to content

Commit 31232bb

Browse files
authored
Merge pull request #38 from robobario/consolidate-apiVersions-handling-it
Test v0 ApiVersions forwarding with IT
2 parents b373fe4 + 9dda834 commit 31232bb

File tree

15 files changed

+150
-65
lines changed

15 files changed

+150
-65
lines changed

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/Request.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
public record Request(ApiKeys apiKeys,
1313
short apiVersion,
1414
String clientIdHeader,
15-
ApiMessage message) {
16-
15+
ApiMessage message,
16+
short responseApiVersion) {
17+
public Request(ApiKeys apiKeys,
18+
short apiVersion,
19+
String clientIdHeader,
20+
ApiMessage message) {
21+
this(apiKeys, apiVersion, clientIdHeader, message, apiVersion);
22+
}
1723
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/CorrelationManager.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@ public CorrelationManager() {
3333
/**
3434
* Allocate and return a correlation id for an outgoing request to the broker.
3535
*
36-
* @param apiKey The API key.
37-
* @param apiVersion The API version.
38-
* @param correlationId The request's correlation id.
36+
* @param apiKey The API key.
37+
* @param apiVersion The API version.
38+
* @param correlationId The request's correlation id.
3939
* @param responseFuture The future to complete with the response
40+
* @param responseApiVersion
4041
*/
4142
public void putBrokerRequest(short apiKey,
4243
short apiVersion,
43-
int correlationId, CompletableFuture<SequencedResponse> responseFuture) {
44-
Correlation existing = this.brokerRequests.put(correlationId, new Correlation(apiKey, apiVersion, responseFuture));
44+
int correlationId, CompletableFuture<SequencedResponse> responseFuture, short responseApiVersion) {
45+
Correlation existing = this.brokerRequests.put(correlationId, new Correlation(apiKey, apiVersion, responseFuture, responseApiVersion));
4546
if (existing != null) {
4647
LOGGER.error("Duplicate upstream correlation id {}", correlationId);
4748
}
@@ -72,13 +73,15 @@ public void onChannelClose() {
7273
*/
7374
public record Correlation(short apiKey,
7475
short apiVersion,
75-
CompletableFuture<SequencedResponse> responseFuture) {
76+
CompletableFuture<SequencedResponse> responseFuture,
77+
short responseApiVersion) {
7678

7779
@Override
7880
public String toString() {
7981
return "Correlation(" +
8082
"apiKey=" + ApiKeys.forId(apiKey) +
8183
", apiVersion=" + apiVersion +
84+
", responseApiVersion=" + responseApiVersion +
8285
')';
8386
}
8487

@@ -91,12 +94,12 @@ public boolean equals(Object o) {
9194
return false;
9295
}
9396
Correlation that = (Correlation) o;
94-
return apiKey == that.apiKey && apiVersion == that.apiVersion;
97+
return apiKey == that.apiKey && apiVersion == that.apiVersion && responseApiVersion == that.responseApiVersion;
9598
}
9699

97100
@Override
98101
public int hashCode() {
99-
return Objects.hash(apiKey, apiVersion);
102+
return Objects.hash(apiKey, apiVersion, responseApiVersion);
100103
}
101104

102105
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/client/KafkaClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ private static DecodedRequestFrame<?> toApiRequest(Request request) {
9393
var header = new RequestHeaderData().setRequestApiKey(messageType.apiKey()).setRequestApiVersion(request.apiVersion());
9494
header.setClientId(request.clientIdHeader());
9595
header.setCorrelationId(correlationId.incrementAndGet());
96-
return new DecodedRequestFrame<>(header.requestApiVersion(), header.correlationId(), header, request.message());
96+
return new DecodedRequestFrame<>(header.requestApiVersion(), header.correlationId(), header, request.message(), request.responseApiVersion());
9797
}
9898

9999
// TODO return a Response class with jsonObject() and frame() methods

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/DecodedRequestFrame.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,24 @@ public class DecodedRequestFrame<B extends ApiMessage>
2222
implements RequestFrame {
2323

2424
private final CompletableFuture<SequencedResponse> responseFuture = new CompletableFuture<>();
25+
private final short responseApiVersion;
2526

2627
/**
2728
* Create a decoded request frame
29+
*
2830
* @param apiVersion apiVersion
2931
* @param correlationId correlationId
3032
* @param header header
3133
* @param body body
34+
* @param responseApiVersion
3235
*/
3336
public DecodedRequestFrame(short apiVersion,
3437
int correlationId,
3538
RequestHeaderData header,
36-
B body) {
39+
B body,
40+
short responseApiVersion) {
3741
super(apiVersion, correlationId, header, body);
42+
this.responseApiVersion = responseApiVersion;
3843
}
3944

4045
@Override
@@ -54,4 +59,9 @@ public CompletableFuture<SequencedResponse> getResponseFuture() {
5459
public boolean hasResponse() {
5560
return !(body instanceof ProduceRequest pr && pr.acks() == 0);
5661
}
62+
63+
@Override
64+
public short responseApiVersion() {
65+
return responseApiVersion;
66+
}
5767
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, final
7171
log().trace("{}: body {}", ctx, body);
7272
}
7373

74-
frame = new DecodedRequestFrame<>(apiVersion, correlationId, header, body);
74+
frame = new DecodedRequestFrame<>(apiVersion, correlationId, header, body, apiVersion);
7575
if (log().isTraceEnabled()) {
7676
log().trace("{}: frame {}", ctx, frame);
7777
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaRequestEncoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ protected Logger log() {
3939
protected void encode(ChannelHandlerContext ctx, RequestFrame frame, ByteBuf out) throws Exception {
4040
super.encode(ctx, frame, out);
4141
if (frame.hasResponse()) {
42-
correlationManager.putBrokerRequest(frame.apiKey().id, frame.apiVersion(), frame.correlationId(), frame.getResponseFuture());
42+
correlationManager.putBrokerRequest(frame.apiKey().id, frame.apiVersion(), frame.correlationId(), frame.getResponseFuture(), frame.responseApiVersion());
4343
}
4444
}
4545

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/KafkaResponseDecoder.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,28 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, int l
5555
else if (LOGGER.isDebugEnabled()) {
5656
LOGGER.debug("{}: Recovered correlation {} for upstream correlation id {}", ctx, correlation, correlationId);
5757
}
58-
59-
final DecodedResponseFrame<?> frame;
60-
ApiKeys apiKey = ApiKeys.forId(correlation.apiKey());
61-
short apiVersion = correlation.apiVersion();
62-
var accessor = new ByteBufAccessorImpl(in);
63-
short headerVersion = apiKey.responseHeaderVersion(apiVersion);
64-
log().trace("{}: Header version: {}", ctx, headerVersion);
65-
ResponseHeaderData header = readHeader(headerVersion, accessor);
66-
log().trace("{}: Header: {}", ctx, header);
67-
ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor);
68-
log().trace("{}: Body: {}", ctx, body);
69-
frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body);
70-
correlation.responseFuture().complete(new SequencedResponse(frame, i++));
71-
return frame;
58+
try {
59+
final DecodedResponseFrame<?> frame;
60+
ApiKeys apiKey = ApiKeys.forId(correlation.apiKey());
61+
short apiVersion = correlation.responseApiVersion();
62+
var accessor = new ByteBufAccessorImpl(in);
63+
short headerVersion = apiKey.responseHeaderVersion(apiVersion);
64+
log().trace("{}: Header version: {}", ctx, headerVersion);
65+
ResponseHeaderData header = readHeader(headerVersion, accessor);
66+
log().trace("{}: Header: {}", ctx, header);
67+
ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor);
68+
log().trace("{}: Body: {}", ctx, body);
69+
frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body);
70+
if (in.readableBytes() != 0) {
71+
throw new RuntimeException("Unread bytes remaining in frame, potentially response api version differs from expectation");
72+
}
73+
correlation.responseFuture().complete(new SequencedResponse(frame, i++));
74+
return frame;
75+
}
76+
catch (Exception e) {
77+
correlation.responseFuture().completeExceptionally(e);
78+
throw e;
79+
}
7280
}
7381

7482
private ResponseHeaderData readHeader(short headerVersion, Readable accessor) {

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/codec/RequestFrame.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,8 @@ default boolean hasResponse() {
3636
*/
3737
short apiVersion();
3838

39+
default short responseApiVersion() {
40+
return apiVersion();
41+
}
42+
3943
}

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockHandler.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.netty.channel.ChannelInboundHandlerAdapter;
2626

2727
import io.kroxylicious.test.Request;
28+
import io.kroxylicious.test.ResponsePayload;
2829
import io.kroxylicious.test.codec.DecodedRequestFrame;
2930

3031
/**
@@ -51,11 +52,12 @@ private record ConditionalMockResponse(Matcher<Request> matcher, Action action,
5152

5253
/**
5354
* Create mockhandler with initial message to serve
54-
* @param message message to respond with, nullable
55+
* @param payload payload to respond with, nullable
5556
*/
56-
public MockHandler(ApiMessage message) {
57-
if (message != null) {
58-
setMockResponseForApiKey(ApiKeys.forId(message.apiKey()), message);
57+
public MockHandler(ResponsePayload payload) {
58+
if (payload != null && payload.message() != null) {
59+
ApiMessage message = payload.message();
60+
setMockResponseForApiKey(ApiKeys.forId(message.apiKey()), message, payload.responseApiVersion());
5961
}
6062
}
6163

kroxylicious-integration-test-support/src/main/java/io/kroxylicious/test/server/MockServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public int start(int port, ResponsePayload response, SslContext serverSslContext
125125
final EventGroupConfig eventGroupConfig = EventGroupConfig.create();
126126
bossGroup = eventGroupConfig.newBossGroup();
127127
workerGroup = eventGroupConfig.newWorkerGroup();
128-
serverHandler = new MockHandler(response == null ? null : response.message());
128+
serverHandler = new MockHandler(response);
129129
ServerBootstrap b = new ServerBootstrap();
130130
b.group(bossGroup, workerGroup)
131131
.channel(eventGroupConfig.serverChannelClass())

0 commit comments

Comments
 (0)