diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
index 9df2fa86f4f7d..c8153b8845998 100644
--- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
+++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java
@@ -17,15 +17,19 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportConnectionListener;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
@@ -188,6 +192,7 @@ public String toString() {
 
     @Override
     protected void doStart() {
+        transportService.addConnectionListener(new ConnectionChangeListener());
         final ConnectionChecker connectionChecker = new ConnectionChecker();
         this.connectionChecker = connectionChecker;
         connectionChecker.scheduleNextCheck();
@@ -209,12 +214,33 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio
         });
     }
 
+    // exposed for testing
+    protected DisconnectionHistory disconnectionHistoryForNode(DiscoveryNode node) {
+        synchronized (mutex) {
+            ConnectionTarget connectionTarget = targetsByNode.get(node);
+            if (connectionTarget != null) {
+                return connectionTarget.disconnectionHistory;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Time of disconnect in absolute time ({@link ThreadPool#absoluteTimeInMillis()}),
+     * and disconnect-causing exception, if any
+     */
+    record DisconnectionHistory(long disconnectTimeMillis, @Nullable Exception disconnectCause) {}
+
     private class ConnectionTarget {
         private final DiscoveryNode discoveryNode;
 
         private final AtomicInteger consecutiveFailureCount = new AtomicInteger();
         private final AtomicReference<Releasable> connectionRef = new AtomicReference<>();
 
+        // access is synchronized by the service mutex
+        @Nullable // null when node is connected or initialized; non-null in between disconnects and connects
+        private DisconnectionHistory disconnectionHistory = null;
+
         // all access to these fields is synchronized
         private List<Releasable> pendingRefs;
         private boolean connectionInProgress;
@@ -345,4 +371,72 @@ public String toString() {
             return "ConnectionTarget{discoveryNode=" + discoveryNode + '}';
         }
     }
+
+    /**
+     * Receives connection/disconnection events from the transport, and records them in per-node DisconnectionHistory
+     * structures for logging network issues. DisconnectionHistory records are stored in their node's ConnectionTarget.
+     *
+     * Network issues (that this listener monitors for) occur whenever a reconnection to a node succeeds,
+     * and it has the same ephemeral ID as it did during the last connection; this happens when a connection event
+     * occurs, and its ConnectionTarget entry has a previous DisconnectionHistory stored.
+     */
+    private class ConnectionChangeListener implements TransportConnectionListener {
+        @Override
+        public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) {
+            DisconnectionHistory disconnectionHistory = null;
+            synchronized (mutex) {
+                ConnectionTarget connectionTarget = targetsByNode.get(node);
+                if (connectionTarget != null) {
+                    disconnectionHistory = connectionTarget.disconnectionHistory;
+                    connectionTarget.disconnectionHistory = null;
+                }
+            }
+
+            if (disconnectionHistory != null) {
+                // absoluteTimeInMillis() is wall-clock time and may step backwards between the disconnect and the reconnect;
+                // clamp the delta at zero because TimeValue rejects negative durations and this listener must not throw
+                long millisSinceDisconnect = Math.max(0L, threadPool.absoluteTimeInMillis() - disconnectionHistory.disconnectTimeMillis);
+                TimeValue timeValueSinceDisconnect = TimeValue.timeValueMillis(millisSinceDisconnect);
+                if (disconnectionHistory.disconnectCause != null) {
+                    logger.warn(
+                        () -> format(
+                            """
+                                reopened transport connection to node [%s] \
+                                which disconnected exceptionally [%s/%dms] ago but did not \
+                                restart, so the disconnection is unexpected; \
+                                see [%s] for troubleshooting guidance""",
+                            node.descriptionWithoutAttributes(),
+                            timeValueSinceDisconnect,
+                            millisSinceDisconnect,
+                            ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
+                        ),
+                        disconnectionHistory.disconnectCause
+                    );
+                } else {
+                    logger.warn(
+                        """
+                            reopened transport connection to node [{}] \
+                            which disconnected gracefully [{}/{}ms] ago but did not \
+                            restart, so the disconnection is unexpected; \
+                            see [{}] for troubleshooting guidance""",
+                        node.descriptionWithoutAttributes(),
+                        timeValueSinceDisconnect,
+                        millisSinceDisconnect,
+                        ReferenceDocs.NETWORK_DISCONNECT_TROUBLESHOOTING
+                    );
+                }
+            }
+        }
+
+        @Override
+        public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {
+            DisconnectionHistory disconnectionHistory = new DisconnectionHistory(threadPool.absoluteTimeInMillis(), closeException);
+            synchronized (mutex) {
+                ConnectionTarget connectionTarget = targetsByNode.get(node);
+                if (connectionTarget != null) {
+                    connectionTarget.disconnectionHistory = disconnectionHistory;
+                }
+            }
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
index f0edd4aeba126..0ad96cdb1f9d9 100644
--- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java
@@ -17,6 +17,7 @@
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.cluster.NodeConnectionsService.DisconnectionHistory;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -49,6 +50,7 @@
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.transport.TransportStats;
+import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.Before;
 
@@ -75,6 +77,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
     private ThreadPool threadPool;
     private TransportService transportService;
     private Map<DiscoveryNode, CheckedRunnable<Exception>> nodeConnectionBlocks;
+    private Map<DiscoveryNode, Exception> nodeCloseExceptions;
 
     private List<DiscoveryNode> generateNodes() {
         List<DiscoveryNode> nodes = new ArrayList<>();
@@ -246,6 +249,110 @@ public String toString() {
         assertConnectedExactlyToNodes(transportService, targetNodes);
     }
 
+    public void testDisconnectionHistory() {
+        final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue();
+        final ThreadPool threadPool = deterministicTaskQueue.getThreadPool();
+        final TimeValue reconnectIntervalTimeValue = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY);
+        final long reconnectIntervalMillis = reconnectIntervalTimeValue.millis();
+
+        MockTransport transport = new MockTransport(threadPool);
+        TestTransportService transportService = new TestTransportService(transport, threadPool);
+        transportService.start();
+        transportService.acceptIncomingRequests();
+
+        final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
+        service.start();
+
+        final DiscoveryNode noClose = DiscoveryNodeUtils.create("noClose");
+        final DiscoveryNode gracefulClose = DiscoveryNodeUtils.create("gracefulClose");
+        final DiscoveryNode exceptionalClose = DiscoveryNodeUtils.create("exceptionalClose");
+
+        nodeCloseExceptions.put(exceptionalClose, new RuntimeException());
+
+        final AtomicBoolean connectionCompleted = new AtomicBoolean();
+        DiscoveryNodes nodes = DiscoveryNodes.builder().add(noClose).add(gracefulClose).add(exceptionalClose).build();
+
+        service.connectToNodes(nodes, () -> connectionCompleted.set(true));
+        deterministicTaskQueue.runAllRunnableTasks();
+        assertTrue(connectionCompleted.get());
+
+        assertNullDisconnectionHistory(service, noClose);
+        assertNullDisconnectionHistory(service, gracefulClose);
+        assertNullDisconnectionHistory(service, exceptionalClose);
+
+        transportService.disconnectFromNode(gracefulClose);
+        transportService.disconnectFromNode(exceptionalClose);
+
+        // check disconnection history set after close
+        assertNullDisconnectionHistory(service, noClose);
+        assertDisconnectionHistoryDetails(service, threadPool, gracefulClose, null);
+        assertDisconnectionHistoryDetails(service, threadPool, exceptionalClose, RuntimeException.class);
+
+        try (var mockLog = MockLog.capture(NodeConnectionsService.class)) {
+            mockLog.addExpectation(
+                new MockLog.SeenEventExpectation(
+                    "reconnect after graceful close",
+                    NodeConnectionsService.class.getCanonicalName(),
+                    Level.WARN,
+                    "reopened transport connection to node ["
+                        + gracefulClose.descriptionWithoutAttributes()
+                        + "] which disconnected gracefully ["
+                        + reconnectIntervalTimeValue
+                        + "/"
+                        + reconnectIntervalMillis
+                        + "ms] ago "
+                        + "but did not restart, so the disconnection is unexpected; "
+                        + "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
+                )
+            );
+            mockLog.addExpectation(
+                new MockLog.SeenEventExpectation(
+                    "reconnect after exceptional close",
+                    NodeConnectionsService.class.getCanonicalName(),
+                    Level.WARN,
+                    "reopened transport connection to node ["
+                        + exceptionalClose.descriptionWithoutAttributes()
+                        + "] which disconnected exceptionally ["
+                        + reconnectIntervalTimeValue
+                        + "/"
+                        + reconnectIntervalMillis
+                        + "ms] ago "
+                        + "but did not restart, so the disconnection is unexpected; "
+                        + "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
+                )
+            );
+            runTasksUntil(deterministicTaskQueue, deterministicTaskQueue.getCurrentTimeMillis() + reconnectIntervalMillis);
+            mockLog.assertAllExpectationsMatched();
+        }
+
+        // check on reconnect -- disconnection history is reset
+        assertNullDisconnectionHistory(service, noClose);
+        assertNullDisconnectionHistory(service, gracefulClose);
+        assertNullDisconnectionHistory(service, exceptionalClose);
+    }
+
+    private void assertNullDisconnectionHistory(NodeConnectionsService service, DiscoveryNode node) {
+        DisconnectionHistory disconnectionHistory = service.disconnectionHistoryForNode(node);
+        assertNull(disconnectionHistory);
+    }
+
+    private void assertDisconnectionHistoryDetails(
+        NodeConnectionsService service,
+        ThreadPool threadPool,
+        DiscoveryNode node,
+        @Nullable Class<?> disconnectCauseClass
+    ) {
+        DisconnectionHistory disconnectionHistory = service.disconnectionHistoryForNode(node);
+        assertNotNull(disconnectionHistory);
+        assertTrue(threadPool.absoluteTimeInMillis() - disconnectionHistory.disconnectTimeMillis() >= 0);
+        assertTrue(threadPool.absoluteTimeInMillis() - disconnectionHistory.disconnectTimeMillis() <= 200);
+        if (disconnectCauseClass != null) {
+            assertThat(disconnectionHistory.disconnectCause(), Matchers.isA(disconnectCauseClass));
+        } else {
+            assertNull(disconnectionHistory.disconnectCause());
+        }
+    }
+
     public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception {
         final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService);
 
@@ -526,6 +633,7 @@ public void setUp() throws Exception {
         ThreadPool threadPool = new TestThreadPool(getClass().getName());
         this.threadPool = threadPool;
         nodeConnectionBlocks = newConcurrentMap();
+        nodeCloseExceptions = newConcurrentMap();
         transportService = new TestTransportService(new MockTransport(threadPool), threadPool);
         transportService.start();
         transportService.acceptIncomingRequests();
@@ -644,7 +752,12 @@ public void addCloseListener(ActionListener<Void> listener1) {
 
                         @Override
                         public void close() {
-                            closeListener.onResponse(null);
+                            Exception closeException = nodeCloseExceptions.get(node);
+                            if (closeException != null) {
+                                closeListener.onFailure(closeException);
+                            } else {
+                                closeListener.onResponse(null);
+                            }
                         }
 
                         @Override