Skip to content

Commit e27ea8d

Browse files
authored
KAFKA-19702 Move MetadataVersionConfigValidator and related test code to metadata module (#20526)
1. Move `MetadataVersionConfigValidator` to metadata module. 2. Move `MetadataVersionConfigValidatorTest` to metadata module. 3. Remove `KafkaConfig#validateWithMetadataVersion`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 7d098cf commit e27ea8d

File tree

6 files changed

+169
-149
lines changed

6 files changed

+169
-149
lines changed

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
4141
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
4242
import org.apache.kafka.coordinator.transaction.ProducerIdManager
4343
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
44-
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
44+
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
4545
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, ScramPublisher}
4646
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
4747
import org.apache.kafka.server.authorizer.Authorizer
@@ -469,7 +469,10 @@ class BrokerServer(
469469
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
470470
config.numIoThreads, "RequestHandlerAvgIdlePercent")
471471

472-
metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
472+
metadataPublishers.add(new MetadataVersionConfigValidator(config.brokerId,
473+
() => config.processRoles.contains(ProcessRole.BrokerRole) && config.logDirs().size() > 1,
474+
sharedServer.metadataPublishingFaultHandler
475+
))
473476
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
474477
metadataCache,
475478
logManager,

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
4343
import org.apache.kafka.security.authorizer.AuthorizerUtils
4444
import org.apache.kafka.server.ProcessRole
4545
import org.apache.kafka.server.authorizer.Authorizer
46-
import org.apache.kafka.server.common.MetadataVersion
4746
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
4847
import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
4948
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -653,18 +652,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
653652
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
654653
}
655654

