Skip to content

Commit 7d54620

Browse files
committed
transport: log network reconnects with same peer process
ClusterConnectionManager now caches the previous ephemeralId (created on process-start) of peer nodes on disconnect in a connection history table. On reconnect, when a peer has the same ephemeralId as it did previously, this is logged to indicate a network failure. The connectionHistory is trimmed to the current set of peers by NodeConnectionsService.
1 parent 80a41a7 commit 7d54620

File tree

8 files changed

+248
-20
lines changed

8 files changed

+248
-20
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.test.ESIntegTestCase;
1616
import org.elasticsearch.test.MockLog;
1717
import org.elasticsearch.test.junit.annotations.TestLogging;
18+
import org.elasticsearch.transport.ClusterConnectionManager;
1819
import org.elasticsearch.transport.TcpTransport;
1920
import org.elasticsearch.transport.TransportLogger;
2021

@@ -27,7 +28,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
2728

2829
public void setUp() throws Exception {
2930
super.setUp();
30-
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class);
31+
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class, ClusterConnectionManager.class);
3132
}
3233

3334
public void tearDown() throws Exception {
@@ -125,4 +126,24 @@ public void testExceptionalDisconnectLogging() throws Exception {
125126

126127
mockLog.assertAllExpectationsMatched();
127128
}
129+
130+
@TestLogging(
131+
value = "org.elasticsearch.transport.ClusterConnectionManager:WARN",
132+
reason = "to ensure we log cluster manager disconnect events on WARN level"
133+
)
134+
public void testExceptionalDisconnectLoggingInClusterConnectionManager() throws Exception {
135+
mockLog.addExpectation(
136+
new MockLog.PatternSeenEventExpectation(
137+
"cluster connection manager exceptional disconnect log",
138+
ClusterConnectionManager.class.getCanonicalName(),
139+
Level.WARN,
140+
"transport connection to \\[.*\\] closed (by remote )?with exception .*"
141+
)
142+
);
143+
144+
final String nodeName = internalCluster().startNode();
145+
internalCluster().restartNode(nodeName);
146+
147+
mockLog.assertAllExpectationsMatched();
148+
}
128149
}

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,13 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
9999
}
100100

101101
final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
102+
final List<DiscoveryNode> nodes = new ArrayList<>(discoveryNodes.getSize());
102103
try (var refs = new RefCountingRunnable(onCompletion)) {
103104
synchronized (mutex) {
104105
// Ugly hack: when https://github.com/elastic/elasticsearch/issues/94946 is fixed, just iterate over discoveryNodes here
105106
for (final Iterator<DiscoveryNode> iterator = discoveryNodes.mastersFirstStream().iterator(); iterator.hasNext();) {
106107
final DiscoveryNode discoveryNode = iterator.next();
108+
nodes.add(discoveryNode);
107109
ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode);
108110
final boolean isNewNode = connectionTarget == null;
109111
if (isNewNode) {
@@ -120,6 +122,7 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
120122
runnables.add(connectionTarget.connect(null));
121123
}
122124
}
125+
transportService.retainConnectionHistory(nodes);
123126
}
124127
}
125128
runnables.forEach(Runnable::run);

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 103 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727

2828
import java.util.Collections;
2929
import java.util.Iterator;
30+
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Set;
3233
import java.util.concurrent.ConcurrentMap;
3334
import java.util.concurrent.CountDownLatch;
3435
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.stream.Collectors;
37+
38+
import static org.elasticsearch.core.Strings.format;
3539

