diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index d1816c7fc1687..ac5233f1d54b4 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -40,7 +40,6 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -653,10 +652,6 @@ public RemoteClusterClient getRemoteClusterClient( }); } - Collection getConnections() { - return remoteClusters.values(); - } - static void registerRemoteClusterHandshakeRequestHandler(TransportService transportService) { transportService.registerRequestHandler( REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME, diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index f47b5988b363e..9f286efe28083 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -838,14 +838,18 @@ public void testCCSRemoteReduceWithDisconnectedRemoteClusters() throws Exception } CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); + RemoteClusterServiceTests.addConnectionListener( + remoteClusterService, + remoteIndicesByCluster.keySet(), + new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } } } - }); + ); for (DiscoveryNode disconnectedNode : disconnectedNodes) { service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); } @@ -1149,14 +1153,18 @@ public void testCollectSearchShards() throws Exception { } CountDownLatch disconnectedLatch = new CountDownLatch(numDisconnectedClusters); - RemoteClusterServiceTests.addConnectionListener(remoteClusterService, new TransportConnectionListener() { - @Override - public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { - if (disconnectedNodes.remove(node)) { - disconnectedLatch.countDown(); + RemoteClusterServiceTests.addConnectionListener( + remoteClusterService, + remoteIndicesByCluster.keySet(), + new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node, @Nullable Exception closeException) { + if (disconnectedNodes.remove(node)) { + disconnectedLatch.countDown(); + } } } - }); + ); for (DiscoveryNode disconnectedNode : disconnectedNodes) { service.addFailToSendNoConnectRule(disconnectedNode.getAddress()); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 76e280c987ae1..99c4dde4d396f 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -1250,8 +1250,13 @@ public static void updateSkipUnavailable(RemoteClusterService service, String cl connection.setSkipUnavailable(skipUnavailable); } - public static void addConnectionListener(RemoteClusterService service, TransportConnectionListener listener) { - for (RemoteClusterConnection connection : service.getConnections()) { + public static void addConnectionListener( + RemoteClusterService service, + Set clusterAliases, + TransportConnectionListener listener + ) { + for (final var clusterAlias : clusterAliases) { + final var connection = service.getRemoteClusterConnection(clusterAlias); ConnectionManager connectionManager = connection.getConnectionManager(); connectionManager.addListener(listener); }