Skip to content

Commit 7498303

Browse files
committed
transport: pass network channel exceptions to close listeners
Previously, exceptions encountered on a Netty channel were caught and logged (at one log level or another), but were not passed to the TcpChannel or Transport.Connection close listeners, which limited observability. This change reports those exceptions through TcpChannel.onException and NodeChannels.closeAndFail, and delivers them to the corresponding close listeners via onFailure. Some test infrastructure (FakeTcpChannel) and the assertions in close listeners' onFailure methods have been updated accordingly. Closes: ES-11644
1 parent 7d06f81 commit 7498303

File tree

14 files changed

+145
-19
lines changed

14 files changed

+145
-19
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,25 @@ public class Netty4TcpChannel implements TcpChannel {
3434
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
3535
private final ChannelStats stats = new ChannelStats();
3636
private final boolean rstOnClose;
37+
private volatile Exception channelError = null;
3738

3839
/**
 * Stores the given netty channel and its flags, then wires up the connect and close notifications.
 * The close notification fails with {@code channelError} when that field is non-null at the time the
 * listener on the netty close future runs, and otherwise completes with a {@code null} response.
 */
Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
3940
this.channel = channel;
4041
this.isServer = isServer;
4142
this.profile = profile;
4243
this.connectContext = new ListenableFuture<>();
4344
this.rstOnClose = rstOnClose;
44-
addListener(this.channel.closeFuture(), closeContext);
4545
addListener(connectFuture, connectContext);
46+
// The netty closeFuture is programmed to never return an exception.
47+
// This listener takes close-causing exceptions reported during the
48+
// channel lifetime, and reports them to the closeListener.
49+
addListener(this.channel.closeFuture(), ActionListener.running(() -> {
50+
// channelError is written by onException(Exception); null means no exception was reported.
if (channelError != null) {
51+
closeContext.onFailure(channelError);
52+
} else {
53+
closeContext.onResponse(null);
54+
}
55+
}));
4656
}
4757

4858
@Override
@@ -95,6 +105,11 @@ public void addConnectListener(ActionListener<Void> listener) {
95105
connectContext.addListener(listener);
96106
}
97107

108+
// Records the reported exception in the volatile channelError field. The listener that the
// constructor registers on the netty close future reads this field to decide whether closeContext
// is failed or completed normally. Each call overwrites any previously recorded exception.
@Override
109+
public void onException(Exception e) {
110+
channelError = e;
111+
}
112+
98113
@Override
99114
public ChannelStats getChannelStats() {
100115
return stats;

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.lucene.util.BytesRef;
2929
import org.elasticsearch.ExceptionsHelper;
3030
import org.elasticsearch.TransportVersion;
31+
import org.elasticsearch.action.ActionListener;
3132
import org.elasticsearch.cluster.node.DiscoveryNode;
3233
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3334
import org.elasticsearch.common.network.NetworkService;
@@ -281,6 +282,7 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile
281282
rstOnClose,
282283
connectFuture
283284
);
285+
addClosedExceptionLogger(nettyChannel);
284286
channel.attr(CHANNEL_KEY).set(nettyChannel);
285287

286288
return nettyChannel;
@@ -312,14 +314,19 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
312314

313315
// Initializes a client-side channel: checks the concrete channel type, applies the keep-alive
// configuration to the underlying java channel, and sets up the pipeline. The `false` argument
// occupies the position the server initializer fills with isRemoteClusterServerChannel.
@Override
314316
protected void initChannel(Channel ch) throws Exception {
315-
addClosedExceptionLogger(ch);
316317
assert ch instanceof Netty4NioSocketChannel;
317318
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
318319
setupPipeline(ch, false);
319320
}
320321

