Skip to content

Commit 51aea98

Browse files
committed
Fail fast if a client sends bytes for an apiVersion below the oldest
To workaround a librdkafka issue, the proxy (following the behaviour of upstream Kafka) sets the min version for Produce RPCs to 0, despite only being able to decode v3+. If a client did send us v0-v2 messages then we will probably fail to decode the message, and the kafka message read will throw some unhelpful message, instead we can fail earlier with a better message logged. Note: there was some confusion over the meaning of the terms used by ApiKeys: - Supported means it can be decoded, so it encompasses the full range from oldest to latest unstable version. - Deprecated versions are supported versions that are eligible for deletion in future. If there are deprecated versions, then the deprecated version range will be a subset of the supported version range. Else it will be the range 0 to -1 when there are no deprecations. - The term enabled is used to talk about the supported versions, but depending on whether the broker has enabled unstableLastVersion. A supported version may not be enabled due to broker settings in upstream apache kafka. Signed-off-by: Robert Young <[email protected]>
1 parent 1f2240f commit 51aea98

File tree

4 files changed

+38
-6
lines changed

4 files changed

+38
-6
lines changed

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaMessageDecoder.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,10 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
5656
}
5757
}
5858
catch (Exception e) {
59-
log().error("{}: Error in decoder", ctx, e);
59+
log().atError()
60+
.setMessage("{}: Error in decoder: " + e.getMessage())
61+
.addArgument(ctx)
62+
.setCause(log().isDebugEnabled() ? e : null).log();
6063
throw e;
6164
}
6265
}

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/codec/KafkaRequestDecoder.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66
package io.kroxylicious.proxy.internal.codec;
77

