diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java index c5a4754e419b4..d821b67747458 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java @@ -12,7 +12,7 @@ import io.netty.channel.Channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.http.HttpChannel; import org.elasticsearch.http.HttpResponse; @@ -25,7 +25,7 @@ public class Netty4HttpChannel implements HttpChannel { private final Channel channel; - private final ListenableFuture closeContext = new ListenableFuture<>(); + private final SubscribableListener closeContext = new SubscribableListener<>(); Netty4HttpChannel(Channel channel) { this.channel = channel; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java index eefa455d4137f..d7dcea9700adc 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java @@ -12,7 +12,7 @@ import io.netty.channel.Channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.http.HttpServerChannel; import java.net.InetSocketAddress; @@ -22,7 +22,7 @@ public class Netty4HttpServerChannel implements HttpServerChannel { private final Channel channel; - private final ListenableFuture closeContext = new ListenableFuture<>(); + private final SubscribableListener closeContext = new SubscribableListener<>(); Netty4HttpServerChannel(Channel channel) { this.channel = channel; diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java index 9b8fd6ff2d116..5ce886a430532 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java @@ -14,8 +14,8 @@ import io.netty.channel.ChannelOption; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Releasables; import org.elasticsearch.transport.TcpChannel; @@ -30,8 +30,8 @@ public class Netty4TcpChannel implements TcpChannel { private final Channel channel; private final boolean isServer; private final String profile; - private final ListenableFuture connectContext; - private final ListenableFuture closeContext = new ListenableFuture<>(); + private final SubscribableListener connectContext = new SubscribableListener<>(); + private final SubscribableListener closeContext = new SubscribableListener<>(); private final ChannelStats stats = new ChannelStats(); private final boolean rstOnClose; /** @@ -43,7 +43,6 @@ public class Netty4TcpChannel implements TcpChannel { this.channel = channel; this.isServer = isServer; this.profile = profile; - this.connectContext = new ListenableFuture<>(); this.rstOnClose = rstOnClose; addListener(connectFuture, connectContext); addListener(this.channel.closeFuture(), new ActionListener<>() { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java index 85a109070df8d..b097e814563ef 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java @@ -12,7 +12,7 @@ import io.netty.channel.Channel; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.transport.TcpServerChannel; import java.net.InetSocketAddress; @@ -22,7 +22,7 @@ public class Netty4TcpServerChannel implements TcpServerChannel { private final Channel channel; - private final ListenableFuture closeContext = new ListenableFuture<>(); + private final SubscribableListener closeContext = new SubscribableListener<>(); Netty4TcpServerChannel(Channel channel) { this.channel = channel; diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index 61d7869aec326..4b76e4b134833 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -10,7 +10,7 @@ package org.elasticsearch.transport; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.core.AbstractRefCounted; import java.util.concurrent.atomic.AtomicBoolean; @@ -20,8 +20,8 @@ */ public abstract class CloseableConnection extends AbstractRefCounted implements Transport.Connection { - private final ListenableFuture closeContext = new ListenableFuture<>(); - private final ListenableFuture removeContext = new ListenableFuture<>(); + private final SubscribableListener closeContext = new SubscribableListener<>(); + private final SubscribableListener removeContext = new SubscribableListener<>(); private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean removed = new AtomicBoolean(false); diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index f8a798e15f282..5d13b10f5a6a4 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -12,11 +12,11 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.ReferenceDocs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; @@ -43,7 +43,7 @@ public class ClusterConnectionManager implements ConnectionManager { private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class); private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap> pendingConnections = ConcurrentCollections + private final ConcurrentMap> pendingConnections = ConcurrentCollections .newConcurrentMap(); private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); @@ -184,8 +184,8 @@ private void connectToNodeOrRetry( return; } - final ListenableFuture currentListener = new ListenableFuture<>(); - final ListenableFuture existingListener = pendingConnections.putIfAbsent(node, currentListener); + final SubscribableListener currentListener = new SubscribableListener<>(); + final SubscribableListener existingListener = pendingConnections.putIfAbsent(node, currentListener); if (existingListener != null) { try { // wait on previous entry to complete connection attempt @@ -203,7 +203,7 @@ private void connectToNodeOrRetry( // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending. final Transport.Connection existingConnectionRecheck = connectedNodes.get(node); if (existingConnectionRecheck != null) { - ListenableFuture future = pendingConnections.remove(node); + var future = pendingConnections.remove(node); assert future == currentListener : "Listener in pending map is different than the expected listener"; connectingRefCounter.decRef(); future.onResponse(existingConnectionRecheck); @@ -257,7 +257,7 @@ private void connectToNodeOrRetry( } } } finally { - ListenableFuture future = pendingConnections.remove(node); + var future = pendingConnections.remove(node); assert future == currentListener : "Listener in pending map is different than the expected listener"; managerRefs.decRef(); releaseOnce.run(); @@ -387,9 +387,9 @@ private void failConnectionListener( DiscoveryNode node, RunOnce releaseOnce, Exception e, - ListenableFuture expectedListener + SubscribableListener expectedListener ) { - ListenableFuture future = pendingConnections.remove(node); + final var future = pendingConnections.remove(node); releaseOnce.run(); if (future != null) { assert future == expectedListener : "Listener in pending map is different than the expected listener";