Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.ClusterConnectionManager;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.TransportLogger;

Expand All @@ -27,7 +28,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {

public void setUp() throws Exception {
super.setUp();
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class);
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class, ClusterConnectionManager.class);
}

public void tearDown() throws Exception {
Expand Down Expand Up @@ -125,4 +126,24 @@ public void testExceptionalDisconnectLogging() throws Exception {

mockLog.assertAllExpectationsMatched();
}

@TestLogging(
value = "org.elasticsearch.transport.ClusterConnectionManager:WARN",
reason = "to ensure we log cluster manager disconnect events on WARN level"
)
public void testExceptionalDisconnectLoggingInClusterConnectionManager() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put this into its own test suite? This suite is supposed to be about ESLoggingHandler which is unrelated to the logging in ClusterConnectionManager. I think this test should work fine in the :server test suite, no need to hide it in the transport-netty4 module.

Also could you open a separate PR to move testConnectionLogging and testExceptionalDisconnectLogging out of this test suite - they're testing the logging in TcpTransport which is similarly unrelated to ESLoggingHandler. IIRC they were added here for historical reasons, but these days we use the Netty transport everywhere so these should work in :server too.

mockLog.addExpectation(
new MockLog.PatternSeenEventExpectation(
"cluster connection manager exceptional disconnect log",
ClusterConnectionManager.class.getCanonicalName(),
Level.WARN,
"transport connection to \\[.*\\] closed (by remote )?with exception .*"
)
);

final String nodeName = internalCluster().startNode();
internalCluster().restartNode(nodeName);

mockLog.assertAllExpectationsMatched();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
}

final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
final List<DiscoveryNode> nodes = new ArrayList<>(discoveryNodes.getSize());
try (var refs = new RefCountingRunnable(onCompletion)) {
synchronized (mutex) {
// Ugly hack: when https://github.com/elastic/elasticsearch/issues/94946 is fixed, just iterate over discoveryNodes here
for (final Iterator<DiscoveryNode> iterator = discoveryNodes.mastersFirstStream().iterator(); iterator.hasNext();) {
final DiscoveryNode discoveryNode = iterator.next();
nodes.add(discoveryNode);
ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode);
final boolean isNewNode = connectionTarget == null;
if (isNewNode) {
Expand All @@ -120,6 +122,7 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
runnables.add(connectionTarget.connect(null));
}
}
transportService.retainConnectionHistory(nodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might be able to use DiscoveryNodes#getAllNodes() rather than building up an auxiliary collection, that might be marginally more efficient? Set#retainAll seems to take a Collection, but we'd need to change the ConnectionManager#retainConnectionHistory interface to accommodate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a separate collection here at all? We could just pass discoveryNodes around I think.

But also, really this is cleaning out the nodes about which we no longer care, so I think we should be doing this in disconnectFromNodesExcept instead.

Copy link
Contributor Author

@schase-es schase-es May 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nick raised an important point about the race between the connection history table and the close callback.

A connection's close callback will always put an entry in the history table. If this close is a consequence of a cluster state change and disconnect in NodeConnectionsService, then it will add a node history right after it's supposed to be cleaned out.

Cleaning out the node history table whenever we disconnect from some nodes or connect to some new nodes works fine, but it means the history table will always lag a version behind, in what it's holding onto.

I came up with a concurrency scheme that works for keeping the node history current in NodeConnectionsService, but it's more complicated.

}
}
runnables.forEach(Runnable::run);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.core.Strings.format;

