Skip to content

Commit 6c34c05

Browse files
committed
Fail request when all target shards fail in runtime
1 parent 30d3877 commit 6c34c05

File tree

3 files changed

+119
-48
lines changed

3 files changed

+119
-48
lines changed

test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828

2929
import static org.hamcrest.Matchers.containsString;
3030
import static org.hamcrest.Matchers.equalTo;
31+
import static org.hamcrest.Matchers.greaterThan;
3132
import static org.hamcrest.Matchers.hasSize;
32-
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3333

3434
public class EsqlPartialResultsIT extends ESRestTestCase {
3535
@ClassRule
@@ -106,7 +106,11 @@ public void testPartialResult() throws Exception {
106106
Set<String> okIds = populateIndices();
107107
String query = """
108108
{
109-
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
109+
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v",
110+
"pragma": {
111+
"max_concurrent_shards_per_node": 1
112+
},
113+
"accept_pragma_risks": true
110114
}
111115
""";
112116
// allow_partial_results = true
@@ -123,19 +127,18 @@ public void testPartialResult() throws Exception {
123127
List<?> columns = (List<?>) results.get("columns");
124128
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
125129
List<?> values = (List<?>) results.get("values");
126-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
130+
assertThat(values.size(), equalTo(okIds.size()));
127131
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
128132
results,
129133
"_clusters",
130134
"details",
131135
"(local)"
132136
);
133137
assertNotNull(localInfo);
134-
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
135-
assertThat(
136-
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
137-
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
138-
);
138+
Integer successfulShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "successful");
139+
Integer failedShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "failed");
140+
assertThat(successfulShards, greaterThan(0));
141+
assertThat(failedShards, greaterThan(0));
139142
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
140143
assertThat(failures, hasSize(1));
141144
assertThat(
@@ -167,7 +170,11 @@ public void testFailureFromRemote() throws Exception {
167170
Set<String> okIds = populateIndices();
168171
String query = """
169172
{
170-
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
173+
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v",
174+
"pragma": {
175+
"max_concurrent_shards_per_node": 1
176+
},
177+
"accept_pragma_risks": true
171178
}
172179
""";
173180
// allow_partial_results = true
@@ -183,19 +190,18 @@ public void testFailureFromRemote() throws Exception {
183190
List<?> columns = (List<?>) results.get("columns");
184191
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
185192
List<?> values = (List<?>) results.get("values");
186-
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
193+
assertThat(values.size(), equalTo(okIds.size()));
187194
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
188195
results,
189196
"_clusters",
190197
"details",
191198
"cluster_one"
192199
);
193200
assertNotNull(remoteCluster);
194-
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
195-
assertThat(
196-
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
197-
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
198-
);
201+
Integer successfulShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "successful");
202+
Integer failedShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "failed");
203+
assertThat(successfulShards, greaterThan(0));
204+
assertThat(failedShards, greaterThan(0));
199205
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
200206
assertThat(failures, hasSize(1));
201207
assertThat(
@@ -207,6 +213,25 @@ public void testFailureFromRemote() throws Exception {
207213
}
208214
}
209215

