Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,30 @@ object StorageTool extends Logging {
featureNamesAndLevels(_).foreachEntry {
(k, v) => formatter.setFeatureLevel(k, v)
})
Option(namespace.getString("initial_controllers")).
val initialControllers = namespace.getString("initial_controllers")
val isStandalone = namespace.getBoolean("standalone")
val staticVotersEmpty = config.quorumConfig.voters().isEmpty
formatter.setHasDynamicQuorum(staticVotersEmpty)
if (!staticVotersEmpty && (Option(initialControllers).isDefined || isStandalone)) {
throw new TerseFailure("You cannot specify " +
QuorumConfig.QUORUM_VOTERS_CONFIG + " and format the node " +
"with --initial-controllers or --standalone. " +
"If you want to use dynamic quorum, please remove " +
QuorumConfig.QUORUM_VOTERS_CONFIG + " and specify " +
QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG + " instead.")
}
Option(initialControllers).
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v)))
if (namespace.getBoolean("standalone")) {
if (isStandalone) {
formatter.setInitialControllers(createStandaloneDynamicVoters(config))
}
if (namespace.getBoolean("no_initial_controllers")) {
formatter.setNoInitialControllersFlag(true)
} else {
if (config.processRoles.contains(ProcessRole.ControllerRole)) {
if (config.quorumConfig.voters().isEmpty && formatter.initialVoters().isEmpty) {
if (!namespace.getBoolean("no_initial_controllers") &&
config.processRoles.contains(ProcessRole.ControllerRole) &&
staticVotersEmpty &&
formatter.initialVoters().isEmpty) {
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG +
" is not set on this controller, you must specify one of the following: " +
"--standalone, --initial-controllers, or --no-initial-controllers.");
}
}
}
Option(namespace.getList("add_scram")).
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]]))
Expand Down Expand Up @@ -319,18 +328,21 @@ object StorageTool extends Logging {

val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup()
reconfigurableQuorumOptions.addArgument("--standalone", "-s")
.help("Used to initialize a controller as a single-node dynamic quorum.")
.help("Used to initialize a controller as a single-node dynamic quorum. When setting this flag, " +
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())

reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N")
.help("Used to initialize a server without a dynamic quorum topology.")
.help("Used to initialize a server without specifying a dynamic quorum. When setting this flag, " +
"the controller.quorum.voters config should not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(storeTrue())

reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I")
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " +
.help("Used to initialize a server with the specified dynamic quorum. The argument " +
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " +
"format all nodes. For example:\[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" +
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n")
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n. When setting this flag, " +
"the controller.quorum.voters config must not be set, and controller.quorum.bootstrap.servers is set instead.")
.action(store())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -83,9 +84,8 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception {
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build()
).build()) {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Expand All @@ -107,13 +107,23 @@ static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {

@Test
public void testRemoveController() throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build()
).build()) {
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(3).
build();

final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}

try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
setInitialVoterSet(initialVoters).
build()
) {
cluster.format();
cluster.startup();
try (Admin admin = Admin.create(cluster.clientProperties())) {
Expand All @@ -132,12 +142,22 @@ public void testRemoveController() throws Exception {

@Test
public void testRemoveAndAddSameController() throws Exception {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(4).
setFeature(KRaftVersion.FEATURE_NAME, KRaftVersion.KRAFT_VERSION_1.featureLevel()).
build()).build()
final var nodes = new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(4).
build();

final Map<Integer, Uuid> initialVoters = new HashMap<>();
for (final var controllerNode : nodes.controllerNodes().values()) {
initialVoters.put(
controllerNode.id(),
controllerNode.metadataDirectoryId()
);
}

try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).
setInitialVoterSet(initialVoters).
build()
) {
cluster.format();
cluster.startup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,7 @@ class KRaftClusterTest {
val cluster = new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder().
setNumBrokerNodes(1).
setNumControllerNodes(1).
setFeature(KRaftVersion.FEATURE_NAME, 1.toShort).build()).build()
setNumControllerNodes(1).build()).setStandalone(true).build()
try {
cluster.format()
cluster.startup()
Expand Down
14 changes: 6 additions & 8 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ Found problem:
def testFormatWithStandaloneFlagOnBrokerFails(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("process.roles", "broker")
properties.setProperty("node.id", "0")
properties.setProperty("controller.listener.names", "CONTROLLER")
properties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093")
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone")
Expand Down Expand Up @@ -458,19 +461,14 @@ Found problem:
Seq("--release-version", "3.9-IV0"))).getMessage)
}

