diff --git a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java index 3cb06f7da007e..e16d72e311281 100644 --- a/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/AbstractLinkedProjectConfigService.java @@ -9,8 +9,6 @@ package org.elasticsearch.transport; -import org.elasticsearch.cluster.metadata.ProjectId; - import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -29,15 +27,4 @@ public void register(LinkedProjectConfigListener listener) { protected void handleUpdate(LinkedProjectConfig config) { listeners.forEach(listener -> listener.updateLinkedProject(config)); } - - protected void handleSkipUnavailableChanged( - ProjectId originProjectId, - ProjectId linkedProjectId, - String linkedProjectAlias, - boolean skipUnavailable - ) { - listeners.forEach( - listener -> listener.skipUnavailableChanged(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable) - ); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java index 4fa8619f8dbe3..caff218fcbe63 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigService.java @@ -48,6 +48,7 @@ public ClusterSettingsLinkedProjectConfigService( RemoteClusterSettings.REMOTE_CLUSTER_COMPRESS, RemoteClusterSettings.REMOTE_CLUSTER_PING_SCHEDULE, RemoteClusterSettings.REMOTE_CONNECTION_MODE, + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY, RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS, RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_NODE_CONNECTIONS, @@ -56,11 +57,6 @@ public ClusterSettingsLinkedProjectConfigService( RemoteClusterSettings.ProxyConnectionStrategySettings.SERVER_NAME ); clusterSettings.addAffixGroupUpdateConsumer(remoteClusterSettings, this::settingsChangedCallback); - clusterSettings.addAffixUpdateConsumer( - RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE, - this::skipUnavailableChangedCallback, - (alias, value) -> {} - ); } } @@ -79,9 +75,4 @@ private void settingsChangedCallback(String clusterAlias, Settings newSettings) final var config = RemoteClusterSettings.toConfig(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, mergedSettings); handleUpdate(config); } - - @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") - private void skipUnavailableChangedCallback(String clusterAlias, Boolean skipUnavailable) { - handleSkipUnavailableChanged(projectResolver.getProjectId(), ProjectId.DEFAULT, clusterAlias, skipUnavailable); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java index 66c465f9162c2..219324a0cdfab 100644 --- a/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java +++ b/server/src/main/java/org/elasticsearch/transport/LinkedProjectConfigService.java @@ -9,8 +9,6 @@ package org.elasticsearch.transport; -import org.elasticsearch.cluster.metadata.ProjectId; - import java.util.Collection; /** @@ -29,22 +27,6 @@ interface LinkedProjectConfigListener { * @param config The updated {@link LinkedProjectConfig}. */ void updateLinkedProject(LinkedProjectConfig config); - - /** - * Called when the boolean skip_unavailable setting has changed for a linked project configuration. - * Note that skip_unavailable may not be supported in all contexts where linked projects are used. - * - * @param originProjectId The {@link ProjectId} of the owning project that has the linked project configuration. - * @param linkedProjectId The {@link ProjectId} of the linked project. - * @param linkedProjectAlias The alias used for the linked project. - * @param skipUnavailable The new value of the skip_unavailable setting. - */ - default void skipUnavailableChanged( - ProjectId originProjectId, - ProjectId linkedProjectId, - String linkedProjectAlias, - boolean skipUnavailable - ) {} } /** diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 45eee901881ca..0ea4dd6d4c3e1 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -293,21 +293,6 @@ public RemoteClusterConnection getRemoteClusterConnection(String cluster) { return connection; } - @Override - public void skipUnavailableChanged( - ProjectId originProjectId, - ProjectId linkedProjectId, - String linkedProjectAlias, - boolean skipUnavailable - ) { - assert crossProjectEnabled == false - : "Cannot configure setting [" + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getKey() + "] in CPS environments."; - final var remote = getConnectionsMapForProject(originProjectId).get(linkedProjectAlias); - if (remote != null) { - remote.setSkipUnavailable(skipUnavailable); - } - } - @Override public void updateLinkedProject(LinkedProjectConfig config) { final var projectId = config.originProjectId(); @@ -385,6 +370,9 @@ public synchronized void updateRemoteCluster( remote = new RemoteClusterConnection(config, transportService, remoteClusterCredentialsManager, crossProjectEnabled); connectionMap.put(clusterAlias, remote); remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED)); + } else if (remote.isSkipUnavailable() != config.skipUnavailable()) { + remote.setSkipUnavailable(config.skipUnavailable()); + listener.onResponse(RemoteClusterConnectionStatus.UPDATED); } else { // No changes to connection configuration. listener.onResponse(RemoteClusterConnectionStatus.UNCHANGED); @@ -395,7 +383,8 @@ public enum RemoteClusterConnectionStatus { CONNECTED, DISCONNECTED, RECONNECTED, - UNCHANGED + UNCHANGED, + UPDATED } /** diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java index cbf248762a75f..c934d69df25ad 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterSettingsLinkedProjectConfigServiceTests.java @@ -9,7 +9,6 @@ package org.elasticsearch.transport; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.project.DefaultProjectResolver; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -25,35 +24,16 @@ public class ClusterSettingsLinkedProjectConfigServiceTests extends ESTestCase { - private record SkipUnavailableUpdate( - ProjectId originProjectId, - ProjectId linkedProjectId, - String linkedProjectAlias, - boolean skipUnavailable - ) {} - private static class StubLinkedProjectConfigListener implements LinkedProjectConfigListener { LinkedProjectConfig updatedConfig; - SkipUnavailableUpdate skipUnavailableUpdate; @Override public void updateLinkedProject(LinkedProjectConfig config) { updatedConfig = config; } - @Override - public void skipUnavailableChanged( - ProjectId originProjectId, - ProjectId linkedProjectId, - String linkedProjectAlias, - boolean skipUnavailable - ) { - skipUnavailableUpdate = new SkipUnavailableUpdate(originProjectId, linkedProjectId, linkedProjectAlias, skipUnavailable); - } - void reset() { updatedConfig = null; - skipUnavailableUpdate = null; } } @@ -66,9 +46,11 @@ public void testListenersReceiveUpdates() { final var alias = randomAlphaOfLength(10); final var initialProxyAddress = "localhost:9400"; + final var initialSkipUnavailable = randomBoolean(); final var initialSettings = Settings.builder() .put("cluster.remote." + alias + ".mode", "proxy") .put("cluster.remote." + alias + ".proxy_address", initialProxyAddress) + .put("cluster.remote." + alias + ".skip_unavailable", initialSkipUnavailable) .build(); final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(initialSettings); final var service = new ClusterSettingsLinkedProjectConfigService( @@ -76,7 +58,9 @@ public void testListenersReceiveUpdates() { clusterSettings, DefaultProjectResolver.INSTANCE ); - final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress).build(); + final var config = new ProxyLinkedProjectConfigBuilder(alias).proxyAddress(initialProxyAddress) + .skipUnavailable(initialSkipUnavailable) + .build(); // Verify we can get the linked projects on startup. assertThat(service.getInitialLinkedProjectConfigs(), equalTo(List.of(config))); @@ -92,48 +76,23 @@ public void testListenersReceiveUpdates() { clusterSettings.applySettings(initialSettings); for (int i = 0; i < numListeners; ++i) { assertThat(listeners.get(i).updatedConfig, sameInstance(null)); - assertThat(listeners.get(i).skipUnavailableUpdate, sameInstance(null)); listeners.get(i).reset(); } - // Change the skip_unavailable, leave the other settings alone, we should get the skip_unavailable update only. - var expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( - config.originProjectId(), - config.linkedProjectId(), - config.linkedProjectAlias(), - config.skipUnavailable() == false - ); - clusterSettings.applySettings( - Settings.builder() - .put(initialSettings) - .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) - .build() - ); - for (int i = 0; i < numListeners; ++i) { - assertThat(listeners.get(i).updatedConfig, sameInstance(null)); - assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); - listeners.get(i).reset(); - } - - // Change the proxy address, and set skip_unavailable back to original value, we should get both updates. - expectedSkipUnavailableUpdate = new SkipUnavailableUpdate( - config.originProjectId(), - config.linkedProjectId(), - config.linkedProjectAlias(), - config.skipUnavailable() - ); + // Change the proxy address and skip_unavailable values. final var newProxyAddress = "localhost:9401"; + final var newSkipUnavailable = config.skipUnavailable() == false; clusterSettings.applySettings( Settings.builder() .put(initialSettings) .put("cluster.remote." + alias + ".proxy_address", newProxyAddress) - .put("cluster.remote." + alias + ".skip_unavailable", expectedSkipUnavailableUpdate.skipUnavailable) + .put("cluster.remote." + alias + ".skip_unavailable", newSkipUnavailable) .build() ); for (int i = 0; i < numListeners; ++i) { assertNotNull("expected non-null updatedConfig for listener " + i, listeners.get(i).updatedConfig); assertThat(listeners.get(i).updatedConfig.proxyAddress(), equalTo(newProxyAddress)); - assertThat(listeners.get(i).skipUnavailableUpdate, equalTo(expectedSkipUnavailableUpdate)); + assertThat(listeners.get(i).updatedConfig.skipUnavailable(), equalTo(newSkipUnavailable)); } } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 6a4ad6098b5c2..2cd195cb4a8ff 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.Consumer; import static org.elasticsearch.test.MockLog.assertThatLogger; import static org.elasticsearch.test.NodeRoles.masterOnlyNode; @@ -112,6 +113,10 @@ private MockTransportService startTransport( return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, transportVersion, threadPool, settings); } + private MockTransportService startTransport(final String id) { + return startTransport(id, List.of(), VersionInformation.CURRENT, TransportVersion.current(), Settings.EMPTY); + } + public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); @@ -1877,6 +1882,61 @@ public void testLogsConnectionResult() throws IOException { } } + public void testSetSkipUnavailable() throws IOException { + final var skipUnavailableProperty = RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace("remote") + .getKey(); + final var seedNodeProperty = SniffConnectionStrategySettings.REMOTE_CLUSTER_SEEDS.getConcreteSettingForNamespace("remote").getKey(); + final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(); + + try ( + var remote1Transport = startTransport("remote1"); + var remote2Transport = startTransport("remote2"); + var local = startTransport("local"); + var remoteClusterService = createRemoteClusterService(Settings.EMPTY, clusterSettings, local) + ) { + linkedProjectConfigService.register(remoteClusterService); + + record SkipUnavailableTestConfig( + boolean skipUnavailable, + MockTransportService seedNodeTransportService, + boolean isNewConnection, + boolean rebuildExpected + ) {} + + final Consumer applySettingsAndVerify = (cfg) -> { + final var currentConnection = cfg.isNewConnection ? null : remoteClusterService.getRemoteClusterConnection("remote"); + clusterSettings.applySettings( + Settings.builder() + .put(skipUnavailableProperty, cfg.skipUnavailable) + .put(seedNodeProperty, cfg.seedNodeTransportService.getLocalNode().getAddress().toString()) + .build() + ); + assertTrue(isRemoteClusterRegistered(remoteClusterService, "remote")); + assertEquals(cfg.skipUnavailable, remoteClusterService.isSkipUnavailable("remote").orElseThrow()); + if (cfg.rebuildExpected) { + assertNotSame(currentConnection, remoteClusterService.getRemoteClusterConnection("remote")); + } else if (cfg.isNewConnection == false) { + assertSame(currentConnection, remoteClusterService.getRemoteClusterConnection("remote")); + } + }; + + // Apply the initial settings and verify the new connection is built. + var skipUnavailable = randomBoolean(); + applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, true, true)); + + // Change skip_unavailable value, but not seed node, connection should not be rebuilt, but skip_unavailable should be modified. + skipUnavailable = skipUnavailable == false; + applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, false, false)); + + // Change the seed node but not skip_unavailable, connection should be rebuilt and skip_unavailable should stay the same. + applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote2Transport, false, true)); + + // Change skip_unavailable value and the seed node, connection should be rebuilt and skip_unavailable should also be modified. + skipUnavailable = skipUnavailable == false; + applySettingsAndVerify.accept(new SkipUnavailableTestConfig(skipUnavailable, remote1Transport, false, true)); + } + } + @FixForMultiProject(description = "Refactor to add the linked project ID associated with the alias.") private LinkedProjectConfig buildLinkedProjectConfig(String alias, Settings staticSettings, Settings newSettings) { final var mergedSettings = Settings.builder().put(staticSettings, false).put(newSettings, false).build();