Skip to content

Commit 162db13

Browse files
MINOR: simplify last known elr update (#20629)
Simplify the last known elr update logic. This way can make a more robust logic. Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent f68a149 commit 162db13

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,12 +494,10 @@ private void setAssignmentChanges(PartitionChangeRecord record) {
494494

495495
private void maybeUpdateLastKnownLeader(PartitionChangeRecord record) {
496496
if (!useLastKnownLeaderInBalancedRecovery || !eligibleLeaderReplicasEnabled) return;
497-
if (record.isr() != null && record.isr().isEmpty() && (partition.lastKnownElr.length != 1 ||
498-
partition.lastKnownElr[0] != partition.leader)) {
497+
if (record.leader() == NO_LEADER && partition.lastKnownElr.length == 0) {
499498
// Only update the last known leader when the first time the partition becomes leaderless.
500499
record.setLastKnownElr(List.of(partition.leader));
501-
} else if ((record.leader() >= 0 || (partition.leader != NO_LEADER && record.leader() != NO_LEADER))
502-
&& partition.lastKnownElr.length > 0) {
500+
} else if (record.leader() >= 0 && partition.lastKnownElr.length > 0) {
503501
// Clear the LastKnownElr field if the partition will have or continues to have a valid leader.
504502
record.setLastKnownElr(List.of());
505503
}

metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,48 @@ public void testEligibleLeaderReplicas_IsrShrinkBelowMinISR(short version) {
822822
}
823823
}
824824

825+
@Test
826+
public void testEligibleLeaderReplicas_lastKnownElrShouldBePopulatedWhenNoLeader() {
827+
PartitionRegistration partition = new PartitionRegistration.Builder()
828+
.setReplicas(new int[] {1, 2, 3})
829+
.setDirectories(new Uuid[] {
830+
DirectoryId.UNASSIGNED,
831+
DirectoryId.UNASSIGNED,
832+
DirectoryId.UNASSIGNED
833+
})
834+
.setIsr(new int[] {1})
835+
.setElr(new int[] {2})
836+
.setLeader(1)
837+
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED)
838+
.setLeaderEpoch(100)
839+
.setPartitionEpoch(200)
840+
.build();
841+
842+
short version = 2; // ELR supported
843+
Uuid topicId = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
844+
845+
// No replica is acceptable as leader, so election yields NO_LEADER.
846+
// We intentionally do not change target ISR so record.isr remains null.
847+
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, topicId, 0, r -> false,
848+
metadataVersionForPartitionChangeRecordVersion(version), 3)
849+
.setElection(Election.PREFERRED)
850+
.setEligibleLeaderReplicasEnabled(isElrEnabled(version))
851+
.setDefaultDirProvider(DEFAULT_DIR_PROVIDER)
852+
.setUseLastKnownLeaderInBalancedRecovery(true);
853+
854+
ApiMessageAndVersion change = builder.build().get();
855+
PartitionChangeRecord record = (PartitionChangeRecord) change.message();
856+
857+
assertEquals(NO_LEADER, record.leader());
858+
// There is no ISR update if we do not perform the leader verification on the ISR members.
859+
assertNull(record.isr(), record.toString());
860+
assertEquals(1, record.lastKnownElr().size(), record.toString());
861+
assertEquals(1, record.lastKnownElr().get(0), record.toString());
862+
partition = partition.merge((PartitionChangeRecord) builder.build().get().message());
863+
assertArrayEquals(new int[] {1}, partition.lastKnownElr);
864+
}
865+
866+
825867
@ParameterizedTest
826868
@MethodSource("partitionChangeRecordVersions")
827869
public void testEligibleLeaderReplicas_IsrExpandAboveMinISR(short version) {

0 commit comments

Comments
 (0)