1717import org .elasticsearch .action .ActionRunnable ;
1818import org .elasticsearch .action .support .PlainActionFuture ;
1919import org .elasticsearch .action .support .SubscribableListener ;
20+ import org .elasticsearch .cluster .NodeConnectionsService .DisconnectionHistory ;
2021import org .elasticsearch .cluster .node .DiscoveryNode ;
2122import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
2223import org .elasticsearch .cluster .node .DiscoveryNodeUtils ;
4950import org .elasticsearch .transport .TransportRequestOptions ;
5051import org .elasticsearch .transport .TransportService ;
5152import org .elasticsearch .transport .TransportStats ;
53+ import org .hamcrest .Matchers ;
5254import org .junit .After ;
5355import org .junit .Before ;
5456
@@ -75,6 +77,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
7577 private ThreadPool threadPool ;
7678 private TransportService transportService ;
7779 private Map <DiscoveryNode , CheckedRunnable <Exception >> nodeConnectionBlocks ;
80+ private Map <DiscoveryNode , Exception > nodeCloseExceptions ;
7881
7982 private List <DiscoveryNode > generateNodes () {
8083 List <DiscoveryNode > nodes = new ArrayList <>();
@@ -246,6 +249,110 @@ public String toString() {
246249 assertConnectedExactlyToNodes (transportService , targetNodes );
247250 }
248251
252+ public void testDisconnectionHistory () {
253+ final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue ();
254+ final ThreadPool threadPool = deterministicTaskQueue .getThreadPool ();
255+ final TimeValue reconnectIntervalTimeValue = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING .get (Settings .EMPTY );
256+ final long reconnectIntervalMillis = reconnectIntervalTimeValue .millis ();
257+
258+ MockTransport transport = new MockTransport (threadPool );
259+ TestTransportService transportService = new TestTransportService (transport , threadPool );
260+ transportService .start ();
261+ transportService .acceptIncomingRequests ();
262+
263+ final NodeConnectionsService service = new NodeConnectionsService (Settings .EMPTY , threadPool , transportService );
264+ service .start ();
265+
266+ final DiscoveryNode noClose = DiscoveryNodeUtils .create ("noClose" );
267+ final DiscoveryNode gracefulClose = DiscoveryNodeUtils .create ("gracefulClose" );
268+ final DiscoveryNode exceptionalClose = DiscoveryNodeUtils .create ("exceptionalClose" );
269+
270+ nodeCloseExceptions .put (exceptionalClose , new RuntimeException ());
271+
272+ final AtomicBoolean connectionCompleted = new AtomicBoolean ();
273+ DiscoveryNodes nodes = DiscoveryNodes .builder ().add (noClose ).add (gracefulClose ).add (exceptionalClose ).build ();
274+
275+ service .connectToNodes (nodes , () -> connectionCompleted .set (true ));
276+ deterministicTaskQueue .runAllRunnableTasks ();
277+ assertTrue (connectionCompleted .get ());
278+
279+ assertNullDisconnectionHistory (service , noClose );
280+ assertNullDisconnectionHistory (service , gracefulClose );
281+ assertNullDisconnectionHistory (service , exceptionalClose );
282+
283+ transportService .disconnectFromNode (gracefulClose );
284+ transportService .disconnectFromNode (exceptionalClose );
285+
286+ // check disconnection history set after close
287+ assertNullDisconnectionHistory (service , noClose );
288+ assertDisconnectionHistoryDetails (service , threadPool , gracefulClose , null );
289+ assertDisconnectionHistoryDetails (service , threadPool , exceptionalClose , RuntimeException .class );
290+
291+ try (var mockLog = MockLog .capture (NodeConnectionsService .class )) {
292+ mockLog .addExpectation (
293+ new MockLog .SeenEventExpectation (
294+ "reconnect after graceful close" ,
295+ NodeConnectionsService .class .getCanonicalName (),
296+ Level .WARN ,
297+ "reopened transport connection to node ["
298+ + gracefulClose .descriptionWithoutAttributes ()
299+ + "] which disconnected gracefully ["
300+ + reconnectIntervalTimeValue
301+ + "/"
302+ + reconnectIntervalMillis
303+ + "ms] ago "
304+ + "but did not restart, so the disconnection is unexpected; "
305+ + "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
306+ )
307+ );
308+ mockLog .addExpectation (
309+ new MockLog .SeenEventExpectation (
310+ "reconnect after exceptional close" ,
311+ NodeConnectionsService .class .getCanonicalName (),
312+ Level .WARN ,
313+ "reopened transport connection to node ["
314+ + exceptionalClose .descriptionWithoutAttributes ()
315+ + "] which disconnected exceptionally ["
316+ + reconnectIntervalTimeValue
317+ + "/"
318+ + reconnectIntervalMillis
319+ + "ms] ago "
320+ + "but did not restart, so the disconnection is unexpected; "
321+ + "see [https://www.elastic.co/docs/*] for troubleshooting guidance"
322+ )
323+ );
324+ runTasksUntil (deterministicTaskQueue , deterministicTaskQueue .getCurrentTimeMillis () + reconnectIntervalMillis );
325+ mockLog .assertAllExpectationsMatched ();
326+ }
327+
328+ // check on reconnect -- disconnection history is reset
329+ assertNullDisconnectionHistory (service , noClose );
330+ assertNullDisconnectionHistory (service , gracefulClose );
331+ assertNullDisconnectionHistory (service , exceptionalClose );
332+ }
333+
334+ private void assertNullDisconnectionHistory (NodeConnectionsService service , DiscoveryNode node ) {
335+ DisconnectionHistory disconnectionHistory = service .disconnectionHistoryForNode (node );
336+ assertNull (disconnectionHistory );
337+ }
338+
339+ private void assertDisconnectionHistoryDetails (
340+ NodeConnectionsService service ,
341+ ThreadPool threadPool ,
342+ DiscoveryNode node ,
343+ @ Nullable Class <?> disconnectCauseClass
344+ ) {
345+ DisconnectionHistory disconnectionHistory = service .disconnectionHistoryForNode (node );
346+ assertNotNull (disconnectionHistory );
347+ assertTrue (threadPool .absoluteTimeInMillis () - disconnectionHistory .disconnectTimeMillis () >= 0 );
348+ assertTrue (threadPool .absoluteTimeInMillis () - disconnectionHistory .disconnectTimeMillis () <= 200 );
349+ if (disconnectCauseClass != null ) {
350+ assertThat (disconnectionHistory .disconnectCause (), Matchers .isA (disconnectCauseClass ));
351+ } else {
352+ assertNull (disconnectionHistory .disconnectCause ());
353+ }
354+ }
355+
249356 public void testOnlyBlocksOnConnectionsToNewNodes () throws Exception {
250357 final NodeConnectionsService service = new NodeConnectionsService (Settings .EMPTY , threadPool , transportService );
251358
@@ -526,6 +633,7 @@ public void setUp() throws Exception {
526633 ThreadPool threadPool = new TestThreadPool (getClass ().getName ());
527634 this .threadPool = threadPool ;
528635 nodeConnectionBlocks = newConcurrentMap ();
636+ nodeCloseExceptions = newConcurrentMap ();
529637 transportService = new TestTransportService (new MockTransport (threadPool ), threadPool );
530638 transportService .start ();
531639 transportService .acceptIncomingRequests ();
@@ -644,7 +752,12 @@ public void addCloseListener(ActionListener<Void> listener1) {
644752
645753 @ Override
646754 public void close () {
647- closeListener .onResponse (null );
755+ Exception closeException = nodeCloseExceptions .get (node );
756+ if (closeException != null ) {
757+ closeListener .onFailure (closeException );
758+ } else {
759+ closeListener .onResponse (null );
760+ }
648761 }
649762
650763 @ Override
0 commit comments