Commit 45a51fe

KAFKA-18928 Support dynamically changing configs for a specific dynamic quorum controller (#20938)
Fixes https://issues.apache.org/jira/browse/KAFKA-18928. Currently, dynamically changing configs is only supported for a specific **static** quorum controller.

### Test Environment

Dynamic quorum environment: `controller.quorum.bootstrap.servers=ip1:port,ip2:port,ip3:port`

Static quorum environment: `controller.quorum.voters=id1@ip1:port,id2@ip2:port,id3@ip3:port`

### Test command

`./kafka-configs.sh --bootstrap-controller ip:port --entity-type brokers --entity-name 1001 --command-config /etc/kafka/admin_config.properties --alter --add-config max.connections=10086`

Completed updating config for broker 1001.

### Verify command

`./kafka-configs.sh --bootstrap-controller ip:port --entity-type brokers --entity-name 1001 --command-config /etc/kafka/admin_config.properties --describe --all | grep -E 'max.connections|node'`

max.connections=10086 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:max.connections=100005, DEFAULT_CONFIG:max.connections=2147483647}
node.id=1001 sensitive=false synonyms={STATIC_BROKER_CONFIG:node.id=1001}

### Test result

The issue on the dynamic quorum:

```
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.BrokerIdNotRegisteredException: No node with id 1001 found.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:173)
    at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:425)
    at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:192)
    at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:165)
    at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:87)
    at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.BrokerIdNotRegisteredException: No node with id 1001 found.
```

<img width="1715" height="205" alt="image" src="https://github.com/user-attachments/assets/e0b096b4-2a1c-48dd-8aaf-0cf9855437df" />

### The result of the fix

<img width="1877" height="226" alt="image" src="https://github.com/user-attachments/assets/b09ca3d5-f470-4f43-936c-f8deb209b406" />

Reviewers: Alyssa Huang <[email protected]>, TaiJuWu <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent aaca67c commit 45a51fe
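
The two quorum settings named under "Test Environment" differ in one respect that matters here: the static `controller.quorum.voters` value carries a node id for every controller, while the dynamic `controller.quorum.bootstrap.servers` value lists endpoints only. The following is a minimal sketch of that difference. The class `QuorumConfigFormats` and both helper methods are hypothetical names invented for illustration, they are not Kafka's own parsers, and the port `9093` is a placeholder.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: these helpers are hypothetical and are not Kafka's own parsers.
final class QuorumConfigFormats {

    /** Parses a controller.quorum.voters value such as "1@host1:9093,2@host2:9093". */
    static Map<Integer, String> parseStaticVoters(String value) {
        Map<Integer, String> voters = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int at = trimmed.indexOf('@');
            if (at <= 0) {
                throw new IllegalArgumentException("Expected id@host:port but got: " + trimmed);
            }
            // The node id is part of the static configuration itself.
            int nodeId = Integer.parseInt(trimmed.substring(0, at));
            voters.put(nodeId, trimmed.substring(at + 1));
        }
        return voters;
    }

    /** Parses a controller.quorum.bootstrap.servers value such as "host1:9093,host2:9093". */
    static List<String> parseBootstrapServers(String value) {
        List<String> endpoints = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                // Only an endpoint is available; no node id can be derived from it.
                endpoints.add(trimmed);
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(parseStaticVoters("1001@ip1:9093,1002@ip2:9093").keySet());
        System.out.println(parseBootstrapServers("ip1:9093,ip2:9093"));
    }
}
```

With a dynamic quorum, a check that relies on ids taken from static configuration has nothing to consult, which is consistent with the `BrokerIdNotRegisteredException` reported above.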

10 files changed, +161 -28 lines changed

docs/getting-started/upgrade.md

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ type: docs
3232

3333
### Notable changes in 4.3.0
3434

35+
* Support dynamically changing configs for dynamic quorum controllers. Previously only brokers and static quorum controllers were supported. For further details, please refer to [KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928).
3536
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
3637
* A new config has been introduced: `remote.log.metadata.topic.min.isr`, with a default value of 2. You can correct `min.insync.replicas` for the existing `__remote_log_metadata` topic via `kafka-configs.sh` if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).
3738

docs/getting-started/zk2kraft.md

Lines changed: 1 addition & 1 deletion
@@ -153,7 +153,7 @@ Note that the entity-type must be specified as `broker-loggers`, even though we
153153
154154

155155

156-
It is not currently possible to apply a dynamic configuration on only a single controller.
156+
Prior to version 4.3, applying a dynamic configuration to a single controller was supported only when a static quorum was used.
157157

158158

159159

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

Lines changed: 4 additions & 0 deletions
@@ -339,6 +339,10 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
339339
return brokerRegistrations;
340340
}
341341

342+
Map<Integer, ControllerRegistration> controllerRegistrations() {
343+
return controllerRegistrations;
344+
}
345+
342346
/**
343347
* Process an incoming broker registration request.
344348
*/

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 15 additions & 4 deletions
@@ -479,10 +479,8 @@ public void accept(ConfigResource configResource) {
479479
throw new InvalidRequestException("Invalid broker name " +
480480
configResource.name());
481481
}
482-
if (!(clusterControl.brokerRegistrations().containsKey(nodeId) ||
483-
featureControl.isControllerId(nodeId))) {
484-
throw new BrokerIdNotRegisteredException("No node with id " +
485-
nodeId + " found.");
482+
if (!isNodeIdRegistered(nodeId)) {
483+
throw new BrokerIdNotRegisteredException("No node with id " + nodeId + " found.");
486484
}
487485
break;
488486
case TOPIC:
@@ -495,6 +493,19 @@ public void accept(ConfigResource configResource) {
495493
break;
496494
}
497495
}
496+
497+
/**
498+
* Checks whether a node id is registered as a broker, or as a controller in a static or dynamic quorum.
499+
*/
500+
private boolean isNodeIdRegistered(int nodeId) {
501+
if (clusterControl.brokerRegistrations().containsKey(nodeId)) {
502+
return true;
503+
}
504+
if (featureControl.isControllerId(nodeId)) {
505+
return true;
506+
}
507+
return clusterControl.controllerRegistrations().containsKey(nodeId);
508+
}
498509
}
499510

500511
class QuorumClusterFeatureSupportDescriber implements ClusterFeatureSupportDescriber {
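
The change to `ConfigResourceExistenceChecker` above adds a third lookup to the node id check. The following standalone sketch models that order of checks with plain sets. `NodeIdLookupSketch` and its fields are hypothetical stand-ins for `clusterControl.brokerRegistrations()`, `featureControl.isControllerId(...)` and `clusterControl.controllerRegistrations()`. Treating `isControllerId` as a lookup against statically configured voter ids is an assumption made for this sketch, not something the diff itself shows.

```java
import java.util.Set;

// Illustrative model of the lookup order; the real logic lives in QuorumController.
final class NodeIdLookupSketch {
    private final Set<Integer> registeredBrokers;      // stands in for brokerRegistrations()
    private final Set<Integer> staticVoterIds;         // assumed source for isControllerId()
    private final Set<Integer> registeredControllers;  // stands in for controllerRegistrations()

    NodeIdLookupSketch(Set<Integer> registeredBrokers,
                       Set<Integer> staticVoterIds,
                       Set<Integer> registeredControllers) {
        this.registeredBrokers = registeredBrokers;
        this.staticVoterIds = staticVoterIds;
        this.registeredControllers = registeredControllers;
    }

    /** The check before the commit: brokers and statically known controllers only. */
    boolean isNodeIdRegisteredBeforeFix(int nodeId) {
        return registeredBrokers.contains(nodeId) || staticVoterIds.contains(nodeId);
    }

    /** The check after the commit: registered controllers are consulted as well. */
    boolean isNodeIdRegistered(int nodeId) {
        return registeredBrokers.contains(nodeId)
            || staticVoterIds.contains(nodeId)
            || registeredControllers.contains(nodeId);
    }

    public static void main(String[] args) {
        // Dynamic quorum: no static voter ids, controller 1001 has registered itself.
        NodeIdLookupSketch lookup = new NodeIdLookupSketch(Set.of(1), Set.of(), Set.of(1001));
        System.out.println(lookup.isNodeIdRegisteredBeforeFix(1001));
        System.out.println(lookup.isNodeIdRegistered(1001));
    }
}
```

Under these assumptions, controller 1001 in a dynamic quorum is rejected by the old check and accepted by the new one, while an id that appears in none of the three sets is still rejected.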

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

Lines changed: 26 additions & 0 deletions
@@ -1404,6 +1404,32 @@ public void testConfigResourceExistenceChecker() throws Throwable {
14041404
}
14051405
}
14061406

1407+
@Test
1408+
public void testIsNodeIdRegisteredWithDynamicQuorum() throws Throwable {
1409+
try (
1410+
MockRaftClientTestEnv clientEnv = new MockRaftClientTestEnv.Builder(3).build();
1411+
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(clientEnv).build()
1412+
) {
1413+
QuorumController active = controlEnv.activeController();
1414+
ConfigResourceExistenceChecker checker = active.new ConfigResourceExistenceChecker();
1415+
1416+
// Register dynamic controller with ID 100
1417+
active.registerController(ANONYMOUS_CONTEXT,
1418+
new ControllerRegistrationRequestData()
1419+
.setControllerId(100)
1420+
.setIncarnationId(Uuid.randomUuid())
1421+
.setZkMigrationReady(false)
1422+
.setListeners(new ControllerRegistrationRequestData.ListenerCollection())
1423+
.setFeatures(new ControllerRegistrationRequestData.FeatureCollection())).get();
1424+
1425+
checker.accept(new ConfigResource(BROKER, "100"));
1426+
1427+
// Unregistered node should throw exception
1428+
assertThrows(BrokerIdNotRegisteredException.class,
1429+
() -> checker.accept(new ConfigResource(BROKER, "999")));
1430+
}
1431+
}
1432+
14071433
@Test
14081434
public void testFatalMetadataReplayErrorOnActive() throws Throwable {
14091435
try (

server/src/test/java/org/apache/kafka/server/BootstrapControllersIntegrationTest.java

Lines changed: 94 additions & 20 deletions
@@ -49,6 +49,8 @@
4949
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
5050
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
5151
import org.apache.kafka.common.errors.UnsupportedVersionException;
52+
import org.apache.kafka.common.metrics.KafkaMetric;
53+
import org.apache.kafka.common.metrics.Metrics;
5254
import org.apache.kafka.common.resource.PatternType;
5355
import org.apache.kafka.common.resource.ResourcePattern;
5456
import org.apache.kafka.common.resource.ResourceType;
@@ -58,6 +60,7 @@
5860
import org.apache.kafka.common.test.api.ClusterTestDefaults;
5961
import org.apache.kafka.common.test.api.Type;
6062
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
63+
import org.apache.kafka.network.SocketServerConfigs;
6164
import org.apache.kafka.server.common.MetadataVersion;
6265
import org.apache.kafka.test.TestUtils;
6366

@@ -226,30 +229,91 @@ public void testIncrementalAlterConfigs(ClusterInstance clusterInstance) throws
226229
}
227230

228231
private void testIncrementalAlterConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
232+
Collection<Integer> nodeIds = usingBootstrapControllers ?
233+
clusterInstance.controllerIds() : clusterInstance.brokers().keySet();
229234
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
230-
int nodeId = usingBootstrapControllers ?
231-
clusterInstance.controllers().values().iterator().next().config().nodeId() :
232-
clusterInstance.brokers().values().iterator().next().config().nodeId();
233-
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
234-
ConfigResource defaultResource = new ConfigResource(BROKER, "");
235-
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
236-
nodeResource, List.of(new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"), AlterConfigOp.OpType.SET)),
237-
defaultResource, List.of(new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"), AlterConfigOp.OpType.SET))
238-
);
239-
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
240-
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
241-
Config config = admin.describeConfigs(List.of(nodeResource)).
242-
all().get(1, TimeUnit.MINUTES).get(nodeResource);
243-
ConfigEntry entry = config.entries().stream().
244-
filter(e -> e.name().equals("my.custom.config")).
245-
findFirst().orElseThrow();
246-
assertEquals(DYNAMIC_BROKER_CONFIG, entry.source(),
247-
"Expected entry for my.custom.config to come from DYNAMIC_BROKER_CONFIG. " +
248-
"Instead, the entry was: " + entry);
249-
});
235+
for (int nodeId : nodeIds) {
236+
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
237+
ConfigResource defaultResource = new ConfigResource(BROKER, "");
238+
String nodeMaxConnectionsValue = String.valueOf(1000 + nodeId);
239+
String defaultMaxConnectionsValue = String.valueOf(2000 + nodeId);
240+
String defaultConnectionRateValue = String.valueOf(2000 + nodeId);
241+
242+
// Set configs: MAX_CONNECTIONS_CONFIG for per-broker, both configs for default
243+
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
244+
nodeResource, List.of(
245+
new AlterConfigOp(new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, nodeMaxConnectionsValue), AlterConfigOp.OpType.SET)
246+
),
247+
defaultResource, List.of(
248+
new AlterConfigOp(new ConfigEntry(SocketServerConfigs.MAX_CONNECTIONS_CONFIG, defaultMaxConnectionsValue), AlterConfigOp.OpType.SET),
249+
new AlterConfigOp(new ConfigEntry(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, defaultConnectionRateValue), AlterConfigOp.OpType.SET)
250+
)
251+
);
252+
admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES);
253+
254+
// Verify per-broker configs: MAX_CONNECTIONS_CONFIG and MAX_CONNECTION_CREATION_RATE_CONFIG
255+
verifyConfigValue(admin, nodeResource, SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
256+
DYNAMIC_BROKER_CONFIG, nodeMaxConnectionsValue);
257+
verifyConfigValue(admin, nodeResource, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
258+
DYNAMIC_DEFAULT_BROKER_CONFIG, defaultConnectionRateValue);
259+
260+
// Verify default broker configs: MAX_CONNECTIONS_CONFIG and MAX_CONNECTION_CREATION_RATE_CONFIG
261+
verifyConfigValue(admin, defaultResource, SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
262+
DYNAMIC_DEFAULT_BROKER_CONFIG, defaultMaxConnectionsValue);
263+
verifyConfigValue(admin, defaultResource, SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
264+
DYNAMIC_DEFAULT_BROKER_CONFIG, defaultConnectionRateValue);
265+
266+
Object node = (usingBootstrapControllers ? clusterInstance.controllers() : clusterInstance.brokers()).get(nodeId);
267+
// Verify that SocketServer has actually updated the max connection limit in ConnectionQuotas
268+
// The per-broker config should take precedence over the default config
269+
verifySocketServerMaxConnectionsUpdated(node, Integer.parseInt(nodeMaxConnectionsValue));
270+
// Verify MAX_CONNECTION_CREATION_RATE_CONFIG is also updated
271+
// The default config value should be used (per-broker config doesn't set this)
272+
verifySocketServerMaxConnectionCreationRateUpdated(node, Integer.parseInt(defaultConnectionRateValue));
273+
}
250274
}
251275
}
252276

277+
private void verifySocketServerMaxConnectionsUpdated(Object node, int expectedMaxConnections) throws Exception {
278+
Object socketServer = node.getClass().getMethod("socketServer").invoke(node);
279+
Object connectionQuotas = socketServer.getClass().getMethod("connectionQuotas").invoke(socketServer);
280+
java.lang.reflect.Field field = connectionQuotas.getClass().getDeclaredField("brokerMaxConnections");
281+
field.setAccessible(true);
282+
int actualMaxConnections = ((Number) field.get(connectionQuotas)).intValue();
283+
assertEquals(expectedMaxConnections, actualMaxConnections,
284+
"SocketServer ConnectionQuotas.brokerMaxConnections should be " + expectedMaxConnections +
285+
" but was " + actualMaxConnections);
286+
}
287+
288+
private void verifySocketServerMaxConnectionCreationRateUpdated(Object node, int expectedMaxConnectionCreationRate) throws Exception {
289+
Metrics metrics = (Metrics) node.getClass().getMethod("metrics").invoke(node);
290+
KafkaMetric metric = metrics.metrics().entrySet().stream()
291+
.filter(entry -> "broker-connection-accept-rate".equals(entry.getKey().name()))
292+
.map(java.util.Map.Entry::getValue)
293+
.findFirst()
294+
.orElseThrow(() -> new AssertionError("Broker connection rate metric not found"));
295+
double actualBound = metric.config().quota().bound();
296+
assertEquals(expectedMaxConnectionCreationRate, actualBound,
297+
"Connection creation rate quota should be " + expectedMaxConnectionCreationRate + " but was " + actualBound);
298+
}
299+
300+
private void verifyConfigValue(Admin admin, ConfigResource resource, String configName,
301+
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource expectedSource,
302+
String expectedValue) throws Exception {
303+
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
304+
Config config = admin.describeConfigs(List.of(resource)).all().get(1, TimeUnit.MINUTES).get(resource);
305+
ConfigEntry entry = config.entries().stream()
306+
.filter(e -> e.name().equals(configName))
307+
.findFirst().orElseThrow();
308+
assertEquals(expectedSource, entry.source(),
309+
"Expected entry for " + configName + " to come from " + expectedSource +
310+
". Instead, the entry was: " + entry);
311+
assertEquals(expectedValue, entry.value(),
312+
"Expected value for " + configName + " to be " + expectedValue +
313+
". Instead, the value was: " + entry.value());
314+
});
315+
}
316+
253317
@ClusterTest(brokers = 3)
254318
public void testAlterReassignmentsWithBootstrapControllers(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
255319
String topicName = "foo";
@@ -357,4 +421,14 @@ public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exceptio
357421
assertEquals("2", configEntry.value());
358422
}
359423
}
424+
425+
@ClusterTest(controllers = 1, standalone = true)
426+
public void testIncrementalAlterConfigsBySingleControllerWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception {
427+
testIncrementalAlterConfigs(clusterInstance, true);
428+
}
429+
430+
@ClusterTest(controllers = 3, standalone = true)
431+
public void testIncrementalAlterConfigsByAllControllersWithDynamicQuorum(ClusterInstance clusterInstance) throws Exception {
432+
testIncrementalAlterConfigs(clusterInstance, true);
433+
}
360434
}
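
The reworked `testIncrementalAlterConfigs` asserts two things about precedence: a per-node value for `max.connections` wins over the cluster-wide default, and `max.connection.creation.rate`, which is set only on the default resource, is reported with source `DYNAMIC_DEFAULT_BROKER_CONFIG`. The sketch below models only that two-level resolution. `DynamicConfigPrecedenceSketch`, its `Source` enum and `resolve` are hypothetical names, and static and built-in defaults are deliberately left out.

```java
import java.util.Map;
import java.util.Optional;

// Illustrative two-level resolution; not Kafka's actual config resolution code.
final class DynamicConfigPrecedenceSketch {

    enum Source { DYNAMIC_BROKER_CONFIG, DYNAMIC_DEFAULT_BROKER_CONFIG }

    static final class Resolved {
        final String value;
        final Source source;

        Resolved(String value, Source source) {
            this.value = value;
            this.source = source;
        }
    }

    /** Returns the per-node value if present, otherwise the cluster-wide default. */
    static Optional<Resolved> resolve(String name,
                                      Map<String, String> perNode,
                                      Map<String, String> clusterDefault) {
        String value = perNode.get(name);
        if (value != null) {
            return Optional.of(new Resolved(value, Source.DYNAMIC_BROKER_CONFIG));
        }
        value = clusterDefault.get(name);
        if (value != null) {
            return Optional.of(new Resolved(value, Source.DYNAMIC_DEFAULT_BROKER_CONFIG));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Values mirror the test's 1000 + nodeId and 2000 + nodeId scheme for nodeId = 1.
        Map<String, String> perNode = Map.of("max.connections", "1001");
        Map<String, String> clusterDefault = Map.of(
            "max.connections", "2001",
            "max.connection.creation.rate", "2001");
        resolve("max.connections", perNode, clusterDefault)
            .ifPresent(r -> System.out.println(r.value + " from " + r.source));
        resolve("max.connection.creation.rate", perNode, clusterDefault)
            .ifPresent(r -> System.out.println(r.value + " from " + r.source));
    }
}
```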

test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java

Lines changed: 17 additions & 3 deletions
@@ -59,13 +59,15 @@ public class ClusterConfig {
5959
private final List<String> tags;
6060
private final Map<Integer, Map<String, String>> perServerProperties;
6161
private final Map<Feature, Short> features;
62+
private final boolean standalone;
6263

6364
@SuppressWarnings("checkstyle:ParameterNumber")
6465
private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPerBroker, boolean autoStart,
6566
SecurityProtocol brokerSecurityProtocol, ListenerName brokerListenerName,
6667
SecurityProtocol controllerSecurityProtocol, ListenerName controllerListenerName, File trustStoreFile,
6768
MetadataVersion metadataVersion, Map<String, String> serverProperties,
68-
Map<Integer, Map<String, String>> perServerProperties, List<String> tags, Map<Feature, Short> features) {
69+
Map<Integer, Map<String, String>> perServerProperties, List<String> tags, Map<Feature, Short> features,
70+
boolean standalone) {
6971
// do fail fast. the following values are invalid for kraft modes.
7072
if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
7173
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
@@ -86,6 +88,7 @@ private ClusterConfig(Set<Type> types, int brokers, int controllers, int disksPe
8688
this.perServerProperties = Objects.requireNonNull(perServerProperties);
8789
this.tags = Objects.requireNonNull(tags);
8890
this.features = Objects.requireNonNull(features);
91+
this.standalone = standalone;
8992
}
9093

9194
public Set<Type> clusterTypes() {
@@ -136,6 +139,10 @@ public MetadataVersion metadataVersion() {
136139
return metadataVersion;
137140
}
138141

142+
public boolean standalone() {
143+
return standalone;
144+
}
145+
139146
public Map<Integer, Map<String, String>> perServerOverrideProperties() {
140147
return perServerProperties;
141148
}
@@ -192,7 +199,8 @@ public static Builder builder(ClusterConfig clusterConfig) {
192199
.setServerProperties(clusterConfig.serverProperties)
193200
.setPerServerProperties(clusterConfig.perServerProperties)
194201
.setTags(clusterConfig.tags)
195-
.setFeatures(clusterConfig.features);
202+
.setFeatures(clusterConfig.features)
203+
.setStandalone(clusterConfig.standalone);
196204
}
197205

198206
public static class Builder {
@@ -211,6 +219,7 @@ public static class Builder {
211219
private Map<Integer, Map<String, String>> perServerProperties = Map.of();
212220
private List<String> tags = List.of();
213221
private Map<Feature, Short> features = Map.of();
222+
private boolean standalone = false;
214223

215224
private Builder() {}
216225

@@ -291,10 +300,15 @@ public Builder setFeatures(Map<Feature, Short> features) {
291300
return this;
292301
}
293302

303+
public Builder setStandalone(boolean standalone) {
304+
this.standalone = standalone;
305+
return this;
306+
}
307+
294308
public ClusterConfig build() {
295309
return new ClusterConfig(types, brokers, controllers, disksPerBroker, autoStart,
296310
brokerSecurityProtocol, brokerListenerName, controllerSecurityProtocol, controllerListenerName,
297-
trustStoreFile, metadataVersion, serverProperties, perServerProperties, tags, features);
311+
trustStoreFile, metadataVersion, serverProperties, perServerProperties, tags, features, standalone);
298312
}
299313
}
300314
}

test-common/test-common-internal-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java

Lines changed: 1 addition & 0 deletions
@@ -57,4 +57,5 @@
5757
// users can add tags that they want to display in test
5858
String[] tags() default {};
5959
ClusterFeature[] features() default {};
60+
boolean standalone() default false;
6061
}

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java

Lines changed: 1 addition & 0 deletions
@@ -294,6 +294,7 @@ private List<TestTemplateInvocationContext> processClusterTestInternal(
294294
.setMetadataVersion(clusterTest.metadataVersion())
295295
.setTags(List.of(clusterTest.tags()))
296296
.setFeatures(features)
297+
.setStandalone(clusterTest.standalone())
297298
.build();
298299

299300
return Arrays.stream(types)

test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java

Lines changed: 1 addition & 0 deletions
@@ -324,6 +324,7 @@ public void format() throws Exception {
324324
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
325325
// Copy properties into the TestKit builder
326326
clusterConfig.serverProperties().forEach(builder::setConfigProp);
327+
builder.setStandalone(clusterConfig.standalone());
327328
this.clusterTestKit = builder.build();
328329
this.clusterTestKit.format();
329330
}
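
The last four files thread the new `standalone` attribute from the `@ClusterTest` annotation through `ClusterConfig` into `KafkaClusterTestKit.Builder`, and the new `...WithDynamicQuorum` tests set it to `true`. The sketch below shows the same annotation-to-config plumbing in isolation, using only the JDK. `SketchClusterTest`, `SketchConfig` and `configFor` are hypothetical stand-ins, not the test framework's real types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Illustrative plumbing only; the real types are ClusterTest and ClusterConfig.
final class StandaloneFlagSketch {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface SketchClusterTest {
        int controllers() default 1;
        // Defaults to false so that existing annotated tests keep their behavior.
        boolean standalone() default false;
    }

    static final class SketchConfig {
        final int controllers;
        final boolean standalone;

        SketchConfig(int controllers, boolean standalone) {
            this.controllers = controllers;
            this.standalone = standalone;
        }
    }

    /** Reads the annotation on the named method and copies its attributes into a config. */
    static SketchConfig configFor(String methodName) {
        try {
            Method method = StandaloneFlagSketch.class.getDeclaredMethod(methodName);
            SketchClusterTest annotation = method.getAnnotation(SketchClusterTest.class);
            if (annotation == null) {
                throw new IllegalArgumentException(methodName + " is not annotated");
            }
            return new SketchConfig(annotation.controllers(), annotation.standalone());
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    @SketchClusterTest(controllers = 3, standalone = true)
    static void dynamicQuorumTest() { }

    @SketchClusterTest
    static void defaultTest() { }

    public static void main(String[] args) {
        SketchConfig config = configFor("dynamicQuorumTest");
        System.out.println(config.controllers + " controllers, standalone=" + config.standalone);
    }
}
```

Giving the attribute a default of `false`, as the diff does for `ClusterTest.standalone()`, means tests that do not mention it are unaffected.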
