Skip to content

Commit 0fd47b7

Browse files
committed
Hoist the special casing out of BodyDecoder and into KafkaResponseDecoder
Signed-off-by: Sam Barker <sam@quadrocket.co.uk>
1 parent 0cc4121 commit 0fd47b7

File tree

2 files changed

+26
-23
lines changed

2 files changed

+26
-23
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaResponseDecoder.java

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io.kroxylicious.proxy.frame.OpaqueResponseFrame;
2323
import io.kroxylicious.proxy.internal.InternalResponseFrame;
2424

25+
import edu.umd.cs.findbugs.annotations.NonNull;
2526
import edu.umd.cs.findbugs.annotations.Nullable;
2627

2728
public class KafkaResponseDecoder extends KafkaMessageDecoder {
2829

2930
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaResponseDecoder.class);
31+
public static final short EMERGENCY_API_VERSION = (short) 0;
3032

3133
private final CorrelationManager correlationManager;
3234

@@ -70,14 +72,14 @@ else if (LOGGER.isDebugEnabled()) {
7072
log().trace("{}: Header version: {}", ctx, headerVersion);
7173
ResponseHeaderData header = readHeader(headerVersion, accessor);
7274
log().trace("{}: Header: {}", ctx, header);
73-
ApiMessage body = BodyDecoder.decodeResponse(apiKey, apiVersion, accessor);
75+
ApiMessageVersion body = decodeBody(apiKey, apiVersion, accessor);
7476
log().trace("{}: Body: {}", ctx, body);
7577
Filter recipient = correlation.recipient();
7678
if (recipient == null) {
77-
frame = new DecodedResponseFrame<>(apiVersion, correlationId, header, body);
79+
frame = new DecodedResponseFrame<>(body.apiVersion(), correlationId, header, body.apiMessage());
7880
}
7981
else {
80-
frame = new InternalResponseFrame<>(recipient, apiVersion, correlationId, header, body, correlation.promise());
82+
frame = new InternalResponseFrame<>(recipient, body.apiVersion(), correlationId, header, body.apiMessage(), correlation.promise());
8183
}
8284
}
8385
else {
@@ -87,6 +89,26 @@ else if (LOGGER.isDebugEnabled()) {
8789
return frame;
8890
}
8991

92+
@NonNull
93+
private static ApiMessageVersion decodeBody(ApiKeys apiKey, short apiVersion, ByteBufAccessorImpl accessor) {
94+
int prev = accessor.readerIndex();
95+
try {
96+
return new ApiMessageVersion(BodyDecoder.decodeResponse(apiKey, apiVersion, accessor), apiVersion);
97+
}
98+
catch (RuntimeException e) {
99+
// KIP-511 when the client receives an unsupported version for the ApiVersionResponse, it fails back to version 0
100+
// Use the same algorithm as
101+
// https://github.com/apache/kafka/blob/a41c10fd49841381b5207c184a385622094ed440/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java#L90-L106
102+
accessor.readerIndex(prev);
103+
if (ApiKeys.API_VERSIONS.equals(apiKey) && apiVersion != 0) {
104+
return new ApiMessageVersion(BodyDecoder.decodeResponse(apiKey, EMERGENCY_API_VERSION, accessor), EMERGENCY_API_VERSION);
105+
}
106+
else {
107+
throw e;
108+
}
109+
}
110+
}
111+
90112
private OpaqueFrame opaqueFrame(short apiKeyId, short apiVersion, ByteBuf in, int correlationId, int length) {
91113
return new OpaqueResponseFrame(apiKeyId, apiVersion, in.readSlice(length).retain(), correlationId, length);
92114
}
@@ -95,4 +117,5 @@ private ResponseHeaderData readHeader(short headerVersion, Readable accessor) {
95117
return new ResponseHeaderData(accessor, headerVersion);
96118
}
97119

120+
record ApiMessageVersion(ApiMessage apiMessage, short apiVersion) {}
98121
}

kroxylicious-runtime/src/main/templates/BodyDecoder.ftl

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,27 +70,7 @@ public class BodyDecoder {
7070
return switch (apiKey) {
7171
<#list messageSpecs as messageSpec>
7272
<#if messageSpec.type?lower_case == 'response'>
73-
<#if messageSpec.name == 'ApiVersionsResponse'>
74-
case ${retrieveApiKey(messageSpec)} -> {
75-
// KIP-511 when the client receives an unsupported version for the ApiVersionResponse, it fails back to version 0
76-
// Use the same algorithm as https://github.com/apache/kafka/blob/a41c10fd49841381b5207c184a385622094ed440/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java#L90-L106
77-
int prev = accessor.readerIndex();
78-
try {
79-
yield new ${messageSpec.name}Data(accessor, apiVersion);
80-
}
81-
catch (RuntimeException e) {
82-
accessor.readerIndex(prev);
83-
if (apiVersion != 0) {
84-
yield new ${messageSpec.name}Data(accessor, (short) 0);
85-
}
86-
else {
87-
throw e;
88-
}
89-
}
90-
}
91-
<#else>
9273
case ${retrieveApiKey(messageSpec)} -> new ${messageSpec.name}Data(accessor, apiVersion);
93-
</#if>
9474
</#if>
9575
</#list>
9676

0 commit comments

Comments
 (0)