Skip to content

Commit d824c4e

Browse files
RATIS-2317. Move acquire PendingRequestPermit out of synchronized block in RaftServerImpl#appendTransaction. (#1275)
1 parent b73c44c commit d824c4e

File tree

2 files changed

+33
-30
lines changed

2 files changed

+33
-30
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public RaftPeerRole getCurrentRole() {
171171

172172
@Override
173173
public boolean isLeaderReady() {
174-
return isLeader() && getRole().isLeaderReady();
174+
return getRole().isLeaderReady();
175175
}
176176

177177
@Override
@@ -752,19 +752,18 @@ RaftClientReply newExceptionReply(RaftClientRequest request, RaftException excep
752752
}
753753

754754
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request) {
755+
try {
756+
assertGroup(getMemberId(), request);
757+
} catch (GroupMismatchException e) {
758+
return JavaUtils.completeExceptionally(e);
759+
}
755760
return checkLeaderState(request, null);
756761
}
757762

758763
/**
759764
* @return null if the server is in leader state.
760765
*/
761766
private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
762-
try {
763-
assertGroup(getMemberId(), request);
764-
} catch (GroupMismatchException e) {
765-
return RetryCacheImpl.failWithException(e, entry);
766-
}
767-
768767
if (!getInfo().isLeader()) {
769768
NotLeaderException exception = generateNotLeaderException();
770769
final RaftClientReply reply = newExceptionReply(request, exception);
@@ -809,6 +808,11 @@ void assertLifeCycleState(Set<LifeCycle.State> expected) throws ServerNotReadyEx
809808
getMemberId() + " is not in " + expected + ": current state is " + c), expected);
810809
}
811810

811+
private CompletableFuture<RaftClientReply> getResourceUnavailableReply(RaftClientRequest request, CacheEntry entry) {
812+
return entry.failWithException(new ResourceUnavailableException(
813+
getMemberId() + ": Failed to acquire a pending write request for " + request));
814+
}
815+
812816
/**
813817
* Handle a normal update request from client.
814818
*/
@@ -819,23 +823,33 @@ private CompletableFuture<RaftClientReply> appendTransaction(
819823

820824
assertLifeCycleState(LifeCycle.States.RUNNING);
821825

826+
final LeaderStateImpl unsyncedLeaderState = role.getLeaderState().orElse(null);
827+
if (unsyncedLeaderState == null) {
828+
final RaftClientReply reply = newExceptionReply(request, generateNotLeaderException());
829+
return RetryCacheImpl.failWithReply(reply, cacheEntry);
830+
}
831+
final PendingRequests.Permit unsyncedPermit = unsyncedLeaderState.tryAcquirePendingRequest(request.getMessage());
832+
if (unsyncedPermit == null) {
833+
return getResourceUnavailableReply(request, cacheEntry);
834+
}
835+
836+
final LeaderStateImpl leaderState;
822837
final PendingRequest pending;
823838
synchronized (this) {
824839
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, cacheEntry);
825840
if (reply != null) {
826841
return reply;
827842
}
828843

829-
// append the message to its local log
830-
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
831-
writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
832-
833-
final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
844+
leaderState = role.getLeaderStateNonNull();
845+
final PendingRequests.Permit permit = leaderState == unsyncedLeaderState? unsyncedPermit
846+
: leaderState.tryAcquirePendingRequest(request.getMessage());
834847
if (permit == null) {
835-
cacheEntry.failWithException(new ResourceUnavailableException(
836-
getMemberId() + ": Failed to acquire a pending write request for " + request));
837-
return cacheEntry.getReplyFuture();
848+
return getResourceUnavailableReply(request, cacheEntry);
838849
}
850+
851+
// append the message to its local log
852+
writeIndexCache.add(request.getClientId(), context.getLogIndexFuture());
839853
try {
840854
state.appendLog(context);
841855
} catch (StateMachineException e) {
@@ -853,13 +867,11 @@ private CompletableFuture<RaftClientReply> appendTransaction(
853867
// put the request into the pending queue
854868
pending = leaderState.addPendingRequest(permit, request, context);
855869
if (pending == null) {
856-
cacheEntry.failWithException(new ResourceUnavailableException(
870+
return cacheEntry.failWithException(new ResourceUnavailableException(
857871
getMemberId() + ": Failed to add a pending write request for " + request));
858-
return cacheEntry.getReplyFuture();
859872
}
860-
leaderState.notifySenders();
861873
}
862-
874+
leaderState.notifySenders();
863875
return pending.getFuture();
864876
}
865877

ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,10 @@ void failWithReply(RaftClientReply reply) {
8484
replyFuture.complete(reply);
8585
}
8686

87-
void failWithException(Throwable t) {
87+
CompletableFuture<RaftClientReply> failWithException(Throwable t) {
8888
failed = true;
8989
replyFuture.completeExceptionally(t);
90+
return replyFuture;
9091
}
9192

9293
@Override
@@ -266,14 +267,4 @@ static CompletableFuture<RaftClientReply> failWithReply(
266267
return CompletableFuture.completedFuture(reply);
267268
}
268269
}
269-
270-
static CompletableFuture<RaftClientReply> failWithException(
271-
Throwable t, CacheEntry entry) {
272-
if (entry != null) {
273-
entry.failWithException(t);
274-
return entry.getReplyFuture();
275-
} else {
276-
return JavaUtils.completeExceptionally(t);
277-
}
278-
}
279270
}

0 commit comments

Comments
 (0)