Skip to content

Commit e3751a8

Browse files
authored
KAFKA-17794: Add some formatting safeguards for KIP-853 (apache#17504)
KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is no longer statically determined by the controller.quorum.voters configuration. Instead, it is contained in the storage directories of each controller and broker. Users of dynamic quorums must format at least one controller storage directory with either the --initial-controllers or the --standalone flag. If they fail to do this, no quorum can be established. This PR changes the storage tool to fail with an error when a controller that does not set controller.quorum.voters is formatted without one of the KIP-853 flags. (Note that broker storage directories can continue to be formatted without a KIP-853 flag.) There are cases where we don't want to specify initial voters when formatting a controller. One example is where we format a single controller with --standalone, and then dynamically add 4 more controllers with no initial topology. In this case, we want the 4 later controllers to grab the quorum topology from the initial one. To support this case, this PR adds the --no-initial-controllers flag. Reviewers: José Armando García Sancio <[email protected]>, Federico Valeri <[email protected]>
1 parent 7efbed4 commit e3751a8

File tree

6 files changed

+96
-21
lines changed

6 files changed

+96
-21
lines changed

core/src/main/scala/kafka/tools/StorageTool.scala

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.{Exit, Utils}
3030
import org.apache.kafka.server.common.{Features, MetadataVersion}
3131
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
3232
import org.apache.kafka.metadata.storage.{Formatter, FormatterException}
33-
import org.apache.kafka.raft.DynamicVoters
33+
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig}
3434
import org.apache.kafka.server.ProcessRole
3535
import org.apache.kafka.server.config.ReplicationConfigs
3636

@@ -135,9 +135,20 @@ object StorageTool extends Logging {
135135
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString)))
136136
}
137137
Option(namespace.getString("initial_controllers")).
138-
foreach(v => formatter.setInitialVoters(DynamicVoters.parse(v)))
138+
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
139139
if (namespace.getBoolean("standalone")) {
140-
formatter.setInitialVoters(createStandaloneDynamicVoters(config))
140+
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
141+
}
142+
if (!namespace.getBoolean("no_initial_controllers")) {
143+
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
144+
if (config.quorumConfig.voters().isEmpty) {
145+
if (formatter.initialVoters().isEmpty()) {
146+
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
147+
" is not set on this controller, you must specify one of the following: " +
148+
"--standalone, --initial-controllers, or --no-initial-controllers.");
149+
}
150+
}
151+
}
141152
}
142153
Option(namespace.getList("add_scram")).
143154
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
@@ -238,7 +249,7 @@ object StorageTool extends Logging {
238249
config: KafkaConfig
239250
): DynamicVoters = {
240251
if (!config.processRoles.contains(ProcessRole.ControllerRole)) {
241-
throw new TerseFailure("You cannot use --standalone on a broker node.")
252+
throw new TerseFailure("You can only use --standalone on a controller.")
242253
}
243254
if (config.effectiveAdvertisedControllerListeners.isEmpty) {
244255
throw new RuntimeException("No controller listeners found.")
@@ -306,12 +317,18 @@ object StorageTool extends Logging {
306317

307318
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
308319
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
309-
.help("Used to initialize a single-node quorum controller quorum.")
320+
.help("Used to initialize a controller as a single-node dynamic quorum.")
321+
.action(storeTrue())
322+
323+
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
324+
.help("Used to initialize a server without a dynamic quorum topology.")
310325
.action(storeTrue())
311326

312327
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
313-
.help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" +
314-
"0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
328+
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
329+
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
330+
"format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" +
331+
"MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n")
315332
.action(store())
316333
}
317334

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,9 @@ Found problem:
177177
defaultDynamicQuorumProperties.setProperty("process.roles", "controller")
178178
defaultDynamicQuorumProperties.setProperty("node.id", "0")
179179
defaultDynamicQuorumProperties.setProperty("controller.listener.names", "CONTROLLER")
180-
defaultDynamicQuorumProperties.setProperty("controller.quorum.voters", "0@localhost:9093")
181-
defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://127.0.0.1:9093")
180+
defaultDynamicQuorumProperties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
181+
defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://:9093")
182+
defaultDynamicQuorumProperties.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093")
182183
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
183184
defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG , "true")
184185

