Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* Options for {@link org.apache.kafka.clients.admin.Admin#electLeaders(ElectionType, Set, org.apache.kafka.clients.admin.ElectLeadersOptions)}.
*/
public enum ElectionType {
PREFERRED((byte) 0), UNCLEAN((byte) 1);
PREFERRED((byte) 0), UNCLEAN((byte) 1), DESIGNATED((byte) 2);

public final byte value;

Expand All @@ -37,6 +37,8 @@ public static ElectionType valueOf(byte value) {
return PREFERRED;
} else if (value == UNCLEAN.value) {
return UNCLEAN;
} else if (value == DESIGNATED.value) {
return DESIGNATED;
} else {
throw new IllegalArgumentException(
String.format("Value %s must be one of %s", value, Arrays.asList(ElectionType.values())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
/**
* A topic name and partition number
*/
public final class TopicPartition implements Serializable {
public class TopicPartition implements Serializable {
private static final long serialVersionUID = -613627415771699627L;

private int hash = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.apache.kafka.common;

/**
 * A {@link TopicPartition} that additionally carries the id of the leader the caller wants
 * elected for that partition (used with designated leader elections).
 *
 * <p>NOTE(review): {@code equals}/{@code hashCode} are inherited from {@link TopicPartition};
 * presumably they do not take the designated leader into account — confirm before using
 * instances as map or set keys alongside plain {@link TopicPartition}s.
 */
public class TopicPartitionDesignated extends TopicPartition {

    private static final long serialVersionUID = 1L;

    /** Value reported by {@link #getDesignatedLeader()} when no designated leader was set. */
    public static final int NO_DESIGNATED_LEADER = -1;

    private int designatedLeader = NO_DESIGNATED_LEADER;

    /**
     * Creates a topic partition with no designated leader ({@link #NO_DESIGNATED_LEADER}).
     *
     * @param topic     the topic name
     * @param partition the partition number
     */
    public TopicPartitionDesignated(String topic, int partition) {
        super(topic, partition);
    }

    /**
     * Creates a topic partition together with its designated leader.
     *
     * @param topic            the topic name
     * @param partition        the partition number
     * @param designatedLeader the id of the leader to designate
     */
    public TopicPartitionDesignated(String topic, int partition, int designatedLeader) {
        super(topic, partition);
        this.designatedLeader = designatedLeader;
    }

    /**
     * Sets the designated leader for this partition.
     *
     * @param designatedLeader the id of the leader to designate
     */
    public void setDesignatedLeader(int designatedLeader) {
        this.designatedLeader = designatedLeader;
    }

    /**
     * @return the designated leader id, or {@link #NO_DESIGNATED_LEADER} if none was set
     */
    public int getDesignatedLeader() {
        return designatedLeader;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.kafka.common.errors;

/**
 * Signals that the leader designated for a partition in a designated leader election
 * was not available.
 */
public class DesignatedLeaderNotAvailableException extends InvalidMetadataException {

    /**
     * @param message description of the failure
     * @param cause   the underlying cause
     */
    public DesignatedLeaderNotAvailableException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message description of the failure
     */
    public DesignatedLeaderNotAvailableException(String message) {
        super(message);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.DesignatedLeaderNotAvailableException;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.DuplicateResourceException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
Expand Down Expand Up @@ -418,8 +419,8 @@ public enum Errors {
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new),
SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new);

SHARE_SESSION_LIMIT_REACHED(133, "The limit of share sessions has been reached.", ShareSessionLimitReachedException::new),
DESIGNATED_LEADER_NOT_AVAILABLE(134, "Supplied designated leader was not available", DesignatedLeaderNotAvailableException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

private static final Map<Class<?>, Errors> CLASS_TO_ERROR = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionDesignated;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
Expand Down Expand Up @@ -78,6 +79,13 @@ private ElectLeadersRequestData toRequestData(short version) {
data.topicPartitions().add(tps);
}
tps.partitions().add(tp.partition());
if (version >= 3 && tp instanceof TopicPartitionDesignated && electionType == ElectionType.DESIGNATED) {
tps.designatedLeaders().add(((TopicPartitionDesignated) tp).getDesignatedLeader());
if (tps.designatedLeaders().size() != tps.partitions().size()) {
throw new IllegalStateException("Both desiredLeaders and partitions must be the same size " + tps.designatedLeaders().size() + " " + tps.partitions().size());
}
}

});
} else {
data.setTopicPartitions(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
// Version 1 implements multiple leader election types, as described by KIP-460.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 implements designatedLeaders, as described by KIP-966
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ElectionType", "type": "int8", "versions": "1+",
Expand All @@ -32,7 +34,9 @@
{ "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "mapKey": true,
"about": "The name of a topic." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
"about": "The partitions of this topic whose leader should be elected." }
"about": "The partitions of this topic whose leader should be elected." },
{ "name": "DesignatedLeaders", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
"about": "The designated leaders only meant for designated elections. The entry should match with the entry in Partitions by the index." }
]
},
{ "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
// Version 1 adds a top-level error code.
//
// Version 2 is the first flexible version.
"validVersions": "0-2",
//
// Version 3 implements designatedLeaders, as described by KIP-966
"validVersions": "0-3",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ public enum Election {
/**
* Prefer replicas in the ISR but keep the partition online even if it requires picking a leader that is not in the ISR.
*/
UNCLEAN
UNCLEAN,
/**
* Elect the designated replica as leader; the election is marked unclean if that replica is outside the ISR/ELR.
*/
DESIGNATED
}

private final PartitionRegistration partition;
Expand All @@ -98,6 +102,7 @@ public enum Election {
private LeaderRecoveryState targetLeaderRecoveryState;
private boolean eligibleLeaderReplicasEnabled;
private DefaultDirProvider defaultDirProvider;
private int designatedLeader;

// Whether allow electing last known leader in a Balanced recovery. Note, the last known leader will be stored in the
// lastKnownElr field if enabled.
Expand Down Expand Up @@ -196,6 +201,11 @@ public PartitionChangeBuilder setDefaultDirProvider(DefaultDirProvider defaultDi
return this;
}

/**
 * Sets the replica id that {@code electDesignatedLeader()} will try to elect when the
 * election type is {@code Election.DESIGNATED}.
 *
 * NOTE(review): the backing field has no initializer, so it is 0 until this setter is
 * called; 0 may be a real broker id — confirm every DESIGNATED election sets it explicitly.
 *
 * @param designatedLeader the replica id to designate as leader
 * @return this builder, for call chaining
 */
public PartitionChangeBuilder setDesignatedLeader(int designatedLeader) {
this.designatedLeader = designatedLeader;
return this;
}

// VisibleForTesting
static class ElectionResult {
final int node;
Expand All @@ -220,11 +230,29 @@ public List<Integer> targetIsr() {
ElectionResult electLeader() {
    // Designated elections have a dedicated strategy; handle them up front.
    if (election == Election.DESIGNATED) {
        return electDesignatedLeader();
    }

    // Every election type other than PREFERRED falls back to the generic strategy.
    return election == Election.PREFERRED ? electPreferredLeader() : electAnyLeader();
}

/**
 * Tries to elect the designated leader. Assumes that the election type is
 * Election.DESIGNATED.
 *
 * @return the designated leader as a clean election when it is a valid new leader; the
 *         designated leader as an unclean election when it is only an acceptable leader;
 *         otherwise a result carrying NO_LEADER
 */
private ElectionResult electDesignatedLeader() {
    final int candidate = designatedLeader;
    final boolean cleanlyElectable = isValidNewLeader(candidate);

    // A candidate outside ELR / ISR can still win, but only as an "unclean" election,
    // and only if it passes the acceptable-leader check.
    if (!cleanlyElectable && !isAcceptableLeader.test(candidate)) {
        return new ElectionResult(NO_LEADER, false);
    }

    return new ElectionResult(candidate, !cleanlyElectable);
}

/**
* Assumes that the election type is Election.PREFERRED
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
Expand Down Expand Up @@ -1500,10 +1501,44 @@ void handleDirectoriesOffline(
}
}

ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
/**
 * Runs a designated leader election for every partition listed in the request.
 *
 * <p>Validation is all-or-nothing: every topic is checked before any election is attempted,
 * so a malformed entry (for example a wrong number of designated leaders for some topic)
 * prevents all elections in the request.
 *
 * @param request  the request; it must list its topic partitions explicitly, and each topic
 *                 must supply exactly one designated leader per partition, matched by index
 * @param response receives one {@code ReplicaElectionResult} per requested topic
 * @param records  passed to {@code electLeader} for each partition
 * @throws InvalidConfigurationException if no topic partitions were specified, or if a
 *         topic's designated leaders are missing or differ in number from its partitions
 */
private void electDesignatedLeaders(ElectLeadersRequestData request, ElectLeadersResponseData response, List<ApiMessageAndVersion> records) {
    ElectionType electionType = electionType(request.electionType());
    if (request.topicPartitions() == null) {
        // TODO Decide whether a designated election without explicit topic partitions should be supported.
        throw new InvalidConfigurationException("No topic partitions specified for designated leader election.");
    }

    // Show a simpler "all-or-nothing" approach to clients.
    // We don't want to trigger any elections if there is a potential administrative error like having wrong number
    // of designated-leaders for some topic...
    for (TopicPartitions topic : request.topicPartitions()) {
        List<Integer> leaders = topic.designatedLeaders();
        // DesignatedLeaders is nullable in the request schema; treat null as "none supplied"
        // so the client gets the validation error below rather than a NullPointerException.
        int leaderCount = leaders == null ? 0 : leaders.size();
        if (leaderCount != topic.partitions().size()) {
            log.info("Invalid designated leaders: in topic={} {} partitions were provided but only {} designated leaders were specified",
                topic.topic(), topic.partitions().size(), leaderCount);
            throw new InvalidConfigurationException("Mismatch between size of topic partitions and designated leaders.");
        }
    }
    for (TopicPartitions topic : request.topicPartitions()) {
        ReplicaElectionResult topicResults =
            new ReplicaElectionResult().setTopic(topic.topic());
        response.replicaElectionResults().add(topicResults);

        List<Integer> partitions = topic.partitions();
        List<Integer> leaders = topic.designatedLeaders();
        // Sizes were validated above; when partitions is empty, leaders (possibly null)
        // is never dereferenced.
        for (int i = 0; i < partitions.size(); i++) {
            int partitionId = partitions.get(i);
            int designatedLeaderId = leaders.get(i);
            ApiError error = electLeader(topic.topic(), partitionId, electionType, records, Optional.of(designatedLeaderId));
            topicResults.partitionResult().add(new PartitionResult().
                setPartitionId(partitionId).
                setErrorCode(error.error().code()).
                setErrorMessage(error.message()));
        }
    }
}

private void electUncleanOrPreferredLeaders(ElectLeadersRequestData request, ElectLeadersResponseData response, List<ApiMessageAndVersion> records) {
ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
ElectLeadersResponseData response = new ElectLeadersResponseData();
if (request.topicPartitions() == null) {
// If topicPartitions is null, we try to elect a new leader for every partition. There
// are some obvious issues with this wire protocol. For example, what if we have too
Expand All @@ -1513,38 +1548,47 @@ ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData
for (Entry<String, Uuid> topicEntry : topicsByName.entrySet()) {
String topicName = topicEntry.getKey();
ReplicaElectionResult topicResults =
new ReplicaElectionResult().setTopic(topicName);
new ReplicaElectionResult().setTopic(topicName);
response.replicaElectionResults().add(topicResults);
TopicControlInfo topic = topics.get(topicEntry.getValue());
if (topic != null) {
for (int partitionId : topic.parts.keySet()) {
ApiError error = electLeader(topicName, partitionId, electionType, records);

ApiError error = electLeader(topicName, partitionId, electionType, records, Optional.empty());
// When electing leaders for all partitions, we do not return
// partitions which already have the desired leader.
if (error.error() != Errors.ELECTION_NOT_NEEDED) {
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
}
}
}
} else {
for (TopicPartitions topic : request.topicPartitions()) {
ReplicaElectionResult topicResults =
return;
}
for (TopicPartitions topic : request.topicPartitions()) {
ReplicaElectionResult topicResults =
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
ApiError error = electLeader(topic.topic(), partitionId, electionType, records);
topicResults.partitionResult().add(new PartitionResult().
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
ApiError error = electLeader(topic.topic(), partitionId, electionType, records, Optional.empty());
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
setErrorMessage(error.message()));
}
}
}
}

/**
 * Handles an ElectLeaders request by dispatching to the handler for its election type.
 *
 * @param request the election request
 * @return the records collected by the handler together with the populated response
 */
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
    ElectionType requestedType = electionType(request.electionType());
    ElectLeadersResponseData result = new ElectLeadersResponseData();
    List<ApiMessageAndVersion> generated = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
    switch (requestedType) {
        case DESIGNATED:
            electDesignatedLeaders(request, result, generated);
            break;
        case UNCLEAN:
        case PREFERRED:
            electUncleanOrPreferredLeaders(request, result, generated);
            break;
        default:
            // No other election types are handled here.
            break;
    }
    return ControllerResult.of(generated, result);
}

Expand All @@ -1557,7 +1601,7 @@ private static ElectionType electionType(byte electionType) {
}

ApiError electLeader(String topic, int partitionId, ElectionType electionType,
List<ApiMessageAndVersion> records) {
List<ApiMessageAndVersion> records, Optional<Integer> designatedLeader) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
Expand All @@ -1573,15 +1617,28 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
|| (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
return new ApiError(Errors.ELECTION_NOT_NEEDED);
}
// TODO: A failure of this assertion indicates a programmer error.
// Confirm whether `assert` is acceptable in this code base or should be replaced by an explicit check.
assert electionType != ElectionType.DESIGNATED || designatedLeader.isPresent();

PartitionChangeBuilder.Election election = PartitionChangeBuilder.Election.PREFERRED;
if (electionType == ElectionType.UNCLEAN) {
election = PartitionChangeBuilder.Election.UNCLEAN;
switch (electionType) {
case PREFERRED -> {
if (partition.hasPreferredLeader()) {
return new ApiError(Errors.ELECTION_NOT_NEEDED);
}
}
case UNCLEAN, DESIGNATED -> {
if (partition.hasLeader()) {
return new ApiError(Errors.ELECTION_NOT_NEEDED);
}
}
}

PartitionChangeBuilder.Election election = switch (electionType) {
case UNCLEAN -> PartitionChangeBuilder.Election.UNCLEAN;
case DESIGNATED -> PartitionChangeBuilder.Election.DESIGNATED;
case PREFERRED -> PartitionChangeBuilder.Election.PREFERRED;
};
Optional<ApiMessageAndVersion> record = new PartitionChangeBuilder(
partition,
topicId,
Expand All @@ -1593,12 +1650,19 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
.setElection(election)
.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled())
.setDefaultDirProvider(clusterDescriber)
.setDesignatedLeader(designatedLeader.orElse(-1))
.build();
if (record.isEmpty()) {
if (electionType == ElectionType.PREFERRED) {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
} else {
return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
switch (electionType) {
case UNCLEAN -> {
return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
}
case PREFERRED -> {
return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
}
case DESIGNATED -> {
return new ApiError(Errors.DESIGNATED_LEADER_NOT_AVAILABLE);
}
}
}
records.add(record.get());
Expand Down Expand Up @@ -1791,7 +1855,7 @@ void maybeTriggerUncleanLeaderElectionForLeaderlessPartitions(
TopicControlInfo topic = topics.get(topicIdPartition.topicId());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
ApiError result = electLeader(topic.name, topicIdPartition.partitionId(),
ElectionType.UNCLEAN, records);
ElectionType.UNCLEAN, records, Optional.empty());
if (result.error().equals(Errors.NONE)) {
log.info("Triggering unclean leader election for offline partition {}-{}.",
topic.name, topicIdPartition.partitionId());
Expand Down
Loading