diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 31a90c1ca1631..656d19373d7e7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -172,17 +172,6 @@ public DiscoveryNode getLocalNode() { return transportService.getLocalNode(); } - /** - * Returns true if at least one remote cluster is configured - */ - public boolean isCrossClusterSearchEnabled() { - return remoteClusters.isEmpty() == false; - } - - boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode node) { - return remoteClusters.get(remoteCluster).isNodeConnected(node); - } - /** * Group indices by cluster alias mapped to OriginalIndices for that cluster. * @param remoteClusterNames Set of configured remote cluster names. @@ -258,13 +247,6 @@ public Set getConfiguredClusters() { return getRegisteredRemoteClusterNames(); } - /** - * Returns true iff the given cluster is configured as a remote cluster. Otherwise false - */ - boolean isRemoteClusterRegistered(String clusterName) { - return remoteClusters.containsKey(clusterName); - } - /** * Returns the registered remote cluster names. */ diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java index 2d5e7982aa899..112c30cd02b8b 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterClientTests.java @@ -40,6 +40,7 @@ import static org.elasticsearch.test.NodeRoles.removeRoles; import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING; import static org.elasticsearch.transport.RemoteClusterConnectionTests.startTransport; +import static org.elasticsearch.transport.RemoteClusterServiceTests.isRemoteNodeConnected; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -95,7 +96,7 @@ public void testConnectAndExecuteRequest() throws Exception { service.acceptIncomingRequests(); logger.info("now accepting incoming requests on local transport"); RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertTrue(isRemoteNodeConnected(remoteClusterService, "test", remoteNode)); var client = remoteClusterService.getRemoteClusterClient( "test", threadPool.executor(TEST_THREAD_POOL_NAME), @@ -172,7 +173,7 @@ public void testEnsureWeReconnect() throws Exception { // the right calls in place in the RemoteAwareClient service.acceptIncomingRequests(); RemoteClusterService remoteClusterService = service.getRemoteClusterService(); - assertBusy(() -> assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode))); + assertBusy(() -> assertTrue(isRemoteNodeConnected(remoteClusterService, "test", remoteNode))); for (int i = 0; i < 10; i++) { RemoteClusterConnection remoteClusterConnection = remoteClusterService.getRemoteClusterConnection("test"); assertBusy(remoteClusterConnection::assertNoRunningConnections); @@ -286,7 +287,7 @@ public void testQuicklySkipUnavailableClusters() throws Exception { ); try { - assertFalse(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertFalse(isRemoteNodeConnected(remoteClusterService, "test", remoteNode)); // check that we quickly fail if (randomBoolean()) { @@ -325,7 +326,7 @@ public void testQuicklySkipUnavailableClusters() throws Exception { () -> safeAwait(listener -> client.getConnection(null, listener.map(v -> v))) ) ); - assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); + assertTrue(isRemoteNodeConnected(remoteClusterService, "test", remoteNode)); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 48ef90d0772db..30699b9346300 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -164,12 +164,12 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); - assertFalse(service.isRemoteClusterRegistered("foo")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); { Map> perClusterIndices = service.groupClusterIndices( service.getRegisteredRemoteClusterNames(), @@ -378,12 +378,12 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); - assertFalse(service.isRemoteClusterRegistered("foo")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); { Map perClusterIndices = service.groupIndices( IndicesOptions.LENIENT_EXPAND_OPEN, @@ -442,7 +442,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { ); try (RemoteClusterService service = new RemoteClusterService(settings, null)) { assertFalse(service.isEnabled()); - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); final IllegalArgumentException error = expectThrows( IllegalArgumentException.class, () -> service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN, new String[] { "cluster_1:bar", "cluster_2:foo*" }) @@ -485,9 +485,9 @@ public void testIncrementallyAddClusters() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); Settings cluster1Settings = createSettings( "cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()) @@ -504,19 +504,19 @@ public void testIncrementallyAddClusters() throws IOException { } }).start(); clusterAdded.actionGet(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); Settings cluster2Settings = createSettings( "cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()) ); service.validateAndUpdateRemoteCluster("cluster_2", cluster2Settings); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); service.validateAndUpdateRemoteCluster("cluster_2", cluster2SettingsDisabled); - assertFalse(service.isRemoteClusterRegistered("cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "cluster_2")); IllegalArgumentException iae = expectThrows( IllegalArgumentException.class, () -> service.validateAndUpdateRemoteCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY) @@ -561,15 +561,15 @@ public void testDefaultPingSchedule() throws IOException { transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(hasRegisteredClusters(service)); service.validateAndUpdateRemoteCluster( "cluster_1", createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())) ); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); } @@ -626,7 +626,7 @@ public void testCustomPingSchedule() throws IOException { builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { service.initializeRemoteClusters(); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); @@ -749,9 +749,9 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( @@ -769,13 +769,13 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { ); secondLatch.await(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node)); - assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node)); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); - assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); - assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); assertEquals(0, transportService.getConnectionManager().size()); } } @@ -839,9 +839,9 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); service.updateRemoteCluster( @@ -859,13 +859,13 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { ); secondLatch.await(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); - assertFalse(service.isRemoteNodeConnected("cluster_1", c1N1Node)); - assertTrue(service.isRemoteNodeConnected("cluster_1", c1N2Node)); - assertTrue(service.isRemoteClusterRegistered("cluster_2")); - assertFalse(service.isRemoteNodeConnected("cluster_2", c2N1Node)); - assertTrue(service.isRemoteNodeConnected("cluster_2", c2N2Node)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); assertEquals(0, transportService.getConnectionManager().size()); } } @@ -934,9 +934,9 @@ public void testCollectNodes() throws InterruptedException, IOException { transportService.start(); transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -1089,7 +1089,7 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { try (RemoteClusterService service = new RemoteClusterService(createSettings("cluster_1", seedList), transportService)) { service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); final var taskLatch = new CountDownLatch(numTasks); @@ -1272,9 +1272,9 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { final Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - assertFalse(service.isCrossClusterSearchEnabled()); + assertFalse(hasRegisteredClusters(service)); service.initializeRemoteClusters(); - assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(hasRegisteredClusters(service)); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); @@ -1290,7 +1290,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { ); firstLatch.await(); - assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(hasRegisteredClusters(service)); assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); @@ -1308,7 +1308,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { service.updateRemoteCluster("cluster_test", createSettings("cluster_test", newSeeds), connectionListener(secondLatch)); secondLatch.await(); - assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(hasRegisteredClusters(service)); assertBusy(() -> { assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); @@ -1493,18 +1493,18 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster service.updateRemoteCluster("cluster_2", secondRemoteClusterSettingsBuilder.build(), connectionListener(secondLatch)); secondLatch.await(); - assertTrue(service.isCrossClusterSearchEnabled()); - assertTrue(service.isRemoteClusterRegistered("cluster_1")); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); if (firstRemoteClusterProxyMode) { - assertFalse(service.isRemoteNodeConnected("cluster_1", c1Node)); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1Node)); } else { - assertTrue(service.isRemoteNodeConnected("cluster_1", c1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1Node)); } - assertTrue(service.isRemoteClusterRegistered("cluster_2")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); if (secondRemoteClusterProxyMode) { - assertFalse(service.isRemoteNodeConnected("cluster_2", c2Node)); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2Node)); } else { - assertTrue(service.isRemoteNodeConnected("cluster_2", c2Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2Node)); } // No local node connection assertEquals(0, transportService.getConnectionManager().size()); @@ -1766,4 +1766,15 @@ private static Settings createSettings(String clusterAlias, List seeds) return builder.build(); } + private static boolean hasRegisteredClusters(RemoteClusterService service) { + return service.getRegisteredRemoteClusterNames().isEmpty() == false; + } + + private static boolean isRemoteClusterRegistered(RemoteClusterService service, String clusterName) { + return service.getRegisteredRemoteClusterNames().contains(clusterName); + } + + public static boolean isRemoteNodeConnected(RemoteClusterService service, String clusterName, DiscoveryNode node) { + return service.getRemoteClusterConnection(clusterName).isNodeConnected(node); + } }