From 345acad85c52814a0c3d1ee5ba8ad890ff87f310 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Thu, 30 Oct 2025 17:12:09 -0400 Subject: [PATCH 1/7] Add connection failure metrics in RemoteConnectionStrategy This change registers counters for initial and reconnect attempt failures. The change also required minor refactoring to make the metrics registry available from the TransportService that is passed to the RemoteConnectionStrategy constructor. This change builds on the work done in #134415. Resolves: ES-12695 --- .../elasticsearch/node/NodeConstruction.java | 2 +- .../node/NodeServiceProvider.java | 4 ++- .../transport/RemoteConnectionStrategy.java | 28 ++++++++++++++++++- .../transport/TransportService.java | 9 ++++++ .../RemoteConnectionStrategyTests.java | 28 +++++++++++++++---- .../java/org/elasticsearch/node/MockNode.java | 6 ++-- .../test/transport/MockTransportService.java | 18 ++++++++++++ 7 files changed, 85 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 6ae1ea9c9c6c8..48c35f4c299f7 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1154,7 +1154,7 @@ public Map queryFields() { localNodeFactory, settingsModule.getClusterSettings(), taskManager, - telemetryProvider.getTracer(), + telemetryProvider, nodeEnvironment.nodeId(), linkedProjectConfigService, projectResolver diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 0b8c59b3c1957..1e7285169d826 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -40,6 +40,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ClusterConnectionManager; @@ -120,7 +121,7 @@ TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer, + TelemetryProvider telemetryProvider, String nodeId, LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver @@ -135,6 +136,7 @@ TransportService newTransportService( new ClusterConnectionManager(settings, transport, threadPool.getThreadContext()), taskManager, linkedProjectConfigService, + telemetryProvider, projectResolver ); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 9cde5c086ab39..729be2cafef14 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.core.Nullable; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.LongCounter; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; @@ -79,6 +81,11 @@ public Writeable.Reader getReader() { private List> listeners = new ArrayList<>(); private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false); + static final String INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.initial.error.total"; + static final String RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.reconnect.error.total"; + private static LongCounter initialConnectionAttemptFailures; + private static LongCounter reconnectAttemptFailures; + protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; protected final ProjectId originProjectId; @@ -92,9 +99,27 @@ public Writeable.Reader getReader() { this.transportService = transportService; this.connectionManager = connectionManager; this.maxPendingConnectionListeners = config.maxPendingConnectionListeners(); + registerMetrics(transportService.getTelemetryProvider()); connectionManager.addListener(this); } + private static synchronized void registerMetrics(TelemetryProvider telemetryProvider) { + final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry(); + if (initialConnectionAttemptFailures != null || meterRegistry == null) { + return; + } + initialConnectionAttemptFailures = meterRegistry.registerLongCounter( + INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, + "linked project initial connection attempt failure count", + "count" + ); + reconnectAttemptFailures = meterRegistry.registerLongCounter( + RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, + "linked project reconnection attempt failure count", + "count" + ); + } + static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) { ConnectionProfile.Builder builder = new ConnectionProfile.Builder().setConnectTimeout(config.transportConnectTimeout()) .setHandshakeTimeout(config.transportConnectTimeout()) @@ -221,7 +246,8 @@ private void connectionAttemptCompleted(@Nullable Exception e) { logger.debug(msgSupplier); } else { logger.warn(msgSupplier, e); - // TODO: ES-12695: Increment either the initial or retry connection failure metric. + final var counter = isInitialAttempt ? initialConnectionAttemptFailures : reconnectAttemptFailures; + counter.increment(); } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 785a56a41ddbe..f8d2ec9d0adac 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -50,6 +50,7 @@ import org.elasticsearch.node.ReportingService; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -140,6 +141,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { volatile String[] tracerLogExclude; private final LinkedProjectConfigService linkedProjectConfigService; + private final TelemetryProvider telemetryProvider; private final RemoteClusterService remoteClusterService; /** @@ -277,6 +279,7 @@ public TransportService( connectionManager, taskManger, new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), + TelemetryProvider.NOOP, DefaultProjectResolver.INSTANCE ); } @@ -292,6 +295,7 @@ public TransportService( ConnectionManager connectionManager, TaskManager taskManger, LinkedProjectConfigService linkedProjectConfigService, + TelemetryProvider telemetryProvider, ProjectResolver projectResolver ) { this.transport = transport; @@ -308,6 +312,7 @@ public TransportService( this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.enableStackOverflowAvoidance = ENABLE_STACK_OVERFLOW_AVOIDANCE.get(settings); this.linkedProjectConfigService = linkedProjectConfigService; + this.telemetryProvider = telemetryProvider; remoteClusterService = new RemoteClusterService(settings, this, projectResolver); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { @@ -354,6 +359,10 @@ void setTracerLogExclude(List tracerLogExclude) { this.tracerLogExclude = tracerLogExclude.toArray(Strings.EMPTY_ARRAY); } + public TelemetryProvider getTelemetryProvider() { + return telemetryProvider; + } + @Override protected void doStart() { transport.setMessageListener(this); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 0a5cfd670dcf6..3a50700c6865a 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.telemetry.InstrumentType; +import org.elasticsearch.telemetry.RecordingMeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.EnumSerializationTestUtils; import org.elasticsearch.test.MockLog; @@ -34,6 +36,8 @@ import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS; import static org.elasticsearch.transport.RemoteClusterSettings.toConfig; import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; public class RemoteConnectionStrategyTests extends ESTestCase { @@ -194,7 +198,7 @@ public void testConnectionStrategySerialization() { value = "org.elasticsearch.transport.RemoteConnectionStrategyTests.FakeConnectionStrategy:DEBUG", reason = "logging verification" ) - public void testConnectionAttemptLogging() { + public void testConnectionAttemptMetricsAndLogging() { final var originProjectId = randomUniqueProjectId(); final var linkedProjectId = randomUniqueProjectId(); final var alias = randomAlphanumericOfLength(10); @@ -208,8 +212,13 @@ public void testConnectionAttemptLogging() { new ClusterConnectionManager(TestProfiles.LIGHT_PROFILE, mock(Transport.class), threadContext) ) ) { + assert transportService.getTelemetryProvider() != null; + final var meterRegistry = transportService.getTelemetryProvider().getMeterRegistry(); + assert meterRegistry instanceof RecordingMeterRegistry; + final var metricRecorder = ((RecordingMeterRegistry) meterRegistry).getRecorder(); + for (boolean shouldConnectFail : new boolean[] { true, false }) { - for (boolean isIntialConnectAttempt : new boolean[] { true, false }) { + for (boolean isInitialConnectAttempt : new boolean[] { true, false }) { final var strategy = new FakeConnectionStrategy( originProjectId, linkedProjectId, @@ -217,7 +226,7 @@ public void testConnectionAttemptLogging() { transportService, connectionManager ); - if (isIntialConnectAttempt == false) { + if (isInitialConnectAttempt == false) { waitForConnect(strategy); } strategy.setShouldConnectFail(shouldConnectFail); @@ -228,7 +237,7 @@ public void testConnectionAttemptLogging() { shouldConnectFail ? "failed to connect" : "successfully connected", linkedProjectId, alias, - isIntialConnectAttempt ? "the initial connection" : "a reconnection" + isInitialConnectAttempt ? "the initial connection" : "a reconnection" ); assertThatLogger(() -> { if (shouldConnectFail) { @@ -243,12 +252,21 @@ public void testConnectionAttemptLogging() { + expectedLogLevel + " after a " + (shouldConnectFail ? "failed" : "successful") - + (isIntialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), + + (isInitialConnectAttempt ? " initial connection attempt" : " reconnection attempt"), strategy.getClass().getCanonicalName(), expectedLogLevel, expectedLogMessage ) ); + if (shouldConnectFail) { + metricRecorder.collect(); + final var counterName = isInitialConnectAttempt + ? RemoteConnectionStrategy.INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME + : RemoteConnectionStrategy.RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME; + final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName); + assertThat(measurements, hasSize(1)); + assertThat(measurements.getFirst().getLong(), equalTo(1L)); + } } } } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index e7668269c0073..aa9f9c8de5ace 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -45,6 +45,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockHttpTransport; @@ -174,7 +175,7 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, TaskManager taskManager, - Tracer tracer, + TelemetryProvider telemetryProvider, String nodeId, LinkedProjectConfigService linkedProjectConfigService, ProjectResolver projectResolver @@ -194,7 +195,7 @@ protected TransportService newTransportService( localNodeFactory, clusterSettings, taskManager, - tracer, + telemetryProvider, nodeId, linkedProjectConfigService, projectResolver @@ -209,6 +210,7 @@ protected TransportService newTransportService( clusterSettings, MockTransportService.createTaskManager(settings, threadPool, taskManager.getTaskHeaders(), Tracer.NOOP, nodeId), linkedProjectConfigService, + telemetryProvider, projectResolver ); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 2c0a3dd267638..2c1ea9d329162 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -46,6 +46,9 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchModule; import org.elasticsearch.tasks.TaskManager; +import org.elasticsearch.telemetry.RecordingMeterRegistry; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; @@ -264,6 +267,19 @@ public MockTransportService( clusterSettings, createTaskManager(settings, threadPool, taskHeaders, Tracer.NOOP, nodeId), new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, DefaultProjectResolver.INSTANCE), + new TelemetryProvider() { + final MeterRegistry meterRegistry = new RecordingMeterRegistry(); + + @Override + public Tracer getTracer() { + return Tracer.NOOP; + } + + @Override + public MeterRegistry getMeterRegistry() { + return meterRegistry; + } + }, DefaultProjectResolver.INSTANCE ); } @@ -277,6 +293,7 @@ public MockTransportService( @Nullable ClusterSettings clusterSettings, TaskManager taskManager, LinkedProjectConfigService linkedProjectConfigService, + TelemetryProvider telemetryProvider, ProjectResolver projectResolver ) { super( @@ -289,6 +306,7 @@ public MockTransportService( new StubbableConnectionManager(new ClusterConnectionManager(settings, transport, threadPool.getThreadContext())), taskManager, linkedProjectConfigService, + telemetryProvider, projectResolver ); this.original = transport.getDelegate(); From fdd31bd2b36fe8b160eafc51de15b304c5d37f33 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 31 Oct 2025 17:05:40 -0400 Subject: [PATCH 2/7] Include linked project ID and alias as attributes --- .../transport/RemoteConnectionStrategy.java | 3 ++- .../transport/RemoteConnectionStrategyTests.java | 9 ++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index 729be2cafef14..eadec94a35a3d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -247,7 +248,7 @@ private void connectionAttemptCompleted(@Nullable Exception e) { } else { logger.warn(msgSupplier, e); final var counter = isInitialAttempt ? initialConnectionAttemptFailures : reconnectAttemptFailures; - counter.increment(); + counter.incrementBy(1, Map.of("linked_project_id", linkedProjectId.toString(), "linked_project_alias", clusterAlias)); } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 3a50700c6865a..0601e73abf5fc 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -30,6 +30,8 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Set; + import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; @@ -265,7 +267,12 @@ public void testConnectionAttemptMetricsAndLogging() { : RemoteConnectionStrategy.RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME; final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName); assertThat(measurements, hasSize(1)); - assertThat(measurements.getFirst().getLong(), equalTo(1L)); + final var measurement = measurements.getFirst(); + assertThat(measurement.getLong(), equalTo(1L)); + final var attributes = measurement.attributes(); + assertThat(attributes.keySet(), equalTo(Set.of("linked_project_id", "linked_project_alias"))); + assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString())); + assertThat(attributes.get("linked_project_alias"), equalTo(alias)); } } } From 4f6263ddbd25d7fa121fec5ba04406e4066c34ba Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 5 Nov 2025 15:46:46 -0500 Subject: [PATCH 3/7] Remove extra RemoteClusterService instances --- .../transport/RemoteClusterServiceTests.java | 1389 ++++++++--------- 1 file changed, 667 insertions(+), 722 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 2cd195cb4a8ff..c2a59fda3a71b 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -73,7 +73,6 @@ public class RemoteClusterServiceTests extends ESTestCase { private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); private final ProjectResolver projectResolver = DefaultProjectResolver.INSTANCE; - private LinkedProjectConfigService linkedProjectConfigService = null; @Override public void tearDown() throws Exception { @@ -81,17 +80,8 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } - private RemoteClusterService createRemoteClusterService(Settings settings, MockTransportService transportService) { - return createRemoteClusterService(settings, ClusterSettings.createBuiltInClusterSettings(), transportService); - } - - private RemoteClusterService createRemoteClusterService( - Settings settings, - ClusterSettings clusterSettings, - MockTransportService transportService - ) { - linkedProjectConfigService = new ClusterSettingsLinkedProjectConfigService(settings, clusterSettings, projectResolver); - return new RemoteClusterService(settings, transportService, projectResolver); + private RemoteClusterService createRemoteClusterService(Settings settings) { + return new RemoteClusterService(settings, null, projectResolver); } private MockTransportService startTransport( @@ -156,7 +146,7 @@ public void testRemoteClusterSeedSetting() { assertEquals("failed to parse port", e.getMessage()); } - public void testGroupClusterIndices() throws IOException { + public void testGroupClusterIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -177,10 +167,13 @@ public void testGroupClusterIndices() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -189,188 +182,182 @@ public void testGroupClusterIndices() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteClusterRegistered(service, "foo")); - { - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { - "cluster_1:bar", - "cluster_2:foo:bar", - "cluster_1:test", - "cluster_2:foo*", - "foo", - "cluster*:baz", - "*:boo" } - ); - List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndices); - assertEquals("foo", localIndices.get(0)); - assertEquals(2, perClusterIndices.size()); - assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1")); - assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); - } - - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } - ) + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); + { + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { + "cluster_1:bar", + "cluster_2:foo:bar", + "cluster_1:test", + "cluster_2:foo*", + "foo", + "cluster*:baz", + "*:boo" } ); + List localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndices); + assertEquals("foo", localIndices.get(0)); + assertEquals(2, perClusterIndices.size()); + assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1")); + assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2")); + } - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "does_not_exist:*" } - ) - ); + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); - // test cluster exclusions - { - String[] indices = shuffledList(List.of("cluster*:foo*", "foo", "-cluster_1:*", "*:boo")).toArray(new String[0]); + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "does_not_exist:*" } + ) + ); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(2, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - - List cluster2 = perClusterIndices.get("cluster_2"); - assertNotNull(cluster2); - assertEquals(2, cluster2.size()); - assertEquals(List.of("boo", "foo*"), cluster2.stream().sorted().toList()); - } - { - String[] indices = shuffledList(List.of("*:*", "-clu*_1:*", "*:boo")).toArray(new String[0]); + // test cluster exclusions + { + String[] indices = shuffledList(List.of("cluster*:foo*", "foo", "-cluster_1:*", "*:boo")).toArray(new String[0]); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(2, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + + List cluster2 = perClusterIndices.get("cluster_2"); + assertNotNull(cluster2); + assertEquals(2, cluster2.size()); + assertEquals(List.of("boo", "foo*"), cluster2.stream().sorted().toList()); + } + { + String[] indices = shuffledList(List.of("*:*", "-clu*_1:*", "*:boo")).toArray(new String[0]); - List cluster2 = perClusterIndices.get("cluster_2"); - assertNotNull(cluster2); - assertEquals(2, cluster2.size()); - assertEquals(List.of("*", "boo"), cluster2.stream().sorted().toList()); - } - { - String[] indices = shuffledList(List.of("cluster*:foo*", "cluster_2:*", "foo", "-cluster_1:*", "-c*:*")).toArray( - new String[0] - ); + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); - Map> perClusterIndices = service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - } - { - String[] indices = shuffledList(List.of("cluster*:*", "foo", "-*:*")).toArray(new String[0]); + List cluster2 = perClusterIndices.get("cluster_2"); + assertNotNull(cluster2); + assertEquals(2, cluster2.size()); + assertEquals(List.of("*", "boo"), cluster2.stream().sorted().toList()); + } + { + String[] indices = shuffledList(List.of("cluster*:foo*", "cluster_2:*", "foo", "-cluster_1:*", "-c*:*")).toArray( + new String[0] + ); - Map> perClusterIndices = service.groupClusterIndices( + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + } + { + String[] indices = shuffledList(List.of("cluster*:*", "foo", "-*:*")).toArray(new String[0]); + + Map> perClusterIndices = service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + indices + ); + assertEquals(1, perClusterIndices.size()); + List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + assertNotNull(localIndexes); + assertEquals(1, localIndexes.size()); + assertEquals("foo", localIndexes.get(0)); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices( service.getRegisteredRemoteClusterNames(), - indices - ); - assertEquals(1, perClusterIndices.size()); - List localIndexes = perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - assertNotNull(localIndexes); - assertEquals(1, localIndexes.size()); - assertEquals("foo", localIndexes.get(0)); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - // -cluster_1:foo* is not allowed, only -cluster_1:* - new String[] { "cluster_1:bar", "-cluster_2:foo*", "cluster_1:test", "cluster_2:foo*", "foo" } - ) - ); - assertThat( - e.getMessage(), - equalTo("To exclude a cluster you must specify the '*' wildcard for the index expression, but found: [foo*]") - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - // -cluster_1:* will fail since cluster_1 was never included in order to qualify to be excluded - new String[] { "-cluster_1:*", "cluster_2:foo*", "foo" } - ) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " - + "be included: [(local), cluster_2]. Input: [-cluster_1:*,cluster_2:foo*,foo]" - ) - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-cluster_1:*" }) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " - + "be included: []. Input: [-cluster_1:*]" - ) - ); - } - { - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-*:*" }) - ); - assertThat( - e.getMessage(), - equalTo( - "Attempt to exclude clusters [cluster_1, cluster_2] failed as they are not included in the list of " - + "clusters to be included: []. Input: [-*:*]" - ) - ); - } - { - String[] indices = shuffledList(List.of("cluster*:*", "*:foo", "-*:*")).toArray(new String[0]); + // -cluster_1:foo* is not allowed, only -cluster_1:* + new String[] { "cluster_1:bar", "-cluster_2:foo*", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); + assertThat( + e.getMessage(), + equalTo("To exclude a cluster you must specify the '*' wildcard for the index expression, but found: [foo*]") + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + // -cluster_1:* will fail since cluster_1 was never included in order to qualify to be excluded + new String[] { "-cluster_1:*", "cluster_2:foo*", "foo" } + ) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " + + "be included: [(local), cluster_2]. Input: [-cluster_1:*,cluster_2:foo*,foo]" + ) + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-cluster_1:*" }) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude cluster [cluster_1] failed as it is not included in the list of clusters to " + + "be included: []. Input: [-cluster_1:*]" + ) + ); + } + { + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), new String[] { "-*:*" }) + ); + assertThat( + e.getMessage(), + equalTo( + "Attempt to exclude clusters [cluster_1, cluster_2] failed as they are not included in the list of " + + "clusters to be included: []. Input: [-*:*]" + ) + ); + } + { + String[] indices = shuffledList(List.of("cluster*:*", "*:foo", "-*:*")).toArray(new String[0]); - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), indices) - ); - assertThat( - e.getMessage(), - containsString("The '-' exclusions in the index expression list excludes all indexes. Nothing to search.") - ); - } + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> service.groupClusterIndices(service.getRegisteredRemoteClusterNames(), indices) + ); + assertThat( + e.getMessage(), + containsString("The '-' exclusions in the index expression list excludes all indexes. Nothing to search.") + ); } } } } - public void testGroupIndices() throws IOException { + public void testGroupIndices() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -391,10 +378,13 @@ public void testGroupIndices() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -403,62 +393,53 @@ public void testGroupIndices() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteClusterRegistered(service, "foo")); - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - new String[] { - "cluster_1:bar", - "cluster_2:foo:bar", - "cluster_1:test", - "cluster_2:foo*", - "foo", - "cluster*:baz", - "*:boo" } - ); - assertEquals(3, perClusterIndices.size()); - assertArrayEquals( - new String[] { "foo" }, - perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices() - ); - assertArrayEquals(new String[] { "bar", "test", "baz", "boo" }, perClusterIndices.get("cluster_1").indices()); - assertArrayEquals(new String[] { "foo:bar", "foo*", "baz", "boo" }, perClusterIndices.get("cluster_2").indices()); - } - { - expectThrows( - NoSuchRemoteClusterException.class, - () -> service.groupClusterIndices( - service.getRegisteredRemoteClusterNames(), - new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } - ) - ); - } - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - new String[] { "cluster_1:bar", "cluster_2:foo*" } - ); - assertEquals(2, perClusterIndices.size()); - assertArrayEquals(new String[] { "bar" }, perClusterIndices.get("cluster_1").indices()); - assertArrayEquals(new String[] { "foo*" }, perClusterIndices.get("cluster_2").indices()); - } - { - Map perClusterIndices = service.groupIndices( - IndicesOptions.LENIENT_EXPAND_OPEN, - Strings.EMPTY_ARRAY - ); - assertEquals(1, perClusterIndices.size()); - assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteClusterRegistered(service, "foo")); + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + new String[] { + "cluster_1:bar", + "cluster_2:foo:bar", + "cluster_1:test", + "cluster_2:foo*", + "foo", + "cluster*:baz", + "*:boo" } + ); + assertEquals(3, perClusterIndices.size()); + assertArrayEquals(new String[] { "foo" }, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); + assertArrayEquals(new String[] { "bar", "test", "baz", "boo" }, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[] { "foo:bar", "foo*", "baz", "boo" }, perClusterIndices.get("cluster_2").indices()); + } + { + expectThrows( + NoSuchRemoteClusterException.class, + () -> service.groupClusterIndices( + service.getRegisteredRemoteClusterNames(), + new String[] { "foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo" } + ) + ); + } + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + new String[] { "cluster_1:bar", "cluster_2:foo*" } + ); + assertEquals(2, perClusterIndices.size()); + assertArrayEquals(new String[] { "bar" }, perClusterIndices.get("cluster_1").indices()); + assertArrayEquals(new String[] { "foo*" }, perClusterIndices.get("cluster_2").indices()); + } + { + Map perClusterIndices = service.groupIndices( + IndicesOptions.LENIENT_EXPAND_OPEN, + Strings.EMPTY_ARRAY + ); + assertEquals(1, perClusterIndices.size()); + assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices()); } } } @@ -469,7 +450,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "node-1").build(), Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE) ); - try (RemoteClusterService service = createRemoteClusterService(settings, null)) { + try (RemoteClusterService service = createRemoteClusterService(settings)) { expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertFalse(hasRegisteredClusters(service)); final IllegalArgumentException error = expectThrows( @@ -480,7 +461,7 @@ public void testGroupIndicesWithoutRemoteClusterClientRole() throws Exception { } } - public void testIncrementallyAddClusters() throws IOException { + public void testIncrementallyAddClusters() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -513,50 +494,41 @@ public void testIncrementallyAddClusters() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - Settings cluster1Settings = createSettings( - "cluster_1", - Collections.singletonList(cluster1Seed.getAddress().toString()) - ); - PlainActionFuture clusterAdded = new PlainActionFuture<>(); - // Add the cluster on a different thread to test that we wait for a new cluster to - // connect before returning. - new Thread(() -> { - try { - service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); - clusterAdded.onResponse(null); - } catch (Exception e) { - clusterAdded.onFailure(e); - } - }).start(); - clusterAdded.actionGet(); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - Settings cluster2Settings = createSettings( - "cluster_2", - Collections.singletonList(cluster2Seed.getAddress().toString()) - ); - service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); - service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); - assertFalse(isRemoteClusterRegistered(service, "cluster_2")); - IllegalArgumentException iae = expectThrows( - IllegalArgumentException.class, - () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) - ); - assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + Settings cluster1Settings = createSettings("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString())); + PlainActionFuture clusterAdded = new PlainActionFuture<>(); + // Add the cluster on a different thread to test that we wait for a new cluster to + // connect before returning. + new Thread(() -> { + try { + service.updateLinkedProject(toConfig("cluster_1", cluster1Settings)); + clusterAdded.onResponse(null); + } catch (Exception e) { + clusterAdded.onFailure(e); + } + }).start(); + clusterAdded.actionGet(); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + Settings cluster2Settings = createSettings("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString())); + service.updateLinkedProject(toConfig("cluster_2", cluster2Settings)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + Settings cluster2SettingsDisabled = createSettings("cluster_2", Collections.emptyList()); + service.updateLinkedProject(toConfig("cluster_2", cluster2SettingsDisabled)); + assertFalse(isRemoteClusterRegistered(service, "cluster_2")); + IllegalArgumentException iae = expectThrows( + IllegalArgumentException.class, + () -> service.updateLinkedProject(toConfig(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY, Settings.EMPTY)) + ); + assertEquals("remote clusters must not have the empty string as its key", iae.getMessage()); } } } - public void testDefaultPingSchedule() throws IOException { + public void testDefaultPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService seedTransport = startTransport( @@ -589,23 +561,20 @@ public void testDefaultPingSchedule() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); - final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); - service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + final var newSettings = createSettings("cluster_1", Collections.singletonList(seedNode.getAddress().toString())); + final var mergedSettings = Settings.builder().put(settings, false).put(newSettings, false).build(); + service.updateLinkedProject(toConfig("cluster_1", mergedSettings)); + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getConnectionProfile().getPingInterval()); } } } - public void testCustomPingSchedule() throws IOException { + public void testCustomPingSchedule() { List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService cluster1Transport = startTransport( @@ -626,12 +595,19 @@ public void testCustomPingSchedule() throws IOException { knownNodes.add(cluster1Transport.getLocalNode()); knownNodes.add(cluster2Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); - Settings.Builder settingsBuilder = Settings.builder(); + Settings.Builder builder = Settings.builder(); if (randomBoolean()) { - settingsBuilder.put(TransportSettings.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); + builder.put(TransportSettings.PING_SCHEDULE.getKey(), TimeValue.timeValueSeconds(randomIntBetween(1, 10))); } - Settings transportSettings = settingsBuilder.build(); - + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); + TimeValue pingSchedule1 = // randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); + TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : + TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); + Settings transportSettings = builder.build(); try ( MockTransportService transportService = MockTransportService.createNewService( transportSettings, @@ -643,23 +619,12 @@ public void testCustomPingSchedule() throws IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); - TimeValue pingSchedule1 = // randomBoolean() ? TimeValue.MINUS_ONE : - TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule1); - TimeValue pingSchedule2 = // randomBoolean() ? TimeValue.MINUS_ONE : - TimeValue.timeValueSeconds(randomIntBetween(1, 10)); - builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - initializeRemoteClusters(service); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); - assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); - RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); - assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getConnectionProfile().getPingInterval()); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); + RemoteClusterConnection remoteClusterConnection2 = service.getRemoteClusterConnection("cluster_2"); + assertEquals(pingSchedule2, remoteClusterConnection2.getConnectionManager().getConnectionProfile().getPingInterval()); } } } @@ -677,10 +642,12 @@ public void testChangeSettings() throws Exception { DiscoveryNode cluster1Seed = cluster1Transport.getLocalNode(); knownNodes.add(cluster1Transport.getLocalNode()); Collections.shuffle(knownNodes, random()); + Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + builder.build(), VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -689,41 +656,37 @@ public void testChangeSettings() throws Exception { ) { transportService.start(); transportService.acceptIncomingRequests(); - Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - try (RemoteClusterService service = createRemoteClusterService(builder.build(), transportService)) { - initializeRemoteClusters(service); - RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - Settings.Builder settingsChange = Settings.builder(); - TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); - settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); - boolean compressionScheme = randomBoolean(); - Compression.Enabled enabledChange = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE); - if (compressionScheme) { - settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.DEFLATE); - } else { - settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); - } - settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); - service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); - assertBusy(remoteClusterConnection::isClosed); - - remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); - ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); - assertEquals(pingSchedule, connectionProfile.getPingInterval()); - if (compressionScheme) { - assertEquals(Compression.Enabled.INDEXING_DATA, connectionProfile.getCompressionEnabled()); - assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme()); - } else { - assertEquals(enabledChange, connectionProfile.getCompressionEnabled()); - assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme()); - } + final var service = transportService.getRemoteClusterService(); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + Settings.Builder settingsChange = Settings.builder(); + TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); + settingsChange.put("cluster.remote.cluster_1.transport.ping_schedule", pingSchedule); + boolean compressionScheme = randomBoolean(); + Compression.Enabled enabledChange = randomFrom(Compression.Enabled.TRUE, Compression.Enabled.FALSE); + if (compressionScheme) { + settingsChange.put("cluster.remote.cluster_1.transport.compression_scheme", Compression.Scheme.DEFLATE); + } else { + settingsChange.put("cluster.remote.cluster_1.transport.compress", enabledChange); + } + settingsChange.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); + service.updateLinkedProject(toConfig("cluster_1", settingsChange.build())); + assertBusy(remoteClusterConnection::isClosed); + + remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + ConnectionProfile connectionProfile = remoteClusterConnection.getConnectionManager().getConnectionProfile(); + assertEquals(pingSchedule, connectionProfile.getPingInterval()); + if (compressionScheme) { + assertEquals(Compression.Enabled.INDEXING_DATA, connectionProfile.getCompressionEnabled()); + assertEquals(Compression.Scheme.DEFLATE, connectionProfile.getCompressionScheme()); + } else { + assertEquals(enabledChange, connectionProfile.getCompressionEnabled()); + assertEquals(Compression.Scheme.LZ4, connectionProfile.getCompressionScheme()); } } } } - public void testRemoteNodeAttribute() throws IOException, InterruptedException { + public void testRemoteNodeAttribute() throws InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build(); final List knownNodes = new CopyOnWriteArrayList<>(); final Settings gateway = Settings.builder().put("node.attr.gateway", true).build(); @@ -776,33 +739,30 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); - assertEquals(0, transportService.getConnectionManager().size()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } - public void testRemoteNodeRoles() throws IOException, InterruptedException { + public void testRemoteNodeRoles() throws InterruptedException { final Settings settings = Settings.EMPTY; final List knownNodes = new CopyOnWriteArrayList<>(); final Settings data = nonMasterNode(); @@ -858,28 +818,25 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); - assertEquals(0, transportService.getConnectionManager().size()); - } + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1N2Node)); + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2N1Node)); + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2N2Node)); + assertEquals(0, transportService.getConnectionManager().size()); } } } @@ -945,127 +902,124 @@ public void testCollectNodes() throws InterruptedException, IOException { ) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertFalse(hasRegisteredClusters(service)); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); - secondLatch.await(); - CountDownLatch latch = new CountDownLatch(1); + final var service = transportService.getRemoteClusterService(); + assertFalse(hasRegisteredClusters(service)); + + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", settings, List.of(c1N1Node, c1N2Node), connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_2", settings, List.of(c2N1Node, c2N2Node), connectionListener(secondLatch)); + secondLatch.await(); + CountDownLatch latch = new CountDownLatch(1); + service.collectNodes( + new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new ActionListener>() { + @Override + public void onResponse(BiFunction func) { + try { + assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId())); + assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId())); + assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId())); + assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId())); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Exception e) { + try { + throw new AssertionError(e); + } finally { + latch.countDown(); + } + } + } + ); + latch.await(); + { + CountDownLatch failLatch = new CountDownLatch(1); + AtomicReference ex = new AtomicReference<>(); service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), new ActionListener>() { @Override - public void onResponse(BiFunction func) { + public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { try { - assertEquals(c1N1Node, func.apply("cluster_1", c1N1Node.getId())); - assertEquals(c1N2Node, func.apply("cluster_1", c1N2Node.getId())); - assertEquals(c2N1Node, func.apply("cluster_2", c2N1Node.getId())); - assertEquals(c2N2Node, func.apply("cluster_2", c2N2Node.getId())); + fail("should not be called"); } finally { - latch.countDown(); + failLatch.countDown(); } } @Override public void onFailure(Exception e) { try { - throw new AssertionError(e); + ex.set(e); } finally { - latch.countDown(); + failLatch.countDown(); } } } ); - latch.await(); - { - CountDownLatch failLatch = new CountDownLatch(1); - AtomicReference ex = new AtomicReference<>(); - service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2", "no such cluster")), - new ActionListener>() { - @Override - public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { - try { - fail("should not be called"); - } finally { - failLatch.countDown(); - } - } - - @Override - public void onFailure(Exception e) { - try { - ex.set(e); - } finally { - failLatch.countDown(); - } + failLatch.await(); + assertNotNull(ex.get()); + assertTrue(ex.get() instanceof NoSuchRemoteClusterException); + assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); + } + { + logger.info("closing all source nodes"); + // close all targets and check for the transport level failure path + IOUtils.close(c1N1, c1N2, c2N1, c2N2); + logger.info("all source nodes are closed"); + CountDownLatch failLatch = new CountDownLatch(1); + AtomicReference ex = new AtomicReference<>(); + service.collectNodes( + new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), + new ActionListener>() { + @Override + public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { + try { + fail("should not be called"); + } finally { + failLatch.countDown(); } } - ); - failLatch.await(); - assertNotNull(ex.get()); - assertTrue(ex.get() instanceof NoSuchRemoteClusterException); - assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); - } - { - logger.info("closing all source nodes"); - // close all targets and check for the transport level failure path - IOUtils.close(c1N1, c1N2, c2N1, c2N2); - logger.info("all source nodes are closed"); - CountDownLatch failLatch = new CountDownLatch(1); - AtomicReference ex = new AtomicReference<>(); - service.collectNodes( - new HashSet<>(Arrays.asList("cluster_1", "cluster_2")), - new ActionListener>() { - @Override - public void onResponse(BiFunction stringStringDiscoveryNodeBiFunction) { - try { - fail("should not be called"); - } finally { - failLatch.countDown(); - } - } - @Override - public void onFailure(Exception e) { - try { - ex.set(e); - } finally { - failLatch.countDown(); - } + @Override + public void onFailure(Exception e) { + try { + ex.set(e); + } finally { + failLatch.countDown(); } } - ); - failLatch.await(); - assertNotNull(ex.get()); - if (ex.get() instanceof IllegalStateException) { - assertThat( - ex.get().getMessage(), - either(equalTo("Unable to open any connections to remote cluster [cluster_1]")).or( - equalTo("Unable to open any connections to remote cluster [cluster_2]") - ) - ); - } else { - assertThat( - ex.get(), - either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class)) - .or(instanceOf(NoSeedNodeLeftException.class)) - ); } + ); + failLatch.await(); + assertNotNull(ex.get()); + if (ex.get() instanceof IllegalStateException) { + assertThat( + ex.get().getMessage(), + either(equalTo("Unable to open any connections to remote cluster [cluster_1]")).or( + equalTo("Unable to open any connections to remote cluster [cluster_2]") + ) + ); + } else { + assertThat( + ex.get(), + either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class)) + .or(instanceOf(NoSeedNodeLeftException.class)) + ); } } } } } - public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { + public void testCollectNodesConcurrentWithSettingsChanges() { final List knownNodes_c1 = new CopyOnWriteArrayList<>(); try ( @@ -1075,24 +1029,25 @@ public void testCollectNodesConcurrentWithSettingsChanges() throws IOException { VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY - ); - var transportService = MockTransportService.createNewService( - Settings.EMPTY, - VersionInformation.CURRENT, - TransportVersion.current(), - threadPool, - null ) ) { final var c1N1Node = c1N1.getLocalNode(); knownNodes_c1.add(c1N1Node); final var seedList = List.of(c1N1Node.getAddress().toString()); - transportService.start(); - transportService.acceptIncomingRequests(); final var initialSettings = createSettings("cluster_1", seedList); - try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) { - initializeRemoteClusters(service); + try ( + var transportService = MockTransportService.createNewService( + initialSettings, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + null + ) + ) { + transportService.start(); + transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); assertTrue(hasRegisteredClusters(service)); final var numTasks = between(3, 5); final var taskLatch = new CountDownLatch(numTasks); @@ -1263,10 +1218,13 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { knownNodes.add(node0); knownNodes.add(node1); Collections.shuffle(knownNodes, random()); + final Settings.Builder builder = Settings.builder(); + builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); + final var initialSettings = builder.build(); try ( MockTransportService transportService = MockTransportService.createNewService( - Settings.EMPTY, + initialSettings, VersionInformation.CURRENT, TransportVersion.current(), threadPool, @@ -1275,57 +1233,50 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { ) { transportService.start(); transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); - final Settings.Builder builder = Settings.builder(); - builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); - final var initialSettings = builder.build(); - try (RemoteClusterService service = createRemoteClusterService(initialSettings, transportService)) { - assertFalse(hasRegisteredClusters(service)); - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - - final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); - assertFalse(firstRemoteClusterConnection.isClosed()); - - final CountDownLatch firstLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_test", initialSettings, List.of(node0), connectionListener(firstLatch)); - firstLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); - assertFalse(firstRemoteClusterConnection.isClosed()); - assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); - - final List newSeeds = new ArrayList<>(); - newSeeds.add(node1); - if (randomBoolean()) { - newSeeds.add(node0); - Collections.shuffle(newSeeds, random()); - } + final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); - final CountDownLatch secondLatch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_test", initialSettings, newSeeds, connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertBusy(() -> { - assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); - assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); - assertTrue(firstRemoteClusterConnection.isClosed()); - }); - - final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); - assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); - assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); - assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); - assertFalse(secondRemoteClusterConnection.isClosed()); + final CountDownLatch firstLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_test", initialSettings, List.of(node0), connectionListener(firstLatch)); + firstLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected()); + assertFalse(firstRemoteClusterConnection.isClosed()); + assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test")); + + final List newSeeds = new ArrayList<>(); + newSeeds.add(node1); + if (randomBoolean()) { + newSeeds.add(node0); + Collections.shuffle(newSeeds, random()); } + + final CountDownLatch secondLatch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_test", initialSettings, newSeeds, connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertBusy(() -> { + assertFalse(firstRemoteClusterConnection.isNodeConnected(node0)); + assertFalse(firstRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected()); + assertTrue(firstRemoteClusterConnection.isClosed()); + }); + + final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test"); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node0)); + assertTrue(secondRemoteClusterConnection.isNodeConnected(node1)); + assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected()); + assertFalse(secondRemoteClusterConnection.isClosed()); } } } @@ -1418,7 +1369,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithRemoteClusterClientRole)) { service.ensureClientIsEnabled(); } @@ -1427,7 +1378,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(nodeNameSettings) .put(removeRoles(Set.of(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .build(); - try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole, null)) { + try (RemoteClusterService service = createRemoteClusterService(settingsWithoutRemoteClusterClientRole)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat(exception.getMessage(), equalTo("node [node-1] does not have the [remote_cluster_client] role")); } @@ -1438,7 +1389,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.INDEX_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledSettingsOnNonSearchNode)) { final var exception = expectThrows(IllegalArgumentException.class, service::ensureClientIsEnabled); assertThat( exception.getMessage(), @@ -1455,7 +1406,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.SEARCH_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeSettings)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnRemoteClusterClientSettings = Settings.builder() @@ -1463,7 +1414,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRole(DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE)) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnRemoteClusterClientSettings)) { service.ensureClientIsEnabled(); } final var statelessEnabledOnSearchNodeAndRemoteClusterClientSettings = Settings.builder() @@ -1471,7 +1422,7 @@ public void testRemoteClusterServiceEnsureClientIsEnabled() throws IOException { .put(onlyRoles(Set.of(DiscoveryNodeRole.SEARCH_ROLE, DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE))) .put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, true) .build(); - try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings, null)) { + try (RemoteClusterService service = createRemoteClusterService(statelessEnabledOnSearchNodeAndRemoteClusterClientSettings)) { service.ensureClientIsEnabled(); } } @@ -1500,7 +1451,7 @@ public void testRemoteClusterServiceNotEnabledGetCollectNodes() { } } - public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws IOException, InterruptedException { + public void testUseDifferentTransportProfileForCredentialsProtectedRemoteClusters() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c1 = startTransport( @@ -1542,56 +1493,53 @@ public void testUseDifferentTransportProfileForCredentialsProtectedRemoteCluster }); transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - initializeRemoteClusters(service); - - final CountDownLatch firstLatch = new CountDownLatch(1); - final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); - final boolean firstRemoteClusterProxyMode = randomBoolean(); - if (firstRemoteClusterProxyMode) { - firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.mode", "proxy") - .put("cluster.remote.cluster_1.proxy_address", c1Node.getAddress().toString()); - } else { - firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString()); - } - final var updatedSettings1 = firstRemoteClusterSettingsBuilder.build(); - updateRemoteCluster(service, "cluster_1", settings, updatedSettings1, connectionListener(firstLatch)); - firstLatch.await(); - - final CountDownLatch secondLatch = new CountDownLatch(1); - final Settings.Builder secondRemoteClusterSettingsBuilder = Settings.builder(); - final boolean secondRemoteClusterProxyMode = randomBoolean(); - if (secondRemoteClusterProxyMode) { - secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.mode", "proxy") - .put("cluster.remote.cluster_2.proxy_address", c2Node.getAddress().toString()); - } else { - secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString()); - } - final var updatedSettings2 = secondRemoteClusterSettingsBuilder.build(); - updateRemoteCluster(service, "cluster_2", settings, updatedSettings2, connectionListener(secondLatch)); - secondLatch.await(); - - assertTrue(hasRegisteredClusters(service)); - assertTrue(isRemoteClusterRegistered(service, "cluster_1")); - if (firstRemoteClusterProxyMode) { - assertFalse(isRemoteNodeConnected(service, "cluster_1", c1Node)); - } else { - assertTrue(isRemoteNodeConnected(service, "cluster_1", c1Node)); - } - assertTrue(isRemoteClusterRegistered(service, "cluster_2")); - if (secondRemoteClusterProxyMode) { - assertFalse(isRemoteNodeConnected(service, "cluster_2", c2Node)); - } else { - assertTrue(isRemoteNodeConnected(service, "cluster_2", c2Node)); - } - // No local node connection - assertEquals(0, transportService.getConnectionManager().size()); + final var service = transportService.getRemoteClusterService(); + final CountDownLatch firstLatch = new CountDownLatch(1); + final Settings.Builder firstRemoteClusterSettingsBuilder = Settings.builder(); + final boolean firstRemoteClusterProxyMode = randomBoolean(); + if (firstRemoteClusterProxyMode) { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.mode", "proxy") + .put("cluster.remote.cluster_1.proxy_address", c1Node.getAddress().toString()); + } else { + firstRemoteClusterSettingsBuilder.put("cluster.remote.cluster_1.seeds", c1Node.getAddress().toString()); + } + final var updatedSettings1 = firstRemoteClusterSettingsBuilder.build(); + updateRemoteCluster(service, "cluster_1", settings, updatedSettings1, connectionListener(firstLatch)); + firstLatch.await(); + + final CountDownLatch secondLatch = new CountDownLatch(1); + final Settings.Builder secondRemoteClusterSettingsBuilder = Settings.builder(); + final boolean secondRemoteClusterProxyMode = randomBoolean(); + if (secondRemoteClusterProxyMode) { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.mode", "proxy") + .put("cluster.remote.cluster_2.proxy_address", c2Node.getAddress().toString()); + } else { + secondRemoteClusterSettingsBuilder.put("cluster.remote.cluster_2.seeds", c2Node.getAddress().toString()); + } + final var updatedSettings2 = secondRemoteClusterSettingsBuilder.build(); + updateRemoteCluster(service, "cluster_2", settings, updatedSettings2, connectionListener(secondLatch)); + secondLatch.await(); + + assertTrue(hasRegisteredClusters(service)); + assertTrue(isRemoteClusterRegistered(service, "cluster_1")); + if (firstRemoteClusterProxyMode) { + assertFalse(isRemoteNodeConnected(service, "cluster_1", c1Node)); + } else { + assertTrue(isRemoteNodeConnected(service, "cluster_1", c1Node)); + } + assertTrue(isRemoteClusterRegistered(service, "cluster_2")); + if (secondRemoteClusterProxyMode) { + assertFalse(isRemoteNodeConnected(service, "cluster_2", c2Node)); + } else { + assertTrue(isRemoteNodeConnected(service, "cluster_2", c2Node)); } + // No local node connection + assertEquals(0, transportService.getConnectionManager().size()); } } } - public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfile() throws IOException, InterruptedException { + public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfile() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c = startTransport( @@ -1617,52 +1565,48 @@ public void testUpdateRemoteClusterCredentialsRebuildsConnectionWithCorrectProfi ) { transportService.start(); transportService.acceptIncomingRequests(); + final var service = transportService.getRemoteClusterService(); + final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); + final CountDownLatch latch = new CountDownLatch(1); + updateRemoteCluster(service, "cluster_1", Settings.EMPTY, clusterSettings, connectionListener(latch)); + latch.await(); + + assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); + + { + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10)); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build(); + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1"))); + final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, listener); + listener.actionGet(10, TimeUnit.SECONDS); + } - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - initializeRemoteClusters(service); - - final Settings clusterSettings = buildRemoteClusterSettings("cluster_1", discoNode.getAddress().toString()); - final CountDownLatch latch = new CountDownLatch(1); - updateRemoteCluster(service, "cluster_1", Settings.EMPTY, clusterSettings, connectionListener(latch)); - latch.await(); - - assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); - - { - final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString("cluster.remote.cluster_1.credentials", randomAlphaOfLength(10)); - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder().put(clusterSettings).setSecureSettings(secureSettings).build(); - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases(), equalTo(Set.of("cluster_1"))); - final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, listener); - listener.actionGet(10, TimeUnit.SECONDS); - } - - assertConnectionHasProfile( - service.getRemoteClusterConnection("cluster_1"), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - - { - final PlainActionFuture listener = new PlainActionFuture<>(); - // Settings without credentials constitute credentials removal - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings); - assertThat(result.addedClusterAliases().size(), equalTo(0)); - assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1"))); - final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings); - service.updateRemoteCluster(config, true, listener); - listener.actionGet(10, TimeUnit.SECONDS); - } + assertConnectionHasProfile( + service.getRemoteClusterConnection("cluster_1"), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); - assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); + { + final PlainActionFuture listener = new PlainActionFuture<>(); + // Settings without credentials constitute credentials removal + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(clusterSettings); + assertThat(result.addedClusterAliases().size(), equalTo(0)); + assertThat(result.removedClusterAliases(), equalTo(Set.of("cluster_1"))); + final var config = buildLinkedProjectConfig("cluster_1", Settings.EMPTY, clusterSettings); + service.updateRemoteCluster(config, true, listener); + listener.actionGet(10, TimeUnit.SECONDS); } + + assertConnectionHasProfile(service.getRemoteClusterConnection("cluster_1"), "default"); } } } - public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespiteFailures() throws IOException, InterruptedException { + public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespiteFailures() throws InterruptedException { final List knownNodes = new CopyOnWriteArrayList<>(); try ( MockTransportService c1 = startTransport( @@ -1711,83 +1655,80 @@ public void testUpdateRemoteClusterCredentialsRebuildsMultipleConnectionsDespite alias -> alias.equals(goodCluster) || alias.equals(badCluster), () -> randomAlphaOfLength(10) ); - try (RemoteClusterService service = createRemoteClusterService(Settings.EMPTY, transportService)) { - initializeRemoteClusters(service); - - final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); - final var latch = new CountDownLatch(1); - updateRemoteCluster(service, goodCluster, Settings.EMPTY, cluster1Settings, connectionListener(latch)); - latch.await(); - - final Settings cluster2Settings = buildRemoteClusterSettings(badCluster, c2DiscoNode.getAddress().toString()); - final PlainActionFuture future = new PlainActionFuture<>(); - updateRemoteCluster(service, badCluster, Settings.EMPTY, cluster2Settings, future); - final var ex = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS)); - assertThat(ex.getMessage(), containsString("bad cluster")); - - assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); - assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); - final Set aliases = Set.of(badCluster, goodCluster, missingCluster); - final ActionListener noop = ActionListener.noop(); - - { - final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString("cluster.remote." + badCluster + ".credentials", randomAlphaOfLength(10)); - secureSettings.setString("cluster.remote." + goodCluster + ".credentials", randomAlphaOfLength(10)); - secureSettings.setString("cluster.remote." + missingCluster + ".credentials", randomAlphaOfLength(10)); - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder() - .put(cluster1Settings) - .put(cluster2Settings) - .setSecureSettings(secureSettings) - .build(); - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases(), equalTo(aliases)); - try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { - for (String alias : aliases) { - final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); - } + final var service = transportService.getRemoteClusterService(); + final Settings cluster1Settings = buildRemoteClusterSettings(goodCluster, c1DiscoNode.getAddress().toString()); + final var latch = new CountDownLatch(1); + updateRemoteCluster(service, goodCluster, Settings.EMPTY, cluster1Settings, connectionListener(latch)); + latch.await(); + + final Settings cluster2Settings = buildRemoteClusterSettings(badCluster, c2DiscoNode.getAddress().toString()); + final PlainActionFuture future = new PlainActionFuture<>(); + updateRemoteCluster(service, badCluster, Settings.EMPTY, cluster2Settings, future); + final var ex = expectThrows(Exception.class, () -> future.actionGet(10, TimeUnit.SECONDS)); + assertThat(ex.getMessage(), containsString("bad cluster")); + + assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); + assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + final Set aliases = Set.of(badCluster, goodCluster, missingCluster); + final ActionListener noop = ActionListener.noop(); + + { + final MockSecureSettings secureSettings = new MockSecureSettings(); + secureSettings.setString("cluster.remote." + badCluster + ".credentials", randomAlphaOfLength(10)); + secureSettings.setString("cluster.remote." + goodCluster + ".credentials", randomAlphaOfLength(10)); + secureSettings.setString("cluster.remote." + missingCluster + ".credentials", randomAlphaOfLength(10)); + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder() + .put(cluster1Settings) + .put(cluster2Settings) + .setSecureSettings(secureSettings) + .build(); + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases(), equalTo(aliases)); + try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { + for (String alias : aliases) { + final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); } - listener.actionGet(10, TimeUnit.SECONDS); } + listener.actionGet(10, TimeUnit.SECONDS); + } - assertConnectionHasProfile( - service.getRemoteClusterConnection(goodCluster), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - assertConnectionHasProfile( - service.getRemoteClusterConnection(badCluster), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); - - { - final PlainActionFuture listener = new PlainActionFuture<>(); - final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build(); - // Settings without credentials constitute credentials removal - final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); - assertThat(result.addedClusterAliases().size(), equalTo(0)); - assertThat(result.removedClusterAliases(), equalTo(aliases)); - try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { - for (String alias : aliases) { - final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); - service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); - } + assertConnectionHasProfile( + service.getRemoteClusterConnection(goodCluster), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); + assertConnectionHasProfile( + service.getRemoteClusterConnection(badCluster), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + + { + final PlainActionFuture listener = new PlainActionFuture<>(); + final Settings settings = Settings.builder().put(cluster1Settings).put(cluster2Settings).build(); + // Settings without credentials constitute credentials removal + final var result = service.getRemoteClusterCredentialsManager().updateClusterCredentials(settings); + assertThat(result.addedClusterAliases().size(), equalTo(0)); + assertThat(result.removedClusterAliases(), equalTo(aliases)); + try (var connectionRefs = new RefCountingRunnable(() -> listener.onResponse(null))) { + for (String alias : aliases) { + final var config = buildLinkedProjectConfig(alias, Settings.EMPTY, settings); + service.updateRemoteCluster(config, true, ActionListener.releaseAfter(noop, connectionRefs.acquire())); } - listener.actionGet(10, TimeUnit.SECONDS); } - - assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); - assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); - expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); + listener.actionGet(10, TimeUnit.SECONDS); } + + assertConnectionHasProfile(service.getRemoteClusterConnection(goodCluster), "default"); + assertConnectionHasProfile(service.getRemoteClusterConnection(badCluster), "default"); + expectThrows(NoSuchRemoteClusterException.class, () -> service.getRemoteClusterConnection(missingCluster)); } } } - public void testCorrectTransportProfileUsedWhenCPSEnabled() throws IOException { + public void testCorrectTransportProfileUsedWhenCPSEnabled() { final var versionInfo = VersionInformation.CURRENT; final var transportVers = TransportVersion.current(); final var knownNodes = new CopyOnWriteArrayList(); @@ -1805,14 +1746,12 @@ public void testCorrectTransportProfileUsedWhenCPSEnabled() throws IOException { try (var transportService = MockTransportService.createNewService(settings, versionInfo, transportVers, threadPool)) { transportService.start(); transportService.acceptIncomingRequests(); - try (RemoteClusterService service = createRemoteClusterService(settings, transportService)) { - initializeRemoteClusters(service); - assertTrue(hasRegisteredClusters(service)); - assertConnectionHasProfile( - service.getRemoteClusterConnection("cluster1"), - RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE - ); - } + final var service = transportService.getRemoteClusterService(); + assertTrue(hasRegisteredClusters(service)); + assertConnectionHasProfile( + service.getRemoteClusterConnection("cluster1"), + RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE + ); } } } @@ -1836,14 +1775,20 @@ private Settings buildRemoteClusterSettings(String clusterAlias, String address) return settings.build(); } - public void testLogsConnectionResult() throws IOException { + public void testLogsConnectionResult() { final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); try ( var remote = startTransport("remote", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var local = startTransport("local", List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) + var local = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + clusterSettings + ) ) { - linkedProjectConfigService.register(remoteClusterService); + local.start(); + local.acceptIncomingRequests(); assertThatLogger( () -> clusterSettings.applySettings( @@ -1882,7 +1827,7 @@ public void testLogsConnectionResult() throws IOException { } } - public void testSetSkipUnavailable() throws IOException { + public void testSetSkipUnavailable() { final var skipUnavailableProperty = RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("remote") .getKey(); final var seedNodeProperty = SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("remote").getKey(); @@ -1891,11 +1836,15 @@ public void testSetSkipUnavailable() throws IOException { try ( var remote1Transport = startTransport("remote1"); var remote2Transport = startTransport("remote2"); - var local = startTransport("local"); - var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) + var local = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + TransportVersion.current(), + threadPool, + clusterSettings + ) ) { - linkedProjectConfigService.register(remoteClusterService); - + final var remoteClusterService = local.getRemoteClusterService(); record SkipUnavailableTestConfig( boolean skipUnavailable, MockTransportService seedNodeTransportService, @@ -1964,10 +1913,6 @@ private void updateRemoteCluster( service.updateRemoteCluster(buildLinkedProjectConfig(alias, settings, newSettings), false, listener); } - private void initializeRemoteClusters(RemoteClusterService remoteClusterService) { - remoteClusterService.initializeRemoteClusters(linkedProjectConfigService.getInitialLinkedProjectConfigs()); - } - private static Settings createSettings(String clusterAlias, List seeds) { Settings.Builder builder = Settings.builder(); builder.put( From 97bc022b5e36b9baa79dbe7b4250e9647e502c46 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 5 Nov 2025 15:46:57 -0500 Subject: [PATCH 4/7] Move metric registration to RemoteClusterService --- .../transport/RemoteClusterService.java | 10 ++++ .../transport/RemoteConnectionStrategy.java | 46 ++++++++++--------- .../RemoteConnectionStrategyTests.java | 20 +++++--- 3 files changed, 48 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index bec802147ea90..4ea73854f359e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -65,6 +65,7 @@ public final class RemoteClusterService extends RemoteClusterAware private static final Logger logger = LogManager.getLogger(RemoteClusterService.class); public static final String REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME = "cluster:internal/remote_cluster/handshake"; + public static final String CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.error.total"; private final boolean isRemoteClusterClient; private final boolean isSearchNode; @@ -101,6 +102,15 @@ public boolean isRemoteClusterServerEnabled() { * the functionality to do it the right way is not yet ready -- replace this code when it's ready. */ this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false); + final var telemetryProvider = transportService == null ? null : transportService.getTelemetryProvider(); + final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry(); + if (meterRegistry != null) { + meterRegistry.registerLongCounter( + CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, + "linked project connection attempt failure count", + "count" + ); + } } public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index eadec94a35a3d..fa4d4ff255318 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -73,6 +73,11 @@ public Writeable.Reader getReader() { } } + enum ConnectionAttempt { + initial, + reconnect + } + private final int maxPendingConnectionListeners; protected final Logger logger = LogManager.getLogger(getClass()); @@ -81,11 +86,7 @@ public Writeable.Reader getReader() { private final Object mutex = new Object(); private List> listeners = new ArrayList<>(); private final AtomicBoolean initialConnectionAttempted = new AtomicBoolean(false); - - static final String INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.initial.error.total"; - static final String RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME = "es.projects.linked.connections.reconnect.error.total"; - private static LongCounter initialConnectionAttemptFailures; - private static LongCounter reconnectAttemptFailures; + private final LongCounter connectionAttemptFailures; protected final TransportService transportService; protected final RemoteConnectionManager connectionManager; @@ -100,25 +101,13 @@ public Writeable.Reader getReader() { this.transportService = transportService; this.connectionManager = connectionManager; this.maxPendingConnectionListeners = config.maxPendingConnectionListeners(); - registerMetrics(transportService.getTelemetryProvider()); + this.connectionAttemptFailures = lookupConnectionFailureMetric(transportService.getTelemetryProvider()); connectionManager.addListener(this); } - private static synchronized void registerMetrics(TelemetryProvider telemetryProvider) { + private LongCounter lookupConnectionFailureMetric(TelemetryProvider telemetryProvider) { final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry(); - if (initialConnectionAttemptFailures != null || meterRegistry == null) { - return; - } - initialConnectionAttemptFailures = meterRegistry.registerLongCounter( - INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, - "linked project initial connection attempt failure count", - "count" - ); - reconnectAttemptFailures = meterRegistry.registerLongCounter( - RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, - "linked project reconnection attempt failure count", - "count" - ); + return meterRegistry == null ? null : meterRegistry.getLongCounter(RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME); } static ConnectionProfile buildConnectionProfile(LinkedProjectConfig config, String transportProfile) { @@ -247,8 +236,21 @@ private void connectionAttemptCompleted(@Nullable Exception e) { logger.debug(msgSupplier); } else { logger.warn(msgSupplier, e); - final var counter = isInitialAttempt ? initialConnectionAttemptFailures : reconnectAttemptFailures; - counter.incrementBy(1, Map.of("linked_project_id", linkedProjectId.toString(), "linked_project_alias", clusterAlias)); + if (connectionAttemptFailures != null) { + connectionAttemptFailures.incrementBy( + 1, + Map.of( + "linked_project_id", + linkedProjectId.toString(), + "linked_project_alias", + clusterAlias, + "attempt", + isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect, + "strategy", + strategyType() + ) + ); + } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 0601e73abf5fc..3078caf1ee7c9 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -262,17 +262,25 @@ public void testConnectionAttemptMetricsAndLogging() { ); if (shouldConnectFail) { metricRecorder.collect(); - final var counterName = isInitialConnectAttempt - ? RemoteConnectionStrategy.INITIAL_CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME - : RemoteConnectionStrategy.RECONNECTION_ATTEMPT_FAILURES_COUNTER_NAME; + final var counterName = RemoteClusterService.CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME; final var measurements = metricRecorder.getMeasurements(InstrumentType.LONG_COUNTER, counterName); - assertThat(measurements, hasSize(1)); - final var measurement = measurements.getFirst(); + assertFalse(measurements.isEmpty()); + final var measurement = measurements.getLast(); assertThat(measurement.getLong(), equalTo(1L)); final var attributes = measurement.attributes(); - assertThat(attributes.keySet(), equalTo(Set.of("linked_project_id", "linked_project_alias"))); + final var keySet = Set.of("linked_project_id", "linked_project_alias", "attempt", "strategy"); + assertThat(attributes.keySet(), equalTo(keySet)); assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString())); assertThat(attributes.get("linked_project_alias"), equalTo(alias)); + assertThat( + attributes.get("attempt"), + equalTo( + isInitialConnectAttempt + ? RemoteConnectionStrategy.ConnectionAttempt.initial + : RemoteConnectionStrategy.ConnectionAttempt.reconnect + ) + ); + assertThat(attributes.get("strategy"), equalTo(strategy.strategyType())); } } } From 96114ac839b8d958d312542a3f3b6274278bd1a9 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 5 Nov 2025 20:55:12 +0000 Subject: [PATCH 5/7] [CI] Auto commit changes from spotless --- .../elasticsearch/transport/RemoteConnectionStrategyTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 3078caf1ee7c9..5639fd76e3855 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -39,7 +39,6 @@ import static org.elasticsearch.transport.RemoteClusterSettings.toConfig; import static org.elasticsearch.transport.RemoteConnectionStrategy.buildConnectionProfile; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.mockito.Mockito.mock; public class RemoteConnectionStrategyTests extends ESTestCase { From af436be920b00e32e6aff745f8e29f2b1a99c8af Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Wed, 5 Nov 2025 17:11:23 -0500 Subject: [PATCH 6/7] Fix metric attribute value type --- .../transport/RemoteConnectionStrategy.java | 4 ++-- .../transport/RemoteConnectionStrategyTests.java | 14 +++++--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java index fa4d4ff255318..3986788ac4a4e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionStrategy.java @@ -245,9 +245,9 @@ private void connectionAttemptCompleted(@Nullable Exception e) { "linked_project_alias", clusterAlias, "attempt", - isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect, + (isInitialAttempt ? ConnectionAttempt.initial : ConnectionAttempt.reconnect).toString(), "strategy", - strategyType() + strategyType().toString() ) ); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java index 5639fd76e3855..aa4260a1cd20c 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionStrategyTests.java @@ -268,18 +268,14 @@ public void testConnectionAttemptMetricsAndLogging() { assertThat(measurement.getLong(), equalTo(1L)); final var attributes = measurement.attributes(); final var keySet = Set.of("linked_project_id", "linked_project_alias", "attempt", "strategy"); + final var expectedAttemptType = isInitialConnectAttempt + ? RemoteConnectionStrategy.ConnectionAttempt.initial + : RemoteConnectionStrategy.ConnectionAttempt.reconnect; assertThat(attributes.keySet(), equalTo(keySet)); assertThat(attributes.get("linked_project_id"), equalTo(linkedProjectId.toString())); assertThat(attributes.get("linked_project_alias"), equalTo(alias)); - assertThat( - attributes.get("attempt"), - equalTo( - isInitialConnectAttempt - ? RemoteConnectionStrategy.ConnectionAttempt.initial - : RemoteConnectionStrategy.ConnectionAttempt.reconnect - ) - ); - assertThat(attributes.get("strategy"), equalTo(strategy.strategyType())); + assertThat(attributes.get("attempt"), equalTo(expectedAttemptType.toString())); + assertThat(attributes.get("strategy"), equalTo(strategy.strategyType().toString())); } } } From d06dfbda85fa1c7486a2924382d6323b5231b289 Mon Sep 17 00:00:00 2001 From: Jeremy Dahlgren Date: Fri, 7 Nov 2025 17:46:55 -0500 Subject: [PATCH 7/7] remove extra null checks --- .../transport/RemoteClusterService.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 4ea73854f359e..31d73822ecd9d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -102,14 +102,10 @@ public boolean isRemoteClusterServerEnabled() { * the functionality to do it the right way is not yet ready -- replace this code when it's ready. */ this.crossProjectEnabled = settings.getAsBoolean("serverless.cross_project.enabled", false); - final var telemetryProvider = transportService == null ? null : transportService.getTelemetryProvider(); - final var meterRegistry = telemetryProvider == null ? null : telemetryProvider.getMeterRegistry(); - if (meterRegistry != null) { - meterRegistry.registerLongCounter( - CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, - "linked project connection attempt failure count", - "count" - ); + if (transportService != null) { + transportService.getTelemetryProvider() + .getMeterRegistry() + .registerLongCounter(CONNECTION_ATTEMPT_FAILURES_COUNTER_NAME, "linked project connection attempt failure count", "count"); } }