@ParameterizedTest
@ValueSource(booleans = Array(false, true))
def testFormatWithNoInitialControllersSucceedsOnController(setKraftVersionFeature: Boolean): Unit = {
@Test
def testFormatWithNoInitialControllersSucceedsOnController(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--no-initial-controllers")
if (setKraftVersionFeature) {
arguments += "--feature"
arguments += "kraft.version=1"
}
assertEquals(0, runFormatCommand(stream, properties, arguments.toSeq))
assertTrue(stream.toString().
contains("Formatting metadata directory %s".format(availableDirs.head)),
Expand Down
10 changes: 2 additions & 8 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -4089,14 +4089,8 @@ <h5 class="anchor-heading"><a id="static_versus_dynamic_kraft_quorums" class="an
<p>
The static versus dynamic nature of the quorum is determined at the time of formatting.
Specifically, the quorum will be formatted as dynamic if <code>controller.quorum.voters</code> is
<b>not</b> present, and if the software version is Apache Kafka 3.9 or newer. If you have
followed the instructions earlier in this document, you will get a dynamic quorum.<p>

If you would like the formatting process to fail if a dynamic quorum cannot be achieved, format your
controllers using the <code>--feature kraft.version=1</code>. (Note that you should not supply
this flag when formatting brokers -- only when formatting controllers.)<p>

<pre><code class="language-bash">$ bin/kafka-storage.sh format -t KAFKA_CLUSTER_ID --feature kraft.version=1 -c controller.properties</code></pre>
<b>not</b> present, and one of --standalone, --initial-controllers, or --no-initial-controllers is set.
If you have followed the instructions earlier in this document, you will get a dynamic quorum.
<p>
Note: To migrate from static voter set to dynamic voter set, please refer to the <a href="#kraft_upgrade">Upgrade</a> section.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public class Formatter {
* The initial KIP-853 voters.
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
private boolean noInitialControllersFlag = false;
private boolean hasDynamicQuorum = false;

public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
Expand Down Expand Up @@ -217,8 +217,8 @@ public Formatter setInitialControllers(DynamicVoters initialControllers) {
return this;
}

public Formatter setNoInitialControllersFlag(boolean noInitialControllersFlag) {
this.noInitialControllersFlag = noInitialControllersFlag;
public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) {
this.hasDynamicQuorum = hasDynamicQuorum;
return this;
}

Expand All @@ -227,7 +227,7 @@ public Optional<DynamicVoters> initialVoters() {
}

boolean hasDynamicQuorum() {
return initialControllers.isPresent() || noInitialControllersFlag;
return hasDynamicQuorum;
}

public BootstrapMetadata bootstrapMetadata() {
Expand Down Expand Up @@ -337,8 +337,8 @@ Map<String, Short> calculateEffectiveFeatureLevels() {
/**
* Calculate the effective feature level for kraft.version. In order to keep existing
* command-line invocations of StorageTool working, we default this to 0 if no dynamic
* voter quorum arguments were provided. As a convenience, if dynamic voter quorum arguments
* were passed, we set the latest kraft.version. (Currently there is only 1 non-zero version).
* voter quorum arguments were provided. As a convenience, if the static voters config is
* empty, we set the latest kraft.version. (Currently there is only 1 non-zero version).
*
* @param configuredKRaftVersionLevel The configured level for kraft.version
* @return The effective feature level.
Expand All @@ -348,20 +348,19 @@ private short effectiveKRaftFeatureLevel(Optional<Short> configuredKRaftVersionL
if (configuredKRaftVersionLevel.get() == 0) {
if (hasDynamicQuorum()) {
throw new FormatterException(
"Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
" if one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
"For dynamic controllers support, try removing the --feature flag for kraft.version."
"Cannot set kraft.version to 0 if controller.quorum.voters is empty and one of the flags " +
"--standalone, --initial-controllers, or --no-initial-controllers is used. For dynamic " +
"controllers support, try removing the --feature flag for kraft.version."
);
}
} else {
if (!hasDynamicQuorum()) {
throw new FormatterException(
"Cannot set kraft.version to " +
configuredKRaftVersionLevel.get() +
" unless one of the flags --standalone, --initial-controllers, or --no-initial-controllers is used. " +
"For dynamic controllers support, try using one of --standalone, --initial-controllers, or " +
"--no-initial-controllers."
"Cannot set kraft.version to " + configuredKRaftVersionLevel.get() +
" unless controller.quorum.voters is empty and one of the flags --standalone, " +
"--initial-controllers, or --no-initial-controllers is used. " +
"For dynamic controllers support, try using one of --standalone, --initial-controllers, " +
"or --no-initial-controllers and removing controller.quorum.voters."
);
}
}
Expand Down
Loading