8+
import org.apache.kafka.common.errors.UnsupportedVersionException;
89
import org.apache.kafka.common.message.ApiVersionsRequestData;
910
import org.apache.kafka.common.message.RequestHeaderData;
1011
import org.apache.kafka.common.protocol.ApiKeys;
@@ -70,16 +71,21 @@ protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, final
7071
final RequestFrame frame;
7172
if (decodeRequest) {
7273
short highestProxyVersion = apiVersionsService.latestVersion(apiKey);
74+
short oldestProxyVersion = apiKey.oldestVersion();
7375
boolean clientAheadOfProxy = apiVersion > highestProxyVersion;
76+
boolean proxyAheadOfClient = apiVersion < oldestProxyVersion;
7477
if (clientAheadOfProxy) {
7578
if (apiKey == ApiKeys.API_VERSIONS) {
7679
return createV0ApiVersionRequestFrame(ctx, correlationId);
7780
}
7881
else {
79-
log().error("{}: apiVersion {} for {} ahead of proxy maximum: {}", ctx, apiVersion, apiKey, highestProxyVersion);
80-
throw new IllegalStateException("client apiVersion " + apiVersion + " ahead of proxy maximum " + highestProxyVersion + " for api key: " + apiKey);
82+
throw new UnsupportedVersionException(
83+
"client apiVersion " + apiVersion + " ahead of proxy maximum " + highestProxyVersion + " for api key: " + apiKey);
8184
}
8285
}
86+
else if (proxyAheadOfClient) {
87+
throw new UnsupportedVersionException("client apiVersion " + apiVersion + " below proxy minimum " + oldestProxyVersion + " for api key: " + apiKey);
88+
}
8389
DecodedBufer result = decodeRequest(ctx, in, headerVersion, sof);
8490
ApiMessage body = BodyDecoder.decodeRequest(apiKey, apiVersion, result.accessor());
8591
if (log().isTraceEnabled()) {

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/KafkaRequestDecoderTest.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.stream.IntStream;
1616
import java.util.stream.Stream;
1717

18+
import org.apache.kafka.common.errors.UnsupportedVersionException;
1819
import org.apache.kafka.common.message.ApiVersionsRequestData;
1920
import org.apache.kafka.common.message.ProduceRequestData;
2021
import org.apache.kafka.common.message.RequestHeaderData;
@@ -125,7 +126,7 @@ void decodeUnknownApiVersions() {
125126

126127
// after ApiVersions negotiation we should never encounter a request from the client for an api version unknown to the proxy
127128
@Test
128-
void throwsOnUnsupportedVersionOfNonApiVersionsRequests() {
129+
void throwsOnUnsupportedVersion_NonApiVersionsRequestsAheadOfProxyMaximum() {
129130
EmbeddedChannel embeddedChannel = newEmbeddedChannel(new ApiVersionsServiceImpl(), RequestDecoderTest.DECODE_EVERYTHING);
130131
short maxSupportedVersion = ApiKeys.METADATA.latestVersion(true);
131132
short unsupportedVersion = (short) (maxSupportedVersion + 1);
@@ -140,10 +141,30 @@ void throwsOnUnsupportedVersionOfNonApiVersionsRequests() {
140141
accessor.writeInt(messageSize);
141142
header.write(accessor, cache, requestHeaderVersion);
142143
accessor.writeByteArray(arbitraryBodyBytes);
143-
assertThatThrownBy(() -> embeddedChannel.writeInbound(buffer)).isInstanceOf(DecoderException.class).cause().isInstanceOf(IllegalStateException.class)
144+
assertThatThrownBy(() -> embeddedChannel.writeInbound(buffer)).isInstanceOf(DecoderException.class).cause().isInstanceOf(UnsupportedVersionException.class)
144145
.hasMessage("client apiVersion %d ahead of proxy maximum %d for api key: METADATA", unsupportedVersion, maxSupportedVersion);
145146
}
146147

148+
@Test
149+
void throwsOnUnsupportedVersionRequestBelowProxyMinimum() {
150+
EmbeddedChannel embeddedChannel = newEmbeddedChannel(new ApiVersionsServiceImpl(), RequestDecoderTest.DECODE_EVERYTHING);
151+
short minSupportedVersion = ApiKeys.PRODUCE.oldestVersion();
152+
short unsupportedVersion = (short) (minSupportedVersion - 1);
153+
RequestHeaderData header = latestVersionHeaderWithAllFields(ApiKeys.PRODUCE, unsupportedVersion);
154+
byte[] arbitraryBodyBytes = new byte[]{ 1, 2, 3, 4 };
155+
ObjectSerializationCache cache = new ObjectSerializationCache();
156+
short requestHeaderVersion = ApiKeys.PRODUCE.requestHeaderVersion(ApiKeys.PRODUCE.latestVersion());
157+
int headerSize = header.size(cache, requestHeaderVersion);
158+
int messageSize = headerSize + arbitraryBodyBytes.length;
159+
ByteBuf buffer = Unpooled.buffer();
160+
ByteBufAccessorImpl accessor = new ByteBufAccessorImpl(buffer);
161+
accessor.writeInt(messageSize);
162+
header.write(accessor, cache, requestHeaderVersion);
163+
accessor.writeByteArray(arbitraryBodyBytes);
164+
assertThatThrownBy(() -> embeddedChannel.writeInbound(buffer)).isInstanceOf(DecoderException.class).cause().isInstanceOf(UnsupportedVersionException.class)
165+
.hasMessage("client apiVersion %d below proxy minimum %d for api key: PRODUCE", unsupportedVersion, minSupportedVersion);
166+
}
167+
147168
private static Stream<Arguments> produceRequestCases() {
148169
return Stream.concat(produceRequestsDoNotRequireResponse((short) 0, false), produceRequestsDoNotRequireResponse((short) 1, true));
149170
}

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/codec/RequestDecoderMockitoTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.ArrayList;
1010
import java.util.List;
1111

12+
import org.apache.kafka.common.protocol.ApiKeys;
1213
import org.junit.jupiter.api.BeforeEach;
1314
import org.junit.jupiter.api.Test;
1415
import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,6 +58,7 @@ void shouldRewindBufferIfNotEnoughBytesAvailable() {
5758

5859
// Then
5960
verify(frameBuffer).readerIndex(initialIndex);
61+
6062
}
6163

6264
@SuppressWarnings("DataFlowIssue")
@@ -69,6 +71,7 @@ void shouldThrowOnPartialIfReadingPartialFrame() {
6971
when(frameBuffer.readInt()).thenReturn(10);
7072
when(frameBuffer.readableBytes()).thenReturn(20);
7173
when(frameBuffer.readSlice(anyInt())).thenReturn(frameSlice);
74+
when(frameSlice.readShort()).thenReturn(ApiKeys.API_VERSIONS.id, (short) 0);
7275

7376
List<Object> messageReceiver = new ArrayList<>();
7477

@@ -77,6 +80,5 @@ void shouldThrowOnPartialIfReadingPartialFrame() {
7780
assertThatThrownBy(() -> kafkaRequestDecoder.decode(null, frameBuffer, messageReceiver))
7881
.isInstanceOf(IllegalStateException.class)
7982
.hasMessageContaining("decodeHeaderAndBody did not read all of the buffer");
80-
8183
}
8284
}

0 commit comments

Comments
 (0)