8080import java .util .concurrent .CountDownLatch ;
8181import java .util .concurrent .LinkedBlockingDeque ;
8282import java .util .concurrent .TimeUnit ;
83+ import java .util .concurrent .atomic .AtomicReference ;
84+ import java .util .function .Consumer ;
8385import java .util .function .Function ;
8486import java .util .function .Supplier ;
8587
@@ -104,6 +106,7 @@ public class MockTransportService extends TransportService {
104106 private final Map <DiscoveryNode , List <Transport .Connection >> openConnections = new HashMap <>();
105107
106108 private final List <Runnable > onStopListeners = new CopyOnWriteArrayList <>();
109+ private final AtomicReference <Consumer <Transport .Connection >> onConnectionClosedCallback = new AtomicReference <>();
107110
108111 public static class TestPlugin extends Plugin {
109112 @ Override
@@ -788,6 +791,19 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi
788791 }));
789792 }
790793
794+ public void setOnConnectionClosedCallback (Consumer <Transport .Connection > callback ) {
795+ onConnectionClosedCallback .set (callback );
796+ }
797+
798+ @ Override
799+ public void onConnectionClosed (Transport .Connection connection ) {
800+ final Consumer <Transport .Connection > callback = onConnectionClosedCallback .get ();
801+ if (callback != null ) {
802+ callback .accept (connection );
803+ }
804+ super .onConnectionClosed (connection );
805+ }
806+
791807 public void addOnStopListener (Runnable listener ) {
792808 onStopListeners .add (listener );
793809 }
0 commit comments