Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,22 @@ public class Netty4TcpChannel implements TcpChannel {
private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
private final ChannelStats stats = new ChannelStats();
private final boolean rstOnClose;
private volatile Exception channelError = null;

Netty4TcpChannel(Channel channel, boolean isServer, String profile, boolean rstOnClose, ChannelFuture connectFuture) {
this.channel = channel;
this.isServer = isServer;
this.profile = profile;
this.connectContext = new ListenableFuture<>();
this.rstOnClose = rstOnClose;
addListener(this.channel.closeFuture(), closeContext);
addListener(connectFuture, connectContext);
addListener(this.channel.closeFuture(), ActionListener.running(() -> {
if (channelError != null) {
closeContext.onFailure(channelError);
} else {
closeContext.onResponse(null);
}
}));
}

@Override
Expand Down Expand Up @@ -95,6 +102,11 @@ public void addConnectListener(ActionListener<Void> listener) {
connectContext.addListener(listener);
}

/**
 * Remembers {@code e} as the error associated with this channel. The listener registered on the close
 * future in the constructor reads this value and, when it is non-null, fails the close listeners with it.
 * A later call replaces the value stored by an earlier one.
 *
 * @param e the exception to record
 */
@Override
public void onException(Exception e) {
    this.channelError = e;
}

@Override
public ChannelStats getChannelStats() {
return stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -281,8 +282,8 @@ protected Netty4TcpChannel initiateChannel(DiscoveryNode node, ConnectionProfile
rstOnClose,
connectFuture
);
addClosedExceptionLogger(nettyChannel);
channel.attr(CHANNEL_KEY).set(nettyChannel);

return nettyChannel;
}

Expand Down Expand Up @@ -312,14 +313,19 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {

/**
 * Prepares a newly created outbound channel: applies the keep-alive configuration to the underlying
 * Java channel and installs the transport pipeline.
 * <p>
 * No close-exception logger is attached here: that logger is registered on the {@link Netty4TcpChannel}
 * wrapper, which does not exist yet at this point.
 *
 * @param ch the Netty channel being initialized
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    assert ch instanceof Netty4NioSocketChannel;
    NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
    setupPipeline(ch, false);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
if (cause instanceof Error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also use org.elasticsearch.ExceptionsHelper#maybeDieOnAnotherThread here - if an Error is thrown then we cannot continue and must not just quietly suppress it.

Copy link
Contributor Author

@schase-es schase-es May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very new to how exceptions are reported here. The maybeDieOnAnotherThread helper is invoked a few lines below -- it's snuck in on line 329 in this PR.

An observation I made is that these changes shouldn't modify how the exception is responded to (I almost did this in a few cases!). In this commit, it's more about pinning it to the channel, so if it's closed there's some attribution around why.

Does this explanation address things? I really barely know what that helper does!

channel.onException(new Exception(cause));
} else {
channel.onException((Exception) cause);
}
ExceptionsHelper.maybeDieOnAnotherThread(cause);
super.exceptionCaught(ctx, cause);
}
Expand All @@ -337,17 +343,23 @@ protected ServerChannelInitializer(String name) {

/**
 * Prepares a newly accepted inbound channel: applies the keep-alive configuration, wraps the channel in a
 * {@link Netty4TcpChannel}, registers the close-exception logger on that wrapper, publishes the wrapper
 * under {@code CHANNEL_KEY}, installs the pipeline and finally announces the accepted channel.
 *
 * @param ch the Netty channel being initialized
 */
@Override
protected void initChannel(Channel ch) throws Exception {
    assert ch instanceof Netty4NioSocketChannel;
    NetUtils.tryEnsureReasonableKeepAliveConfig(((Netty4NioSocketChannel) ch).javaChannel());
    Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, rstOnClose, ch.newSucceededFuture());
    // the logger is registered once, on the wrapper, so that it observes the exception recorded on the channel
    addClosedExceptionLogger(nettyTcpChannel);
    ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
    setupPipeline(ch, isRemoteClusterServerChannel);
    serverAcceptedChannel(nettyTcpChannel);
}

/**
 * Records the failure on the {@link Netty4TcpChannel} attached to the Netty channel (if any), then applies
 * the usual handling: {@code ExceptionsHelper.maybeDieOnAnotherThread} followed by the superclass behaviour.
 *
 * @param ctx   the handler context on which the failure was raised
 * @param cause the failure; anything that is not an {@link Exception} is wrapped in one before being recorded
 */
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    final Netty4TcpChannel channel = ctx.channel().attr(CHANNEL_KEY).get();
    // The attribute is only set part-way through initChannel, so it is still null if initialization failed
    // earlier. Skipping the bookkeeping in that case keeps the original cause from being masked by an NPE.
    if (channel != null) {
        // A Throwable is not necessarily an Error or an Exception, so test for Exception rather than casting
        // everything that is not an Error.
        channel.onException(cause instanceof Exception ? (Exception) cause : new Exception(cause));
    }
    ExceptionsHelper.maybeDieOnAnotherThread(cause);
    super.exceptionCaught(ctx, cause);
}
Expand Down Expand Up @@ -383,12 +395,12 @@ protected InboundPipeline getInboundPipeline(Channel ch, boolean isRemoteCluster
);
}

private static void addClosedExceptionLogger(Channel channel) {
Netty4Utils.addListener(channel.closeFuture(), channelFuture -> {
if (channelFuture.isSuccess() == false && logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channelFuture.channel()), channelFuture.cause());
private static void addClosedExceptionLogger(Netty4TcpChannel channel) {
channel.addCloseListener(ActionListener.wrap((ignored) -> {}, (e) -> {
if (logger.isDebugEnabled()) {
logger.debug(format("exception while closing channel: %s", channel), e);
}
});
}));
}

@ChannelHandler.Sharable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface CloseableChannel extends Closeable {
* channel. If the channel is already closed when the listener is added the listener will immediately be
* executed by the thread that is attempting to add the listener.
*
* When the close completes but an exception prompted the closure, the exception will be passed to the
* listener's onFailure method.
*
* @param listener to be executed
*/
void addCloseListener(ActionListener<Void> listener);
Expand Down
4 changes: 2 additions & 2 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,11 @@ private ChannelPendingTaskTracker startTrackingChannel(TcpChannel channel, Consu
return curr;
});
if (tracker.registered.compareAndSet(false, true)) {
channel.addCloseListener(ActionListener.wrap(r -> {
channel.addCloseListener(ActionListener.running(() -> {
final ChannelPendingTaskTracker removedTracker = channelPendingTaskTrackers.remove(channel);
assert removedTracker == tracker;
onChannelClosed(tracker);
}, e -> { assert false : new AssertionError("must not be here", e); }));
}));
}
return tracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public void close() {
}
}

/**
 * Marks this object as closed and fails the close context with {@code e}. Has no effect if the closed flag
 * was already set.
 *
 * @param e the exception handed to the close context
 */
public void closeAndFail(Exception e) {
    final boolean wonTransition = closed.compareAndSet(false, true);
    if (wonTransition == false) {
        return;
    }
    closeContext.onFailure(e);
}

@Override
public void onRemoved() {
if (removed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class ClusterConnectionManager implements ConnectionManager {
.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);

/**
 * What is remembered about a node after its connection closed.
 *
 * @param ephemeralId    the ephemeral id the node had while it was connected
 * @param disconnectTime wall-clock time of the close, in milliseconds as returned by {@link System#currentTimeMillis()}
 */
record NodeConnectionHistory(String ephemeralId, long disconnectTime) {}

// nodeId -> history entry; written when a connection closes (with or without an error) and removed again when a
// connection to the same node id is re-established
private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();
// time of the last history sweep; volatile because close listeners may run on different threads and a plain long
// guarantees neither visibility nor atomic reads/writes
private volatile long nodeHistoryLastGC = 0;

private final Transport transport;
private final ThreadContext threadContext;
private final ConnectionProfile defaultProfile;
Expand Down Expand Up @@ -226,6 +232,19 @@ private void connectToNodeOrRetry(
} else {
logger.debug("connected to node [{}]", node);
managerRefs.mustIncRef();

// log case where remote has same ephemeralId as previous connection (the network was disrupted, but not the
// remote process), and update history with removal (we just connected successfully)
final DiscoveryNode connNode = conn.getNode();
NodeConnectionHistory hist = nodeHistory.remove(connNode.getId());
if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) {
logger.warn(
"""
transport connection to [{}] reopened, with same ephemeralId found.""",
node.descriptionWithoutAttributes()
);
}

try {
connectionListener.onNodeConnected(node, conn);
} finally {
Expand All @@ -235,25 +254,75 @@ private void connectToNodeOrRetry(
managerRefs.decRef();
}));

conn.addCloseListener(ActionListener.running(() -> {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
final NodeConnectionHistory hist = new NodeConnectionHistory(
node.getEphemeralId(),
System.currentTimeMillis()
);
// In production code we only close connections via ref-counting, so this message confirms that a
// 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a
// node leaves the cluster with "reason: disconnected" but without this message being logged then
// that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
nodeHistory.put(conn.getNode().getId(), hist);
}
}));

@Override
public void onFailure(Exception e) {
final NodeConnectionHistory hist = new NodeConnectionHistory(
node.getEphemeralId(),
System.currentTimeMillis()
);
nodeHistory.put(conn.getNode().getId(), hist);
}
});

conn.addCloseListener(ActionListener.running(ClusterConnectionManager.this::collectHistoryGarbage));

conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
// the connection is closing down, but hasn't been released by the client/library side
// this is an event coming up from the network side
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
// In production code we only close connections via ref-counting, so this message confirms that
// a 'node-left ... reason: disconnected' event was caused by external factors. Put
// differently, if a node leaves the cluster with "reason: disconnected" but without this
// message being logged then that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
}
}

@Override
public void onFailure(Exception e) {
if (conn.hasReferences()) {
logger.warn(
"""
transport connection to [{}] closed by remote with exception [{}]; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
e,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
} else {
logger.warn(
"""
transport connection to [{}] closed with exception [{}]; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
e,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
}
}
});
}
}
} finally {
Expand All @@ -276,6 +345,26 @@ private void connectToNodeOrRetry(
);
}

/**
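* Removes entries from the nodeHistory table whose disconnect time is more than one hour old.
* The sweep itself runs at most once per hour; calls made sooner than that return without doing anything.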
*/
private void collectHistoryGarbage() {
final long now = System.currentTimeMillis();
final long hour = 60 * 60 * 1000;
Comment on lines +351 to +353
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be time-based. I'd rather we dropped the entry when the node is removed from the cluster membership (unexpected membership changes are already logged appropriately - see JoinReasonService). If the node remains in the cluster membership then we should report an unexpected reconnection no matter how long it's been disconnected.

(If we were to make this time-based then we should make the timeout configurable via a setting rather than hard-coded at 1 hour).

In principle we could also just make it size-based, expiring the oldest entries to limit the size of the map to no more than (say) double the size of the cluster. That's what JoinReasonService does, because there we cannot fall back on something stable like the cluster membership. But here we can, so I think we should use that precision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea!


if (now - hour > nodeHistoryLastGC) {
final int startSize = nodeHistory.size();
nodeHistoryLastGC = now;
final long expire = now - hour;
for (Map.Entry<String, NodeConnectionHistory> entry : nodeHistory.entrySet()) {
if (expire > entry.getValue().disconnectTime) {
nodeHistory.remove(entry.getKey(), entry.getValue());
}
}
logger.trace("ClusterConnectionManager GCed connection history from {} to {} entries", startSize, nodeHistory.size());
}
}

/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecycle of this connection is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ private void handleHandshakeRequest(TcpChannel channel, InboundMessage message)
() -> "error processing handshake version [" + header.getVersion() + "] received on [" + channel + "], closing channel",
e
);
channel.onException(e);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ void sendResponse(
),
ex
);
channel.onException(ex);
channel.close();
} else {
sendErrorResponse(transportVersion, channel, requestId, action, responseStatsConsumer, ex);
Expand Down Expand Up @@ -204,6 +205,7 @@ void sendErrorResponse(
} catch (Exception sendException) {
sendException.addSuppressed(error);
logger.error(() -> format("Failed to send error response on channel [%s], closing channel", channel), sendException);
channel.onException(sendException);
channel.close();
}
}
Expand Down Expand Up @@ -431,6 +433,7 @@ private void maybeLogSlowMessage(boolean success) {
}
});
} catch (RuntimeException ex) {
channel.onException(ex);
Releasables.closeExpectNoException(() -> listener.onFailure(ex), () -> CloseableChannel.closeChannel(channel));
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public interface TcpChannel extends CloseableChannel {
*/
void addConnectListener(ActionListener<Void> listener);

/**
 * Reports an exception that was encountered on this channel, typically just before the channel is closed
 * because of it, so that the failure can be attributed to the channel.
 * <p>
 * NOTE(review): whether the reported exception is later passed to close listeners is up to the
 * implementation; confirm against each implementation before relying on it.
 *
 * @param e the exception to associate with this channel
 */
void onException(Exception e);

/**
* Returns stats about this channel
*/
Expand Down
Loading