diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index c7b4f28e336d8..5b60b6f389a29 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -135,21 +135,30 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + val staticVotersEmpty = config.quorumConfig.voters().isEmpty + formatter.setHasDynamicQuorum(staticVotersEmpty) + if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } - if (namespace.getBoolean("no_initial_controllers")) { - formatter.setNoInitialControllersFlag(true) - } else { - if (config.processRoles.contains(ProcessRole.ControllerRole)) { - if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) { + if (!namespace.getBoolean("no_initial_controllers") && + config.processRoles.contains(ProcessRole.ControllerRole) && + staticVotersEmpty && + formatter.initialVoters().isEmpty) { throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + " is not set on this controller, you must specify one of the following: " + "--standalone, --initial-controllers, or --no-initial-controllers."); - } - } } Option(namespace.getList("add_scram")). foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) @@ -319,18 +328,21 @@ object StorageTool extends Logging { val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() reconfigurableQuorumOptions.addArgument("--standalone", "-s") - .help("Used to initialize a controller as a single-node dynamic quorum.") + .help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") - .help("Used to initialize a server without a dynamic quorum topology.") + .help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") - .help("Used to initialize a server with a specific dynamic quorum topology. The argument " + + .help("Used to initialize a server with the specified dynamic quorum. The argument " + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + - "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") + "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(store()) } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 28b132243e7af..e27902937f020 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -458,19 +458,14 @@ Found problem: Seq("--release-version", "3.9-IV0"))).getMessage) } - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = { + @Test + def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") - if (setKraftVersionFeature) { - arguments += "--feature" - arguments += "kraft.version=1" - } assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertTrue(stream.toString(). contains("Formatting metadata directory %s".format(availableDirs.head)), diff --git a/docs/ops.html b/docs/ops.html index be0bfe89e8bf9..5ea9e121a72df 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -4072,45 +4072,27 @@
- $ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe -

- - If the kraft.version field is level 0 or absent, you are using a static quorum. If - it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static - quorum:

-


-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-

- - Here is another example of a static quorum:

-


-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0  Epoch: 5
-

- - Here is an example of a dynamic quorum:

-


-Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1 Epoch: 5
-Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
-

- - The static versus dynamic nature of the quorum is determined at the time of formatting. - Specifically, the quorum will be formatted as dynamic if controller.quorum.voters is - not present, and if the software version is Apache Kafka 3.9 or newer. If you have - followed the instructions earlier in this document, you will get a dynamic quorum.

- - If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your - controllers using the --feature kraft.version=1. (Note that you should not supply - this flag when formatting brokers -- only when formatting controllers.)

- -


-  $ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties
-  Cannot set kraft.version to 1 unless KIP-853 configuration is present. Try removing the --feature flag for kraft.version.
-

- - Note: Currently it is not possible to convert clusters using a static controller quorum to - use a dynamic controller quorum. This function will be supported in the future release. +

$ bin/kafka-features.sh --bootstrap-controller localhost:9093 describe
+

+ If the kraft.version field is level 0 or absent, you are using a static quorum. If + it is 1 or above, you are using a dynamic quorum. For example, here is an example of a static + quorum:

+

Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 0 Epoch: 5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
+

+ Here is another example of a static quorum:

+

Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.8-IV0 FinalizedVersionLevel: 3.8-IV0  Epoch: 5
+

+ Here is an example of a dynamic quorum:

+

Feature: kraft.version  SupportedMinVersion: 0  SupportedMaxVersion: 1  FinalizedVersionLevel: 1 Epoch: 5
+Feature: metadata.version       SupportedMinVersion: 3.3-IV3    SupportedMaxVersion: 3.9-IV0 FinalizedVersionLevel: 3.9-IV0  Epoch: 5
+

+ The static versus dynamic nature of the quorum is determined at the time of formatting. + Specifically, the quorum will be formatted as dynamic if controller.quorum.voters is + not present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set. + If you have followed the instructions earlier in this document, you will get a dynamic quorum. +

Upgrade section.

Add New Controller
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the kafka-storage.sh tool and starting the controller. diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index 04b52c9e66589..97f53fc902aa0 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -131,7 +131,7 @@ public class Formatter { * The initial KIP-853 voters. */ private Optional initialControllers = Optional.empty(); - private boolean noInitialControllersFlag = false; + private boolean hasDynamicQuorum = false; public Formatter setPrintStream(PrintStream printStream) { this.printStream = printStream; @@ -217,8 +217,8 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) { return this; } - public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) { - this.noInitialControllersFlag = noInitialControllersFlag; + public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) { + this.hasDynamicQuorum = hasDynamicQuorum; return this; } @@ -227,7 +227,7 @@ public Optional initialVoters() { } boolean hasDynamicQuorum() { - return initialControllers.isPresent() || noInitialControllersFlag; + return hasDynamicQuorum; } public BootstrapMetadata bootstrapMetadata() { @@ -337,8 +337,8 @@ Map calculateEffectiveFeatureLevels() { /** * Calculate the effective feature level for kraft.version. In order to keep existing * command-line invocations of StorageTool working, we default this to 0 if no dynamic - * voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments - * were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version). + * voter quorum arguments were provided. As a convenience, if the static voters config is + * empty, we set the latest kraft.version. (Currently there is only 1 non-zero version). * * @param configuredKRaftVersionLevel The configured level for kraft.version * @return The effective feature level. @@ -348,20 +348,19 @@ private short effectiveKRaftFeatureLevel(Optional configuredKRaftVersionL if (configuredKRaftVersionLevel.get() == 0) { if (hasDynamicQuorum()) { throw new FormatterException( - "Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + - " if one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + - "For dynamic controllers support, try removing the --feature flag for kraft.version." + "Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " + + "--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " + + "controllers support, try removing the --feature flag for kraft.version." ); } } else { if (!hasDynamicQuorum()) { throw new FormatterException( - "Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + - " unless one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + - "For dynamic controllers support, try using one of --standalone, --initial-controllers, or " + - "--no-initial-controllers." + "Cannot set kraft.version to " + configuredKRaftVersionLevel.get() + + " unless controller.quorum.voters is empty and one of the flags --standalone, " + + "--initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters." ); } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 5ddcd2d88890d..8a41fdd6aa980 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.GroupVersion; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TestFeatureVersion; import org.apache.kafka.server.common.TransactionVersion; @@ -200,6 +201,7 @@ public void testStandaloneWithIgnoreFormatted() throws Exception { String newDirectoryId = Uuid.randomUuid().toString(); formatter1.formatter .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId)) + .setHasDynamicQuorum(true) .run(); assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) + " with metadata.version " + MetadataVersion.latestProduction() + ".", @@ -417,13 +419,14 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); formatter1.formatter.run(); - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); assertEquals(List.of( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), @@ -450,49 +453,66 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); assertTrue(formatter1.formatter.hasDynamicQuorum()); assertEquals( - "Cannot set kraft.version to 0 if one of the flags --standalone, --initial-controllers, or " + - "--no-initial-controllers is used. For dynamic controllers support, try removing the " + - "--feature flag for kraft.version.", - assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage() + "Cannot set kraft.version to 0 if controller.quorum.voters is empty " + + "and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() ); } } @Test - public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { + public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); assertFalse(formatter1.formatter.hasDynamicQuorum()); assertEquals( - "Cannot set kraft.version to 1 unless one of the flags --standalone, --initial-controllers, or " + - "--no-initial-controllers is used. For dynamic controllers support, try using one of " + - "--standalone, --initial-controllers, or --no-initial-controllers.", - assertThrows(FormatterException.class, () -> formatter1.formatter.run()).getMessage() + "Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " + + "one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() ); } } @Test - public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception { + public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - assertEquals("kraft.version could not be set to 1 because it depends on " + - "metadata.version level 21", - assertThrows(IllegalArgumentException.class, - () -> formatter1.formatter.run()).getMessage()); + formatter1.formatter.setHasDynamicQuorum(true); + formatter1.formatter.run(); + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); + formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum); + formatter1.formatter.run(); + if (hasDynamicQuorum) { + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } else { + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } } } @@ -513,6 +533,7 @@ public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVer formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) { assertDoesNotThrow(() -> formatter1.formatter.run()); } else { @@ -524,20 +545,14 @@ public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVer } } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { + @Test + public void testFormatWithNoInitialControllers() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - + assertFalse(formatter1.formatter.hasDynamicQuorum()); formatter1.formatter.run(); - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); assertEquals(List.of( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), @@ -557,38 +572,4 @@ public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) thro assertNotNull(logDirProps1); } } - - @Test - public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(false); - assertFalse(formatter1.formatter.hasDynamicQuorum()); - assertEquals( - "Cannot set kraft.version to 1 unless one of the flags --standalone, --initial-controllers, or " + - "--no-initial-controllers is used. For dynamic controllers support, try using one of " + - "--standalone, --initial-controllers, or --no-initial-controllers.", - assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() - ); - } - } - - @Test - public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - assertEquals( - "Cannot set kraft.version to 0 if one of the flags --standalone, --initial-controllers, or " + - "--no-initial-controllers is used. For dynamic controllers support, try removing the " + - "--feature flag for kraft.version.", - assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() - ); - } - } } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index 463cc2a015ce3..d797880c77667 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -72,12 +72,7 @@ public MetadataVersion bootstrapMetadataVersion() { @Override public Map dependencies() { - if (this.featureLevel == 0) { - return Map.of(); - } else { - return Map.of( - MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel()); - } + return Map.of(); } public boolean isAtLeast(KRaftVersion otherVersion) {