/**
* This class manages node connections within a cluster. The connection is opened by the underlying transport.
Expand All @@ -47,6 +51,10 @@ public class ClusterConnectionManager implements ConnectionManager {
.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);

record NodeConnectionHistory(String ephemeralId, Exception disconnectCause) {}

private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();

private final Transport transport;
private final ThreadContext threadContext;
private final ConnectionProfile defaultProfile;
Expand Down Expand Up @@ -226,6 +234,29 @@ private void connectToNodeOrRetry(
} else {
logger.debug("connected to node [{}]", node);
managerRefs.mustIncRef();

// log case where the remote node has same ephemeralId as its previous connection
// (the network was disrupted, but not the remote process)
final DiscoveryNode connNode = conn.getNode();
NodeConnectionHistory hist = nodeHistory.remove(connNode.getId());
if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we extract this to a separate method rather than adding to this already over-long and over-nested code directly?

Also I'd rather use nodeConnectionHistory instead of hist. Abbreviated variable names are a hindrance to readers, particularly if they don't have English as a first language, and there's no disadvantage to using the full type name here.

(nit: also it can be final)

if (hist.disconnectCause != null) {
logger.warn(
() -> format(
"transport connection reopened to node with same ephemeralId [%s], close exception:",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users don't really know what ephemeralId is so I think will find this message confusing. Could we say something like reopened transport connection to node [%s] which disconnected exceptionally [%s/%dms] ago but did not restart, so the disconnection is unexpected? NB also tracking the disconnection duration here.

Similarly disconnected gracefully in the other branch.

Also can we link ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING?

node.descriptionWithoutAttributes()
),
hist.disconnectCause
);
} else {
logger.warn(
"""
transport connection reopened to node with same ephemeralId [{}]""",
node.descriptionWithoutAttributes()
);
}
}

try {
connectionListener.onNodeConnected(node, conn);
} finally {
Expand All @@ -235,25 +266,65 @@ private void connectToNodeOrRetry(
managerRefs.decRef();
}));

conn.addCloseListener(ActionListener.running(() -> {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
// In production code we only close connections via ref-counting, so this message confirms that a
// 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a
// node leaves the cluster with "reason: disconnected" but without this message being logged then
// that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), null);
nodeHistory.put(conn.getNode().getId(), hist);
}
}));

@Override
public void onFailure(Exception e) {
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), e);
nodeHistory.put(conn.getNode().getId(), hist);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to store the connection history even when conn.hasReferences() == false ? I'm not 100% familiar with this code, but I wonder if we might get the occasional ungraceful disconnect after we've released all our references?

I guess in that case we would eventually discard the entry via retainConnectionHistory anyway.

Do we need to be careful with the timing of calls to retainConnectionHistory versus the these close handlers firing? I guess any entries that are added after a purge would not survive subsequent purges.

});

conn.addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
if (connectingRefCounter.hasReferences() == false) {
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
} else if (conn.hasReferences()) {
logger.info(
"""
transport connection to [{}] closed by remote; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
// In production code we only close connections via ref-counting, so this message confirms that
// a 'node-left ... reason: disconnected' event was caused by external factors. Put
// differently, if a node leaves the cluster with "reason: disconnected" but without this
// message being logged then that's a bug.
} else {
logger.debug("closing unused transport connection to [{}]", node);
}
}

@Override
public void onFailure(Exception e) {
if (conn.hasReferences()) {
logger.warn(
"""
transport connection to [{}] closed by remote with exception [{}]; \
if unexpected, see [{}] for troubleshooting guidance""",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this isn't guaranteed to be a WARN worthy event - if the node shut down then we might get a Connection reset or similar but that's not something that needs action, and we do log those exceptions elsewhere. On reflection I'd rather leave the logging in ClusterConnectionManager alone in this PR and just look at the new logs from the NodeConnectionsService.

node.descriptionWithoutAttributes(),
e,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
} else {
logger.warn(
"""
transport connection to [{}] closed with exception [{}]; \
if unexpected, see [{}] for troubleshooting guidance""",
node.descriptionWithoutAttributes(),
e,
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like previously we would only have logged at debug level in this scenario? unless I'm reading it wrong. I'm not sure how interesting this case is (as we were disconnecting from the node anyway)?

}
}
});
}
}
} finally {
Expand All @@ -276,6 +347,21 @@ private void connectToNodeOrRetry(
);
}

@Override
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
List<String> nodeIds = nodes.stream().map(node -> node.getId()).collect(Collectors.toList());

final int startSize = nodeHistory.size();
// the keyset propagates changes to the underlying map
nodeHistory.keySet().retainAll(nodeIds);
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
}

@Override
public int connectionHistorySize() {
return nodeHistory.size();
}

/**
* Returns a connection for the given node if the node is connected.
* Connections returned from this method must not be closed. The lifecycle of this connection is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.core.Releasable;

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

Expand Down Expand Up @@ -50,6 +51,16 @@ void connectToNode(

ConnectionProfile getConnectionProfile();

/**
* Keep the connection history for the nodes listed
*/
void retainConnectionHistory(List<DiscoveryNode> nodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the javadoc I think we should mention that we discard history for nodes not in the list? If you know the Set API then it's suggested by the name retain, but if you don't it might not be obvious.


/**
* Exposed for tests
*/
int connectionHistorySize();

@FunctionalInterface
interface ConnectionValidator {
void validate(Transport.Connection connection, ConnectionProfile profile, ActionListener<Void> listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,15 @@ public void closeNoBlock() {
delegate.closeNoBlock();
}

@Override
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
delegate.retainConnectionHistory(nodes);
}

public int connectionHistorySize() {
return delegate.connectionHistorySize();
}

/**
* This method returns a remote cluster alias for the given transport connection if it targets a node in the remote cluster.
* This method will return an optional empty in case the connection targets the local node or the node in the local cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ public boolean nodeConnected(DiscoveryNode node) {
return isLocalNode(node) || connectionManager.nodeConnected(node);
}

public void retainConnectionHistory(List<DiscoveryNode> nodes) {
connectionManager.retainConnectionHistory(nodes);
}

/**
* Connect to the specified node with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
Expand Down
Loading