diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java deleted file mode 100644 index f02db36c0615e..0000000000000 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.Uuid; -import org.apache.kafka.common.errors.InconsistentClusterIdException; -import org.apache.kafka.common.test.KafkaClusterTestKit; -import org.apache.kafka.common.test.TestKitNodes; -import org.apache.kafka.test.TestUtils; - -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -@Tag("integration") -public class ReconfigurableQuorumIntegrationTest { - - static Map descVoterDirs(Admin admin) throws ExecutionException, InterruptedException { - var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get(); - return quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId, QuorumInfo.ReplicaState::replicaDirectoryId)); - } - - @Test - public void testRemoveAndAddVoterWithValidClusterId() throws Exception { - final var nodes = new TestKitNodes.Builder() - .setClusterId("test-cluster") - .setNumBrokerNodes(1) - .setNumControllerNodes(3) - .build(); - - final Map initialVoters = new HashMap<>(); - for (final var controllerNode : nodes.controllerNodes().values()) { - initialVoters.put( - controllerNode.id(), - controllerNode.metadataDirectoryId() - ); - } - - try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { - cluster.format(); - cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { - TestUtils.waitForCondition(() -> { - Map voters = descVoterDirs(admin); - assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); - return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); - }, "Initial quorum voters should be {3000, 3001, 3002} and all should have non-zero directory IDs"); - - Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); - admin.removeRaftVoter( - 3000, - dirId, - new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster")) - ).all().get(); - TestUtils.waitForCondition(() -> { - Map voters = descVoterDirs(admin); - assertEquals(Set.of(3001, 3002), voters.keySet()); - return voters.values().stream().noneMatch(directory -> directory.equals(Uuid.ZERO_UUID)); - }, "After removing voter 3000, remaining voters should be {3001, 3002} with non-zero directory IDs"); - - admin.addRaftVoter( - 3000, - dirId, - Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), - new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster")) - ).all().get(); - } - } - } - - @Test - public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { - final var nodes = new TestKitNodes.Builder() - .setClusterId("test-cluster") - .setNumBrokerNodes(1) - .setNumControllerNodes(3) - .build(); - - final Map initialVoters = new HashMap<>(); - for (final var controllerNode : nodes.controllerNodes().values()) { - initialVoters.put( - controllerNode.id(), - controllerNode.metadataDirectoryId() - ); - } - - try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { - cluster.format(); - cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { - Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); - var removeFuture = admin.removeRaftVoter( - 3000, - dirId, - new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent")) - ).all(); - TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture); - - var addFuture = admin.addRaftVoter( - 3000, - dirId, - Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), - new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent")) - ).all(); - TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture); - } - } - } -} diff --git a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java similarity index 69% rename from core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java rename to server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java index c67e941dd7a82..ad30708d7c5db 100644 --- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java +++ b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java @@ -15,13 +15,16 @@ * limitations under the License. */ -package kafka.server; +package org.apache.kafka.server; +import org.apache.kafka.clients.admin.AddRaftVoterOptions; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.FeatureMetadata; import org.apache.kafka.clients.admin.QuorumInfo; import org.apache.kafka.clients.admin.RaftVoterEndpoint; +import org.apache.kafka.clients.admin.RemoveRaftVoterOptions; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentClusterIdException; import org.apache.kafka.common.test.KafkaClusterTestKit; import org.apache.kafka.common.test.TestKitNodes; import org.apache.kafka.common.test.api.TestKitDefaults; @@ -29,10 +32,12 @@ import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; @@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +@Tag("integration") public class ReconfigurableQuorumIntegrationTest { static void checkKRaftVersions(Admin admin, short finalized) throws Exception { FeatureMetadata featureMetadata = admin.describeFeatures().featureMetadata().get(); @@ -70,7 +76,7 @@ public void testCreateAndDestroyNonReconfigurableCluster() throws Exception { ).build()) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, () -> { checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_0.featureLevel()); }); @@ -88,7 +94,7 @@ public void testCreateAndDestroyReconfigurableCluster() throws Exception { ).setStandalone(true).build()) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, () -> { checkKRaftVersions(admin, KRaftVersion.KRAFT_VERSION_1.featureLevel()); }); @@ -126,7 +132,7 @@ public void testRemoveController() throws Exception { ) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { Map voters = findVoterDirs(admin); assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); @@ -161,7 +167,7 @@ public void testRemoveAndAddSameController() throws Exception { ) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { Map voters = findVoterDirs(admin); assertEquals(Set.of(3000, 3001, 3002, 3003), voters.keySet()); @@ -200,7 +206,7 @@ public void testControllersAutoJoinStandaloneVoter() throws Exception { ) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { Map voters = findVoterDirs(admin); assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); @@ -238,7 +244,7 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception { ) { cluster.format(); cluster.startup(); - try (Admin admin = Admin.create(cluster.clientProperties())) { + try (var admin = Admin.create(cluster.clientProperties())) { TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { Map voters = findVoterDirs(admin); assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); @@ -249,4 +255,95 @@ public void testNewVoterAutoRemovesAndAdds() throws Exception { } } } -} + + @Test + public void testRemoveAndAddVoterWithValidClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (var admin = Admin.create(cluster.clientProperties())) { + TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { + Map voters = findVoterDirs(admin); + assertEquals(Set.of(3000, 3001, 3002), voters.keySet()); + for (int replicaId : new int[] {3000, 3001, 3002}) { + assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId)); + } + }); + + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> { + Map voters = findVoterDirs(admin); + assertEquals(Set.of(3001, 3002), voters.keySet()); + for (int replicaId : new int[] {3001, 3002}) { + assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId)); + } + }); + + admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("test-cluster")) + ).all().get(); + } + } + } + + @Test + public void testRemoveAndAddVoterWithInconsistentClusterId() throws Exception { + final var nodes = new TestKitNodes.Builder() + .setClusterId("test-cluster") + .setNumBrokerNodes(1) + .setNumControllerNodes(3) + .build(); + + final Map initialVoters = new HashMap<>(); + for (final var controllerNode : nodes.controllerNodes().values()) { + initialVoters.put( + controllerNode.id(), + controllerNode.metadataDirectoryId() + ); + } + + try (var cluster = new KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) { + cluster.format(); + cluster.startup(); + try (var admin = Admin.create(cluster.clientProperties())) { + Uuid dirId = cluster.nodes().controllerNodes().get(3000).metadataDirectoryId(); + var removeFuture = admin.removeRaftVoter( + 3000, + dirId, + new RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, removeFuture); + + var addFuture = admin.addRaftVoter( + 3000, + dirId, + Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 8080)), + new AddRaftVoterOptions().setClusterId(Optional.of("inconsistent")) + ).all(); + TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture); + } + } + } +} \ No newline at end of file