diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CpsDoesNotUseSkipUnavailableIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CpsDoesNotUseSkipUnavailableIT.java new file mode 100644 index 0000000000000..5345975d43417 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/ccs/CpsDoesNotUseSkipUnavailableIT.java @@ -0,0 +1,134 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.ccs; + +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.hamcrest.Matchers; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; + +// TODO: Move this test to the Serverless repo once the IT framework is ready there. +public class CpsDoesNotUseSkipUnavailableIT extends AbstractMultiClustersTestCase { + private static final String LINKED_CLUSTER_1 = "cluster-a"; + + public static class CpsPlugin extends Plugin implements ClusterPlugin { + @Override + public List> getSettings() { + return List.of(CpsEnableSetting); + } + } + + private static final Setting CpsEnableSetting = Setting.simpleString( + "serverless.cross_project.enabled", + Setting.Property.NodeScope + ); + + @Override + protected List remoteClusterAlias() { + return List.of(LINKED_CLUSTER_1); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + // Setting skip_unavailable=false results in a fatal error when the linked cluster is not available. + return Map.of(LINKED_CLUSTER_1, false); + } + + public void testCpsShouldNotUseSkipUnavailable() throws Exception { + // Add some dummy data to prove we are communicating fine with the remote. + assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + // Shut down the linked cluster we'd be targeting in the search. + try { + cluster(LINKED_CLUSTER_1).close(); + } catch (Exception e) { + throw new AssertionError(e); + } + + /* + * Under normal circumstances, we should get a fatal error for when skip_unavailable=false for a linked cluster + * and that cluster is targeted in a search op. However, in CPS environment, setting allow_partial_search_results=true + * should not result in a fatal error. + */ + { + var searchRequest = getSearchRequest(true); + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + assertResponse(client().execute(TransportSearchAction.TYPE, searchRequest), result -> { + var originCluster = result.getClusters().getCluster(LOCAL_CLUSTER); + assertThat(originCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SUCCESSFUL)); + + var linkedCluster = result.getClusters().getCluster(LINKED_CLUSTER_1); + assertThat(linkedCluster.getStatus(), Matchers.is(SearchResponse.Cluster.Status.SKIPPED)); + + var linkedClusterFailures = result.getClusters().getCluster(LINKED_CLUSTER_1).getFailures(); + assertThat(linkedClusterFailures.size(), Matchers.is(1)); + // Failure is something along the lines of shard failure and is caused by a connection error. + assertThat( + linkedClusterFailures.getFirst().getCause(), + Matchers.anyOf( + Matchers.instanceOf(RemoteTransportException.class), + Matchers.instanceOf(ConnectTransportException.class) + ) + ); + }); + } + + /* + * Previously, we did not get a fatal error even when skip_unavailable=false for the linked cluster. + * Now, we disable partial results and expect a fatal error. This proves that in CPS environment, + * search uses allow_partial_search_results and not skip_unavailable. + */ + { + var searchRequest = getSearchRequest(false); + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + var ae = expectThrows(AssertionError.class, () -> safeGet(client().execute(TransportSearchAction.TYPE, searchRequest))); + assertThat(ae.getCause(), Matchers.instanceOf(ExecutionException.class)); + assertThat( + ae.getCause().getCause(), + Matchers.anyOf(Matchers.instanceOf(RemoteTransportException.class), Matchers.instanceOf(ConnectTransportException.class)) + ); + } + } + + private SearchRequest getSearchRequest(boolean allowPartialResults) { + // Include both origin and linked cluster in the search op. + var searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(allowPartialResults); + return searchRequest; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java index 0e0406cc8e74c..7eb3670dccb53 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/resolve/TransportResolveClusterAction.java @@ -174,7 +174,7 @@ protected void doExecuteForked(Task task, ResolveClusterActionRequest request, A resolveClusterTask.ensureNotCancelled(); String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true); RemoteClusterClient remoteClusterClient = remoteClusterService.getRemoteClusterClient( clusterAlias, searchCoordinationExecutor, diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index dde4b81af94d2..9069e7439ad7a 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -332,7 +332,7 @@ private void doExecuteForked( ); boolean ensureConnected = forceConnectTimeoutSecs != null - || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false; + || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias).orElse(true) == false; transportService.getRemoteClusterService() .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 1f676f29e446e..7b82116e5b447 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -508,14 +508,16 @@ public static final class Clusters implements ToXContentFragment, Writeable { * @param localIndices The localIndices to be searched - null if no local indices are to be searched * @param remoteClusterIndices mapping of clusterAlias -> OriginalIndices for each remote cluster * @param ccsMinimizeRoundtrips whether minimizing roundtrips for the CCS - * @param skipUnavailablePredicate given a cluster alias, returns true if that cluster is skip_unavailable=true - * and false otherwise + * @param skipOnFailurePredicate given a cluster alias, returns true if that cluster is marked as skippable + * and false otherwise. For a cluster to be considered as skippable, either + * we should be in CPS environment and allow_partial_results=true, or, + * skip_unavailable=true. */ public Clusters( @Nullable OriginalIndices localIndices, Map remoteClusterIndices, boolean ccsMinimizeRoundtrips, - Predicate skipUnavailablePredicate + Predicate skipOnFailurePredicate ) { assert remoteClusterIndices.size() > 0 : "At least one remote cluster must be passed into this Cluster constructor"; this.total = remoteClusterIndices.size() + (localIndices == null ? 0 : 1); @@ -531,8 +533,8 @@ public Clusters( } for (Map.Entry remote : remoteClusterIndices.entrySet()) { String clusterAlias = remote.getKey(); - boolean skipUnavailable = skipUnavailablePredicate.test(clusterAlias); - Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipUnavailable); + boolean skipOnFailure = skipOnFailurePredicate.test(clusterAlias); + Cluster c = new Cluster(clusterAlias, String.join(",", remote.getValue().indices()), skipOnFailure); m.put(clusterAlias, c); } this.clusterInfo = m; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index bf85075781bc8..0233597033180 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -422,7 +422,7 @@ void executeRequest( resolvedIndices.getLocalIndices(), resolvedIndices.getRemoteClusterIndices(), true, - remoteClusterService::isSkipUnavailable + (clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults()) ); if (resolvedIndices.getLocalIndices() == null) { // Notify the progress listener that a CCS with minimize_roundtrips is happening remote-only (no local shards) @@ -458,7 +458,7 @@ void executeRequest( resolvedIndices.getLocalIndices(), resolvedIndices.getRemoteClusterIndices(), false, - remoteClusterService::isSkipUnavailable + (clusterAlias) -> remoteClusterService.shouldSkipOnFailure(clusterAlias, rewritten.allowPartialSearchResults()) ); // TODO: pass parentTaskId @@ -697,7 +697,7 @@ static void ccsRemoteReduce( // and we directly perform final reduction in the remote cluster Map.Entry entry = resolvedIndices.getRemoteClusterIndices().entrySet().iterator().next(); String clusterAlias = entry.getKey(); - boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure(clusterAlias, searchRequest.allowPartialSearchResults()); OriginalIndices indices = entry.getValue(); SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest( parentTaskId, @@ -713,7 +713,7 @@ static void ccsRemoteReduce( @Override public void onResponse(SearchResponse searchResponse) { // overwrite the existing cluster entry with the updated one - ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable); + ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, shouldSkipOnFailure); Map profileResults = searchResponse.getProfileResults(); SearchProfileResults profile = profileResults == null || profileResults.isEmpty() ? null @@ -744,9 +744,9 @@ public void onResponse(SearchResponse searchResponse) { @Override public void onFailure(Exception e) { ShardSearchFailure failure = new ShardSearchFailure(e); - logCCSError(failure, clusterAlias, skipUnavailable); - ccsClusterInfoUpdate(failure, clusters, clusterAlias, skipUnavailable); - if (skipUnavailable) { + logCCSError(failure, clusterAlias, shouldSkipOnFailure); + ccsClusterInfoUpdate(failure, clusters, clusterAlias, shouldSkipOnFailure); + if (shouldSkipOnFailure) { ActionListener.respondAndRelease(listener, SearchResponse.empty(timeProvider::buildTookInMillis, clusters)); } else { listener.onFailure(wrapRemoteClusterFailure(clusterAlias, e)); @@ -768,7 +768,7 @@ public void onFailure(Exception e) { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure), connectionListener ); } else { @@ -785,7 +785,10 @@ public void onFailure(Exception e) { final CountDown countDown = new CountDown(totalClusters); for (Map.Entry entry : resolvedIndices.getRemoteClusterIndices().entrySet()) { String clusterAlias = entry.getKey(); - boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure( + clusterAlias, + searchRequest.allowPartialSearchResults() + ); OriginalIndices indices = entry.getValue(); SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest( parentTaskId, @@ -797,7 +800,7 @@ public void onFailure(Exception e) { ); ActionListener ccsListener = createCCSListener( clusterAlias, - skipUnavailable, + shouldSkipOnFailure, countDown, exceptions, searchResponseMerger, @@ -826,7 +829,7 @@ public void onFailure(Exception e) { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure), connectionListener ); } @@ -903,10 +906,10 @@ static void collectSearchShards( final AtomicReference exceptions = new AtomicReference<>(); for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterAlias = entry.getKey(); - boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias); + boolean shouldSkipOnFailure = remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults); CCSActionListener> singleListener = new CCSActionListener<>( clusterAlias, - skipUnavailable, + shouldSkipOnFailure, responsesCountDown, exceptions, clusters, @@ -975,7 +978,7 @@ Map createFinalResponse() { remoteClusterService.maybeEnsureConnectedAndGetConnection( clusterAlias, - shouldEstablishConnection(forceConnectTimeoutSecs, skipUnavailable), + shouldEstablishConnection(forceConnectTimeoutSecs, shouldSkipOnFailure), connectionListener ); } @@ -986,7 +989,7 @@ Map createFinalResponse() { */ private static ActionListener createCCSListener( String clusterAlias, - boolean skipUnavailable, + boolean shouldSkipOnFailure, CountDown countDown, AtomicReference exceptions, SearchResponseMerger searchResponseMerger, @@ -996,7 +999,7 @@ private static ActionListener createCCSListener( ) { return new CCSActionListener<>( clusterAlias, - skipUnavailable, + shouldSkipOnFailure, countDown, exceptions, clusters, @@ -1004,7 +1007,7 @@ private static ActionListener createCCSListener( ) { @Override void innerOnResponse(SearchResponse searchResponse) { - ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, skipUnavailable); + ccsClusterInfoUpdate(searchResponse, clusters, clusterAlias, shouldSkipOnFailure); searchResponseMerger.add(searchResponse); progressListener.notifyClusterResponseMinimizeRoundtrips(clusterAlias, searchResponse); } @@ -1022,18 +1025,18 @@ protected void releaseResponse(SearchResponse searchResponse) { } /** - * Creates a new Cluster object using the {@link ShardSearchFailure} info and skip_unavailable + * Creates a new Cluster object using the {@link ShardSearchFailure} info and shouldSkipOnFailure * flag to set Status. Then it swaps it in the clusters CHM at key clusterAlias */ static void ccsClusterInfoUpdate( ShardSearchFailure failure, SearchResponse.Clusters clusters, String clusterAlias, - boolean skipUnavailable + boolean shouldSkipOnFailure ) { clusters.swapCluster(clusterAlias, (k, v) -> { SearchResponse.Cluster.Status status; - if (skipUnavailable) { + if (shouldSkipOnFailure) { status = SearchResponse.Cluster.Status.SKIPPED; } else { status = SearchResponse.Cluster.Status.FAILED; @@ -1056,12 +1059,12 @@ private static void ccsClusterInfoUpdate( SearchResponse searchResponse, SearchResponse.Clusters clusters, String clusterAlias, - boolean skipUnavailable + boolean shouldSkipOnFailure ) { /* * Cluster Status logic: - * 1) FAILED if total_shards > 0 && all shards failed && skip_unavailable=false - * 2) SKIPPED if total_shards > 0 && all shards failed && skip_unavailable=true + * 1) FAILED if total_shards > 0 && all shards failed && shouldSkipOnFailure=false + * 2) SKIPPED if total_shards > 0 && all shards failed && shouldSkipOnFailure=true * 3) PARTIAL if it timed out * 4) PARTIAL if it at least one of the shards succeeded but not all * 5) SUCCESSFUL if no shards failed (and did not time out) @@ -1070,7 +1073,7 @@ private static void ccsClusterInfoUpdate( SearchResponse.Cluster.Status status; int totalShards = searchResponse.getTotalShards(); if (totalShards > 0 && searchResponse.getFailedShards() >= totalShards) { - if (skipUnavailable) { + if (shouldSkipOnFailure) { status = SearchResponse.Cluster.Status.SKIPPED; } else { status = SearchResponse.Cluster.Status.FAILED; @@ -1762,7 +1765,7 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int */ abstract static class CCSActionListener implements ActionListener { protected final String clusterAlias; - protected final boolean skipUnavailable; + protected final boolean skipOnFailure; private final CountDown countDown; private final AtomicReference exceptions; protected final SearchResponse.Clusters clusters; @@ -1773,14 +1776,14 @@ abstract static class CCSActionListener implements Acti */ CCSActionListener( String clusterAlias, - boolean skipUnavailable, + boolean skipOnFailure, CountDown countDown, AtomicReference exceptions, SearchResponse.Clusters clusters, ActionListener originalListener ) { this.clusterAlias = clusterAlias; - this.skipUnavailable = skipUnavailable; + this.skipOnFailure = skipOnFailure; this.countDown = countDown; this.exceptions = exceptions; this.clusters = clusters; @@ -1801,9 +1804,9 @@ public final void onResponse(Response response) { @Override public final void onFailure(Exception e) { ShardSearchFailure f = new ShardSearchFailure(e); - logCCSError(f, clusterAlias, skipUnavailable); + logCCSError(f, clusterAlias, skipOnFailure); SearchResponse.Cluster cluster = clusters.getCluster(clusterAlias); - if (skipUnavailable && ExceptionsHelper.isTaskCancelledException(e) == false) { + if (skipOnFailure && ExceptionsHelper.isTaskCancelledException(e) == false) { if (cluster != null) { ccsClusterInfoUpdate(f, clusters, clusterAlias, true); } @@ -1859,9 +1862,9 @@ protected void releaseResponse(FinalResponse response) {} * causes of shard failures. * @param f ShardSearchFailure to log * @param clusterAlias cluster on which the failure occurred - * @param skipUnavailable the skip_unavailable setting of the cluster with the search error + * @param shouldSkipOnFailure the shouldSkipOnFailure setting of the cluster with the search error */ - private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean skipUnavailable) { + private static void logCCSError(ShardSearchFailure f, String clusterAlias, boolean shouldSkipOnFailure) { String errorInfo; try { errorInfo = Strings.toString(f.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); @@ -1870,9 +1873,9 @@ private static void logCCSError(ShardSearchFailure f, String clusterAlias, boole errorInfo = f.toString(); } logger.debug( - "CCS remote cluster failure. Cluster [{}]. skip_unavailable: [{}]. Error: {}", + "CCS remote cluster failure. Cluster [{}]. shouldSkipOnFailure: [{}]. Error: {}", clusterAlias, - skipUnavailable, + shouldSkipOnFailure, errorInfo ); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index a9698fd9862bb..028f601f05ad7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -82,6 +83,7 @@ public boolean isRemoteClusterServerEnabled() { private final Map> remoteClusters; private final RemoteClusterCredentialsManager remoteClusterCredentialsManager; private final ProjectResolver projectResolver; + private final boolean canUseSkipUnavailable; @FixForMultiProject(description = "Inject the ProjectResolver instance.") RemoteClusterService(Settings settings, TransportService transportService) { @@ -99,6 +101,11 @@ public boolean isRemoteClusterServerEnabled() { if (remoteClusterServerEnabled) { registerRemoteClusterHandshakeRequestHandler(transportService); } + /* + * TODO: This is not the right way to check if we're in CPS context and is more of a temporary measure since + * the functionality to do it the right way is not yet ready -- replace this code when it's ready. + */ + this.canUseSkipUnavailable = settings.getAsBoolean("serverless.cross_project.enabled", false) == false; } /** @@ -209,10 +216,28 @@ void ensureConnected(String clusterAlias, ActionListener listener) { } /** - * Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable + * Returns whether the cluster identified by the provided alias is configured to be skipped when unavailable. + * @param clusterAlias Name of the cluster + * @return A boolean optional that denotes if the cluster is configured to be skipped. In CPS-like environment, + * it returns an empty value where we default/fall back to true. + */ + public Optional isSkipUnavailable(String clusterAlias) { + if (canUseSkipUnavailable == false) { + return Optional.empty(); + } else { + return Optional.of(getRemoteClusterConnection(clusterAlias).isSkipUnavailable()); + } + } + + /** + * Signifies if an error can be skipped for the specified cluster based on skip_unavailable, or, + * allow_partial_search_results if in CPS-like environment. + * @param clusterAlias Name of the cluster + * @param allowPartialSearchResults If partial results can be served for the search request. + * @return boolean */ - public boolean isSkipUnavailable(String clusterAlias) { - return getRemoteClusterConnection(clusterAlias).isSkipUnavailable(); + public boolean shouldSkipOnFailure(String clusterAlias, Boolean allowPartialSearchResults) { + return isSkipUnavailable(clusterAlias).orElseGet(() -> allowPartialSearchResults != null && allowPartialSearchResults); } public Transport.Connection getConnection(String cluster) { @@ -583,7 +608,9 @@ public RemoteClusterClient getRemoteClusterClient( return new RemoteClusterAwareClient(transportService, clusterAlias, responseExecutor, switch (disconnectedStrategy) { case RECONNECT_IF_DISCONNECTED -> true; case FAIL_IF_DISCONNECTED -> false; - case RECONNECT_UNLESS_SKIP_UNAVAILABLE -> transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false; + case RECONNECT_UNLESS_SKIP_UNAVAILABLE -> transportService.getRemoteClusterService() + .isSkipUnavailable(clusterAlias) + .orElse(true) == false; }); } diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 29ed86492a455..32c3571895dd4 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -1373,15 +1373,15 @@ public void testSkipUnavailable() { service.start(); service.acceptIncomingRequests(); - assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true)); if (randomBoolean()) { updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", false); - assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + assertFalse(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true)); } updateSkipUnavailable(service.getRemoteClusterService(), "cluster1", true); - assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1")); + assertTrue(service.getRemoteClusterService().isSkipUnavailable("cluster1").orElse(true)); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index af008771f1068..f6a6b6065520f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -347,7 +347,7 @@ public void onFailure(Exception e) { } private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener lookupListener) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster)) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster).orElse(true)) { lookupListener.onResponse(new LookupResponse(e)); } else { lookupListener.onFailure(e); @@ -465,7 +465,7 @@ protected Map availablePolicies() { protected void getRemoteConnection(String cluster, ActionListener listener) { remoteClusterService.maybeEnsureConnectedAndGetConnection( cluster, - remoteClusterService.isSkipUnavailable(cluster) == false, + remoteClusterService.isSkipUnavailable(cluster).orElse(true) == false, listener ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index a23154c218a61..43fd7e8dc8077 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -333,7 +333,10 @@ private EsqlExecutionInfo getOrCreateExecutionInfo(Task task, EsqlQueryRequest r } private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) { - return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias), request.includeCCSMetadata()); + return new EsqlExecutionInfo( + clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), + request.includeCCSMetadata() + ); } private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {