Skip to content

Commit 423330e

Browse files
authored
KAFKA-19692 improve the docs of "clusterId" for AddRaftVoterOptions and RemoveRaftVoterOptions (#20555)
Improves the documentation of the clusterId field in AddRaftVoterOptions and RemoveRaftVoterOptions. The changes include: 1. Adding Javadoc to both addRaftVoter and removeRaftVoter methods to explain the behavior of the optional clusterId. 2. Integration tests have been added to verify the correct behavior of add and remove voter operations with and without clusterId, including scenarios with inconsistent cluster ids. Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent d1a8212 commit 423330e

File tree

4 files changed

+167
-2
lines changed

4 files changed

+167
-2
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.clients.admin;
19+
20+
import org.apache.kafka.common.Uuid;
21+
import org.apache.kafka.common.errors.InconsistentClusterIdException;
22+
import org.apache.kafka.common.test.KafkaClusterTestKit;
23+
import org.apache.kafka.common.test.TestKitNodes;
24+
import org.apache.kafka.test.TestUtils;
25+
26+
import org.junit.jupiter.api.Tag;
27+
import org.junit.jupiter.api.Test;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
import java.util.Optional;
32+
import java.util.Set;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.stream.Collectors;
35+
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
38+
@Tag("integration")
39+
public class ReconfigurableQuorumIntegrationTest {
40+
41+
static Map<Integer, Uuid> descVoterDirs(Admin admin) throws ExecutionException, InterruptedException {
42+
var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
43+
return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId));
44+
}
45+
46+
@Test
47+
public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
48+
final var nodes = new TestKitNodes.Builder()
49+
.setClusterId("test-cluster")
50+
.setNumBrokerNodes(1)
51+
.setNumControllerNodes(3)
52+
.build();
53+
54+
final Map<Integer, Uuid> initialVoters = new HashMap<>();
55+
for (final var controllerNode : nodes.controllerNodes().values()) {
56+
initialVoters.put(
57+
controllerNode.id(),
58+
controllerNode.metadataDirectoryId()
59+
);
60+
}
61+
62+
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
63+
cluster.format();
64+
cluster.startup();
65+
try (Admin admin = Admin.create(cluster.clientProperties())) {
66+
TestUtils.waitForCondition(() -> {
67+
Map<Integer, Uuid> voters = descVoterDirs(admin);
68+
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
69+
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
70+
}, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs");
71+
72+
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
73+
admin.removeRaftVoter(
74+
3000,
75+
dirId,
76+
new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
77+
).all().get();
78+
TestUtils.waitForCondition(() -> {
79+
Map<Integer, Uuid> voters = descVoterDirs(admin);
80+
assertEquals(Set.of(3001, 3002), voters.keySet());
81+
return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID));
82+
}, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs");
83+
84+
admin.addRaftVoter(
85+
3000,
86+
dirId,
87+
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
88+
new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
89+
).all().get();
90+
}
91+
}
92+
}
93+
94+
@Test
95+
public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception {
96+
final var nodes = new TestKitNodes.Builder()
97+
.setClusterId("test-cluster")
98+
.setNumBrokerNodes(1)
99+
.setNumControllerNodes(3)
100+
.build();
101+
102+
final Map<Integer, Uuid> initialVoters = new HashMap<>();
103+
for (final var controllerNode : nodes.controllerNodes().values()) {
104+
initialVoters.put(
105+
controllerNode.id(),
106+
controllerNode.metadataDirectoryId()
107+
);
108+
}
109+
110+
try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
111+
cluster.format();
112+
cluster.startup();
113+
try (Admin admin = Admin.create(cluster.clientProperties())) {
114+
Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
115+
var removeFuture = admin.removeRaftVoter(
116+
3000,
117+
dirId,
118+
new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
119+
).all();
120+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture);
121+
122+
var addFuture = admin.addRaftVoter(
123+
3000,
124+
dirId,
125+
Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)),
126+
new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
127+
).all();
128+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
129+
}
130+
}
131+
}
132+
}

