Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/121942.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121942
summary: Allow partial results in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
query,
profile,
tables,
System.nanoTime()
System.nanoTime(),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
query,
false,
TABLES,
System.nanoTime()
System.nanoTime(),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,30 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
if (randomBoolean()) {
request.allowPartialResults(randomBoolean());
}
Exception failure = null;
try {
return runWithBreaking(request);
} catch (Exception e) {
try (EsqlQueryResponse resp = super.run(request)) {
assertThat(e, instanceOf(CircuitBreakingException.class));
assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
resp.incRef();
final EsqlQueryResponse resp = runWithBreaking(request);
if (resp.isPartial() == false) {
return resp;
}
try (resp) {
assertTrue(request.allowPartialResults());
}
} catch (Exception e) {
failure = e;
}
// Re-run if the previous query failed or returned partial results
// Only check the previous failure if the second query succeeded
try (EsqlQueryResponse resp = super.run(request)) {
if (failure != null) {
assertThat(failure, instanceOf(CircuitBreakingException.class));
assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
}
resp.incRef();
return resp;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,38 +83,58 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
logger.info("--> start disruption scheme [{}]", disruptionScheme);
disruptionScheme.startDisrupting();
logger.info("--> executing esql query with disruption {} ", request.query());
if (randomBoolean()) {
request.allowPartialResults(randomBoolean());
}
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
EsqlQueryResponse resp = null;
try {
return future.actionGet(2, TimeUnit.MINUTES);
resp = future.actionGet(2, TimeUnit.MINUTES);
if (resp.isPartial() == false) {
return resp;
}
} catch (Exception ignored) {

} finally {
clearDisruption();
}
try {
return future.actionGet(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.info(
"running tasks: {}",
client().admin()
.cluster()
.prepareListTasks()
.get()
.getTasks()
.stream()
.filter(
// Skip the tasks that would get in the way while debugging
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
&& false == t.action().contains(HealthNode.TASK_NAME)
)
.toList()
);
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
ensureBlocksReleased();
logger.info("--> failed to execute esql query with disruption; retrying...", e);
EsqlTestUtils.assertEsqlFailure(e);
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
// wait for the response after clear disruption
if (resp == null) {
try {
resp = future.actionGet(2, TimeUnit.MINUTES);
} catch (Exception e) {
logger.info(
"running tasks: {}",
client().admin()
.cluster()
.prepareListTasks()
.get()
.getTasks()
.stream()
.filter(
// Skip the tasks that would get in the way while debugging
t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
&& false == t.action().contains(HealthNode.TASK_NAME)
)
.toList()
);
assertTrue("request must be failed or completed after clearing disruption", future.isDone());
ensureBlocksReleased();
logger.info("--> failed to execute esql query with disruption; retrying...", e);
EsqlTestUtils.assertEsqlFailure(e);
}
}
// use the response if it's not partial
if (resp != null) {
if (resp.isPartial() == false) {
return resp;
}
try (var ignored = resp) {
assertTrue(request.allowPartialResults());
}
}
// re-run the query
return super.run(request);
}

private ServiceDisruptionScheme addRandomDisruptionScheme() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
* Make sure the failures on the data node come back as failures over the wire.
Expand All @@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return settings;
}

/**
* Use a runtime field that fails when loading field values to fail the entire query.
*/
public void testFailureLoadingFields() throws IOException {
public Set<String> populateIndices() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
Expand All @@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
mapping.endObject();
}
mapping.endObject();
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();

int docCount = 50;
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
for (int d = 0; d < docCount; d++) {
docs.add(client().prepareIndex("ok").setSource("foo", d));
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
int okCount = between(1, 50);
Set<String> okIds = new HashSet<>();
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
for (int d = 0; d < okCount; d++) {
String id = "ok-" + d;
okIds.add(id);
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
}
int failCount = between(1, 50);
for (int d = 0; d < failCount; d++) {
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
}
docs.add(client().prepareIndex("fail").setSource("foo", 0));
indexRandom(true, docs);
return okIds;
}

/**
 * Without {@code allow_partial_results}, a runtime field that throws while
 * loading values must fail the entire query.
 */
public void testFailureLoadingFields() throws Exception {
    populateIndices();
    IllegalStateException failure = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
    assertThat(failure.getMessage(), equalTo("Accessing failing field"));
}

/**
 * With {@code allow_partial_results} enabled, a query over both the failing
 * and the healthy index completes, reports {@code isPartial()}, and returns
 * at most the documents of the healthy index.
 */
public void testPartialResults() throws Exception {
    Set<String> okIds = populateIndices();
    // First query: only verify the row count stays within the healthy docs.
    EsqlQueryRequest firstRequest = new EsqlQueryRequest();
    firstRequest.query("FROM fail,ok | LIMIT 100");
    firstRequest.allowPartialResults(true);
    firstRequest.pragmas(randomPragmas());
    try (EsqlQueryResponse firstResponse = run(firstRequest)) {
        assertTrue(firstResponse.isPartial());
        List<List<Object>> firstRows = EsqlTestUtils.getValuesList(firstResponse);
        assertThat(firstRows.size(), lessThanOrEqualTo(okIds.size()));
    }
    // Second query: fetch _id so every returned row can be matched to a
    // distinct healthy document.
    EsqlQueryRequest secondRequest = new EsqlQueryRequest();
    secondRequest.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
    secondRequest.allowPartialResults(true);
    secondRequest.pragmas(randomPragmas());
    try (EsqlQueryResponse secondResponse = run(secondRequest)) {
        assertTrue(secondResponse.isPartial());
        List<List<Object>> secondRows = EsqlTestUtils.getValuesList(secondResponse);
        assertThat(secondRows.size(), lessThanOrEqualTo(okIds.size()));
        Set<String> seenIds = new HashSet<>();
        for (List<Object> row : secondRows) {
            assertThat(row.size(), equalTo(2));
            String id = (String) row.getFirst();
            assertThat(id, in(okIds));
            assertTrue(seenIds.add(id));
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,12 @@ public enum Cap {
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
* were refactored.
*/
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),

/**
* Support for the {@code allow_partial_results} request parameter.
*/
SUPPORT_PARTIAL_RESULTS;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,13 @@ public Map<String, Cluster> getClusters() {
* @return the new Cluster object
*/
/**
 * Swaps the Cluster registered under {@code clusterAlias} with the result of
 * {@code remappingFunction}, and upgrades the overall {@code isPartial} flag
 * when the new cluster state reports partial results.
 *
 * @return the new Cluster object
 */
public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Cluster> remappingFunction) {
    return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> {
        final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster);
        // Once partial, the query stays partial: never downgrade the flag.
        if (newCluster != null && isPartial == false) {
            isPartial = newCluster.isPartial();
        }
        return newCluster;
    });
}

@Override
Expand Down Expand Up @@ -305,13 +311,6 @@ public boolean isPartial() {
return isPartial;
}

/**
* Mark the query as having partial results.
*/
public void markAsPartial() {
isPartial = true;
}

/**
 * Marks this query as stopped; observable through {@link #isStopped()}.
 */
public void markAsStopped() {
isStopped = true;
}
Expand All @@ -320,13 +319,6 @@ public boolean isStopped() {
return isStopped;
}

/**
* Mark this cluster as having partial results.
*/
public void markClusterAsPartial(String clusterAlias) {
swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build());
}

/**
* Represents the search metadata about a particular cluster involved in a cross-cluster search.
* The Cluster object can represent either the local cluster or a remote cluster.
Expand Down Expand Up @@ -618,6 +610,10 @@ public List<ShardSearchFailure> getFailures() {
return failures;
}

/**
 * A cluster contributed incomplete results when it ended in PARTIAL state,
 * was skipped entirely, or reported at least one failed shard.
 */
boolean isPartial() {
    if (status == Status.PARTIAL || status == Status.SKIPPED) {
        return true;
    }
    return failedShards != null && failedShards > 0;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();
private boolean acceptedPragmaRisks = false;
private boolean allowPartialResults = false;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
Expand Down Expand Up @@ -231,6 +232,14 @@ public Map<String, Map<String, Column>> tables() {
return tables;
}

/**
 * Whether this request accepts partial results instead of failing outright;
 * defaults to {@code false}.
 */
public boolean allowPartialResults() {
return allowPartialResults;
}

/**
 * Sets whether partial results are acceptable for this request.
 */
public void allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// Pass the query as the description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public boolean isAsync() {
return isRunning;
}

/**
 * Returns {@code true} when the attached execution info reports partial
 * results; {@code false} when there is no execution info at all.
 */
public boolean isPartial() {
    if (executionInfo == null) {
        return false;
    }
    return executionInfo.isPartial();
}

public EsqlExecutionInfo getExecutionInfo() {
return executionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ String fields() {
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");

private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
Expand Down Expand Up @@ -114,6 +115,7 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
}

private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
Expand Down
Loading