Skip to content

Commit 7f10888

Browse files
133tosakarin authored and OneSizeFitsQuorum committed
RATIS-2174. Move future.join outside the lock (#1168)
1 parent fbb9b5a commit 7f10888

File tree

4 files changed

+44
-34
lines changed

4 files changed

+44
-34
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@ private void startIfNew(Runnable starter) {
227227

228228
CompletableFuture<Void> shutdown() {
229229
lifeCycle.checkStateAndClose();
230-
stopped.complete(null);
231230
return stopped;
232231
}
233232

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -705,7 +705,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {
705705
private void stepDown(long term, StepDownReason reason) {
706706
try {
707707
lease.getAndSetEnabled(false);
708-
server.changeToFollowerAndPersistMetadata(term, false, reason);
708+
server.changeToFollowerAndPersistMetadata(term, false, reason).join();
709709
pendingStepDown.complete(server::newSuccessReply);
710710
} catch(IOException e) {
711711
final String s = this + ": Failed to persist metadata for term " + term;

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@
132132
import java.util.concurrent.ThreadLocalRandom;
133133
import java.util.concurrent.TimeUnit;
134134
import java.util.concurrent.atomic.AtomicBoolean;
135-
import java.util.concurrent.atomic.AtomicReference;
136135
import java.util.function.Function;
137136
import java.util.function.Supplier;
138137
import java.util.stream.Collectors;
@@ -570,14 +569,8 @@ void setFirstElection(Object reason) {
570569
* @param force Force to start a new {@link FollowerState} even if this server is already a follower.
571570
* @return if the term/votedFor should be updated to the new term
572571
*/
573-
private boolean changeToFollower(long newTerm, boolean force, boolean allowListener, Object reason) {
574-
final AtomicReference<Boolean> metadataUpdated = new AtomicReference<>();
575-
changeToFollowerAsync(newTerm, force, allowListener, reason, metadataUpdated).join();
576-
return metadataUpdated.get();
577-
}
578-
579-
private synchronized CompletableFuture<Void> changeToFollowerAsync(
580-
long newTerm, boolean force, boolean allowListener, Object reason, AtomicReference<Boolean> metadataUpdated) {
572+
private synchronized CompletableFuture<Void> changeToFollower(
573+
long newTerm, boolean force, boolean allowListener, Object reason, AtomicBoolean metadataUpdated) {
581574
final RaftPeerRole old = role.getCurrentRole();
582575
if (old == RaftPeerRole.LISTENER && !allowListener) {
583576
throw new IllegalStateException("Unexpected role " + old);
@@ -612,13 +605,16 @@ private synchronized CompletableFuture<Void> changeToFollowerAsync(
612605
return future;
613606
}
614607

615-
synchronized void changeToFollowerAndPersistMetadata(
608+
synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
616609
long newTerm,
617610
boolean allowListener,
618611
Object reason) throws IOException {
619-
if (changeToFollower(newTerm, false, allowListener, reason)) {
612+
final AtomicBoolean metadataUpdated = new AtomicBoolean();
613+
final CompletableFuture<Void> future = changeToFollower(newTerm, false, allowListener, reason, metadataUpdated);
614+
if (metadataUpdated.get()) {
620615
state.persistMetadata();
621616
}
617+
return future;
622618
}
623619

624620
synchronized void changeToLeader() {
@@ -1406,6 +1402,7 @@ private RequestVoteReplyProto requestVote(Phase phase,
14061402

14071403
boolean shouldShutdown = false;
14081404
final RequestVoteReplyProto reply;
1405+
CompletableFuture<Void> future = null;
14091406
synchronized (this) {
14101407
// Check life cycle state again to avoid the PAUSING/PAUSED state.
14111408
assertLifeCycleState(LifeCycle.States.RUNNING);
@@ -1415,12 +1412,12 @@ private RequestVoteReplyProto requestVote(Phase phase,
14151412
final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
14161413
if (candidate != null && phase == Phase.ELECTION) {
14171414
// change server state in the ELECTION phase
1418-
final boolean termUpdated =
1419-
changeToFollower(candidateTerm, true, false, "candidate:" + candidateId);
1415+
final AtomicBoolean termUpdated = new AtomicBoolean();
1416+
future = changeToFollower(candidateTerm, true, false, "candidate:" + candidateId, termUpdated);
14201417
if (voteGranted) {
14211418
state.grantVote(candidate.getId());
14221419
}
1423-
if (termUpdated || voteGranted) {
1420+
if (termUpdated.get() || voteGranted) {
14241421
state.persistMetadata(); // sync metafile
14251422
}
14261423
}
@@ -1436,6 +1433,9 @@ private RequestVoteReplyProto requestVote(Phase phase,
14361433
getMemberId(), phase, toRequestVoteReplyString(reply), state);
14371434
}
14381435
}
1436+
if (future != null) {
1437+
future.join();
1438+
}
14391439
return reply;
14401440
}
14411441

@@ -1533,6 +1533,7 @@ private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(RaftPeerId
15331533
final long followerCommit = state.getLog().getLastCommittedIndex();
15341534
final Optional<FollowerState> followerState;
15351535
final Timekeeper.Context timer = raftServerMetrics.getFollowerAppendEntryTimer(isHeartbeat).time();
1536+
final CompletableFuture<Void> future;
15361537
synchronized (this) {
15371538
// Check life cycle state again to avoid the PAUSING/PAUSED state.
15381539
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
@@ -1544,7 +1545,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, state.getNextIndex(),
15441545
AppendResult.NOT_LEADER, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat));
15451546
}
15461547
try {
1547-
changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
1548+
future = changeToFollowerAndPersistMetadata(leaderTerm, true, "appendEntries");
15481549
} catch (IOException e) {
15491550
return JavaUtils.completeExceptionally(e);
15501551
}
@@ -1569,11 +1570,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
15691570
AppendResult.INCONSISTENCY, callId, RaftLog.INVALID_LOG_INDEX, isHeartbeat);
15701571
LOG.info("{}: appendEntries* reply {}", getMemberId(), toAppendEntriesReplyString(reply));
15711572
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
1572-
return CompletableFuture.completedFuture(reply);
1573+
return future.thenApply(dummy -> reply);
15731574
}
15741575

15751576
state.updateConfiguration(entries);
15761577
}
1578+
future.join();
15771579

15781580
final List<CompletableFuture<Long>> futures = entries.isEmpty() ? Collections.emptyList()
15791581
: state.getLog().append(entries);

ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646

4747
import java.io.IOException;
4848
import java.util.Optional;
49+
import java.util.concurrent.CompletableFuture;
4950
import java.util.concurrent.atomic.AtomicBoolean;
5051
import java.util.concurrent.atomic.AtomicInteger;
5152
import java.util.concurrent.atomic.AtomicLong;
@@ -122,12 +123,12 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
122123
if (installSnapshotEnabled) {
123124
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
124125
if (request.hasSnapshotChunk()) {
125-
reply = checkAndInstallSnapshot(request, leaderId);
126+
reply = checkAndInstallSnapshot(request, leaderId).join();
126127
}
127128
} else {
128129
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
129130
if (request.hasNotification()) {
130-
reply = notifyStateMachineToInstallSnapshot(request, leaderId);
131+
reply = notifyStateMachineToInstallSnapshot(request, leaderId).join();
131132
}
132133
}
133134

@@ -156,21 +157,22 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
156157
return failedReply;
157158
}
158159

159-
private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request,
160+
private CompletableFuture<InstallSnapshotReplyProto> checkAndInstallSnapshot(InstallSnapshotRequestProto request,
160161
RaftPeerId leaderId) throws IOException {
161162
final long currentTerm;
162163
final long leaderTerm = request.getLeaderTerm();
163164
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
164165
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
165166
final long lastIncludedIndex = lastIncluded.getIndex();
167+
final CompletableFuture<Void> future;
166168
synchronized (server) {
167169
final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
168170
currentTerm = state.getCurrentTerm();
169171
if (!recognized) {
170-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
171-
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
172+
return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
173+
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER));
172174
}
173-
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
175+
future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
174176
state.setLeader(leaderId, "installSnapshot");
175177

176178
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
@@ -186,8 +188,9 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
186188
// have a lot of requests
187189
if (state.getLog().getLastCommittedIndex() >= lastIncludedIndex) {
188190
nextChunkIndex.set(snapshotChunkRequest.getRequestIndex() + 1);
189-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
191+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
190192
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.ALREADY_INSTALLED);
193+
return future.thenApply(dummy -> reply);
191194
}
192195

193196
//TODO: We should only update State with installed snapshot once the request is done.
@@ -210,25 +213,27 @@ private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequest
210213
if (snapshotChunkRequest.getDone()) {
211214
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
212215
}
213-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
216+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
214217
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
218+
return future.thenApply(dummy -> reply);
215219
}
216220

217-
private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
221+
private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstallSnapshot(
218222
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
219223
final long currentTerm;
220224
final long leaderTerm = request.getLeaderTerm();
221225
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
222226
request.getNotification().getFirstAvailableTermIndex());
223227
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
228+
final CompletableFuture<Void> future;
224229
synchronized (server) {
225230
final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
226231
currentTerm = state.getCurrentTerm();
227232
if (!recognized) {
228-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
229-
currentTerm, InstallSnapshotResult.NOT_LEADER);
233+
return CompletableFuture.completedFuture(toInstallSnapshotReplyProto(leaderId, getMemberId(),
234+
currentTerm, InstallSnapshotResult.NOT_LEADER));
230235
}
231-
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
236+
future = server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
232237
state.setLeader(leaderId, "installSnapshot");
233238
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
234239

@@ -245,8 +250,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
245250
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
246251
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
247252
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
248-
return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
253+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
249254
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
255+
return future.thenApply(dummy -> reply);
250256
}
251257

252258
final RaftPeerProto leaderProto;
@@ -323,8 +329,9 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
323329
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
324330
server.getStateMachine().event().notifySnapshotInstalled(
325331
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
326-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
332+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
327333
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
334+
return future.thenApply(dummy -> reply);
328335
}
329336

330337
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
@@ -341,17 +348,19 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
341348
server.getStateMachine().event().notifySnapshotInstalled(
342349
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
343350
installedIndex.set(latestInstalledIndex);
344-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
351+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
345352
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
353+
return future.thenApply(dummy -> reply);
346354
}
347355

348356
// Otherwise, Snapshot installation is in progress.
349357
if (LOG.isDebugEnabled()) {
350358
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
351359
InstallSnapshotResult.IN_PROGRESS);
352360
}
353-
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
361+
final InstallSnapshotReplyProto reply = toInstallSnapshotReplyProto(leaderId, getMemberId(),
354362
currentTerm, InstallSnapshotResult.IN_PROGRESS);
363+
return future.thenApply(dummy -> reply);
355364
}
356365
}
357366

0 commit comments

Comments
 (0)