diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 420fe16dcb689..195bf1e870486 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,6 +34,7 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; + private volatile Exception channelError = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { this.channel = channel; @@ -41,8 +42,14 @@ public class Netty4TcpChannel implements TcpChannel { this.profile = profile; this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; - addListener(this.channel.closeFuture(), closeContext); addListener(connectFuture, connectContext); + addListener(this.channel.closeFuture(), ActionListener.running(() -> { + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } + })); } @Override @@ -95,6 +102,11 @@ public void addConnectListener(ActionListener listener) { connectContext.addListener(listener); } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public ChannelStats getChannelStats() { return stats; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d8b02a0e9a0df..4b079b89bf7fe 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -28,6 +28,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -281,8 +282,8 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile rstOnClose, connectFuture ); + addClosedExceptionLogger(nettyChannel); channel.attr(CHANNEL_KEY).set(nettyChannel); - return nettyChannel; } @@ -312,7 +313,6 @@ protected class ClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); setupPipeline(ch, false); @@ -320,6 +320,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -337,10 +343,10 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture()); + addClosedExceptionLogger(nettyTcpChannel); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); setupPipeline(ch, isRemoteClusterServerChannel); serverAcceptedChannel(nettyTcpChannel); @@ -348,6 +354,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -383,12 +395,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster ); } - private static void addClosedExceptionLogger(Channel channel) { - Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { - if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { - logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); + private static void addClosedExceptionLogger(Netty4TcpChannel channel) { + channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> { + if (logger.isDebugEnabled()) { + logger.debug(format("exception while closing channel: %s", channel), e); } - }); + })); } @ChannelHandler.Sharable diff --git a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java index 0ca3182854f81..a7e24ada01dba 100644 --- a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java +++ b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable { * channel. If the channel is already closed when the listener is added the listener will immediately be * executed by the thread that is attempting to add the listener. * + * When the close completes but an exception prompted the closure, the exception will be passed to the + * listener's onFailure method. + * * @param listener to be executed */ void addCloseListener(ActionListener listener); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 7e193567dc9fe..d1cf4bd799e4b 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu return curr; }); if (tracker.registered.compareAndSet(false, true)) { - channel.addCloseListener(ActionListener.wrap(r -> { + channel.addCloseListener(ActionListener.running(() -> { final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel); assert removedTracker == tracker; onChannelClosed(tracker); - }, e -> { assert false : new AssertionError("must not be here", e); })); + })); } return tracker; } diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index 6148ed6121125..61d7869aec326 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -48,6 +48,12 @@ public void close() { } } + public void closeAndFail(Exception e) { + if (closed.compareAndSet(false, true)) { + closeContext.onFailure(e); + } + } + @Override public void onRemoved() { if (removed.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index f8a798e15f282..0c0c32656a837 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -47,6 +47,12 @@ public class ClusterConnectionManager implements ConnectionManager { .newConcurrentMap(); private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); + record NodeConnectionHistory(String ephemeralId, long disconnectTime) {} + + // map of nodeId -> NodeConnectionHistory entries updated on connection close, with any error + private final ConcurrentMap nodeHistory = ConcurrentCollections.newConcurrentMap(); + private long nodeHistoryLastGC = 0; + private final Transport transport; private final ThreadContext threadContext; private final ConnectionProfile defaultProfile; @@ -226,6 +232,19 @@ private void connectToNodeOrRetry( } else { logger.debug("connected to node [{}]", node); managerRefs.mustIncRef(); + + // log case where remote has same ephemeralId as previous connection (the network was disrupted, but not the + // remote process), and update history with removal (we just connected successfully) + final DiscoveryNode connNode = conn.getNode(); + NodeConnectionHistory hist = nodeHistory.remove(connNode.getId()); + if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) { + logger.warn( + """ + transport connection to [{}] reopened, with same ephemeralId found.""", + node.descriptionWithoutAttributes() + ); + } + try { connectionListener.onNodeConnected(node, conn); } finally { @@ -235,25 +254,75 @@ private void connectToNodeOrRetry( managerRefs.decRef(); })); - conn.addCloseListener(ActionListener.running(() -> { - if (connectingRefCounter.hasReferences() == false) { - logger.trace("connection manager shut down, closing transport connection to [{}]", node); - } else if (conn.hasReferences()) { - logger.info( - """ - transport connection to [{}] closed by remote; \ - if unexpected, see [{}] for troubleshooting guidance""", - node.descriptionWithoutAttributes(), - ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + final NodeConnectionHistory hist = new NodeConnectionHistory( + node.getEphemeralId(), + System.currentTimeMillis() ); - // In production code we only close connections via ref-counting, so this message confirms that a - // 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a - // node leaves the cluster with "reason: disconnected" but without this message being logged then - // that's a bug. - } else { - logger.debug("closing unused transport connection to [{}]", node); + nodeHistory.put(conn.getNode().getId(), hist); } - })); + + @Override + public void onFailure(Exception e) { + final NodeConnectionHistory hist = new NodeConnectionHistory( + node.getEphemeralId(), + System.currentTimeMillis() + ); + nodeHistory.put(conn.getNode().getId(), hist); + } + }); + + conn.addCloseListener(ActionListener.running(ClusterConnectionManager.this::collectHistoryGarbage)); + + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + if (connectingRefCounter.hasReferences() == false) { + logger.trace("connection manager shut down, closing transport connection to [{}]", node); + } else if (conn.hasReferences()) { + // the connection is closing down, but hasn't been released by the client/library side + // this is an event coming up from the network side + logger.info( + """ + transport connection to [{}] closed by remote; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + // In production code we only close connections via ref-counting, so this message confirms that + // a 'node-left ... reason: disconnected' event was caused by external factors. Put + // differently, if a node leaves the cluster with "reason: disconnected" but without this + // message being logged then that's a bug. + } else { + logger.debug("closing unused transport connection to [{}]", node); + } + } + + @Override + public void onFailure(Exception e) { + if (conn.hasReferences()) { + logger.warn( + """ + transport connection to [{}] closed by remote with exception [{}]; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + e, + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + } else { + logger.warn( + """ + transport connection to [{}] closed with exception [{}]; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + e, + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + } + } + }); } } } finally { @@ -276,6 +345,26 @@ private void connectToNodeOrRetry( ); } + /** + * Removes entries in the nodeHistory table that are too old + */ + private void collectHistoryGarbage() { + final long now = System.currentTimeMillis(); + final long hour = 60 * 60 * 1000; + + if (now - hour > nodeHistoryLastGC) { + final int startSize = nodeHistory.size(); + nodeHistoryLastGC = now; + final long expire = now - hour; + for (Map.Entry entry : nodeHistory.entrySet()) { + if (expire > entry.getValue().disconnectTime) { + nodeHistory.remove(entry.getKey(), entry.getValue()); + } + } + logger.trace("ClusterConnectionManager GCed connection history from {} to {} entries", startSize, nodeHistory.size()); + } + } + /** * Returns a connection for the given node if the node is connected. * Connections returned from this method must not be closed. The lifecycle of this connection is diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index ee4e5e31c584a..268139a836c73 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel", e ); + channel.onException(e); channel.close(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index e198b8bd19bcc..a33a257f4785f 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -168,6 +168,7 @@ void sendResponse( ), ex ); + channel.onException(ex); channel.close(); } else { sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex); @@ -204,6 +205,7 @@ void sendErrorResponse( } catch (Exception sendException) { sendException.addSuppressed(error); logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); + channel.onException(sendException); channel.close(); } } @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) { } }); } catch (RuntimeException ex) { + channel.onException(ex); Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel)); throw ex; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 6e39d388a40b9..d760ce670a41a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel { */ void addConnectListener(ActionListener listener); + /** + * Report an exception on this channel + * + * @param e the exception + */ + void onException(Exception e); + /** * Returns stats about this channel */ diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9e2fbe737a81d..70818d37d16ac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) { @Override public void close() { + handleClose(null); + } + + @Override + public void closeAndFail(Exception e) { + handleClose(e); + } + + private void handleClose(Exception e) { if (isClosing.compareAndSet(false, true)) { try { boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false; CloseableChannel.closeChannels(channels, block); } finally { // Call the super method to trigger listeners - super.close(); + if (e == null) { + super.close(); + } else { + super.closeAndFail(e); + } } } } @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle } } finally { if (closeChannel) { + channel.onException(e); CloseableChannel.closeChannel(channel); } } @@ -1120,7 +1134,17 @@ public void onResponse(Void v) { nodeChannels.channels.forEach(ch -> { // Mark the channel init time ch.getChannelStats().markAccessed(relativeMillisTime); - ch.addCloseListener(ActionListener.running(nodeChannels::close)); + ch.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + nodeChannels.close(); + } + + @Override + public void onFailure(Exception e) { + nodeChannels.closeAndFail(e); + } + }); }); keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime)); @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) { @Override public void onFailure(Exception e) { - assert false : e; // never called + long closeTimeMillis = threadPool.relativeTimeInMillis(); + logger.info( + () -> format( + "closed transport connection [{}] to [{}] with age [{}ms], exception:", + connectionId, + node, + closeTimeMillis - openTimeMillis + ), + e + ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index a1d35ce3f255a..13b2752c929bb 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp TransportException; /** - * The listener's {@link ActionListener#onResponse(Object)} method will be called when this - * connection is closed. No implementations currently throw an exception during close, so - * {@link ActionListener#onFailure(Exception)} will not be called. + * The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method + * will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called + * when the connection has successfully closed, but an exception has prompted the close. * * @param listener to be called */ diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7099c33dda75f..80982c4f80d59 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; /** Unit tests for {@link TcpTransport} */ @@ -606,7 +605,10 @@ private void testExceptionHandling( if (expectClosed) { assertTrue(listener.isDone()); - assertThat(listener.actionGet(), nullValue()); + try { + listener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } else { assertFalse(listener.isDone()); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index c4c7a4f16da84..503fb48e5c95d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -29,6 +29,8 @@ public class FakeTcpChannel implements TcpChannel { private final AtomicReference messageCaptor; private final AtomicReference> listenerCaptor; + private Exception channelError = null; + public FakeTcpChannel() { this(false, "profile", new AtomicReference<>()); } @@ -94,10 +96,19 @@ public void addConnectListener(ActionListener listener) { @Override public void close() { if (closed.compareAndSet(false, true)) { - closeContext.onResponse(null); + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } } } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public void addCloseListener(ActionListener listener) { closeContext.addListener(listener);