Skip to content

Commit 9b74401

Browse files
authored
RATIS-2235. Allow only one thread to perform appendLog (#1206)
1 parent 5e6cc9d commit 9b74401

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
import java.util.concurrent.ThreadLocalRandom;
134134
import java.util.concurrent.TimeUnit;
135135
import java.util.concurrent.atomic.AtomicBoolean;
136+
import java.util.concurrent.atomic.AtomicReference;
136137
import java.util.function.Function;
137138
import java.util.function.Supplier;
138139
import java.util.stream.Collectors;
@@ -250,6 +251,8 @@ public long[] getFollowerNextIndices() {
250251
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
251252
private final ThreadGroup threadGroup;
252253

254+
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
255+
253256
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
254257
throws IOException {
255258
final RaftPeerId id = proxy.getId();
@@ -283,6 +286,7 @@ public long[] getFollowerNextIndices() {
283286
this.transferLeadership = new TransferLeadership(this, properties);
284287
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
285288
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
289+
this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
286290

287291
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
288292
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1641,9 +1645,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16411645
state.updateConfiguration(entries);
16421646
}
16431647
future.join();
1648+
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
1649+
: appendLog(requestRef.delegate(entries));
16441650

1645-
final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
1646-
: state.getLog().append(requestRef.delegate(entries));
16471651
proto.getCommitInfosList().forEach(commitInfoCache::update);
16481652

16491653
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1657,7 +1661,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16571661

16581662
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16591663
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
1660-
return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
1664+
return appendLog.whenCompleteAsync((r, t) -> {
16611665
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
16621666
timer.stop();
16631667
}, getServerExecutor()).thenApply(v -> {
@@ -1674,6 +1678,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16741678
return reply;
16751679
});
16761680
}
1681+
private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
1682+
entriesRef.retain();
1683+
return appendLogFuture.updateAndGet(f -> f.thenCompose(
1684+
ignored -> JavaUtils.allOf(state.getLog().append(entriesRef))))
1685+
.whenComplete((v, e) -> entriesRef.release());
1686+
}
16771687

16781688
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
16791689
// Check if a snapshot installation through state machine is in progress.

0 commit comments

Comments
 (0)