Skip to content

Commit 0d420c5

Browse files
Merge remote-tracking branch 'origin/trunk' into KAFKA-19019
2 parents 3d02695 + 42799f7 commit 0d420c5

File tree

194 files changed

+5971
-1736
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

194 files changed

+5971
-1736
lines changed

.github/scripts/pr-format.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,10 @@ def check(positive_assertion, ok_msg, err_msg):
162162
new_lines.append(textwrap.fill(line, width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=False))
163163
rewrapped_p = "\n".join(new_lines)
164164
else:
165-
rewrapped_p = textwrap.fill("".join(p), width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=True)
165+
indent = ""
166+
if len(p) > 0 and p[0].startswith("Reviewers:"):
167+
indent = " "
168+
rewrapped_p = textwrap.fill("".join(p), subsequent_indent=indent, width=72, break_long_words=False, break_on_hyphens=False, replace_whitespace=True)
166169
new_paragraphs.append(rewrapped_p + "\n")
167170
body = "\n".join(new_paragraphs)
168171

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ Using compiled files:
109109

110110
Using docker image:
111111

112-
docker run -p 9092:9092 apache/kafka:3.7.0
112+
docker run -p 9092:9092 apache/kafka:latest
113113

114114
### Cleaning the build ###
115115
./gradlew clean

bin/kafka-streams-groups.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.streams.StreamsGroupCommand "$@"

checkstyle/suppressions.xml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
106106

107107
<suppress checks="NPathComplexity"
108-
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
108+
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
109109

110110
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
111111
files="CoordinatorClient.java"/>
@@ -168,6 +168,9 @@
168168
<suppress checks="NPathComplexity"
169169
files="(DistributedHerder|AbstractHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
170170

171+
<suppress checks="ClassFanOutComplexity"
172+
files="ShareConsumerTest.java"/>
173+
171174
<!-- connect tests-->
172175
<suppress checks="ClassDataAbstractionCoupling"
173176
files="(DistributedHerder|KafkaBasedLog|WorkerSourceTaskWithTopicCreation|WorkerSourceTask)Test.java"/>

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 419 additions & 20 deletions
Large diffs are not rendered by default.

clients/clients-integration-tests/src/test/java/org/apache/kafka/server/quota/CustomQuotaCallbackTest.java

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,39 +19,44 @@
1919
import org.apache.kafka.clients.admin.Admin;
2020
import org.apache.kafka.clients.admin.NewTopic;
2121
import org.apache.kafka.common.Cluster;
22+
import org.apache.kafka.common.MetricName;
23+
import org.apache.kafka.common.metrics.Gauge;
24+
import org.apache.kafka.common.metrics.Metrics;
25+
import org.apache.kafka.common.metrics.Monitorable;
26+
import org.apache.kafka.common.metrics.PluginMetrics;
2227
import org.apache.kafka.common.security.auth.KafkaPrincipal;
2328
import org.apache.kafka.common.test.ClusterInstance;
2429
import org.apache.kafka.common.test.TestUtils;
2530
import org.apache.kafka.common.test.api.ClusterConfigProperty;
2631
import org.apache.kafka.common.test.api.ClusterTest;
27-
import org.apache.kafka.common.test.api.ClusterTestDefaults;
2832
import org.apache.kafka.common.test.api.Type;
2933
import org.apache.kafka.server.config.QuotaConfig;
3034

35+
import java.util.LinkedHashMap;
3136
import java.util.List;
3237
import java.util.Map;
3338
import java.util.concurrent.ConcurrentHashMap;
3439
import java.util.concurrent.atomic.AtomicInteger;
3540

