Skip to content

Commit 8208eb4

Browse files
committed
Changes to support Apache Kafka 4.0
* Also upgrades Kroxylicious Junit Ext to 0.11.0 Signed-off-by: Keith Wall <kwall@apache.org>
1 parent 00214c2 commit 8208eb4

File tree

26 files changed

+238
-230
lines changed

26 files changed

+238
-230
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#1855](https://github.com/kroxylicious/kroxylicious/issues/1855) Upgrade to Apache Kafka 4.0
1011
* [#1928](https://github.com/kroxylicious/kroxylicious/pull/1928) Make Kroxylicious Operator metrics available for collection
1112

1213
## 0.11.0

kroxylicious-api/pom.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,15 @@
162162
<breakBuildBasedOnSemanticVersioningForMajorVersionZero>${ApiCompatability.EnforceForMajorVersionZero}</breakBuildBasedOnSemanticVersioningForMajorVersionZero>
163163
<excludes>
164164
<exclude>io.kroxylicious.proxy.filter.FilterFactoryContext#eventLoop()</exclude> <!-- https://github.com/kroxylicious/kroxylicious/issues/1380 scheduled removal of deprecated API method -->
165+
<!-- The following filter exclusions relate to RPCs that were removed at Kafka 4.0 -->
166+
<exclude>io.kroxylicious.proxy.filter.ControlledShutdownRequestFilter</exclude>
167+
<exclude>io.kroxylicious.proxy.filter.ControlledShutdownResponseFilter</exclude>
168+
<exclude>io.kroxylicious.proxy.filter.LeaderAndIsrRequestFilter</exclude>
169+
<exclude>io.kroxylicious.proxy.filter.LeaderAndIsrResponseFilter</exclude>
170+
<exclude>io.kroxylicious.proxy.filter.StopReplicaRequestFilter</exclude>
171+
<exclude>io.kroxylicious.proxy.filter.StopReplicaResponseFilter</exclude>
172+
<exclude>io.kroxylicious.proxy.filter.UpdateMetadataRequestFilter</exclude>
173+
<exclude>io.kroxylicious.proxy.filter.UpdateMetadataResponseFilter</exclude>
165174
</excludes>
166175
<!-- see documentation -->
167176
</parameter>

kroxylicious-filter-test-support/src/main/java/io/kroxylicious/test/RequestFactory.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.Set;
1616
import java.util.function.Consumer;
1717
import java.util.function.Function;
18+
import java.util.function.Predicate;
1819
import java.util.stream.Stream;
1920

2021
import org.apache.kafka.common.Uuid;
@@ -42,7 +43,6 @@
4243
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
4344
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
4445
import org.apache.kafka.common.message.ShareGroupDescribeRequestData;
45-
import org.apache.kafka.common.message.UpdateMetadataRequestData;
4646
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
4747
import org.apache.kafka.common.protocol.ApiKeys;
4848
import org.apache.kafka.common.protocol.ApiMessage;
@@ -66,6 +66,11 @@ public class RequestFactory {
6666
ApiKeys.ALTER_REPLICA_LOG_DIRS, ApiKeys.CREATE_PARTITIONS, ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS,
6767
ApiKeys.ALTER_USER_SCRAM_CREDENTIALS, ApiKeys.DESCRIBE_PRODUCERS, ApiKeys.DESCRIBE_TRANSACTIONS, ApiKeys.DESCRIBE_TOPIC_PARTITIONS);
6868

69+
/** The following API keys are no longer used by Kafka. They were removed by Kafka 4.0. */
70+
@VisibleForTesting
71+
protected static final EnumSet<ApiKeys> REMOVED_API_KEYS = EnumSet.of(ApiKeys.CONTROLLED_SHUTDOWN, ApiKeys.LEADER_AND_ISR, ApiKeys.STOP_REPLICA,
72+
ApiKeys.UPDATE_METADATA);
73+
6974
@VisibleForTesting
7075
protected static final Map<ApiKeys, Consumer<ApiMessage>> messagePopulators = new EnumMap<>(ApiKeys.class);
7176

@@ -74,7 +79,6 @@ public class RequestFactory {
7479
messagePopulators.put(ApiKeys.LIST_OFFSETS, RequestFactory::populateListOffsetsRequest);
7580
messagePopulators.put(ApiKeys.OFFSET_FETCH, RequestFactory::populateOffsetFetchRequest);
7681
messagePopulators.put(ApiKeys.METADATA, RequestFactory::populateMetadataRequest);
77-
messagePopulators.put(ApiKeys.UPDATE_METADATA, RequestFactory::populateUpdateMetadataRequest);
7882
messagePopulators.put(ApiKeys.LEAVE_GROUP, RequestFactory::populateLeaveGroupRequest);
7983
messagePopulators.put(ApiKeys.DESCRIBE_GROUPS, RequestFactory::populateDescribeGroupsRequest);
8084
messagePopulators.put(ApiKeys.CONSUMER_GROUP_DESCRIBE, RequestFactory::populateConsumeGroupDescribeRequest);
@@ -100,7 +104,14 @@ private RequestFactory() {
100104
}
101105

102106
public static Stream<ApiMessageVersion> apiMessageFor(Function<ApiKeys, Short> versionFunction) {
103-
return apiMessageFor(versionFunction, EnumSet.complementOf(RequestFactory.SPECIAL_CASES));
107+
return apiMessageFor(versionFunction, supportedApiKeys());
108+
}
109+
110+
@NonNull
111+
public static Set<ApiKeys> supportedApiKeys() {
112+
var excluded = EnumSet.copyOf(RequestFactory.SPECIAL_CASES);
113+
excluded.addAll(REMOVED_API_KEYS);
114+
return EnumSet.complementOf(excluded);
104115
}
105116

106117
public static Stream<ApiMessageVersion> apiMessageFor(Function<ApiKeys, Short> versionFunction, ApiKeys... apiKeys) {
@@ -115,6 +126,7 @@ public static Stream<ApiMessageVersion> apiMessageFor(Function<ApiKeys, Short> v
115126
public static Stream<ApiMessageVersion> apiMessageFor(Function<ApiKeys, Short> versionFunction, Set<ApiKeys> apiKeys) {
116127
return Stream.of(apiKeys)
117128
.flatMap(Collection::stream)
129+
.filter(Predicate.not(x -> x.messageType.requestSchemas().length == 0 && x.messageType.responseSchemas().length == 0)) // TODO is there a better way?
118130
.map(apiKey -> {
119131
final ApiMessage apiMessage = apiMessageForApiKey(apiKey);
120132
final Short apiVersion = versionFunction.apply(apiKey);
@@ -176,13 +188,6 @@ private static void populateMetadataRequest(ApiMessage apiMessage) {
176188
metadataRequestData.setTopics(List.of(t1));
177189
}
178190

179-
private static void populateUpdateMetadataRequest(ApiMessage apiMessage) {
180-
final UpdateMetadataRequestData updateMetadataRequestData = (UpdateMetadataRequestData) apiMessage;
181-
final UpdateMetadataRequestData.UpdateMetadataTopicState t1 = new UpdateMetadataRequestData.UpdateMetadataTopicState();
182-
t1.setTopicId(Uuid.randomUuid());
183-
updateMetadataRequestData.setTopicStates(List.of(t1));
184-
}
185-
186191
private static void populateLeaveGroupRequest(ApiMessage apiMessage) {
187192
final LeaveGroupRequestData leaveGroupRequestData = (LeaveGroupRequestData) apiMessage;
188193
final LeaveGroupRequestData.MemberIdentity memberIdentity = new LeaveGroupRequestData.MemberIdentity();

kroxylicious-filter-test-support/src/main/java/io/kroxylicious/test/record/RecordTestUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.util.HexFormat;
1414
import java.util.List;
1515

16-
import org.apache.kafka.common.TopicPartition;
1716
import org.apache.kafka.common.compress.Compression;
1817
import org.apache.kafka.common.header.Header;
1918
import org.apache.kafka.common.record.ControlRecordType;
@@ -507,7 +506,7 @@ public static MemoryRecords memoryRecordsWithAllRecordsRemoved(long baseOffset)
507506
memoryRecordsBuilder.append(DEFAULT_TIMESTAMP, new byte[]{ 1, 2, 3 }, new byte[]{ 1, 2, 3 });
508507
MemoryRecords records = memoryRecordsBuilder.build();
509508
ByteBuffer output = ByteBuffer.allocate(1024);
510-
records.filterTo(new TopicPartition("any", 1), new MemoryRecords.RecordFilter(DEFAULT_TIMESTAMP, 0L) {
509+
MemoryRecords.RecordFilter recordFilter = new MemoryRecords.RecordFilter(DEFAULT_TIMESTAMP, 0L) {
511510
@Override
512511
protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
513512
return new BatchRetentionResult(BatchRetention.RETAIN_EMPTY, false);
@@ -517,7 +516,8 @@ protected BatchRetentionResult checkBatchRetention(RecordBatch batch) {
517516
protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
518517
return false;
519518
}
520-
}, output, 1, BufferSupplier.NO_CACHING);
519+
};
520+
records.filterTo(recordFilter, output, BufferSupplier.NO_CACHING);
521521

522522
output.flip();
523523
return MemoryRecords.readableRecords(output);

kroxylicious-filter-test-support/src/test/java/io/kroxylicious/test/RequestFactoryTest.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ class RequestFactoryTest {
2727
@Test
2828
void shouldGenerateRequestForEveryApiKey() {
2929
// Given
30-
final EnumSet<ApiKeys> expectedKeys = EnumSet.complementOf(RequestFactory.SPECIAL_CASES);
30+
var expectedKeys = RequestFactory.supportedApiKeys();
31+
var absentKeys = EnumSet.complementOf(EnumSet.copyOf(expectedKeys));
3132

3233
// When
3334
final Stream<RequestFactory.ApiMessageVersion> apiMessages = RequestFactory.apiMessageFor(ApiKeys::latestVersion);
@@ -39,7 +40,7 @@ void shouldGenerateRequestForEveryApiKey() {
3940
RequestFactory.ApiMessageVersion::apiMessage));
4041
assertThat(allMessages).hasSameSizeAs(expectedKeys)
4142
.allSatisfy((apiKeys, apiMessage) -> {
42-
assertThat(apiKeys).isNotIn(RequestFactory.SPECIAL_CASES);
43+
assertThat(apiKeys).isNotIn(absentKeys);
4344
assertThat(apiMessages).isNotNull();
4445
});
4546
}
@@ -73,4 +74,19 @@ void shouldThrowExceptionIfSpecialCaseRequested(ApiKeys apiKey) {
7374
static Stream<ApiKeys> specialCases() {
7475
return RequestFactory.SPECIAL_CASES.stream();
7576
}
77+
78+
@ParameterizedTest
79+
@MethodSource("removedApiKeys")
80+
void shouldThrowExceptionIfRemovedCaseRequested(ApiKeys apiKey) {
81+
// Given
82+
83+
// When
84+
assertThatThrownBy(() -> RequestFactory.apiMessageFor(ApiKeys::latestVersion, apiKey)).isInstanceOf(IllegalArgumentException.class);
85+
86+
// Then
87+
}
88+
89+
static Stream<ApiKeys> removedApiKeys() {
90+
return RequestFactory.REMOVED_API_KEYS.stream();
91+
}
7692
}

kroxylicious-filters/kroxylicious-multitenant/src/main/java/io/kroxylicious/proxy/filter/multitenant/MultiTenantFilter.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
1717
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
1818
import org.apache.kafka.common.message.ApiVersionsResponseData;
19+
import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
20+
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
1921
import org.apache.kafka.common.message.CreateTopicsRequestData;
2022
import org.apache.kafka.common.message.CreateTopicsResponseData;
2123
import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -60,6 +62,8 @@
6062
import io.kroxylicious.proxy.filter.AddPartitionsToTxnRequestFilter;
6163
import io.kroxylicious.proxy.filter.AddPartitionsToTxnResponseFilter;
6264
import io.kroxylicious.proxy.filter.ApiVersionsResponseFilter;
65+
import io.kroxylicious.proxy.filter.ConsumerGroupDescribeRequestFilter;
66+
import io.kroxylicious.proxy.filter.ConsumerGroupDescribeResponseFilter;
6367
import io.kroxylicious.proxy.filter.CreateTopicsRequestFilter;
6468
import io.kroxylicious.proxy.filter.CreateTopicsResponseFilter;
6569
import io.kroxylicious.proxy.filter.DeleteTopicsRequestFilter;
@@ -125,6 +129,7 @@ class MultiTenantFilter
125129
OffsetDeleteRequestFilter, OffsetDeleteResponseFilter,
126130
OffsetForLeaderEpochRequestFilter, OffsetForLeaderEpochResponseFilter,
127131
FindCoordinatorRequestFilter, FindCoordinatorResponseFilter,
132+
ConsumerGroupDescribeRequestFilter, ConsumerGroupDescribeResponseFilter,
128133
ListGroupsResponseFilter,
129134
JoinGroupRequestFilter,
130135
SyncGroupRequestFilter,
@@ -324,6 +329,20 @@ public CompletionStage<ResponseFilterResult> onFindCoordinatorResponse(short api
324329
return context.forwardResponse(header, response);
325330
}
326331

332+
@Override
333+
public CompletionStage<RequestFilterResult> onConsumerGroupDescribeRequest(short apiVersion, RequestHeaderData header, ConsumerGroupDescribeRequestData request,
334+
FilterContext context) {
335+
request.setGroupIds(request.groupIds().stream().map(group -> applyTenantPrefix(context, group)).toList());
336+
return context.forwardRequest(header, request);
337+
}
338+
339+
@Override
340+
public CompletionStage<ResponseFilterResult> onConsumerGroupDescribeResponse(short apiVersion, ResponseHeaderData header, ConsumerGroupDescribeResponseData response,
341+
FilterContext context) {
342+
response.groups().forEach(group -> removeTenantPrefix(context, group::groupId, group::setGroupId, false));
343+
return context.forwardResponse(header, response);
344+
}
345+
327346
@Override
328347
public CompletionStage<ResponseFilterResult> onListGroupsResponse(short apiVersion, ResponseHeaderData header, ListGroupsResponseData response,
329348
FilterContext context) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#
2+
# Copyright Kroxylicious Authors.
3+
#
4+
# Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
5+
#
6+
7+
- apiMessageType: CONSUMER_GROUP_DESCRIBE
8+
version: 1
9+
request:
10+
payload:
11+
groupIds:
12+
- mygroup
13+
includeAuthorizedOperations: false
14+
diff:
15+
- op: replace
16+
path: "/groupIds/0"
17+
value: tenant1-mygroup
18+
response:
19+
payload:
20+
groups:
21+
- groupId: tenant1-mygroup
22+
groupState: ""
23+
groupEpoch: 0
24+
assignmentEpoch: 0
25+
assignorName: ""
26+
members: []
27+
authorizedOperations: 0
28+
errorCode: 0
29+
errorMessage: ""
30+
throttleTimeMs: 0
31+
diff:
32+
- op: replace
33+
path: "/groups/0/groupId"
34+
value: mygroup
35+
disabled: false

kroxylicious-integration-tests/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@
196196
<artifactId>junit-jupiter-params</artifactId>
197197
<scope>test</scope>
198198
</dependency>
199+
<dependency>
200+
<groupId>org.junit-pioneer</groupId>
201+
<artifactId>junit-pioneer</artifactId>
202+
<scope>test</scope>
203+
</dependency>
199204
<dependency>
200205
<groupId>org.apache.kafka</groupId>
201206
<artifactId>kafka_2.13</artifactId>

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/ExpositionIT.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import io.kroxylicious.testing.kafka.common.BrokerCluster;
6363
import io.kroxylicious.testing.kafka.common.KeytoolCertificateGenerator;
6464
import io.kroxylicious.testing.kafka.common.SaslMechanism;
65-
import io.kroxylicious.testing.kafka.common.ZooKeeperCluster;
6665
import io.kroxylicious.testing.kafka.junit5ext.KafkaClusterExtension;
6766

6867
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -638,25 +637,25 @@ void targetClusterDynamicallyAddsBroker(@BrokerCluster KafkaCluster cluster) thr
638637
}
639638
}
640639

641-
// we currently cannot influence the node ids, so we start a 2 node cluster and shutdown node 0
642-
// cannot use KRaft as node 0 is a controller
640+
// Test extension does not allow us to influence the node ids, so we start a 3 node cluster
641+
// and shutdown node 1, leaving us with nodes 0 (controller/broker) and 2 (broker).
643642
@Test
644-
void canConfigurePortIdentifiesNodeWithRange(@ZooKeeperCluster @BrokerCluster(numBrokers = 2) KafkaCluster cluster, Admin admin) throws Exception {
645-
cluster.removeBroker(0);
643+
void canConfigurePortIdentifiesNodeWithRanges(@BrokerCluster(numBrokers = 3) KafkaCluster cluster, Admin admin) throws Exception {
644+
cluster.removeBroker(1);
646645
await().atMost(Duration.ofSeconds(5)).until(() -> admin.describeCluster().nodes().get(),
647-
n -> n.size() == 1 && n.iterator().next().id() == 1);
646+
n -> n.stream().map(Node::id).collect(Collectors.toSet()).equals(Set.of(0, 2)));
648647
var builder = new ConfigurationBuilder()
649648
.addToVirtualClusters(KroxyliciousConfigUtils.baseVirtualClusterBuilder(cluster, "demo")
650649
.addToGateways(defaultGatewayBuilder()
651650
.withNewPortIdentifiesNode()
652651
.withBootstrapAddress(PROXY_ADDRESS)
653-
.withNodeIdRanges(new NamedRange("myrange", 1, 2))
652+
.withNodeIdRanges(new NamedRange("range1", 0, 0), new NamedRange("range2", 2, 2))
654653
.endPortIdentifiesNode()
655654
.build())
656655
.build());
657656

658657
try (var tester = kroxyliciousTester(builder)) {
659-
assertThat(cluster.getNumOfBrokers()).isEqualTo(1);
658+
assertThat(cluster.getNumOfBrokers()).isEqualTo(2);
660659
verifyAllBrokersAvailableViaProxy(tester, cluster);
661660
}
662661
}

kroxylicious-integration-tests/src/test/java/io/kroxylicious/proxy/filter/oauthbearer/OauthBearerValidationIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.junit.jupiter.api.condition.EnabledIf;
3333
import org.junit.jupiter.api.extension.ExtendWith;
3434
import org.junit.jupiter.api.io.TempDir;
35+
import org.junitpioneer.jupiter.RestoreSystemProperties;
3536
import org.testcontainers.DockerClientFactory;
3637
import org.testcontainers.containers.GenericContainer;
3738
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
@@ -64,6 +65,7 @@
6465
@ExtendWith(KafkaClusterExtension.class)
6566
@ExtendWith(NettyLeakDetectorExtension.class)
6667
@EnabledIf(value = "isDockerAvailable", disabledReason = "docker unavailable")
68+
@RestoreSystemProperties
6769
class OauthBearerValidationIT {
6870

6971
private static final DockerImageName DOCKER_IMAGE_NAME = DockerImageName.parse("ghcr.io/navikt/mock-oauth2-server:2.1.10");
@@ -83,6 +85,7 @@ class OauthBearerValidationIT {
8385
private static final Predicate<SimpleMetric> DOWNSTREAM_SASL_AUTHENTICATE_PREDICATE = m -> m.name().equals(KROXYLICIOUS_PAYLOAD_SIZE_BYTES_COUNT_METRIC)
8486
&& m.labels().entrySet().containsAll(
8587
Map.of("ApiKey", "SASL_AUTHENTICATE", "flowing", "downstream").entrySet());
88+
private static final String ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG = "org.apache.kafka.sasl.oauthbearer.allowed.urls";
8689
@SaslMechanism(value = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM)
8790
@BrokerConfig(name = "listener.name.external.sasl.oauthbearer.jwks.endpoint.url", value = JWKS_ENDPOINT_URL)
8891
@BrokerConfig(name = "listener.name.external.sasl.oauthbearer.expected.audience", value = EXPECTED_AUDIENCE)
@@ -92,6 +95,9 @@ class OauthBearerValidationIT {
9295

9396
@BeforeAll
9497
public static void beforeAll() {
98+
// required to permit Kafka to interact with our endpoint.
99+
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, JWKS_ENDPOINT_URL + "," + TOKEN_ENDPOINT_URL);
100+
95101
oauthServer = new OauthServerContainer(OauthBearerValidationIT.DOCKER_IMAGE_NAME);
96102
oauthServer.setWaitStrategy(new LogMessageWaitStrategy().withRegEx(".*started server on address.*"));
97103
oauthServer.addFixedExposedPort(OAUTH_SERVER_PORT, OAUTH_SERVER_PORT);
@@ -157,6 +163,8 @@ void authWithBadToken(@TempDir Path tempdir) throws Exception {
157163
Files.writeString(badTokenFile, BAD_TOKEN);
158164
var config = getClientConfig(badTokenFile.toUri());
159165

166+
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, JWKS_ENDPOINT_URL + "," + badTokenFile.toUri());
167+
160168
try (var tester = kroxyliciousTester(getConfiguredProxyBuilder());
161169
var admin = tester.admin(config);
162170
var ahc = tester.getManagementClient()) {

0 commit comments

Comments
 (0)