Skip to content

Commit 2fe59d3

Browse files
ivandika3OneSizeFitsQuorum
authored andcommitted
RATIS-2186. Raft log should not purge index lower than the log start index (#1175)
1 parent 5ed4341 commit 2fe59d3

File tree

3 files changed

+45
-11
lines changed

3 files changed

+45
-11
lines changed

ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -314,20 +314,28 @@ public final CompletableFuture<Long> truncate(long index) {
314314

315315
@Override
316316
public final CompletableFuture<Long> purge(long suggestedIndex) {
317+
final long adjustedIndex;
317318
if (purgePreservation > 0) {
318319
final long currentIndex = getNextIndex() - 1;
319-
suggestedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
320+
adjustedIndex = Math.min(suggestedIndex, currentIndex - purgePreservation);
321+
} else {
322+
adjustedIndex = suggestedIndex;
320323
}
321324
final long lastPurge = purgeIndex.get();
322-
if (suggestedIndex - lastPurge < purgeGap) {
325+
if (adjustedIndex - lastPurge < purgeGap) {
326+
return CompletableFuture.completedFuture(lastPurge);
327+
}
328+
final long startIndex = getStartIndex();
329+
if (adjustedIndex < startIndex) {
330+
LOG.info("{}: purge({}) is skipped: adjustedIndex = {} < startIndex = {}, purgePreservation = {}",
331+
getName(), suggestedIndex, adjustedIndex, startIndex, purgePreservation);
323332
return CompletableFuture.completedFuture(lastPurge);
324333
}
325-
LOG.info("{}: purge {}", getName(), suggestedIndex);
326-
final long finalSuggestedIndex = suggestedIndex;
327-
return purgeImpl(suggestedIndex).whenComplete((purged, e) -> {
334+
LOG.info("{}: purge {}", getName(), adjustedIndex );
335+
return purgeImpl(adjustedIndex).whenComplete((purged, e) -> {
328336
updatePurgeIndex(purged);
329337
if (e != null) {
330-
LOG.warn(getName() + ": Failed to purge " + finalSuggestedIndex, e);
338+
LOG.warn(getName() + ": Failed to purge " + adjustedIndex, e);
331339
}
332340
});
333341
}

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,11 @@ TruncationSegments truncate(long index, LogSegment openSegment, Runnable clearOp
356356
TruncationSegments purge(long index) {
357357
try (AutoCloseableLock writeLock = writeLock()) {
358358
int segmentIndex = binarySearch(index);
359-
List<SegmentFileInfo> list = new ArrayList<>();
359+
if (segmentIndex == -1) {
360+
// nothing to purge
361+
return null;
362+
}
363+
List<SegmentFileInfo> list = new LinkedList<>();
360364

361365
if (segmentIndex == -segments.size() - 1) {
362366
for (LogSegment ls : segments) {

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ public void testPurgeOnOpenSegment() throws Exception {
563563
int segmentSize = 200;
564564
long beginIndexOfOpenSegment = segmentSize * (endTerm - startTerm - 1);
565565
long expectedIndex = segmentSize * (endTerm - startTerm - 1);
566+
long purgePreservation = 0L;
566567
purgeAndVerify(startTerm, endTerm, segmentSize, 1, beginIndexOfOpenSegment, expectedIndex);
567568
}
568569

@@ -599,15 +600,36 @@ public void testPurgeOnClosedSegmentsWithPurgeGap() throws Exception {
599600
purgeAndVerify(startTerm, endTerm, segmentSize, 1000, endIndexOfClosedSegment, expectedIndex);
600601
}
601602

603+
@Test
604+
public void testPurgeWithLargePurgePreservationAndSmallPurgeGap() throws Exception {
605+
int startTerm = 0;
606+
int endTerm = 5;
607+
int segmentSize = 200;
608+
long endIndex = segmentSize * (endTerm - startTerm) - 1;
609+
// start index is set so that the suggested index will not be negative, which will not trigger any purge
610+
long startIndex = 200;
611+
// purge preservation is larger than the total size of the log entries
612+
// which causes suggested index to be lower than the start index
613+
long purgePreservation = (segmentSize * (endTerm - startTerm )) + 100;
614+
// if the suggested index is lower than the start index due to the purge preservation, we should not purge anything
615+
purgeAndVerify(startTerm, endTerm, segmentSize, 1, endIndex, startIndex, startIndex, purgePreservation);
616+
}
617+
602618
private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
603-
long expectedIndex) throws Exception {
604-
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, 0);
619+
long expectedIndex) throws Exception {
620+
purgeAndVerify(startTerm, endTerm, segmentSize, purgeGap, purgeIndex, expectedIndex, 0, 0);
621+
}
622+
623+
private void purgeAndVerify(int startTerm, int endTerm, int segmentSize, int purgeGap, long purgeIndex,
624+
long expectedIndex, long startIndex, long purgePreservation) throws Exception {
625+
List<SegmentRange> ranges = prepareRanges(startTerm, endTerm, segmentSize, startIndex);
605626
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
606627

607628
final RaftProperties p = new RaftProperties();
608629
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
609-
try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
610-
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
630+
RaftServerConfigKeys.Log.setPurgePreservationLogNum(p, purgePreservation);
631+
try (SegmentedRaftLog raftLog = newSegmentedRaftLogWithSnapshotIndex(storage, p, () -> startIndex - 1)) {
632+
raftLog.open(startIndex - 1, null);
611633
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
612634
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
613635
final Long purged = f.get();

0 commit comments

Comments
 (0)