
Commit e018314

Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA-19614

2 parents: 23f3c93 + d6688f8

File tree

265 files changed: 4405 additions, 3321 deletions


.github/workflows/docker_scan.yml

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ jobs:
     strategy:
       matrix:
         # This is an array of supported tags. Make sure this array only contains the supported tags
-        supported_image_tag: ['latest', '3.7.2', '3.8.1', '3.9.1', '4.0.0']
+        supported_image_tag: ['latest', '3.9.1', '4.0.0', '4.1.0']
     steps:
       - name: Run CVE scan
         uses: aquasecurity/trivy-action@6e7b7d1fd3e4fef0c5fa8cce1229c54b2c9bd0d8 # v0.24.0

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientsTestUtils.java

Lines changed: 1 addition & 1 deletion
@@ -311,7 +311,7 @@ public static <K, V> void sendAndAwaitAsyncCommit(
     Optional<Map<TopicPartition, OffsetAndMetadata>> offsetsOpt
 ) throws InterruptedException {

-    var commitCallback = new RetryCommitCallback(consumer, offsetsOpt);
+    var commitCallback = new RetryCommitCallback<>(consumer, offsetsOpt);
     sendAsyncCommit(consumer, commitCallback, offsetsOpt);

     TestUtils.waitForCondition(() -> {
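Context for the one-character fix above: without the diamond operator, RetryCommitCallback is instantiated as a raw type, so the compiler erases generic checking and emits an "unchecked" warning. A minimal sketch with a hypothetical generic class, not the real RetryCommitCallback:

import java.util.Optional;

public class DiamondDemo {
    static class Callback<T> {
        final Optional<T> value;

        Callback(Optional<T> value) {
            this.value = value;
        }
    }

    public static void main(String[] args) {
        // Raw type: compiles, but with an unchecked-conversion warning.
        Callback<String> raw = new Callback(Optional.of("x"));
        // Diamond: T is inferred from the target type and fully checked.
        Callback<String> typed = new Callback<>(Optional.of("x"));
        System.out.println(raw.value.get() + typed.value.get());
    }
}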

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetricsDuringTopicCreationDeletionTest.java

Lines changed: 1 addition & 0 deletions
@@ -137,6 +137,7 @@ private void createAndDeleteTopics() {
     }
 }

+@SuppressWarnings("unchecked")
 private Gauge<Integer> getGauge(String metricName) {
     return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
         .filter(entry -> entry.getKey().getName().endsWith(metricName))
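The added @SuppressWarnings("unchecked") most likely silences a cast from the registry's wildcard-typed metric to Gauge<Integer>, which type erasure makes unverifiable at runtime. A self-contained sketch of that kind of cast — the interface and names are illustrative, not Yammer's API:

import java.util.List;

public class UncheckedCastDemo {
    interface Gauge<T> { T value(); }

    @SuppressWarnings("unchecked")
    static Gauge<Integer> firstIntGauge(List<? extends Gauge<?>> metrics) {
        // Only the raw Gauge type exists at runtime, so this cast cannot
        // be checked against the type argument; hence the suppression.
        return (Gauge<Integer>) metrics.get(0);
    }

    public static void main(String[] args) {
        Gauge<Integer> source = () -> 42;
        System.out.println(firstIntGauge(List.of(source)).value()); // 42
    }
}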

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java

Lines changed: 5 additions & 4 deletions
@@ -46,6 +46,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,7 +55,6 @@
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;

 import static java.util.Arrays.asList;
 import static org.apache.kafka.clients.admin.AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG;
@@ -128,7 +128,7 @@ public void testIntervalMsParser(ClusterInstance clusterInstance) {
     List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
         "--alter", "--entity-type", "client-metrics", "--entity-name", "test", "--add-config", "interval.ms=bbb");
     try (Admin client = clusterInstance.admin()) {
-        ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts));
+        ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(Set.of(alterOpts)));

         Throwable e = assertThrows(ExecutionException.class, () -> ConfigCommand.alterConfig(client, addOpts));
         assertTrue(e.getMessage().contains(InvalidConfigurationException.class.getSimpleName()));
@@ -152,15 +152,16 @@ public void testMetrics(ClusterInstance clusterInstance) {
     }
 }

-private static String[] toArray(List<String>... lists) {
-    return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
+private static String[] toArray(Collection<List<String>> lists) {
+    return lists.stream().flatMap(List::stream).toArray(String[]::new);
 }

 /**
  * We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the
  * {@link org.apache.kafka.common.protocol.ApiKeys#GET_TELEMETRY_SUBSCRIPTIONS} command will not be supported
  * by the server
  **/
+@SuppressWarnings("unused")
 public static class GetIdClientTelemetry implements ClientTelemetry, MetricsReporter {

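The toArray refactor trades a generic varargs parameter for a Collection, with Set.of(alterOpts) wrapping the single list at the call site. A sketch of why the old shape is problematic — names here are illustrative:

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

public class VarargsDemo {
    // Old shape: a List<String>... parameter forces a List[] array at each
    // call site, so the compiler flags "possible heap pollution" on the
    // declaration and "unchecked generic array creation" on callers.
    static String[] toArrayVarargs(List<String>... lists) {
        return Stream.of(lists).flatMap(List::stream).toArray(String[]::new);
    }

    // New shape, as in the commit: a Collection needs no generic array,
    // so there are no warnings anywhere.
    static String[] toArray(Collection<List<String>> lists) {
        return lists.stream().flatMap(List::stream).toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(toArrayVarargs(List.of("a"), List.of("b")).length); // 2
        System.out.println(toArray(Set.of(List.of("a", "b"))).length);         // 2
    }
}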

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DescribeProducersWithBrokerIdTest.java

Lines changed: 143 additions & 0 deletions (new file)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.admin;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.BeforeEach;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;


@ClusterTestDefaults(
    brokers = 3
)
class DescribeProducersWithBrokerIdTest {
    private static final String TOPIC_NAME = "test-topic";
    private static final int NUM_PARTITIONS = 1;
    private static final short REPLICATION_FACTOR = 3;

    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC_NAME, 0);

    private final ClusterInstance clusterInstance;

    public DescribeProducersWithBrokerIdTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    private static void sendTestRecords(Producer<byte[], byte[]> producer) {
        producer.send(new ProducerRecord<>(TOPIC_NAME, TOPIC_PARTITION.partition(), "key-0".getBytes(), "value-0".getBytes()));
        producer.flush();
    }

    @BeforeEach
    void setUp() throws InterruptedException {
        clusterInstance.createTopic(TOPIC_NAME, NUM_PARTITIONS, REPLICATION_FACTOR);
    }

    private List<Integer> getReplicaBrokerIds(Admin admin) throws Exception {
        var topicDescription = admin.describeTopics(List.of(TOPIC_PARTITION.topic())).allTopicNames().get().get(TOPIC_PARTITION.topic());
        return topicDescription.partitions().get(TOPIC_PARTITION.partition()).replicas().stream()
            .map(Node::id)
            .toList();
    }

    private int getNonReplicaBrokerId(Admin admin) throws Exception {
        var replicaBrokerIds = getReplicaBrokerIds(admin);
        return clusterInstance.brokerIds().stream()
            .filter(id -> !replicaBrokerIds.contains(id))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No non-replica broker found"));
    }

    private int getFollowerBrokerId(Admin admin) throws Exception {
        var replicaBrokerIds = getReplicaBrokerIds(admin);
        var leaderBrokerId = clusterInstance.getLeaderBrokerId(TOPIC_PARTITION);
        return replicaBrokerIds.stream()
            .filter(id -> id != leaderBrokerId)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("No follower found for partition " + TOPIC_PARTITION));
    }

    @ClusterTest
    void testDescribeProducersDefaultRoutesToLeader() throws Exception {
        try (Producer<byte[], byte[]> producer = clusterInstance.producer();
             var admin = clusterInstance.admin()) {
            sendTestRecords(producer);

            var stateWithExplicitLeader = admin.describeProducers(
                List.of(TOPIC_PARTITION),
                new DescribeProducersOptions().brokerId(clusterInstance.getLeaderBrokerId(TOPIC_PARTITION))
            ).partitionResult(TOPIC_PARTITION).get();

            var stateWithDefaultRouting = admin.describeProducers(
                List.of(TOPIC_PARTITION)
            ).partitionResult(TOPIC_PARTITION).get();

            assertNotNull(stateWithDefaultRouting);
            assertFalse(stateWithDefaultRouting.activeProducers().isEmpty());
            assertEquals(stateWithExplicitLeader.activeProducers(), stateWithDefaultRouting.activeProducers());
        }
    }

    @ClusterTest
    void testDescribeProducersFromFollower() throws Exception {
        try (Producer<byte[], byte[]> producer = clusterInstance.producer();
             var admin = clusterInstance.admin()) {
            sendTestRecords(producer);

            var followerState = admin.describeProducers(
                List.of(TOPIC_PARTITION),
                new DescribeProducersOptions().brokerId(getFollowerBrokerId(admin))
            ).partitionResult(TOPIC_PARTITION).get();

            var leaderState = admin.describeProducers(
                List.of(TOPIC_PARTITION)
            ).partitionResult(TOPIC_PARTITION).get();

            assertNotNull(followerState);
            assertFalse(followerState.activeProducers().isEmpty());
            assertEquals(leaderState.activeProducers(), followerState.activeProducers());
        }
    }

    @ClusterTest(brokers = 4)
    void testDescribeProducersWithInvalidBrokerId() throws Exception {
        try (Producer<byte[], byte[]> producer = clusterInstance.producer();
             var admin = clusterInstance.admin()) {
            sendTestRecords(producer);

            TestUtils.assertFutureThrows(NotLeaderOrFollowerException.class,
                admin.describeProducers(
                    List.of(TOPIC_PARTITION),
                    new DescribeProducersOptions().brokerId(getNonReplicaBrokerId(admin))
                ).partitionResult(TOPIC_PARTITION));
        }
    }
}
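For readers unfamiliar with the API this new test exercises: a hedged usage sketch of routing a DescribeProducers request to a specific broker via the Admin client. The bootstrap address and broker id below are placeholder assumptions, not values from the commit:

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;

public class DescribeProducersExample {
    public static void main(String[] args) throws Exception {
        TopicPartition tp = new TopicPartition("test-topic", 0);
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
            // brokerId(1) routes the request to broker 1 instead of the
            // partition leader, which is the default routing.
            var state = admin.describeProducers(
                List.of(tp),
                new DescribeProducersOptions().brokerId(1)
            ).partitionResult(tp).get();
            state.activeProducers().forEach(p ->
                System.out.println("producerId=" + p.producerId() + " epoch=" + p.producerEpoch()));
        }
    }
}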

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 80 additions & 4 deletions
@@ -63,6 +63,7 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +110,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 @ClusterTestDefaults(
     types = {Type.KRAFT},
@@ -553,14 +555,19 @@ private void testInterceptors(Map<String, Object> consumerConfig) throws Excepti

     // commit sync and verify onCommit is called
     var commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue();
-    consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L)));
-    assertEquals(2, consumer.committed(Set.of(TP)).get(TP).offset());
+    consumer.commitSync(Map.of(TP, new OffsetAndMetadata(2L, "metadata")));
+    OffsetAndMetadata metadata = consumer.committed(Set.of(TP)).get(TP);
+    assertEquals(2, metadata.offset());
+    assertEquals("metadata", metadata.metadata());
     assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());

     // commit async and verify onCommit is called
-    var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L));
+    var offsetsToCommit = Map.of(TP, new OffsetAndMetadata(5L, null));
     sendAndAwaitAsyncCommit(consumer, Optional.of(offsetsToCommit));
-    assertEquals(5, consumer.committed(Set.of(TP)).get(TP).offset());
+    metadata = consumer.committed(Set.of(TP)).get(TP);
+    assertEquals(5, metadata.offset());
+    // null metadata will be converted to an empty string
+    assertEquals("", metadata.metadata());
     assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue());
 }
 // cleanup
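The new assertions lean on OffsetAndMetadata normalizing null metadata to an empty string, mirroring (per the comment in the diff) what the broker stores. A minimal local sketch, assuming the normalization happens in the client-side constructor:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

public class OffsetMetadataDemo {
    public static void main(String[] args) {
        OffsetAndMetadata withMeta = new OffsetAndMetadata(2L, "metadata");
        OffsetAndMetadata withoutMeta = new OffsetAndMetadata(5L, null);
        System.out.println(withMeta.metadata());    // "metadata"
        System.out.println(withoutMeta.metadata()); // "" — null is normalized, not preserved
    }
}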
@@ -1588,6 +1595,75 @@ private void sendCompressedMessages(int numRecords, TopicPartition tp) {
     }
 }

+@ClusterTest
+public void testClassicConsumerStallBetweenPoll() throws Exception {
+    testStallBetweenPoll(GroupProtocol.CLASSIC);
+}
+
+@ClusterTest
+public void testAsyncConsumerStallBetweenPoll() throws Exception {
+    testStallBetweenPoll(GroupProtocol.CONSUMER);
+}
+
+/**
+ * This test is to prove that the intermittent stalling that has been experienced when using the asynchronous
+ * consumer, as filed under KAFKA-19259, has been fixed.
+ *
+ * <p/>
+ *
+ * The basic idea is to have one thread that produces a record every 500 ms. and the main thread that consumes
+ * records without pausing between polls for much more than the produce delay. In the test case filed in
+ * KAFKA-19259, the consumer sometimes pauses for up to 5-10 seconds despite records being produced every second.
+ */
+private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+    var testTopic = "stall-test-topic";
+    var numPartitions = 6;
+    cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+    // The producer must produce slowly to tickle the scenario.
+    var produceDelay = 500;
+
+    var executor = Executors.newScheduledThreadPool(1);
+
+    try (var producer = cluster.producer()) {
+        // Start a thread running that produces records at a relative trickle.
+        executor.scheduleWithFixedDelay(
+            () -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
+            0,
+            produceDelay,
+            TimeUnit.MILLISECONDS
+        );
+
+        Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+        // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+        // should be *at least* one record to read every second.
+        var pollDelayTolerance = 2000;
+
+        try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+            consumer.subscribe(List.of(testTopic));
+
+            // This is here to allow the consumer time to settle the group membership/assignment.
+            awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+            // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
+            // exceed the delay threshold defined above.
+            var beforePoll = System.currentTimeMillis();
+            consumer.poll(Duration.ofSeconds(5));
+            consumer.poll(Duration.ofSeconds(5));
+            var afterPoll = System.currentTimeMillis();
+            var pollDelay = afterPoll - beforePoll;
+
+            if (pollDelay > pollDelayTolerance)
+                fail("Detected a stall of " + pollDelay + " ms between Consumer.poll() invocations despite a Producer producing records every " + produceDelay + " ms");
+        } finally {
+            executor.shutdownNow();
+            // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
+            assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor did not terminate");
+        }
+    }
+}
+
 private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
     Consumer<byte[], byte[]> consumer,
     TopicPartition tp
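The producer-side trickle in the test relies on scheduleWithFixedDelay semantics: the delay is measured from the completion of one run to the start of the next, so sends can never pile up even when an individual send is slow. A standalone sketch of just that scheduling pattern — the output and timings are illustrative:

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TrickleDemo {
    public static void main(String[] args) throws InterruptedException {
        var executor = Executors.newScheduledThreadPool(1);
        // Run the task once immediately, then wait 500 ms after each
        // completion before running it again.
        executor.scheduleWithFixedDelay(
            () -> System.out.println("tick " + System.currentTimeMillis()),
            0, 500, TimeUnit.MILLISECONDS);
        Thread.sleep(2_000);
        executor.shutdownNow();
        // Wait for any in-flight task, mirroring the test's cleanup step.
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}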

clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java

Lines changed: 7 additions & 3 deletions
@@ -155,12 +155,14 @@ public class AdminClientConfig extends AbstractConfig {
 static {
     CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                     Type.LIST,
-                                    "",
+                                    List.of(),
+                                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                                     Importance.HIGH,
                                     BOOTSTRAP_SERVERS_DOC).
                             define(BOOTSTRAP_CONTROLLERS_CONFIG,
                                     Type.LIST,
-                                    "",
+                                    List.of(),
+                                    ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                                     Importance.HIGH,
                                     BOOTSTRAP_CONTROLLERS_DOC)
                             .define(CLIENT_ID_CONFIG, Type.STRING, "", Importance.MEDIUM, CLIENT_ID_DOC)
@@ -239,6 +241,7 @@ public class AdminClientConfig extends AbstractConfig {
         .define(METRIC_REPORTER_CLASSES_CONFIG,
                 Type.LIST,
                 JmxReporter.class.getName(),
+                ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                 Importance.LOW,
                 METRIC_REPORTER_CLASSES_DOC)
         .define(METRICS_RECORDING_LEVEL_CONFIG,
@@ -284,7 +287,8 @@ public class AdminClientConfig extends AbstractConfig {
                 METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
         .define(CONFIG_PROVIDERS_CONFIG,
                 ConfigDef.Type.LIST,
-                List.of(),
+                List.of(),
+                ConfigDef.ValidList.anyNonDuplicateValues(true, false),
                 ConfigDef.Importance.LOW,
                 CONFIG_PROVIDERS_DOC);
 }
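The ConfigDef.ValidList.anyNonDuplicateValues(...) validator added here rejects duplicate list entries at config-parse time; the meaning of its two boolean flags (plausibly empty/null handling) is not shown in this diff. A hand-rolled sketch of the core idea using ConfigDef's public Validator hook — not the real implementation:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.HashSet;
import java.util.List;

public class NoDuplicatesValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null)
            return; // nullability would be governed by the real validator's flags
        // Type.LIST configs are parsed into a List before validation runs.
        List<?> values = (List<?>) value;
        if (new HashSet<>(values).size() != values.size())
            throw new ConfigException(name, value, "Duplicate entries are not allowed");
    }
}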

clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java

Lines changed: 1 addition & 1 deletion
@@ -20,5 +20,5 @@
 /**
  * Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
  */
-public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
+public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
 }
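This copy-paste fix matters because AbstractOptions is an F-bounded ("self-typed") generic: its fluent setters return the type parameter, so parameterizing UnregisterBrokerOptions with UpdateFeaturesOptions would make chained calls yield the wrong options type. A simplified sketch of the pattern — class names are illustrative, not Kafka's:

public class SelfTypeDemo {
    abstract static class Options<T extends Options<T>> {
        Integer timeoutMs;

        @SuppressWarnings("unchecked")
        public T timeoutMs(Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return (T) this; // fluent setter returns the concrete subclass
        }
    }

    static class UnregisterOptions extends Options<UnregisterOptions> { }

    public static void main(String[] args) {
        // Chaining works because timeoutMs() returns UnregisterOptions;
        // with a wrong type argument this assignment would not compile.
        UnregisterOptions opts = new UnregisterOptions().timeoutMs(5_000);
        System.out.println(opts.timeoutMs);
    }
}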