3640
/**
3741
* This class manages node connections within a cluster. The connection is opened by the underlying transport.
@@ -47,6 +51,9 @@ public class ClusterConnectionManager implements ConnectionManager {
4751
.newConcurrentMap();
4852
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
4953

54+
record NodeConnectionHistory(String ephemeralId, Exception disconnectCause) {}
55+
private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();
56+
5057
private final Transport transport;
5158
private final ThreadContext threadContext;
5259
private final ConnectionProfile defaultProfile;
@@ -226,6 +233,29 @@ private void connectToNodeOrRetry(
226233
} else {
227234
logger.debug("connected to node [{}]", node);
228235
managerRefs.mustIncRef();
236+
237+
// log case where the remote node has same ephemeralId as its previous connection
238+
// (the network was disrupted, but not the remote process)
239+
final DiscoveryNode connNode = conn.getNode();
240+
NodeConnectionHistory hist = nodeHistory.remove(connNode.getId());
241+
if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) {
242+
if (hist.disconnectCause != null) {
243+
logger.warn(
244+
() -> format(
245+
"transport connection reopened to node with same ephemeralId [%s], close exception:",
246+
node.descriptionWithoutAttributes()
247+
),
248+
hist.disconnectCause
249+
);
250+
} else {
251+
logger.warn(
252+
"""
253+
transport connection reopened to node with same ephemeralId [{}]""",
254+
node.descriptionWithoutAttributes()
255+
);
256+
}
257+
}
258+
229259
try {
230260
connectionListener.onNodeConnected(node, conn);
231261
} finally {
@@ -235,25 +265,65 @@ private void connectToNodeOrRetry(
235265
managerRefs.decRef();
236266
}));
237267

238-
conn.addCloseListener(ActionListener.running(() -> {
239-
if (connectingRefCounter.hasReferences() == false) {
240-
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
241-
} else if (conn.hasReferences()) {
242-
logger.info(
243-
"""
244-
transport connection to [{}] closed by remote; \
245-
if unexpected, see [{}] for troubleshooting guidance""",
246-
node.descriptionWithoutAttributes(),
247-
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
248-
);
249-
// In production code we only close connections via ref-counting, so this message confirms that a
250-
// 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a
251-
// node leaves the cluster with "reason: disconnected" but without this message being logged then
252-
// that's a bug.
253-
} else {
254-
logger.debug("closing unused transport connection to [{}]", node);
268+
conn.addCloseListener(new ActionListener<Void>() {
269+
@Override
270+
public void onResponse(Void ignored) {
271+
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), null);
272+
nodeHistory.put(conn.getNode().getId(), hist);
255273
}
256-
}));
274+
275+
@Override
276+
public void onFailure(Exception e) {
277+
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), e);
278+
nodeHistory.put(conn.getNode().getId(), hist);
279+
}
280+
});
281+
282+
conn.addCloseListener(new ActionListener<Void>() {
283+
@Override
284+
public void onResponse(Void ignored) {
285+
if (connectingRefCounter.hasReferences() == false) {
286+
logger.trace("connection manager shut down, closing transport connection to [{}]", node);
287+
} else if (conn.hasReferences()) {
288+
logger.info(
289+
"""
290+
transport connection to [{}] closed by remote; \
291+
if unexpected, see [{}] for troubleshooting guidance""",
292+
node.descriptionWithoutAttributes(),
293+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
294+
);
295+
// In production code we only close connections via ref-counting, so this message confirms that
296+
// a 'node-left ... reason: disconnected' event was caused by external factors. Put
297+
// differently, if a node leaves the cluster with "reason: disconnected" but without this
298+
// message being logged then that's a bug.
299+
} else {
300+
logger.debug("closing unused transport connection to [{}]", node);
301+
}
302+
}
303+
304+
@Override
305+
public void onFailure(Exception e) {
306+
if (conn.hasReferences()) {
307+
logger.warn(
308+
"""
309+
transport connection to [{}] closed by remote with exception [{}]; \
310+
if unexpected, see [{}] for troubleshooting guidance""",
311+
node.descriptionWithoutAttributes(),
312+
e,
313+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
314+
);
315+
} else {
316+
logger.warn(
317+
"""
318+
transport connection to [{}] closed with exception [{}]; \
319+
if unexpected, see [{}] for troubleshooting guidance""",
320+
node.descriptionWithoutAttributes(),
321+
e,
322+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
323+
);
324+
}
325+
}
326+
});
257327
}
258328
}
259329
} finally {
@@ -276,6 +346,21 @@ private void connectToNodeOrRetry(
276346
);
277347
}
278348

349+
@Override
350+
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
351+
List<String> nodeIds = nodes.stream().map(node -> node.getId()).collect(Collectors.toList());
352+
353+
final int startSize = nodeHistory.size();
354+
// the keyset propagates changes to the underlying map
355+
nodeHistory.keySet().retainAll(nodeIds);
356+
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
357+
}
358+
359+
@Override
360+
public int connectionHistorySize() {
361+
return nodeHistory.size();
362+
}
363+
279364
/**
280365
* Returns a connection for the given node if the node is connected.
281366
* Connections returned from this method must not be closed. The lifecycle of this connection is

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.core.Releasable;
1616

1717
import java.io.Closeable;
18+
import java.util.List;
1819
import java.util.Set;
1920
import java.util.concurrent.CopyOnWriteArrayList;
2021

@@ -50,6 +51,16 @@ void connectToNode(
5051

5152
ConnectionProfile getConnectionProfile();
5253

54+
/**
55+
* Keep the connection history for the nodes listed
56+
*/
57+
void retainConnectionHistory(List<DiscoveryNode> nodes);
58+
59+
/**
60+
* Exposed for tests
61+
*/
62+
int connectionHistorySize();
63+
5364
@FunctionalInterface
5465
interface ConnectionValidator {
5566
void validate(Transport.Connection connection, ConnectionProfile profile, ActionListener<Void> listener);

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,15 @@ public void closeNoBlock() {
205205
delegate.closeNoBlock();
206206
}
207207

208+
@Override
209+
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
210+
delegate.retainConnectionHistory(nodes);
211+
}
212+
213+
public int connectionHistorySize() {
214+
return delegate.connectionHistorySize();
215+
}
216+
208217
/**
209218
* This method returns a remote cluster alias for the given transport connection if it targets a node in the remote cluster.
210219
* This method will return an optional empty in case the connection targets the local node or the node in the local cluster.

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,10 @@ public boolean nodeConnected(DiscoveryNode node) {
483483
return isLocalNode(node) || connectionManager.nodeConnected(node);
484484
}
485485

486+
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
487+
connectionManager.retainConnectionHistory(nodes);
488+
}
489+
486490
/**
487491
* Connect to the specified node with the given connection profile.
488492
* The ActionListener will be called on the calling thread or the generic thread pool.

0 commit comments

Comments
 (0)