From db7f2d64b06476036f1c304967ecefe4c68b6143 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Thu, 25 Sep 2025 10:55:41 +0200 Subject: [PATCH 1/3] ES|QL: do not use skip_unavailable in CPS --- .../EsqlCpsDoesNotUseSkipUnavailableIT.java | 98 +++++++++++++++++++ .../esql/plugin/TransportEsqlQueryAction.java | 6 +- 2 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java new file mode 100644 index 0000000000000..42fd91ea1811a --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlCpsDoesNotUseSkipUnavailableIT.java @@ -0,0 +1,98 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.ConnectTransportException; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; + +// TODO: Move this test to the Serverless repo once the IT framework is ready there. 
+public class EsqlCpsDoesNotUseSkipUnavailableIT extends AbstractCrossClusterTestCase { + + public static class CpsPlugin extends Plugin implements ClusterPlugin { + @Override + public List> getSettings() { + return List.of(CpsEnableSetting); + } + } + + private static final Setting CpsEnableSetting = Setting.simpleString( + "serverless.cross_project.enabled", + Setting.Property.NodeScope + ); + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), CpsPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder().put(super.nodeSettings()).put("serverless.cross_project.enabled", "true").build(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + // Setting skip_unavailable=false results in a fatal error when the linked cluster is not available. + return Map.of(REMOTE_CLUSTER_1, false); + } + + public void testCpsShouldNotUseSkipUnavailable() throws Exception { + // Add some dummy data to prove we are communicating fine with the remote. + assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + // Shut down the linked cluster we'd be targeting in the search. + try { + cluster(REMOTE_CLUSTER_1).close(); + } catch (Exception e) { + throw new AssertionError(e); + } + + /* + * Under normal circumstances, we get a fatal error when skip_unavailable=false for a linked cluster + * and that cluster is targeted in a search op. However, in a CPS environment, setting allow_partial_results=true + * should not result in a fatal error. 
+ */ + + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM *,*:* | limit 10"); + request.allowPartialResults(true); + try (EsqlQueryResponse response = runQuery(request)) { + assertThat(response.isPartial(), is(true)); + EsqlExecutionInfo info = response.getExecutionInfo(); + assertThat(info.getCluster(REMOTE_CLUSTER_1).getStatus(), is(EsqlExecutionInfo.Cluster.Status.SKIPPED)); + } + + request = new EsqlQueryRequest().query("FROM *,*:* | limit 10"); + try (EsqlQueryResponse response = runQuery(request)) { + fail("a fatal error should be thrown since allow_partial_results=false"); + } catch (Exception e) { + assertThat(e, instanceOf(ConnectTransportException.class)); + } + + } + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java index 82768bd4b0c15..df7393349eff9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java @@ -376,7 +376,11 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) { // include_ccs_metadata is considered only if include_execution_metadata is not set includeCcsMetadata = Boolean.TRUE.equals(request.includeCCSMetadata()); } - return new EsqlExecutionInfo(clusterAlias -> remoteClusterService.isSkipUnavailable(clusterAlias).orElse(true), includeCcsMetadata); + Boolean allowPartialResults = request.allowPartialResults() != null ? 
request.allowPartialResults() : defaultAllowPartialResults; + return new EsqlExecutionInfo( + clusterAlias -> remoteClusterService.shouldSkipOnFailure(clusterAlias, allowPartialResults), + includeCcsMetadata + ); } private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) { From 0d3a7531f705ac6043d57550da94f536499c68ae Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Fri, 26 Sep 2025 15:39:27 +0200 Subject: [PATCH 2/3] enrich --- .../esql/enrich/EnrichPolicyResolver.java | 26 ++++++++++--------- .../enrich/EnrichPolicyResolverTests.java | 6 ++++- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 49275f90193a1..959503d5d6cc1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -141,7 +141,7 @@ protected void doResolvePolicies( } final boolean includeLocal = remoteClusters.isEmpty() || remoteClusters.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, listener.map(lookupResponses -> { + lookupPolicies(remoteClusters, includeLocal, unresolvedPolicies, executionInfo, listener.map(lookupResponses -> { final EnrichResolution enrichResolution = new EnrichResolution(); final Map lookupResponsesToProcess = new HashMap<>(); for (Map.Entry entry : lookupResponses.entrySet()) { @@ -304,6 +304,7 @@ private void lookupPolicies( Collection remoteClusters, boolean includeLocal, Collection unresolvedPolicies, + EsqlExecutionInfo executionInfo, ActionListener> listener ) { final Map lookupResponses = ConcurrentCollections.newConcurrentMap(); @@ -316,7 +317,7 @@ private void 
lookupPolicies( if (remotePolicies.isEmpty() == false) { for (String cluster : remoteClusters) { ActionListener lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp)); - getRemoteConnection(cluster, new ActionListener() { + getRemoteConnection(cluster, executionInfo, new ActionListener() { @Override public void onResponse(Transport.Connection connection) { transportService.sendRequest( @@ -325,7 +326,7 @@ public void onResponse(Transport.Connection connection) { new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, l)), + lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, executionInfo, l)), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH) ) @@ -334,7 +335,7 @@ public void onResponse(Transport.Connection connection) { @Override public void onFailure(Exception e) { - failIfSkipUnavailableFalse(e, cluster, lookupListener); + failIfSkipUnavailableFalse(e, cluster, executionInfo, lookupListener); } }); } @@ -359,8 +360,13 @@ public void onFailure(Exception e) { } } - private void failIfSkipUnavailableFalse(Exception e, String cluster, ActionListener lookupListener) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && remoteClusterService.isSkipUnavailable(cluster).orElse(true)) { + private void failIfSkipUnavailableFalse( + Exception e, + String cluster, + EsqlExecutionInfo executionInfo, + ActionListener lookupListener + ) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && executionInfo.shouldSkipOnFailure(cluster)) { lookupListener.onResponse(new LookupResponse(e)); } else { lookupListener.onFailure(e); @@ -471,11 +477,7 @@ protected Map availablePolicies() { return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies(); } - protected void getRemoteConnection(String cluster, 
ActionListener listener) { - remoteClusterService.maybeEnsureConnectedAndGetConnection( - cluster, - remoteClusterService.isSkipUnavailable(cluster).orElse(true) == false, - listener - ); + protected void getRemoteConnection(String cluster, EsqlExecutionInfo executionInfo, ActionListener listener) { + remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, executionInfo.shouldSkipOnFailure(cluster) == false, listener); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index c842d7beee6f5..ffd9ab3b7c13d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -458,7 +458,11 @@ EnrichResolution resolvePolicies(Collection clusters, Collection listener) { + protected void getRemoteConnection( + String remoteCluster, + EsqlExecutionInfo executionInfo, + ActionListener listener + ) { assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY)); listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode())); } From d1c3c94fc80a470a9a8d5611d19d14958f812757 Mon Sep 17 00:00:00 2001 From: Luigi Dell'Aquila Date: Mon, 29 Sep 2025 10:12:06 +0200 Subject: [PATCH 3/3] simplify --- .../esql/enrich/EnrichPolicyResolver.java | 20 ++++++++----------- .../enrich/EnrichPolicyResolverTests.java | 6 +----- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index b4a4872ae3a8c..0744fd126999d 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -316,8 +316,9 @@ private void lookupPolicies( // remote clusters if (remotePolicies.isEmpty() == false) { for (String cluster : remoteClusters) { + boolean skipOnFailure = executionInfo.shouldSkipOnFailure(cluster); ActionListener lookupListener = refs.acquire(resp -> lookupResponses.put(cluster, resp)); - getRemoteConnection(cluster, executionInfo, new ActionListener() { + getRemoteConnection(cluster, skipOnFailure == false, new ActionListener() { @Override public void onResponse(Transport.Connection connection) { transportService.sendRequest( @@ -326,7 +327,7 @@ public void onResponse(Transport.Connection connection) { new LookupRequest(cluster, remotePolicies), TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>( - lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, cluster, executionInfo, l)), + lookupListener.delegateResponse((l, e) -> failIfSkipUnavailableFalse(e, skipOnFailure, l)), LookupResponse::new, threadPool.executor(ThreadPool.Names.SEARCH) ) @@ -335,7 +336,7 @@ public void onResponse(Transport.Connection connection) { @Override public void onFailure(Exception e) { - failIfSkipUnavailableFalse(e, cluster, executionInfo, lookupListener); + failIfSkipUnavailableFalse(e, skipOnFailure, lookupListener); } }); } @@ -360,13 +361,8 @@ public void onFailure(Exception e) { } } - private void failIfSkipUnavailableFalse( - Exception e, - String cluster, - EsqlExecutionInfo executionInfo, - ActionListener lookupListener - ) { - if (ExceptionsHelper.isRemoteUnavailableException(e) && executionInfo.shouldSkipOnFailure(cluster)) { + private void failIfSkipUnavailableFalse(Exception e, boolean skipOnFailure, ActionListener lookupListener) { + if (ExceptionsHelper.isRemoteUnavailableException(e) && skipOnFailure) { 
lookupListener.onResponse(new LookupResponse(e)); } else { lookupListener.onFailure(e); @@ -486,7 +482,7 @@ protected Map availablePolicies() { return projectResolver.getProjectMetadata(clusterService.state()).custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY).getPolicies(); } - protected void getRemoteConnection(String cluster, EsqlExecutionInfo executionInfo, ActionListener listener) { - remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, executionInfo.shouldSkipOnFailure(cluster) == false, listener); + protected void getRemoteConnection(String cluster, boolean ensureConnected, ActionListener listener) { + remoteClusterService.maybeEnsureConnectedAndGetConnection(cluster, ensureConnected, listener); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index ffd9ab3b7c13d..a0c5c99f82a60 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -458,11 +458,7 @@ EnrichResolution resolvePolicies(Collection clusters, Collection listener - ) { + protected void getRemoteConnection(String remoteCluster, boolean ensureConnected, ActionListener listener) { assertThat("Must only called on the local cluster", cluster, equalTo(LOCAL_CLUSTER_GROUP_KEY)); listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode())); }