80
80
import java .util .concurrent .CountDownLatch ;
81
81
import java .util .concurrent .LinkedBlockingDeque ;
82
82
import java .util .concurrent .TimeUnit ;
83
+ import java .util .concurrent .atomic .AtomicReference ;
84
+ import java .util .function .Consumer ;
83
85
import java .util .function .Function ;
84
86
import java .util .function .Supplier ;
85
87
@@ -104,6 +106,7 @@ public class MockTransportService extends TransportService {
104
106
private final Map <DiscoveryNode , List <Transport .Connection >> openConnections = new HashMap <>();
105
107
106
108
private final List <Runnable > onStopListeners = new CopyOnWriteArrayList <>();
109
+ private final AtomicReference <Consumer <Transport .Connection >> onConnectionClosedCallback = new AtomicReference <>();
107
110
108
111
public static class TestPlugin extends Plugin {
109
112
@ Override
@@ -788,6 +791,19 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
788
791
}));
789
792
}
790
793
794
+ public void setOnConnectionClosedCallback (Consumer <Transport .Connection > callback ) {
795
+ onConnectionClosedCallback .set (callback );
796
+ }
797
+
798
+ @ Override
799
+ public void onConnectionClosed (Transport .Connection connection ) {
800
+ final Consumer <Transport .Connection > callback = onConnectionClosedCallback .get ();
801
+ if (callback != null ) {
802
+ callback .accept (connection );
803
+ }
804
+ super .onConnectionClosed (connection );
805
+ }
806
+
791
807
public void addOnStopListener (Runnable listener ) {
792
808
onStopListeners .add (listener );
793
809
}
0 commit comments