From 1584302e15b32f1dac73daaae014d001fe225c40 Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 29 Apr 2025 17:03:43 -0700 Subject: [PATCH 1/2] transport: pass network channel exceptions to close listeners Previously, exceptions encountered on a netty channel were caught and logged at some level, but not passed to the TcpChannel or Transport.Connection close listeners. This limited observability. This change implements this exception reporting and passing, with TcpChannel.onException and NodeChannels.closeAndFail reporting exceptions and their close listeners receiving them. Some test infrastructure (FakeTcpChannel) and assertions in close listener onFailure methods have been updated. --- .../transport/netty4/Netty4TcpChannel.java | 14 ++++++- .../transport/netty4/Netty4Transport.java | 28 +++++++++---- .../common/network/CloseableChannel.java | 3 ++ .../org/elasticsearch/tasks/TaskManager.java | 4 +- .../transport/CloseableConnection.java | 6 +++ .../transport/InboundHandler.java | 1 + .../transport/OutboundHandler.java | 3 ++ .../elasticsearch/transport/TcpChannel.java | 7 ++++ .../elasticsearch/transport/TcpTransport.java | 39 +++++++++++++++++-- .../elasticsearch/transport/Transport.java | 6 +-- .../transport/TcpTransportTests.java | 6 ++- .../transport/FakeTcpChannel.java | 13 ++++++- 12 files changed, 110 insertions(+), 20 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 420fe16dcb689..195bf1e870486 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -34,6 +34,7 @@ public class Netty4TcpChannel implements TcpChannel { private final ListenableFuture closeContext = new ListenableFuture<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; + private volatile Exception channelError = null; Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) { this.channel = channel; @@ -41,8 +42,14 @@ public class Netty4TcpChannel implements TcpChannel { this.profile = profile; this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; - addListener(this.channel.closeFuture(), closeContext); addListener(connectFuture, connectContext); + addListener(this.channel.closeFuture(), ActionListener.running(() -> { + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } + })); } @Override @@ -95,6 +102,11 @@ public void addConnectListener(ActionListener listener) { connectContext.addListener(listener); } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public ChannelStats getChannelStats() { return stats; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index d8b02a0e9a0df..4b079b89bf7fe 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -28,6 +28,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; @@ -281,8 +282,8 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile rstOnClose, connectFuture ); + addClosedExceptionLogger(nettyChannel); channel.attr(CHANNEL_KEY).set(nettyChannel); - return nettyChannel; } @@ -312,7 +313,6 @@ protected class ClientChannelInitializer extends ChannelInitializer { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); setupPipeline(ch, false); @@ -320,6 +320,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -337,10 +343,10 @@ protected ServerChannelInitializer(String name) { @Override protected void initChannel(Channel ch) throws Exception { - addClosedExceptionLogger(ch); assert ch instanceof Netty4NioSocketChannel; NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel()); Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture()); + addClosedExceptionLogger(nettyTcpChannel); ch.attr(CHANNEL_KEY).set(nettyTcpChannel); setupPipeline(ch, isRemoteClusterServerChannel); serverAcceptedChannel(nettyTcpChannel); @@ -348,6 +354,12 @@ protected void initChannel(Channel ch) throws Exception { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get(); + if (cause instanceof Error) { + channel.onException(new Exception(cause)); + } else { + channel.onException((Exception) cause); + } ExceptionsHelper.maybeDieOnAnotherThread(cause); super.exceptionCaught(ctx, cause); } @@ -383,12 +395,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster ); } - private static void addClosedExceptionLogger(Channel channel) { - Netty4Utils.addListener(channel.closeFuture(), channelFuture -> { - if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) { - logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause()); + private static void addClosedExceptionLogger(Netty4TcpChannel channel) { + channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> { + if (logger.isDebugEnabled()) { + logger.debug(format("exception while closing channel: %s", channel), e); } - }); + })); } @ChannelHandler.Sharable diff --git a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java index 0ca3182854f81..a7e24ada01dba 100644 --- a/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java +++ b/server/src/main/java/org/elasticsearch/common/network/CloseableChannel.java @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable { * channel. If the channel is already closed when the listener is added the listener will immediately be * executed by the thread that is attempting to add the listener. * + * When the close completes but an exception prompted the closure, the exception will be passed to the + * listener's onFailure method. + * * @param listener to be executed */ void addCloseListener(ActionListener listener); diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index 7e193567dc9fe..d1cf4bd799e4b 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu return curr; }); if (tracker.registered.compareAndSet(false, true)) { - channel.addCloseListener(ActionListener.wrap(r -> { + channel.addCloseListener(ActionListener.running(() -> { final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel); assert removedTracker == tracker; onChannelClosed(tracker); - }, e -> { assert false : new AssertionError("must not be here", e); })); + })); } return tracker; } diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index 6148ed6121125..61d7869aec326 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -48,6 +48,12 @@ public void close() { } } + public void closeAndFail(Exception e) { + if (closed.compareAndSet(false, true)) { + closeContext.onFailure(e); + } + } + @Override public void onRemoved() { if (removed.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index ee4e5e31c584a..268139a836c73 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message) () -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel", e ); + channel.onException(e); channel.close(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java index e198b8bd19bcc..a33a257f4785f 100644 --- a/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/OutboundHandler.java @@ -168,6 +168,7 @@ void sendResponse( ), ex ); + channel.onException(ex); channel.close(); } else { sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex); @@ -204,6 +205,7 @@ void sendErrorResponse( } catch (Exception sendException) { sendException.addSuppressed(error); logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException); + channel.onException(sendException); channel.close(); } } @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) { } }); } catch (RuntimeException ex) { + channel.onException(ex); Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel)); throw ex; } diff --git a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java index 6e39d388a40b9..d760ce670a41a 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpChannel.java @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel { */ void addConnectListener(ActionListener listener); + /** + * Report an exception on this channel + * + * @param e the exception + */ + void onException(Exception e); + /** * Returns stats about this channel */ diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index 9e2fbe737a81d..70818d37d16ac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -278,13 +278,26 @@ public TcpChannel channel(TransportRequestOptions.Type type) { @Override public void close() { + handleClose(null); + } + + @Override + public void closeAndFail(Exception e) { + handleClose(e); + } + + private void handleClose(Exception e) { if (isClosing.compareAndSet(false, true)) { try { boolean block = lifecycle.stopped() && Transports.isTransportThread(Thread.currentThread()) == false; CloseableChannel.closeChannels(channels, block); } finally { // Call the super method to trigger listeners - super.close(); + if (e == null) { + super.close(); + } else { + super.closeAndFail(e); + } } } } @@ -760,6 +773,7 @@ static void handleException(TcpChannel channel, Exception e, Lifecycle lifecycle } } finally { if (closeChannel) { + channel.onException(e); CloseableChannel.closeChannel(channel); } } @@ -1120,7 +1134,17 @@ public void onResponse(Void v) { nodeChannels.channels.forEach(ch -> { // Mark the channel init time ch.getChannelStats().markAccessed(relativeMillisTime); - ch.addCloseListener(ActionListener.running(nodeChannels::close)); + ch.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + nodeChannels.close(); + } + + @Override + public void onFailure(Exception e) { + nodeChannels.closeAndFail(e); + } + }); }); keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile); nodeChannels.addCloseListener(new ChannelCloseLogger(node, connectionId, relativeMillisTime)); @@ -1181,7 +1205,16 @@ public void onResponse(Void ignored) { @Override public void onFailure(Exception e) { - assert false : e; // never called + long closeTimeMillis = threadPool.relativeTimeInMillis(); + logger.info( + () -> format( + "closed transport connection [{}] to [{}] with age [{}ms], exception:", + connectionId, + node, + closeTimeMillis - openTimeMillis + ), + e + ); } } diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index a1d35ce3f255a..13b2752c929bb 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -113,9 +113,9 @@ void sendRequest(long requestId, String action, TransportRequest request, Transp TransportException; /** - * The listener's {@link ActionListener#onResponse(Object)} method will be called when this - * connection is closed. No implementations currently throw an exception during close, so - * {@link ActionListener#onFailure(Exception)} will not be called. + * The listener will be called when this connection has completed closing. The {@link ActionListener#onResponse(Object)} method + * will be called when the connection closed gracefully, and the {@link ActionListener#onFailure(Exception)} method will be called + * when the connection has successfully closed, but an exception has prompted the close. * * @param listener to be called */ diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index 7099c33dda75f..80982c4f80d59 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -40,7 +40,6 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; /** Unit tests for {@link TcpTransport} */ @@ -606,7 +605,10 @@ private void testExceptionHandling( if (expectClosed) { assertTrue(listener.isDone()); - assertThat(listener.actionGet(), nullValue()); + try { + listener.get(); + assert false : "channel should have an exception reported"; + } catch (Exception e) {} } else { assertFalse(listener.isDone()); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java index c4c7a4f16da84..503fb48e5c95d 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/FakeTcpChannel.java @@ -29,6 +29,8 @@ public class FakeTcpChannel implements TcpChannel { private final AtomicReference messageCaptor; private final AtomicReference> listenerCaptor; + private Exception channelError = null; + public FakeTcpChannel() { this(false, "profile", new AtomicReference<>()); } @@ -94,10 +96,19 @@ public void addConnectListener(ActionListener listener) { @Override public void close() { if (closed.compareAndSet(false, true)) { - closeContext.onResponse(null); + if (channelError != null) { + closeContext.onFailure(channelError); + } else { + closeContext.onResponse(null); + } } } + @Override + public void onException(Exception e) { + channelError = e; + } + @Override public void addCloseListener(ActionListener listener) { closeContext.addListener(listener); From a6679d7b14de9b0cd3113a09180b67ce5a81396c Mon Sep 17 00:00:00 2001 From: Simon Chase Date: Tue, 29 Apr 2025 17:33:01 -0700 Subject: [PATCH 2/2] transport: log network reconnects with same peer process ClusterConnectionManager now caches the previous ephemeralId (process-scope) of peer nodes when they disconnect. On reconnect, when a peer has the same ephemeralId as it did in the previous connection, this is logged to indicate a network failure. The ephemeralId table is garbage-collected every hour, with entries older than an hour removed. --- .../transport/ClusterConnectionManager.java | 123 +++++++++++++++--- 1 file changed, 106 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index f8a798e15f282..0c0c32656a837 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -47,6 +47,12 @@ public class ClusterConnectionManager implements ConnectionManager { .newConcurrentMap(); private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); + record NodeConnectionHistory(String ephemeralId, long disconnectTime) {} + + // map of nodeId -> NodeConnectionHistory entries updated on connection close, with any error + private final ConcurrentMap nodeHistory = ConcurrentCollections.newConcurrentMap(); + private long nodeHistoryLastGC = 0; + private final Transport transport; private final ThreadContext threadContext; private final ConnectionProfile defaultProfile; @@ -226,6 +232,19 @@ private void connectToNodeOrRetry( } else { logger.debug("connected to node [{}]", node); managerRefs.mustIncRef(); + + // log case where remote has same ephemeralId as previous connection (the network was disrupted, but not the + // remote process), and update history with removal (we just connected successfully) + final DiscoveryNode connNode = conn.getNode(); + NodeConnectionHistory hist = nodeHistory.remove(connNode.getId()); + if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) { + logger.warn( + """ + transport connection to [{}] reopened, with same ephemeralId found.""", + node.descriptionWithoutAttributes() + ); + } + try { connectionListener.onNodeConnected(node, conn); } finally { @@ -235,25 +254,75 @@ private void connectToNodeOrRetry( managerRefs.decRef(); })); - conn.addCloseListener(ActionListener.running(() -> { - if (connectingRefCounter.hasReferences() == false) { - logger.trace("connection manager shut down, closing transport connection to [{}]", node); - } else if (conn.hasReferences()) { - logger.info( - """ - transport connection to [{}] closed by remote; \ - if unexpected, see [{}] for troubleshooting guidance""", - node.descriptionWithoutAttributes(), - ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + final NodeConnectionHistory hist = new NodeConnectionHistory( + node.getEphemeralId(), + System.currentTimeMillis() ); - // In production code we only close connections via ref-counting, so this message confirms that a - // 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a - // node leaves the cluster with "reason: disconnected" but without this message being logged then - // that's a bug. - } else { - logger.debug("closing unused transport connection to [{}]", node); + nodeHistory.put(conn.getNode().getId(), hist); } - })); + + @Override + public void onFailure(Exception e) { + final NodeConnectionHistory hist = new NodeConnectionHistory( + node.getEphemeralId(), + System.currentTimeMillis() + ); + nodeHistory.put(conn.getNode().getId(), hist); + } + }); + + conn.addCloseListener(ActionListener.running(ClusterConnectionManager.this::collectHistoryGarbage)); + + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + if (connectingRefCounter.hasReferences() == false) { + logger.trace("connection manager shut down, closing transport connection to [{}]", node); + } else if (conn.hasReferences()) { + // the connection is closing down, but hasn't been released by the client/library side + // this is an event coming up from the network side + logger.info( + """ + transport connection to [{}] closed by remote; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + // In production code we only close connections via ref-counting, so this message confirms that + // a 'node-left ... reason: disconnected' event was caused by external factors. Put + // differently, if a node leaves the cluster with "reason: disconnected" but without this + // message being logged then that's a bug. + } else { + logger.debug("closing unused transport connection to [{}]", node); + } + } + + @Override + public void onFailure(Exception e) { + if (conn.hasReferences()) { + logger.warn( + """ + transport connection to [{}] closed by remote with exception [{}]; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + e, + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + } else { + logger.warn( + """ + transport connection to [{}] closed with exception [{}]; \ + if unexpected, see [{}] for troubleshooting guidance""", + node.descriptionWithoutAttributes(), + e, + ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING + ); + } + } + }); } } } finally { @@ -276,6 +345,26 @@ private void connectToNodeOrRetry( ); } + /** + * Removes entries in the nodeHistory table that are too old + */ + private void collectHistoryGarbage() { + final long now = System.currentTimeMillis(); + final long hour = 60 * 60 * 1000; + + if (now - hour > nodeHistoryLastGC) { + final int startSize = nodeHistory.size(); + nodeHistoryLastGC = now; + final long expire = now - hour; + for (Map.Entry entry : nodeHistory.entrySet()) { + if (expire > entry.getValue().disconnectTime) { + nodeHistory.remove(entry.getKey(), entry.getValue()); + } + } + logger.trace("ClusterConnectionManager GCed connection history from {} to {} entries", startSize, nodeHistory.size()); + } + } + /** * Returns a connection for the given node if the node is connected. * Connections returned from this method must not be closed. The lifecycle of this connection is