From f5473df477d425695a2baceb0f1675933fce656a Mon Sep 17 00:00:00 2001 From: Kevin Wu Date: Tue, 30 Sep 2025 09:30:23 -0500 Subject: [PATCH 1/4] KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 (#20604) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` commit ec37eb538b7d7e113b80e09276606395b007127e (HEAD -> KAFKA-19719-cherry-pick-41, origin/KAFKA-19719-cherry-pick-41) Author: Kevin Wu Date: Thu Sep 25 11:56:16 2025 -0500 KAFKA-19719: --no-initial-controllers should not assume kraft.version=1 (#20551) Just because a controller node sets --no-initial-controllers flag does not mean it is necessarily running kraft.version=1. The more precise meaning is that the controller node being formatted does not know what kraft version the cluster should be in, and therefore it is only safe to assume kraft.version=0. Only by setting --standalone,--initial-controllers, or --no-initial-controllers AND not specifying the controller.quorum.voters static config, is it known kraft.version > 0. For example, it is a valid configuration (although confusing) to run a static quorum defined by controller.quorum.voters but have all the controllers format with --no-initial-controllers. In this case, specifying --no-initial-controllers alongside a metadata version that does not support kraft.version=1 causes formatting to fail, which is does not support kraft.version=1 causes formatting to fail, which is a regression. Additionally, the formatter should not check the kraft.version against the release version, since kraft.version does not actually depend on any release version. It should only check the kraft.version against the static voters config/format arguments. This PR also cleans up the integration test framework to match the semantics of formatting an actual cluster. Reviewers: TengYao Chi , Kuan-Po Tseng , Chia-Ping Tsai , José Armando García Sancio Conflicts: core/src/main/scala/kafka/tools/StorageTool.scala Minor conflicts. Keep changes from cherry-pick. core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java Remove auto-join tests, since 4.1 does not support it. docs/ops.html Keep docs section from cherry-pick. metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java Minor conflicts. Keep cherry-picked changes. test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java Conflicts due to integration test framework changes. Keep new changes. commit 02d58b176c32917962ab25b6d685059179d06f26 (upstream/4.1) ``` Reviewers: Chia-Ping Tsai Conflicts: core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java docs/ops.html metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java All minor conflicts. Keep cherry-picked changes. --- .../main/scala/kafka/tools/StorageTool.scala | 38 +++-- .../ReconfigurableQuorumIntegrationTest.java | 50 ++++-- .../kafka/server/KRaftClusterTest.scala | 3 +- .../unit/kafka/tools/StorageToolTest.scala | 14 +- docs/ops.html | 62 +++----- .../kafka/metadata/storage/Formatter.java | 30 ++-- .../kafka/metadata/storage/FormatterTest.java | 145 +++++++++++------- .../kafka/server/common/KRaftVersion.java | 7 +- .../common/test/KafkaClusterTestKit.java | 110 +++++++++---- .../kafka/common/test/TestKitNodes.java | 5 - 10 files changed, 279 insertions(+), 185 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 40892bca38c92..594109e15d260 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -135,21 +135,30 @@ object StorageTool extends Logging { featureNamesAndLevels(_).foreachEntry { (k, v) => formatter.setFeatureLevel(k, v) }) - Option(namespace.getString("initial_controllers")). + val initialControllers = namespace.getString("initial_controllers") + val isStandalone = namespace.getBoolean("standalone") + val staticVotersEmpty = config.quorumConfig.voters().isEmpty + formatter.setHasDynamicQuorum(staticVotersEmpty) + if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) { + throw new TerseFailure("You cannot specify " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " + + "with --initial-controllers or --standalone. " + + "If you want to use dynamic quorum, please remove " + + QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " + + QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.") + } + Option(initialControllers). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) - if (namespace.getBoolean("standalone")) { + if (isStandalone) { formatter.setInitialControllers(createStandaloneDynamicVoters(config)) } - if (namespace.getBoolean("no_initial_controllers")) { - formatter.setNoInitialControllersFlag(true) - } else { - if (config.processRoles.contains(ProcessRole.ControllerRole)) { - if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) { + if (!namespace.getBoolean("no_initial_controllers") && + config.processRoles.contains(ProcessRole.ControllerRole) && + staticVotersEmpty && + formatter.initialVoters().isEmpty) { throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + " is not set on this controller, you must specify one of the following: " + "--standalone, --initial-controllers, or --no-initial-controllers."); - } - } } Option(namespace.getList("add_scram")). foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) @@ -319,18 +328,21 @@ object StorageTool extends Logging { val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() reconfigurableQuorumOptions.addArgument("--standalone", "-s") - .help("Used to initialize a controller as a single-node dynamic quorum.") + .help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") - .help("Used to initialize a server without a dynamic quorum topology.") + .help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " + + "the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.") .action(storeTrue()) reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") - .help("Used to initialize a server with a specific dynamic quorum topology. The argument " + + .help("Used to initialize a server with the specified dynamic quorum. The argument " + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + - "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") + "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " + + "the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.") .action(store()) } diff --git a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java index 981217ce28770..fe1b6b5da9f83 100644 --- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -31,6 +31,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.TreeMap; @@ -83,9 +84,8 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception { new TestKitNodes.Builder(). setNumBrokerNodes(1). setNumControllerNodes(1). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). build() - ).build()) { + ).setStandalone(true).build()) { cluster.format(); cluster.startup(); try (Admin admin = Admin.create(cluster.clientProperties())) { @@ -107,13 +107,23 @@ static Map findVoterDirs(Admin admin) throws Exception { @Test public void testRemoveController() throws Exception { - try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setNumControllerNodes(3). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). - build() - ).build()) { + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(3). + build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setInitialVoterSet(initialVoters). + build() + ) { cluster.format(); cluster.startup(); try (Admin admin = Admin.create(cluster.clientProperties())) { @@ -132,12 +142,22 @@ public void testRemoveController() throws Exception { @Test public void testRemoveAndAddSameController() throws Exception { - try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder( - new TestKitNodes.Builder(). - setNumBrokerNodes(1). - setNumControllerNodes(4). - setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()). - build()).build() + final var nodes = new TestKitNodes.Builder(). + setNumBrokerNodes(1). + setNumControllerNodes(4). + build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes). + setInitialVoterSet(initialVoters). + build() ) { cluster.format(); cluster.startup(); diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala index 4fe4fb48cd85e..2b99be9321fc5 100644 --- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala +++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala @@ -1013,8 +1013,7 @@ class KRaftClusterTest { val cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder(). setNumBrokerNodes(1). - setNumControllerNodes(1). - setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build() + setNumControllerNodes(1).build()).setStandalone(true).build() try { cluster.format() cluster.startup() diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 9fde243ec1997..8440450709324 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -375,7 +375,10 @@ Found problem: def testFormatWithStandaloneFlagOnBrokerFails(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("process.roles", "broker") + properties.setProperty("node.id", "0") + properties.setProperty("controller.listener.names", "CONTROLLER") + properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") @@ -458,19 +461,14 @@ Found problem: Seq("--release-version", "3.9-IV0"))).getMessage) } - @ParameterizedTest - @ValueSource(booleans = Array(false, true)) - def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = { + @Test + def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers") - if (setKraftVersionFeature) { - arguments += "--feature" - arguments += "kraft.version=1" - } assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq)) assertTrue(stream.toString(). contains("Formatting metadata directory %s".format(availableDirs.head)), diff --git a/docs/ops.html b/docs/ops.html index 5a60a4cde89c6..3d1c99e8522ce 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3869,45 +3869,29 @@
Add New Controller
If a dynamic controller cluster already exists, it can be expanded by first provisioning a new controller using the kafka-storage.sh tool and starting the controller. diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index acba0f7a04bd9..e79ca41ff80cc 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -132,7 +132,7 @@ public class Formatter { * The initial KIP-853 voters. */ private Optional initialControllers = Optional.empty(); - private boolean noInitialControllersFlag = false; + private boolean hasDynamicQuorum = false; public Formatter setPrintStream(PrintStream printStream) { this.printStream = printStream; @@ -218,8 +218,8 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) { return this; } - public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) { - this.noInitialControllersFlag = noInitialControllersFlag; + public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) { + this.hasDynamicQuorum = hasDynamicQuorum; return this; } @@ -228,7 +228,7 @@ public Optional initialVoters() { } boolean hasDynamicQuorum() { - return initialControllers.isPresent() || noInitialControllersFlag; + return hasDynamicQuorum; } public BootstrapMetadata bootstrapMetadata() { @@ -338,8 +338,8 @@ Map calculateEffectiveFeatureLevels() { /** * Calculate the effective feature level for kraft.version. In order to keep existing * command-line invocations of StorageTool working, we default this to 0 if no dynamic - * voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments - * were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version). + * voter quorum arguments were provided. As a convenience, if the static voters config is + * empty, we set the latest kraft.version. (Currently there is only 1 non-zero version). * * @param configuredKRaftVersionLevel The configured level for kraft.version * @return The effective feature level. @@ -348,15 +348,21 @@ private short effectiveKRaftFeatureLevel(Optional configuredKRaftVersionL if (configuredKRaftVersionLevel.isPresent()) { if (configuredKRaftVersionLevel.get() == 0) { if (hasDynamicQuorum()) { - throw new FormatterException("Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + " if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version."); + throw new FormatterException( + "Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " + + "--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " + + "controllers support, try removing the --feature flag for kraft.version." + ); } } else { if (!hasDynamicQuorum()) { - throw new FormatterException("Cannot set kraft.version to " + - configuredKRaftVersionLevel.get() + " unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version."); + throw new FormatterException( + "Cannot set kraft.version to " + configuredKRaftVersionLevel.get() + + " unless controller.quorum.voters is empty and one of the flags --standalone, " + + "--initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters." + ); } } return configuredKRaftVersionLevel.get(); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 0706e4e738fdd..83f3a7d7e095a 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.GroupVersion; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.TestFeatureVersion; import org.apache.kafka.server.common.TransactionVersion; @@ -194,6 +195,40 @@ public void testIgnoreFormatted() throws Exception { } } + @Test + public void testStandaloneWithIgnoreFormatted() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext formatter1 = testEnv.newFormatter(); + String originalDirectoryId = Uuid.randomUuid().toString(); + String newDirectoryId = Uuid.randomUuid().toString(); + formatter1.formatter + .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + originalDirectoryId)) + .setHasDynamicQuorum(true) + .run(); + assertEquals("Formatting dynamic metadata voter directory " + testEnv.directory(0) + + " with metadata.version " + MetadataVersion.latestProduction() + ".", + formatter1.output().trim()); + assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId)); + + FormatterContext formatter2 = testEnv.newFormatter(); + formatter2.formatter + .setIgnoreFormatted(true) + .setInitialControllers(DynamicVoters.parse("1@localhost:8020:" + newDirectoryId)) + .run(); + assertEquals("All of the log directories are already formatted.", + formatter2.output().trim()); + assertMetadataDirectoryId(testEnv, Uuid.fromString(originalDirectoryId)); + } + } + + private void assertMetadataDirectoryId(TestEnv testEnv, Uuid expectedDirectoryId) throws Exception { + MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader(). + addLogDirs(testEnv.directories). + load(); + MetaProperties logDirProps0 = ensemble.logDirProps().get(testEnv.directory(0)); + assertEquals(expectedDirectoryId, logDirProps0.directoryId().get()); + } + @Test public void testOneDirectoryFormattedAndOthersNotFormatted() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { @@ -383,14 +418,20 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); formatter1.formatter.run(); +<<<<<<< HEAD assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); assertEquals(Arrays.asList( +======= + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + assertEquals(List.of( +>>>>>>> 012e4ca6d8 (KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 (#20604)) String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), MetadataVersion.FEATURE_NAME, @@ -416,45 +457,66 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 0); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); assertTrue(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - () -> formatter1.formatter.run()).getMessage()); + assertEquals( + "Cannot set kraft.version to 0 if controller.quorum.voters is empty " + + "and one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try removing the --feature flag for kraft.version.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() + ); } } @Test - public void testFormatWithoutInitialVotersFailsWithNewerKraftVersion() throws Exception { + public void testFormatWithStaticQuorumFailsWithNewerKraftVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); assertFalse(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - () -> formatter1.formatter.run()).getMessage()); + assertEquals( + "Cannot set kraft.version to 1 unless controller.quorum.voters is empty and " + + "one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " + + "For dynamic controllers support, try using one of --standalone, --initial-controllers, " + + "or --no-initial-controllers and removing controller.quorum.voters.", + assertThrows(FormatterException.class, formatter1.formatter::run).getMessage() + ); } } @Test - public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Exception { + public void testFormatWithInitialVotersWithOlderMetadataVersion() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); + formatter1.formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - assertEquals("kraft.version could not be set to 1 because it depends on " + - "metadata.version level 21", - assertThrows(IllegalArgumentException.class, - () -> formatter1.formatter.run()).getMessage()); + formatter1.formatter.setHasDynamicQuorum(true); + formatter1.formatter.run(); + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testFormatWithNoInitialControllersWithOlderMetadataVersion(boolean hasDynamicQuorum) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); + formatter1.formatter.setHasDynamicQuorum(hasDynamicQuorum); + formatter1.formatter.run(); + if (hasDynamicQuorum) { + assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } else { + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + } } } @@ -475,6 +537,7 @@ public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVer formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1); formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + formatter1.formatter.setHasDynamicQuorum(true); if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) { assertDoesNotThrow(() -> formatter1.formatter.run()); } else { @@ -486,21 +549,15 @@ public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVer } } - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { + @Test + public void testFormatWithNoInitialControllers() throws Exception { try (TestEnv testEnv = new TestEnv(2)) { FormatterContext formatter1 = testEnv.newFormatter(); - if (specifyKRaftVersion) { - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - + assertFalse(formatter1.formatter.hasDynamicQuorum()); formatter1.formatter.run(); - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); - assertEquals(Arrays.asList( + assertEquals((short) 0, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); + assertEquals(List.of( String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), MetadataVersion.FEATURE_NAME, @@ -519,34 +576,4 @@ public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) thro assertNotNull(logDirProps1); } } - - @Test - public void testFormatWithoutNoInitialControllersFailsWithNewerKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(false); - assertFalse(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 1 unless KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - formatter1.formatter::run).getMessage()); - } - } - - @Test - public void testFormatWithNoInitialControllersFailsWithOlderKraftVersion() throws Exception { - try (TestEnv testEnv = new TestEnv(2)) { - FormatterContext formatter1 = testEnv.newFormatter(); - formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); - formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setNoInitialControllersFlag(true); - assertTrue(formatter1.formatter.hasDynamicQuorum()); - assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + - "Try removing the --feature flag for kraft.version.", - assertThrows(FormatterException.class, - formatter1.formatter::run).getMessage()); - } - } } diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index 211f6dcac4424..15d6ee2e43e1f 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -73,12 +73,7 @@ public MetadataVersion bootstrapMetadataVersion() { @Override public Map dependencies() { - if (this.featureLevel == 0) { - return Collections.emptyMap(); - } else { - return Collections.singletonMap( - MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_9_IV0.featureLevel()); - } + return Map.of(); } public short quorumStateVersion() { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 20963665440d5..18d8e5b39d524 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -27,10 +27,12 @@ import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.test.api.TestKitDefaults; import org.apache.kafka.common.utils.ThreadUtils; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -42,7 +44,6 @@ import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.raft.QuorumConfig; import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.config.KRaftConfigs; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.fault.FaultHandler; @@ -114,6 +115,8 @@ public static class Builder { private final String controllerListenerName; private final String brokerSecurityProtocol; private final String controllerSecurityProtocol; + private boolean standalone; + private Optional> initialVoterSet = Optional.empty(); private boolean deleteOnClose; public Builder(TestKitNodes nodes) { @@ -130,6 +133,16 @@ public Builder setConfigProp(String key, Object value) { return this; } + public Builder setStandalone(boolean standalone) { + this.standalone = standalone; + return this; + } + + public Builder setInitialVoterSet(Map initialVoterSet) { + this.initialVoterSet = Optional.of(initialVoterSet); + return this; + } + private KafkaConfig createNodeConfig(TestKitNode node) throws IOException { TestKitNode brokerNode = nodes.brokerNodes().get(node.id()); TestKitNode controllerNode = nodes.controllerNodes().get(node.id()); @@ -168,18 +181,31 @@ private KafkaConfig createNodeConfig(TestKitNode node) throws IOException { props.putIfAbsent(INTER_BROKER_LISTENER_NAME_CONFIG, brokerListenerName); props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, controllerListenerName); - StringBuilder quorumVoterStringBuilder = new StringBuilder(); - String prefix = ""; - for (int nodeId : nodes.controllerNodes().keySet()) { - quorumVoterStringBuilder.append(prefix). - append(nodeId). - append("@"). - append("localhost"). - append(":"). - append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); - prefix = ","; + if (!standalone && initialVoterSet.isEmpty()) { + StringBuilder quorumVoterStringBuilder = new StringBuilder(); + String prefix = ""; + for (int nodeId : nodes.controllerNodes().keySet()) { + quorumVoterStringBuilder.append(prefix). + append(nodeId). + append("@"). + append("localhost"). + append(":"). + append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); + prefix = ","; + } + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString()); + } else { + StringBuilder bootstrapServersStringBuilder = new StringBuilder(); + String prefix = ""; + for (int nodeId : nodes.controllerNodes().keySet()) { + bootstrapServersStringBuilder.append(prefix). + append("localhost"). + append(":"). + append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName)); + prefix = ","; + } + props.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, bootstrapServersStringBuilder.toString()); } - props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString()); // reduce log cleaner offset map memory usage props.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); @@ -251,7 +277,7 @@ public KafkaClusterTestKit build() throws Exception { Time.SYSTEM, new Metrics(), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), - Collections.emptyList(), + QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()), faultHandlerFactory, socketFactoryManager.getOrCreateSocketFactory(node.id()) ); @@ -279,7 +305,7 @@ public KafkaClusterTestKit build() throws Exception { Time.SYSTEM, new Metrics(), CompletableFuture.completedFuture(QuorumConfig.parseVoterConnections(config.quorumConfig().voters())), - Collections.emptyList(), + QuorumConfig.parseBootstrapServers(config.quorumConfig().bootstrapServers()), faultHandlerFactory, socketFactoryManager.getOrCreateSocketFactory(node.id()) ); @@ -315,7 +341,9 @@ public KafkaClusterTestKit build() throws Exception { baseDirectory, faultHandlerFactory, socketFactoryManager, - Optional.ofNullable(jaasFile), + Optional.of(jaasFile), + standalone, + initialVoterSet, deleteOnClose); } @@ -361,6 +389,8 @@ private static void setupNodeDirectories(File baseDirectory, private final PreboundSocketFactoryManager socketFactoryManager; private final String controllerListenerName; private final Optional jaasFile; + private final boolean standalone; + private final Optional> initialVoterSet; private final boolean deleteOnClose; private KafkaClusterTestKit( @@ -371,6 +401,8 @@ private KafkaClusterTestKit( SimpleFaultHandlerFactory faultHandlerFactory, PreboundSocketFactoryManager socketFactoryManager, Optional jaasFile, + boolean standalone, + Optional> initialVoterSet, boolean deleteOnClose ) { /* @@ -388,6 +420,8 @@ private KafkaClusterTestKit( this.socketFactoryManager = socketFactoryManager; this.controllerListenerName = nodes.controllerListenerName().value(); this.jaasFile = jaasFile; + this.standalone = standalone; + this.initialVoterSet = initialVoterSet; this.deleteOnClose = deleteOnClose; } @@ -422,8 +456,9 @@ private void formatNode( boolean writeMetadataDirectory ) { try { + final var nodeId = ensemble.nodeId().getAsInt(); Formatter formatter = new Formatter(); - formatter.setNodeId(ensemble.nodeId().getAsInt()); + formatter.setNodeId(nodeId); formatter.setClusterId(ensemble.clusterId().get()); if (writeMetadataDirectory) { formatter.setDirectories(ensemble.logDirProps().keySet()); @@ -436,8 +471,6 @@ private void formatNode( return; } formatter.setReleaseVersion(nodes.bootstrapMetadata().metadataVersion()); - formatter.setFeatureLevel(KRaftVersion.FEATURE_NAME, - nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME)); formatter.setUnstableFeatureVersionsEnabled(true); formatter.setIgnoreFormatted(false); formatter.setControllerListenerName(controllerListenerName); @@ -446,18 +479,43 @@ private void formatNode( } else { formatter.setMetadataLogDirectory(Optional.empty()); } - if (nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME) > 0) { - StringBuilder dynamicVotersBuilder = new StringBuilder(); - String prefix = ""; - for (TestKitNode controllerNode : nodes.controllerNodes().values()) { - int port = socketFactoryManager. - getOrCreatePortForListener(controllerNode.id(), controllerListenerName); + StringBuilder dynamicVotersBuilder = new StringBuilder(); + String prefix = ""; + if (standalone) { + if (nodeId == TestKitDefaults.BROKER_ID_OFFSET + TestKitDefaults.CONTROLLER_ID_OFFSET) { + final var controllerNode = nodes.controllerNodes().get(nodeId); + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + controllerNode.id(), + socketFactoryManager. + getOrCreatePortForListener(controllerNode.id(), controllerListenerName), + controllerNode.metadataDirectoryId() + ) + ); + formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + } + // when the nodeId != TestKitDefaults.CONTROLLER_ID_OFFSET, the node is formatting with + // the --no-initial-controllers flag + formatter.setHasDynamicQuorum(true); + } else if (initialVoterSet.isPresent()) { + for (final var controllerNode : initialVoterSet.get().entrySet()) { + final var voterId = controllerNode.getKey(); + final var voterDirectoryId = controllerNode.getValue(); dynamicVotersBuilder.append(prefix); prefix = ","; - dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s", - controllerNode.id(), port, controllerNode.metadataDirectoryId())); + dynamicVotersBuilder.append( + String.format( + "%d@localhost:%d:%s", + voterId, + socketFactoryManager. + getOrCreatePortForListener(voterId, controllerListenerName), + voterDirectoryId + ) + ); } formatter.setInitialControllers(DynamicVoters.parse(dynamicVotersBuilder.toString())); + formatter.setHasDynamicQuorum(true); } formatter.run(); } catch (Exception e) { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index 759e86c200bab..cd8879a84db44 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -94,11 +94,6 @@ public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) { return this; } - public Builder setFeature(String featureName, short level) { - this.bootstrapMetadata = bootstrapMetadata.copyWithFeatureRecord(featureName, level); - return this; - } - public Builder setCombined(boolean combined) { this.combined = combined; return this; From fe3a568bb0c425479aeb33b915a57da2ffc55a96 Mon Sep 17 00:00:00 2001 From: Kevin Wu Date: Tue, 30 Sep 2025 13:32:09 -0500 Subject: [PATCH 2/4] resolve missed conflict --- .../org/apache/kafka/metadata/storage/FormatterTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 83f3a7d7e095a..83946c426fcd6 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -425,13 +425,8 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.setHasDynamicQuorum(true); formatter1.formatter.run(); -<<<<<<< HEAD - assertEquals((short) 1, formatter1.formatter.featureLevels.getOrDefault("kraft.version", (short) 0)); - assertEquals(Arrays.asList( -======= assertEquals((short) 1, formatter1.formatter.featureLevels.get(KRaftVersion.FEATURE_NAME)); assertEquals(List.of( ->>>>>>> 012e4ca6d8 (KAFKA-19719 --no-initial-controllers should not assume kraft.version=1 (#20604)) String.format("Formatting data directory %s with %s %s.", testEnv.directory(1), MetadataVersion.FEATURE_NAME, From 8a97e65fc45866fd2a2db7cb3d7e62b3314d276e Mon Sep 17 00:00:00 2001 From: Kevin Wu Date: Wed, 1 Oct 2025 14:19:51 -0500 Subject: [PATCH 3/4] fix build --- .../main/java/org/apache/kafka/server/common/KRaftVersion.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java index 15d6ee2e43e1f..988d58c3e0ef2 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/KRaftVersion.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.server.common; -import java.util.Collections; import java.util.Map; public enum KRaftVersion implements FeatureVersion { From 520baa971422690e5d853cc1a984a9bd786bce2b Mon Sep 17 00:00:00 2001 From: Kevin Wu Date: Wed, 1 Oct 2025 17:23:29 -0500 Subject: [PATCH 4/4] fixing tests --- .../java/org/apache/kafka/common/test/KafkaClusterTestKit.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index 18d8e5b39d524..1a16a7a0a34c5 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -341,7 +341,7 @@ public KafkaClusterTestKit build() throws Exception { baseDirectory, faultHandlerFactory, socketFactoryManager, - Optional.of(jaasFile), + Optional.ofNullable(jaasFile), standalone, initialVoterSet, deleteOnClose);