17
17
import org .elasticsearch .action .ActionRunnable ;
18
18
import org .elasticsearch .action .support .PlainActionFuture ;
19
19
import org .elasticsearch .action .support .SubscribableListener ;
20
+ import org .elasticsearch .cluster .NodeConnectionsService .DisconnectionHistory ;
20
21
import org .elasticsearch .cluster .node .DiscoveryNode ;
21
22
import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
22
23
import org .elasticsearch .cluster .node .DiscoveryNodeUtils ;
49
50
import org .elasticsearch .transport .TransportRequestOptions ;
50
51
import org .elasticsearch .transport .TransportService ;
51
52
import org .elasticsearch .transport .TransportStats ;
53
+ import org .hamcrest .Matchers ;
52
54
import org .junit .After ;
53
55
import org .junit .Before ;
54
56
@@ -75,6 +77,7 @@ public class NodeConnectionsServiceTests extends ESTestCase {
75
77
private ThreadPool threadPool ;
76
78
private TransportService transportService ;
77
79
private Map <DiscoveryNode , CheckedRunnable <Exception >> nodeConnectionBlocks ;
80
+ private Map <DiscoveryNode , Exception > nodeCloseExceptions ;
78
81
79
82
private List <DiscoveryNode > generateNodes () {
80
83
List <DiscoveryNode > nodes = new ArrayList <>();
@@ -246,6 +249,110 @@ public String toString() {
246
249
assertConnectedExactlyToNodes (transportService , targetNodes );
247
250
}
248
251
252
+ public void testDisconnectionHistory () {
253
+ final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue ();
254
+ final ThreadPool threadPool = deterministicTaskQueue .getThreadPool ();
255
+ final TimeValue reconnectIntervalTimeValue = CLUSTER_NODE_RECONNECT_INTERVAL_SETTING .get (Settings .EMPTY );
256
+ final long reconnectIntervalMillis = reconnectIntervalTimeValue .millis ();
257
+
258
+ MockTransport transport = new MockTransport (threadPool );
259
+ TestTransportService transportService = new TestTransportService (transport , threadPool );
260
+ transportService .start ();
261
+ transportService .acceptIncomingRequests ();
262
+
263
+ final NodeConnectionsService service = new NodeConnectionsService (Settings .EMPTY , threadPool , transportService );
264
+ service .start ();
265
+
266
+ final DiscoveryNode noClose = DiscoveryNodeUtils .create ("noClose" );
267
+ final DiscoveryNode gracefulClose = DiscoveryNodeUtils .create ("gracefulClose" );
268
+ final DiscoveryNode exceptionalClose = DiscoveryNodeUtils .create ("exceptionalClose" );
269
+
270
+ nodeCloseExceptions .put (exceptionalClose , new RuntimeException ());
271
+
272
+ final AtomicBoolean connectionCompleted = new AtomicBoolean ();
273
+ DiscoveryNodes nodes = DiscoveryNodes .builder ().add (noClose ).add (gracefulClose ).add (exceptionalClose ).build ();
274
+
275
+ service .connectToNodes (nodes , () -> connectionCompleted .set (true ));
276
+ deterministicTaskQueue .runAllRunnableTasks ();
277
+ assertTrue (connectionCompleted .get ());
278
+
279
+ assertNullDisconnectionHistory (service , noClose );
280
+ assertNullDisconnectionHistory (service , gracefulClose );
281
+ assertNullDisconnectionHistory (service , exceptionalClose );
282
+
283
+ transportService .disconnectFromNode (gracefulClose );
284
+ transportService .disconnectFromNode (exceptionalClose );
285
+
286
+ // check disconnection history set after close
287
+ assertNullDisconnectionHistory (service , noClose );
288
+ assertDisconnectionHistoryDetails (service , threadPool , gracefulClose , null );
289
+ assertDisconnectionHistoryDetails (service , threadPool , exceptionalClose , RuntimeException .class );
290
+
291
+ try (var mockLog = MockLog .capture (NodeConnectionsService .class )) {
292
+ mockLog .addExpectation (
293
+ new MockLog .SeenEventExpectation (
294
+ "reconnect after graceful close" ,
295
+ NodeConnectionsService .class .getCanonicalName (),
296
+ Level .WARN ,
297
+ "reopened transport connection to node ["
298
+ + gracefulClose .descriptionWithoutAttributes ()
299
+ + "] which disconnected gracefully ["
300
+ + reconnectIntervalTimeValue
301
+ + "/"
302
+ + reconnectIntervalMillis
303
+ + "ms] ago "
304
+ + "but did not restart, so the disconnection is unexpected; "
305
+ + "see [https://www.elastic.co/guide/*] for troubleshooting guidance"
306
+ )
307
+ );
308
+ mockLog .addExpectation (
309
+ new MockLog .SeenEventExpectation (
310
+ "reconnect after exceptional close" ,
311
+ NodeConnectionsService .class .getCanonicalName (),
312
+ Level .WARN ,
313
+ "reopened transport connection to node ["
314
+ + exceptionalClose .descriptionWithoutAttributes ()
315
+ + "] which disconnected exceptionally ["
316
+ + reconnectIntervalTimeValue
317
+ + "/"
318
+ + reconnectIntervalMillis
319
+ + "ms] ago "
320
+ + "but did not restart, so the disconnection is unexpected; "
321
+ + "see [https://www.elastic.co/guide/*] for troubleshooting guidance"
322
+ )
323
+ );
324
+ runTasksUntil (deterministicTaskQueue , deterministicTaskQueue .getCurrentTimeMillis () + reconnectIntervalMillis );
325
+ mockLog .assertAllExpectationsMatched ();
326
+ }
327
+
328
+ // check on reconnect -- disconnection history is reset
329
+ assertNullDisconnectionHistory (service , noClose );
330
+ assertNullDisconnectionHistory (service , gracefulClose );
331
+ assertNullDisconnectionHistory (service , exceptionalClose );
332
+ }
333
+
334
+ private void assertNullDisconnectionHistory (NodeConnectionsService service , DiscoveryNode node ) {
335
+ DisconnectionHistory disconnectionHistory = service .disconnectionHistoryForNode (node );
336
+ assertNull (disconnectionHistory );
337
+ }
338
+
339
+ private void assertDisconnectionHistoryDetails (
340
+ NodeConnectionsService service ,
341
+ ThreadPool threadPool ,
342
+ DiscoveryNode node ,
343
+ @ Nullable Class <?> disconnectCauseClass
344
+ ) {
345
+ DisconnectionHistory disconnectionHistory = service .disconnectionHistoryForNode (node );
346
+ assertNotNull (disconnectionHistory );
347
+ assertTrue (threadPool .absoluteTimeInMillis () - disconnectionHistory .disconnectTimeMillis () >= 0 );
348
+ assertTrue (threadPool .absoluteTimeInMillis () - disconnectionHistory .disconnectTimeMillis () <= 200 );
349
+ if (disconnectCauseClass != null ) {
350
+ assertThat (disconnectionHistory .disconnectCause (), Matchers .isA (disconnectCauseClass ));
351
+ } else {
352
+ assertNull (disconnectionHistory .disconnectCause ());
353
+ }
354
+ }
355
+
249
356
public void testOnlyBlocksOnConnectionsToNewNodes () throws Exception {
250
357
final NodeConnectionsService service = new NodeConnectionsService (Settings .EMPTY , threadPool , transportService );
251
358
@@ -526,6 +633,7 @@ public void setUp() throws Exception {
526
633
ThreadPool threadPool = new TestThreadPool (getClass ().getName ());
527
634
this .threadPool = threadPool ;
528
635
nodeConnectionBlocks = newConcurrentMap ();
636
+ nodeCloseExceptions = newConcurrentMap ();
529
637
transportService = new TestTransportService (new MockTransport (threadPool ), threadPool );
530
638
transportService .start ();
531
639
transportService .acceptIncomingRequests ();
@@ -644,7 +752,12 @@ public void addCloseListener(ActionListener<Void> listener1) {
644
752
645
753
@ Override
646
754
public void close () {
647
- closeListener .onResponse (null );
755
+ Exception closeException = nodeCloseExceptions .get (node );
756
+ if (closeException != null ) {
757
+ closeListener .onFailure (closeException );
758
+ } else {
759
+ closeListener .onResponse (null );
760
+ }
648
761
}
649
762
650
763
@ Override
0 commit comments