36-
@ClusterTestDefaults(controllers = 3,
37-
types = {Type.KRAFT},
38-
serverProperties = {
39-
@ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
40-
@ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
41-
@ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
42-
}
43-
)
44-
public class CustomQuotaCallbackTest {
41+
import static org.junit.jupiter.api.Assertions.assertEquals;
4542

46-
private final ClusterInstance cluster;
43+
public class CustomQuotaCallbackTest {
4744

48-
public CustomQuotaCallbackTest(ClusterInstance clusterInstance) {
49-
this.cluster = clusterInstance;
45+
private static int controllerId(Type type) {
46+
return type == Type.KRAFT ? 3000 : 0;
5047
}
5148

52-
@ClusterTest
53-
public void testCustomQuotaCallbackWithControllerServer() throws InterruptedException {
54-
49+
@ClusterTest(
50+
controllers = 3,
51+
types = {Type.KRAFT},
52+
serverProperties = {
53+
@ClusterConfigProperty(id = 3000, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
54+
@ClusterConfigProperty(id = 3001, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
55+
@ClusterConfigProperty(id = 3002, key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$CustomQuotaCallback"),
56+
}
57+
)
58+
public void testCustomQuotaCallbackWithControllerServer(ClusterInstance cluster) throws InterruptedException {
59+
5560
try (Admin admin = cluster.admin(Map.of())) {
5661
admin.createTopics(List.of(new NewTopic("topic", 1, (short) 1)));
5762
TestUtils.waitForCondition(
@@ -69,10 +74,49 @@ public void testCustomQuotaCallbackWithControllerServer() throws InterruptedExce
6974
&& CustomQuotaCallback.COUNTERS.values().stream().allMatch(counter -> counter.get() > 0),
7075
"The CustomQuotaCallback not triggered in all controllers. "
7176
);
72-
77+
78+
}
79+
}
80+
81+
@ClusterTest(
82+
types = {Type.CO_KRAFT, Type.KRAFT},
83+
serverProperties = {
84+
@ClusterConfigProperty(key = QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG, value = "org.apache.kafka.server.quota.CustomQuotaCallbackTest$MonitorableCustomQuotaCallback"),
7385
}
86+
)
87+
public void testMonitorableCustomQuotaCallbackWithCombinedMode(ClusterInstance cluster) {
88+
assertMetrics(
89+
cluster.brokers().get(0).metrics(),
90+
expectedTags(Map.of("role", "broker"))
91+
);
92+
assertMetrics(
93+
cluster.controllers().get(controllerId(cluster.type())).metrics(),
94+
expectedTags(Map.of("role", "controller"))
95+
);
7496
}
7597

98+
private void assertMetrics(Metrics metrics, Map<String, String> expectedTags) {
99+
int found = 0;
100+
for (MetricName metricName : metrics.metrics().keySet()) {
101+
if (metricName.group().equals("plugins")) {
102+
Map<String, String> tags = metricName.tags();
103+
if (expectedTags.equals(tags)) {
104+
assertEquals(MonitorableCustomQuotaCallback.METRIC_NAME, metricName.name());
105+
assertEquals(MonitorableCustomQuotaCallback.METRIC_DESCRIPTION, metricName.description());
106+
found++;
107+
}
108+
}
109+
}
110+
assertEquals(1, found);
111+
}
112+
113+
private static Map<String, String> expectedTags(Map<String, String> extraTags) {
114+
Map<String, String> tags = new LinkedHashMap<>();
115+
tags.put("config", QuotaConfig.CLIENT_QUOTA_CALLBACK_CLASS_CONFIG);
116+
tags.put("class", MonitorableCustomQuotaCallback.class.getSimpleName());
117+
tags.putAll(extraTags);
118+
return tags;
119+
}
76120

77121
public static class CustomQuotaCallback implements ClientQuotaCallback {
78122

@@ -121,4 +165,17 @@ public void configure(Map<String, ?> configs) {
121165
}
122166

123167
}
168+
169+
public static class MonitorableCustomQuotaCallback extends CustomQuotaCallback implements Monitorable {
170+
171+
private static final String METRIC_NAME = "monitorable-custom-quota-callback-name";
172+
private static final String METRIC_DESCRIPTION = "monitorable-custom-quota-callback-description";
173+
174+
@Override
175+
public void withPluginMetrics(PluginMetrics metrics) {
176+
MetricName metricName = metrics.metricName(METRIC_NAME, METRIC_DESCRIPTION, Map.of());
177+
metrics.addMetric(metricName, (Gauge<Integer>) (config, now) -> 1);
178+
}
179+
180+
}
124181
}

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1947,13 +1947,28 @@ default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareG
19471947
}
19481948

19491949
/**
1950-
* Delete share groups from the cluster with the default options.
1950+
* Delete offsets for a set of partitions in a share group.
19511951
*
1952-
* @param groupIds Collection of share group ids which are to be deleted.
1953-
* @return The DeleteShareGroupsResult.
1952+
* @param groupId The group for which to delete offsets.
1953+
* @param partitions The topic-partitions.
1954+
* @param options The options to use when deleting offsets in a share group.
1955+
* @return The DeleteShareGroupOffsetsResult.
19541956
*/
1955-
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
1956-
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
1957+
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
1958+
1959+
/**
1960+
* Delete offsets for a set of partitions in a share group with the default options.
1961+
*
1962+
* <p>
1963+
* This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
1964+
* See the overload for more details.
1965+
*
1966+
* @param groupId The group for which to delete offsets.
1967+
* @param partitions The topic-partitions.
1968+
* @return The DeleteShareGroupOffsetsResult.
1969+
*/
1970+
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
1971+
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
19571972
}
19581973

19591974
/**
@@ -1965,6 +1980,16 @@ default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
19651980
*/
19661981
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);
19671982

1983+
/**
1984+
* Delete share groups from the cluster with the default options.
1985+
*
1986+
* @param groupIds Collection of share group ids which are to be deleted.
1987+
* @return The DeleteShareGroupsResult.
1988+
*/
1989+
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
1990+
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
1991+
}
1992+
19681993
/**
19691994
* Describe streams groups in the cluster.
19701995
*

server-common/src/main/java/org/apache/kafka/server/purgatory/GroupSyncKey.java renamed to clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsOptions.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,36 +14,18 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.kafka.server.purgatory;
1817

19-
import java.util.Objects;
18+
package org.apache.kafka.clients.admin;
2019

21-
/**
22-
* Used by delayed-sync operations
23-
*/
24-
public class GroupSyncKey implements DelayedOperationKey {
25-
26-
private final String groupId;
27-
28-
public GroupSyncKey(String groupId) {
29-
this.groupId = groupId;
30-
}
20+
import org.apache.kafka.common.annotation.InterfaceStability;
3121

32-
@Override
33-
public String keyLabel() {
34-
return "sync-" + groupId;
35-
}
22+
import java.util.Set;
3623

37-
@Override
38-
public boolean equals(Object o) {
39-
if (this == o) return true;
40-
if (o == null || getClass() != o.getClass()) return false;
41-
GroupSyncKey that = (GroupSyncKey) o;
42-
return Objects.equals(groupId, that.groupId);
43-
}
44-
45-
@Override
46-
public int hashCode() {
47-
return Objects.hash(groupId);
48-
}
24+
/**
25+
* Options for the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
26+
* <p>
27+
* The API of this class is evolving, see {@link Admin} for details.
28+
*/
29+
@InterfaceStability.Evolving
30+
public class DeleteShareGroupOffsetsOptions extends AbstractOptions<DeleteShareGroupOffsetsOptions> {
4931
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import org.apache.kafka.common.KafkaFuture;
20+
import org.apache.kafka.common.TopicPartition;
21+
import org.apache.kafka.common.annotation.InterfaceStability;
22+
import org.apache.kafka.common.errors.ApiException;
23+
import org.apache.kafka.common.internals.KafkaFutureImpl;
24+
25+
import java.util.Map;
26+
import java.util.Set;
27+
28+
/**
29+
* The result of the {@link Admin#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call.
30+
* <p>
31+
* The API of this class is evolving, see {@link Admin} for details.
32+
*/
33+
@InterfaceStability.Evolving
34+
public class DeleteShareGroupOffsetsResult {
35+
36+
private final KafkaFuture<Map<TopicPartition, ApiException>> future;
37+
private final Set<TopicPartition> partitions;
38+
39+
DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future, Set<TopicPartition> partitions) {
40+
this.future = future;
41+
this.partitions = partitions;
42+
}
43+
44+
/**
45+
* Return a future which succeeds only if all the deletions succeed.
46+
* If not, the first partition error shall be returned.
47+
*/
48+
public KafkaFuture<Void> all() {
49+
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
50+
51+
this.future.whenComplete((topicPartitions, throwable) -> {
52+
if (throwable != null) {
53+
result.completeExceptionally(throwable);
54+
} else {
55+
for (TopicPartition partition : partitions) {
56+
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
57+
return;
58+
}
59+
}
60+
result.complete(null);
61+
}
62+
});
63+
return result;
64+
}
65+
66+
/**
67+
* Return a future which can be used to check the result for a given partition.
68+
*/
69+
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
70+
if (!partitions.contains(partition)) {
71+
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
72+
}
73+
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
74+
75+
this.future.whenComplete((topicPartitions, throwable) -> {
76+
if (throwable != null) {
77+
result.completeExceptionally(throwable);
78+
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
79+
result.complete(null);
80+
}
81+
});
82+
return result;
83+
}
84+
85+
private boolean maybeCompleteExceptionally(Map<TopicPartition, ApiException> partitionLevelErrors,
86+
TopicPartition partition,
87+
KafkaFutureImpl<Void> result) {
88+
Throwable exception;
89+
if (!partitionLevelErrors.containsKey(partition)) {
90+
exception = new IllegalArgumentException("Offset deletion result for partition \"" + partition + "\" was not included in the response");
91+
} else {
92+
exception = partitionLevelErrors.get(partition);
93+
}
94+
95+
if (exception != null) {
96+
result.completeExceptionally(exception);
97+
return true;
98+
} else {
99+
return false;
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)