Skip to content

Commit 0cc8084

Browse files
committed
Addressed 2nd round review feedback:
- consolidated ConnectionHistory into ConnectionTarget, protected with the service mutex - added logging test for reconnection with and without exception - grew TransportConnectionListener onNodeDisconnected to include a nullable exception - reverted ClusterConnectionManager tests and logging
1 parent 11347ca commit 0cc8084

File tree

14 files changed

+186
-258
lines changed

14 files changed

+186
-258
lines changed

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/transport/netty4/ESLoggingHandlerIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.test.ESIntegTestCase;
1616
import org.elasticsearch.test.MockLog;
1717
import org.elasticsearch.test.junit.annotations.TestLogging;
18-
import org.elasticsearch.transport.ClusterConnectionManager;
1918
import org.elasticsearch.transport.TcpTransport;
2019
import org.elasticsearch.transport.TransportLogger;
2120

@@ -28,7 +27,7 @@ public class ESLoggingHandlerIT extends ESNetty4IntegTestCase {
2827

2928
public void setUp() throws Exception {
3029
super.setUp();
31-
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class, ClusterConnectionManager.class);
30+
mockLog = MockLog.capture(ESLoggingHandler.class, TransportLogger.class, TcpTransport.class);
3231
}
3332

3433
public void tearDown() throws Exception {

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 62 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.common.settings.Setting;
2323
import org.elasticsearch.common.settings.Settings;
2424
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
25-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2625
import org.elasticsearch.core.Nullable;
2726
import org.elasticsearch.core.Releasable;
2827
import org.elasticsearch.core.Releasables;
@@ -40,7 +39,6 @@
4039
import java.util.List;
4140
import java.util.Map;
4241
import java.util.Set;
43-
import java.util.concurrent.ConcurrentMap;
4442
import java.util.concurrent.atomic.AtomicInteger;
4543
import java.util.concurrent.atomic.AtomicReference;
4644

@@ -85,14 +83,14 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
8583

8684
private final TimeValue reconnectInterval;
8785
private volatile ConnectionChecker connectionChecker;
88-
private final ConnectionHistory connectionHistory;
86+
private final ConnectionHistoryListener connectionHistoryListener;
8987

9088
@Inject
9189
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
9290
this.threadPool = threadPool;
9391
this.transportService = transportService;
9492
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
95-
this.connectionHistory = new ConnectionHistory();
93+
this.connectionHistoryListener = new ConnectionHistoryListener();
9694
}
9795

