diff --git a/clients/src/main/java/org/apache/kafka/common/ElectionType.java b/clients/src/main/java/org/apache/kafka/common/ElectionType.java index 8de7af65e7baf..891c0d340350c 100644 --- a/clients/src/main/java/org/apache/kafka/common/ElectionType.java +++ b/clients/src/main/java/org/apache/kafka/common/ElectionType.java @@ -24,7 +24,7 @@ * Options for {@link org.apache.kafka.clients.admin.Admin#electLeaders(ElectionType, Set, org.apache.kafka.clients.admin.ElectLeadersOptions)}. */ public enum ElectionType { - PREFERRED((byte) 0), UNCLEAN((byte) 1); + PREFERRED((byte) 0), UNCLEAN((byte) 1), DESIGNATED((byte) 2); public final byte value; @@ -37,6 +37,8 @@ public static ElectionType valueOf(byte value) { return PREFERRED; } else if (value == UNCLEAN.value) { return UNCLEAN; + } else if (value == DESIGNATED.value) { + return DESIGNATED; } else { throw new IllegalArgumentException( String.format("Value %s must be one of %s", value, Arrays.asList(ElectionType.values()))); diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java index 7c8fe79e91193..42beae0b492cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java @@ -22,7 +22,7 @@ /** * A topic name and partition number */ -public final class TopicPartition implements Serializable { +public class TopicPartition implements Serializable { private static final long serialVersionUID = -613627415771699627L; private int hash = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionDesignated.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionDesignated.java new file mode 100644 index 0000000000000..ee73dedfc914c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionDesignated.java @@ -0,0 +1,17 @@ +package org.apache.kafka.common; + +public class TopicPartitionDesignated extends TopicPartition{ + + private int designatedLeader = -1; + + public TopicPartitionDesignated(String topic, int partition) { + super(topic, partition); + } + + public void setDesignatedLeader(int designatedLeader) { + this.designatedLeader = designatedLeader; + } + + public int getDesignatedLeader(){ return designatedLeader; } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DesignatedLeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/DesignatedLeaderNotAvailableException.java new file mode 100644 index 0000000000000..89889db742624 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/DesignatedLeaderNotAvailableException.java @@ -0,0 +1,11 @@ +package org.apache.kafka.common.errors; + +public class DesignatedLeaderNotAvailableException extends InvalidMetadataException { + public DesignatedLeaderNotAvailableException(String message) { + super(message); + } + + public DesignatedLeaderNotAvailableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index a27a7fcf23c77..4907f2f5ae4c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.errors.DelegationTokenExpiredException; import org.apache.kafka.common.errors.DelegationTokenNotFoundException; import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException; +import org.apache.kafka.common.errors.DesignatedLeaderNotAvailableException; import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; import org.apache.kafka.common.errors.DuplicateResourceException; import org.apache.kafka.common.errors.DuplicateSequenceException; @@ -418,8 +419,8 @@ public enum Errors { STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new), STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new), STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new), - SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new); - + SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new), + DESIGNATED_LEADER_NOT_AVAILABLE(134, "Supplied designated leader was not available", DesignatedLeaderNotAvailableException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); private static final Map, Errors> CLASS_TO_ERROR = new HashMap<>(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java index 8ed9cb676717f..55a19a3636c6c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionDesignated; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions; @@ -78,6 +79,13 @@ private ElectLeadersRequestData toRequestData(short version) { data.topicPartitions().add(tps); } tps.partitions().add(tp.partition()); + if (version >= 3 && tp instanceof TopicPartitionDesignated && electionType == ElectionType.DESIGNATED) { + tps.designatedLeaders().add(((TopicPartitionDesignated) tp).getDesignatedLeader()); + if (tps.designatedLeaders().size() != tps.partitions().size()) { + throw new IllegalStateException("Both desiredLeaders and partitions must be the same size " + tps.designatedLeaders().size() + " " + tps.partitions().size()); + } + } + }); } else { data.setTopicPartitions(null); diff --git a/clients/src/main/resources/common/message/ElectLeadersRequest.json b/clients/src/main/resources/common/message/ElectLeadersRequest.json index bce04585a70e2..8231e6bc342eb 100644 --- a/clients/src/main/resources/common/message/ElectLeadersRequest.json +++ b/clients/src/main/resources/common/message/ElectLeadersRequest.json @@ -21,7 +21,9 @@ // Version 1 implements multiple leader election types, as described by KIP-460. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 implements designatedLeaders, as described by KIP-966 + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "ElectionType", "type": "int8", "versions": "1+", @@ -32,7 +34,9 @@ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true, "about": "The name of a topic." }, { "name": "Partitions", "type": "[]int32", "versions": "0+", - "about": "The partitions of this topic whose leader should be elected." } + "about": "The partitions of this topic whose leader should be elected." }, + { "name": "DesignatedLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+", + "about": "The designated leaders only meant for designated elections. The entry should match with the entry in Partitions by the index." } ] }, { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", diff --git a/clients/src/main/resources/common/message/ElectLeadersResponse.json b/clients/src/main/resources/common/message/ElectLeadersResponse.json index 2da4982da9817..210b84c1dcb21 100644 --- a/clients/src/main/resources/common/message/ElectLeadersResponse.json +++ b/clients/src/main/resources/common/message/ElectLeadersResponse.json @@ -20,7 +20,9 @@ // Version 1 adds a top-level error code. // // Version 2 is the first flexible version. - "validVersions": "0-2", + // + // Version 3 implements designatedLeaders, as described by KIP-966 + "validVersions": "0-3", "flexibleVersions": "2+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java index 8697ad009620b..c3290326b0b93 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java +++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java @@ -77,7 +77,11 @@ public enum Election { /** * Prefer replicas in the ISR but keep the partition online even if it requires picking a leader that is not in the ISR. */ - UNCLEAN + UNCLEAN, + /** + * Perform leader election for designated leader + */ + DESIGNATED } private final PartitionRegistration partition; @@ -98,6 +102,7 @@ public enum Election { private LeaderRecoveryState targetLeaderRecoveryState; private boolean eligibleLeaderReplicasEnabled; private DefaultDirProvider defaultDirProvider; + private int designatedLeader; // Whether allow electing last known leader in a Balanced recovery. Note, the last known leader will be stored in the // lastKnownElr field if enabled. @@ -196,6 +201,11 @@ public PartitionChangeBuilder setDefaultDirProvider(DefaultDirProvider defaultDi return this; } + public PartitionChangeBuilder setDesignatedLeader(int designatedLeader) { + this.designatedLeader = designatedLeader; + return this; + } + // VisibleForTesting static class ElectionResult { final int node; @@ -220,11 +230,29 @@ public List targetIsr() { ElectionResult electLeader() { if (election == Election.PREFERRED) { return electPreferredLeader(); + } else if (election == Election.DESIGNATED) { + return electDesignatedLeader(); } - return electAnyLeader(); } + /** + * Assumes that the election type is Election.DESIGNATED + */ + private ElectionResult electDesignatedLeader() { + if (isValidNewLeader(designatedLeader)) { + return new ElectionResult(designatedLeader, false); + } + + // In this case, designatedLeader is outside ELR / ISR + // As such we designate the election as "unclean" + if (isAcceptableLeader.test(designatedLeader)) { + return new ElectionResult(designatedLeader, true); + } + + return new ElectionResult(NO_LEADER, false); + } + /** * Assumes that the election type is Election.PREFERRED */ diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index a1e93b3f10ff6..a221fa4acf145 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; +import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidPartitionsException; import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidReplicationFactorException; @@ -1500,10 +1501,44 @@ void handleDirectoriesOffline( } } - ControllerResult electLeaders(ElectLeadersRequestData request) { + private void electDesignatedLeaders(ElectLeadersRequestData request, ElectLeadersResponseData response, List records) { + ElectionType electionType = electionType(request.electionType()); + if (request.topicPartitions() == null) { + // TODO Decide on whether we should have designated leadership election + throw new InvalidConfigurationException("No topic partitions specified for designated leader election."); + } + + // Show a simpler "all-or-nothing" approach to clients. + // We don't want to trigger any elections if there is a potential administrative error like having wrong number + // of designated-leaders for some topic... + for (TopicPartitions topic : request.topicPartitions()) { + if (topic.designatedLeaders().size() != topic.partitions().size()) { + log.info("Invalid designated leaders: in topic={} {} partitions were provided but only {} designated leaders were specified", + topic.topic(), topic.partitions().size(), topic.designatedLeaders().size()); + throw new InvalidConfigurationException("Mismatch between size of topic partitions and designated leaders."); + } + } + for (TopicPartitions topic : request.topicPartitions()) { + ReplicaElectionResult topicResults = + new ReplicaElectionResult().setTopic(topic.topic()); + response.replicaElectionResults().add(topicResults); + + Iterator partitions = topic.partitions().iterator(); + Iterator designatedLeaders = topic.designatedLeaders().iterator(); + while (partitions.hasNext() && designatedLeaders.hasNext()) { + Integer partitionId = partitions.next(); + Integer designatedLeaderId = designatedLeaders.next(); + ApiError error = electLeader(topic.topic(), partitionId, electionType, records, Optional.of(designatedLeaderId)); + topicResults.partitionResult().add(new PartitionResult(). + setPartitionId(partitionId). + setErrorCode(error.error().code()). + setErrorMessage(error.message())); + } + } + } + + private void electUncleanOrPreferredLeaders(ElectLeadersRequestData request, ElectLeadersResponseData response, List records) { ElectionType electionType = electionType(request.electionType()); - List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); - ElectLeadersResponseData response = new ElectLeadersResponseData(); if (request.topicPartitions() == null) { // If topicPartitions is null, we try to elect a new leader for every partition. There // are some obvious issues with this wire protocol. For example, what if we have too @@ -1513,38 +1548,47 @@ ControllerResult electLeaders(ElectLeadersRequestData for (Entry topicEntry : topicsByName.entrySet()) { String topicName = topicEntry.getKey(); ReplicaElectionResult topicResults = - new ReplicaElectionResult().setTopic(topicName); + new ReplicaElectionResult().setTopic(topicName); response.replicaElectionResults().add(topicResults); TopicControlInfo topic = topics.get(topicEntry.getValue()); if (topic != null) { for (int partitionId : topic.parts.keySet()) { - ApiError error = electLeader(topicName, partitionId, electionType, records); - + ApiError error = electLeader(topicName, partitionId, electionType, records, Optional.empty()); // When electing leaders for all partitions, we do not return // partitions which already have the desired leader. if (error.error() != Errors.ELECTION_NOT_NEEDED) { topicResults.partitionResult().add(new PartitionResult(). - setPartitionId(partitionId). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + setPartitionId(partitionId). + setErrorCode(error.error().code()). + setErrorMessage(error.message())); } } } } - } else { - for (TopicPartitions topic : request.topicPartitions()) { - ReplicaElectionResult topicResults = + return; + } + for (TopicPartitions topic : request.topicPartitions()) { + ReplicaElectionResult topicResults = new ReplicaElectionResult().setTopic(topic.topic()); - response.replicaElectionResults().add(topicResults); - for (int partitionId : topic.partitions()) { - ApiError error = electLeader(topic.topic(), partitionId, electionType, records); - topicResults.partitionResult().add(new PartitionResult(). + response.replicaElectionResults().add(topicResults); + for (int partitionId : topic.partitions()) { + ApiError error = electLeader(topic.topic(), partitionId, electionType, records, Optional.empty()); + topicResults.partitionResult().add(new PartitionResult(). setPartitionId(partitionId). setErrorCode(error.error().code()). setErrorMessage(error.message())); - } } } + } + + ControllerResult electLeaders(ElectLeadersRequestData request) { + var electionType = electionType(request.electionType()); + var response = new ElectLeadersResponseData(); + List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); + switch (electionType) { + case UNCLEAN, PREFERRED -> electUncleanOrPreferredLeaders(request, response, records); + case DESIGNATED -> electDesignatedLeaders(request, response, records); + } return ControllerResult.of(records, response); } @@ -1557,7 +1601,7 @@ private static ElectionType electionType(byte electionType) { } ApiError electLeader(String topic, int partitionId, ElectionType electionType, - List records) { + List records, Optional designatedLeader) { Uuid topicId = topicsByName.get(topic); if (topicId == null) { return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, @@ -1573,15 +1617,28 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such partition as " + topic + "-" + partitionId); } - if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader()) - || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) { - return new ApiError(Errors.ELECTION_NOT_NEEDED); - } + // TODO Programmer error if this assertion crashes + // Need to check whether assertions are kosher in this code base... + assert electionType != ElectionType.DESIGNATED || designatedLeader.isPresent(); - PartitionChangeBuilder.Election election = PartitionChangeBuilder.Election.PREFERRED; - if (electionType == ElectionType.UNCLEAN) { - election = PartitionChangeBuilder.Election.UNCLEAN; + switch (electionType) { + case PREFERRED -> { + if (partition.hasPreferredLeader()) { + return new ApiError(Errors.ELECTION_NOT_NEEDED); + } + } + case UNCLEAN, DESIGNATED -> { + if (partition.hasLeader()) { + return new ApiError(Errors.ELECTION_NOT_NEEDED); + } + } } + + PartitionChangeBuilder.Election election = switch (electionType) { + case UNCLEAN -> PartitionChangeBuilder.Election.UNCLEAN; + case DESIGNATED -> PartitionChangeBuilder.Election.DESIGNATED; + case PREFERRED -> PartitionChangeBuilder.Election.PREFERRED; + }; Optional record = new PartitionChangeBuilder( partition, topicId, @@ -1593,12 +1650,19 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, .setElection(election) .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) .setDefaultDirProvider(clusterDescriber) + .setDesignatedLeader(designatedLeader.orElse(-1)) .build(); if (record.isEmpty()) { - if (electionType == ElectionType.PREFERRED) { - return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE); - } else { - return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE); + switch (electionType) { + case UNCLEAN -> { + return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE); + } + case PREFERRED -> { + return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE); + } + case DESIGNATED -> { + return new ApiError(Errors.DESIGNATED_LEADER_NOT_AVAILABLE); + } } } records.add(record.get()); @@ -1791,7 +1855,7 @@ void maybeTriggerUncleanLeaderElectionForLeaderlessPartitions( TopicControlInfo topic = topics.get(topicIdPartition.topicId()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) { ApiError result = electLeader(topic.name, topicIdPartition.partitionId(), - ElectionType.UNCLEAN, records); + ElectionType.UNCLEAN, records, Optional.empty()); if (result.error().equals(Errors.NONE)) { log.info("Triggering unclean leader election for offline partition {}-{}.", topic.name, topicIdPartition.partitionId()); diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java index 312a207f8d74a..3b79e00755ff6 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java @@ -279,7 +279,14 @@ public void testElectLeader(short version) { 4, false ); - + assertElectLeaderEquals( + createFooBuilder(version).setElection(Election.DESIGNATED).setDesignatedLeader(2), 2, false); + assertElectLeaderEquals(createBazBuilder(version) + .setElection(Election.DESIGNATED) + .setDesignatedLeader(2), 2, true); + assertElectLeaderEquals(createBazBuilder(version) + .setElection(Election.DESIGNATED) + .setDesignatedLeader(1), 1, false); assertElectLeaderEquals(createBazBuilder(version).setElection(Election.PREFERRED), 3, false); assertElectLeaderEquals(createBazBuilder(version), 3, false); assertElectLeaderEquals(createBazBuilder(version).setElection(Election.UNCLEAN), 3, false); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 7024d0de3a23a..aa1ea028c2b57 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -147,6 +147,7 @@ import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID; import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION; +import static org.apache.kafka.common.protocol.Errors.DESIGNATED_LEADER_NOT_AVAILABLE; import static org.apache.kafka.controller.ControllerRequestContextUtil.QUOTA_EXCEEDED_IN_TEST_MSG; import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextFor; import static org.apache.kafka.controller.ControllerRequestContextUtil.anonymousContextWithMutationQuotaExceededFor; @@ -2767,6 +2768,62 @@ public void testElectPreferredLeaders() { election2Result.records()); } + @Test + public void testDesignatedLeaderElection() { + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); + ReplicationControlManager replication = ctx.replicationControl; + + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + + Uuid fooId = ctx.createTestTopic("foo", new int[][]{ + new int[]{0, 1, 2}, new int[]{1, 2, 3}, new int[]{2, 3, 4}}).topicId(); + + var t0 = new TopicIdPartition(fooId, 0); + var t1 = new TopicIdPartition(fooId, 1); + var t2 = new TopicIdPartition(fooId, 2); + + ctx.fenceBrokers(0, 1, 2); + + assertLeaderAndIsr(replication, t0, NO_LEADER, new int[] {2}); + assertLeaderAndIsr(replication, t1, 3, new int[] {3}); + assertLeaderAndIsr(replication, t2, 3, new int[] {3, 4}); + + ElectLeadersRequestData request1 = new ElectLeadersRequestData(). + setElectionType(ElectionType.DESIGNATED.value). + setTopicPartitions(new TopicPartitionsCollection(List.of( + new TopicPartitions().setTopic("foo"). + setPartitions(List.of(0, 1, 2)). + setDesignatedLeaders(List.of(1, 3, 3))).iterator())); + ControllerResult election1Result = + replication.electLeaders(request1); + ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap( + Utils.mkEntry( + new TopicPartition("foo", 0), + new ApiError(DESIGNATED_LEADER_NOT_AVAILABLE) + ), + Utils.mkEntry( + new TopicPartition("foo", 1), + new ApiError(ELECTION_NOT_NEEDED) + ), + Utils.mkEntry( + new TopicPartition("foo", 2), + new ApiError(ELECTION_NOT_NEEDED) + ) + )); + assertElectLeadersResponse(expectedResponse1, election1Result.response()); + assertEquals(List.of(), election1Result.records()); + + ctx.unfenceBrokers(0); + + ElectLeadersRequestData request2 = new ElectLeadersRequestData(). + setElectionType(ElectionType.DESIGNATED.value). + setTopicPartitions(new TopicPartitionsCollection(List.of( + new TopicPartitions().setTopic("foo"). + setPartitions(List.of(0)). + setDesignatedLeaders(List.of(0))).iterator())); + } + @Test public void testBalancePartitionLeaders() { ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder().build(); diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java index 4a9a8d270ea5a..91c98b50ec6a9 100644 --- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.ElectionType; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionDesignated; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ElectionNotNeededException; import org.apache.kafka.common.errors.TimeoutException; @@ -86,7 +87,7 @@ static void run(Duration timeoutMs, String... args) throws Exception { ElectionType electionType = commandOptions.getElectionType(); Optional> jsonFileTopicPartitions = Optional.ofNullable(commandOptions.getPathToJsonFile()) - .map(LeaderElectionCommand::parseReplicaElectionData); + .map(path -> parseReplicaElectionData(electionType, path)); Optional topicOption = Optional.ofNullable(commandOptions.getTopic()); Optional partitionOption = Optional.ofNullable(commandOptions.getPartition()); @@ -194,13 +195,13 @@ private static void electLeaders(Admin client, ElectionType electionType, Option } } - private static Set parseReplicaElectionData(String path) { + private static Set parseReplicaElectionData(ElectionType electionType, String path) { Optional jsonFile; try { jsonFile = Json.parseFull(Utils.readFileAsString(path)); return jsonFile.map(js -> { try { - return topicPartitions(js); + return topicPartitions(electionType, js); } catch (JsonMappingException e) { throw new RuntimeException(e); } @@ -210,11 +211,11 @@ private static Set parseReplicaElectionData(String path) { } } - private static Set topicPartitions(JsonValue js) throws JsonMappingException { + private static Set topicPartitions(ElectionType electionType, JsonValue js) throws JsonMappingException { return js.asJsonObject().get("partitions") .map(partitionsList -> { try { - return toTopicPartition(partitionsList); + return toTopicPartition(electionType, partitionsList); } catch (JsonMappingException e) { throw new RuntimeException(e); } @@ -222,7 +223,7 @@ private static Set topicPartitions(JsonValue js) throws JsonMapp .orElseThrow(() -> new AdminOperationException("Replica election data is missing \"partitions\" field")); } - private static Set toTopicPartition(JsonValue partitionsList) throws JsonMappingException { + private static Set toTopicPartition(ElectionType electionType, JsonValue partitionsList) throws JsonMappingException { List partitions = new ArrayList<>(); Iterator iterator = partitionsList.asJsonArray().iterator(); @@ -230,7 +231,17 @@ private static Set toTopicPartition(JsonValue partitionsList) th JsonObject partitionJs = iterator.next().asJsonObject(); String topic = partitionJs.apply("topic").to(STRING); int partition = partitionJs.apply("partition").to(INT); - partitions.add(new TopicPartition(topic, partition)); + int designatedLeader = -1; + if(electionType == ElectionType.DESIGNATED ) { + if (!partitionJs.node().has("designatedLeader")) { + throw new RuntimeException("Election type is designated but no designated leader declared"); + } else { + designatedLeader = partitionJs.apply("designatedLeader").to(INT); + } + } + TopicPartitionDesignated topicPartition = new TopicPartitionDesignated(topic, partition); + topicPartition.setDesignatedLeader(designatedLeader); + partitions.add(topicPartition); } Set duplicatePartitions = partitions.stream() @@ -309,7 +320,7 @@ public LeaderElectionCommandOptions(String[] args) { electionType = parser .accepts( "election-type", - "Type of election to attempt. Possible values are \"preferred\" for preferred leader election or \"unclean\" for unclean leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.") + "Type of election to attempt. Possible values are \"preferred\" for preferred leader election, \"unclean\" for unclean leader election, and \"designated\" for designated leader election. If preferred election is selection, the election is only performed if the current leader is not the preferred leader for the topic partition. If unclean election is selected, the election is only performed if there are no leader for the topic partition. REQUIRED.If designated election is selected, the election is the current leader is not the designated leader") .withRequiredArg() .describedAs("election type") .withValuesConvertedBy(new ElectionTypeConverter()); @@ -399,7 +410,7 @@ public void validate() { public void maybePrintHelpOrVersion() { CommandLineUtils.maybePrintHelpOrVersion( this, - "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas and unclean replicas." + "This tool attempts to elect a new leader for a set of topic partitions. The type of elections supported are preferred replicas, unclean replicas, and designated replicas." ); } diff --git a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java index 2e3ebf6a7bd08..8f08969d7154e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionDesignated; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; @@ -251,6 +252,48 @@ public void testPathToJsonFile() throws Exception { } } + @ClusterTest + public void testDesignatedReplicaElection() throws Exception { + if (cluster.type().toString() == "ZK") { + return; + } + String topic = "designated-topic"; + int partition = 0; + List assignment = Arrays.asList(broker2, broker3); + + cluster.waitForReadyBrokers(); + Admin client = cluster.createAdminClient(); + Map> partitionAssignment = new HashMap<>(); + partitionAssignment.put(partition, assignment); + + createTopic(client, topic, partitionAssignment); + + TopicPartitionDesignated topicPartition = new TopicPartitionDesignated(topic, partition); + topicPartition.setDesignatedLeader(1); + Path topicPartitionPath = tempTopicPartitionFile(Collections.singletonList(topicPartition), true); + + TestUtils.assertLeader(client, topicPartition, broker2); + + cluster.shutdownBroker(broker2); + TestUtils.waitForBrokersOutOfIsr(client, + JavaConverters.asScalaBuffer(Collections.singletonList(topicPartition)).toSet(), + JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet() + ); + TestUtils.assertLeader(client, topicPartition, broker3); + cluster.startBroker(broker2); + TestUtils.waitForBrokersInIsr(client, topicPartition, + JavaConverters.asScalaBuffer(Collections.singletonList(broker2)).toSet() + ); + + LeaderElectionCommand.mainNoExit( + "--bootstrap-server", cluster.bootstrapServers(), + "--election-type", "designated", + "--path-to-json-file", topicPartitionPath.toString() + ); + + TestUtils.assertLeader(client, topicPartition, broker2); + } + @ClusterTest public void testPreferredReplicaElection() throws InterruptedException, ExecutionException { String topic = "preferred-topic";