From 96b37d188bb31bbe39eb815c24041335b6eae6a4 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 5 Nov 2025 15:54:26 -0500 Subject: [PATCH] Remove extra RemoteClusterService instances in unit test This refactoring is a split from #137406. A TransportService instance already creates a RemoteClusterService instance, with a getter method to get a reference to it. The unit tests were creating another instance, passing the existing TransportService instance into the constructor. This will cause one time metric registration to fail since the metrics have already been registered in the TransportService's internal RemoteClusterService instance. Relates: ES-12695 --- .../transport/RemoteClusterServiceTests.java | 1389 ++++++++--------- 1 file changed, 667 insertions(+), 722 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 2cd195cb4a8ff..c2a59fda3a71b 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -73,7 +73,6 @@ public class RemoteClusterServiceTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); private final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE; - private LinkedProjectConfigService linkedProjectConfigService = null; @Override public void tearDown() throws Exception { @@ -81,17 +80,8 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - private RemoteClusterService createRemoteClusterService(Settings settings, MockTransportService transportService) { - return createRemoteClusterService(settings, ClusterSettings.createBuiltInClusterSettings(), transportService); - } - - private RemoteClusterService createRemoteClusterService( - Settings settings, - ClusterSettings clusterSettings, - MockTransportService transportService - ) { - linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, projectResolver); - return new RemoteClusterService(settings, transportService, projectResolver); + private RemoteClusterService createRemoteClusterService(Settings settings) { + return new RemoteClusterService(settings, null, projectResolver); } private MockTransportService startTransport( @@ -156,7 +146,7 @@ public void testRemoteClusterSeedSetting() { assertEquals("failed to parse port", e.getMessage()); } - public void testGroupClusterIndices() throws IOException { + public void testGroupClusterIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -177,10 +167,13 @@ public void testGroupClusterIndices() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -189,188 +182,182 @@ public void testGroupClusterIndices() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteClusterRegistered(service, "foo")); - { - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { - "cluster_1:bar", - "cluster_2:foo:bar", - "cluster_1:test", - "cluster_2:foo*", - "foo", - "cluster*:baz", - "*:boo" } - ); - List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndices); - assertEquals("foo", localIndices.get(0)); - assertEquals(2, perClusterIndices.size()); - assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1")); - assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); - } - - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } - ) + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); + { + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { + "cluster_1:bar", + "cluster_2:foo:bar", + "cluster_1:test", + "cluster_2:foo*", + "foo", + "cluster*:baz", + "*:boo" } ); + List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndices); + assertEquals("foo", localIndices.get(0)); + assertEquals(2, perClusterIndices.size()); + assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1")); + assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); + } - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "does_not_exist:*" } - ) - ); + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); - // test cluster exclusions - { - String[] indices = shuffledList(List.of("cluster*:foo*", "foo", "-cluster_1:*", "*:boo")).toArray(new String[0]); + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "does_not_exist:*" } + ) + ); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(2, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - - List cluster2 = perClusterIndices.get("cluster_2"); - assertNotNull(cluster2); - assertEquals(2, cluster2.size()); - assertEquals(List.of("boo", "foo*"), cluster2.stream().sorted().toList()); - } - { - String[] indices = shuffledList(List.of("*:*", "-clu*_1:*", "*:boo")).toArray(new String[0]); + // test cluster exclusions + { + String[] indices = shuffledList(List.of("cluster*:foo*", "foo", "-cluster_1:*", "*:boo")).toArray(new String[0]); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(2, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + + List cluster2 = perClusterIndices.get("cluster_2"); + assertNotNull(cluster2); + assertEquals(2, cluster2.size()); + assertEquals(List.of("boo", "foo*"), cluster2.stream().sorted().toList()); + } + { + String[] indices = shuffledList(List.of("*:*", "-clu*_1:*", "*:boo")).toArray(new String[0]); - List cluster2 = perClusterIndices.get("cluster_2"); - assertNotNull(cluster2); - assertEquals(2, cluster2.size()); - assertEquals(List.of("*", "boo"), cluster2.stream().sorted().toList()); - } - { - String[] indices = shuffledList(List.of("cluster*:foo*", "cluster_2:*", "foo", "-cluster_1:*", "-c*:*")).toArray( - new String[0] - ); + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - } - { - String[] indices = shuffledList(List.of("cluster*:*", "foo", "-*:*")).toArray(new String[0]); + List cluster2 = perClusterIndices.get("cluster_2"); + assertNotNull(cluster2); + assertEquals(2, cluster2.size()); + assertEquals(List.of("*", "boo"), cluster2.stream().sorted().toList()); + } + { + String[] indices = shuffledList(List.of("cluster*:foo*", "cluster_2:*", "foo", "-cluster_1:*", "-c*:*")).toArray( + new String[0] + ); - Map> perClusterIndices = service.groupClusterIndices( + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + } + { + String[] indices = shuffledList(List.of("cluster*:*", "foo", "-*:*")).toArray(new String[0]); + + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices( service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - // -cluster_1:foo* is not allowed, only -cluster_1:* - new String[] { "cluster_1:bar", "-cluster_2:foo*", "cluster_1:test", "cluster_2:foo*", "foo" } - ) - ); - assertThat( - e.getMessage(), - equalTo("To exclude a cluster you must specify the '*' wildcard for the index expression, but found: [foo*]") - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - // -cluster_1:* will fail since cluster_1 was never included in order to qualify to be excluded - new String[] { "-cluster_1:*", "cluster_2:foo*", "foo" } - ) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " - + "be included: [(local), cluster_2]. Input: [-cluster_1:*,cluster_2:foo*,foo]" - ) - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-cluster_1:*" }) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " - + "be included: []. Input: [-cluster_1:*]" - ) - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-*:*" }) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude clusters [cluster_1, cluster_2] failed as they are not included in the list of " - + "clusters to be included: []. Input: [-*:*]" - ) - ); - } - { - String[] indices = shuffledList(List.of("cluster*:*", "*:foo", "-*:*")).toArray(new String[0]); + // -cluster_1:foo* is not allowed, only -cluster_1:* + new String[] { "cluster_1:bar", "-cluster_2:foo*", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); + assertThat( + e.getMessage(), + equalTo("To exclude a cluster you must specify the '*' wildcard for the index expression, but found: [foo*]") + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + // -cluster_1:* will fail since cluster_1 was never included in order to qualify to be excluded + new String[] { "-cluster_1:*", "cluster_2:foo*", "foo" } + ) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " + + "be included: [(local), cluster_2]. Input: [-cluster_1:*,cluster_2:foo*,foo]" + ) + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-cluster_1:*" }) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " + + "be included: []. Input: [-cluster_1:*]" + ) + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-*:*" }) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude clusters [cluster_1, cluster_2] failed as they are not included in the list of " + + "clusters to be included: []. Input: [-*:*]" + ) + ); + } + { + String[] indices = shuffledList(List.of("cluster*:*", "*:foo", "-*:*")).toArray(new String[0]); - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), indices) - ); - assertThat( - e.getMessage(), - containsString("The '-' exclusions in the index expression list excludes all indexes. Nothing to search.") - ); - } + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), indices) + ); + assertThat( + e.getMessage(), + containsString("The '-' exclusions in the index expression list excludes all indexes. Nothing to search.") + ); } } } } - public void testGroupIndices() throws IOException { + public void testGroupIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -391,10 +378,13 @@ public void testGroupIndices() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -403,62 +393,53 @@ public void testGroupIndices() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteClusterRegistered(service, "foo")); - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - new String[] { - "cluster_1:bar", - "cluster_2:foo:bar", - "cluster_1:test", - "cluster_2:foo*", - "foo", - "cluster*:baz", - "*:boo" } - ); - assertEquals(3, perClusterIndices.size()); - assertArrayEquals( - new String[] { "foo" }, - perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices() - ); - assertArrayEquals(new String[] { "bar", "test", "baz", "boo" }, perClusterIndices.get("cluster_1").indices()); - assertArrayEquals(new String[] { "foo:bar", "foo*", "baz", "boo" }, perClusterIndices.get("cluster_2").indices()); - } - { - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } - ) - ); - } - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - new String[] { "cluster_1:bar", "cluster_2:foo*" } - ); - assertEquals(2, perClusterIndices.size()); - assertArrayEquals(new String[] { "bar" }, perClusterIndices.get("cluster_1").indices()); - assertArrayEquals(new String[] { "foo*" }, perClusterIndices.get("cluster_2").indices()); - } - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - Strings.EMPTY_ARRAY - ); - assertEquals(1, perClusterIndices.size()); - assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + new String[] { + "cluster_1:bar", + "cluster_2:foo:bar", + "cluster_1:test", + "cluster_2:foo*", + "foo", + "cluster*:baz", + "*:boo" } + ); + assertEquals(3, perClusterIndices.size()); + assertArrayEquals(new String[] { "foo" }, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); + assertArrayEquals(new String[] { "bar", "test", "baz", "boo" }, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[] { "foo:bar", "foo*", "baz", "boo" }, perClusterIndices.get("cluster_2").indices()); + } + { + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); + } + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + new String[] { "cluster_1:bar", "cluster_2:foo*" } + ); + assertEquals(2, perClusterIndices.size()); + assertArrayEquals(new String[] { "bar" }, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[] { "foo*" }, perClusterIndices.get("cluster_2").indices()); + } + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + Strings.EMPTY_ARRAY + ); + assertEquals(1, perClusterIndices.size()); + assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); } } } @@ -469,7 +450,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE) ); - try (RemoteClusterService service = createRemoteClusterService(settings, null)) { + try (RemoteClusterService service = createRemoteClusterService(settings)) { expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertFalse(hasRegisteredClusters(service)); final IllegalArgumentException error = expectThrows( @@ -480,7 +461,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { } } - public void testIncrementallyAddClusters() throws IOException { + public void testIncrementallyAddClusters() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -513,50 +494,41 @@ public void testIncrementallyAddClusters() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - Settings cluster1Settings = createSettings( - "cluster_1", - Collections.singletonList(cluster1Seed.getAddress().toString()) - ); - PlainActionFuture clusterAdded = new PlainActionFuture<>(); - // Add the cluster on a different thread to test that we wait for a new cluster to - // connect before returning. - new Thread(() -> { - try { - service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); - clusterAdded.onResponse(null); - } catch (Exception e) { - clusterAdded.onFailure(e); - } - }).start(); - clusterAdded.actionGet(); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - Settings cluster2Settings = createSettings( - "cluster_2", - Collections.singletonList(cluster2Seed.getAddress().toString()) - ); - service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); - service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); - assertFalse(isRemoteClusterRegistered(service, "cluster_2")); - IllegalArgumentException iae = expectThrows( - IllegalArgumentException.class, - () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) - ); - assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + Settings cluster1Settings = createSettings("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString())); + PlainActionFuture clusterAdded = new PlainActionFuture<>(); + // Add the cluster on a different thread to test that we wait for a new cluster to + // connect before returning. + new Thread(() -> { + try { + service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); + clusterAdded.onResponse(null); + } catch (Exception e) { + clusterAdded.onFailure(e); + } + }).start(); + clusterAdded.actionGet(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + Settings cluster2Settings = createSettings("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString())); + service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); + service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); + assertFalse(isRemoteClusterRegistered(service, "cluster_2")); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) + ); + assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } } } - public void testDefaultPingSchedule() throws IOException { + public void testDefaultPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService seedTransport = startTransport( @@ -589,23 +561,20 @@ public void testDefaultPingSchedule() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); - final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); - service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); } } } - public void testCustomPingSchedule() throws IOException { + public void testCustomPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -626,12 +595,19 @@ public void testCustomPingSchedule() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); - Settings.Builder settingsBuilder = Settings.builder(); + Settings.Builder builder = Settings.builder(); if (randomBoolean()) { - settingsBuilder.put(TransportSettings.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); + builder.put(TransportSettings.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); } - Settings transportSettings = settingsBuilder.build(); - + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); + TimeValue pingSchedule1 = // randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); + TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); + Settings transportSettings = builder.build(); try ( MockTransportService transportService = MockTransportService.createNewService( transportSettings, @@ -643,23 +619,12 @@ public void testCustomPingSchedule() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - TimeValue pingSchedule1 = // randomBoolean() ? TimeValue.MINUS_ONE : - TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); - TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : - TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - initializeRemoteClusters(service); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); - RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); - assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getConnectionProfile().getPingInterval()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); + RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getConnectionProfile().getPingInterval()); } } } @@ -677,10 +642,12 @@ public void testChangeSettings() throws Exception { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); knownNodes.add(cluster1Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -689,41 +656,37 @@ public void testChangeSettings() throws Exception { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - initializeRemoteClusters(service); - RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - Settings.Builder settingsChange = Settings.builder(); - TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); - settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); - boolean compressionScheme = randomBoolean(); - Compression.Enabled enabledChange = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE); - if (compressionScheme) { - settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.DEFLATE); - } else { - settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); - } - settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); - assertBusy(remoteClusterConnection::isClosed); - - remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); - assertEquals(pingSchedule, connectionProfile.getPingInterval()); - if (compressionScheme) { - assertEquals(Compression.Enabled.INDEXING_DATA, connectionProfile.getCompressionEnabled()); - assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme()); - } else { - assertEquals(enabledChange, connectionProfile.getCompressionEnabled()); - assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme()); - } + final var service = transportService.getRemoteClusterService(); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + Settings.Builder settingsChange = Settings.builder(); + TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); + settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); + boolean compressionScheme = randomBoolean(); + Compression.Enabled enabledChange = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE); + if (compressionScheme) { + settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.DEFLATE); + } else { + settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); + } + settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); + assertBusy(remoteClusterConnection::isClosed); + + remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + assertEquals(pingSchedule, connectionProfile.getPingInterval()); + if (compressionScheme) { + assertEquals(Compression.Enabled.INDEXING_DATA, connectionProfile.getCompressionEnabled()); + assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme()); + } else { + assertEquals(enabledChange, connectionProfile.getCompressionEnabled()); + assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme()); } } } } - public void testRemoteNodeAttribute() throws IOException, InterruptedException { + public void testRemoteNodeAttribute() throws InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); final List knownNodes = new CopyOnWriteArrayList<>(); final Settings gateway = Settings.builder().put("node.attr.gateway", true).build(); @@ -776,33 +739,30 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); - assertEquals(0, transportService.getConnectionManager().size()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } - public void testRemoteNodeRoles() throws IOException, InterruptedException { + public void testRemoteNodeRoles() throws InterruptedException { final Settings settings = Settings.EMPTY; final List knownNodes = new CopyOnWriteArrayList<>(); final Settings data = nonMasterNode(); @@ -858,28 +818,25 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); - assertEquals(0, transportService.getConnectionManager().size()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } @@ -945,127 +902,124 @@ public void testCollectNodes() throws InterruptedException, IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - CountDownLatch latch = new CountDownLatch(1); + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + CountDownLatch latch = new CountDownLatch(1); + service.collectNodes( + new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new ActionListener>() { + @Override + public void onResponse(BiFunction func) { + try { + assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId())); + assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId())); + assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId())); + assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId())); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + { + CountDownLatch failLatch = new CountDownLatch(1); + AtomicReference ex = new AtomicReference<>(); service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), new ActionListener>() { @Override - public void onResponse(BiFunction func) { + public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { try { - assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId())); - assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId())); - assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId())); - assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId())); + fail("should not be called"); } finally { - latch.countDown(); + failLatch.countDown(); } } @Override public void onFailure(Exception e) { try { - throw new AssertionError(e); + ex.set(e); } finally { - latch.countDown(); + failLatch.countDown(); } } } ); - latch.await(); - { - CountDownLatch failLatch = new CountDownLatch(1); - AtomicReference ex = new AtomicReference<>(); - service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), - new ActionListener>() { - @Override - public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { - try { - fail("should not be called"); - } finally { - failLatch.countDown(); - } - } - - @Override - public void onFailure(Exception e) { - try { - ex.set(e); - } finally { - failLatch.countDown(); - } + failLatch.await(); + assertNotNull(ex.get()); + assertTrue(ex.get() instanceof NoSuchRemoteClusterException); + assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); + } + { + logger.info("closing all source nodes"); + // close all targets and check for the transport level failure path + IOUtils.close(c1N1, c1N2, c2N1, c2N2); + logger.info("all source nodes are closed"); + CountDownLatch failLatch = new CountDownLatch(1); + AtomicReference ex = new AtomicReference<>(); + service.collectNodes( + new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new ActionListener>() { + @Override + public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { + try { + fail("should not be called"); + } finally { + failLatch.countDown(); } } - ); - failLatch.await(); - assertNotNull(ex.get()); - assertTrue(ex.get() instanceof NoSuchRemoteClusterException); - assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); - } - { - logger.info("closing all source nodes"); - // close all targets and check for the transport level failure path - IOUtils.close(c1N1, c1N2, c2N1, c2N2); - logger.info("all source nodes are closed"); - CountDownLatch failLatch = new CountDownLatch(1); - AtomicReference ex = new AtomicReference<>(); - service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), - new ActionListener>() { - @Override - public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { - try { - fail("should not be called"); - } finally { - failLatch.countDown(); - } - } - @Override - public void onFailure(Exception e) { - try { - ex.set(e); - } finally { - failLatch.countDown(); - } + @Override + public void onFailure(Exception e) { + try { + ex.set(e); + } finally { + failLatch.countDown(); } } - ); - failLatch.await(); - assertNotNull(ex.get()); - if (ex.get() instanceof IllegalStateException) { - assertThat( - ex.get().getMessage(), - either(equalTo("Unable to open any connections to remote cluster [cluster_1]")).or( - equalTo("Unable to open any connections to remote cluster [cluster_2]") - ) - ); - } else { - assertThat( - ex.get(), - either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class)) - .or(instanceOf(NoSeedNodeLeftException.class)) - ); } + ); + failLatch.await(); + assertNotNull(ex.get()); + if (ex.get() instanceof IllegalStateException) { + assertThat( + ex.get().getMessage(), + either(equalTo("Unable to open any connections to remote cluster [cluster_1]")).or( + equalTo("Unable to open any connections to remote cluster [cluster_2]") + ) + ); + } else { + assertThat( + ex.get(), + either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class)) + .or(instanceOf(NoSeedNodeLeftException.class)) + ); } } } } } - public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { + public void testCollectNodesConcurrentWithSettingsChanges() { final List knownNodes_c1 = new CopyOnWriteArrayList<>(); try ( @@ -1075,24 +1029,25 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY - ); - var transportService = MockTransportService.createNewService( - Settings.EMPTY, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null ) ) { final var c1N1Node = c1N1.getLocalNode(); knownNodes_c1.add(c1N1Node); final var seedList = List.of(c1N1Node.getAddress().toString()); - transportService.start(); - transportService.acceptIncomingRequests(); final var initialSettings = createSettings("cluster_1", seedList); - try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) { - initializeRemoteClusters(service); + try ( + var transportService = MockTransportService.createNewService( + initialSettings, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); final var taskLatch = new CountDownLatch(numTasks); @@ -1263,10 +1218,13 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { knownNodes.add(node0); knownNodes.add(node1); Collections.shuffle(knownNodes, random()); + final Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); + final var initialSettings = builder.build(); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + initialSettings, VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -1275,57 +1233,50 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { ) { transportService.start(); transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); - final Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); - final var initialSettings = builder.build(); - try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - - final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); - assertFalse(firstRemoteClusterConnection.isClosed()); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_test", initialSettings, List.of(node0), connectionListener(firstLatch)); - firstLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); - assertFalse(firstRemoteClusterConnection.isClosed()); - assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); - - final List newSeeds = new ArrayList<>(); - newSeeds.add(node1); - if (randomBoolean()) { - newSeeds.add(node0); - Collections.shuffle(newSeeds, random()); - } + final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_test", initialSettings, newSeeds, connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertBusy(() -> { - assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); - assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); - assertTrue(firstRemoteClusterConnection.isClosed()); - }); - - final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); - assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); - assertFalse(secondRemoteClusterConnection.isClosed()); + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_test", initialSettings, List.of(node0), connectionListener(firstLatch)); + firstLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); + + final List newSeeds = new ArrayList<>(); + newSeeds.add(node1); + if (randomBoolean()) { + newSeeds.add(node0); + Collections.shuffle(newSeeds, random()); } + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_test", initialSettings, newSeeds, connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertBusy(() -> { + assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); + assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); + assertTrue(firstRemoteClusterConnection.isClosed()); + }); + + final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); + assertFalse(secondRemoteClusterConnection.isClosed()); } } } @@ -1418,7 +1369,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole)) { service.ensureClientIsEnabled(); } @@ -1427,7 +1378,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat(exception.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role")); } @@ -1438,7 +1389,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.INDEX_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat( exception.getMessage(), @@ -1455,7 +1406,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnRemoteClusterClientSettings = Settings.builder() @@ -1463,7 +1414,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnSearchNodeAndRemoteClusterClientSettings = Settings.builder() @@ -1471,7 +1422,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings)) { service.ensureClientIsEnabled(); } } @@ -1500,7 +1451,7 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() { } } - public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws IOException, InterruptedException { + public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c1 = startTransport( @@ -1542,56 +1493,53 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster }); transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - initializeRemoteClusters(service); - - final CountDownLatch firstLatch = new CountDownLatch(1); - final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); - final boolean firstRemoteClusterProxyMode = randomBoolean(); - if (firstRemoteClusterProxyMode) { - firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.mode", "proxy") - .put("cluster.remote.cluster_1.proxy_address", c1Node.getAddress().toString()); - } else { - firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString()); - } - final var updatedSettings1 = firstRemoteClusterSettingsBuilder.build(); - updateRemoteCluster(service, "cluster_1", settings, updatedSettings1, connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - final Settings.Builder secondRemoteClusterSettingsBuilder = Settings.builder(); - final boolean secondRemoteClusterProxyMode = randomBoolean(); - if (secondRemoteClusterProxyMode) { - secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.mode", "proxy") - .put("cluster.remote.cluster_2.proxy_address", c2Node.getAddress().toString()); - } else { - secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString()); - } - final var updatedSettings2 = secondRemoteClusterSettingsBuilder.build(); - updateRemoteCluster(service, "cluster_2", settings, updatedSettings2, connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - if (firstRemoteClusterProxyMode) { - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1Node)); - } else { - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1Node)); - } - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - if (secondRemoteClusterProxyMode) { - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2Node)); - } else { - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2Node)); - } - // No local node connection - assertEquals(0, transportService.getConnectionManager().size()); + final var service = transportService.getRemoteClusterService(); + final CountDownLatch firstLatch = new CountDownLatch(1); + final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); + final boolean firstRemoteClusterProxyMode = randomBoolean(); + if (firstRemoteClusterProxyMode) { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.mode", "proxy") + .put("cluster.remote.cluster_1.proxy_address", c1Node.getAddress().toString()); + } else { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString()); + } + final var updatedSettings1 = firstRemoteClusterSettingsBuilder.build(); + updateRemoteCluster(service, "cluster_1", settings, updatedSettings1, connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + final Settings.Builder secondRemoteClusterSettingsBuilder = Settings.builder(); + final boolean secondRemoteClusterProxyMode = randomBoolean(); + if (secondRemoteClusterProxyMode) { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.mode", "proxy") + .put("cluster.remote.cluster_2.proxy_address", c2Node.getAddress().toString()); + } else { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString()); + } + final var updatedSettings2 = secondRemoteClusterSettingsBuilder.build(); + updateRemoteCluster(service, "cluster_2", settings, updatedSettings2, connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + if (firstRemoteClusterProxyMode) { + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1Node)); + } else { + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1Node)); + } + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + if (secondRemoteClusterProxyMode) { + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2Node)); + } else { + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2Node)); } + // No local node connection + assertEquals(0, transportService.getConnectionManager().size()); } } } - public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfile() throws IOException, InterruptedException { + public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfile() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c = startTransport( @@ -1617,52 +1565,48 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi ) { transportService.start(); transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); + final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); + final CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", Settings.EMPTY, clusterSettings, connectionListener(latch)); + latch.await(); + + assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); + + { + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10)); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build(); + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1"))); + final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, listener); + listener.actionGet(10, TimeUnit.SECONDS); + } - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - initializeRemoteClusters(service); - - final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); - final CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", Settings.EMPTY, clusterSettings, connectionListener(latch)); - latch.await(); - - assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); - - { - final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10)); - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build(); - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1"))); - final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, listener); - listener.actionGet(10, TimeUnit.SECONDS); - } - - assertConnectionHasProfile( - service.getRemoteClusterConnection("cluster_1"), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - - { - final PlainActionFuture listener = new PlainActionFuture<>(); - // Settings without credentials constitute credentials removal - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings); - assertThat(result.addedClusterAliases().size(), equalTo(0)); - assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1"))); - final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings); - service.updateRemoteCluster(config, true, listener); - listener.actionGet(10, TimeUnit.SECONDS); - } + assertConnectionHasProfile( + service.getRemoteClusterConnection("cluster_1"), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); - assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); + { + final PlainActionFuture listener = new PlainActionFuture<>(); + // Settings without credentials constitute credentials removal + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings); + assertThat(result.addedClusterAliases().size(), equalTo(0)); + assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1"))); + final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings); + service.updateRemoteCluster(config, true, listener); + listener.actionGet(10, TimeUnit.SECONDS); } + + assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); } } } - public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespiteFailures() throws IOException, InterruptedException { + public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespiteFailures() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c1 = startTransport( @@ -1711,83 +1655,80 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite alias -> alias.equals(goodCluster) || alias.equals(badCluster), () -> randomAlphaOfLength(10) ); - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - initializeRemoteClusters(service); - - final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); - final var latch = new CountDownLatch(1); - updateRemoteCluster(service, goodCluster, Settings.EMPTY, cluster1Settings, connectionListener(latch)); - latch.await(); - - final Settings cluster2Settings = buildRemoteClusterSettings(badCluster, c2DiscoNode.getAddress().toString()); - final PlainActionFuture future = new PlainActionFuture<>(); - updateRemoteCluster(service, badCluster, Settings.EMPTY, cluster2Settings, future); - final var ex = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS)); - assertThat(ex.getMessage(), containsString("bad cluster")); - - assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); - assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); - final Set aliases = Set.of(badCluster, goodCluster, missingCluster); - final ActionListener noop = ActionListener.noop(); - - { - final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString("cluster.remote." + badCluster + ".credentials", randomAlphaOfLength(10)); - secureSettings.setString("cluster.remote." + goodCluster + ".credentials", randomAlphaOfLength(10)); - secureSettings.setString("cluster.remote." + missingCluster + ".credentials", randomAlphaOfLength(10)); - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder() - .put(cluster1Settings) - .put(cluster2Settings) - .setSecureSettings(secureSettings) - .build(); - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases(), equalTo(aliases)); - try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { - for (String alias : aliases) { - final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); - } + final var service = transportService.getRemoteClusterService(); + final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); + final var latch = new CountDownLatch(1); + updateRemoteCluster(service, goodCluster, Settings.EMPTY, cluster1Settings, connectionListener(latch)); + latch.await(); + + final Settings cluster2Settings = buildRemoteClusterSettings(badCluster, c2DiscoNode.getAddress().toString()); + final PlainActionFuture future = new PlainActionFuture<>(); + updateRemoteCluster(service, badCluster, Settings.EMPTY, cluster2Settings, future); + final var ex = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS)); + assertThat(ex.getMessage(), containsString("bad cluster")); + + assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); + assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + final Set aliases = Set.of(badCluster, goodCluster, missingCluster); + final ActionListener noop = ActionListener.noop(); + + { + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote." + badCluster + ".credentials", randomAlphaOfLength(10)); + secureSettings.setString("cluster.remote." + goodCluster + ".credentials", randomAlphaOfLength(10)); + secureSettings.setString("cluster.remote." + missingCluster + ".credentials", randomAlphaOfLength(10)); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder() + .put(cluster1Settings) + .put(cluster2Settings) + .setSecureSettings(secureSettings) + .build(); + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases(), equalTo(aliases)); + try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { + for (String alias : aliases) { + final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); } - listener.actionGet(10, TimeUnit.SECONDS); } + listener.actionGet(10, TimeUnit.SECONDS); + } - assertConnectionHasProfile( - service.getRemoteClusterConnection(goodCluster), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - assertConnectionHasProfile( - service.getRemoteClusterConnection(badCluster), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); - - { - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build(); - // Settings without credentials constitute credentials removal - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases().size(), equalTo(0)); - assertThat(result.removedClusterAliases(), equalTo(aliases)); - try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { - for (String alias : aliases) { - final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); - } + assertConnectionHasProfile( + service.getRemoteClusterConnection(goodCluster), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); + assertConnectionHasProfile( + service.getRemoteClusterConnection(badCluster), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + + { + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build(); + // Settings without credentials constitute credentials removal + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases().size(), equalTo(0)); + assertThat(result.removedClusterAliases(), equalTo(aliases)); + try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { + for (String alias : aliases) { + final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); } - listener.actionGet(10, TimeUnit.SECONDS); } - - assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); - assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + listener.actionGet(10, TimeUnit.SECONDS); } + + assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); + assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); } } } - public void testCorrectTransportProfileUsedWhenCPSEnabled() throws IOException { + public void testCorrectTransportProfileUsedWhenCPSEnabled() { final var versionInfo = VersionInformation.CURRENT; final var transportVers = TransportVersion.current(); final var knownNodes = new CopyOnWriteArrayList(); @@ -1805,14 +1746,12 @@ public void testCorrectTransportProfileUsedWhenCPSEnabled() throws IOException { try (var transportService = MockTransportService.createNewService(settings, versionInfo, transportVers, threadPool)) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertConnectionHasProfile( - service.getRemoteClusterConnection("cluster1"), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertConnectionHasProfile( + service.getRemoteClusterConnection("cluster1"), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); } } } @@ -1836,14 +1775,20 @@ private Settings buildRemoteClusterSettings(String clusterAlias, String address) return settings.build(); } - public void testLogsConnectionResult() throws IOException { + public void testLogsConnectionResult() { final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); try ( var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) + var local = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + clusterSettings + ) ) { - linkedProjectConfigService.register(remoteClusterService); + local.start(); + local.acceptIncomingRequests(); assertThatLogger( () -> clusterSettings.applySettings( @@ -1882,7 +1827,7 @@ public void testLogsConnectionResult() throws IOException { } } - public void testSetSkipUnavailable() throws IOException { + public void testSetSkipUnavailable() { final var skipUnavailableProperty = RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("remote") .getKey(); final var seedNodeProperty = SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("remote").getKey(); @@ -1891,11 +1836,15 @@ public void testSetSkipUnavailable() throws IOException { try ( var remote1Transport = startTransport("remote1"); var remote2Transport = startTransport("remote2"); - var local = startTransport("local"); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) + var local = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + clusterSettings + ) ) { - linkedProjectConfigService.register(remoteClusterService); - + final var remoteClusterService = local.getRemoteClusterService(); record SkipUnavailableTestConfig( boolean skipUnavailable, MockTransportService seedNodeTransportService, @@ -1964,10 +1913,6 @@ private void updateRemoteCluster( service.updateRemoteCluster(buildLinkedProjectConfig(alias, settings, newSettings), false, listener); } - private void initializeRemoteClusters(RemoteClusterService remoteClusterService) { - remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); - } - private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); builder.put(