@@ -378,7 +379,7 @@ Found problem:
378379
properties.setProperty("log.dirs", availableDirs.mkString(","))
379380
val stream = new ByteArrayOutputStream()
380381
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
381-
assertEquals("You cannot use --standalone on a broker node.",
382+
assertEquals("You can only use --standalone on a controller.",
382383
assertThrows(classOf[TerseFailure],
383384
() => runFormatCommand(stream, properties, arguments.toSeq)).getMessage)
384385
}
@@ -437,6 +438,56 @@ Found problem:
437438
"Failed to find content in output: " + stream.toString())
438439
}
439440

441+
@ParameterizedTest
442+
@ValueSource(strings = Array("controller", "broker,controller"))
443+
def testFormatWithoutStaticQuorumFailsWithoutInitialControllersOnController(processRoles: String): Unit = {
444+
val availableDirs = Seq(TestUtils.tempDir())
445+
val properties = new Properties()
446+
properties.putAll(defaultDynamicQuorumProperties)
447+
if (processRoles.contains("broker")) {
448+
properties.setProperty("listeners", "PLAINTEXT://:9092,CONTROLLER://:9093")
449+
properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093")
450+
}
451+
properties.setProperty("process.roles", processRoles)
452+
properties.setProperty("log.dirs", availableDirs.mkString(","))
453+
assertEquals("Because controller.quorum.voters is not set on this controller, you must " +
454+
"specify one of the following: --standalone, --initial-controllers, or " +
455+
"--no-initial-controllers.",
456+
assertThrows(classOf[TerseFailure],
457+
() => runFormatCommand(new ByteArrayOutputStream(), properties,
458+
Seq("--release-version", "3.9-IV0"))).getMessage)
459+
}
460+
461+
@Test
462+
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
463+
val availableDirs = Seq(TestUtils.tempDir())
464+
val properties = new Properties()
465+
properties.putAll(defaultDynamicQuorumProperties)
466+
properties.setProperty("log.dirs", availableDirs.mkString(","))
467+
val stream = new ByteArrayOutputStream()
468+
assertEquals(0, runFormatCommand(stream, properties,
469+
Seq("--no-initial-controllers", "--release-version", "3.9-IV0")))
470+
assertTrue(stream.toString().
471+
contains("Formatting metadata directory %s".format(availableDirs.head)),
472+
"Failed to find content in output: " + stream.toString())
473+
}
474+
475+
@Test
476+
def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = {
477+
val availableDirs = Seq(TestUtils.tempDir())
478+
val properties = new Properties()
479+
properties.putAll(defaultDynamicQuorumProperties)
480+
properties.setProperty("listeners", "PLAINTEXT://:9092")
481+
properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092")
482+
properties.setProperty("process.roles", "broker")
483+
properties.setProperty("log.dirs", availableDirs.mkString(","))
484+
val stream = new ByteArrayOutputStream()
485+
assertEquals(0, runFormatCommand(stream, properties, Seq("--release-version", "3.9-IV0")))
486+
assertTrue(stream.toString().
487+
contains("Formatting metadata directory %s".format(availableDirs.head)),
488+
"Failed to find content in output: " + stream.toString())
489+
}
490+
440491
private def runVersionMappingCommand(
441492
stream: ByteArrayOutputStream,
442493
releaseVersion: String
@@ -620,7 +671,7 @@ Found problem:
620671
def testBootstrapScramRecords(): Unit = {
621672
val availableDirs = Seq(TestUtils.tempDir())
622673
val properties = new Properties()
623-
properties.putAll(defaultDynamicQuorumProperties)
674+
properties.putAll(defaultStaticQuorumProperties)
624675
properties.setProperty("log.dirs", availableDirs.mkString(","))
625676
val stream = new ByteArrayOutputStream()
626677
val arguments = ListBuffer[String](
@@ -647,7 +698,7 @@ Found problem:
647698
def testScramRecordsOldReleaseVersion(): Unit = {
648699
val availableDirs = Seq(TestUtils.tempDir())
649700
val properties = new Properties()
650-
properties.putAll(defaultDynamicQuorumProperties)
701+
properties.putAll(defaultStaticQuorumProperties)
651702
properties.setProperty("log.dirs", availableDirs.mkString(","))
652703
val stream = new ByteArrayOutputStream()
653704
val arguments = ListBuffer[String](

docs/ops.html

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3817,9 +3817,9 @@ <h5 class="anchor-heading"><a id="kraft_storage_voters" class="anchor-link"></a>
38173817
In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the replica id, 3Db5QLSqSZieL3rJBUUegA is the replica directory id, controller-0 is the replica's host and 1234 is the replica's port.
38183818

38193819
<h5 class="anchor-heading"><a id="kraft_storage_observers" class="anchor-link"></a><a href="#kraft_storage_observers">Formatting Brokers and New Controllers</a></h5>
3820-
When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the <code>kafka-storage.sh format</code> command without the --standalone or --initial-controllers flags.
3820+
When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the <code>kafka-storage.sh format</code> command with the --no-initial-controllers flag.
38213821

3822-
<pre><code class="language-bash">$ bin/kafka-storage.sh format --cluster-id &lt;cluster-id&gt; --config server.properties</code></pre>
3822+
<pre><code class="language-bash">$ bin/kafka-storage.sh format --cluster-id &lt;cluster-id&gt; --config server.properties --no-initial-controllers</code></pre>
38233823

38243824
<h4 class="anchor-heading"><a id="kraft_reconfig" class="anchor-link"></a><a href="#kraft_reconfig">Controller membership changes</a></h4>
38253825

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,15 @@ public Formatter setMetadataLogDirectory(String metadataLogDirectory) {
202202
return this;
203203
}
204204

205-
public Formatter setInitialVoters(DynamicVoters initialControllers) {
205+
public Formatter setInitialControllers(DynamicVoters initialControllers) {
206206
this.initialControllers = Optional.of(initialControllers);
207207
return this;
208208
}
209209

210+
public Optional<DynamicVoters> initialVoters() {
211+
return initialControllers;
212+
}
213+
210214
boolean hasDynamicQuorum() {
211215
if (initialControllers.isPresent()) {
212216
return true;

metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce
376376
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
377377
}
378378
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
379-
formatter1.formatter.setInitialVoters(DynamicVoters.
379+
formatter1.formatter.setInitialControllers(DynamicVoters.
380380
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
381381
formatter1.formatter.run();
382382
assertEquals(Arrays.asList(
@@ -407,7 +407,7 @@ public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Excep
407407
FormatterContext formatter1 = testEnv.newFormatter();
408408
formatter1.formatter.setFeatureLevel("kraft.version", (short) 0);
409409
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
410-
formatter1.formatter.setInitialVoters(DynamicVoters.
410+
formatter1.formatter.setInitialControllers(DynamicVoters.
411411
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
412412
assertTrue(formatter1.formatter.hasDynamicQuorum());
413413
assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " +
@@ -437,7 +437,7 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
437437
FormatterContext formatter1 = testEnv.newFormatter();
438438
formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0);
439439
formatter1.formatter.setFeatureLevel("kraft.version", (short) 1);
440-
formatter1.formatter.setInitialVoters(DynamicVoters.
440+
formatter1.formatter.setInitialControllers(DynamicVoters.
441441
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
442442
formatter1.formatter.setUnstableFeatureVersionsEnabled(true);
443443
assertEquals("kraft.version could not be set to 1 because it depends on " +

tests/kafkatest/services/kafka/kafka.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -869,9 +869,12 @@ def start_node(self, node, timeout_sec=60, **kwargs):
869869
cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID)
870870
if self.dynamicRaftQuorum:
871871
cmd += " --feature kraft.version=1"
872-
if not self.standalone_controller_bootstrapped and self.node_quorum_info.has_controller_role:
873-
cmd += " --standalone"
874-
self.standalone_controller_bootstrapped = True
872+
if self.node_quorum_info.has_controller_role:
873+
if self.standalone_controller_bootstrapped:
874+
cmd += " --no-initial-controllers"
875+
else:
876+
cmd += " --standalone"
877+
self.standalone_controller_bootstrapped = True
875878
self.logger.info("Running log directory format command...\n%s" % cmd)
876879
node.account.ssh(cmd)
877880

0 commit comments

Comments
 (0)