Skip to content

Commit b6ccd12

Browse files
committed
Account for trimmedAboveSeqNo in committed translog generation (#50205)
Today we do not consider trimmedAboveSeqNo when calculating the translog generation of an index commit. As a result, if there is no new indexing after the primary promotion, we are unable to clean up the translog.
1 parent 3ccd93f commit b6ccd12

File tree

3 files changed

+18
-6
lines changed

3 files changed

+18
-6
lines changed

server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,17 @@ private void write(DataOutput out) throws IOException {
120120
out.writeLong(trimmedAboveSeqNo);
121121
}
122122

123+
/**
124+
* Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}.
125+
*/
126+
long maxEffectiveSeqNo() {
127+
if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
128+
return maxSeqNo;
129+
} else {
130+
return Math.min(trimmedAboveSeqNo, maxSeqNo);
131+
}
132+
}
133+
123134
static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
124135
long minTranslogGeneration) {
125136
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;

server/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -702,11 +702,7 @@ private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo)
702702
// Callers must already hold either the read lock or the write lock. On failure the message reports
// the held-state of each lock separately (the write-lock slot previously re-printed the read-lock state).
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
    "callers of readersAboveMinSeqNo must hold a lock: readLock ["
    + readLock.isHeldByCurrentThread() + "], writeLock [" + writeLock.isHeldByCurrentThread() + "]";
705-
return Stream.concat(readers.stream(), Stream.of(current))
706-
.filter(reader -> {
707-
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
708-
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
709-
});
705+
return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo());
710706
}
711707

712708
/**
@@ -1638,7 +1634,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
16381634
*/
16391635
long minTranslogFileGeneration = this.currentFileGeneration();
16401636
for (final TranslogReader reader : readers) {
1641-
if (seqNo <= reader.getCheckpoint().maxSeqNo) {
1637+
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
16421638
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
16431639
}
16441640
}

server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,11 @@ public void testRollbackOnPromotion() throws Exception {
802802
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback);
803803
done.set(true);
804804
thread.join();
805+
806+
for (IndexShard shard : shards) {
807+
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
808+
assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0));
809+
}
805810
}
806811
}
807812

0 commit comments

Comments
 (0)