Skip to content

Commit 1e37121

Browse files
committed
RATIS-2235. Allow only one thread to perform appendLog (#1206)
1 parent 48f6e52 commit 1e37121

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import java.util.concurrent.ThreadLocalRandom;
133133
import java.util.concurrent.TimeUnit;
134134
import java.util.concurrent.atomic.AtomicBoolean;
135+
import java.util.concurrent.atomic.AtomicReference;
135136
import java.util.function.Function;
136137
import java.util.function.Supplier;
137138
import java.util.stream.Collectors;
@@ -249,6 +250,8 @@ public long[] getFollowerNextIndices() {
249250
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
250251
private final ThreadGroup threadGroup;
251252

253+
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
254+
252255
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
253256
throws IOException {
254257
final RaftPeerId id = proxy.getId();
@@ -282,6 +285,7 @@ public long[] getFollowerNextIndices() {
282285
this.transferLeadership = new TransferLeadership(this, properties);
283286
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
284287
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
288+
this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
285289

286290
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
287291
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -1585,9 +1589,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
15851589
state.updateConfiguration(entries);
15861590
}
15871591
future.join();
1592+
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
1593+
: appendLog(entries);
15881594

1589-
final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
1590-
: state.getLog().append(entries);
15911595
proto.getCommitInfosList().forEach(commitInfoCache::update);
15921596

15931597
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1601,7 +1605,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16011605

16021606
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16031607
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
1604-
return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
1608+
return appendLog.whenCompleteAsync((r, t) -> {
16051609
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
16061610
timer.stop();
16071611
}, getServerExecutor()).thenApply(v -> {
@@ -1618,6 +1622,10 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16181622
return reply;
16191623
});
16201624
}
1625+
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
1626+
return appendLogFuture.updateAndGet(f -> f.thenCompose(
1627+
ignored -> JavaUtils.allOf(state.getLog().append(entries))));
1628+
}
16211629

16221630
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
16231631
// Check if a snapshot installation through state machine is in progress.

0 commit comments

Comments
 (0)