Skip to content

Commit b73cf97

Browse files
symious and szetszwo authored
RATIS-2129. Low replication performance because of lock contention on RaftLog (#1322)
Co-authored-by: Tsz-Wo Nicholas Sze <[email protected]>
1 parent 7dae4db commit b73cf97

File tree

5 files changed

+90
-31
lines changed

5 files changed

+90
-31
lines changed

ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -415,6 +415,16 @@ static void setSegmentSizeMax(RaftProperties properties, SizeInBytes segmentSize
415415
setSizeInBytes(properties::set, SEGMENT_SIZE_MAX_KEY, segmentSizeMax);
416416
}
417417

418+
/** Property key (PREFIX + ".read.lock.enabled") for the boolean read by {@link #readLockEnabled}. */
String READ_LOCK_ENABLED_KEY = PREFIX + ".read.lock.enabled";
419+
/** Value used when {@link #READ_LOCK_ENABLED_KEY} is not set: the read lock stays enabled. */
boolean READ_LOCK_ENABLED_DEFAULT = true;
420+
/**
 * Reads the read-lock flag from the given properties.
 *
 * @param properties the properties to look the key up in
 * @return the configured value, or {@link #READ_LOCK_ENABLED_DEFAULT} as the fallback
 */
static boolean readLockEnabled(RaftProperties properties) {
421+
return getBoolean(properties::getBoolean,
422+
READ_LOCK_ENABLED_KEY, READ_LOCK_ENABLED_DEFAULT, getDefaultLog());
423+
}
424+
/**
 * Stores the read-lock flag under {@link #READ_LOCK_ENABLED_KEY}.
 *
 * @param properties the properties to update
 * @param readLockEnabled the value to store
 */
static void setReadLockEnabled(RaftProperties properties, boolean readLockEnabled) {
425+
setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled);
426+
}
427+
418428
/**
419429
* Besides the open segment, the max number of segments caching log entries.
420430
*/

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java

Lines changed: 61 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -36,13 +36,13 @@
3636

3737
import java.io.File;
3838
import java.io.IOException;
39-
import java.util.ArrayList;
4039
import java.nio.file.Path;
4140
import java.util.Comparator;
42-
import java.util.List;
4341
import java.util.Map;
4442
import java.util.Objects;
4543
import java.util.concurrent.ConcurrentHashMap;
44+
import java.util.concurrent.ConcurrentNavigableMap;
45+
import java.util.concurrent.ConcurrentSkipListMap;
4646
import java.util.concurrent.atomic.AtomicInteger;
4747
import java.util.concurrent.atomic.AtomicLong;
4848
import java.util.concurrent.atomic.AtomicReference;
@@ -105,6 +105,44 @@ long getOffset() {
105105
}
106106
}
107107

108+
/**
 * The records of a segment, kept in a {@link ConcurrentSkipListMap} keyed by log index
 * (the index of each record's term-index), so the first and last records are the
 * entries with the smallest and largest keys.
 * NOTE(review): presumably introduced so that lookups need not hold the RaftLog lock
 * (see the commit title) -- confirm against the callers.
 */
