Skip to content

Commit 11347ca

Browse files
committed
Addressed review feedback:
- moved test out of ESLoggingHandlerIt into a separate ClusterConnectionManagerIntegTests file - moved connection history into NodeConnectionsService, and adopted a consistency scheme - rewrote re-connection log message to include duration - changed log level of local disconnect with exception to debug
1 parent bd688bb commit 11347ca

File tree

9 files changed

+176
-164
lines changed

9 files changed

+176
-164
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -126,24 +126,4 @@ public void testExceptionalDisconnectLogging() throws Exception {
126126

127127
mockLog.assertAllExpectationsMatched();
128128
}
129-
130-
@TestLogging(
131-
value = "org.elasticsearch.transport.ClusterConnectionManager:WARN",
132-
reason = "to ensure we log cluster manager disconnect events on WARN level"
133-
)
134-
public void testExceptionalDisconnectLoggingInClusterConnectionManager() throws Exception {
135-
mockLog.addExpectation(
136-
new MockLog.PatternSeenEventExpectation(
137-
"cluster connection manager exceptional disconnect log",
138-
ClusterConnectionManager.class.getCanonicalName(),
139-
Level.WARN,
140-
"transport connection to \\[.*\\] closed (by remote )?with exception .*"
141-
)
142-
);
143-
144-
final String nodeName = internalCluster().startNode();
145-
internalCluster().restartNode(nodeName);
146-
147-
mockLog.assertAllExpectationsMatched();
148-
}
149129
}

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@
1717
import org.elasticsearch.cluster.node.DiscoveryNode;
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
1919
import org.elasticsearch.cluster.service.ClusterApplier;
20+
import org.elasticsearch.common.ReferenceDocs;
2021
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2122
import org.elasticsearch.common.settings.Setting;
2223
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
25+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
26+
import org.elasticsearch.core.Nullable;
2427
import org.elasticsearch.core.Releasable;
2528
import org.elasticsearch.core.Releasables;
2629
import org.elasticsearch.core.TimeValue;
2730
import org.elasticsearch.injection.guice.Inject;
2831
import org.elasticsearch.threadpool.ThreadPool;
32+
import org.elasticsearch.transport.Transport;
33+
import org.elasticsearch.transport.TransportConnectionListener;
2934
import org.elasticsearch.transport.TransportService;
3035

3136
import java.util.ArrayList;
@@ -35,6 +40,7 @@
3540
import java.util.List;
3641
import java.util.Map;
3742
import java.util.Set;
43+
import java.util.concurrent.ConcurrentMap;
3844
import java.util.concurrent.atomic.AtomicInteger;
3945
import java.util.concurrent.atomic.AtomicReference;
4046

@@ -79,12 +85,14 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
7985

8086
private final TimeValue reconnectInterval;
8187
private volatile ConnectionChecker connectionChecker;
88+
private final ConnectionHistory connectionHistory;
8289

8390
@Inject
8491
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
8592
this.threadPool = threadPool;
8693
this.transportService = transportService;
8794
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
95+
this.connectionHistory = new ConnectionHistory();
8896
}
8997

