diff --git a/docs/changelog/116365.yaml b/docs/changelog/116365.yaml new file mode 100644 index 0000000000000..b799be4e86f30 --- /dev/null +++ b/docs/changelog/116365.yaml @@ -0,0 +1,5 @@ +pr: 116365 +summary: Ignore remote ES|QL execution failures when skip_unavailable=true +area: ES|QL +type: enhancement +issues: [] diff --git a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java index 64c90826fda85..670191676726a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java +++ b/test/framework/src/main/java/org/elasticsearch/test/FailingFieldPlugin.java @@ -24,12 +24,14 @@ public class FailingFieldPlugin extends Plugin implements ScriptPlugin { + public static final String FAILING_FIELD_LANG = "failing_field"; + @Override public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { return new ScriptEngine() { @Override public String getType() { - return "failing_field"; + return FAILING_FIELD_LANG; } @Override diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java index 5f3f135810322..3cc5ae99e568a 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/Clusters.java @@ -31,6 +31,10 @@ public static ElasticsearchCluster remoteCluster() { } public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster) { + return localCluster(remoteCluster, true); + } + + public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteCluster, Boolean skipUnavailable) { return ElasticsearchCluster.local() .name(LOCAL_CLUSTER_NAME) .distribution(DistributionType.DEFAULT) @@ -40,6 +44,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust .setting("xpack.license.self_generated.type", "trial") .setting("node.roles", "[data,ingest,master,remote_cluster_client]") .setting("cluster.remote.remote_cluster.seeds", () -> "\"" + remoteCluster.getTransportEndpoint(0) + "\"") + .setting("cluster.remote.remote_cluster.skip_unavailable", skipUnavailable.toString()) .setting("cluster.remote.connections_per_cluster", "1") .shared(true) .setting("cluster.routing.rebalance.enable", "none") diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java index 55500aa1c9537..d88d630cb18a2 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationIT.java @@ -30,7 +30,7 @@ @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlRestValidationIT extends EsqlRestValidationTestCase { static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); - static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); @ClassRule public static 
TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java new file mode 100644 index 0000000000000..eb2d722d0a482 --- /dev/null +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/EsqlRestValidationSkipUnavailableIT.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.ccq; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.elasticsearch.test.TestClustersThreadFilter; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.junit.ClassRule; +import org.junit.rules.RuleChain; +import org.junit.rules.TestRule; + +import java.io.IOException; + +@ThreadLeakFilters(filters = TestClustersThreadFilter.class) +public class EsqlRestValidationSkipUnavailableIT extends EsqlRestValidationIT { + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, true); + + @ClassRule + public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); + + @Override + protected String getTestRestCluster() { + return localCluster.getHttpAddresses(); + } + + @Override + protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException { + assertValidRequestOnIndices(new String[] { indexName }); + } + +} diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java index 7c81f97714a6f..8362bcea5c251 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/RequestIndexFilteringIT.java @@ -30,7 +30,7 @@ public class RequestIndexFilteringIT extends RequestIndexFilteringTestCase { static ElasticsearchCluster remoteCluster = Clusters.remoteCluster(); - static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster); + static ElasticsearchCluster localCluster = Clusters.localCluster(remoteCluster, false); @ClassRule public static TestRule clusterRule = RuleChain.outerRule(remoteCluster).around(localCluster); diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java index 9ec4f60f4c843..ef63bd10504ef 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlRestValidationTestCase.java @@ -87,7 +87,7 @@ public void testInexistentIndexNameWithoutWildcard() throws IOException { public void 
testExistentIndexWithoutWildcard() throws IOException { for (String indexName : existentIndexWithoutWildcard) { - assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); + assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); } } @@ -95,18 +95,22 @@ public void testExistentIndexWithWildcard() throws IOException { assertValidRequestOnIndices(existentIndexWithWildcard); } + protected void assertErrorMessageMaybe(String indexName, String errorMessage, int statusCode) throws IOException { + assertErrorMessage(indexName, errorMessage, statusCode); + } + public void testAlias() throws IOException { createAlias(); for (String indexName : existentAliasWithoutWildcard) { - assertErrorMessage(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); + assertErrorMessageMaybe(indexName, "\"reason\" : \"no such index [inexistent]\"", 404); } assertValidRequestOnIndices(existentAliasWithWildcard); deleteAlias(); } - private void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException { + protected void assertErrorMessages(String[] indices, String errorMessage, int statusCode) throws IOException { for (String indexName : indices) { assertErrorMessage(indexName, errorMessage + "[" + clusterSpecificIndexName(indexName) + "]", statusCode); } @@ -116,7 +120,7 @@ protected String clusterSpecificIndexName(String indexName) { return indexName; } - private void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException { + protected void assertErrorMessage(String indexName, String errorMessage, int statusCode) throws IOException { var specificName = clusterSpecificIndexName(indexName); final var request = createRequest(specificName); ResponseException exc = expectThrows(ResponseException.class, () -> client().performRequest(request)); @@ -138,7 +142,7 @@ private Request createRequest(String indexName) throws IOException { return request; } - private void assertValidRequestOnIndices(String[] indices) throws IOException { + protected void assertValidRequestOnIndices(String[] indices) throws IOException { for (String indexName : indices) { final var request = createRequest(clusterSpecificIndexName(indexName)); Response response = client().performRequest(request); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java index 8054b260f0060..7b68969c09642 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java @@ -22,6 +22,8 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.xpack.esql.action.AbstractPauseFieldPlugin.PAUSE_FIELD_LANG; + /** A pausable testcase. Subclasses extend this testcase to simulate slow running queries. 
* * Uses the evaluation of a runtime field in the mappings "pause_me" of type long, along @@ -64,7 +66,7 @@ public void setupIndex() throws IOException { mapping.startObject("pause_me"); { mapping.field("type", "long"); - mapping.startObject("script").field("source", "").field("lang", "pause").endObject(); + mapping.startObject("script").field("source", "").field("lang", PAUSE_FIELD_LANG).endObject(); } mapping.endObject(); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java index 5554f7e571dfb..05a1e41c596e7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPauseFieldPlugin.java @@ -28,6 +28,8 @@ */ public abstract class AbstractPauseFieldPlugin extends Plugin implements ScriptPlugin { + public static final String PAUSE_FIELD_LANG = "pause"; + // Called when the engine enters the execute() method. protected void onStartExecute() {} @@ -39,7 +41,7 @@ public ScriptEngine getScriptEngine(Settings settings, Collection remoteClusterAlias() { return List.of(REMOTE_CLUSTER); @@ -57,6 +66,11 @@ protected Collection> nodePlugins(String clusterAlias) { return plugins; } + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of("cluster-a", true, "cluster-b", false); + } + public static class InternalExchangePlugin extends Plugin { @Override public List> getSettings() { @@ -82,7 +96,7 @@ private void createRemoteIndex(int numDocs) throws Exception { mapping.startObject("const"); { mapping.field("type", "long"); - mapping.startObject("script").field("source", "").field("lang", "pause").endObject(); + mapping.startObject("script").field("source", "").field("lang", AbstractPauseFieldPlugin.PAUSE_FIELD_LANG).endObject(); } mapping.endObject(); } @@ -96,6 +110,26 @@ private void createRemoteIndex(int numDocs) throws Exception { bulk.get(); } + private void createLocalIndex(int numDocs) throws Exception { + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("const"); + { + mapping.field("type", "long"); + } + mapping.endObject(); + } + mapping.endObject(); + mapping.endObject(); + client(LOCAL_CLUSTER).admin().indices().prepareCreate("test").setMapping(mapping).get(); + BulkRequestBuilder bulk = client(LOCAL_CLUSTER).prepareBulk("test").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < numDocs; i++) { + bulk.add(new IndexRequest().source("const", i)); + } + bulk.get(); + } + public void testCancel() throws Exception { createRemoteIndex(between(10, 100)); EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); @@ -208,4 +242,92 @@ public void testTasks() throws Exception { } requestFuture.actionGet(30, TimeUnit.SECONDS).close(); } + + // Check that cancelling remote task with skip_unavailable=true produces partial + public void testCancelSkipUnavailable() throws Exception { + createRemoteIndex(between(10, 100)); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query("FROM *:test | STATS total=sum(const) | LIMIT 1"); + request.pragmas(randomPragmas()); + request.includeCCSMetadata(true); + PlainActionFuture requestFuture = new PlainActionFuture<>(); + 
client().execute(EsqlQueryAction.INSTANCE, request, requestFuture); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + List rootTasks = new ArrayList<>(); + assertBusy(() -> { + List tasks = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(ComputeService.CLUSTER_ACTION_NAME) + .get() + .getTasks(); + assertThat(tasks, hasSize(1)); + rootTasks.addAll(tasks); + }); + var cancelRequest = new CancelTasksRequest().setTargetTaskId(rootTasks.get(0).taskId()).setReason("remote failed"); + client(REMOTE_CLUSTER).execute(TransportCancelTasksAction.TYPE, cancelRequest); + try { + assertBusy(() -> { + List drivers = client(REMOTE_CLUSTER).admin() + .cluster() + .prepareListTasks() + .setActions(DriverTaskRunner.ACTION_NAME) + .get() + .getTasks(); + assertThat(drivers.size(), greaterThanOrEqualTo(1)); + for (TaskInfo driver : drivers) { + assertTrue(driver.cancelled()); + } + }); + } finally { + SimplePauseFieldPlugin.allowEmitting.countDown(); + } + var resp = requestFuture.actionGet(); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + + assertNotNull(executionInfo); + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getFailures().size(), equalTo(1)); + assertThat(cluster.getFailures().get(0).getCause(), instanceOf(TaskCancelledException.class)); + } + + // Check that closing remote node with skip_unavailable=true produces partial + public void testCloseSkipUnavailable() throws Exception { + assumeTrue("Only snapshot builds have delay()", Build.current().isSnapshot()); + createRemoteIndex(between(1000, 5000)); + createLocalIndex(100); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(""" + FROM test*,cluster-a:test* METADATA _index + | EVAL cluster=MV_FIRST(SPLIT(_index, ":")) + | WHERE CASE(cluster == "cluster-a", delay(1ms), true) + | STATS total = sum(const) | LIMIT 1 + """); + request.pragmas(randomPragmas()); + var requestFuture = client().execute(EsqlQueryAction.INSTANCE, request); + assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS)); + SimplePauseFieldPlugin.allowEmitting.countDown(); + cluster(REMOTE_CLUSTER).close(); + try (var resp = requestFuture.actionGet()) { + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertNotNull(executionInfo); + + List> values = getValuesList(resp); + assertThat(values.get(0).size(), equalTo(1)); + // We can't be sure of the exact value here as we don't know if any data from remote came in, but all local data should be there + assertThat((long) values.get(0).get(0), greaterThanOrEqualTo(4950L)); + + EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(REMOTE_CLUSTER); + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(LOCAL_CLUSTER); + + assertThat(localCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)); + assertThat(localCluster.getSuccessfulShards(), equalTo(1)); + + assertThat(cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getFailures().size(), equalTo(1)); + } + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java index 
6cfc42523007e..1264ba508d08e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersQueryIT.java @@ -27,10 +27,13 @@ import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.FailingFieldPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.XContentTestUtils; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -61,10 +64,10 @@ public class CrossClustersQueryIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER_1 = "cluster-a"; private static final String REMOTE_CLUSTER_2 = "remote-b"; - private static String LOCAL_INDEX = "logs-1"; - private static String IDX_ALIAS = "alias1"; - private static String FILTERED_IDX_ALIAS = "alias-filtered-1"; - private static String REMOTE_INDEX = "logs-2"; + private static final String LOCAL_INDEX = "logs-1"; + private static final String IDX_ALIAS = "alias1"; + private static final String FILTERED_IDX_ALIAS = "alias-filtered-1"; + private static final String REMOTE_INDEX = "logs-2"; @Override protected List remoteClusterAlias() { @@ -81,6 +84,7 @@ protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(InternalExchangePlugin.class); + plugins.add(FailingFieldPlugin.class); return plugins; } @@ -510,34 +514,48 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() { // since cluster-a is skip_unavailable=true and at least one cluster has a matching indices, no error is thrown { - // TODO solve in follow-on PR which does skip_unavailable handling at execution time - // String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index); - // try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) { - // assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); - // assertExpectedClustersForMissingIndicesTests(executionInfo, List.of( - // // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched - // new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0), - // new ExpectedCluster(REMOTE_CLUSTER_1, "*", EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, remote2NumShards) - // )); - // } - - // TODO: handle LIMIT 0 for this case in follow-on PR - // String limit0 = q + " | LIMIT 0"; - // try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) { - // assertThat(resp.columns().size(), greaterThanOrEqualTo(1)); - // assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0)); - // EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); - // assertThat(executionInfo.isCrossClusterSearch(), is(true)); - // 
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); - // assertExpectedClustersForMissingIndicesTests(executionInfo, List.of( - // // local cluster is never marked as SKIPPED even when no matching indices - just marked as 0 shards searched - // new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0), - // new ExpectedCluster(REMOTE_CLUSTER_1, "nomatch," + remote1Index + "*", EsqlExecutionInfo.Cluster.Status.SKIPPED, 0) - // )); - // } + String q = Strings.format("FROM %s,cluster-a:nomatch,cluster-a:%s*", localIndex, remote1Index); + try (EsqlQueryResponse resp = runQuery(q, requestIncludeMeta)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, localNumShards), + new ExpectedCluster( + REMOTE_CLUSTER_1, + "nomatch," + remote1Index + "*", + // The nomatch index handling happens at runtime, hence we get PARTIAL instead of SUCCESSFUL + EsqlExecutionInfo.Cluster.Status.PARTIAL, + 0 + ) + ) + ); + } + + String limit0 = q + " | LIMIT 0"; + try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) { + assertThat(resp.columns().size(), greaterThanOrEqualTo(1)); + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(0)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, 0), + new ExpectedCluster( + REMOTE_CLUSTER_1, + "nomatch," + remote1Index + "*", + // TODO: this probably should be PARTIAL instead, matching the above case + EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, + 0 + ) + ) + ); + } } // tests with three clusters --- @@ -848,12 +866,15 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu long overallTookMillis = executionInfo.overallTook().millis(); assertThat(overallTookMillis, greaterThanOrEqualTo(0L)); - Set expectedClusterAliases = expected.stream().map(c -> c.clusterAlias()).collect(Collectors.toSet()); + Set expectedClusterAliases = expected.stream().map(ExpectedCluster::clusterAlias).collect(Collectors.toSet()); assertThat(executionInfo.clusterAliases(), equalTo(expectedClusterAliases)); for (ExpectedCluster expectedCluster : expected) { EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(expectedCluster.clusterAlias()); String msg = cluster.getClusterAlias(); + if (msg.equals(LOCAL_CLUSTER)) { + msg = "(local)"; + } assertThat(msg, cluster.getIndexExpression(), equalTo(expectedCluster.indexExpression())); assertThat(msg, cluster.getStatus(), equalTo(expectedCluster.status())); assertThat(msg, cluster.getTook().millis(), greaterThanOrEqualTo(0L)); @@ -869,6 +890,10 @@ public void assertExpectedClustersForMissingIndicesTests(EsqlExecutionInfo execu assertThat(msg, cluster.getFailures().get(0).getCause(), instanceOf(VerificationException.class)); String expectedMsg = "Unknown index [" + expectedCluster.indexExpression() + "]"; assertThat(msg, 
cluster.getFailures().get(0).getCause().getMessage(), containsString(expectedMsg)); + } else if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertThat(msg, cluster.getSuccessfulShards(), equalTo(0)); + assertThat(msg, cluster.getSkippedShards(), equalTo(expectedCluster.totalShards())); + assertThat(msg, cluster.getFailures().size(), equalTo(1)); } // currently failed shards is always zero - change this once we start allowing partial data for individual shard failures assertThat(msg, cluster.getFailedShards(), equalTo(0)); @@ -1217,6 +1242,26 @@ public void testWarnings() throws Exception { assertTrue(latch.await(30, TimeUnit.SECONDS)); } + public void testRemoteFailureSkipUnavailable() throws IOException { + Map testClusterInfo = setupFailClusters(); + String localIndex = (String) testClusterInfo.get("local.index"); + String remote1Index = (String) testClusterInfo.get("remote.index"); + int localNumShards = (Integer) testClusterInfo.get("local.num_shards"); + String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index); + try (EsqlQueryResponse resp = runQuery(q, false)) { + assertThat(getValuesList(resp).size(), greaterThanOrEqualTo(1)); + EsqlExecutionInfo executionInfo = resp.getExecutionInfo(); + assertThat(executionInfo.isCrossClusterSearch(), is(true)); + assertExpectedClustersForMissingIndicesTests( + executionInfo, + List.of( + new ExpectedCluster(LOCAL_CLUSTER, localIndex, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL, localNumShards), + new ExpectedCluster(REMOTE_CLUSTER_1, remote1Index + "*", EsqlExecutionInfo.Cluster.Status.PARTIAL, 0) + ) + ); + } + } + private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) { try { final Map esqlResponseAsMap = XContentTestUtils.convertToMap(resp); @@ -1299,6 +1344,22 @@ Map setupClusters(int numClusters) { return clusterInfo; } + Map setupFailClusters() throws IOException { + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndicesFail(REMOTE_CLUSTER_1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + setSkipUnavailable(REMOTE_CLUSTER_1, true); + return clusterInfo; + } + /** * For the local cluster and REMOTE_CLUSTER_1 it creates a standard alias to the index created in populateLocalIndices * and populateRemoteIndices. 
It also creates a filtered alias against those indices that looks like: @@ -1421,6 +1482,31 @@ void populateRemoteIndices(String clusterAlias, String indexName, int numShards) remoteClient.admin().indices().prepareRefresh(indexName).get(); } + void populateRemoteIndicesFail(String clusterAlias, String indexName, int numShards) throws IOException { + Client remoteClient = client(clusterAlias); + XContentBuilder mapping = JsonXContent.contentBuilder().startObject(); + mapping.startObject("runtime"); + { + mapping.startObject("fail_me"); + { + mapping.field("type", "long"); + mapping.startObject("script").field("source", "").field("lang", FailingFieldPlugin.FAILING_FIELD_LANG).endObject(); + } + mapping.endObject(); + } + mapping.endObject(); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping(mapping.endObject()) + ); + + remoteClient.prepareIndex(indexName).setSource("id", 0).get(); + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + private void setSkipUnavailable(String clusterAlias, boolean skip) { client(LOCAL_CLUSTER).admin() .cluster() diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java index 8bd23230fcde7..a60ac37a06866 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeListener.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static org.elasticsearch.xpack.esql.session.EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards; + /** * A variant of {@link RefCountingListener} with the following differences: * 1. Automatically cancels sub tasks on failure. @@ -135,8 +137,7 @@ private ComputeListener( private static void setFinalStatusAndShardCounts(String clusterAlias, EsqlExecutionInfo executionInfo) { executionInfo.swapCluster(clusterAlias, (k, v) -> { - // TODO: once PARTIAL status is supported (partial results work to come), modify this code as needed - if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + if (v.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED && v.getStatus() != EsqlExecutionInfo.Cluster.Status.PARTIAL) { assert v.getTotalShards() != null && v.getSkippedShards() != null : "Null total or skipped shard count: " + v; return new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL) /* @@ -201,6 +202,46 @@ ActionListener acquireAvoid() { }); } + /** + * Should we ignore a failure from the remote cluster due to skip_unavailable=true setting? + */ + boolean shouldIgnoreRemoteErrors(@Nullable String computeClusterAlias) { + return computeClusterAlias != null + && esqlExecutionInfo.isCrossClusterSearch() + && computeClusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false + && runningOnRemoteCluster() == false + && esqlExecutionInfo.isSkipUnavailable(computeClusterAlias); + } + + /** + * Marks the cluster as PARTIAL and adds the exception to the cluster's failures record. + * Currently, additional failures are not recorded. + * TODO: add accumulating failures. 
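+ * If the cluster has already been marked PARTIAL, later exceptions are dropped, so only the first recorded failure is kept.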
+ */ + void markAsPartial(String computeClusterAlias, Exception e) { + var status = esqlExecutionInfo.getCluster(computeClusterAlias).getStatus(); + assert status != EsqlExecutionInfo.Cluster.Status.SKIPPED + : "We shouldn't be running compute on a cluster that's already marked as skipped"; + // We use PARTIAL here because we can not know whether the cluster has already sent any data. + if (status != EsqlExecutionInfo.Cluster.Status.PARTIAL) { + LOGGER.debug("Marking failed cluster {} as partial: {}", computeClusterAlias, e); + markClusterWithFinalStateAndNoShards(esqlExecutionInfo, computeClusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e); + } + } + + /** + * Acquire a listener that respects skip_unavailable setting for this cluster. + */ + ActionListener acquireSkipUnavailable(@Nullable String computeClusterAlias) { + if (shouldIgnoreRemoteErrors(computeClusterAlias) == false) { + return acquireAvoid(); + } + return refs.acquire().delegateResponse((l, e) -> { + markAsPartial(computeClusterAlias, e); + l.onResponse(null); + }); + } + /** * Acquires a new listener that collects compute result. This listener will also collect warnings emitted during compute * @param computeClusterAlias The cluster alias where the compute is happening. Used when metadata needs to be gathered @@ -211,7 +252,7 @@ ActionListener acquireCompute(@Nullable String computeClusterAl assert computeClusterAlias == null || (esqlExecutionInfo != null && esqlExecutionInfo.getRelativeStartNanos() != null) : "When clusterAlias is provided to acquireCompute, executionInfo and relativeStartTimeNanos must be non-null"; - return acquireAvoid().map(resp -> { + return acquireSkipUnavailable(computeClusterAlias).map(resp -> { responseHeaders.collect(); var profiles = resp.getProfiles(); if (profiles != null && profiles.isEmpty() == false) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index a38236fe60954..9b7d1629985a9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.compute.EsqlRefCountingListener; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.Page; @@ -49,11 +50,14 @@ import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -105,6 +109,7 @@ public class ComputeService { private final LookupFromIndexService lookupFromIndexService; private final ClusterService clusterService; private final 
AtomicLong childSessionIdGenerator = new AtomicLong(); + private final TaskManager taskManager; public ComputeService( SearchService searchService, @@ -134,6 +139,7 @@ public ComputeService( this.enrichLookupService = enrichLookupService; this.lookupFromIndexService = lookupFromIndexService; this.clusterService = clusterService; + this.taskManager = transportService.getTaskManager(); } public void execute( @@ -402,36 +408,126 @@ private void startComputeOnRemoteClusters( var linkExchangeListeners = ActionListener.releaseAfter(computeListener.acquireAvoid(), exchangeSource.addEmptySink()); try (EsqlRefCountingListener refs = new EsqlRefCountingListener(linkExchangeListeners)) { for (RemoteCluster cluster : clusters) { + final String clusterAlias = cluster.clusterAlias(); + final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); final var childSessionId = newChildSession(sessionId); + var openExchangeListener = refs.acquire(); ExchangeService.openExchange( transportService, cluster.connection, childSessionId, queryPragmas.exchangeBufferSize(), esqlExecutor, - refs.acquire().delegateFailureAndWrap((l, unused) -> { - var remoteSink = exchangeService.newRemoteSink(rootTask, childSessionId, transportService, cluster.connection); - exchangeSource.addRemoteSink(remoteSink, true, queryPragmas.concurrentExchangeClients(), ActionListener.noop()); + ActionListener.wrap(unused -> { + // Exchange opening is successful var remotePlan = new RemoteClusterPlan(plan, cluster.concreteIndices, cluster.originalIndices); var clusterRequest = new ClusterComputeRequest(cluster.clusterAlias, childSessionId, configuration, remotePlan); - var clusterListener = ActionListener.runBefore( - computeListener.acquireCompute(cluster.clusterAlias()), - () -> l.onResponse(null) + var listenerGroup = new RemoteListenerGroup(rootTask, computeListener, clusterAlias, openExchangeListener); + var remoteSink = exchangeService.newRemoteSink( + listenerGroup.getGroupTask(), + childSessionId, + transportService, + cluster.connection + ); + exchangeSource.addRemoteSink( + remoteSink, + // This applies to all the subrequests for this cluster, and all the failures. + // If we need to change it depending on the kind of failure, we'll have to replace this with a predicate. + suppressRemoteFailure == false, + queryPragmas.concurrentExchangeClients(), + listenerGroup.getExchangeRequestListener() ); transportService.sendChildRequest( cluster.connection, CLUSTER_ACTION_NAME, clusterRequest, - rootTask, + listenerGroup.getGroupTask(), TransportRequestOptions.EMPTY, - new ActionListenerResponseHandler<>(clusterListener, ComputeResponse::new, esqlExecutor) + new ActionListenerResponseHandler<>( + listenerGroup.getClusterRequestListener(), + ComputeResponse::new, + esqlExecutor + ) ); + }, e -> { + // Exchange opening failed + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + openExchangeListener.onResponse(null); + } else { + openExchangeListener.onFailure(e); + } }) ); } } } + // Create group task for this cluster. This group task ensures that two branches of the computation: + // the exchange sink and the cluster request, belong to the same group and each of them can cancel the other. + // runAfter listeners below ensure that the group is finalized when both branches are done. + // The group task is the child of the root task, so if the root task is cancelled, the group task is cancelled too. 
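+ // Once both branches have completed, the group task is unregistered and the exchange-open listener is completed.
+ // On failure of either branch, the group task and its descendants are cancelled first; the failure is then either
+ // surfaced or, for skip_unavailable=true remotes, converted into a PARTIAL cluster status via the ComputeListener.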
+ private class RemoteListenerGroup { + private final CancellableTask groupTask; + private final ActionListener exchangeRequestListener; + private final ActionListener clusterRequestListener; + + RemoteListenerGroup(Task rootTask, ComputeListener computeListener, String clusterAlias, ActionListener delegate) { + final boolean suppressRemoteFailure = computeListener.shouldIgnoreRemoteErrors(clusterAlias); + groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]"); + CountDown countDown = new CountDown(2); + // The group is done when both the sink and the cluster request are done + Runnable finishGroup = () -> { + if (countDown.countDown()) { + taskManager.unregister(groupTask); + delegate.onResponse(null); + } + }; + // Cancel the group on sink failure + ActionListener exchangeListener = computeListener.acquireAvoid().delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants(groupTask, "exchange sink failure", true, ActionListener.running(() -> { + if (suppressRemoteFailure) { + computeListener.markAsPartial(clusterAlias, e); + inner.onResponse(null); + } else { + inner.onFailure(e); + } + })); + }); + exchangeRequestListener = ActionListener.runAfter(exchangeListener, finishGroup); + // Cancel the group on cluster request failure + var clusterListener = computeListener.acquireCompute(clusterAlias).delegateResponse((inner, e) -> { + taskManager.cancelTaskAndDescendants( + groupTask, + "exchange cluster action failure", + true, + ActionListener.running(() -> inner.onFailure(e)) + ); + }); + clusterRequestListener = ActionListener.runAfter(clusterListener, finishGroup); + } + + public CancellableTask getGroupTask() { + return groupTask; + } + + public ActionListener getExchangeRequestListener() { + return exchangeRequestListener; + } + + public ActionListener getClusterRequestListener() { + return clusterRequestListener; + } + + private CancellableTask createGroupTask(Task parentTask, Supplier description) { + return (CancellableTask) taskManager.register( + "transport", + "esql_compute_group", + new ComputeGroupTaskRequest(parentTask.taskInfo(transportService.getLocalNode().getId(), false).taskId(), description) + ); + } + } + void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts)); List contexts = new ArrayList<>(context.searchContexts.size()); @@ -958,4 +1054,24 @@ public List searchExecutionContexts() { private String newChildSession(String session) { return session + "/" + childSessionIdGenerator.incrementAndGet(); } + + private static class ComputeGroupTaskRequest extends TransportRequest { + private final Supplier parentDescription; + + ComputeGroupTaskRequest(TaskId parentTask, Supplier description) { + this.parentDescription = description; + setParentTask(parentTask); + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + assert parentTaskId.isSet(); + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public String getDescription() { + return "group [" + parentDescription.get() + "]"; + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java index f8670a8e6d053..3b5dfee1807cf 100644 --- 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtils.java @@ -15,7 +15,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.indices.IndicesExpressionGrouper; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.transport.ConnectTransportException; @@ -25,6 +25,7 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo; +import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo.Cluster; import org.elasticsearch.xpack.esql.analysis.Analyzer; import org.elasticsearch.xpack.esql.analysis.TableInfo; import org.elasticsearch.xpack.esql.index.IndexResolution; @@ -37,7 +38,7 @@ import java.util.Map; import java.util.Set; -class EsqlSessionCCSUtils { +public class EsqlSessionCCSUtils { private EsqlSessionCCSUtils() {} @@ -119,16 +120,16 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn } for (String clusterAlias : executionInfo.clusterAliases()) { executionInfo.swapCluster(clusterAlias, (k, v) -> { - EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v).setTook(executionInfo.overallTook()) + Cluster.Builder builder = new Cluster.Builder(v).setTook(executionInfo.overallTook()) .setTotalShards(0) .setSuccessfulShards(0) .setSkippedShards(0) .setFailedShards(0); if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias)) { // never mark local cluster as skipped - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL); + builder.setStatus(Cluster.Status.SUCCESSFUL); } else { - builder.setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED); + builder.setStatus(Cluster.Status.SKIPPED); // add this exception to the failures list only if there is no failure already recorded there if (v.getFailures() == null || v.getFailures().size() == 0) { builder.setFailures(List.of(new ShardSearchFailure(exceptionForResponse))); @@ -142,8 +143,8 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) { StringBuilder sb = new StringBuilder(); for (String clusterAlias : executionInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias); - if (cluster.getStatus() != EsqlExecutionInfo.Cluster.Status.SKIPPED) { + Cluster cluster = executionInfo.getCluster(clusterAlias); + if (cluster.getStatus() != Cluster.Status.SKIPPED) { if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(','); } else { @@ -171,16 +172,7 @@ static void updateExecutionInfoWithUnavailableClusters(EsqlExecutionInfo execInf entry.getValue().getException() ); if (skipUnavailable) { - execInfo.swapCluster( - clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SKIPPED) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0) - .setFailures(List.of(new ShardSearchFailure(e))) - .build() - ); + markClusterWithFinalStateAndNoShards(execInfo, 
clusterAlias, Cluster.Status.SKIPPED, e); } else { throw e; } @@ -228,27 +220,16 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } else { // handles local cluster (when no concrete indices requested) and skip_unavailable=true clusters - EsqlExecutionInfo.Cluster.Status status; - ShardSearchFailure failure; + Cluster.Status status; + Exception failureException; if (c.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) { - status = EsqlExecutionInfo.Cluster.Status.SUCCESSFUL; - failure = null; + status = Cluster.Status.SUCCESSFUL; + failureException = null; } else { - status = EsqlExecutionInfo.Cluster.Status.SKIPPED; - failure = new ShardSearchFailure(new VerificationException("Unknown index [" + indexExpression + "]")); + status = Cluster.Status.SKIPPED; + failureException = new VerificationException("Unknown index [" + indexExpression + "]"); } - executionInfo.swapCluster(c, (k, v) -> { - var builder = new EsqlExecutionInfo.Cluster.Builder(v).setStatus(status) - .setTook(new TimeValue(0)) - .setTotalShards(0) - .setSuccessfulShards(0) - .setSkippedShards(0) - .setFailedShards(0); - if (failure != null) { - builder.setFailures(List.of(failure)); - } - return builder.build(); - }); + markClusterWithFinalStateAndNoShards(executionInfo, c, status, failureException); } } if (fatalErrorMessage != null) { @@ -256,6 +237,32 @@ static void updateExecutionInfoWithClustersWithNoMatchingIndices(EsqlExecutionIn } } + /** + * Mark cluster with a default cluster state with the given status and potentially failure from exception. + * Most metrics are set to 0 except for "took" which is set to the total time taken so far. + * The status must be the final state of the cluster, not RUNNING. + */ + public static void markClusterWithFinalStateAndNoShards( + EsqlExecutionInfo executionInfo, + String clusterAlias, + Cluster.Status status, + @Nullable Exception ex + ) { + assert status != Cluster.Status.RUNNING : "status must be a final state, not RUNNING"; + executionInfo.swapCluster(clusterAlias, (k, v) -> { + Cluster.Builder builder = new Cluster.Builder(v).setStatus(status) + .setTook(executionInfo.tookSoFar()) + .setTotalShards(0) + .setSuccessfulShards(0) + .setSkippedShards(0) + .setFailedShards(0); + if (ex != null) { + builder.setFailures(List.of(new ShardSearchFailure(ex))); + } + return builder.build(); + }); + } + // visible for testing static boolean missingIndicesIsFatal(String clusterAlias, EsqlExecutionInfo executionInfo) { // missing indices on local cluster is fatal only if a concrete index requested @@ -287,11 +294,11 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) { if (execInfo.isCrossClusterSearch()) { execInfo.markEndPlanning(); for (String clusterAlias : execInfo.clusterAliases()) { - EsqlExecutionInfo.Cluster cluster = execInfo.getCluster(clusterAlias); - if (cluster.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + Cluster cluster = execInfo.getCluster(clusterAlias); + if (cluster.getStatus() == Cluster.Status.SKIPPED) { execInfo.swapCluster( clusterAlias, - (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTook(execInfo.planningTookTime()) + (k, v) -> new Cluster.Builder(v).setTook(execInfo.planningTookTime()) .setTotalShards(0) .setSuccessfulShards(0) .setSkippedShards(0) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java index 
6b01010ffa5f4..82c72d372da1d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/EsqlSessionCCSUtilsTests.java @@ -216,6 +216,47 @@ public void testUpdateExecutionInfoWithUnavailableClusters() { } } + public void testUpdateExecutionInfoWithUnavailableClustersPartial() { + final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; + final String remote1Alias = "remote1"; + final String remote2Alias = "remote2"; + + // skip_unavailable=true clusters are unavailable, both marked as PARTIAL + { + EsqlExecutionInfo executionInfo = new EsqlExecutionInfo(true); + executionInfo.swapCluster(localClusterAlias, (k, v) -> new EsqlExecutionInfo.Cluster(localClusterAlias, "logs*", false)); + executionInfo.swapCluster(remote1Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote1Alias, "*", true)); + executionInfo.swapCluster(remote2Alias, (k, v) -> new EsqlExecutionInfo.Cluster(remote2Alias, "mylogs1,mylogs2,logs*", true)); + + Exception ex = new NoSeedNodeLeftException("unable to connect"); + var unavailableClusters = Map.of(remote1Alias, ex, remote2Alias, ex); + unavailableClusters.forEach( + (c, f) -> EsqlSessionCCSUtils.markClusterWithFinalStateAndNoShards( + executionInfo, + c, + EsqlExecutionInfo.Cluster.Status.PARTIAL, + ex + ) + ); + + assertThat(executionInfo.clusterAliases(), equalTo(Set.of(localClusterAlias, remote1Alias, remote2Alias))); + assertNull(executionInfo.overallTook()); + + EsqlExecutionInfo.Cluster localCluster = executionInfo.getCluster(localClusterAlias); + assertThat(localCluster.getIndexExpression(), equalTo("logs*")); + assertClusterStatusAndShardCounts(localCluster, EsqlExecutionInfo.Cluster.Status.RUNNING); + + EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); + assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); + assertClusterStatusAndShardCounts(remote1Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + + EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); + assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); + assertClusterStatusAndShardCounts(remote2Cluster, EsqlExecutionInfo.Cluster.Status.PARTIAL); + } + + } + public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { final String localClusterAlias = RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY; final String remote1Alias = "remote1"; @@ -293,7 +334,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote1Cluster = executionInfo.getCluster(remote1Alias); assertThat(remote1Cluster.getIndexExpression(), equalTo("*")); assertThat(remote1Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); - assertThat(remote1Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote1Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote1Cluster.getTotalShards(), equalTo(0)); assertThat(remote1Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote1Cluster.getSkippedShards(), equalTo(0)); @@ -335,7 +376,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() { EsqlExecutionInfo.Cluster remote2Cluster = executionInfo.getCluster(remote2Alias); assertThat(remote2Cluster.getIndexExpression(), equalTo("mylogs1,mylogs2,logs*")); assertThat(remote2Cluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED)); -
assertThat(remote2Cluster.getTook().millis(), equalTo(0L)); + assertThat(remote2Cluster.getTook().millis(), greaterThanOrEqualTo(0L)); assertThat(remote2Cluster.getTotalShards(), equalTo(0)); assertThat(remote2Cluster.getSuccessfulShards(), equalTo(0)); assertThat(remote2Cluster.getSkippedShards(), equalTo(0)); @@ -468,13 +509,20 @@ public void testUpdateExecutionInfoAtEndOfPlanning() { private void assertClusterStatusAndShardCounts(EsqlExecutionInfo.Cluster cluster, EsqlExecutionInfo.Cluster.Status status) { assertThat(cluster.getStatus(), equalTo(status)); - assertNull(cluster.getTook()); if (status == EsqlExecutionInfo.Cluster.Status.RUNNING) { + assertNull(cluster.getTook()); assertNull(cluster.getTotalShards()); assertNull(cluster.getSuccessfulShards()); assertNull(cluster.getSkippedShards()); assertNull(cluster.getFailedShards()); } else if (status == EsqlExecutionInfo.Cluster.Status.SKIPPED) { + assertNotNull(cluster.getTook()); + assertThat(cluster.getTotalShards(), equalTo(0)); + assertThat(cluster.getSuccessfulShards(), equalTo(0)); + assertThat(cluster.getSkippedShards(), equalTo(0)); + assertThat(cluster.getFailedShards(), equalTo(0)); + } else if (status == EsqlExecutionInfo.Cluster.Status.PARTIAL) { + assertNotNull(cluster.getTook()); assertThat(cluster.getTotalShards(), equalTo(0)); assertThat(cluster.getSuccessfulShards(), equalTo(0)); assertThat(cluster.getSkippedShards(), equalTo(0)); diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java index 8bccc2e3c5c23..a97b6e343729d 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java @@ -92,7 +92,6 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa Map clusterDetails = (Map) detailsMap.get(expectedCluster.clusterAlias()); String msg = expectedCluster.clusterAlias(); - assertThat(msg, (int) clusterDetails.get("took"), greaterThan(0)); assertThat(msg, clusterDetails.get("status"), is(expectedCluster.status())); Map shards = (Map) clusterDetails.get("_shards"); if (expectedCluster.totalShards() == null) { @@ -102,6 +101,7 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa } if (expectedCluster.status().equals("successful")) { + assertThat(msg, (int) clusterDetails.get("took"), greaterThan(0)); assertThat((int) shards.get("successful"), is((int) shards.get("total"))); assertThat((int) shards.get("skipped"), is(0)); @@ -116,6 +116,11 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa assertThat(innerReason.get("reason").toString(), containsString(expectedMsg)); assertThat(innerReason.get("type").toString(), containsString("verification_exception")); + } else if (expectedCluster.status().equals("partial")) { + assertThat((int) shards.get("successful"), is(0)); + assertThat((int) shards.get("skipped"), is(0)); + ArrayList failures = (ArrayList) clusterDetails.get("failures"); + assertThat(failures.size(), is(1)); } else { fail(msg + "; Unexpected status: " + expectedCluster.status()); } @@ -302,40 +307,42 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() throw 
assertThat(e.getMessage(), containsString(Strings.format("%s:%s", REMOTE_CLUSTER_ALIAS, remoteExpr))); } - // TODO uncomment and test in follow-on PR which does skip_unavailable handling at execution time - // { - // String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2); - // - // String limit1 = q + " | LIMIT 1"; - // Response response = client().performRequest(esqlRequest(limit1)); - // assertOK(response); - // - // Map map = responseAsMap(response); - // assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); - // assertThat(((ArrayList) map.get("values")).size(), greaterThanOrEqualTo(1)); - // - // assertExpectedClustersForMissingIndicesTests(map, - // List.of( - // new ExpectedCluster("(local)", INDEX1, "successful", null), - // new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0) - // ) - // ); - // - // String limit0 = q + " | LIMIT 0"; - // response = client().performRequest(esqlRequest(limit0)); - // assertOK(response); - // - // map = responseAsMap(response); - // assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); - // assertThat(((ArrayList) map.get("values")).size(), is(0)); - // - // assertExpectedClustersForMissingIndicesTests(map, - // List.of( - // new ExpectedCluster("(local)", INDEX1, "successful", 0), - // new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "skipped", 0) - // ) - // ); - // } + { + String q = Strings.format("FROM %s,%s:nomatch,%s:%s*", INDEX1, REMOTE_CLUSTER_ALIAS, REMOTE_CLUSTER_ALIAS, INDEX2); + + String limit1 = q + " | LIMIT 1"; + Response response = client().performRequest(esqlRequest(limit1)); + assertOK(response); + + Map map = responseAsMap(response); + assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); + assertThat(((ArrayList) map.get("values")).size(), greaterThanOrEqualTo(1)); + + assertExpectedClustersForMissingIndicesTests( + map, + List.of( + new ExpectedCluster("(local)", INDEX1, "successful", 1), + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "partial", 0) + ) + ); + + String limit0 = q + " | LIMIT 0"; + response = client().performRequest(esqlRequest(limit0)); + assertOK(response); + + map = responseAsMap(response); + assertThat(((ArrayList) map.get("columns")).size(), greaterThanOrEqualTo(1)); + assertThat(((ArrayList) map.get("values")).size(), is(0)); + + assertExpectedClustersForMissingIndicesTests( + map, + List.of( + new ExpectedCluster("(local)", INDEX1, "successful", 0), + // TODO: this should actually be partial, but LIMIT 0 is not processed properly yet + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "nomatch," + INDEX2 + "*", "successful", 0) + ) + ); + } } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java index d6bad85161fd9..3e24f5ea2e9fb 100644 --- a/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java +++ b/x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/RemoteClusterSecurityEsqlIT.java @@ -1017,7 +1017,6 @@ public void testSearchesAgainstNonMatchingIndicesWithSkipUnavailableTrue() throw } String remoteSearchUserAPIKey = 
createRemoteSearchUserAPIKey(); - // sanity check - init queries to ensure we can query employees on local and employees,employees2 on remote { Request request = esqlRequest(""" @@ -1215,15 +1214,15 @@ public void accept(Response response, Boolean limit0) throws Exception { } final List expectedClusters = List.of( new ExpectedCluster("(local)", "employees", "successful", limit0 ? 0 : null), - new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "employees_nomatch,employees*", "successful", 0) + // TODO: LIMIT0 query should be partial too, but current implementation does not mark it as such + new ExpectedCluster(REMOTE_CLUSTER_ALIAS, "employees_nomatch,employees*", limit0 ? "successful" : "partial", 0) ); assertExpectedClustersForMissingIndicesTests(map, expectedClusters); }; - // TODO: uncomment in follow on PR handling skip_unavailable errors at execution time - // Request limit1 = esqlRequest(q + " | LIMIT 1"); - // verifier.accept(performRequestWithRemoteSearchUser(limit1), false); - // verifier.accept(performRequestWithRemoteSearchUserViaAPIKey(limit1, remoteSearchUserAPIKey), false); + Request limit1 = esqlRequest(q + " | LIMIT 1"); + verifier.accept(performRequestWithRemoteSearchUser(limit1), false); + verifier.accept(performRequestWithRemoteSearchUserViaAPIKey(limit1, remoteSearchUserAPIKey), false); Request limit0 = esqlRequest(q + " | LIMIT 0"); verifier.accept(performRequestWithRemoteSearchUser(limit0), true); @@ -1674,6 +1673,12 @@ void assertExpectedClustersForMissingIndicesTests(Map responseMa assertThat(innerReason.get("reason").toString(), containsString(expectedMsg)); assertThat(innerReason.get("type").toString(), containsString("verification_exception")); + } else if (expectedCluster.status().equals("partial")) { + assertThat((int) shards.get("successful"), is(0)); + assertThat((int) shards.get("skipped"), is(0)); + ArrayList failures = (ArrayList) clusterDetails.get("failures"); + assertThat(failures.size(), is(1)); + } else { fail(msg + "; Unexpected status: " + expectedCluster.status()); }
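
As a minimal, standalone sketch (not part of the change above) of the listener-wrapping pattern the PR applies in ComputeListener#acquireSkipUnavailable and markAsPartial: a failure reported for a skip_unavailable=true remote is absorbed and recorded as a PARTIAL cluster status, while failures from other clusters still propagate to the caller. The ClusterStatus enum, Listener interface, and the two maps below are simplified stand-ins invented for illustration only, not the Elasticsearch classes.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in types for illustration only; these are not the Elasticsearch classes.
public class SkipUnavailableListenerSketch {

    enum ClusterStatus { RUNNING, SUCCESSFUL, PARTIAL, SKIPPED }

    interface Listener<T> {
        void onResponse(T result);

        void onFailure(Exception e);
    }

    // Per-cluster execution state, playing the role of EsqlExecutionInfo in this sketch.
    static final Map<String, ClusterStatus> CLUSTER_STATUS = new ConcurrentHashMap<>();
    static final Map<String, Boolean> SKIP_UNAVAILABLE = Map.of("cluster-a", true, "cluster-b", false);

    // Wraps a per-cluster listener: a failure from a skip_unavailable=true remote marks that cluster
    // PARTIAL and completes normally, while failures from other clusters are passed through.
    static <T> Listener<T> acquireSkipUnavailable(String clusterAlias, Listener<T> delegate) {
        return new Listener<T>() {
            @Override
            public void onResponse(T result) {
                CLUSTER_STATUS.put(clusterAlias, ClusterStatus.SUCCESSFUL);
                delegate.onResponse(result);
            }

            @Override
            public void onFailure(Exception e) {
                if (SKIP_UNAVAILABLE.getOrDefault(clusterAlias, false)) {
                    // We cannot know whether the remote already sent data, so PARTIAL is the safe final state.
                    CLUSTER_STATUS.put(clusterAlias, ClusterStatus.PARTIAL);
                    delegate.onResponse(null);
                } else {
                    delegate.onFailure(e);
                }
            }
        };
    }

    public static void main(String[] args) {
        Listener<String> root = new Listener<>() {
            @Override
            public void onResponse(String result) {
                System.out.println("completed, result=" + result + ", statuses=" + CLUSTER_STATUS);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        };
        // cluster-a is skip_unavailable=true: the failure is absorbed and the cluster is marked PARTIAL.
        acquireSkipUnavailable("cluster-a", root).onFailure(new RuntimeException("remote unreachable"));
        // cluster-b is skip_unavailable=false: the failure propagates to the caller unchanged.
        acquireSkipUnavailable("cluster-b", root).onFailure(new RuntimeException("remote unreachable"));
    }
}

PARTIAL rather than SKIPPED is used because, as the comment in markAsPartial notes, the remote may already have contributed some data before failing. In the actual change the same skip_unavailable decision also gates whether the exchange source fails fast on a remote sink failure (the suppressRemoteFailure == false flag passed to exchangeSource.addRemoteSink) and how exchange-open failures are handled.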