15
15
* limitations under the License.
16
16
*/
17
17
18
- package kafka .server ;
18
+ package org . apache . kafka .server ;
19
19
20
+ import org .apache .kafka .clients .admin .AddRaftVoterOptions ;
20
21
import org .apache .kafka .clients .admin .Admin ;
21
22
import org .apache .kafka .clients .admin .FeatureMetadata ;
22
23
import org .apache .kafka .clients .admin .QuorumInfo ;
23
24
import org .apache .kafka .clients .admin .RaftVoterEndpoint ;
25
+ import org .apache .kafka .clients .admin .RemoveRaftVoterOptions ;
24
26
import org .apache .kafka .common .Uuid ;
27
+ import org .apache .kafka .common .errors .InconsistentClusterIdException ;
25
28
import org .apache .kafka .common .test .KafkaClusterTestKit ;
26
29
import org .apache .kafka .common .test .TestKitNodes ;
27
30
import org .apache .kafka .common .test .api .TestKitDefaults ;
28
31
import org .apache .kafka .raft .QuorumConfig ;
29
32
import org .apache .kafka .server .common .KRaftVersion ;
30
33
import org .apache .kafka .test .TestUtils ;
31
34
35
+ import org .junit .jupiter .api .Tag ;
32
36
import org .junit .jupiter .api .Test ;
33
37
34
38
import java .util .HashMap ;
35
39
import java .util .Map ;
40
+ import java .util .Optional ;
36
41
import java .util .Set ;
37
42
import java .util .TreeMap ;
38
43
41
46
import static org .junit .jupiter .api .Assertions .assertNotEquals ;
42
47
import static org .junit .jupiter .api .Assertions .assertTrue ;
43
48
49
+ @ Tag ("integration" )
44
50
public class ReconfigurableQuorumIntegrationTest {
45
51
static void checkKRaftVersions (Admin admin , short finalized ) throws Exception {
46
52
FeatureMetadata featureMetadata = admin .describeFeatures ().featureMetadata ().get ();
@@ -70,7 +76,7 @@ public void testCreateAndDestroyNonReconfigurableCluster() throws Exception {
70
76
).build ()) {
71
77
cluster .format ();
72
78
cluster .startup ();
73
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
79
+ try (var admin = Admin .create (cluster .clientProperties ())) {
74
80
TestUtils .retryOnExceptionWithTimeout (30_000 , () -> {
75
81
checkKRaftVersions (admin , KRaftVersion .KRAFT_VERSION_0 .featureLevel ());
76
82
});
@@ -88,7 +94,7 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception {
88
94
).setStandalone (true ).build ()) {
89
95
cluster .format ();
90
96
cluster .startup ();
91
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
97
+ try (var admin = Admin .create (cluster .clientProperties ())) {
92
98
TestUtils .retryOnExceptionWithTimeout (30_000 , () -> {
93
99
checkKRaftVersions (admin , KRaftVersion .KRAFT_VERSION_1 .featureLevel ());
94
100
});
@@ -126,7 +132,7 @@ public void testRemoveController() throws Exception {
126
132
) {
127
133
cluster .format ();
128
134
cluster .startup ();
129
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
135
+ try (var admin = Admin .create (cluster .clientProperties ())) {
130
136
TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
131
137
Map <Integer , Uuid > voters = findVoterDirs (admin );
132
138
assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -161,7 +167,7 @@ public void testRemoveAndAddSameController() throws Exception {
161
167
) {
162
168
cluster .format ();
163
169
cluster .startup ();
164
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
170
+ try (var admin = Admin .create (cluster .clientProperties ())) {
165
171
TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
166
172
Map <Integer , Uuid > voters = findVoterDirs (admin );
167
173
assertEquals (Set .of (3000 , 3001 , 3002 , 3003 ), voters .keySet ());
@@ -200,7 +206,7 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception {
200
206
) {
201
207
cluster .format ();
202
208
cluster .startup ();
203
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
209
+ try (var admin = Admin .create (cluster .clientProperties ())) {
204
210
TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
205
211
Map <Integer , Uuid > voters = findVoterDirs (admin );
206
212
assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -238,7 +244,7 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception {
238
244
) {
239
245
cluster .format ();
240
246
cluster .startup ();
241
- try (Admin admin = Admin .create (cluster .clientProperties ())) {
247
+ try (var admin = Admin .create (cluster .clientProperties ())) {
242
248
TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
243
249
Map <Integer , Uuid > voters = findVoterDirs (admin );
244
250
assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
@@ -249,4 +255,95 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception {
249
255
}
250
256
}
251
257
}
252
- }
258
+
259
+ @ Test
260
+ public void testRemoveAndAddVoterWithValidClusterId () throws Exception {
261
+ final var nodes = new TestKitNodes .Builder ()
262
+ .setClusterId ("test-cluster" )
263
+ .setNumBrokerNodes (1 )
264
+ .setNumControllerNodes (3 )
265
+ .build ();
266
+
267
+ final Map <Integer , Uuid > initialVoters = new HashMap <>();
268
+ for (final var controllerNode : nodes .controllerNodes ().values ()) {
269
+ initialVoters .put (
270
+ controllerNode .id (),
271
+ controllerNode .metadataDirectoryId ()
272
+ );
273
+ }
274
+
275
+ try (var cluster = new KafkaClusterTestKit .Builder (nodes ).setInitialVoterSet (initialVoters ).build ()) {
276
+ cluster .format ();
277
+ cluster .startup ();
278
+ try (var admin = Admin .create (cluster .clientProperties ())) {
279
+ TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
280
+ Map <Integer , Uuid > voters = findVoterDirs (admin );
281
+ assertEquals (Set .of (3000 , 3001 , 3002 ), voters .keySet ());
282
+ for (int replicaId : new int [] {3000 , 3001 , 3002 }) {
283
+ assertNotEquals (Uuid .ZERO_UUID , voters .get (replicaId ));
284
+ }
285
+ });
286
+
287
+ Uuid dirId = cluster .nodes ().controllerNodes ().get (3000 ).metadataDirectoryId ();
288
+ admin .removeRaftVoter (
289
+ 3000 ,
290
+ dirId ,
291
+ new RemoveRaftVoterOptions ().setClusterId (Optional .of ("test-cluster" ))
292
+ ).all ().get ();
293
+ TestUtils .retryOnExceptionWithTimeout (30_000 , 10 , () -> {
294
+ Map <Integer , Uuid > voters = findVoterDirs (admin );
295
+ assertEquals (Set .of (3001 , 3002 ), voters .keySet ());
296
+ for (int replicaId : new int [] {3001 , 3002 }) {
297
+ assertNotEquals (Uuid .ZERO_UUID , voters .get (replicaId ));
298
+ }
299
+ });
300
+
301
+ admin .addRaftVoter (
302
+ 3000 ,
303
+ dirId ,
304
+ Set .of (new RaftVoterEndpoint ("CONTROLLER" , "example.com" , 8080 )),
305
+ new AddRaftVoterOptions ().setClusterId (Optional .of ("test-cluster" ))
306
+ ).all ().get ();
307
+ }
308
+ }
309
+ }
310
+
311
+ @ Test
312
+ public void testRemoveAndAddVoterWithInconsistentClusterId () throws Exception {
313
+ final var nodes = new TestKitNodes .Builder ()
314
+ .setClusterId ("test-cluster" )
315
+ .setNumBrokerNodes (1 )
316
+ .setNumControllerNodes (3 )
317
+ .build ();
318
+
319
+ final Map <Integer , Uuid > initialVoters = new HashMap <>();
320
+ for (final var controllerNode : nodes .controllerNodes ().values ()) {
321
+ initialVoters .put (
322
+ controllerNode .id (),
323
+ controllerNode .metadataDirectoryId ()
324
+ );
325
+ }
326
+
327
+ try (var cluster = new KafkaClusterTestKit .Builder (nodes ).setInitialVoterSet (initialVoters ).build ()) {
328
+ cluster .format ();
329
+ cluster .startup ();
330
+ try (var admin = Admin .create (cluster .clientProperties ())) {
331
+ Uuid dirId = cluster .nodes ().controllerNodes ().get (3000 ).metadataDirectoryId ();
332
+ var removeFuture = admin .removeRaftVoter (
333
+ 3000 ,
334
+ dirId ,
335
+ new RemoveRaftVoterOptions ().setClusterId (Optional .of ("inconsistent" ))
336
+ ).all ();
337
+ TestUtils .assertFutureThrows (InconsistentClusterIdException .class , removeFuture );
338
+
339
+ var addFuture = admin .addRaftVoter (
340
+ 3000 ,
341
+ dirId ,
342
+ Set .of (new RaftVoterEndpoint ("CONTROLLER" , "example.com" , 8080 )),
343
+ new AddRaftVoterOptions ().setClusterId (Optional .of ("inconsistent" ))
344
+ ).all ();
345
+ TestUtils .assertFutureThrows (InconsistentClusterIdException .class , addFuture );
346
+ }
347
+ }
348
+ }
349
+ }
0 commit comments