diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java index 0ddf0e5513feb..9d054839849de 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java @@ -96,7 +96,7 @@ public void testConnectionLogging() throws IOException { "close connection log", TcpTransport.class.getCanonicalName(), Level.DEBUG, - ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\].*" + ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\]$" ) ); @@ -105,4 +105,24 @@ public void testConnectionLogging() throws IOException { mockLog.assertAllExpectationsMatched(); } + + @TestLogging( + value = "org.elasticsearch.transport.TcpTransport:DEBUG", + reason = "to ensure we log exception disconnect events on DEBUG level" + ) + public void testExceptionalDisconnectLogging() throws Exception { + mockLog.addExpectation( + new MockLog.PatternSeenEventExpectation( + "exceptional close connection log", + TcpTransport.class.getCanonicalName(), + Level.DEBUG, + ".*closed transport connection \\[[1-9][0-9]*\\] to .* with age \\[[0-9]+ms\\], exception:.*" + ) + ); + + final String nodeName = internalCluster().startNode(); + internalCluster().restartNode(nodeName); + + mockLog.assertAllExpectationsMatched(); + } } diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 420fe16dcb689..9b8fd6ff2d116 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,6 +34,10 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; + /** + * Exception causing a close, reported to the {@link #closeContext} listener + */ + private volatile Exception closeException = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { this.channel = channel; @@ -41,8 +45,22 @@ public class Netty4TcpChannel implements TcpChannel { this.profile = profile; this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; - addListener(this.channel.closeFuture(), closeContext); addListener(connectFuture, connectContext); + addListener(this.channel.closeFuture(), new ActionListener<>() { + @Override + public void onResponse(Void ignored) { + if (closeException != null) { + closeContext.onFailure(closeException); + } else { + closeContext.onResponse(null); + } + } + + @Override + public void onFailure(Exception e) { + assert false : new AssertionError("netty channel closeFuture should never report a failure"); + } + }); } @Override @@ -95,6 +113,11 @@ public void addConnectListener(ActionListener listener) { connectContext.addListener(listener); } + @Override + public void setCloseException(Exception e) { + closeException = e; + } + @Override public ChannelStats getChannelStats() { return stats; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d8b02a0e9a0df..14607f563f1c6 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -54,7 +54,6 @@ import java.util.Map; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE; import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED; @@ -308,11 +307,18 @@ protected void stopInternal() { }, serverBootstraps::clear, () -> clientBootstrap = null); } + static Exception exceptionFromThrowable(Throwable cause) { + if (cause instanceof Error) { + return new Exception(cause); + } else { + return (Exception) cause; + } + } + protected class ClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); setupPipeline(ch, false); @@ -320,6 +326,8 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + channel.setCloseException(exceptionFromThrowable(cause)); ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -337,7 +345,6 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture()); @@ -348,6 +355,8 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + channel.setCloseException(exceptionFromThrowable(cause)); ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -383,14 +392,6 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster ); } - private static void addClosedExceptionLogger(Channel channel) { - Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { - if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { - logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); - } - }); - } - @ChannelHandler.Sharable private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter { @@ -398,11 +399,7 @@ private static class ServerChannelExceptionHandler extends ChannelInboundHandler public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ExceptionsHelper.maybeDieOnAnotherThread(cause); Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get(); - if (cause instanceof Error) { - onServerException(serverChannel, new Exception(cause)); - } else { - onServerException(serverChannel, (Exception) cause); - } + onServerException(serverChannel, exceptionFromThrowable(cause)); } } } diff --git a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java index 0ca3182854f81..a7e24ada01dba 100644 --- a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java +++ b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable { * channel. If the channel is already closed when the listener is added the listener will immediately be * executed by the thread that is attempting to add the listener. * + * When the close completes but an exception prompted the closure, the exception will be passed to the + * listener's onFailure method. + * * @param listener to be executed */ void addCloseListener(ActionListener listener); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 7e193567dc9fe..d1cf4bd799e4b 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu return curr; }); if (tracker.registered.compareAndSet(false, true)) { - channel.addCloseListener(ActionListener.wrap(r -> { + channel.addCloseListener(ActionListener.running(() -> { final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel); assert removedTracker == tracker; onChannelClosed(tracker); - }, e -> { assert false : new AssertionError("must not be here", e); })); + })); } return tracker; } diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index 6148ed6121125..61d7869aec326 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -48,6 +48,12 @@ public void close() { } } + public void closeAndFail(Exception e) { + if (closed.compareAndSet(false, true)) { + closeContext.onFailure(e); + } + } + @Override public void onRemoved() { if (removed.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 20055ee92f187..b58b9a4afcc0c 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -358,6 +358,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel", e ); + channel.setCloseException(e); channel.close(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index d309bb1014bae..70b032a4cd4ce 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -166,6 +166,7 @@ void sendResponse( ), ex ); + channel.setCloseException(ex); channel.close(); } else { sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex); @@ -202,6 +203,7 @@ void sendErrorResponse( } catch (Exception sendException) { sendException.addSuppressed(error); logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); + channel.setCloseException(sendException); channel.close(); } } @@ -457,6 +459,7 @@ private void maybeLogSlowMessage(boolean success) { } }); } catch (RuntimeException ex) { + channel.setCloseException(ex); Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel)); throw ex; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 6e39d388a40b9..00014e715afa2 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel { */ void addConnectListener(ActionListener listener); + /** + * Report a close-causing exception on this channel + * + * @param e the exception + */ + void setCloseException(Exception e); + /** * Returns stats about this channel */ diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 5eb51d3cadcc6..a71ae49d6e648 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) { @Override public void close() { + handleClose(null); + } + + @Override + public void closeAndFail(Exception e) { + handleClose(e); + } + + private void handleClose(Exception e) { if (isClosing.compareAndSet(false, true)) { try { boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false; CloseableChannel.closeChannels(channels, block); } finally { // Call the super method to trigger listeners - super.close(); + if (e == null) { + super.close(); + } else { + super.closeAndFail(e); + } } } } @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle } } finally { if (closeChannel) { + channel.setCloseException(e); CloseableChannel.closeChannel(channel); } } @@ -1115,7 +1129,17 @@ public void onResponse(Void v) { nodeChannels.channels.forEach(ch -> { // Mark the channel init time ch.getChannelStats().markAccessed(relativeMillisTime); - ch.addCloseListener(ActionListener.running(nodeChannels::close)); + ch.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + nodeChannels.close(); + } + + @Override + public void onFailure(Exception e) { + nodeChannels.closeAndFail(e); + } + }); }); keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime)); @@ -1176,7 +1200,16 @@ public void onResponse(Void ignored) { @Override public void onFailure(Exception e) { - assert false : e; // never called + long closeTimeMillis = threadPool.relativeTimeInMillis(); + logger.debug( + () -> format( + "closed transport connection [%d] to [%s] with age [%dms], exception:", + connectionId, + node, + closeTimeMillis - openTimeMillis + ), + e + ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index a1d35ce3f255a..13b2752c929bb 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp TransportException; /** - * The listener's {@link ActionListener#onResponse(Object)} method will be called when this - * connection is closed. No implementations currently throw an exception during close, so - * {@link ActionListener#onFailure(Exception)} will not be called. + * The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method + * will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called + * when the connection has successfully closed, but an exception has prompted the close. * * @param listener to be called */ diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index 48fd4208f2829..32171db433efd 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -55,6 +55,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -107,6 +108,8 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti DiscoveryNode node = DiscoveryNodeUtils.create("", new TransportAddress(InetAddress.getLoopbackAddress(), 0)); Transport.Connection connection = new TestConnect(node); + PlainActionFuture closeListener = new PlainActionFuture<>(); + connection.addCloseListener(closeListener); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; @@ -137,6 +140,7 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti connection.close(); } assertTrue(connection.isClosed()); + assertThat(closeListener.actionGet(), nullValue()); assertEquals(0, connectionManager.size()); assertEquals(1, nodeConnectedCount.get()); assertEquals(1, nodeDisconnectedCount.get()); diff --git a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java index bfb98a5377d28..5c63616215d66 100644 --- a/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/InboundHandlerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.ReleasableBytesReference; @@ -239,6 +240,9 @@ public void testClosesChannelOnErrorInHandshake() throws Exception { final AtomicBoolean isClosed = new AtomicBoolean(); channel.addCloseListener(ActionListener.running(() -> assertTrue(isClosed.compareAndSet(false, true)))); + PlainActionFuture closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); + final TransportVersion remoteVersion = TransportVersionUtils.randomVersionBetween( random(), TransportVersionUtils.getFirstVersion(), @@ -256,6 +260,8 @@ public void testClosesChannelOnErrorInHandshake() throws Exception { requestHeader.headers = Tuple.tuple(Map.of(), Map.of()); handler.inboundMessage(channel, requestMessage); assertTrue(isClosed.get()); + assertTrue(closeListener.isDone()); + expectThrows(Exception.class, () -> closeListener.get()); assertNull(channel.getMessageCaptor().get()); mockLog.assertAllExpectationsMatched(); } diff --git a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java index 813e5d70ab3a2..d9882f3bb497d 100644 --- a/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/OutboundHandlerTests.java @@ -67,6 +67,7 @@ public class OutboundHandlerTests extends ESTestCase { private InboundPipeline pipeline; private OutboundHandler handler; private FakeTcpChannel channel; + private PlainActionFuture closeListener; private DiscoveryNode node; private Compression.Scheme compressionScheme; @@ -74,6 +75,8 @@ public class OutboundHandlerTests extends ESTestCase { public void setUp() throws Exception { super.setUp(); channel = new FakeTcpChannel(randomBoolean(), buildNewFakeTransportAddress().address(), buildNewFakeTransportAddress().address()); + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportAddress transportAddress = buildNewFakeTransportAddress(); node = DiscoveryNodeUtils.create("", transportAddress); StatsTracker statsTracker = new StatsTracker(); @@ -379,6 +382,7 @@ public void onResponseSent(long requestId, String action, Exception error) { assertThat(rme.getCause().getMessage(), equalTo("simulated cbe")); } assertTrue(channel.isOpen()); + assertFalse(closeListener.isDone()); } public void testFailToSendResponseThenFailToSendError() { @@ -389,6 +393,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) throw new IllegalStateException("pipe broken"); } }; + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportVersion version = TransportVersionUtils.randomVersion(); String action = randomAlphaOfLength(10); long requestId = randomLongBetween(0, 300); @@ -432,6 +438,8 @@ public void writeTo(StreamOutput out) { assertNull(channel.getMessageCaptor().get()); assertNull(channel.getListenerCaptor().get()); assertFalse(channel.isOpen()); + assertTrue(closeListener.isDone()); + expectThrows(Exception.class, () -> closeListener.get()); } public void testFailToSendHandshakeResponse() { @@ -474,6 +482,8 @@ public void onResponseSent(long requestId, String action, Exception error) { assertEquals(action, actionRef.get()); assertTrue(response.released.get()); assertFalse(channel.isOpen()); + assertTrue(closeListener.isDone()); + expectThrows(Exception.class, () -> closeListener.get()); } public void testFailToSendErrorResponse() { @@ -484,6 +494,8 @@ public void sendMessage(BytesReference reference, ActionListener listener) throw new IllegalStateException("pipe broken"); } }; + closeListener = new PlainActionFuture<>(); + channel.addCloseListener(closeListener); TransportVersion version = TransportVersionUtils.randomVersion(); String action = randomAlphaOfLength(10); long requestId = randomLongBetween(0, 300); @@ -516,6 +528,8 @@ public void onResponseSent(long requestId, String action, Exception error) { assertFalse(channel.isOpen()); assertNull(channel.getMessageCaptor().get()); assertNull(channel.getListenerCaptor().get()); + assertTrue(closeListener.isDone()); + expectThrows(Exception.class, () -> closeListener.get()); } /** diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7099c33dda75f..65fa39b87178f 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; /** Unit tests for {@link TcpTransport} */ @@ -606,7 +605,7 @@ private void testExceptionHandling( if (expectClosed) { assertTrue(listener.isDone()); - assertThat(listener.actionGet(), nullValue()); + expectThrows(Exception.class, () -> listener.get()); } else { assertFalse(listener.isDone()); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index c4c7a4f16da84..e648d7baecf99 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -29,6 +29,8 @@ public class FakeTcpChannel implements TcpChannel { private final AtomicReference messageCaptor; private final AtomicReference> listenerCaptor; + private volatile Exception closeException = null; + public FakeTcpChannel() { this(false, "profile", new AtomicReference<>()); } @@ -94,10 +96,19 @@ public void addConnectListener(ActionListener listener) { @Override public void close() { if (closed.compareAndSet(false, true)) { - closeContext.onResponse(null); + if (closeException != null) { + closeContext.onFailure(closeException); + } else { + closeContext.onResponse(null); + } } } + @Override + public void setCloseException(Exception e) { + closeException = e; + } + @Override public void addCloseListener(ActionListener listener) { closeContext.addListener(listener);