diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 31d73822ecd9d..5830af8725511 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -102,11 +102,9 @@ public boolean isRemoteClusterServerEnabled() { * the functionality to do it the right way is not yet ready -- replace this code when it's ready. */ this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false); - if (transportService != null) { - transportService.getTelemetryProvider() - .getMeterRegistry() - .registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count"); - } + transportService.getTelemetryProvider() + .getMeterRegistry() + .registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count"); } public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index c2a59fda3a71b..9f9af69464f32 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.project.DefaultProjectResolver; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.AbstractScopedSettings; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; @@ -71,6 +72,10 @@ public class RemoteClusterServiceTests extends ESTestCase { + private static final Settings REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS = Settings.builder() + .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") + .put(RemoteClusterPortSettings.PORT.getKey(), "0") + .build(); private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); private final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE; @@ -80,31 +85,35 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - private RemoteClusterService createRemoteClusterService(Settings settings) { - return new RemoteClusterService(settings, null, projectResolver); + private MockTransportService startTransport(final String id, final List knownNodes, final Settings settings) { + return RemoteClusterConnectionTests.startTransport( + id, + knownNodes, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + settings + ); } - private MockTransportService startTransport( - String id, - List knownNodes, - VersionInformation version, - TransportVersion transportVersion - ) { - return startTransport(id, knownNodes, version, transportVersion, Settings.EMPTY); + private MockTransportService startTransport() { + return startTransport(Settings.EMPTY); } - private MockTransportService startTransport( - final String id, - final List knownNodes, - final VersionInformation version, - final TransportVersion transportVersion, - final Settings settings - ) { - return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, transportVersion, threadPool, settings); + private MockTransportService startTransport(Settings settings) { + return startTransport(UUIDs.randomBase64UUID(), List.of(), settings); } private MockTransportService startTransport(final String id) { - return startTransport(id, List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); + return startTransport(id, List.of(), Settings.EMPTY); + } + + private MockTransportService startTransport(final String id, Settings settings) { + return startTransport(id, List.of(), settings); + } + + private MockTransportService startTransport(final String id, List knownNodes) { + return startTransport(id, knownNodes, Settings.EMPTY); } public void testSettingsAreRegistered() { @@ -149,18 +158,8 @@ public void testRemoteClusterSeedSetting() { public void testGroupClusterIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService cluster2Transport = startTransport( - "cluster_2_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes) ) { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); DiscoveryNode cluster2Seed = cluster2Transport.getLocalNode(); @@ -171,17 +170,7 @@ public void testGroupClusterIndices() { builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - builder.build(), - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(builder.build())) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); @@ -360,18 +349,8 @@ public void testGroupClusterIndices() { public void testGroupIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService cluster2Transport = startTransport( - "cluster_2_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes) ) { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); DiscoveryNode cluster2Seed = cluster2Transport.getLocalNode(); @@ -382,17 +361,7 @@ public void testGroupIndices() { builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - builder.build(), - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(builder.build())) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); @@ -450,7 +419,8 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE) ); - try (RemoteClusterService service = createRemoteClusterService(settings)) { + try (var transportService = startTransport("node-1", settings)) { + final var service = transportService.getRemoteClusterService(); expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertFalse(hasRegisteredClusters(service)); final IllegalArgumentException error = expectThrows( @@ -464,18 +434,8 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { public void testIncrementallyAddClusters() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService cluster2Transport = startTransport( - "cluster_2_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes) ) { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); DiscoveryNode cluster2Seed = cluster2Transport.getLocalNode(); @@ -483,17 +443,7 @@ public void testIncrementallyAddClusters() { knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport()) { final var service = transportService.getRemoteClusterService(); assertFalse(hasRegisteredClusters(service)); Settings cluster1Settings = createSettings("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString())); @@ -530,14 +480,7 @@ public void testIncrementallyAddClusters() { public void testDefaultPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); - try ( - MockTransportService seedTransport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) - ) { + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes)) { DiscoveryNode seedNode = seedTransport.getLocalNode(); knownNodes.add(seedTransport.getLocalNode()); TimeValue pingSchedule; @@ -550,17 +493,7 @@ public void testDefaultPingSchedule() { pingSchedule = TimeValue.MINUS_ONE; } Settings settings = settingsBuilder.build(); - try ( - MockTransportService transportService = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(settings)) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); @@ -577,18 +510,8 @@ public void testDefaultPingSchedule() { public void testCustomPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster1Transport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService cluster2Transport = startTransport( - "cluster_2_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes) ) { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); DiscoveryNode cluster2Seed = cluster2Transport.getLocalNode(); @@ -607,18 +530,7 @@ public void testCustomPingSchedule() { TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); - Settings transportSettings = builder.build(); - try ( - MockTransportService transportService = MockTransportService.createNewService( - transportSettings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(builder.build())) { final var service = transportService.getRemoteClusterService(); assertTrue(isRemoteClusterRegistered(service, "cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); @@ -631,31 +543,14 @@ public void testCustomPingSchedule() { public void testChangeSettings() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); - try ( - MockTransportService cluster1Transport = startTransport( - "cluster_1_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) - ) { + try (MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes)) { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); knownNodes.add(cluster1Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - builder.build(), - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(builder.build())) { final var service = transportService.getRemoteClusterService(); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); @@ -691,32 +586,10 @@ public void testRemoteNodeAttribute() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); final Settings gateway = Settings.builder().put("node.attr.gateway", true).build(); try ( - MockTransportService c1N1 = startTransport( - "cluster_1_node_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService c1N2 = startTransport( - "cluster_1_node_2", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - gateway - ); - MockTransportService c2N1 = startTransport( - "cluster_2_node_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService c2N2 = startTransport( - "cluster_2_node_2", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - gateway - ) + MockTransportService c1N1 = startTransport("cluster_1_node_1", knownNodes); + MockTransportService c1N2 = startTransport("cluster_1_node_2", knownNodes, gateway); + MockTransportService c2N1 = startTransport("cluster_2_node_1", knownNodes); + MockTransportService c2N2 = startTransport("cluster_2_node_2", knownNodes, gateway) ) { final DiscoveryNode c1N1Node = c1N1.getLocalNode(); final DiscoveryNode c1N2Node = c1N2.getLocalNode(); @@ -728,17 +601,7 @@ public void testRemoteNodeAttribute() throws InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(settings)) { final var service = transportService.getRemoteClusterService(); assertFalse(hasRegisteredClusters(service)); @@ -768,34 +631,10 @@ public void testRemoteNodeRoles() throws InterruptedException { final Settings data = nonMasterNode(); final Settings dedicatedMaster = masterOnlyNode(); try ( - MockTransportService c1N1 = startTransport( - "cluster_1_node_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - dedicatedMaster - ); - MockTransportService c1N2 = startTransport( - "cluster_1_node_2", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - data - ); - MockTransportService c2N1 = startTransport( - "cluster_2_node_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - dedicatedMaster - ); - MockTransportService c2N2 = startTransport( - "cluster_2_node_2", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - data - ) + MockTransportService c1N1 = startTransport("cluster_1_node_1", knownNodes, dedicatedMaster); + MockTransportService c1N2 = startTransport("cluster_1_node_2", knownNodes, data); + MockTransportService c2N1 = startTransport("cluster_2_node_1", knownNodes, dedicatedMaster); + MockTransportService c2N2 = startTransport("cluster_2_node_2", knownNodes, data) ) { final DiscoveryNode c1N1Node = c1N1.getLocalNode(); final DiscoveryNode c1N2Node = c1N2.getLocalNode(); @@ -807,17 +646,7 @@ public void testRemoteNodeRoles() throws InterruptedException { knownNodes.add(c2N2Node); Collections.shuffle(knownNodes, random()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(settings)) { final var service = transportService.getRemoteClusterService(); assertFalse(hasRegisteredClusters(service)); @@ -851,34 +680,10 @@ public void testCollectNodes() throws InterruptedException, IOException { final List knownNodes_c2 = new CopyOnWriteArrayList<>(); try ( - MockTransportService c1N1 = startTransport( - "cluster_1_node_1", - knownNodes_c1, - VersionInformation.CURRENT, - TransportVersion.current(), - settings - ); - MockTransportService c1N2 = startTransport( - "cluster_1_node_2", - knownNodes_c1, - VersionInformation.CURRENT, - TransportVersion.current(), - settings - ); - MockTransportService c2N1 = startTransport( - "cluster_2_node_1", - knownNodes_c2, - VersionInformation.CURRENT, - TransportVersion.current(), - settings - ); - MockTransportService c2N2 = startTransport( - "cluster_2_node_2", - knownNodes_c2, - VersionInformation.CURRENT, - TransportVersion.current(), - settings - ) + MockTransportService c1N1 = startTransport("cluster_1_node_1", knownNodes_c1, settings); + MockTransportService c1N2 = startTransport("cluster_1_node_2", knownNodes_c1, settings); + MockTransportService c2N1 = startTransport("cluster_2_node_1", knownNodes_c2, settings); + MockTransportService c2N2 = startTransport("cluster_2_node_2", knownNodes_c2, settings) ) { final DiscoveryNode c1N1Node = c1N1.getLocalNode(); final DiscoveryNode c1N2Node = c1N2.getLocalNode(); @@ -891,17 +696,7 @@ public void testCollectNodes() throws InterruptedException, IOException { Collections.shuffle(knownNodes_c1, random()); Collections.shuffle(knownNodes_c2, random()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(settings)) { final var service = transportService.getRemoteClusterService(); assertFalse(hasRegisteredClusters(service)); @@ -1022,31 +817,13 @@ public void onFailure(Exception e) { public void testCollectNodesConcurrentWithSettingsChanges() { final List knownNodes_c1 = new CopyOnWriteArrayList<>(); - try ( - var c1N1 = startTransport( - "cluster_1_node_1", - knownNodes_c1, - VersionInformation.CURRENT, - TransportVersion.current(), - Settings.EMPTY - ) - ) { + try (var c1N1 = startTransport("cluster_1_node_1", knownNodes_c1)) { final var c1N1Node = c1N1.getLocalNode(); knownNodes_c1.add(c1N1Node); final var seedList = List.of(c1N1Node.getAddress().toString()); final var initialSettings = createSettings("cluster_1", seedList); - try ( - var transportService = MockTransportService.createNewService( - initialSettings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (var transportService = startTransport(initialSettings)) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); @@ -1199,18 +976,8 @@ public void testRemoteClusterSkipIfDisconnectedSetting() { public void testReconnectWhenStrategySettingsUpdated() throws Exception { List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService cluster_node_0 = startTransport( - "cluster_node_0", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ); - MockTransportService cluster_node_1 = startTransport( - "cluster_node_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) + MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes); + MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes) ) { final DiscoveryNode node0 = cluster_node_0.getLocalNode(); @@ -1222,17 +989,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); final var initialSettings = builder.build(); - try ( - MockTransportService transportService = MockTransportService.createNewService( - initialSettings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport(initialSettings)) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); @@ -1300,30 +1057,12 @@ public static void addConnectionListener( public void testSkipUnavailable() { List knownNodes = new CopyOnWriteArrayList<>(); - try ( - MockTransportService seedTransport = startTransport( - "seed_node", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current() - ) - ) { + try (MockTransportService seedTransport = startTransport("seed_node", knownNodes)) { DiscoveryNode seedNode = seedTransport.getLocalNode(); knownNodes.add(seedNode); Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster1.seeds", seedTransport.getLocalNode().getAddress().toString()); - try ( - MockTransportService service = MockTransportService.createNewService( - builder.build(), - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - service.start(); - service.acceptIncomingRequests(); - + try (MockTransportService service = startTransport(builder.build())) { assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true)); if (randomBoolean()) { @@ -1342,17 +1081,7 @@ public void testRemoteClusterServiceNotEnabledGetRemoteClusterConnection() { .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(Node.NODE_NAME_SETTING.getKey(), "node-1") .build(); - try ( - MockTransportService service = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - service.start(); - service.acceptIncomingRequests(); + try (MockTransportService service = startTransport("node-1", settings)) { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> service.getRemoteClusterService().getRemoteClusterConnection("test") @@ -1369,8 +1098,8 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole)) { - service.ensureClientIsEnabled(); + try (var transportService = startTransport(settingsWithRemoteClusterClientRole)) { + transportService.getRemoteClusterService().ensureClientIsEnabled(); } // Expect throws when missing the remote cluster client role. @@ -1378,7 +1107,8 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole)) { + try (var transportService = startTransport("node-1", settingsWithoutRemoteClusterClientRole)) { + final var service = transportService.getRemoteClusterService(); final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat(exception.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role")); } @@ -1389,7 +1119,8 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.INDEX_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode)) { + try (var transportService = startTransport("node-1", statelessEnabledSettingsOnNonSearchNode)) { + final var service = transportService.getRemoteClusterService(); final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat( exception.getMessage(), @@ -1406,24 +1137,24 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings)) { - service.ensureClientIsEnabled(); + try (var transportService = startTransport(statelessEnabledOnSearchNodeSettings)) { + transportService.getRemoteClusterService().ensureClientIsEnabled(); } final var statelessEnabledOnRemoteClusterClientSettings = Settings.builder() .put(nodeNameSettings) .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings)) { - service.ensureClientIsEnabled(); + try (var transportService = startTransport(statelessEnabledOnRemoteClusterClientSettings)) { + transportService.getRemoteClusterService().ensureClientIsEnabled(); } final var statelessEnabledOnSearchNodeAndRemoteClusterClientSettings = Settings.builder() .put(nodeNameSettings) .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings)) { - service.ensureClientIsEnabled(); + try (var transportService = startTransport(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings)) { + transportService.getRemoteClusterService().ensureClientIsEnabled(); } } @@ -1432,17 +1163,7 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() { .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(Node.NODE_NAME_SETTING.getKey(), "node-1") .build(); - try ( - MockTransportService service = MockTransportService.createNewService( - settings, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - service.start(); - service.acceptIncomingRequests(); + try (MockTransportService service = startTransport("node-1", settings)) { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> service.getRemoteClusterService().collectNodes(Set.of(), ActionListener.noop()) @@ -1454,17 +1175,8 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() { public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService c1 = startTransport( - "cluster_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - Settings.builder() - .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") - .put(RemoteClusterPortSettings.PORT.getKey(), "0") - .build() - ); - MockTransportService c2 = startTransport("cluster_2", knownNodes, VersionInformation.CURRENT, TransportVersion.current()); + MockTransportService c1 = startTransport("cluster_1", knownNodes, REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS); + MockTransportService c2 = startTransport("cluster_2", knownNodes, REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS) ) { final DiscoveryNode c1Node = c1.getLocalNode().withTransportAddress(c1.boundRemoteAccessAddress().publishAddress()); final DiscoveryNode c2Node = c2.getLocalNode(); @@ -1541,30 +1253,9 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfile() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); - try ( - MockTransportService c = startTransport( - "cluster_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - Settings.builder() - .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") - .put(RemoteClusterPortSettings.PORT.getKey(), "0") - .build() - ) - ) { + try (MockTransportService c = startTransport("cluster_1", knownNodes, REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS)) { final DiscoveryNode discoNode = c.getLocalNode().withTransportAddress(c.boundRemoteAccessAddress().publishAddress()); - try ( - MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null - ) - ) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (MockTransportService transportService = startTransport()) { final var service = transportService.getRemoteClusterService(); final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); final CountDownLatch latch = new CountDownLatch(1); @@ -1607,28 +1298,9 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi } public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespiteFailures() throws InterruptedException { - final List knownNodes = new CopyOnWriteArrayList<>(); try ( - MockTransportService c1 = startTransport( - "cluster_1", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - Settings.builder() - .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") - .put(RemoteClusterPortSettings.PORT.getKey(), "0") - .build() - ); - MockTransportService c2 = startTransport( - "cluster_2", - knownNodes, - VersionInformation.CURRENT, - TransportVersion.current(), - Settings.builder() - .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") - .put(RemoteClusterPortSettings.PORT.getKey(), "0") - .build() - ) + MockTransportService c1 = startTransport("cluster_1", REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS); + MockTransportService c2 = startTransport("cluster_2", REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS) ) { final DiscoveryNode c1DiscoNode = c1.getLocalNode().withTransportAddress(c1.boundRemoteAccessAddress().publishAddress()); final DiscoveryNode c2DiscoNode = c2.getLocalNode().withTransportAddress(c2.boundRemoteAccessAddress().publishAddress()); @@ -1729,23 +1401,14 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite } public void testCorrectTransportProfileUsedWhenCPSEnabled() { - final var versionInfo = VersionInformation.CURRENT; - final var transportVers = TransportVersion.current(); final var knownNodes = new CopyOnWriteArrayList(); - final var linkedTransportServiceSettings = Settings.builder() - .put(RemoteClusterPortSettings.REMOTE_CLUSTER_SERVER_ENABLED.getKey(), "true") - .put(RemoteClusterPortSettings.PORT.getKey(), "0") - .build(); - - try (var seedTransport = startTransport("seed_node", knownNodes, versionInfo, transportVers, linkedTransportServiceSettings)) { + try (var seedTransport = startTransport("seed_node", knownNodes, REMOTE_CLUSTER_SERVER_ENABLED_SETTINGS)) { knownNodes.add(seedTransport.getLocalNode()); final var settings = Settings.builder() .putList("cluster.remote.cluster1.seeds", seedTransport.getLocalNode().getAddress().toString()) .put("serverless.cross_project.enabled", true) .build(); - try (var transportService = MockTransportService.createNewService(settings, versionInfo, transportVers, threadPool)) { - transportService.start(); - transportService.acceptIncomingRequests(); + try (var transportService = startTransport(settings)) { final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); assertConnectionHasProfile( @@ -1778,7 +1441,7 @@ private Settings buildRemoteClusterSettings(String clusterAlias, String address) public void testLogsConnectionResult() { final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); try ( - var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); + var remote = startTransport("remote"); var local = MockTransportService.createNewService( Settings.EMPTY, VersionInformation.CURRENT,