216+
public void testAllShardsFailed() throws Exception {
217+
setupRemoteClusters();
218+
populateIndices();
219+
try {
220+
for (boolean allowPartialResults : List.of(Boolean.TRUE, Boolean.FALSE)) {
221+
for (String index : List.of("failing*", "*:failing*", "*:failing*,failing*")) {
222+
Request request = new Request("POST", "/_query");
223+
request.setJsonEntity("{\"query\": \"FROM " + index + " | LIMIT 100\"}");
224+
request.addParameter("allow_partial_results", Boolean.toString(allowPartialResults));
225+
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
226+
Response resp = error.getResponse();
227+
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
228+
}
229+
}
230+
} finally {
231+
removeRemoteCluster();
232+
}
233+
}
234+
210235
private void setupRemoteClusters() throws IOException {
211236
String settings = String.format(Locale.ROOT, """
212237
{

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xcontent.json.JsonXContent;
2020
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2121
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
22+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -30,6 +31,7 @@
3031
import static org.hamcrest.Matchers.containsString;
3132
import static org.hamcrest.Matchers.empty;
3233
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.greaterThan;
3335
import static org.hamcrest.Matchers.in;
3436
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3537
import static org.hamcrest.Matchers.not;
@@ -98,38 +100,30 @@ public void testFailureLoadingFields() throws Exception {
98100

99101
public void testPartialResults() throws Exception {
100102
Set<String> okIds = populateIndices();
101-
{
102-
EsqlQueryRequest request = new EsqlQueryRequest();
103-
request.query("FROM fail,ok | LIMIT 100");
104-
request.allowPartialResults(true);
105-
request.pragmas(randomPragmas());
106-
try (EsqlQueryResponse resp = run(request)) {
107-
assertTrue(resp.isPartial());
108-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
109-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
110-
}
111-
}
112-
{
113-
EsqlQueryRequest request = new EsqlQueryRequest();
114-
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
115-
request.allowPartialResults(true);
116-
request.pragmas(randomPragmas());
117-
try (EsqlQueryResponse resp = run(request)) {
118-
assertTrue(resp.isPartial());
119-
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
120-
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
121-
Set<String> actualIds = new HashSet<>();
122-
for (List<Object> row : rows) {
123-
assertThat(row.size(), equalTo(2));
124-
String id = (String) row.getFirst();
125-
assertThat(id, in(okIds));
126-
assertTrue(actualIds.add(id));
127-
}
128-
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
129-
assertThat(localInfo.getFailures(), not(empty()));
130-
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
131-
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
103+
EsqlQueryRequest request = new EsqlQueryRequest();
104+
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
105+
request.allowPartialResults(true);
106+
// have to run one shard at a time to avoid failing all shards
107+
QueryPragmas pragma = new QueryPragmas(
108+
Settings.builder().put(randomPragmas().getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
109+
);
110+
request.pragmas(pragma);
111+
request.acceptedPragmaRisks(true);
112+
try (EsqlQueryResponse resp = run(request)) {
113+
assertTrue(resp.isPartial());
114+
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
115+
assertThat(rows.size(), equalTo(okIds.size()));
116+
Set<String> actualIds = new HashSet<>();
117+
for (List<Object> row : rows) {
118+
assertThat(row.size(), equalTo(2));
119+
String id = (String) row.getFirst();
120+
assertThat(id, in(okIds));
121+
assertTrue(actualIds.add(id));
132122
}
123+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
124+
assertThat(localInfo.getFailures(), not(empty()));
125+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
126+
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
133127
}
134128
}
135129

@@ -147,13 +141,23 @@ public void testDefaultPartialResults() throws Exception {
147141
EsqlQueryRequest request = new EsqlQueryRequest();
148142
request.query("FROM fail,ok | LIMIT 100");
149143
request.pragmas(randomPragmas());
144+
// have to run one shard at a time to avoid failing all shards
145+
QueryPragmas pragma = new QueryPragmas(
146+
Settings.builder()
147+
.put(randomPragmas().getSettings())
148+
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1)
149+
.build()
150+
);
151+
request.pragmas(pragma);
152+
request.acceptedPragmaRisks(true);
150153
if (randomBoolean()) {
151154
request.allowPartialResults(true);
152155
}
153156
try (EsqlQueryResponse resp = run(request)) {
154157
assertTrue(resp.isPartial());
155158
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
156159
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
160+
assertThat(rows.size(), greaterThan(0));
157161
}
158162
}
159163
// allow_partial_results = false

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.plugin;
99

10+
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.ActionListener;
1112
import org.elasticsearch.action.OriginalIndices;
1213
import org.elasticsearch.action.search.SearchRequest;
@@ -375,9 +376,10 @@ public void executePlan(
375376
var computeListener = new ComputeListener(
376377
transportService.getThreadPool(),
377378
cancelQueryOnFailure,
378-
listener.map(completionInfo -> {
379+
listener.delegateFailureAndWrap((l, completionInfo) -> {
380+
failIfAllShardsFailed(execInfo, collectedPages);
379381
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
380-
return new Result(outputAttributes, collectedPages, completionInfo, execInfo);
382+
l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo));
381383
})
382384
)
383385
) {
@@ -540,6 +542,46 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
540542
}
541543
}
542544

545+
/**
546+
* If all of target shards excluding the skipped shards failed, then we should fail the entire query regardless of the partial_results
547+
* configuration or skip_unavailable setting. This behavior doesn't fully align with the search API as the skip_unavailable
548+
* would ignore all the failures from the remote clusters; hence, only fail the request when all shards in the local cluster failed.
549+
*/
550+
static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List<Page> finalResults) {
551+
// do not fail if any final result has results
552+
if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) {
553+
return;
554+
}
555+
int totalFailedShards = 0;
556+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
557+
final Integer successfulShards = cluster.getSuccessfulShards();
558+
if (successfulShards != null && successfulShards > 0) {
559+
return;
560+
}
561+
if (cluster.getFailedShards() != null) {
562+
totalFailedShards += cluster.getFailedShards();
563+
}
564+
}
565+
if (totalFailedShards == 0) {
566+
return;
567+
}
568+
final var failureCollector = new FailureCollector();
569+
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
570+
var failedShards = cluster.getFailedShards();
571+
if (failedShards != null && failedShards > 0) {
572+
assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]";
573+
for (ShardSearchFailure failure : cluster.getFailures()) {
574+
if (failure.getCause() instanceof Exception e) {
575+
failureCollector.unwrapAndCollect(e);
576+
} else {
577+
failureCollector.unwrapAndCollect(failure);
578+
}
579+
}
580+
}
581+
}
582+
ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure());
583+
}
584+
543585
void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) {
544586
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
545587
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());

0 commit comments

Comments
 (0)