1212import org .apache .logging .log4j .Logger ;
1313import org .elasticsearch .action .ActionListener ;
1414import org .elasticsearch .action .support .ContextPreservingActionListener ;
15+ import org .elasticsearch .action .support .SubscribableListener ;
1516import org .elasticsearch .cluster .node .DiscoveryNode ;
1617import org .elasticsearch .common .ReferenceDocs ;
1718import org .elasticsearch .common .settings .Settings ;
1819import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
19- import org .elasticsearch .common .util .concurrent .ListenableFuture ;
2020import org .elasticsearch .common .util .concurrent .RunOnce ;
2121import org .elasticsearch .common .util .concurrent .ThreadContext ;
2222import org .elasticsearch .core .AbstractRefCounted ;
@@ -43,7 +43,7 @@ public class ClusterConnectionManager implements ConnectionManager {
4343 private static final Logger logger = LogManager .getLogger (ClusterConnectionManager .class );
4444
4545 private final ConcurrentMap <DiscoveryNode , Transport .Connection > connectedNodes = ConcurrentCollections .newConcurrentMap ();
46- private final ConcurrentMap <DiscoveryNode , ListenableFuture <Transport .Connection >> pendingConnections = ConcurrentCollections
46+ private final ConcurrentMap <DiscoveryNode , SubscribableListener <Transport .Connection >> pendingConnections = ConcurrentCollections
4747 .newConcurrentMap ();
4848 private final AbstractRefCounted connectingRefCounter = AbstractRefCounted .of (this ::pendingConnectionsComplete );
4949
@@ -184,8 +184,8 @@ private void connectToNodeOrRetry(
184184 return ;
185185 }
186186
187- final ListenableFuture <Transport .Connection > currentListener = new ListenableFuture <>();
188- final ListenableFuture <Transport .Connection > existingListener = pendingConnections .putIfAbsent (node , currentListener );
187+ final SubscribableListener <Transport .Connection > currentListener = new SubscribableListener <>();
188+ final SubscribableListener <Transport .Connection > existingListener = pendingConnections .putIfAbsent (node , currentListener );
189189 if (existingListener != null ) {
190190 try {
191191 // wait on previous entry to complete connection attempt
@@ -203,7 +203,7 @@ private void connectToNodeOrRetry(
203203 // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
204204 final Transport .Connection existingConnectionRecheck = connectedNodes .get (node );
205205 if (existingConnectionRecheck != null ) {
206- ListenableFuture < Transport . Connection > future = pendingConnections .remove (node );
206+ var future = pendingConnections .remove (node );
207207 assert future == currentListener : "Listener in pending map is different than the expected listener" ;
208208 connectingRefCounter .decRef ();
209209 future .onResponse (existingConnectionRecheck );
@@ -257,7 +257,7 @@ private void connectToNodeOrRetry(
257257 }
258258 }
259259 } finally {
260- ListenableFuture < Transport . Connection > future = pendingConnections .remove (node );
260+ var future = pendingConnections .remove (node );
261261 assert future == currentListener : "Listener in pending map is different than the expected listener" ;
262262 managerRefs .decRef ();
263263 releaseOnce .run ();
@@ -387,9 +387,9 @@ private void failConnectionListener(
387387 DiscoveryNode node ,
388388 RunOnce releaseOnce ,
389389 Exception e ,
390- ListenableFuture <Transport .Connection > expectedListener
390+ SubscribableListener <Transport .Connection > expectedListener
391391 ) {
392- ListenableFuture < Transport . Connection > future = pendingConnections .remove (node );
392+ final var future = pendingConnections .remove (node );
393393 releaseOnce .run ();
394394 if (future != null ) {
395395 assert future == expectedListener : "Listener in pending map is different than the expected listener" ;
0 commit comments