Skip to content

Commit 6f6dd7d

Browse files
committed
RATIS-2340. Add ReadIndexAsync executor for readIndex stateMachine query
1 parent 7e66f1a commit 6f6dd7d

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,9 @@ private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderSt
10541054
return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
10551055
}
10561056

1057+
private final ExecutorService readAsyncExecutor = ConcurrentUtils.newThreadPoolWithMax(false,
1058+
Math.max(2, Runtime.getRuntime().availableProcessors()), "ReadAsync");
1059+
10571060
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
10581061
if (request.getType().getRead().getPreferNonLinearizable()
10591062
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
@@ -1087,7 +1090,8 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
10871090

10881091
return replyFuture
10891092
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
1090-
.thenCompose(readIndex -> queryStateMachine(request))
1093+
.thenComposeAsync(ignored -> stateMachine.query(request.getMessage()), readAsyncExecutor)
1094+
.thenCompose(reply -> processQueryFuture(CompletableFuture.completedFuture(reply), request))
10911095
.exceptionally(e -> readException2Reply(request, e));
10921096
} else {
10931097
throw new IllegalStateException("Unexpected read option: " + readOption);

0 commit comments

Comments
 (0)