Skip to content

Commit d217794

Browse files
authored
Fail request when all target shards fail in runtime (#131177) (#131339)
If all target shards, excluding skipped shards, fail, we should fail the entire query regardless of the partial_results configuration or skip_unavailable setting. This behavior does not fully align with the search API, where skip_unavailable ignores all failures from remote clusters and only fails the request when all shards in the local cluster fail. However, we believe the proposed behavior is more sensible than the existing behavior in the search API. Closes #128994 (cherry picked from commit 8f6f763) # Conflicts: # x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java
1 parent 4e74288 commit d217794

File tree

3 files changed

+120
-48
lines changed

3 files changed

+120
-48
lines changed

test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
import static org.hamcrest.Matchers.containsString;
3030
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.greaterThan;
3132
import static org.hamcrest.Matchers.hasSize;
32-
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3333

3434
public class EsqlPartialResultsIT extends ESRestTestCase {
3535
@ClassRule
@@ -106,7 +106,11 @@ public void testPartialResult() throws Exception {
106106
Set<String> okIds = populateIndices();
107107
String query = """
108108
{
109-
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
109+
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v",
110+
"pragma": {
111+
"max_concurrent_shards_per_node": 1
112+
},
113+
"accept_pragma_risks": true
110114
}
111115
""";
112116
// allow_partial_results = true
@@ -123,19 +127,18 @@ public void testPartialResult() throws Exception {
123127
List<?> columns = (List<?>) results.get("columns");
124128
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
125129
List<?> values = (List<?>) results.get("values");
126-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
130+
assertThat(values.size(), equalTo(okIds.size()));
127131
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
128132
results,
129133
"_clusters",
130134
"details",
131135
"(local)"
132136
);
133137
assertNotNull(localInfo);
134-
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
135-
assertThat(
136-
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
137-
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
138-
);
138+
Integer successfulShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "successful");
139+
Integer failedShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "failed");
140+
assertThat(successfulShards, greaterThan(0));
141+
assertThat(failedShards, greaterThan(0));
139142
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
140143
assertThat(failures, hasSize(1));
141144
assertThat(
@@ -167,7 +170,11 @@ public void testFailureFromRemote() throws Exception {
167170
Set<String> okIds = populateIndices();
168171
String query = """
169172
{
170-
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
173+
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v",
174+
"pragma": {
175+
"max_concurrent_shards_per_node": 1
176+
},
177+
"accept_pragma_risks": true
171178
}
172179
""";
173180
// allow_partial_results = true
@@ -183,19 +190,18 @@ public void testFailureFromRemote() throws Exception {
183190
List<?> columns = (List<?>) results.get("columns");
184191
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
185192
List<?> values = (List<?>) results.get("values");
186-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
193+
assertThat(values.size(), equalTo(okIds.size()));
187194
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
188195
results,
189196
"_clusters",
190197
"details",
191198
"cluster_one"
192199
);
193200
assertNotNull(remoteCluster);
194-
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
195-
assertThat(
196-
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
197-
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
198-
);
201+
Integer successfulShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "successful");
202+
Integer failedShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "failed");
203+
assertThat(successfulShards, greaterThan(0));
204+
assertThat(failedShards, greaterThan(0));
199205
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
200206
assertThat(failures, hasSize(1));
201207
assertThat(
@@ -207,6 +213,25 @@ public void testFailureFromRemote() throws Exception {
207213
}
208214
}
209215

216+
public void testAllShardsFailed() throws Exception {
217+
setupRemoteClusters();
218+
populateIndices();
219+
try {
220+
for (boolean allowPartialResults : List.of(Boolean.TRUE, Boolean.FALSE)) {
221+
for (String index : List.of("failing*", "*:failing*", "*:failing*,failing*")) {
222+
Request request = new Request("POST", "/_query");
223+
request.setJsonEntity("{\"query\": \"FROM " + index + " | LIMIT 100\"}");
224+
request.addParameter("allow_partial_results", Boolean.toString(allowPartialResults));
225+
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
226+
Response resp = error.getResponse();
227+
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
228+
}
229+
}
230+
} finally {
231+
removeRemoteCluster();
232+
}
233+
}
234+
210235
private void setupRemoteClusters() throws IOException {
211236
String settings = String.format(Locale.ROOT, """
212237
{

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xcontent.json.JsonXContent;
2020
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2121
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
22+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -30,6 +31,7 @@
3031
import static org.hamcrest.Matchers.containsString;
3132
import static org.hamcrest.Matchers.empty;
3233
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
3335
import static org.hamcrest.Matchers.in;
3436
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3537
import static org.hamcrest.Matchers.not;
@@ -98,38 +100,30 @@ public void testFailureLoadingFields() throws Exception {
98100

99101
public void testPartialResults() throws Exception {
100102
Set<String> okIds = populateIndices();
101-
{
102-
EsqlQueryRequest request = new EsqlQueryRequest();
103-
request.query("FROM fail,ok | LIMIT 100");
104-
request.allowPartialResults(true);
105-
request.pragmas(randomPragmas());
106-
try (EsqlQueryResponse resp = run(request)) {
107-
assertTrue(resp.isPartial());
108-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
109-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
110-
}
111-
}
112-
{
113-
EsqlQueryRequest request = new EsqlQueryRequest();
114-
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
115-
request.allowPartialResults(true);
116-
request.pragmas(randomPragmas());
117-
try (EsqlQueryResponse resp = run(request)) {
118-
assertTrue(resp.isPartial());
119-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
120-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
121-
Set<String> actualIds = new HashSet<>();
122-
for (List<Object> row : rows) {
123-
assertThat(row.size(), equalTo(2));
124-
String id = (String) row.get(0);
125-
assertThat(id, in(okIds));
126-
assertTrue(actualIds.add(id));
127-
}
128-
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
129-
assertThat(localInfo.getFailures(), not(empty()));
130-
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
131-
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
103+
EsqlQueryRequest request = new EsqlQueryRequest();
104+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
105+
request.allowPartialResults(true);
106+
// have to run one shard at a time to avoid failing all shards
107+
QueryPragmas pragma = new QueryPragmas(
108+
Settings.builder().put(randomPragmas().getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
109+
);
110+
request.pragmas(pragma);
111+
request.acceptedPragmaRisks(true);
112+
try (EsqlQueryResponse resp = run(request)) {
113+
assertTrue(resp.isPartial());
114+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
115+
assertThat(rows.size(), equalTo(okIds.size()));
116+
Set<String> actualIds = new HashSet<>();
117+
for (List<Object> row : rows) {
118+
assertThat(row.size(), equalTo(2));
119+
String id = (String) row.get(0);
120+
assertThat(id, in(okIds));
121+
assertTrue(actualIds.add(id));
132122
}
123+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
124+
assertThat(localInfo.getFailures(), not(empty()));
125+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
126+
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
133127
}
134128
}
135129

@@ -147,13 +141,23 @@ public void testDefaultPartialResults() throws Exception {
147141
EsqlQueryRequest request = new EsqlQueryRequest();
148142
request.query("FROM fail,ok | LIMIT 100");
149143
request.pragmas(randomPragmas());
144+
// have to run one shard at a time to avoid failing all shards
145+
QueryPragmas pragma = new QueryPragmas(
146+
Settings.builder()
147+
.put(randomPragmas().getSettings())
148+
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1)
149+
.build()
150+
);
151+
request.pragmas(pragma);
152+
request.acceptedPragmaRisks(true);
150153
if (randomBoolean()) {
151154
request.allowPartialResults(true);
152155
}
153156
try (EsqlQueryResponse resp = run(request)) {
154157
assertTrue(resp.isPartial());
155158
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
156159
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
160+
assertThat(rows.size(), greaterThan(0));
157161
}
158162
}
159163
// allow_partial_results = false

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.OriginalIndices;
1213
import org.elasticsearch.action.search.SearchRequest;
@@ -226,9 +227,10 @@ public void execute(
226227
var computeListener = new ComputeListener(
227228
transportService.getThreadPool(),
228229
cancelQueryOnFailure,
229-
listener.map(completionInfo -> {
230+
listener.delegateFailureAndWrap((l, completionInfo) -> {
231+
failIfAllShardsFailed(execInfo, collectedPages);
230232
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
231-
return new Result(outputAttributes, collectedPages, completionInfo, execInfo);
233+
l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo));
232234
})
233235
)
234236
) {
@@ -391,6 +393,47 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
391393
}
392394
}
393395

396+
/**
397+
* If all of target shards excluding the skipped shards failed from the local or remote clusters, then we should fail the entire query
398+
* regardless of the partial_results configuration or skip_unavailable setting. This behavior doesn't fully align with the search API,
399+
* which doesn't consider the failures from the remote clusters when skip_unavailable is true.
400+
*/
401+
static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List<Page> finalResults) {
402+
// do not fail if any final result has results
403+
if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) {
404+
return;
405+
}
406+
int totalFailedShards = 0;
407+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
408+
final Integer successfulShards = cluster.getSuccessfulShards();
409+
if (successfulShards != null && successfulShards > 0) {
410+
return;
411+
}
412+
if (cluster.getFailedShards() != null) {
413+
totalFailedShards += cluster.getFailedShards();
414+
}
415+
}
416+
if (totalFailedShards == 0) {
417+
return;
418+
}
419+
final var failureCollector = new FailureCollector();
420+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
421+
var failedShards = cluster.getFailedShards();
422+
if (failedShards != null && failedShards > 0) {
423+
assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]";
424+
for (ShardSearchFailure failure : cluster.getFailures()) {
425+
if (failure.getCause() instanceof Exception e) {
426+
failureCollector.unwrapAndCollect(e);
427+
} else {
428+
assert false : "unexpected failure: " + new AssertionError(failure.getCause());
429+
failureCollector.unwrapAndCollect(failure);
430+
}
431+
}
432+
}
433+
}
434+
ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure());
435+
}
436+
394437
void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) {
395438
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
396439
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());

0 commit comments

Comments
 (0)