diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 419aa189e352c..f7d119fa9dc48 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -292,6 +292,8 @@ public void skipUnavailableChanged( String linkedProjectAlias, boolean skipUnavailable ) { + assert crossProjectEnabled == false + : "Cannot configure setting [" + RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getKey() + "] in CPS environments."; final var remote = getConnectionsMapForProject(originProjectId).get(linkedProjectAlias); if (remote != null) { remote.setSkipUnavailable(skipUnavailable); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java index 3c2b02e8533ce..30ac3bef9bffc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterSettings.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Setting; @@ -70,7 +71,7 @@ public class RemoteClusterSettings { (ns, key) -> boolSetting( key, DEFAULT_SKIP_UNAVAILABLE, - new RemoteConnectionEnabled<>(ns, key), + new UnsupportedInCPSValidator<>(ns, key), Setting.Property.Dynamic, Setting.Property.NodeScope ) @@ -390,6 +391,10 @@ private RemoteConnectionEnabled(String clusterAlias, String key) { this.key = key; } + protected String getKey() { + return key; + } + @Override public void validate(T value) {} @@ -483,4 +488,25 @@ public Iterator> settings() { return settingStream.iterator(); } } + + private static class UnsupportedInCPSValidator extends RemoteConnectionEnabled { + private final Setting cpsSetting = Setting.boolSetting("serverless.cross_project.enabled", false); + + private UnsupportedInCPSValidator(String clusterAlias, String key) { + super(clusterAlias, key); + } + + @Override + public void validate(T value, Map, Object> settings, boolean isPresent) { + if (isPresent && (Boolean) settings.get(cpsSetting)) { + throw new IllegalArgumentException("setting [" + getKey() + "] is unavailable when CPS is enabled"); + } + super.validate(value, settings, isPresent); + } + + @Override + public Iterator> settings() { + return Iterators.concat(super.settings(), Iterators.single(cpsSetting)); + } + } } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java index 5b5788bb66397..786fe22e49f79 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterSettingsTests.java @@ -22,8 +22,10 @@ import static org.elasticsearch.test.NodeRoles.nonRemoteClusterClientNode; import static org.elasticsearch.test.NodeRoles.remoteClusterClientNode; +import static org.elasticsearch.transport.RemoteClusterSettings.ProxyConnectionStrategySettings.PROXY_ADDRESS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CLUSTER_CREDENTIALS; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CLUSTER_SKIP_UNAVAILABLE; +import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_CONNECTION_MODE; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING; import static org.elasticsearch.transport.RemoteClusterSettings.REMOTE_NODE_ATTRIBUTE; import static org.elasticsearch.transport.RemoteClusterSettings.SniffConnectionStrategySettings.REMOTE_CLUSTERS_PROXY; @@ -95,4 +97,51 @@ public void testProxyDefault() { final String alias = randomAlphaOfLength(8); assertThat(REMOTE_CLUSTERS_PROXY.getConcreteSettingForNamespace(alias).get(Settings.EMPTY), equalTo("")); } + + public void testSkipUnavailableAlwaysTrueIfCPSEnabled() { + final var alias = randomAlphaOfLength(8); + final var skipUnavailableSetting = REMOTE_CLUSTER_SKIP_UNAVAILABLE.getConcreteSettingForNamespace(alias); + final var modeSetting = REMOTE_CONNECTION_MODE.getConcreteSettingForNamespace(alias); + final var proxyAddressSetting = PROXY_ADDRESS.getConcreteSettingForNamespace(alias); + final var cpsEnabledSettings = Settings.builder().put("serverless.cross_project.enabled", true).build(); + final var proxyEnabledSettings = Settings.builder() + .put(modeSetting.getKey(), RemoteConnectionStrategy.ConnectionStrategy.PROXY.toString()) + .put(proxyAddressSetting.getKey(), "localhost:9400") + .build(); + + // Ensure the validator still throws in non-CPS environment if a connection mode is not set. + var exception = expectThrows( + IllegalArgumentException.class, + () -> skipUnavailableSetting.get(Settings.builder().put(skipUnavailableSetting.getKey(), true).build()) + ); + assertThat( + exception.getMessage(), + equalTo("Cannot configure setting [" + skipUnavailableSetting.getKey() + "] if remote cluster is not enabled.") + ); + + // Ensure we can still get the set value in non-CPS environment. + final var randomSkipUnavailableSettingValue = randomBoolean(); + assertThat( + skipUnavailableSetting.get( + Settings.builder().put(proxyEnabledSettings).put(skipUnavailableSetting.getKey(), randomSkipUnavailableSettingValue).build() + ), + equalTo(randomSkipUnavailableSettingValue) + ); + + // Check the validator rejects the skip_unavailable setting if present when CPS is enabled. + exception = expectThrows( + IllegalArgumentException.class, + () -> skipUnavailableSetting.get( + Settings.builder() + .put(cpsEnabledSettings) + .put(proxyEnabledSettings) + .put(skipUnavailableSetting.getKey(), randomBoolean()) + .build() + ) + ); + assertThat(exception.getMessage(), equalTo("setting [" + skipUnavailableSetting.getKey() + "] is unavailable when CPS is enabled")); + + // Should not throw if the setting is not present, returning the expected default value of true. + assertTrue(skipUnavailableSetting.get(Settings.builder().put(cpsEnabledSettings).put(proxyEnabledSettings).build())); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java deleted file mode 100644 index 42fd91ea1811a..0000000000000 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.action; - -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.CollectionUtils; -import org.elasticsearch.plugins.ClusterPlugin; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.transport.ConnectTransportException; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; - -// TODO: Move this test to the Serverless repo once the IT framework is ready there. -public class EsqlCpsDoesNotUseSkipUnavailableIT extends AbstractCrossClusterTestCase { - - public static class CpsPlugin extends Plugin implements ClusterPlugin { - @Override - public List> getSettings() { - return List.of(CpsEnableSetting); - } - } - - private static final Setting CpsEnableSetting = Setting.simpleString( - "serverless.cross_project.enabled", - Setting.Property.NodeScope - ); - - @Override - protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1); - } - - @Override - protected Collection> nodePlugins(String clusterAlias) { - return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class); - } - - @Override - protected Settings nodeSettings() { - return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build(); - } - - @Override - protected Map skipUnavailableForRemoteClusters() { - // Setting skip_unavailable=false results in a fatal error when the linked cluster is not available. - return Map.of(REMOTE_CLUSTER_1, false); - } - - public void testCpsShouldNotUseSkipUnavailable() throws Exception { - // Add some dummy data to prove we are communicating fine with the remote. - assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); - client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); - client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); - - // Shut down the linked cluster we'd be targeting in the search. - try { - cluster(REMOTE_CLUSTER_1).close(); - } catch (Exception e) { - throw new AssertionError(e); - } - - /* - * Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster - * and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true - * should not result in a fatal error. - */ - - EsqlQueryRequest request = new EsqlQueryRequest(); - request.query("FROM *,*:* | limit 10"); - request.allowPartialResults(true); - try (EsqlQueryResponse response = runQuery(request)) { - assertThat(response.isPartial(), is(true)); - EsqlExecutionInfo info = response.getExecutionInfo(); - assertThat(info.getCluster(REMOTE_CLUSTER_1).getStatus(), is(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - } - - request = new EsqlQueryRequest().query("FROM *,*:* | limit 10"); - try (EsqlQueryResponse response = runQuery(request)) { - fail("a fatal error should be thrown since allow_partial_results=false"); - } catch (Exception e) { - assertThat(e, instanceOf(ConnectTransportException.class)); - } - - } - -}