9098
/**
@@ -99,13 +107,12 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
99107
}
100108

101109
final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
102-
final List<DiscoveryNode> nodes = new ArrayList<>(discoveryNodes.getSize());
103110
try (var refs = new RefCountingRunnable(onCompletion)) {
104111
synchronized (mutex) {
112+
connectionHistory.reserveConnectionHistoryForNodes(DiscoveryNodes);
105113
// Ugly hack: when https://github.com/elastic/elasticsearch/issues/94946 is fixed, just iterate over discoveryNodes here
106114
for (final Iterator<DiscoveryNode> iterator = discoveryNodes.mastersFirstStream().iterator(); iterator.hasNext();) {
107115
final DiscoveryNode discoveryNode = iterator.next();
108-
nodes.add(discoveryNode);
109116
ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode);
110117
final boolean isNewNode = connectionTarget == null;
111118
if (isNewNode) {
@@ -122,7 +129,6 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
122129
runnables.add(connectionTarget.connect(null));
123130
}
124131
}
125-
transportService.retainConnectionHistory(nodes);
126132
}
127133
}
128134
runnables.forEach(Runnable::run);
@@ -140,6 +146,7 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
140146
nodesToDisconnect.remove(discoveryNode);
141147
}
142148

149+
connectionHistory.removeConnectionHistoryForNodes(nodesToDisconnect);
143150
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
144151
runnables.add(targetsByNode.remove(discoveryNode)::disconnect);
145152
}
@@ -350,4 +357,113 @@ public String toString() {
350357
}
351358
}
352359
}
360+
361+
private class ConnectionHistory {
362+
record NodeConnectionHistory(String ephemeralId, long disconnectTime, Exception disconnectCause) {}
363+
364+
/**
365+
* Holds the DiscoveryNode nodeId to connection history record.
366+
*
367+
* Entries for each node are reserved during NodeConnectionsService.connectToNodes, by placing a (nodeId, dummy) entry
368+
* for each node in the cluster. On node disconnect, this entry is updated with its NodeConnectionHistory. On node
369+
* connect, this entry is reset to the dummy value. On NodeConnectionsService.disconnectFromNodesExcept, node entries
370+
* are removed.
371+
*
372+
* Each node in the cluster always has a nodeHistory entry that is either the dummy value or a connection history record. This
373+
* allows node disconnect callbacks to discard their entry if the disconnect occurred because of a change in cluster state.
374+
*/
375+
private final NodeConnectionHistory dummy = new NodeConnectionHistory("", 0, null);
376+
private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();
377+
378+
ConnectionHistory() {
379+
NodeConnectionsService.this.transportService.addConnectionListener(new TransportConnectionListener() {
380+
@Override
381+
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
382+
// log case where the remote node has same ephemeralId as its previous connection
383+
// (the network was disrupted, but not the remote process)
384+
NodeConnectionHistory nodeConnectionHistory = nodeHistory.get(node.getId());
385+
if (nodeConnectionHistory != null) {
386+
nodeHistory.replace(node.getId(), nodeConnectionHistory, dummy);
387+
}
388+
389+
if (nodeConnectionHistory != null
390+
&& nodeConnectionHistory != dummy
391+
&& nodeConnectionHistory.ephemeralId.equals(node.getEphemeralId())) {
392+
if (nodeConnectionHistory.disconnectCause != null) {
393+
logger.warn(
394+
() -> format(
395+
"reopened transport connection to node [%s] "
396+
+ "which disconnected exceptionally [%dms] ago but did not "
397+
+ "restart, so the disconnection is unexpected; "
398+
+ "if unexpected, see [{}] for troubleshooting guidance",
399+
node.descriptionWithoutAttributes(),
400+
nodeConnectionHistory.disconnectTime,
401+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
402+
),
403+
nodeConnectionHistory.disconnectCause
404+
);
405+
} else {
406+
logger.warn(
407+
"""
408+
reopened transport connection to node [{}] \
409+
which disconnected gracefully [{}ms] ago but did not \
410+
restart, so the disconnection is unexpected; \
411+
if unexpected, see [{}] for troubleshooting guidance""",
412+
node.descriptionWithoutAttributes(),
413+
nodeConnectionHistory.disconnectTime,
414+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
415+
);
416+
}
417+
}
418+
}
419+
420+
@Override
421+
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
422+
connection.addCloseListener(new ActionListener<Void>() {
423+
@Override
424+
public void onResponse(Void ignored) {
425+
insertNodeConnectionHistory(null);
426+
}
427+
428+
@Override
429+
public void onFailure(Exception e) {
430+
insertNodeConnectionHistory(e);
431+
}
432+
433+
private void insertNodeConnectionHistory(@Nullable Exception e) {
434+
final long disconnectTime = threadPool.absoluteTimeInMillis();
435+
final NodeConnectionHistory nodeConnectionHistory = new NodeConnectionHistory(
436+
node.getEphemeralId(),
437+
disconnectTime,
438+
e
439+
);
440+
final String nodeId = node.getId();
441+
NodeConnectionHistory previousConnectionHistory = nodeHistory.get(nodeId);
442+
if (previousConnectionHistory != null) {
443+
nodeHistory.replace(nodeId, previousConnectionHistory, nodeConnectionHistory);
444+
}
445+
}
446+
});
447+
}
448+
});
449+
}
450+
451+
void reserveConnectionHistoryForNodes(DiscoveryNodes nodes) {
452+
for (DiscoveryNode node : nodes) {
453+
nodeHistory.put(node.getId(), dummy);
454+
}
455+
}
456+
457+
void removeConnectionHistoryForNodes(Set<DiscoveryNode> nodes) {
458+
final int startSize = nodeHistory.size();
459+
for (DiscoveryNode node : nodes) {
460+
nodeHistory.remove(node.getId());
461+
}
462+
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
463+
}
464+
465+
int connectionHistorySize() {
466+
return nodeHistory.size();
467+
}
468+
}
353469
}

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 3 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,11 @@
2727

2828
import java.util.Collections;
2929
import java.util.Iterator;
30-
import java.util.List;
3130
import java.util.Map;
3231
import java.util.Set;
3332
import java.util.concurrent.ConcurrentMap;
3433
import java.util.concurrent.CountDownLatch;
3534
import java.util.concurrent.atomic.AtomicBoolean;
36-
import java.util.stream.Collectors;
37-
38-
import static org.elasticsearch.core.Strings.format;
3935

