Skip to content

Commit 9e71faa

Browse files
authored
Only downgrade PRODUCE version to v0 if min version <= lowestSupported (kroxylicious#2917)
If the upstream, or upstream filters advertise a minimum supported API_VERSIONS less than or equal to the lowest supported (currently 3) , then the framework will set it to 0. Why: We made changes recently to allow a v0 minimum Produce version to be passed through to the client unchanged to support librdkafka. This matches upstream behaviour where they advertise v0-v2 as being available despite kafka 4.0+ not supporting those versions. That worked, but we also sometimes write Filters that manipulate ApiVersions and this is a trap. If a Filter restricts the minimum Produce version then it could break librdkafka. We also do not wish to overly impact a Filter that sets the minimum Produce version higher than the lowest supported, we should forward that on to clients, even if it breaks some clients. We can safely downgrade the lowest support version because the proxy cannot interpret the bytes for older versions correctly in any case, and an exception will be thrown the closes the channel. See also: - kroxylicious#2844 - https://issues.apache.org/jira/browse/KAFKA-18659 Only set produce to v0 if upstream min version <= latest supported version Signed-off-by: Robert Young <[email protected]>
1 parent edaa452 commit 9e71faa

File tree

3 files changed

+81
-17
lines changed

3 files changed

+81
-17
lines changed

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ApiVersionsIT.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,12 @@ void shouldOfferBrokerApisThatAreKnownToKroxy() {
160160
.collect(Collectors.toMap(k -> ApiKeys.forId(k.apiKey()), k -> k));
161161
for (ApiKeys knownValue : ApiKeys.values()) {
162162
assertTrue(responseVersions.containsKey(knownValue));
163-
assertEquals(knownValue.oldestVersion(), responseVersions.get(knownValue).minVersion());
163+
if (knownValue == ApiKeys.PRODUCE) {
164+
assertEquals(knownValue.messageType.lowestDeprecatedVersion(), responseVersions.get(knownValue).minVersion());
165+
}
166+
else {
167+
assertEquals(knownValue.oldestVersion(), responseVersions.get(knownValue).minVersion());
168+
}
164169
assertEquals(knownValue.latestVersion(), responseVersions.get(knownValue).maxVersion());
165170
}
166171
}
@@ -199,6 +204,40 @@ void shouldMarkProduceV0toV2AsSupportedVersions() {
199204
}
200205
}
201206

207+
// Kafka 4 removed support for ProduceRequest v0-v2 however there is an issue with libRDKafka versions <= v2.11.0 that meant this broke compression support
208+
// The proxy needs to replicate this special case handling so we can proxy older libRDKafka based clients (just about anything that isn't Java)
209+
// Some Filters also manipulate versions, to avoid them all having to replicate this logic the framework will
210+
// always set the minimum version for Produce to v0.
211+
@Test
212+
void shouldMarkProduceV0toV2AsSupportedVersionsEvenIfUpstreamDisagrees() {
213+
// Given
214+
try (var tester = mockKafkaKroxyliciousTester(KroxyliciousConfigUtils::proxy);
215+
var client = tester.simpleTestClient()) {
216+
ApiVersionsResponseData mockResponse = new ApiVersionsResponseData();
217+
ApiVersionsResponseData.ApiVersion version = new ApiVersionsResponseData.ApiVersion();
218+
version.setApiKey(ApiKeys.PRODUCE.id)
219+
.setMinVersion((short) 3)
220+
.setMaxVersion(ApiKeys.PRODUCE.latestVersion());
221+
mockResponse.apiKeys().add(version);
222+
tester.addMockResponseForApiKey(new ResponsePayload(ApiKeys.API_VERSIONS, (short) 3, mockResponse));
223+
224+
// When
225+
Response response = whenGetApiVersionsFromKroxylicious(client);
226+
227+
// Then
228+
ResponsePayload payload = response.payload();
229+
assertThat(payload.message())
230+
.asInstanceOf(InstanceOfAssertFactories.type(ApiVersionsResponseData.class))
231+
.satisfies(apiVersionsResponseData -> assertThat(apiVersionsResponseData.apiKeys())
232+
.singleElement()
233+
.satisfies(apiKeys -> assertThat(apiKeys)
234+
.satisfies(apiVersion -> {
235+
assertThat(apiVersion.apiKey()).isEqualTo(ApiKeys.PRODUCE.id);
236+
assertThat(apiVersion.minVersion()).isEqualTo(ApiKeys.PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION);
237+
})));
238+
}
239+
}
240+
202241
private static void givenMockRespondsWithApiVersionsForMetadataRequest(MockServerKroxyliciousTester tester, short minVersion, short maxVersion) {
203242
ApiVersionsResponseData mockResponse = new ApiVersionsResponseData();
204243
ApiVersionsResponseData.ApiVersion version = new ApiVersionsResponseData.ApiVersion();

kroxylicious-runtime/src/main/java/io/kroxylicious/proxy/internal/ApiVersionsServiceImpl.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,14 @@ private static void intersectApiVersion(String sessionId, ApiVersion key, ApiKey
9595
short mutualMin = (short) Math.max(
9696
key.minVersion(),
9797
apiKey.messageType.lowestSupportedVersion());
98-
if (mutualMin != key.minVersion()) {
99-
if (ApiKeys.PRODUCE.equals(apiKey)) {
100-
// Kafka 4 removed support for ProduceRequest v0-v2 however there is an issue with libRDKafka versions <= v2.11.0 that meant this broke compression support
101-
// https://issues.apache.org/jira/browse/KAFKA-18659 marks v0-v2 as supported versions however the broker will reject all uses of these old requests.
102-
// The proxy needs to replicate this special case handling so we can proxy older libRDKafka based clients (just about anything that isn't Java)
103-
LOGGER.trace("{}: {} min version unchanged (is: {}) to support KAFKA-18659", sessionId, apiKey, mutualMin);
104-
key.setMinVersion(apiKey.messageType.lowestDeprecatedVersion());
105-
}
106-
else {
107-
LOGGER.trace("{}: {} min version changed to {} (was: {})", sessionId, apiKey, mutualMin, key.maxVersion());
108-
key.setMinVersion(mutualMin);
109-
}
98+
if (ApiKeys.PRODUCE.equals(apiKey)
99+
&& key.minVersion() <= apiKey.messageType.lowestSupportedVersion()) {
100+
LOGGER.trace("{}: {} min version downgraded to v0 (is: {}) to support KAFKA-18659", sessionId, apiKey, key.minVersion());
101+
key.setMinVersion(apiKey.messageType.lowestDeprecatedVersion());
102+
}
103+
else if (mutualMin != key.minVersion()) {
104+
LOGGER.trace("{}: {} min version changed to {} (was: {})", sessionId, apiKey, mutualMin, key.maxVersion());
105+
key.setMinVersion(mutualMin);
110106
}
111107
else {
112108
LOGGER.trace("{}: {} min version unchanged (is: {})", sessionId, apiKey, mutualMin);

kroxylicious-runtime/src/test/java/io/kroxylicious/proxy/internal/ApiVersionsServiceImplTest.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,16 @@
77
package io.kroxylicious.proxy.internal;
88

99
import java.util.Map;
10+
import java.util.stream.IntStream;
11+
import java.util.stream.Stream;
1012

1113
import org.apache.kafka.common.message.ApiVersionsResponseData;
1214
import org.apache.kafka.common.protocol.ApiKeys;
1315
import org.junit.jupiter.api.BeforeEach;
1416
import org.junit.jupiter.api.Test;
17+
import org.junit.jupiter.params.ParameterizedTest;
18+
import org.junit.jupiter.params.provider.Arguments;
19+
import org.junit.jupiter.params.provider.MethodSource;
1520

1621
import static org.assertj.core.api.Assertions.assertThat;
1722
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -41,19 +46,43 @@ void testIntersection_UpstreamMinVersionLessThanApiKeys() {
4146
assertThatApiVersionsContainsExactly(upstreamApiVersions, ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
4247
}
4348

44-
@Test
45-
void shouldMarkProduceRequestV0AsSupported() {
49+
public static Stream<Arguments> shouldMarkProduceRequestV0AsSupportedIfVersionAtOrBelowLatestSupported() {
50+
return IntStream.rangeClosed(ApiKeys.PRODUCE.messageType.lowestDeprecatedVersion(), ApiKeys.PRODUCE.messageType.lowestSupportedVersion())
51+
.mapToObj(i -> Arguments.argumentSet("api version " + i, i));
52+
}
53+
54+
@MethodSource
55+
@ParameterizedTest
56+
void shouldMarkProduceRequestV0AsSupportedIfVersionAtOrBelowLatestSupported(int upstreamMinVersion) {
4657
// Given
47-
short oldestProduceRequest = ApiKeys.PRODUCE.messageType.lowestDeprecatedVersion();
48-
ApiVersionsResponseData upstreamApiVersions = createApiVersionsWith(ApiKeys.PRODUCE.id, oldestProduceRequest, ApiKeys.PRODUCE.latestVersion());
58+
ApiVersionsResponseData upstreamApiVersions = createApiVersionsWith(ApiKeys.PRODUCE.id, (short) upstreamMinVersion, ApiKeys.PRODUCE.latestVersion());
4959

5060
// When
5161
apiVersionsService.updateVersions("channel", upstreamApiVersions);
5262

5363
// Then
64+
short oldestProduceRequest = ApiKeys.PRODUCE.messageType.lowestDeprecatedVersion();
5465
assertThatApiVersionsContainsExactly(upstreamApiVersions, ApiKeys.PRODUCE, oldestProduceRequest, ApiKeys.PRODUCE.latestVersion());
5566
}
5667

68+
public static Stream<Arguments> shouldNotAlterProduceRequestVersionsMinVersionIfUpstreamMoreRestrictive() {
69+
return IntStream.rangeClosed(ApiKeys.PRODUCE.messageType.lowestSupportedVersion() + 1, ApiKeys.PRODUCE.messageType.highestSupportedVersion(true))
70+
.mapToObj(i -> Arguments.argumentSet("api version " + i, i));
71+
}
72+
73+
@MethodSource
74+
@ParameterizedTest
75+
void shouldNotAlterProduceRequestVersionsMinVersionIfUpstreamMoreRestrictive(int upstreamMinVersion) {
76+
// Given
77+
ApiVersionsResponseData upstreamApiVersions = createApiVersionsWith(ApiKeys.PRODUCE.id, (short) upstreamMinVersion, ApiKeys.PRODUCE.latestVersion());
78+
79+
// When
80+
apiVersionsService.updateVersions("channel", upstreamApiVersions);
81+
82+
// Then
83+
assertThatApiVersionsContainsExactly(upstreamApiVersions, ApiKeys.PRODUCE, (short) upstreamMinVersion, ApiKeys.PRODUCE.latestVersion());
84+
}
85+
5786
@Test
5887
void testIntersection_UpstreamMinVersionGreaterThanApiKeys() {
5988
ApiVersionsResponseData upstreamApiVersions = createApiVersionsWith(ApiKeys.METADATA.id, (short) (ApiKeys.METADATA.oldestVersion() + 1),

0 commit comments

Comments
 (0)