321322
@Override
322323
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
324+
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
325+
if (cause instanceof Error) {
326+
channel.onException(new Exception(cause));
327+
} else {
328+
channel.onException((Exception) cause);
329+
}
323330
ExceptionsHelper.maybeDieOnAnotherThread(cause);
324331
super.exceptionCaught(ctx, cause);
325332
}
@@ -337,17 +344,23 @@ protected ServerChannelInitializer(String name) {
337344

338345
// Initializes a channel accepted by the server: checks the concrete channel type, applies the
// keep-alive configuration, wraps the netty channel in a Netty4TcpChannel (isServer = true, with an
// already-succeeded connect future), registers the close-exception logger on the wrapper, publishes
// the wrapper under CHANNEL_KEY, sets up the pipeline and hands the wrapper to serverAcceptedChannel.
@Override
339346
protected void initChannel(Channel ch) throws Exception {
340-
addClosedExceptionLogger(ch);
341347
assert ch instanceof Netty4NioSocketChannel;
342348
NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
343349
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
350+
addClosedExceptionLogger(nettyTcpChannel);
344351
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
345352
setupPipeline(ch, isRemoteClusterServerChannel);
346353
serverAcceptedChannel(nettyTcpChannel);
347354
}
348355

349356
@Override
350357
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
358+
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
359+
if (cause instanceof Error) {
360+
channel.onException(new Exception(cause));
361+
} else {
362+
channel.onException((Exception) cause);
363+
}
351364
ExceptionsHelper.maybeDieOnAnotherThread(cause);
352365
super.exceptionCaught(ctx, cause);
353366
}
@@ -383,12 +396,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
383396
);
384397
}
385398

386-
// Registers a listener that logs, at debug level, the exception associated with the channel's close.
// The removed variant listened on the netty close future and logged its cause when it was not
// successful; the added variant listens on the Netty4TcpChannel close listener and logs the exception
// handed to its failure callback, ignoring a normal close.
private static void addClosedExceptionLogger(Channel channel) {
387-
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
388-
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
389-
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
399+
private static void addClosedExceptionLogger(Netty4TcpChannel channel) {
400+
channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> {
401+
if (logger.isDebugEnabled()) {
402+
logger.debug(format("exception while closing channel: %s", channel), e);
390403
}
391-
});
404+
}));
392405
}
393406

394407
@ChannelHandler.Sharable

server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
3838
* channel. If the channel is already closed when the listener is added the listener will immediately be
3939
* executed by the thread that is attempting to add the listener.
4040
*
41+
* When the close completes but an exception prompted the closure, the exception will be passed to the
42+
* listener's onFailure method.
43+
*
4144
* @param listener to be executed
4245
*/
4346
void addCloseListener(ActionListener<Void> listener);

server/src/main/java/org/elasticsearch/tasks/TaskManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
736736
return curr;
737737
});
738738
if (tracker.registered.compareAndSet(false, true)) {
739-
channel.addCloseListener(ActionListener.wrap(r -> {
739+
channel.addCloseListener(ActionListener.running(() -> {
740740
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
741741
assert removedTracker == tracker;
742742
onChannelClosed(tracker);
743-
}, e -> { assert false : new AssertionError("must not be here", e); }));
743+
}));
744744
}
745745
return tracker;
746746
}

server/src/main/java/org/elasticsearch/transport/CloseableConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ public void close() {
4848
}
4949
}
5050

51+
/**
 * Marks this connection as closed and fails {@code closeContext} with the given exception.
 * Does nothing unless this call is the one that flips {@code closed} from {@code false} to {@code true}.
 *
 * @param e the exception handed to {@code closeContext.onFailure}
 */
public void closeAndFail(Exception e) {
52+
if (closed.compareAndSet(false, true)) {
53+
closeContext.onFailure(e);
54+
}
55+
}
56+
5157
@Override
5258
public void onRemoved() {
5359
if (removed.compareAndSet(false, true)) {

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
390390
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
391391
e
392392
);
393+
channel.onException(e);
393394
channel.close();
394395
}
395396
}

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ void sendResponse(
168168
),
169169
ex
170170
);
171+
channel.onException(ex);
171172
channel.close();
172173
} else {
173174
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
@@ -204,6 +205,7 @@ void sendErrorResponse(
204205
} catch (Exception sendException) {
205206
sendException.addSuppressed(error);
206207
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
208+
channel.onException(sendException);
207209
channel.close();
208210
}
209211
}
@@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
431433
}
432434
});
433435
} catch (RuntimeException ex) {
436+
channel.onException(ex);
434437
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
435438
throw ex;
436439
}

