Skip to content

Commit c69361c

Browse files
authored
RATIS-2349. NettyClient#writeAndFlush should support throwing AlreadyClosedException (#1303)
1 parent 2d82b4c commit c69361c

File tree

2 files changed

+20
-5
lines changed

2 files changed

+20
-5
lines changed

ratis-netty/src/main/java/org/apache/ratis/netty/NettyClient.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.ratis.netty;
1919

20+
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
2021
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
2122
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
2223
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
@@ -64,9 +65,19 @@ public void close() {
6465
lifeCycle.checkStateAndClose(() -> NettyUtils.closeChannel(channel, serverAddress));
6566
}
6667

67-
public ChannelFuture writeAndFlush(Object msg) {
68-
lifeCycle.assertCurrentState(LifeCycle.States.RUNNING);
69-
return channel.writeAndFlush(msg);
68+
public ChannelFuture writeAndFlush(Object msg) throws AlreadyClosedException {
69+
final LifeCycle.State state = lifeCycle.getCurrentState();
70+
if (state.isRunning()) {
71+
return channel.writeAndFlush(msg);
72+
}
73+
// For CLOSING, CLOSED, and EXCEPTION states, throw AlreadyClosedException to trigger reconnection
74+
if (state.isClosingOrClosed() || state == LifeCycle.State.EXCEPTION) {
75+
throw new AlreadyClosedException(
76+
"Client is closed or failed: state=" + state + ", channel=" + channel);
77+
}
78+
// For other states (NEW, STARTING, PAUSING, PAUSED), this is a programming error
79+
throw new IllegalStateException("Client is in unexpected state for writeAndFlush: " +
80+
"state=" + state + ", channel=" + channel);
7081
}
7182

7283
@Override

ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
157157
}
158158

159159
synchronized ChannelFuture offer(RaftNettyServerRequestProto request,
160-
CompletableFuture<RaftNettyServerReplyProto> reply) {
160+
CompletableFuture<RaftNettyServerReplyProto> reply) throws AlreadyClosedException {
161161
replies.offer(reply);
162162
return client.writeAndFlush(request);
163163
}
@@ -199,7 +199,11 @@ public void close() {
199199

200200
public CompletableFuture<RaftNettyServerReplyProto> sendAsync(RaftNettyServerRequestProto proto) {
201201
final CompletableFuture<RaftNettyServerReplyProto> reply = new CompletableFuture<>();
202-
connection.offer(proto, reply);
202+
try {
203+
connection.offer(proto, reply);
204+
} catch (AlreadyClosedException e) {
205+
reply.completeExceptionally(e);
206+
}
203207
return reply;
204208
}
205209

0 commit comments

Comments
 (0)