clients/src/main/java/org/apache/kafka/clients/admin/AddRaftVoterOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@
1717
package org.apache.kafka.clients.admin;
1818

1919
import org.apache.kafka.common.annotation.InterfaceStability;
20+
import org.apache.kafka.common.protocol.Errors;
2021

2122
import java.util.Optional;
2223

2324
/**
2425
* Options for {@link Admin#addRaftVoter}.
26+
*
27+
* <p>
28+
* The clusterId is optional.
29+
* <p>
30+
* If provided, the request will only succeed if the cluster id matches the id of the current cluster.
31+
* If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}.
32+
* <p>
33+
* If not provided, the cluster id check is skipped.
2534
*/
2635
@InterfaceStability.Stable
2736
public class AddRaftVoterOptions extends AbstractOptions<AddRaftVoterOptions> {

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.common.annotation.InterfaceStability;
3333
import org.apache.kafka.common.config.ConfigResource;
3434
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
35+
import org.apache.kafka.common.errors.InconsistentClusterIdException;
3536
import org.apache.kafka.common.errors.InterruptException;
3637
import org.apache.kafka.common.metrics.KafkaMetric;
3738
import org.apache.kafka.common.quota.ClientQuotaAlteration;
@@ -1866,10 +1867,17 @@ default AddRaftVoterResult addRaftVoter(
18661867
/**
18671868
* Add a new voter node to the KRaft metadata quorum.
18681869
*
1870+
* <p>
1871+
* The clusterId in {@link AddRaftVoterOptions} is optional.
1872+
* If provided, the operation will only succeed if the cluster id matches the id
1873+
* of the current cluster. If the cluster id does not match, the operation
1874+
* will fail with {@link InconsistentClusterIdException}.
1875+
* If not provided, the cluster id check is skipped.
1876+
*
18691877
* @param voterId The node ID of the voter.
18701878
* @param voterDirectoryId The directory ID of the voter.
18711879
* @param endpoints The endpoints that the new voter has.
1872-
* @param options The options to use when adding the new voter node.
1880+
* @param options Additional options for the operation, including optional cluster ID.
18731881
*/
18741882
AddRaftVoterResult addRaftVoter(
18751883
int voterId,
@@ -1894,9 +1902,16 @@ default RemoveRaftVoterResult removeRaftVoter(
18941902
/**
18951903
* Remove a voter node from the KRaft metadata quorum.
18961904
*
1905+
* <p>
1906+
* The clusterId in {@link AddRaftVoterOptions} is optional.
1907+
* If provided, the operation will only succeed if the cluster id matches the id
1908+
* of the current cluster. If the cluster id does not match, the operation
1909+
* will fail with {@link InconsistentClusterIdException}.
1910+
* If not provided, the cluster id check is skipped.
1911+
*
18971912
* @param voterId The node ID of the voter.
18981913
* @param voterDirectoryId The directory ID of the voter.
1899-
* @param options The options to use when removing the voter node.
1914+
* @param options Additional options for the operation, including optional cluster ID.
19001915
*/
19011916
RemoveRaftVoterResult removeRaftVoter(
19021917
int voterId,

clients/src/main/java/org/apache/kafka/clients/admin/RemoveRaftVoterOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,20 @@
1717
package org.apache.kafka.clients.admin;
1818

1919
import org.apache.kafka.common.annotation.InterfaceStability;
20+
import org.apache.kafka.common.protocol.Errors;
2021

2122
import java.util.Optional;
2223

2324
/**
2425
* Options for {@link Admin#removeRaftVoter}.
26+
*
27+
* <p>
28+
* The clusterId is optional.
29+
* <p>
30+
* If provided, the request will only succeed if the cluster id matches the id of the current cluster.
31+
* If the cluster id does not match, the request will fail with {@link Errors#INCONSISTENT_CLUSTER_ID}.
32+
* <p>
33+
* If not provided, the cluster id check is skipped.
2534
*/
2635
@InterfaceStability.Stable
2736
public class RemoveRaftVoterOptions extends AbstractOptions<RemoveRaftVoterOptions> {

0 commit comments

Comments
 (0)