Skip to content

Commit 84b0db4

Browse files
committed
KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 (apache#20604)
``` commit ec37eb5 (HEAD -> KAFKA-19719-cherry-pick-41, origin/KAFKA-19719-cherry-pick-41) Author: Kevin Wu <[email protected]> Date: Thu Sep 25 11:56:16 2025 -0500 KAFKA-19719: --no-initial-controllers should not assume kraft.version=1 (apache#20551) Just because a controller node sets --no-initial-controllers flag does not mean it is necessarily running kraft.version=1. The more precise meaning is that the controller node being formatted does not know what kraft version the cluster should be in, and therefore it is only safe to assume kraft.version=0. Only by setting --standalone,--initial-controllers, or --no-initial-controllers AND not specifying the controller.quorum.voters static config, is it known kraft.version > 0. For example, it is a valid configuration (although confusing) to run a static quorum defined by controller.quorum.voters but have all the controllers format with --no-initial-controllers. In this case, specifying --no-initial-controllers alongside a metadata version that does not support kraft.version=1 causes formatting to fail, which is does not support kraft.version=1 causes formatting to fail, which is a regression. Additionally, the formatter should not check the kraft.version against the release version, since kraft.version does not actually depend on any release version. It should only check the kraft.version against the static voters config/format arguments. This PR also cleans up the integration test framework to match the semantics of formatting an actual cluster. Reviewers: TengYao Chi <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>, José Armando García Sancio <[email protected]> Conflicts: core/src/main/scala/kafka/tools/StorageTool.scala Minor conflicts. Keep changes from cherry-pick. core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java Remove auto-join tests, since 4.1 does not support it. docs/ops.html Keep docs section from cherry-pick. metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java Minor conflicts. Keep cherry-picked changes. test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java Conflicts due to integration test framework changes. Keep new changes. commit 02d58b1 (upstream/4.1) ``` Reviewers: Chia-Ping Tsai <[email protected]> Conflicts: core/src/main/scala/kafka/tools/StorageTool.scala core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java core/src/test/java/kafka/testkit/KafkaClusterTestKit.java docs/ops.html metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java Minor conflicts mostly. The main difference is 3.9 does not have integration test support for KIP-853, so did not cherry-pick those changes for this branch.
1 parent e37f31e commit 84b0db4

File tree

6 files changed

+149
-133
lines changed

6 files changed

+149
-133
lines changed

core/src/main/scala/kafka/tools/StorageTool.scala

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,21 +128,30 @@ object StorageTool extends Logging {
128128
featureNamesAndLevels(_).foreach {
129129
kv => formatter.setFeatureLevel(kv._1, kv._2)
130130
})
131-
Option(namespace.getString("initial_controllers")).
131+
val initialControllers = namespace.getString("initial_controllers")
132+
val isStandalone = namespace.getBoolean("standalone")
133+
val staticVotersEmpty = config.quorumVoters.isEmpty
134+
formatter.setHasDynamicQuorum(staticVotersEmpty)
135+
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
136+
throw new TerseFailure("You cannot specify " +
137+
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
138+
"with --initial-controllers or --standalone. " +
139+
"If you want to use dynamic quorum, please remove " +
140+
QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
141+
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
142+
}
143+
Option(initialControllers).
132144
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
133-
if (namespace.getBoolean("standalone")) {
145+
if (isStandalone) {
134146
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
135147
}
136-
if (namespace.getBoolean("no_initial_controllers")) {
137-
formatter.setNoInitialControllersFlag(true)
138-
} else {
139-
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
140-
if (config.quorumVoters.isEmpty() && !formatter.initialVoters().isPresent()) {
148+
if (!namespace.getBoolean("no_initial_controllers") &&
149+
config.processRoles.contains(ProcessRole.ControllerRole) &&
150+
staticVotersEmpty &&
151+
formatter.initialVoters().isEmpty) {
141152
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
142153
" is not set on this controller, you must specify one of the following: " +
143154
"--standalone, --initial-controllers, or --no-initial-controllers.");
144-
}
145-
}
146155
}
147156
Option(namespace.getList("add_scram")).
148157
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -206,18 +215,21 @@ object StorageTool extends Logging {
206215
action(append())
207216
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
208217
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
209-
.help("Used to initialize a controller as a single-node dynamic quorum.")
218+
.help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
219+
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
210220
.action(storeTrue())
211221

212222
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
213-
.help("Used to initialize a server without a dynamic quorum topology.")
223+
.help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
224+
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
214225
.action(storeTrue())
215226

216227
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
217-
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
228+
.help("Used to initialize a server with the specified dynamic quorum. The argument " +
218229
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
219230
"format all nodes. For example:\n[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
220-
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
231+
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
232+
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
221233
.action(store())
222234
parser.parseArgs(args)
223235
}

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,10 @@ Found problem:
400400
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
401401
val availableDirs = Seq(TestUtils.tempDir())
402402
val properties = new Properties()
403-
properties.putAll(defaultStaticQuorumProperties)
403+
properties.setProperty("process.roles", "broker")
404+
properties.setProperty("node.id", "0")
405+
properties.setProperty("controller.listener.names", "CONTROLLER")
406+
properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
404407
properties.setProperty("log.dirs", availableDirs.mkString(","))
405408
val stream = new ByteArrayOutputStream()
406409
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
@@ -483,19 +486,14 @@ Found problem:
483486
Seq("--release-version", "3.9-IV0"))).getMessage)
484487
}
485488