server/src/main/java/org/elasticsearch/transport/TcpChannel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
6666
*/
6767
void addConnectListener(ActionListener<Void> listener);
6868

69+
/**
70+
* Report an exception on this channel. The callers visible in this change invoke this immediately
* before closing the channel, so that the exception can be handed to the channel's close listeners.
71+
*
72+
* @param e the exception to report
73+
*/
74+
void onException(Exception e);
75+
6976
/**
7077
* Returns stats about this channel
7178
*/

server/src/main/java/org/elasticsearch/transport/TcpTransport.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) {
278278

279279
// Closes without an associated exception: delegates to handleClose with a null exception.
@Override
280280
public void close() {
281+
handleClose(null);
282+
}
283+
284+
// Closes and passes the given exception on: delegates to handleClose with that exception.
@Override
285+
public void closeAndFail(Exception e) {
286+
handleClose(e);
287+
}
288+
289+
// Shared implementation of close() and closeAndFail(Exception). Only the first caller to flip
// isClosing proceeds: it closes the underlying channels and then, even if that throws, invokes the
// superclass close()/closeAndFail(e) depending on whether an exception was supplied.
private void handleClose(Exception e) {
281290
if (isClosing.compareAndSet(false, true)) {
282291
try {
283292
// Block on the channel close only when the lifecycle is stopped and the caller is not a transport thread.
boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false;
284293
CloseableChannel.closeChannels(channels, block);
285294
} finally {
286295
// Call the super method to trigger listeners
287-
super.close();
296+
if (e == null) {
297+
super.close();
298+
} else {
299+
super.closeAndFail(e);
300+
}
288301
}
289302
}
290303
}
@@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle
760773
}
761774
} finally {
762775
if (closeChannel) {
776+
channel.onException(e);
763777
CloseableChannel.closeChannel(channel);
764778
}
765779
}
@@ -1120,7 +1134,17 @@ public void onResponse(Void v) {
11201134
nodeChannels.channels.forEach(ch -> {
11211135
// Mark the channel init time
11221136
ch.getChannelStats().markAccessed(relativeMillisTime);
1123-
ch.addCloseListener(ActionListener.running(nodeChannels::close));
1137+
ch.addCloseListener(new ActionListener<Void>() {
1138+
@Override
1139+
public void onResponse(Void ignored) {
1140+
nodeChannels.close();
1141+
}
1142+
1143+
@Override
1144+
public void onFailure(Exception e) {
1145+
nodeChannels.closeAndFail(e);
1146+
}
1147+
});
11241148
});
11251149
keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
11261150
nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime));
@@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) {
11811205

11821206
@Override
11831207
public void onFailure(Exception e) {
1184-
assert false : e; // never called
1208+
long closeTimeMillis = threadPool.relativeTimeInMillis();
1209+
logger.info(
1210+
() -> format(
1211+
"closed transport connection [{}] to [{}] with age [{}ms], exception:",
1212+
connectionId,
1213+
node,
1214+
closeTimeMillis - openTimeMillis
1215+
),
1216+
e
1217+
);
11851218
}
11861219
}
11871220

server/src/main/java/org/elasticsearch/transport/Transport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp
113113
TransportException;
114114

115115
/**
116-
* The listener's {@link ActionListener#onResponse(Object)} method will be called when this
117-
* connection is closed. No implementations currently throw an exception during close, so
118-
* {@link ActionListener#onFailure(Exception)} will not be called.
116+
* The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method
117+
* will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called
118+
* when the connection has successfully closed, but an exception has prompted the close.
119119
*
120120
* @param listener to be called
121121
*/

0 commit comments

Comments
 (0)