4036
/**
4137
* This class manages node connections within a cluster. The connection is opened by the underlying transport.
@@ -51,10 +47,6 @@ public class ClusterConnectionManager implements ConnectionManager {
5147
.newConcurrentMap();
5248
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
5349

54-
record NodeConnectionHistory(String ephemeralId, Exception disconnectCause) {}
55-
56-
private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();
57-
5850
private final Transport transport;
5951
private final ThreadContext threadContext;
6052
private final ConnectionProfile defaultProfile;
@@ -234,29 +226,6 @@ private void connectToNodeOrRetry(
234226
} else {
235227
logger.debug("connected to node [{}]", node);
236228
managerRefs.mustIncRef();
237-
238-
// log case where the remote node has same ephemeralId as its previous connection
239-
// (the network was disrupted, but not the remote process)
240-
final DiscoveryNode connNode = conn.getNode();
241-
NodeConnectionHistory hist = nodeHistory.remove(connNode.getId());
242-
if (hist != null && hist.ephemeralId.equals(connNode.getEphemeralId())) {
243-
if (hist.disconnectCause != null) {
244-
logger.warn(
245-
() -> format(
246-
"transport connection reopened to node with same ephemeralId [%s], close exception:",
247-
node.descriptionWithoutAttributes()
248-
),
249-
hist.disconnectCause
250-
);
251-
} else {
252-
logger.warn(
253-
"""
254-
transport connection reopened to node with same ephemeralId [{}]""",
255-
node.descriptionWithoutAttributes()
256-
);
257-
}
258-
}
259-
260229
try {
261230
connectionListener.onNodeConnected(node, conn);
262231
} finally {
@@ -266,20 +235,6 @@ private void connectToNodeOrRetry(
266235
managerRefs.decRef();
267236
}));
268237

269-
conn.addCloseListener(new ActionListener<Void>() {
270-
@Override
271-
public void onResponse(Void ignored) {
272-
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), null);
273-
nodeHistory.put(conn.getNode().getId(), hist);
274-
}
275-
276-
@Override
277-
public void onFailure(Exception e) {
278-
final NodeConnectionHistory hist = new NodeConnectionHistory(node.getEphemeralId(), e);
279-
nodeHistory.put(conn.getNode().getId(), hist);
280-
}
281-
});
282-
283238
conn.addCloseListener(new ActionListener<Void>() {
284239
@Override
285240
public void onResponse(Void ignored) {
@@ -314,13 +269,10 @@ public void onFailure(Exception e) {
314269
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
315270
);
316271
} else {
317-
logger.warn(
318-
"""
319-
transport connection to [{}] closed with exception [{}]; \
320-
if unexpected, see [{}] for troubleshooting guidance""",
272+
logger.debug(
273+
"closing unused transport connection to [{}], exception [{}]",
321274
node.descriptionWithoutAttributes(),
322-
e,
323-
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
275+
e
324276
);
325277
}
326278
}
@@ -347,21 +299,6 @@ public void onFailure(Exception e) {
347299
);
348300
}
349301

350-
@Override
351-
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
352-
List<String> nodeIds = nodes.stream().map(node -> node.getId()).collect(Collectors.toList());
353-
354-
final int startSize = nodeHistory.size();
355-
// the keyset propagates changes to the underlying map
356-
nodeHistory.keySet().retainAll(nodeIds);
357-
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
358-
}
359-
360-
@Override
361-
public int connectionHistorySize() {
362-
return nodeHistory.size();
363-
}
364-
365302
/**
366303
* Returns a connection for the given node if the node is connected.
367304
* Connections returned from this method must not be closed. The lifecycle of this connection is

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.core.Releasable;
1616

1717
import java.io.Closeable;
18-
import java.util.List;
1918
import java.util.Set;
2019
import java.util.concurrent.CopyOnWriteArrayList;
2120

@@ -51,16 +50,6 @@ void connectToNode(
5150

5251
ConnectionProfile getConnectionProfile();
5352

54-
/**
55-
* Keep the connection history for the nodes listed
56-
*/
57-
void retainConnectionHistory(List<DiscoveryNode> nodes);
58-
59-
/**
60-
* Exposed for tests
61-
*/
62-
int connectionHistorySize();
63-
6453
@FunctionalInterface
6554
interface ConnectionValidator {
6655
void validate(Transport.Connection connection, ConnectionProfile profile, ActionListener<Void> listener);

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -205,15 +205,6 @@ public void closeNoBlock() {
205205
delegate.closeNoBlock();
206206
}
207207

208-
@Override
209-
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
210-
delegate.retainConnectionHistory(nodes);
211-
}
212-
213-
public int connectionHistorySize() {
214-
return delegate.connectionHistorySize();
215-
}
216-
217208
/**
218209
* This method returns a remote cluster alias for the given transport connection if it targets a node in the remote cluster.
219210
* This method will return an optional empty in case the connection targets the local node or the node in the local cluster.

server/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,6 @@ public boolean nodeConnected(DiscoveryNode node) {
483483
return isLocalNode(node) || connectionManager.nodeConnected(node);
484484
}
485485

486-
public void retainConnectionHistory(List<DiscoveryNode> nodes) {
487-
connectionManager.retainConnectionHistory(nodes);
488-
}
489-
490486
/**
491487
* Connect to the specified node with the given connection profile.
492488
* The ActionListener will be called on the calling thread or the generic thread pool.

0 commit comments

Comments
 (0)