Skip to content

Commit de58f05

Browse files
authored
RATIS-2282. LogAppender Restart Due to Premature Log Entry Access During Concurrent Write Processing (#1249)
1 parent ca81a21 commit de58f05

File tree

1 file changed

+15
-17
lines changed
  • ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented

1 file changed

+15
-17
lines changed

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -454,10 +454,13 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,
454454
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer) {
455455
final LogEntryProto entry = entryRef.retain();
456456
try {
457-
final LogRecord record = appendLogRecord(op, entry);
457+
final LogRecord record = new LogRecord(totalFileSize, entry);
458458
if (keepEntryInCache) {
459459
putEntryCache(record.getTermIndex(), entryRef, op);
460460
}
461+
appendLogRecord(op, record);
462+
totalFileSize += getEntrySize(entry, op);
463+
461464
if (logConsumer != null) {
462465
logConsumer.accept(entry);
463466
}
@@ -466,24 +469,22 @@ private void append(Op op, ReferenceCountedObject<LogEntryProto> entryRef,
466469
}
467470
}
468471

469-
470-
private LogRecord appendLogRecord(Op op, LogEntryProto entry) {
471-
Objects.requireNonNull(entry, "entry == null");
472+
private void appendLogRecord(Op op, LogRecord record) {
473+
Objects.requireNonNull(record, "record == null");
472474
final LogRecord currentLast = records.getLast();
475+
476+
final long index = record.getTermIndex().getIndex();
473477
if (currentLast == null) {
474-
Preconditions.assertTrue(entry.getIndex() == startIndex,
475-
"gap between start index %s and first entry to append %s",
476-
startIndex, entry.getIndex());
478+
Preconditions.assertTrue(index == startIndex,
479+
"%s: gap between start index %s and the entry to append %s", op, startIndex, index);
477480
} else {
478-
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
479-
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
481+
final long currentLastIndex = currentLast.getTermIndex().getIndex();
482+
Preconditions.assertTrue(index == currentLastIndex + 1,
483+
"%s: gap between last entry %s and the entry to append %s", op, currentLastIndex, index);
480484
}
481485

482-
final LogRecord record = new LogRecord(totalFileSize, entry);
483486
records.append(record);
484-
totalFileSize += getEntrySize(entry, op);
485-
endIndex = entry.getIndex();
486-
return record;
487+
endIndex = index;
487488
}
488489

489490
ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
@@ -514,10 +515,7 @@ synchronized ReferenceCountedObject<LogEntryProto> loadCache(TermIndex ti) throw
514515
}
515516

516517
LogRecord getLogRecord(long index) {
517-
if (index >= startIndex && index <= endIndex) {
518-
return records.get(index);
519-
}
520-
return null;
518+
return records.get(index);
521519
}
522520

523521
TermIndex getLastTermIndex() {

0 commit comments

Comments
 (0)