Skip to content

Commit d960d0f

Browse files
authored
Fail request when all target shards fail in runtime (#131177) (#131337)
If every target shard (excluding skipped shards) fails, the entire query should fail, regardless of the partial_results configuration or the skip_unavailable setting. This behavior does not fully align with the search API, where skip_unavailable ignores all failures from remote clusters and the request fails only when all shards in the local cluster fail. However, we believe the proposed behavior is more sensible than the existing behavior in the search API. Closes #128994
1 parent 3834323 commit d960d0f

File tree

3 files changed

+120
-48
lines changed

3 files changed

+120
-48
lines changed

test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
import static org.hamcrest.Matchers.containsString;
3030
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.greaterThan;
3132
import static org.hamcrest.Matchers.hasSize;
32-
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3333

3434
public class EsqlPartialResultsIT extends ESRestTestCase {
3535
@ClassRule
@@ -106,7 +106,11 @@ public void testPartialResult() throws Exception {
106106
Set<String> okIds = populateIndices();
107107
String query = """
108108
{
109-
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
109+
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v",
110+
"pragma": {
111+
"max_concurrent_shards_per_node": 1
112+
},
113+
"accept_pragma_risks": true
110114
}
111115
""";
112116
// allow_partial_results = true
@@ -123,19 +127,18 @@ public void testPartialResult() throws Exception {
123127
List<?> columns = (List<?>) results.get("columns");
124128
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
125129
List<?> values = (List<?>) results.get("values");
126-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
130+
assertThat(values.size(), equalTo(okIds.size()));
127131
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
128132
results,
129133
"_clusters",
130134
"details",
131135
"(local)"
132136
);
133137
assertNotNull(localInfo);
134-
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
135-
assertThat(
136-
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
137-
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
138-
);
138+
Integer successfulShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "successful");
139+
Integer failedShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "failed");
140+
assertThat(successfulShards, greaterThan(0));
141+
assertThat(failedShards, greaterThan(0));
139142
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
140143
assertThat(failures, hasSize(1));
141144
assertThat(
@@ -167,7 +170,11 @@ public void testFailureFromRemote() throws Exception {
167170
Set<String> okIds = populateIndices();
168171
String query = """
169172
{
170-
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
173+
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v",
174+
"pragma": {
175+
"max_concurrent_shards_per_node": 1
176+
},
177+
"accept_pragma_risks": true
171178
}
172179
""";
173180
// allow_partial_results = true
@@ -183,19 +190,18 @@ public void testFailureFromRemote() throws Exception {
183190
List<?> columns = (List<?>) results.get("columns");
184191
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
185192
List<?> values = (List<?>) results.get("values");
186-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
193+
assertThat(values.size(), equalTo(okIds.size()));
187194
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
188195
results,
189196
"_clusters",
190197
"details",
191198
"cluster_one"
192199
);
193200
assertNotNull(remoteCluster);
194-
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
195-
assertThat(
196-
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
197-
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
198-
);
201+
Integer successfulShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "successful");
202+
Integer failedShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "failed");
203+
assertThat(successfulShards, greaterThan(0));
204+
assertThat(failedShards, greaterThan(0));
199205
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
200206
assertThat(failures, hasSize(1));
201207
assertThat(
@@ -207,6 +213,25 @@ public void testFailureFromRemote() throws Exception {
207213
}
208214
}
209215

/**
 * Checks that a query fails outright when it targets only the failing index pattern, for each combination of:
 * <ul>
 * <li>{@code allow_partial_results} set to {@code true} and to {@code false}, and</li>
 * <li>the index pattern resolved locally ({@code failing*}), remotely ({@code *:failing*}), or both.</li>
 * </ul>
 * In every case the request is expected to throw a {@link ResponseException} whose response body mentions
 * "Accessing failing field". The remote cluster configured at the start is removed in the {@code finally} block.
 */
216+
public void testAllShardsFailed() throws Exception {
217+
setupRemoteClusters();
218+
populateIndices();
219+
try {
// run the same check with partial results both allowed and disallowed; the expected outcome is identical
220+
for (boolean allowPartialResults : List.of(Boolean.TRUE, Boolean.FALSE)) {
// local only, remote only, and local + remote index patterns
221+
for (String index : List.of("failing*", "*:failing*", "*:failing*,failing*")) {
222+
Request request = new Request("POST", "/_query");
223+
request.setJsonEntity("{\"query\": \"FROM " + index + " | LIMIT 100\"}");
224+
request.addParameter("allow_partial_results", Boolean.toString(allowPartialResults));
// the request itself must fail rather than return a partial (empty) response
225+
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
226+
Response resp = error.getResponse();
227+
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
228+
}
229+
}
230+
} finally {
// always undo the remote-cluster setup, even if an assertion above fails
231+
removeRemoteCluster();
232+
}
233+
}
234+
210235
private void setupRemoteClusters() throws IOException {
211236
String settings = String.format(Locale.ROOT, """
212237
{

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xcontent.json.JsonXContent;
2020
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2121
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
22+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -30,6 +31,7 @@
3031
import static org.hamcrest.Matchers.containsString;
3132
import static org.hamcrest.Matchers.empty;
3233
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
3335
import static org.hamcrest.Matchers.in;
3436
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3537
import static org.hamcrest.Matchers.not;
@@ -98,38 +100,30 @@ public void testFailureLoadingFields() throws Exception {
98100

99101
public void testPartialResults() throws Exception {
100102
Set<String> okIds = populateIndices();
101-
{
102-
EsqlQueryRequest request = new EsqlQueryRequest();
103-
request.query("FROM fail,ok | LIMIT 100");
104-
request.allowPartialResults(true);
105-
request.pragmas(randomPragmas());
106-
try (EsqlQueryResponse resp = run(request)) {
107-
assertTrue(resp.isPartial());
108-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
109-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
110-
}
111-
}
112-
{
113-
EsqlQueryRequest request = new EsqlQueryRequest();
114-
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
115-
request.allowPartialResults(true);
116-
request.pragmas(randomPragmas());
117-
try (EsqlQueryResponse resp = run(request)) {
118-
assertTrue(resp.isPartial());
119-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
120-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
121-
Set<String> actualIds = new HashSet<>();
122-
for (List<Object> row : rows) {
123-
assertThat(row.size(), equalTo(2));
124-
String id = (String) row.getFirst();
125-
assertThat(id, in(okIds));
126-
assertTrue(actualIds.add(id));
127-
}
128-
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
129-
assertThat(localInfo.getFailures(), not(empty()));
130-
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
131-
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
103+
EsqlQueryRequest request = new EsqlQueryRequest();
104+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
105+
request.allowPartialResults(true);
106+
// have to run one shard at a time to avoid failing all shards
107+
QueryPragmas pragma = new QueryPragmas(
108+
Settings.builder().put(randomPragmas().getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
109+
);
110+
request.pragmas(pragma);
111+
request.acceptedPragmaRisks(true);
112+
try (EsqlQueryResponse resp = run(request)) {
113+
assertTrue(resp.isPartial());
114+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
115+
assertThat(rows.size(), equalTo(okIds.size()));
116+
Set<String> actualIds = new HashSet<>();
117+
for (List<Object> row : rows) {
118+
assertThat(row.size(), equalTo(2));
119+
String id = (String) row.getFirst();
120+
assertThat(id, in(okIds));
121+
assertTrue(actualIds.add(id));
132122
}
123+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
124+
assertThat(localInfo.getFailures(), not(empty()));
125+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
126+
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
133127
}
134128
}
135129

@@ -147,13 +141,23 @@ public void testDefaultPartialResults() throws Exception {
147141
EsqlQueryRequest request = new EsqlQueryRequest();
148142
request.query("FROM fail,ok | LIMIT 100");
149143
request.pragmas(randomPragmas());
144+
// have to run one shard at a time to avoid failing all shards
145+
QueryPragmas pragma = new QueryPragmas(
146+
Settings.builder()
147+
.put(randomPragmas().getSettings())
148+
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1)
149+
.build()
150+
);
151+
request.pragmas(pragma);
152+
request.acceptedPragmaRisks(true);
150153
if (randomBoolean()) {
151154
request.allowPartialResults(true);
152155
}
153156
try (EsqlQueryResponse resp = run(request)) {
154157
assertTrue(resp.isPartial());
155158
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
156159
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
160+
assertThat(rows.size(), greaterThan(0));
157161
}
158162
}
159163
// allow_partial_results = false

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.OriginalIndices;
1213
import org.elasticsearch.action.search.SearchRequest;
@@ -375,9 +376,10 @@ public void executePlan(
375376
var computeListener = new ComputeListener(
376377
transportService.getThreadPool(),
377378
cancelQueryOnFailure,
378-
listener.map(completionInfo -> {
379+
listener.delegateFailureAndWrap((l, completionInfo) -> {
380+
failIfAllShardsFailed(execInfo, collectedPages);
379381
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
380-
return new Result(outputAttributes, collectedPages, completionInfo, execInfo);
382+
l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo));
381383
})
382384
)
383385
) {
@@ -540,6 +542,47 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
540542
}
541543
}
542544

545+
/**
546+
* Fails the entire query when all target shards, excluding the skipped shards, failed on the local or remote clusters,
547+
* regardless of the partial_results configuration or skip_unavailable setting. This behavior doesn't fully align with the search API,
548+
* which doesn't consider the failures from the remote clusters when skip_unavailable is true.
* <p>
* The method returns without failing when any of the following holds:
* <ul>
* <li>at least one page in {@code finalResults} has a positive position count;</li>
* <li>at least one cluster reports a non-null, positive number of successful shards;</li>
* <li>the failed-shard counts summed across all clusters are zero.</li>
* </ul>
* Otherwise the shard failures recorded on each cluster with failed shards are fed into a {@code FailureCollector},
* and the failure it yields (if non-null) is rethrown via {@code ExceptionsHelper.reThrowIfNotNull}.
*
* @param execInfo     execution info whose per-cluster shard counts and recorded failures are inspected
* @param finalResults pages collected as the query result; only their position counts are read here
549+
*/
550+
static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List<Page> finalResults) {
551+
// do not fail if any final result has results
552+
if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) {
553+
return;
554+
}
// first pass: bail out as soon as any cluster has a successful shard, otherwise tally the failed shards
555+
int totalFailedShards = 0;
556+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
557+
final Integer successfulShards = cluster.getSuccessfulShards();
558+
if (successfulShards != null && successfulShards > 0) {
559+
return;
560+
}
// shard counts may be null, so guard before unboxing
561+
if (cluster.getFailedShards() != null) {
562+
totalFailedShards += cluster.getFailedShards();
563+
}
564+
}
// nothing succeeded but nothing failed either: there is no failure to report
565+
if (totalFailedShards == 0) {
566+
return;
567+
}
// second pass: gather the recorded failures of every cluster that had failed shards
568+
final var failureCollector = new FailureCollector();
569+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
570+
var failedShards = cluster.getFailedShards();
571+
if (failedShards != null && failedShards > 0) {
572+
assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]";
573+
for (ShardSearchFailure failure : cluster.getFailures()) {
// prefer the underlying cause when it is an Exception
574+
if (failure.getCause() instanceof Exception e) {
575+
failureCollector.unwrapAndCollect(e);
576+
} else {
// trips when assertions are enabled; otherwise falls through and collects the ShardSearchFailure itself
577+
assert false : "unexpected failure: " + new AssertionError(failure.getCause());
578+
failureCollector.unwrapAndCollect(failure);
579+
}
580+
}
581+
}
582+
}
// NOTE(review): presumably getFailure() returns null when nothing was collected, making this a no-op — confirm in FailureCollector
583+
ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure());
584+
}
585+
543586
void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) {
544587
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
545588
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());

0 commit comments

Comments
 (0)