9896
/**
@@ -109,7 +107,6 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion)
109107
final List<Runnable> runnables = new ArrayList<>(discoveryNodes.getSize());
110108
try (var refs = new RefCountingRunnable(onCompletion)) {
111109
synchronized (mutex) {
112-
connectionHistory.reserveConnectionHistoryForNodes(DiscoveryNodes);
113110
// Ugly hack: when https://github.com/elastic/elasticsearch/issues/94946 is fixed, just iterate over discoveryNodes here
114111
for (final Iterator<DiscoveryNode> iterator = discoveryNodes.mastersFirstStream().iterator(); iterator.hasNext();) {
115112
final DiscoveryNode discoveryNode = iterator.next();
@@ -146,7 +143,6 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) {
146143
nodesToDisconnect.remove(discoveryNode);
147144
}
148145

149-
connectionHistory.removeConnectionHistoryForNodes(nodesToDisconnect);
150146
for (final DiscoveryNode discoveryNode : nodesToDisconnect) {
151147
runnables.add(targetsByNode.remove(discoveryNode)::disconnect);
152148
}
@@ -219,12 +215,17 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
219215
});
220216
}
221217

218+
record ConnectionHistory(String ephemeralId, long disconnectTime, Exception disconnectCause) {}
219+
222220
private class ConnectionTarget {
223221
private final DiscoveryNode discoveryNode;
224222

225223
private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
226224
private final AtomicReference<Releasable> connectionRef = new AtomicReference<>();
227225

226+
// access is synchronized by the service mutex
227+
protected ConnectionHistory connectionHistory = null;
228+
228229
// all access to these fields is synchronized
229230
private List<Releasable> pendingRefs;
230231
private boolean connectionInProgress;
@@ -358,112 +359,72 @@ public String toString() {
358359
}
359360
}
360361

361-
private class ConnectionHistory {
362-
record NodeConnectionHistory(String ephemeralId, long disconnectTime, Exception disconnectCause) {}
363-
362+
private class ConnectionHistoryListener implements TransportConnectionListener {
364363
/**
365-
* Holds the DiscoveryNode nodeId to connection history record.
364+
* Receives connection/disconnection events from the transport, and records in per-node ConnectionHistory
365+
* structures for logging network issues. ConnectionHistory records are stored in ConnectionTargets.
366366
*
367-
* Entries for each node are reserved during NodeConnectionsService.connectToNodes, by placing a (nodeId, dummy) entry
368-
* for each node in the cluster. On node disconnect, this entry is updated with its NodeConnectionHistory. On node
369-
* connect, this entry is reset to the dummy value. On NodeConnectionsService.disconnectFromNodesExcept, node entries
370-
* are removed.
371-
*
372-
* Each node in the cluster always has a nodeHistory entry that is either the dummy value or a connection history record. This
373-
* allows node disconnect callbacks to discard their entry if the disconnect occurred because of a change in cluster state.
367+
* Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
368+
* and it has the same ephemeral Id as it did during the last connection.
374369
*/
375-
private final NodeConnectionHistory dummy = new NodeConnectionHistory("", 0, null);
376-
private final ConcurrentMap<String, NodeConnectionHistory> nodeHistory = ConcurrentCollections.newConcurrentMap();
377-
378-
ConnectionHistory() {
379-
NodeConnectionsService.this.transportService.addConnectionListener(new TransportConnectionListener() {
380-
@Override
381-
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
382-
// log case where the remote node has same ephemeralId as its previous connection
383-
// (the network was disrupted, but not the remote process)
384-
NodeConnectionHistory nodeConnectionHistory = nodeHistory.get(node.getId());
385-
if (nodeConnectionHistory != null) {
386-
nodeHistory.replace(node.getId(), nodeConnectionHistory, dummy);
387-
}
370+
ConnectionHistoryListener() {
371+
transportService.addConnectionListener(ConnectionHistoryListener.this);
372+
}
388373

389-
if (nodeConnectionHistory != null
390-
&& nodeConnectionHistory != dummy
391-
&& nodeConnectionHistory.ephemeralId.equals(node.getEphemeralId())) {
392-
if (nodeConnectionHistory.disconnectCause != null) {
393-
logger.warn(
394-
() -> format(
395-
"reopened transport connection to node [%s] "
396-
+ "which disconnected exceptionally [%dms] ago but did not "
397-
+ "restart, so the disconnection is unexpected; "
398-
+ "if unexpected, see [{}] for troubleshooting guidance",
399-
node.descriptionWithoutAttributes(),
400-
nodeConnectionHistory.disconnectTime,
401-
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
402-
),
403-
nodeConnectionHistory.disconnectCause
404-
);
405-
} else {
406-
logger.warn(
407-
"""
408-
reopened transport connection to node [{}] \
409-
which disconnected gracefully [{}ms] ago but did not \
410-
restart, so the disconnection is unexpected; \
411-
if unexpected, see [{}] for troubleshooting guidance""",
412-
node.descriptionWithoutAttributes(),
413-
nodeConnectionHistory.disconnectTime,
414-
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
415-
);
416-
}
417-
}
374+
@Override
375+
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
376+
ConnectionHistory connectionHistory = null;
377+
synchronized (mutex) {
378+
ConnectionTarget connectionTarget = targetsByNode.get(node);
379+
if (connectionTarget != null) {
380+
connectionHistory = connectionTarget.connectionHistory;
381+
connectionTarget.connectionHistory = null;
418382
}
383+
}
419384

420-
@Override
421-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
422-
connection.addCloseListener(new ActionListener<Void>() {
423-
@Override
424-
public void onResponse(Void ignored) {
425-
insertNodeConnectionHistory(null);
426-
}
427-
428-
@Override
429-
public void onFailure(Exception e) {
430-
insertNodeConnectionHistory(e);
431-
}
432-
433-
private void insertNodeConnectionHistory(@Nullable Exception e) {
434-
final long disconnectTime = threadPool.absoluteTimeInMillis();
435-
final NodeConnectionHistory nodeConnectionHistory = new NodeConnectionHistory(
436-
node.getEphemeralId(),
437-
disconnectTime,
438-
e
439-
);
440-
final String nodeId = node.getId();
441-
NodeConnectionHistory previousConnectionHistory = nodeHistory.get(nodeId);
442-
if (previousConnectionHistory != null) {
443-
nodeHistory.replace(nodeId, previousConnectionHistory, nodeConnectionHistory);
444-
}
445-
}
446-
});
385+
if (connectionHistory != null && connectionHistory.ephemeralId.equals(connection.getNode().getEphemeralId())) {
386+
long millisSinceDisconnect = threadPool.absoluteTimeInMillis() - connectionHistory.disconnectTime;
387+
if (connectionHistory.disconnectCause != null) {
388+
logger.warn(
389+
() -> format(
390+
"reopened transport connection to node [%s] "
391+
+ "which disconnected exceptionally [%dms] ago but did not "
392+
+ "restart, so the disconnection is unexpected; "
393+
+ "see [%s] for troubleshooting guidance",
394+
connection.getNode().descriptionWithoutAttributes(),
395+
millisSinceDisconnect,
396+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
397+
),
398+
connectionHistory.disconnectCause
399+
);
400+
} else {
401+
logger.warn(
402+
"""
403+
reopened transport connection to node [{}] \
404+
which disconnected gracefully [{}ms] ago but did not \
405+
restart, so the disconnection is unexpected; \
406+
see [{}] for troubleshooting guidance""",
407+
connection.getNode().descriptionWithoutAttributes(),
408+
millisSinceDisconnect,
409+
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
410+
);
447411
}
448-
});
449-
}
450-
451-
void reserveConnectionHistoryForNodes(DiscoveryNodes nodes) {
452-
for (DiscoveryNode node : nodes) {
453-
nodeHistory.put(node.getId(), dummy);
454412
}
455413
}
456414

457-
void removeConnectionHistoryForNodes(Set<DiscoveryNode> nodes) {
458-
final int startSize = nodeHistory.size();
459-
for (DiscoveryNode node : nodes) {
460-
nodeHistory.remove(node.getId());
415+
@Override
416+
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
417+
ConnectionHistory connectionHistory = new ConnectionHistory(
418+
connection.getNode().getEphemeralId(),
419+
threadPool.absoluteTimeInMillis(),
420+
closeException
421+
);
422+
synchronized (mutex) {
423+
ConnectionTarget connectionTarget = targetsByNode.get(node);
424+
if (connectionTarget != null) {
425+
connectionTarget.connectionHistory = connectionHistory;
426+
}
461427
}
462-
logger.trace("Connection history garbage-collected from {} to {} entries", startSize, nodeHistory.size());
463-
}
464-
465-
int connectionHistorySize() {
466-
return nodeHistory.size();
467428
}
468429
}
469430
}

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.util.concurrent.EsExecutors;
2727
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2828
import org.elasticsearch.core.CheckedRunnable;
29+
import org.elasticsearch.core.Nullable;
2930
import org.elasticsearch.core.TimeValue;
3031
import org.elasticsearch.monitor.NodeHealthService;
3132
import org.elasticsearch.monitor.StatusInfo;
@@ -137,7 +138,7 @@ public FollowersChecker(
137138
);
138139
transportService.addConnectionListener(new TransportConnectionListener() {
139140
@Override
140-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
141+
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
141142
handleDisconnectedNode(node);
142143
}
143144
});

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public class LeaderChecker {
124124

125125
transportService.addConnectionListener(new TransportConnectionListener() {
126126
@Override
127-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {
127+
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
128128
handleDisconnectedNode(node);
129129
}
130130
});

0 commit comments

Comments
 (0)