Skip to content

Commit 8634542

Browse files
authored
RATIS-2294. Fix NettyClientRpc exception and timeout handling (#1264)
1 parent 2eda35d commit 8634542

File tree

3 files changed

+61
-9
lines changed

3 files changed

+61
-9
lines changed

ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.ratis.netty.client;
1919

20+
import org.apache.ratis.client.RaftClientConfigKeys;
2021
import org.apache.ratis.client.impl.ClientProtoUtils;
2122
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
2223
import org.apache.ratis.conf.RaftProperties;
@@ -28,31 +29,76 @@
2829
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
2930
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
3031
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
32+
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
3133
import org.apache.ratis.util.JavaUtils;
34+
import org.apache.ratis.util.TimeDuration;
35+
import org.apache.ratis.util.TimeoutExecutor;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
3238

3339
import java.io.IOException;
3440
import java.util.concurrent.CompletableFuture;
3541

3642
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
43+
44+
public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class);
45+
46+
private ClientId clientId;
47+
private final TimeDuration requestTimeout;
48+
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
49+
3750
/**
 * Creates a Netty client RPC for the given client.
 *
 * @param clientId   the id of this client; its string form is passed to the proxy
 *                   peer map, and the id is kept for the timeout message built in
 *                   sendRequestAsync
 * @param properties the properties handed to the proxy peer map and from which the
 *                   client request timeout is read
 */
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
3851
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
52+
// Kept so the timeout message in sendRequestAsync can identify this client.
this.clientId = clientId;
53+
// Timeout after which sendRequestAsync fails a still-pending reply future
// with a TimeoutIOException.
this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
3954
}
4055

4156
@Override
4257
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
4358
final RaftPeerId serverId = request.getServerId();
59+
long callId = request.getCallId();
4460
try {
4561
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
4662
final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
47-
return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
63+
final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
64+
65+
proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
4866
if (request instanceof GroupListRequest) {
4967
return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
5068
} else if (request instanceof GroupInfoRequest) {
5169
return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
5270
} else {
5371
return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
5472
}
73+
}).whenComplete((reply, e) -> {
74+
if (e == null) {
75+
if (reply == null) {
76+
e = new NullPointerException("Both reply==null && e==null");
77+
}
78+
if (e == null) {
79+
e = reply.getNotLeaderException();
80+
}
81+
if (e == null) {
82+
e = reply.getLeaderNotReadyException();
83+
}
84+
}
85+
86+
if (e != null) {
87+
replyFuture.completeExceptionally(e);
88+
} else {
89+
replyFuture.complete(reply);
90+
}
5591
});
92+
93+
scheduler.onTimeout(requestTimeout, () -> {
94+
if (!replyFuture.isDone()) {
95+
final String s = clientId + "->" + serverId + " request #" +
96+
callId + " timeout " + requestTimeout.getDuration();
97+
replyFuture.completeExceptionally(new TimeoutIOException(s));
98+
}
99+
}, LOG, () -> "Timeout check for client request #" + callId);
100+
101+
return replyFuture;
56102
} catch (Throwable e) {
57103
return JavaUtils.completeExceptionally(e);
58104
}

ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
4848
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
4949
import org.apache.ratis.util.JavaUtils;
50+
import org.apache.ratis.util.PlatformUtils;
5051
import org.apache.ratis.util.Slf4jUtils;
5152
import org.apache.ratis.util.TimeDuration;
5253
import org.apache.ratis.util.function.CheckedRunnable;
@@ -83,6 +84,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
8384
// Instance initializer: configures the properties used by these tests.
{
8485
// Use SimpleStateMachine4Testing as the state machine class.
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
8586
SimpleStateMachine4Testing.class, StateMachine.class);
87+
// On non-Linux platforms, turn off the epoll option for both the Netty server
// and the Netty client.
// NOTE(review): presumably because the epoll transport is only available on
// Linux -- confirm.
if (!PlatformUtils.LINUX) {
88+
getProperties().setBoolean("raft.netty.server.use-epoll", false);
89+
getProperties().setBoolean("raft.netty.client.use-epoll", false);
90+
}
8691
}
8792

8893
@Test
@@ -282,8 +287,8 @@ public void testStaleReadAsync() throws Exception {
282287

283288
void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
284289
final int numMessages = 10;
285-
try (RaftClient client = cluster.createClient()) {
286-
RaftTestUtil.waitForLeader(cluster);
290+
RaftServer.Division division = waitForLeader(cluster);
291+
try (RaftClient client = cluster.createClient(division.getId())) {
287292

288293
// submit some messages
289294
final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
@@ -304,6 +309,7 @@ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
304309
// Use a follower with the max commit index
305310
final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
306311
final RaftPeerId leader = lastWriteReply.getServerId();
312+
Assert.assertEquals(leader, lastWriteReply.getServerId());
307313
LOG.info("leader = " + leader);
308314
final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
309315
LOG.info("commitInfos = " + commitInfos);
@@ -366,8 +372,8 @@ public void testWriteAsyncCustomReplicationLevel() throws Exception {
366372

367373
void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
368374
final int numMessages = 20;
369-
try (RaftClient client = cluster.createClient()) {
370-
RaftTestUtil.waitForLeader(cluster);
375+
final RaftPeerId leader = waitForLeader(cluster).getId();
376+
try (RaftClient client = cluster.createClient(leader)) {
371377

372378
// submit some messages
373379
for (int i = 0; i < numMessages; i++) {
@@ -417,13 +423,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
417423
LOG.info("Running testAppendEntriesTimeout");
418424
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
419425
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
420-
waitForLeader(cluster);
426+
final RaftPeerId leader = waitForLeader(cluster).getId();
421427
long time = System.currentTimeMillis();
422428
long waitTime = 5000;
423429
try (final RaftClient client = cluster.createClient()) {
424430
// block append requests
425431
cluster.getServerAliveStream()
426-
.filter(impl -> !impl.getInfo().isLeader())
432+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
427433
.map(SimpleStateMachine4Testing::get)
428434
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
429435

@@ -433,7 +439,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
433439
Assert.assertFalse(replyFuture.isDone());
434440
// unblock append request.
435441
cluster.getServerAliveStream()
436-
.filter(impl -> !impl.getInfo().isLeader())
442+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
437443
.map(SimpleStateMachine4Testing::get)
438444
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
439445

ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public void testStateMachineMetrics() throws Exception {
457457

458458
static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception {
459459
RaftServer.Division leader = waitForLeader(cluster);
460-
try (final RaftClient client = cluster.createClient()) {
460+
try (final RaftClient client = cluster.createClient(leader.getId())) {
461461
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
462462
STATEMACHINE_APPLIED_INDEX_GAUGE);
463463
Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,

0 commit comments

Comments
 (0)