diff --git a/test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java b/test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java index 1a4299b0ce938..5f85dc8f3bec1 100644 --- a/test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java +++ b/test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java @@ -28,8 +28,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; public class EsqlPartialResultsIT extends ESRestTestCase { @ClassRule @@ -106,7 +106,11 @@ public void testPartialResult() throws Exception { Set okIds = populateIndices(); String query = """ { - "query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v" + "query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v", + "pragma": { + "max_concurrent_shards_per_node": 1 + }, + "accept_pragma_risks": true } """; // allow_partial_results = true @@ -123,7 +127,7 @@ public void testPartialResult() throws Exception { List columns = (List) results.get("columns"); assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long")))); List values = (List) results.get("values"); - assertThat(values.size(), lessThanOrEqualTo(okIds.size())); + assertThat(values.size(), equalTo(okIds.size())); Map localInfo = (Map) XContentMapValues.extractValue( results, "_clusters", @@ -131,11 +135,10 @@ public void testPartialResult() throws Exception { "(local)" ); assertNotNull(localInfo); - assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0)); - assertThat( - XContentMapValues.extractValue(localInfo, "_shards", "failed"), - 
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total")) - ); + Integer successfulShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "successful"); + Integer failedShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "failed"); + assertThat(successfulShards, greaterThan(0)); + assertThat(failedShards, greaterThan(0)); List> failures = (List>) XContentMapValues.extractValue(localInfo, "failures"); assertThat(failures, hasSize(1)); assertThat( @@ -167,7 +170,11 @@ public void testFailureFromRemote() throws Exception { Set okIds = populateIndices(); String query = """ { - "query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v" + "query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v", + "pragma": { + "max_concurrent_shards_per_node": 1 + }, + "accept_pragma_risks": true } """; // allow_partial_results = true @@ -183,7 +190,7 @@ public void testFailureFromRemote() throws Exception { List columns = (List) results.get("columns"); assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long")))); List values = (List) results.get("values"); - assertThat(values.size(), lessThanOrEqualTo(okIds.size())); + assertThat(values.size(), equalTo(okIds.size())); Map remoteCluster = (Map) XContentMapValues.extractValue( results, "_clusters", @@ -191,11 +198,10 @@ public void testFailureFromRemote() throws Exception { "cluster_one" ); assertNotNull(remoteCluster); - assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0)); - assertThat( - XContentMapValues.extractValue(remoteCluster, "_shards", "failed"), - equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total")) - ); + Integer successfulShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "successful"); + Integer failedShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "failed"); + 
assertThat(successfulShards, greaterThan(0)); + assertThat(failedShards, greaterThan(0)); List> failures = (List>) XContentMapValues.extractValue(remoteCluster, "failures"); assertThat(failures, hasSize(1)); assertThat( @@ -207,6 +213,25 @@ public void testFailureFromRemote() throws Exception { } } + public void testAllShardsFailed() throws Exception { + setupRemoteClusters(); + populateIndices(); + try { + for (boolean allowPartialResults : List.of(Boolean.TRUE, Boolean.FALSE)) { + for (String index : List.of("failing*", "*:failing*", "*:failing*,failing*")) { + Request request = new Request("POST", "/_query"); + request.setJsonEntity("{\"query\": \"FROM " + index + " | LIMIT 100\"}"); + request.addParameter("allow_partial_results", Boolean.toString(allowPartialResults)); + var error = expectThrows(ResponseException.class, () -> client().performRequest(request)); + Response resp = error.getResponse(); + assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field")); + } + } + } finally { + removeRemoteCluster(); + } + } + private void setupRemoteClusters() throws IOException { String settings = String.format(Locale.ROOT, """ { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java index 30b05f741ec82..7da333e12f7e6 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +31,7 @@ import static 
org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -98,38 +100,30 @@ public void testFailureLoadingFields() throws Exception { public void testPartialResults() throws Exception { Set okIds = populateIndices(); - { - EsqlQueryRequest request = new EsqlQueryRequest(); - request.query("FROM fail,ok | LIMIT 100"); - request.allowPartialResults(true); - request.pragmas(randomPragmas()); - try (EsqlQueryResponse resp = run(request)) { - assertTrue(resp.isPartial()); - List> rows = EsqlTestUtils.getValuesList(resp); - assertThat(rows.size(), lessThanOrEqualTo(okIds.size())); - } - } - { - EsqlQueryRequest request = new EsqlQueryRequest(); - request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100"); - request.allowPartialResults(true); - request.pragmas(randomPragmas()); - try (EsqlQueryResponse resp = run(request)) { - assertTrue(resp.isPartial()); - List> rows = EsqlTestUtils.getValuesList(resp); - assertThat(rows.size(), lessThanOrEqualTo(okIds.size())); - Set actualIds = new HashSet<>(); - for (List row : rows) { - assertThat(row.size(), equalTo(2)); - String id = (String) row.getFirst(); - assertThat(id, in(okIds)); - assertTrue(actualIds.add(id)); - } - EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY); - assertThat(localInfo.getFailures(), not(empty())); - assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); - assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field")); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100"); + request.allowPartialResults(true); + // have to run one 
shard at a time to avoid failing all shards + QueryPragmas pragma = new QueryPragmas( + Settings.builder().put(randomPragmas().getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build() + ); + request.pragmas(pragma); + request.acceptedPragmaRisks(true); + try (EsqlQueryResponse resp = run(request)) { + assertTrue(resp.isPartial()); + List> rows = EsqlTestUtils.getValuesList(resp); + assertThat(rows.size(), equalTo(okIds.size())); + Set actualIds = new HashSet<>(); + for (List row : rows) { + assertThat(row.size(), equalTo(2)); + String id = (String) row.getFirst(); + assertThat(id, in(okIds)); + assertTrue(actualIds.add(id)); } + EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY); + assertThat(localInfo.getFailures(), not(empty())); + assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL)); + assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field")); } } @@ -147,6 +141,15 @@ public void testDefaultPartialResults() throws Exception { EsqlQueryRequest request = new EsqlQueryRequest(); request.query("FROM fail,ok | LIMIT 100"); request.pragmas(randomPragmas()); + // have to run one shard at a time to avoid failing all shards + QueryPragmas pragma = new QueryPragmas( + Settings.builder() + .put(randomPragmas().getSettings()) + .put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1) + .build() + ); + request.pragmas(pragma); + request.acceptedPragmaRisks(true); if (randomBoolean()) { request.allowPartialResults(true); } @@ -154,6 +157,7 @@ public void testDefaultPartialResults() throws Exception { assertTrue(resp.isPartial()); List> rows = EsqlTestUtils.getValuesList(resp); assertThat(rows.size(), lessThanOrEqualTo(okIds.size())); + assertThat(rows.size(), greaterThan(0)); } } // allow_partial_results = false diff --git 
a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 4c13eb384924e..973455f435fab 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.plugin; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; @@ -375,9 +376,10 @@ public void executePlan( var computeListener = new ComputeListener( transportService.getThreadPool(), cancelQueryOnFailure, - listener.map(completionInfo -> { + listener.delegateFailureAndWrap((l, completionInfo) -> { + failIfAllShardsFailed(execInfo, collectedPages); execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements - return new Result(outputAttributes, collectedPages, completionInfo, execInfo); + l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo)); }) ) ) { @@ -540,6 +542,47 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn } } + /** + * If all of the target shards, excluding skipped shards, failed on the local or remote clusters, then we should fail the entire query + * regardless of the partial_results configuration or skip_unavailable setting. This behavior doesn't fully align with the search API, + * which doesn't consider the failures from the remote clusters when skip_unavailable is true. 
+ */ + static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List finalResults) { + // do not fail if any final result has results + if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) { + return; + } + int totalFailedShards = 0; + for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) { + final Integer successfulShards = cluster.getSuccessfulShards(); + if (successfulShards != null && successfulShards > 0) { + return; + } + if (cluster.getFailedShards() != null) { + totalFailedShards += cluster.getFailedShards(); + } + } + if (totalFailedShards == 0) { + return; + } + final var failureCollector = new FailureCollector(); + for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) { + var failedShards = cluster.getFailedShards(); + if (failedShards != null && failedShards > 0) { + assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]"; + for (ShardSearchFailure failure : cluster.getFailures()) { + if (failure.getCause() instanceof Exception e) { + failureCollector.unwrapAndCollect(e); + } else { + assert false : "unexpected failure: " + new AssertionError(failure.getCause()); + failureCollector.unwrapAndCollect(failure); + } + } + } + } + ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure()); + } + void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener listener) { listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts())); List contexts = new ArrayList<>(context.searchContexts().size());