Skip to content

Commit 3837d06

Browse files
committed
Merge branch 'trunk' of https://github.com/apache/kafka into KAFKA‑19614
2 parents caf6142 + 2026833 commit 3837d06

File tree

94 files changed

+3309
-1139
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+3309
-1139
lines changed

build.gradle

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3316,6 +3316,22 @@ project(':streams:upgrade-system-tests-40') {
33163316
}
33173317
}
33183318

3319+
project(':streams:upgrade-system-tests-41') {
3320+
base {
3321+
archivesName = "kafka-streams-upgrade-system-tests-41"
3322+
}
3323+
3324+
dependencies {
3325+
testImplementation libs.kafkaStreams_41
3326+
testRuntimeOnly libs.junitJupiter
3327+
testRuntimeOnly runtimeTestLibs
3328+
}
3329+
3330+
systemTestLibs {
3331+
dependsOn testJar
3332+
}
3333+
}
3334+
33193335
project(':jmh-benchmarks') {
33203336

33213337
apply plugin: 'com.gradleup.shadow'

checkstyle/import-control-metadata.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
<allow pkg="org.apache.kafka.metadata" />
162162
<allow pkg="org.apache.kafka.queue" />
163163
<allow pkg="org.apache.kafka.raft" />
164+
<allow pkg="org.apache.kafka.security" />
164165
<allow pkg="org.apache.kafka.server.authorizer" />
165166
<allow pkg="org.apache.kafka.server.common" />
166167
<allow pkg="org.apache.kafka.server.fault" />

checkstyle/import-control-server-common.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
</subpackage>
6161

6262
<subpackage name="security">
63+
<allow pkg="org.apache.kafka.clients.admin" />
6364
<allow pkg="org.apache.kafka.common.config" />
6465
<allow pkg="org.apache.kafka.common.config.types" />
6566
<allow pkg="org.apache.kafka.server.util" />

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1517,7 +1517,7 @@ private Timer createTimerForCloseRequests(Duration timeout) {
15171517
}
15181518

15191519
private void autoCommitOnClose(final Timer timer) {
1520-
if (groupMetadata.get().isEmpty())
1520+
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
15211521
return;
15221522

15231523
if (autoCommitEnabled)
@@ -1527,7 +1527,7 @@ private void autoCommitOnClose(final Timer timer) {
15271527
}
15281528

15291529
private void runRebalanceCallbacksOnClose() {
1530-
if (groupMetadata.get().isEmpty())
1530+
if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null)
15311531
return;
15321532

15331533
int memberEpoch = groupMetadata.get().get().generationId();
@@ -1553,7 +1553,7 @@ private void runRebalanceCallbacksOnClose() {
15531553
}
15541554

15551555
private void leaveGroupOnClose(final Timer timer, final CloseOptions.GroupMembershipOperation membershipOperation) {
1556-
if (groupMetadata.get().isEmpty())
1556+
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
15571557
return;
15581558

15591559
log.debug("Leaving the consumer group during consumer close");
@@ -1569,7 +1569,7 @@ private void leaveGroupOnClose(final Timer timer, final CloseOptions.GroupMember
15691569
}
15701570

15711571
private void stopFindCoordinatorOnClose() {
1572-
if (groupMetadata.get().isEmpty())
1572+
if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
15731573
return;
15741574
log.debug("Stop finding coordinator during consumer close");
15751575
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
@@ -1634,7 +1634,7 @@ private void commitSync(Optional<Map<TopicPartition, OffsetAndMetadata>> offsets
16341634
}
16351635

16361636
private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) {
1637-
if (lastPendingAsyncCommit == null) {
1637+
if (lastPendingAsyncCommit == null || offsetCommitCallbackInvoker == null) {
16381638
return;
16391639
}
16401640

clients/src/main/resources/common/message/DeleteAclsRequest.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
{ "name": "ResourceTypeFilter", "type": "int8", "versions": "0+",
3131
"about": "The resource type." },
3232
{ "name": "ResourceNameFilter", "type": "string", "versions": "0+", "nullableVersions": "0+",
33-
"about": "The resource name." },
33+
"about": "The resource name, or null to match any resource name." },
3434
{ "name": "PatternTypeFilter", "type": "int8", "versions": "1+", "default": "3", "ignorable": false,
3535
"about": "The pattern type." },
3636
{ "name": "PrincipalFilter", "type": "string", "versions": "0+", "nullableVersions": "0+",

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.kafka.common.requests.MetadataResponse;
8484
import org.apache.kafka.common.requests.RequestTestUtils;
8585
import org.apache.kafka.common.serialization.StringDeserializer;
86+
import org.apache.kafka.common.utils.LogCaptureAppender;
8687
import org.apache.kafka.common.utils.LogContext;
8788
import org.apache.kafka.common.utils.MockTime;
8889
import org.apache.kafka.common.utils.Time;
@@ -2029,6 +2030,28 @@ public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
20292030
assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue());
20302031
}
20312032

2033+
@Test
2034+
public void testFailConstructor() {
2035+
final Properties props = requiredConsumerConfig();
2036+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
2037+
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
2038+
final ConsumerConfig config = new ConsumerConfig(props);
2039+
2040+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
2041+
KafkaException ce = assertThrows(
2042+
KafkaException.class,
2043+
() -> newConsumer(config));
2044+
assertTrue(ce.getMessage().contains("Failed to construct kafka consumer"), "Unexpected exception message: " + ce.getMessage());
2045+
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
2046+
2047+
boolean npeLogged = appender.getEvents().stream()
2048+
.flatMap(event -> event.getThrowableInfo().stream())
2049+
.anyMatch(str -> str.contains("NullPointerException"));
2050+
2051+
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
2052+
}
2053+
}
2054+
20322055
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
20332056
final TopicPartition t0 = new TopicPartition("t0", 2);
20342057
final TopicPartition t1 = new TopicPartition("t0", 3);

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,18 +213,19 @@ public void testFailConstructor() {
213213
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
214214
final ConsumerConfig config = new ConsumerConfig(props);
215215

216-
LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
217-
KafkaException ce = assertThrows(
216+
try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
217+
KafkaException ce = assertThrows(
218218
KafkaException.class,
219219
() -> newConsumer(config));
220-
assertTrue(ce.getMessage().contains("Failed to construct Kafka share consumer"), "Unexpected exception message: " + ce.getMessage());
221-
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
220+
assertTrue(ce.getMessage().contains("Failed to construct Kafka share consumer"), "Unexpected exception message: " + ce.getMessage());
221+
assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
222222

223-
boolean npeLogged = appender.getEvents().stream()
223+
boolean npeLogged = appender.getEvents().stream()
224224
.flatMap(event -> event.getThrowableInfo().stream())
225225
.anyMatch(str -> str.contains("NullPointerException"));
226226

227-
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
227+
assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
228+
}
228229
}
229230

230231
@Test

core/src/main/java/kafka/server/ClientRequestQuotaManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public ClientRequestQuotaManager(
5656
Optional<Plugin<ClientQuotaCallback>> quotaCallbackPlugin
5757
) {
5858
super(config, metrics, QuotaType.REQUEST, time, threadNamePrefix, quotaCallbackPlugin);
59-
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds);
59+
this.maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(config.quotaWindowSizeSeconds());
6060
this.metrics = metrics;
6161
this.exemptMetricName = metrics.metricName("exempt-request-time", QuotaType.REQUEST.toString(), "Tracking exempt-request-time utilization percentage");
6262
exemptSensor = getOrCreateSensor(EXEMPT_SENSOR_NAME, DEFAULT_INACTIVE_EXEMPT_SENSOR_EXPIRATION_TIME_SECONDS, sensor -> sensor.add(exemptMetricName, new Rate()));

0 commit comments

Comments
 (0)