diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 4aa70bab8ba1d..8a8ffc11b97bf 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -128,21 +128,30 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreach { kv => formatter.setFeatureLevel(kv._1, kv._2) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + val staticVotersEmpty = config.quorumVoters.isEmpty + formatter.setHasDynamicQuorum(staticVotersEmpty) + if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } - if (namespace.getBoolean("no_initial_controllers")) { - formatter.setNoInitialControllersFlag(true) - } else { - if (config.processRoles.contains(ProcessRole.ControllerRole)) { - if (config.quorumVoters.isEmpty() && !formatter.initialVoters().isPresent()) { + if (!namespace.getBoolean("no_initial_controllers") && + config.processRoles.contains(ProcessRole.ControllerRole) && + staticVotersEmpty && + !formatter.initialVoters().isPresent) { throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + " is not set on this controller, you must specify one of the following: " + "--standalone, --initial-controllers, or --no-initial-controllers."); - } - } } Option(namespace.getList("add_scram")). foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) @@ -206,18 +215,21 @@ object StorageTool extends Logging { action(append()) val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() reconfigurableQuorumOptions.addArgument("--standalone", "-s") - .help("Used to initialize a controller as a single-node dynamic quorum.") + .help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") - .help("Used to initialize a server without a dynamic quorum topology.") + .help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") - .help("Used to initialize a server with a specific dynamic quorum topology. The argument " + + .help("Used to initialize a server with the specified dynamic quorum. The argument " + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + - "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") + "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(store()) parser.parseArgs(args) } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 926e851ac98e8..f8507708d1177 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -400,7 +400,10 @@ Found problem: def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("process.roles", "broker") + properties.setProperty("node.id", "0") + properties.setProperty("controller.listener.names", "CONTROLLER") + properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") @@ -483,19 +486,14 @@ Found problem: Seq("--release-version", "3.9-IV0"))).getMessage) } - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = { + @Test + def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") - if (setKraftVersionFeature) { - arguments += "--feature" - arguments += "kraft.version=1" - } assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertTrue(stream.toString(). contains("Formatting metadata directory %s".format(availableDirs.head)), diff --git a/docs/ops.html b/docs/ops.html index e8309187a5686..455c831f0ec14 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3843,42 +3843,26 @@
-
- If the kraft.version
field is level 0 or absent, you are using a static quorum. If
- it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
- quorum:
-Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
-Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
-
-
- Here is another example of a static quorum:
-
-Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
-
-
- Here is an example of a dynamic quorum:
-
-Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
-Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
-
-
- The static versus dynamic nature of the quorum is determined at the time of formatting.
- Specifically, the quorum will be formatted as dynamic if controller.quorum.voters
is
- not present, and if the software version is Apache Kafka 3.9 or newer. If you have
- followed the instructions earlier in this document, you will get a dynamic quorum.
-
- If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
- controllers using the --feature kraft.version=1
. (Note that you should not supply
- this flag when formatting brokers -- only when formatting controllers.)
- -
- $ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller_static.properties
- Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
-
+
$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
+
+ If the kraft.version
field is level 0 or absent, you are using a static quorum. If
+ it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static
+ quorum:
+
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 0 Epoch: 5
+Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
+ + Here is another example of a static quorum:
+
Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0 Epoch: 5
+ + Here is an example of a dynamic quorum:
+
Feature: kraft.version SupportedMinVersion: 0 SupportedMaxVersion: 1 FinalizedVersionLevel: 1 Epoch: 5
+Feature: metadata.version SupportedMinVersion: 3.3-IV3 SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0 Epoch: 5
+
+ The static versus dynamic nature of the quorum is determined at the time of formatting.
+ Specifically, the quorum will be formatted as dynamic if controller.quorum.voters
is
+ not present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
+ If you have followed the instructions earlier in this document, you will get a dynamic quorum.
+
Note: Currently it is not possible to convert clusters using a static controller quorum to
use a dynamic controller quorum. This function will be supported in the future release.
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 9d642e1c1ee90..16e380c0ec5d1 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -133,7 +133,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional