Skip to content

Commit ab7f490

Browse files
committed
Addressed next round of feedback:
- updated ConnectionChangeListener constructor and moved registration to service.doStart()
- renamed DisconnectionHistory record and ConnectionChangeListener
- fixed up DiscoveryNode vs. Transport.Connection.getNode() confusion (these are the same)
- fixed log formatting
- edited TransportConnectionListener interface to take a nullable closeException instead of the Transport.Connection
- completed test of DisconnectionHistory, at init, post-connection, post-disconnection, and post-reconnection
- moved docs for ConnectionChangeListener, and added docs for DisconnectionHistory
1 parent 0cc8084 commit ab7f490

File tree

12 files changed

+111
-65
lines changed

12 files changed

+111
-65
lines changed

server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

Lines changed: 49 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,12 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
8383

8484
private final TimeValue reconnectInterval;
8585
private volatile ConnectionChecker connectionChecker;
86-
private final ConnectionHistoryListener connectionHistoryListener;
8786

8887
@Inject
8988
public NodeConnectionsService(Settings settings, ThreadPool threadPool, TransportService transportService) {
9089
this.threadPool = threadPool;
9190
this.transportService = transportService;
9291
this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
93-
this.connectionHistoryListener = new ConnectionHistoryListener();
9492
}
9593

9694
/**
@@ -194,6 +192,7 @@ public String toString() {
194192

195193
@Override
196194
protected void doStart() {
195+
transportService.addConnectionListener(new ConnectionChangeListener());
197196
final ConnectionChecker connectionChecker = new ConnectionChecker();
198197
this.connectionChecker = connectionChecker;
199198
connectionChecker.scheduleNextCheck();
@@ -215,16 +214,35 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
215214
});
216215
}
217216

218-
record ConnectionHistory(String ephemeralId, long disconnectTime, Exception disconnectCause) {}
217+
// exposed for testing
218+
protected ConnectionTarget connectionTargetForNode(DiscoveryNode node) {
219+
synchronized (mutex) {
220+
return targetsByNode.get(node);
221+
}
222+
}
223+
224+
/**
225+
* Time of disconnect in absolute time ({@link ThreadPool#absoluteTimeInMillis()}),
226+
* and disconnect-causing exception, if any
227+
*/
228+
record DisconnectionHistory(long disconnectTimeMillis, @Nullable Exception disconnectCause) {
229+
public long getDisconnectTimeMillis() {
230+
return disconnectTimeMillis;
231+
}
219232

220-
private class ConnectionTarget {
233+
public Exception getDisconnectCause() {
234+
return disconnectCause;
235+
}
236+
}
237+
238+
protected class ConnectionTarget {
221239
private final DiscoveryNode discoveryNode;
222240

223241
private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
224242
private final AtomicReference<Releasable> connectionRef = new AtomicReference<>();
225243

226244
// access is synchronized by the service mutex
227-
protected ConnectionHistory connectionHistory = null;
245+
protected DisconnectionHistory disconnectionHistory = null;
228246

229247
// all access to these fields is synchronized
230248
private List<Releasable> pendingRefs;
@@ -359,43 +377,41 @@ public String toString() {
359377
}
360378
}
361379

362-
private class ConnectionHistoryListener implements TransportConnectionListener {
363-
/**
364-
* Receives connection/disconnection events from the transport, and records in per-node ConnectionHistory
365-
* structures for logging network issues. ConnectionHistory records are stored in ConnectionTargets.
366-
*
367-
* Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
368-
* and it has the same ephemeral Id as it did during the last connection.
369-
*/
370-
ConnectionHistoryListener() {
371-
transportService.addConnectionListener(ConnectionHistoryListener.this);
372-
}
373-
380+
/**
381+
* Receives connection/disconnection events from the transport, and records them in per-node DisconnectionHistory
382+
* structures for logging network issues. DisconnectionHistory records are stored in their node's ConnectionTarget.
383+
*
384+
* Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
385+
* and it has the same ephemeral ID as it did during the last connection; this happens when a connection event
386+
* occurs, and its ConnectionTarget entry has a previous DisconnectionHistory stored.
387+
*/
388+
private class ConnectionChangeListener implements TransportConnectionListener {
374389
@Override
375390
public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
376-
ConnectionHistory connectionHistory = null;
391+
DisconnectionHistory disconnectionHistory = null;
377392
synchronized (mutex) {
378393
ConnectionTarget connectionTarget = targetsByNode.get(node);
379394
if (connectionTarget != null) {
380-
connectionHistory = connectionTarget.connectionHistory;
381-
connectionTarget.connectionHistory = null;
395+
disconnectionHistory = connectionTarget.disconnectionHistory;
396+
connectionTarget.disconnectionHistory = null;
382397
}
383398
}
384399

385-
if (connectionHistory != null && connectionHistory.ephemeralId.equals(connection.getNode().getEphemeralId())) {
386-
long millisSinceDisconnect = threadPool.absoluteTimeInMillis() - connectionHistory.disconnectTime;
387-
if (connectionHistory.disconnectCause != null) {
400+
if (disconnectionHistory != null) {
401+
long millisSinceDisconnect = threadPool.absoluteTimeInMillis() - disconnectionHistory.disconnectTimeMillis;
402+
if (disconnectionHistory.disconnectCause != null) {
388403
logger.warn(
389404
() -> format(
390-
"reopened transport connection to node [%s] "
391-
+ "which disconnected exceptionally [%dms] ago but did not "
392-
+ "restart, so the disconnection is unexpected; "
393-
+ "see [%s] for troubleshooting guidance",
394-
connection.getNode().descriptionWithoutAttributes(),
405+
"""
406+
reopened transport connection to node [%s] \
407+
which disconnected exceptionally [%dms] ago but did not \
408+
restart, so the disconnection is unexpected; \
409+
see [%s] for troubleshooting guidance""",
410+
node.descriptionWithoutAttributes(),
395411
millisSinceDisconnect,
396412
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
397413
),
398-
connectionHistory.disconnectCause
414+
disconnectionHistory.disconnectCause
399415
);
400416
} else {
401417
logger.warn(
@@ -404,7 +420,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
404420
which disconnected gracefully [{}ms] ago but did not \
405421
restart, so the disconnection is unexpected; \
406422
see [{}] for troubleshooting guidance""",
407-
connection.getNode().descriptionWithoutAttributes(),
423+
node.descriptionWithoutAttributes(),
408424
millisSinceDisconnect,
409425
ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
410426
);
@@ -413,16 +429,12 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
413429
}
414430

415431
@Override
416-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
417-
ConnectionHistory connectionHistory = new ConnectionHistory(
418-
connection.getNode().getEphemeralId(),
419-
threadPool.absoluteTimeInMillis(),
420-
closeException
421-
);
432+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
433+
DisconnectionHistory disconnectionHistory = new DisconnectionHistory(threadPool.absoluteTimeInMillis(), closeException);
422434
synchronized (mutex) {
423435
ConnectionTarget connectionTarget = targetsByNode.get(node);
424436
if (connectionTarget != null) {
425-
connectionTarget.connectionHistory = connectionHistory;
437+
connectionTarget.disconnectionHistory = disconnectionHistory;
426438
}
427439
}
428440
}

server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.transport.AbstractTransportRequest;
3535
import org.elasticsearch.transport.ConnectTransportException;
3636
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
37-
import org.elasticsearch.transport.Transport;
3837
import org.elasticsearch.transport.TransportConnectionListener;
3938
import org.elasticsearch.transport.TransportException;
4039
import org.elasticsearch.transport.TransportRequestOptions;
@@ -138,7 +137,7 @@ public FollowersChecker(
138137
);
139138
transportService.addConnectionListener(new TransportConnectionListener() {
140139
@Override
141-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
140+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
142141
handleDisconnectedNode(node);
143142
}
144143
});

server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.transport.ConnectTransportException;
3333
import org.elasticsearch.transport.NodeDisconnectedException;
3434
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
35-
import org.elasticsearch.transport.Transport;
3635
import org.elasticsearch.transport.TransportConnectionListener;
3736
import org.elasticsearch.transport.TransportException;
3837
import org.elasticsearch.transport.TransportRequestOptions;
@@ -124,7 +123,7 @@ public class LeaderChecker {
124123

125124
transportService.addConnectionListener(new TransportConnectionListener() {
126125
@Override
127-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
126+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
128127
handleDisconnectedNode(node);
129128
}
130129
});

server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ public void onFailure(Exception e) {
242242

243243
void handleClose(@Nullable Exception e) {
244244
connectedNodes.remove(node, conn);
245-
connectionListener.onNodeDisconnected(node, conn, e);
245+
connectionListener.onNodeDisconnected(node, e);
246246
managerRefs.decRef();
247247
}
248248
});

server/src/main/java/org/elasticsearch/transport/ConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,9 @@ final class DelegatingNodeConnectionListener implements TransportConnectionListe
6060
private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
6161

6262
@Override
63-
public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection, @Nullable Exception closeException) {
63+
public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) {
6464
for (TransportConnectionListener listener : listeners) {
65-
listener.onNodeDisconnected(key, connection, closeException);
65+
listener.onNodeDisconnected(key, closeException);
6666
}
6767
}
6868

server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection)
6161
}
6262

6363
@Override
64-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
64+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
6565
removeConnectedNode(node);
6666
}
6767
});

server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ boolean shouldRebuildConnection(Settings newSettings) {
340340
protected abstract ConnectionStrategy strategyType();
341341

342342
@Override
343-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
343+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
344344
if (shouldOpenMoreConnections()) {
345345
// try to reconnect and fill up the slot of the disconnected node
346346
connect(

server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,5 @@ default void onNodeConnected(DiscoveryNode node, Transport.Connection connection
3939
/**
4040
* Called once a node connection is closed and unregistered.
4141
*/
42-
default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {}
42+
default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {}
4343
}

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -832,7 +832,7 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception
832832
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
833833
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
834834
@Override
835-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
835+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
836836
if (disconnectedNodes.remove(node)) {
837837
disconnectedLatch.countDown();
838838
}
@@ -1135,7 +1135,7 @@ public void testCollectSearchShards() throws Exception {
11351135
CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters);
11361136
RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() {
11371137
@Override
1138-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
1138+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
11391139
if (disconnectedNodes.remove(node)) {
11401140
disconnectedLatch.countDown();
11411141
}

server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.ActionRunnable;
1818
import org.elasticsearch.action.support.PlainActionFuture;
1919
import org.elasticsearch.action.support.SubscribableListener;
20+
import org.elasticsearch.cluster.NodeConnectionsService.ConnectionTarget;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2223
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -49,6 +50,7 @@
4950
import org.elasticsearch.transport.TransportRequestOptions;
5051
import org.elasticsearch.transport.TransportService;
5152
import org.elasticsearch.transport.TransportStats;
53+
import org.hamcrest.Matchers;
5254
import org.junit.After;
5355
import org.junit.Before;
5456

@@ -247,36 +249,38 @@ public String toString() {
247249
assertConnectedExactlyToNodes(transportService, targetNodes);
248250
}
249251

250-
public void testConnectionHistoryLogging() {
252+
public void testDisconnectionHistory() {
251253
final Settings.Builder settings = Settings.builder();
252-
settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "10ms");
254+
settings.put(CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.getKey(), "100ms");
253255

254256
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
257+
final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
255258

256259
MockTransport transport = new MockTransport(deterministicTaskQueue.getThreadPool());
257260
TestTransportService transportService = new TestTransportService(transport, deterministicTaskQueue.getThreadPool());
258261
transportService.start();
259262
transportService.acceptIncomingRequests();
260263

261-
final NodeConnectionsService service = new NodeConnectionsService(
262-
settings.build(),
263-
deterministicTaskQueue.getThreadPool(),
264-
transportService
265-
);
264+
final NodeConnectionsService service = new NodeConnectionsService(settings.build(), threadPool, transportService);
266265
service.start();
267266

267+
final DiscoveryNode noClose = DiscoveryNodeUtils.create("noClose");
268268
final DiscoveryNode gracefulClose = DiscoveryNodeUtils.create("gracefulClose");
269269
final DiscoveryNode exceptionalClose = DiscoveryNodeUtils.create("exceptionalClose");
270270

271271
nodeCloseExceptions.put(exceptionalClose, new RuntimeException());
272272

273273
final AtomicBoolean connectionCompleted = new AtomicBoolean();
274-
DiscoveryNodes nodes = DiscoveryNodes.builder().add(gracefulClose).add(exceptionalClose).build();
274+
DiscoveryNodes nodes = DiscoveryNodes.builder().add(noClose).add(gracefulClose).add(exceptionalClose).build();
275275

276276
service.connectToNodes(nodes, () -> connectionCompleted.set(true));
277277
deterministicTaskQueue.runAllRunnableTasks();
278278
assertTrue(connectionCompleted.get());
279279

280+
assertNullDisconnectionHistory(service, noClose);
281+
assertNullDisconnectionHistory(service, gracefulClose);
282+
assertNullDisconnectionHistory(service, exceptionalClose);
283+
280284
try (var mockLog = MockLog.capture(NodeConnectionsService.class)) {
281285
mockLog.addExpectation(
282286
new MockLog.SeenEventExpectation(
@@ -305,19 +309,51 @@ public void testConnectionHistoryLogging() {
305309
transportService.disconnectFromNode(gracefulClose);
306310
transportService.disconnectFromNode(exceptionalClose);
307311

308-
runTasksUntil(deterministicTaskQueue, 1000);
312+
// check disconnection history set after close
313+
assertNullDisconnectionHistory(service, noClose);
314+
assertDisconnectionHistoryDetails(service, threadPool, gracefulClose, null);
315+
assertDisconnectionHistoryDetails(service, threadPool, exceptionalClose, RuntimeException.class);
316+
317+
runTasksUntil(deterministicTaskQueue, 200);
318+
319+
// check on reconnect -- disconnection history is reset
320+
assertNullDisconnectionHistory(service, noClose);
321+
assertNullDisconnectionHistory(service, gracefulClose);
322+
assertNullDisconnectionHistory(service, exceptionalClose);
309323

310324
mockLog.assertAllExpectationsMatched();
311325
}
312326
}
313327

328+
private void assertNullDisconnectionHistory(NodeConnectionsService service, DiscoveryNode node) {
329+
ConnectionTarget nodeTarget = service.connectionTargetForNode(node);
330+
assertNull(nodeTarget.disconnectionHistory);
331+
}
332+
333+
private void assertDisconnectionHistoryDetails(
334+
NodeConnectionsService service,
335+
ThreadPool threadPool,
336+
DiscoveryNode node,
337+
@Nullable Class<?> disconnectCauseClass
338+
) {
339+
ConnectionTarget nodeTarget = service.connectionTargetForNode(node);
340+
assertNotNull(nodeTarget.disconnectionHistory);
341+
assertTrue(threadPool.absoluteTimeInMillis() - nodeTarget.disconnectionHistory.getDisconnectTimeMillis() >= 0);
342+
assertTrue(threadPool.absoluteTimeInMillis() - nodeTarget.disconnectionHistory.getDisconnectTimeMillis() <= 200);
343+
if (disconnectCauseClass != null) {
344+
assertThat(nodeTarget.disconnectionHistory.getDisconnectCause(), Matchers.isA(disconnectCauseClass));
345+
} else {
346+
assertNull(nodeTarget.disconnectionHistory.getDisconnectCause());
347+
}
348+
}
349+
314350
public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
315351
final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
316352

317353
final AtomicReference<ActionListener<DiscoveryNode>> disconnectListenerRef = new AtomicReference<>();
318354
transportService.addConnectionListener(new TransportConnectionListener() {
319355
@Override
320-
public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection, @Nullable Exception closeException) {
356+
public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
321357
final ActionListener<DiscoveryNode> disconnectListener = disconnectListenerRef.getAndSet(null);
322358
if (disconnectListener != null) {
323359
disconnectListener.onResponse(node);

0 commit comments

Comments
 (0)