diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java index f3b4da611811a..f9af19bc2a151 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/FollowersChecker.java @@ -26,13 +26,13 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -137,7 +137,7 @@ public FollowersChecker( ); transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { handleDisconnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java index e58772d7e952a..646a4dc2c36f1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/LeaderChecker.java @@ -31,7 +31,6 @@ import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.ReceiveTimeoutTransportException; -import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -124,7 +123,7 @@ public class LeaderChecker { transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { handleDisconnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index f8a798e15f282..41dc615543709 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -229,11 +229,26 @@ private void connectToNodeOrRetry( try { connectionListener.onNodeConnected(node, conn); } finally { - conn.addCloseListener(ActionListener.running(() -> { - connectedNodes.remove(node, conn); - connectionListener.onNodeDisconnected(node, conn); - managerRefs.decRef(); - })); + conn.addCloseListener(new ActionListener() { + @Override + public void onResponse(Void ignored) { + handleClose(null); + } + + @Override + public void onFailure(Exception e) { + handleClose(e); + } + + void handleClose(@Nullable Exception e) { + connectedNodes.remove(node, conn); + try { + connectionListener.onNodeDisconnected(node, e); + } finally { + managerRefs.decRef(); + } + } + }); conn.addCloseListener(ActionListener.running(() -> { if (connectingRefCounter.hasReferences() == false) { diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index daccac3fbe2cb..bc11ca0e8699f 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -60,9 +60,9 @@ final class DelegatingNodeConnectionListener implements TransportConnectionListe private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Override - public void onNodeDisconnected(DiscoveryNode key, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode key, @Nullable Exception closeException) { for (TransportConnectionListener listener : listeners) { - listener.onNodeDisconnected(key, connection); + listener.onNodeDisconnected(key, closeException); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index 97520e8b939a6..b4e72804a0cc0 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -48,7 +48,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { removeConnectedNode(node); } }); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 248ca4313cefb..a715797b97977 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; @@ -339,7 +340,7 @@ boolean shouldRebuildConnection(Settings newSettings) { protected abstract ConnectionStrategy strategyType(); @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (shouldOpenMoreConnections()) { // try to reconnect and fill up the slot of the disconnected node connect( diff --git a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java index 92796f826fc3a..470014eb7a676 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportConnectionListener.java @@ -10,6 +10,7 @@ package org.elasticsearch.transport; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Nullable; /** * A listener interface that allows to react on transport events. All methods may be @@ -38,5 +39,5 @@ default void onNodeConnected(DiscoveryNode node, Transport.Connection connection /** * Called once a node connection is closed and unregistered. */ - default void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) {} + default void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) {} } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 864ab02fc8cc8..a850f569b0413 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.Index; @@ -829,7 +830,7 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (disconnectedNodes.remove(node)) { disconnectedLatch.countDown(); } @@ -1132,7 +1133,7 @@ public void testCollectSearchShards() throws Exception { CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { if (disconnectedNodes.remove(node)) { disconnectedLatch.countDown(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index 8963bb10d6573..f0edd4aeba126 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -251,7 +252,7 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final AtomicReference> disconnectListenerRef = new AtomicReference<>(); transportService.addConnectionListener(new TransportConnectionListener() { @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { final ActionListener disconnectListener = disconnectListenerRef.getAndSet(null); if (disconnectListener != null) { disconnectListener.onResponse(node); diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index 32171db433efd..3860c9f6bacc8 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.RunOnce; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; @@ -101,7 +102,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); @@ -658,7 +659,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); @@ -698,7 +699,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { nodeDisconnectedCount.incrementAndGet(); } }); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index a5bbb84ea5a31..67dbfeef098e3 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -202,7 +202,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { fail("disconnect should not be called " + node); } }; @@ -924,7 +924,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { latch.countDown(); } }; @@ -2124,7 +2124,7 @@ public void onNodeConnected(DiscoveryNode node, Transport.Connection connection) } @Override - public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { fail("disconnect should not be called " + node); } };