Skip to content

Commit 857b1e9

Browse files
authored
KAFKA-19719: --no-initial-controllers should not assume kraft.version=1 (#20551)
Just because a controller node sets --no-initial-controllers flag does not mean it is necessarily running kraft.version=1. The more precise meaning is that the controller node being formatted does not know what kraft version the cluster should be in, and therefore it is only safe to assume kraft.version=0. Only by setting --standalone,--initial-controllers, or --no-initial-controllers AND not specifying the controller.quorum.voters static config, is it known kraft.version > 0. For example, it is a valid configuration (although confusing) to run a static quorum defined by controller.quorum.voters but have all the controllers format with --no-initial-controllers. In this case, specifying --no-initial-controllers alongside a metadata version that does not support kraft.version=1 causes formatting to fail, which is a regression. Additionally, the formatter should not check the kraft.version against the release version, since kraft.version does not actually depend on any release version. It should only check the kraft.version against the static voters config/format arguments. This PR also cleans up the integration test framework to match the semantics of formatting an actual cluster. Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>, José Armando García Sancio <[email protected]>
1 parent f4e00e9 commit 857b1e9

File tree

10 files changed

+171
-195
lines changed

10 files changed

+171
-195
lines changed

core/src/main/scala/kafka/tools/StorageTool.scala

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,9 @@ object StorageTool extends Logging {
144144
})
145145
val initialControllers = namespace.getString("initial_controllers")
146146
val isStandalone = namespace.getBoolean("standalone")
147-
if (!config.quorumConfig.voters().isEmpty &&
148-
(Option(initialControllers).isDefined || isStandalone)) {
147+
val staticVotersEmpty = config.quorumConfig.voters().isEmpty
148+
formatter.setHasDynamicQuorum(staticVotersEmpty)
149+
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
149150
throw new TerseFailure("You cannot specify " +
150151
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
151152
"with --initial-controllers or --standalone. " +
@@ -158,16 +159,13 @@ object StorageTool extends Logging {
158159
if (isStandalone) {
159160
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
160161
}
161-
if (namespace.getBoolean("no_initial_controllers")) {
162-
formatter.setNoInitialControllersFlag(true)
163-
} else {
164-
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
165-
if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) {
162+
if (!namespace.getBoolean("no_initial_controllers") &&
163+
config.processRoles.contains(ProcessRole.ControllerRole) &&
164+
staticVotersEmpty &&
165+
formatter.initialVoters().isEmpty) {
166166
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
167167
" is not set on this controller, you must specify one of the following: " +
168168
"--standalone, --initial-controllers, or --no-initial-controllers.");
169-
}
170-
}
171169
}
172170
Option(namespace.getList("add_scram")).
173171
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -336,18 +334,21 @@ object StorageTool extends Logging {
336334

337335
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
338336
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
339-
.help("Used to initialize a controller as a single-node dynamic quorum.")
337+
.help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
338+
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
340339
.action(storeTrue())
341340

342341
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
343-
.help("Used to initialize a server without a dynamic quorum topology.")
342+
.help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
343+
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
344344
.action(storeTrue())
345345

346346
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
347-
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
347+
.help("Used to initialize a server with the specified dynamic quorum. The argument " +
348348
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
349349
"format all nodes. For example:\n[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
350-
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
350+
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
351+
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
351352
.action(store())
352353
}
353354

core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception {
8484
new TestKitNodes.Builder().
8585
setNumBrokerNodes(1).
8686
setNumControllerNodes(1).
87-
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
8887
build()
89-
).build()) {
88+
).setStandalone(true).build()) {
9089
cluster.format();
9190
cluster.startup();
9291
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -108,13 +107,23 @@ static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
108107

109108
@Test
110109
public void testRemoveController() throws Exception {
111-
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
112-
new TestKitNodes.Builder().
113-
setNumBrokerNodes(1).
114-
setNumControllerNodes(3).
115-
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
116-
build()
117-
).build()) {
110+
final var nodes = new TestKitNodes.Builder().
111+
setNumBrokerNodes(1).
112+
setNumControllerNodes(3).
113+
build();
114+
115+
final Map<Integer, Uuid> initialVoters = new HashMap<>();
116+
for (final var controllerNode : nodes.controllerNodes().values()) {
117+
initialVoters.put(
118+
controllerNode.id(),
119+
controllerNode.metadataDirectoryId()
120+
);
121+
}
122+
123+
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
124+
setInitialVoterSet(initialVoters).
125+
build()
126+
) {
118127
cluster.format();
119128
cluster.startup();
120129
try (Admin admin = Admin.create(cluster.clientProperties())) {
@@ -133,12 +142,22 @@ public void testRemoveController() throws Exception {
133142

134143
@Test
135144
public void testRemoveAndAddSameController() throws Exception {
136-
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
137-
new TestKitNodes.Builder().
138-
setNumBrokerNodes(1).
139-
setNumControllerNodes(4).
140-
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
141-
build()).build()
145+
final var nodes = new TestKitNodes.Builder().
146+
setNumBrokerNodes(1).
147+
setNumControllerNodes(4).
148+
build();
149+
150+
final Map<Integer, Uuid> initialVoters = new HashMap<>();
151+
for (final var controllerNode : nodes.controllerNodes().values()) {
152+
initialVoters.put(
153+
controllerNode.id(),
154+
controllerNode.metadataDirectoryId()
155+
);
156+
}
157+
158+
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
159+
setInitialVoterSet(initialVoters).
160+
build()
142161
) {
143162
cluster.format();
144163
cluster.startup();
@@ -173,7 +192,6 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
173192
final var nodes = new TestKitNodes.Builder().
174193
setNumBrokerNodes(1).
175194
setNumControllerNodes(3).
176-
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
177195
build();
178196
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
179197
setConfigProp(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, true).
@@ -199,7 +217,6 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception {
199217
final var nodes = new TestKitNodes.Builder().
200218
setNumBrokerNodes(1).
201219
setNumControllerNodes(3).
202-
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
203220
build();
204221

205222
// Configure the initial voters with one voter having a different directory ID.

core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,8 +1035,7 @@ class KRaftClusterTest {
10351035
val cluster = new KafkaClusterTestKit.Builder(
10361036
new TestKitNodes.Builder().
10371037
setNumBrokerNodes(1).
1038-
setNumControllerNodes(1).
1039-
setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
1038+
setNumControllerNodes(1).build()).setStandalone(true).build()
10401039
try {
10411040
cluster.format()
10421041
cluster.startup()

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -605,19 +605,14 @@ Found problem:
605605
Seq("--release-version", "3.9-IV0"))).getMessage)
606606
}
607607

608-
@ParameterizedTest
609-
@ValueSource(booleans = Array(false, true))
610-
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
608+
@Test
609+
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
611610
val availableDirs = Seq(TestUtils.tempDir())
612611
val properties = new Properties()
613612
properties.putAll(defaultDynamicQuorumProperties)
614613
properties.setProperty("log.dirs", availableDirs.mkString(","))
615614
val stream = new ByteArrayOutputStream()
616615
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
617-
if (setKraftVersionFeature) {
618-
arguments += "--feature"
619-
arguments += "kraft.version=1"
620-
}
621616
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
622617
assertTrue(stream.toString().
623618
contains("Formatting metadata directory %s".format(availableDirs.head)),

docs/ops.html

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4099,14 +4099,8 @@ <h5 class="anchor-heading"><a id="static_versus_dynamic_kraft_quorums" class="an
40994099
<p>
41004100
The static versus dynamic nature of the quorum is determined at the time of formatting.
41014101
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
4102-
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have
4103-
followed the instructions earlier in this document, you will get a dynamic quorum.<p>
4104-
4105-
If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
4106-
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
4107-
this flag when formatting brokers -- only when formatting controllers.)<p>
4108-
4109-
<pre><code class="language-bash">$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties</code></pre>
4102+
<b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
4103+
If you have followed the instructions earlier in this document, you will get a dynamic quorum.
41104104
<p>
41114105
Note: To migrate from static voter set to dynamic voter set, please refer to the <a href="#kraft_upgrade">Upgrade</a> section.
41124106

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public class Formatter {
131131
* The initial KIP-853 voters.
132132
*/
133133
private Optional<DynamicVoters> initialControllers = Optional.empty();
134-
private boolean noInitialControllersFlag = false;
134+
private boolean hasDynamicQuorum = false;
135135

136136
public Formatter setPrintStream(PrintStream printStream) {
137137
this.printStream = printStream;
@@ -217,8 +217,8 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) {
217217
return this;
218218
}
219219

220-
public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
221-
this.noInitialControllersFlag = noInitialControllersFlag;
220+
public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
221+
this.hasDynamicQuorum = hasDynamicQuorum;
222222
return this;
223223
}
224224

@@ -227,7 +227,7 @@ public Optional<DynamicVoters> initialVoters() {
227227
}
228228

229229
boolean hasDynamicQuorum() {
230-
return initialControllers.isPresent() || noInitialControllersFlag;
230+
return hasDynamicQuorum;
231231
}
232232

233233
public BootstrapMetadata bootstrapMetadata() {
@@ -337,8 +337,8 @@ Map<String, Short> calculateEffectiveFeatureLevels() {
337337
/**
338338
* Calculate the effective feature level for kraft.version. In order to keep existing
339339
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
340-
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments
341-
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version).
340+
* voter quorum arguments were provided. As a convenience, if the static voters config is
341+
* empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
342342
*
343343
* @param configuredKRaftVersionLevel The configured level for kraft.version
344344
* @return The effective feature level.
@@ -348,20 +348,19 @@ private short effectiveKRaftFeatureLevel(Optional<Short> configuredKRaftVersionL
348348
if (configuredKRaftVersionLevel.get() == 0) {
349349
if (hasDynamicQuorum()) {
350350
throw new FormatterException(
351-
"Cannot set kraft.version to " +
352-
configuredKRaftVersionLevel.get() +
353-
" if one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
354-
"For dynamic controllers support, try removing the --feature flag for kraft.version."
351+
"Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
352+
"--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
353+
"controllers support, try removing the --feature flag for kraft.version."
355354
);
356355
}
357356
} else {
358357
if (!hasDynamicQuorum()) {
359358
throw new FormatterException(
360-
"Cannot set kraft.version to " +
361-
configuredKRaftVersionLevel.get() +
362-
" unless one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
363-
"For dynamic controllers support, try using one of --standalone, --initial-controllers, or " +
364-
"--no-initial-controllers."
359+
"Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
360+
" unless controller.quorum.voters is empty and one of the flags --standalone, " +
361+
"--initial-controllers, or --no-initial-controllers is used. " +
362+
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
363+
"or --no-initial-controllers and removing controller.quorum.voters."
365364
);
366365
}
367366
}

0 commit comments

Comments
 (0)