Skip to content

Commit 722bb3a

Browse files
committed
RATIS-2235. Allow only one thread to perform appendLog (#1206)
1 parent 66cd822 commit 722bb3a

File tree

1 file changed

+24
-5
lines changed

1 file changed

+24
-5
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
import java.util.concurrent.ThreadLocalRandom;
133133
import java.util.concurrent.TimeUnit;
134134
import java.util.concurrent.atomic.AtomicBoolean;
135+
import java.util.concurrent.atomic.AtomicReference;
135136
import java.util.function.Function;
136137
import java.util.function.Supplier;
137138
import java.util.stream.Collectors;
@@ -249,6 +250,8 @@ public long[] getFollowerNextIndices() {
249250
private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true);
250251
private final ThreadGroup threadGroup;
251252

253+
private final AtomicReference<CompletableFuture<Void>> appendLogFuture;
254+
252255
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option)
253256
throws IOException {
254257
final RaftPeerId id = proxy.getId();
@@ -282,6 +285,7 @@ public long[] getFollowerNextIndices() {
282285
this.transferLeadership = new TransferLeadership(this, properties);
283286
this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
284287
this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
288+
this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null));
285289

286290
this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax(
287291
RaftServerConfigKeys.ThreadPool.serverCached(properties),
@@ -630,6 +634,15 @@ synchronized void changeToLeader() {
630634

631635
@Override
632636
public Collection<CommitInfoProto> getCommitInfos() {
637+
try {
638+
return getCommitInfosImpl();
639+
} catch (Throwable t) {
640+
LOG.warn("{} Failed to getCommitInfos", getMemberId(), t);
641+
return Collections.emptyList();
642+
}
643+
}
644+
645+
private Collection<CommitInfoProto> getCommitInfosImpl() {
633646
final List<CommitInfoProto> infos = new ArrayList<>();
634647
// add the commit info of this server
635648
final long commitIndex = updateCommitInfoCache();
@@ -803,8 +816,6 @@ private CompletableFuture<RaftClientReply> appendTransaction(
803816
CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(),
804817
request.getClientId(), request, context, cacheEntry);
805818

806-
assertLifeCycleState(LifeCycle.States.RUNNING);
807-
808819
final PendingRequest pending;
809820
synchronized (this) {
810821
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
@@ -823,6 +834,7 @@ private CompletableFuture<RaftClientReply> appendTransaction(
823834
return cacheEntry.getReplyFuture();
824835
}
825836
try {
837+
assertLifeCycleState(LifeCycle.States.RUNNING);
826838
state.appendLog(context);
827839
} catch (StateMachineException e) {
828840
// the StateMachineException is thrown by the SM in the preAppend stage.
@@ -834,6 +846,9 @@ private CompletableFuture<RaftClientReply> appendTransaction(
834846
leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
835847
}
836848
return CompletableFuture.completedFuture(exceptionReply);
849+
} catch (ServerNotReadyException e) {
850+
final RaftClientReply exceptionReply = newExceptionReply(request, e);
851+
return CompletableFuture.completedFuture(exceptionReply);
837852
}
838853

839854
// put the request into the pending queue
@@ -1585,9 +1600,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
15851600
state.updateConfiguration(entries);
15861601
}
15871602
future.join();
1603+
final CompletableFuture<Void> appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null)
1604+
: appendLog(entries);
15881605

1589-
final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
1590-
: state.getLog().append(entries);
15911606
proto.getCommitInfosList().forEach(commitInfoCache::update);
15921607

15931608
CodeInjectionForTesting.execute(LOG_SYNC, getId(), null);
@@ -1601,7 +1616,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16011616

16021617
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16031618
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
1604-
return JavaUtils.allOf(futures).whenCompleteAsync((r, t) -> {
1619+
return appendLog.whenCompleteAsync((r, t) -> {
16051620
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
16061621
timer.stop();
16071622
}, getServerExecutor()).thenApply(v -> {
@@ -1618,6 +1633,10 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16181633
return reply;
16191634
});
16201635
}
1636+
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
1637+
return appendLogFuture.updateAndGet(f -> f.thenCompose(
1638+
ignored -> JavaUtils.allOf(state.getLog().append(entries))));
1639+
}
16211640

16221641
private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryProto> entries) {
16231642
// Check if a snapshot installation through state machine is in progress.

0 commit comments

Comments
 (0)