@@ -28,8 +28,8 @@

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class EsqlPartialResultsIT extends ESRestTestCase {
@ClassRule
@@ -106,7 +106,11 @@ public void testPartialResult() throws Exception {
Set<String> okIds = populateIndices();
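// the max_concurrent_shards_per_node pragma below runs one shard at a time to avoid failing all shards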
String query = """
{
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v"
"query": "FROM ok-index,failing-index | LIMIT 100 | KEEP fail_me,v",
"pragma": {
"max_concurrent_shards_per_node": 1
},
"accept_pragma_risks": true
}
""";
// allow_partial_results = true
@@ -123,19 +127,18 @@ public void testPartialResult() throws Exception {
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
assertThat(values.size(), equalTo(okIds.size()));
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
results,
"_clusters",
"details",
"(local)"
);
assertNotNull(localInfo);
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
assertThat(
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
);
Integer successfulShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "successful");
Integer failedShards = (Integer) XContentMapValues.extractValue(localInfo, "_shards", "failed");
assertThat(successfulShards, greaterThan(0));
assertThat(failedShards, greaterThan(0));
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
assertThat(failures, hasSize(1));
assertThat(
@@ -167,7 +170,11 @@ public void testFailureFromRemote() throws Exception {
Set<String> okIds = populateIndices();
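// as above: run one shard at a time via the pragma so that not every shard fails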
String query = """
{
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v"
"query": "FROM *:ok-index,*:failing-index | LIMIT 100 | KEEP fail_me,v",
"pragma": {
"max_concurrent_shards_per_node": 1
},
"accept_pragma_risks": true
}
""";
// allow_partial_results = true
@@ -183,19 +190,18 @@ public void testFailureFromRemote() throws Exception {
List<?> columns = (List<?>) results.get("columns");
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
List<?> values = (List<?>) results.get("values");
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
assertThat(values.size(), equalTo(okIds.size()));
Map<String, Object> remoteCluster = (Map<String, Object>) XContentMapValues.extractValue(
results,
"_clusters",
"details",
"cluster_one"
);
assertNotNull(remoteCluster);
assertThat(XContentMapValues.extractValue(remoteCluster, "_shards", "successful"), equalTo(0));
assertThat(
XContentMapValues.extractValue(remoteCluster, "_shards", "failed"),
equalTo(XContentMapValues.extractValue(remoteCluster, "_shards", "total"))
);
Integer successfulShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "successful");
Integer failedShards = (Integer) XContentMapValues.extractValue(remoteCluster, "_shards", "failed");
assertThat(successfulShards, greaterThan(0));
assertThat(failedShards, greaterThan(0));
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(remoteCluster, "failures");
assertThat(failures, hasSize(1));
assertThat(
@@ -207,6 +213,25 @@ public void testFailureFromRemote() throws Exception {
}
}

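// If every targeted shard fails, the query must return an error even when allow_partial_results is set.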
public void testAllShardsFailed() throws Exception {
setupRemoteClusters();
populateIndices();
try {
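// cover local-only, remote-only, and mixed index patterns, with and without allow_partial_results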
for (boolean allowPartialResults : List.of(Boolean.TRUE, Boolean.FALSE)) {
for (String index : List.of("failing*", "*:failing*", "*:failing*,failing*")) {
Request request = new Request("POST", "/_query");
request.setJsonEntity("{\"query\": \"FROM " + index + " | LIMIT 100\"}");
request.addParameter("allow_partial_results", Boolean.toString(allowPartialResults));
var error = expectThrows(ResponseException.class, () -> client().performRequest(request));
Response resp = error.getResponse();
assertThat(EntityUtils.toString(resp.getEntity()), containsString("Accessing failing field"));
}
}
} finally {
removeRemoteCluster();
}
}

private void setupRemoteClusters() throws IOException {
String settings = String.format(Locale.ROOT, """
{
@@ -19,6 +19,7 @@
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.util.ArrayList;
import java.util.Collection;
@@ -30,6 +31,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@@ -98,38 +100,30 @@ public void testFailureLoadingFields() throws Exception {

public void testPartialResults() throws Exception {
Set<String> okIds = populateIndices();
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.allowPartialResults(true);
request.pragmas(randomPragmas());
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
}
}
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
request.allowPartialResults(true);
request.pragmas(randomPragmas());
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
Set<String> actualIds = new HashSet<>();
for (List<Object> row : rows) {
assertThat(row.size(), equalTo(2));
String id = (String) row.getFirst();
assertThat(id, in(okIds));
assertTrue(actualIds.add(id));
}
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
assertThat(localInfo.getFailures(), not(empty()));
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
request.allowPartialResults(true);
// have to run one shard at a time to avoid failing all shards
QueryPragmas pragma = new QueryPragmas(
Settings.builder().put(randomPragmas().getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
);
request.pragmas(pragma);
request.acceptedPragmaRisks(true);
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), equalTo(okIds.size()));
Set<String> actualIds = new HashSet<>();
for (List<Object> row : rows) {
assertThat(row.size(), equalTo(2));
String id = (String) row.getFirst();
assertThat(id, in(okIds));
assertTrue(actualIds.add(id));
}
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
assertThat(localInfo.getFailures(), not(empty()));
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
}
}

@@ -147,13 +141,23 @@ public void testDefaultPartialResults() throws Exception {
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM fail,ok | LIMIT 100");
request.pragmas(randomPragmas());
// have to run one shard at a time to avoid failing all shards
QueryPragmas pragma = new QueryPragmas(
Settings.builder()
.put(randomPragmas().getSettings())
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1)
.build()
);
request.pragmas(pragma);
request.acceptedPragmaRisks(true);
if (randomBoolean()) {
request.allowPartialResults(true);
}
try (EsqlQueryResponse resp = run(request)) {
assertTrue(resp.isPartial());
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
assertThat(rows.size(), greaterThan(0));
}
}
// allow_partial_results = false
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
@@ -375,9 +376,10 @@ public void executePlan(
var computeListener = new ComputeListener(
transportService.getThreadPool(),
cancelQueryOnFailure,
listener.map(completionInfo -> {
listener.delegateFailureAndWrap((l, completionInfo) -> {
failIfAllShardsFailed(execInfo, collectedPages);
execInfo.markEndQuery(); // TODO: revisit this time recording model as part of INLINESTATS improvements
return new Result(outputAttributes, collectedPages, completionInfo, execInfo);
l.onResponse(new Result(outputAttributes, collectedPages, completionInfo, execInfo));
})
)
) {
@@ -540,6 +542,47 @@ private static void updateExecutionInfoAfterCoordinatorOnlyQuery(EsqlExecutionIn
}
}

/**
 * If all target shards, excluding the skipped ones, failed across the local and remote clusters, then the entire query should fail
 * regardless of the partial_results configuration or the skip_unavailable setting. This behavior doesn't fully align with the search
 * API, which ignores failures from remote clusters when skip_unavailable is true.
 */
static void failIfAllShardsFailed(EsqlExecutionInfo execInfo, List<Page> finalResults) {
// do not fail if any final result has results
if (finalResults.stream().anyMatch(p -> p.getPositionCount() > 0)) {
return;
}
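// fail only when no cluster reports a successful shard and at least one shard failed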
int totalFailedShards = 0;
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
final Integer successfulShards = cluster.getSuccessfulShards();
if (successfulShards != null && successfulShards > 0) {
return;
}
if (cluster.getFailedShards() != null) {
totalFailedShards += cluster.getFailedShards();
}
}
if (totalFailedShards == 0) {
return;
}
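// every targeted shard failed: collect the underlying shard failures and rethrow them as the query failure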
final var failureCollector = new FailureCollector();
for (EsqlExecutionInfo.Cluster cluster : execInfo.clusterInfo.values()) {
var failedShards = cluster.getFailedShards();
if (failedShards != null && failedShards > 0) {
assert cluster.getFailures().isEmpty() == false : "expected failures for cluster [" + cluster.getClusterAlias() + "]";
for (ShardSearchFailure failure : cluster.getFailures()) {
if (failure.getCause() instanceof Exception e) {
failureCollector.unwrapAndCollect(e);
} else {
assert false : "unexpected failure: " + new AssertionError(failure.getCause());
failureCollector.unwrapAndCollect(failure);
}
}
}
}
ExceptionsHelper.reThrowIfNotNull(failureCollector.getFailure());
}

void runCompute(CancellableTask task, ComputeContext context, PhysicalPlan plan, ActionListener<DriverCompletionInfo> listener) {
listener = ActionListener.runBefore(listener, () -> Releasables.close(context.searchContexts()));
List<EsPhysicalOperationProviders.ShardContext> contexts = new ArrayList<>(context.searchContexts().size());