private static class Records {
109+
/** Log index -> record, ordered by index. */
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();
110+
111+
/** @return the number of records currently in the map. */
int size() {
112+
return map.size();
113+
}
114+
115+
/** @return the record with the smallest index, or null if there are no records. */
LogRecord getFirst() {
116+
final Map.Entry<Long, LogRecord> first = map.firstEntry();
117+
return first != null? first.getValue() : null;
118+
}
119+
120+
/** @return the record with the largest index, or null if there are no records. */
LogRecord getLast() {
121+
final Map.Entry<Long, LogRecord> last = map.lastEntry();
122+
return last != null? last.getValue() : null;
123+
}
124+
125+
/**
 * @param i the log index (not a position relative to the segment start)
 * @return the record stored at that index, or null if none is stored.
 */
LogRecord get(long i) {
126+
return map.get(i);
127+
}
128+
129+
/**
 * Stores the given record under its own log index.
 * The record is put first and the check for an earlier mapping at the same index
 * runs afterwards, so the map already holds the new record when the check executes.
 *
 * @param record the record to store
 * @return the log index the record was stored under
 */
long append(LogRecord record) {
130+
final long index = record.getTermIndex().getIndex();
131+
final LogRecord previous = map.put(index, record);
132+
Preconditions.assertNull(previous, "previous");
133+
return index;
134+
}
135+
136+
/**
 * Removes and returns the record with the largest index.
 *
 * @return the removed record
 * @throws NullPointerException if there are no records
 */
LogRecord removeLast() {
137+
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
138+
return Objects.requireNonNull(last, "last == null").getValue();
139+
}
140+
141+
/** Removes all records. */
void clear() {
142+
map.clear();
143+
}
144+
}
145+
108146
static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
109147
SegmentedRaftLogMetrics raftLogMetrics) {
110148
Preconditions.assertTrue(start >= 0);
@@ -204,10 +242,12 @@ private void assertSegment(long expectedStart, int expectedEntryCount, boolean c
204242
final long expectedLastIndex = expectedStart + expectedEntryCount - 1;
205243
Preconditions.assertSame(expectedLastIndex, getEndIndex(), "Segment end index");
206244

207-
final LogRecord last = getLastRecord();
245+
final LogRecord last = records.getLast();
208246
if (last != null) {
209247
Preconditions.assertSame(expectedLastIndex, last.getTermIndex().getIndex(), "Index at the last record");
210-
Preconditions.assertSame(expectedStart, records.get(0).getTermIndex().getIndex(), "Index at the first record");
248+
final LogRecord first = records.getFirst();
249+
Objects.requireNonNull(first, "first record");
250+
Preconditions.assertSame(expectedStart, first.getTermIndex().getIndex(), "Index at the first record");
211251
}
212252
if (!corrupted) {
213253
Preconditions.assertSame(expectedEnd, expectedLastIndex, "End/last Index");
@@ -272,7 +312,7 @@ File getFile() {
272312
/**
273313
* the list of records is more like the index of a segment
274314
*/
275-
private final List<LogRecord> records = new ArrayList<>();
315+
private final Records records = new Records();
276316
/**
277317
* the entryCache caches the content of log entries.
278318
*/
@@ -293,15 +333,19 @@ long getStartIndex() {
293333
}
294334

295335
long getEndIndex() {
296-
return endIndex;
336+
if (!isOpen) {
337+
return endIndex;
338+
}
339+
final LogRecord last = records.getLast();
340+
return last == null ? getStartIndex() - 1 : last.getTermIndex().getIndex();
297341
}
298342

299343
boolean isOpen() {
300344
return isOpen;
301345
}
302346

303347
int numOfEntries() {
304-
return Math.toIntExact(endIndex - startIndex + 1);
348+
return Math.toIntExact(getEndIndex() - startIndex + 1);
305349
}
306350

307351
CorruptionPolicy getLogCorruptionPolicy() {
@@ -315,14 +359,12 @@ void appendToOpenSegment(LogEntryProto entry, Op op) {
315359

316360
private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
317361
Objects.requireNonNull(entry, "entry == null");
318-
if (records.isEmpty()) {
362+
final LogRecord currentLast = records.getLast();
363+
if (currentLast == null) {
319364
Preconditions.assertTrue(entry.getIndex() == startIndex,
320365
"gap between start index %s and first entry to append %s",
321366
startIndex, entry.getIndex());
322-
}
323-
324-
final LogRecord currentLast = getLastRecord();
325-
if (currentLast != null) {
367+
} else {
326368
Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
327369
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
328370
}
@@ -331,7 +373,7 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
331373
if (keepEntryInCache) {
332374
putEntryCache(record.getTermIndex(), entry, op);
333375
}
334-
records.add(record);
376+
records.append(record);
335377
totalFileSize += getEntrySize(entry, op);
336378
endIndex = entry.getIndex();
337379
}
@@ -358,18 +400,14 @@ synchronized LogEntryProto loadCache(LogRecord record) throws RaftLogIOException
358400
}
359401

360402
LogRecord getLogRecord(long index) {
361-
if (index >= startIndex && index <= endIndex) {
362-
return records.get(Math.toIntExact(index - startIndex));
403+
if (index >= startIndex && index <= getEndIndex()) {
404+
return records.get(index);
363405
}
364406
return null;
365407
}
366408

367-
private LogRecord getLastRecord() {
368-
return records.isEmpty() ? null : records.get(records.size() - 1);
369-
}
370-
371409
TermIndex getLastTermIndex() {
372-
LogRecord last = getLastRecord();
410+
final LogRecord last = records.getLast();
373411
return last == null ? null : last.getTermIndex();
374412
}
375413

@@ -387,7 +425,8 @@ long getTotalCacheSize() {
387425
synchronized void truncate(long fromIndex) {
388426
Preconditions.assertTrue(fromIndex >= startIndex && fromIndex <= endIndex);
389427
for (long index = endIndex; index >= fromIndex; index--) {
390-
LogRecord removed = records.remove(Math.toIntExact(index - startIndex));
428+
final LogRecord removed = records.removeLast();
429+
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
391430
removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
392431
totalFileSize = removed.offset;
393432
}
@@ -458,7 +497,7 @@ boolean hasCache() {
458497
}
459498

460499
boolean containsIndex(long index) {
461-
return startIndex <= index && endIndex >= index;
500+
return startIndex <= index && getEndIndex() >= index;
462501
}
463502

464503
boolean hasEntries() {

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -202,6 +202,7 @@ public TransactionContext getTransactionContext(LogEntryProto entry, boolean cre
202202
private final long segmentMaxSize;
203203
private final boolean stateMachineCachingEnabled;
204204
private final SegmentedRaftLogMetrics metrics;
205+
private final boolean readLockEnabled;
205206

206207
@SuppressWarnings({"squid:S2095"}) // Suppress closeable warning
207208
private SegmentedRaftLog(Builder b) {
@@ -217,6 +218,12 @@ private SegmentedRaftLog(Builder b) {
217218
this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine,
218219
b.submitUpdateCommitEvent, b.server, storage, b.properties, getRaftLogMetrics());
219220
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties);
221+
this.readLockEnabled = RaftServerConfigKeys.Log.readLockEnabled(b.properties);
222+
}
223+
224+
/**
 * Returns the superclass read lock when the read-lock flag is enabled, and null when it
 * is disabled.
 * A null result is acceptable to callers that use try-with-resources (as getTermIndex
 * in this class does), because Java does not close a resource that was initialized to
 * null; any caller that dereferences the result directly must handle null itself.
 *
 * @return the read lock, or null if read locking is disabled
 */
@Override
225+
public AutoCloseableLock readLock() {
226+
return readLockEnabled ? super.readLock() : null;
220227
}
221228

222229
@Override
@@ -338,8 +345,7 @@ private void checkAndEvictCache() {
338345
public TermIndex getTermIndex(long index) {
339346
checkLogState();
340347
try(AutoCloseableLock readLock = readLock()) {
341-
LogRecord record = cache.getLogRecord(index);
342-
return record != null ? record.getTermIndex() : null;
348+
return cache.getTermIndex(index);
343349
}
344350
}
345351

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -547,9 +547,13 @@ LogSegment getSegment(long index) {
547547
}
548548
}
549549

550-
LogRecord getLogRecord(long index) {
550+
TermIndex getTermIndex(long index) {
551551
LogSegment segment = getSegment(index);
552-
return segment == null ? null : segment.getLogRecord(index);
552+
if (segment == null) {
553+
return null;
554+
}
555+
final LogRecord record = segment.getLogRecord(index);
556+
return record != null ? record.getTermIndex() : null;
553557
}
554558

555559
/**
@@ -610,8 +614,9 @@ long getLastIndexInClosedSegments() {
610614

611615
TermIndex getLastTermIndex() {
612616
try (AutoCloseableLock readLock = closedSegments.readLock()) {
613-
return (openSegment != null && openSegment.numOfEntries() > 0) ?
614-
openSegment.getLastTermIndex() :
617+
LogSegment tmpSegment = openSegment;
618+
return (tmpSegment != null && tmpSegment.getLastTermIndex() != null) ?
619+
tmpSegment.getLastTermIndex() :
615620
(closedSegments.isEmpty() ? null :
616621
closedSegments.get(closedSegments.size() - 1).getLastTermIndex());
617622
}

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,6 @@
2020
import static org.apache.ratis.server.metrics.SegmentedRaftLogMetrics.*;
2121
import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogTestUtils.MAX_OP_SIZE;
2222

23-
import java.io.IOException;
2423
import java.util.Iterator;
2524
import java.util.stream.IntStream;
2625

@@ -282,12 +281,12 @@ private void populatedSegment(int start, int end, int segmentSize, boolean isOpe
282281
});
283282
}
284283

285-
private void testIterator(long startIndex) throws IOException {
284+
private void testIterator(long startIndex) {
286285
Iterator<TermIndex> iterator = cache.iterator(startIndex);
287286
TermIndex prev = null;
288287
while (iterator.hasNext()) {
289288
TermIndex termIndex = iterator.next();
290-
Assertions.assertEquals(cache.getLogRecord(termIndex.getIndex()).getTermIndex(), termIndex);
289+
Assertions.assertEquals(cache.getTermIndex(termIndex.getIndex()), termIndex);
291290
if (prev != null) {
292291
Assertions.assertEquals(prev.getIndex() + 1, termIndex.getIndex());
293292
}

0 commit comments

Comments
 (0)