656-
/**
657-
* Validate some configurations for new MetadataVersion. A new MetadataVersion can take place when
658-
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
659-
*/
660-
def validateWithMetadataVersion(metadataVersion: MetadataVersion): Unit = {
661-
if (processRoles.contains(ProcessRole.BrokerRole) && logDirs.size > 1) {
662-
require(metadataVersion.isDirectoryAssignmentSupported,
663-
s"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion ${metadataVersion}. " +
664-
s"Need ${MetadataVersion.IBP_3_7_IV2} or higher")
665-
}
666-
}
667-
668655
/**
669656
* Copy the subset of properties that are relevant to Logs. The individual properties
670657
* are listed here since the names are slightly different in each Config class...

core/src/test/java/kafka/server/MetadataVersionConfigValidatorTest.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

core/src/test/scala/unit/kafka/log/LogConfigTest.scala

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
2323
import org.apache.kafka.common.config.ConfigDef.Type.INT
2424
import org.apache.kafka.common.config.{ConfigException, SslConfigs, TopicConfig}
2525
import org.apache.kafka.common.errors.InvalidConfigurationException
26-
import org.apache.kafka.server.common.MetadataVersion
2726
import org.junit.jupiter.api.Assertions._
2827
import org.junit.jupiter.api.Test
2928

@@ -429,21 +428,4 @@ class LogConfigTest {
429428
logProps.put(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, deleteOnDisable.toString)
430429
LogConfig.validate(logProps)
431430
}
432-
433-
@Test
434-
def testValidateWithMetadataVersionJbodSupport(): Unit = {
435-
def validate(metadataVersion: MetadataVersion, jbodConfig: Boolean): Unit =
436-
KafkaConfig.fromProps(
437-
TestUtils.createBrokerConfig(nodeId = 0, logDirCount = if (jbodConfig) 2 else 1)
438-
).validateWithMetadataVersion(metadataVersion)
439-
440-
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = false)
441-
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = false)
442-
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = false)
443-
assertThrows(classOf[IllegalArgumentException], () =>
444-
validate(MetadataVersion.IBP_3_6_IV2, jbodConfig = true))
445-
assertThrows(classOf[IllegalArgumentException], () =>
446-
validate(MetadataVersion.IBP_3_7_IV0, jbodConfig = true))
447-
validate(MetadataVersion.IBP_3_7_IV2, jbodConfig = true)
448-
}
449431
}

core/src/main/java/kafka/server/MetadataVersionConfigValidator.java renamed to metadata/src/main/java/org/apache/kafka/metadata/MetadataVersionConfigValidator.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package kafka.server;
18+
package org.apache.kafka.metadata;
1919

2020
import org.apache.kafka.image.MetadataDelta;
2121
import org.apache.kafka.image.MetadataImage;
@@ -24,18 +24,20 @@
2424
import org.apache.kafka.server.common.MetadataVersion;
2525
import org.apache.kafka.server.fault.FaultHandler;
2626

27+
import java.util.function.Supplier;
28+
2729
public class MetadataVersionConfigValidator implements MetadataPublisher {
2830
private final String name;
29-
private final KafkaConfig config;
31+
private final Supplier<Boolean> hasMultiLogDirs;
3032
private final FaultHandler faultHandler;
3133

3234
public MetadataVersionConfigValidator(
33-
KafkaConfig config,
34-
FaultHandler faultHandler
35+
int id,
36+
Supplier<Boolean> hasMultiLogDirs,
37+
FaultHandler faultHandler
3538
) {
36-
int id = config.brokerId();
3739
this.name = "MetadataVersionPublisher(id=" + id + ")";
38-
this.config = config;
40+
this.hasMultiLogDirs = hasMultiLogDirs;
3941
this.faultHandler = faultHandler;
4042
}
4143

@@ -46,9 +48,9 @@ public String name() {
4648

4749
@Override
4850
public void onMetadataUpdate(
49-
MetadataDelta delta,
50-
MetadataImage newImage,
51-
LoaderManifest manifest
51+
MetadataDelta delta,
52+
MetadataImage newImage,
53+
LoaderManifest manifest
5254
) {
5355
if (delta.featuresDelta() != null) {
5456
if (delta.metadataVersionChanged().isPresent()) {
@@ -57,13 +59,22 @@ public void onMetadataUpdate(
5759
}
5860
}
5961

62+
/**
63+
* Validate some configurations for the new MetadataVersion. A new MetadataVersion can take place when
64+
* a FeatureLevelRecord for "metadata.version" is read from the cluster metadata.
65+
*/
6066
@SuppressWarnings("ThrowableNotThrown")
6167
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
62-
try {
63-
this.config.validateWithMetadataVersion(metadataVersion);
64-
} catch (Throwable t) {
68+
if (this.hasMultiLogDirs.get() && !metadataVersion.isDirectoryAssignmentSupported()) {
69+
String errorMsg = String.format(
70+
"Multiple log directories (aka JBOD) are not supported in the current MetadataVersion %s. Need %s or higher",
71+
metadataVersion, MetadataVersion.IBP_3_7_IV2
72+
);
73+
6574
this.faultHandler.handleFault(
66-
"Broker configuration does not support the cluster MetadataVersion", t);
75+
"Broker configuration does not support the cluster MetadataVersion",
76+
new IllegalArgumentException(errorMsg)
77+
);
6778
}
6879
}
6980
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.metadata;
19+
20+
import org.apache.kafka.common.metadata.FeatureLevelRecord;
21+
import org.apache.kafka.image.MetadataDelta;
22+
import org.apache.kafka.image.MetadataImage;
23+
import org.apache.kafka.image.MetadataProvenance;
24+
import org.apache.kafka.image.loader.LogDeltaManifest;
25+
import org.apache.kafka.raft.LeaderAndEpoch;
26+
import org.apache.kafka.server.common.MetadataVersion;
27+
import org.apache.kafka.server.fault.FaultHandler;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.function.Supplier;
32+
33+
import static org.mockito.ArgumentMatchers.any;
34+
import static org.mockito.ArgumentMatchers.eq;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.times;
37+
import static org.mockito.Mockito.verify;
38+
import static org.mockito.Mockito.verifyNoMoreInteractions;
39+
import static org.mockito.Mockito.when;
40+
41+
@SuppressWarnings({"unchecked", "ThrowableNotThrown"})
42+
public class MetadataVersionConfigValidatorTest {
43+
44+
private static final LogDeltaManifest TEST_MANIFEST = LogDeltaManifest.newBuilder()
45+
.provenance(MetadataProvenance.EMPTY)
46+
.leaderAndEpoch(LeaderAndEpoch.UNKNOWN)
47+
.numBatches(1)
48+
.elapsedNs(90)
49+
.numBytes(88)
50+
.build();
51+
public static final MetadataProvenance TEST_PROVENANCE =
52+
new MetadataProvenance(50, 3, 8000, true);
53+
54+
void executeMetadataUpdate(
55+
MetadataVersion metadataVersion,
56+
Supplier<Boolean> multiLogDirSupplier,
57+
FaultHandler faultHandler
58+
) throws Exception {
59+
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(0, multiLogDirSupplier, faultHandler)) {
60+
MetadataDelta delta = new MetadataDelta.Builder()
61+
.setImage(MetadataImage.EMPTY)
62+
.build();
63+
if (metadataVersion != null) {
64+
delta.replay(new FeatureLevelRecord().
65+
setName(MetadataVersion.FEATURE_NAME).
66+
setFeatureLevel(metadataVersion.featureLevel()));
67+
}
68+
MetadataImage image = delta.apply(TEST_PROVENANCE);
69+
70+
validator.onMetadataUpdate(delta, image, TEST_MANIFEST);
71+
}
72+
}
73+
74+
@Test
75+
void testValidatesConfigOnMetadataChange() throws Exception {
76+
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV2;
77+
FaultHandler faultHandler = mock(FaultHandler.class);
78+
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
79+
when(multiLogDirSupplier.get()).thenReturn(false);
80+
81+
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
82+
83+
verify(multiLogDirSupplier, times(1)).get();
84+
verifyNoMoreInteractions(faultHandler);
85+
}
86+
87+
@Test
88+
void testInvokesFaultHandlerOnException() throws Exception {
89+
MetadataVersion metadataVersion = MetadataVersion.IBP_3_7_IV1;
90+
Supplier<Boolean> multiLogDirSupplier = mock(Supplier.class);
91+
FaultHandler faultHandler = mock(FaultHandler.class);
92+
93+
when(multiLogDirSupplier.get()).thenReturn(true);
94+
95+
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
96+
97+
verify(multiLogDirSupplier, times(1)).get();
98+
verify(faultHandler, times(1)).handleFault(
99+
eq("Broker configuration does not support the cluster MetadataVersion"),
100+
any(IllegalArgumentException.class));
101+
}
102+
103+
@Test
104+
void testValidateWithMetadataVersionJbodSupport() throws Exception {
105+
FaultHandler faultHandler = mock(FaultHandler.class);
106+
validate(MetadataVersion.IBP_3_6_IV2, false, faultHandler);
107+
verifyNoMoreInteractions(faultHandler);
108+
109+
faultHandler = mock(FaultHandler.class);
110+
validate(MetadataVersion.IBP_3_7_IV0, false, faultHandler);
111+
verifyNoMoreInteractions(faultHandler);
112+
113+
faultHandler = mock(FaultHandler.class);
114+
validate(MetadataVersion.IBP_3_7_IV2, false, faultHandler);
115+
verifyNoMoreInteractions(faultHandler);
116+
117+
faultHandler = mock(FaultHandler.class);
118+
validate(MetadataVersion.IBP_3_6_IV2, true, faultHandler);
119+
verify(faultHandler, times(1)).handleFault(
120+
eq("Broker configuration does not support the cluster MetadataVersion"),
121+
any(IllegalArgumentException.class));
122+
123+
faultHandler = mock(FaultHandler.class);
124+
validate(MetadataVersion.IBP_3_7_IV0, true, faultHandler);
125+
verify(faultHandler, times(1)).handleFault(
126+
eq("Broker configuration does not support the cluster MetadataVersion"),
127+
any(IllegalArgumentException.class));
128+
129+
faultHandler = mock(FaultHandler.class);
130+
validate(MetadataVersion.IBP_3_7_IV2, true, faultHandler);
131+
verifyNoMoreInteractions(faultHandler);
132+
}
133+
134+
private void validate(MetadataVersion metadataVersion, boolean jbodConfig, FaultHandler faultHandler)
135+
throws Exception {
136+
Supplier<Boolean> multiLogDirSupplier = () -> jbodConfig;
137+
138+
executeMetadataUpdate(metadataVersion, multiLogDirSupplier, faultHandler);
139+
}
140+
}

0 commit comments

Comments
 (0)