486-
@ParameterizedTest
487-
@ValueSource(booleans = Array(false, true))
488-
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
489+
@Test
490+
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
489491
val availableDirs = Seq(TestUtils.tempDir())
490492
val properties = new Properties()
491493
properties.putAll(defaultDynamicQuorumProperties)
492494
properties.setProperty("log.dirs", availableDirs.mkString(","))
493495
val stream = new ByteArrayOutputStream()
494496
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
495-
if (setKraftVersionFeature) {
496-
arguments += "--feature"
497-
arguments += "kraft.version=1"
498-
}
499497
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
500498
assertTrue(stream.toString().
501499
contains("Formatting metadata directory %s".format(availableDirs.head)),

docs/ops.html

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3843,42 +3843,26 @@ <h5 class="anchor-heading"><a id="static_versus_dynamic_kraft_quorums" class="an
38433843
If you are not sure whether you are using static or dynamic quorums, you can determine this by
38443844
running something like the following:<p>
38453845

3846-
<pre><code class="language-bash">
3847-
$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
3848-
</code></pre><p>
3849-
3850-
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
3851-
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
3852-
quorum:<p/>
3853-
<pre><code class="language-bash">
3854-
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
3855-
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
3856-
</code></pre><p/>
3857-
3858-
Here is another example of a static quorum:<p/>
3859-
<pre><code class="language-bash">
3860-
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
3861-
</code></pre><p/>
3862-
3863-
Here is an example of a dynamic quorum:<p/>
3864-
<pre><code class="language-bash">
3865-
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
3866-
Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
3867-
</code></pre><p/>
3868-
3869-
The static versus dynamic nature of the quorum is determined at the time of formatting.
3870-
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
3871-
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have
3872-
followed the instructions earlier in this document, you will get a dynamic quorum.<p>
3873-
3874-
If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
3875-
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
3876-
this flag when formatting brokers -- only when formatting controllers.)<p>
3877-
3878-
<pre><code class="language-bash">
3879-
$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller_static.properties
3880-
Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
3881-
</code></pre><p>
3846+
<pre><code class="language-bash">$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe</code></pre>
3847+
<p>
3848+
If the <code>kraft.version</code> field is level 0 or absent, you are using a static quorum. If
3849+
it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
3850+
quorum:<p>
3851+
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
3852+
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
3853+
<p>
3854+
Here is another example of a static quorum:<p>
3855+
<pre><code class="language-bash">Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5</code></pre>
3856+
<p>
3857+
Here is an example of a dynamic quorum:<p>
3858+
<pre><code class="language-bash">Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
3859+
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5</code></pre>
3860+
<p>
3861+
The static versus dynamic nature of the quorum is determined at the time of formatting.
3862+
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
3863+
<b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
3864+
If you have followed the instructions earlier in this document, you will get a dynamic quorum.
3865+
<p>
38823866

38833867
Note: Currently it is <b>not</b> possible to convert clusters using a static controller quorum to
38843868
use a dynamic controller quorum. This function will be supported in the future release.

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public class Formatter {
133133
* The initial KIP-853 voters.
134134
*/
135135
private Optional<DynamicVoters> initialControllers = Optional.empty();
136-
private boolean noInitialControllersFlag = false;
136+
private boolean hasDynamicQuorum = false;
137137

138138
public Formatter setPrintStream(PrintStream printStream) {
139139
this.printStream = printStream;
@@ -210,8 +210,8 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) {
210210
return this;
211211
}
212212

213-
public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
214-
this.noInitialControllersFlag = noInitialControllersFlag;
213+
public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
214+
this.hasDynamicQuorum = hasDynamicQuorum;
215215
return this;
216216
}
217217

@@ -220,7 +220,7 @@ public Optional<DynamicVoters> initialVoters() {
220220
}
221221

222222
boolean hasDynamicQuorum() {
223-
return initialControllers.isPresent() || noInitialControllersFlag;
223+
return hasDynamicQuorum;
224224
}
225225

226226
public BootstrapMetadata bootstrapMetadata() {
@@ -336,8 +336,8 @@ Map<String, Short> calculateEffectiveFeatureLevels() {
336336
/**
337337
* Calculate the effective feature level for kraft.version. In order to keep existing
338338
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
339-
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments
340-
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version).
339+
* voter quorum arguments were provided. As a convenience, if the static voters config is
340+
* empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
341341
*
342342
* @param configuredKRaftVersionLevel The configured level for kraft.version
343343
* @return The effective feature level.
@@ -346,15 +346,21 @@ private short effectiveKRaftFeatureLevel(Optional<Short> configuredKRaftVersionL
346346
if (configuredKRaftVersionLevel.isPresent()) {
347347
if (configuredKRaftVersionLevel.get() == 0) {
348348
if (hasDynamicQuorum()) {
349-
throw new FormatterException("Cannot set kraft.version to " +
350-
configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " +
351-
"Try removing the --feature flag for kraft.version.");
349+
throw new FormatterException(
350+
"Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
351+
"--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
352+
"controllers support, try removing the --feature flag for kraft.version."
353+
);
352354
}
353355
} else {
354356
if (!hasDynamicQuorum()) {
355-
throw new FormatterException("Cannot set kraft.version to " +
356-
configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " +
357-
"Try removing the --feature flag for kraft.version.");
357+
throw new FormatterException(
358+
"Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
359+
" unless controller.quorum.voters is empty and one of the flags --standalone, " +
360+
"--initial-controllers, or --no-initial-controllers is used. " +
361+
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
362+
"or --no-initial-controllers and removing controller.quorum.voters."
363+
);
358364
}
359365
}
360366
return configuredKRaftVersionLevel.get();

0 commit comments

Comments
 (0)