Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/121942.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121942
summary: Allow partial results in ES|QL
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static Configuration randomConfiguration(String query, Map<String, Map<St
query,
profile,
tables,
System.nanoTime()
System.nanoTime(),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ public static Configuration configuration(QueryPragmas pragmas, String query) {
query,
false,
TABLES,
System.nanoTime()
System.nanoTime(),
false
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,30 @@ private EsqlQueryResponse runWithBreaking(EsqlQueryRequest request) throws Circu

@Override
protected EsqlQueryResponse run(EsqlQueryRequest request) {
    // Randomly opt in/out of partial results so both code paths get coverage.
    if (randomBoolean()) {
        request.allowPartialResults(randomBoolean());
    }
    Exception failure = null;
    try {
        // First attempt: execute under the restrictive circuit-breaker setup.
        final EsqlQueryResponse resp = runWithBreaking(request);
        if (resp.isPartial() == false) {
            return resp;
        }
        // A partial response is only legal if the request explicitly allowed it;
        // try-with-resources releases the response we are not returning.
        try (resp) {
            assertTrue(request.allowPartialResults());
        }
    } catch (Exception e) {
        failure = e;
    }
    // Re-run if the previous query failed or returned partial results
    // Only check the previous failure if the second query succeeded
    try (EsqlQueryResponse resp = super.run(request)) {
        if (failure != null) {
            // The only acceptable first-attempt failure is breaker exhaustion (429).
            assertThat(failure, instanceOf(CircuitBreakingException.class));
            assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
        }
        // incRef before returning: close() at the end of this try block releases
        // one reference, so the caller still receives a live response.
        resp.incRef();
        return resp;
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,31 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
logger.info("--> start disruption scheme [{}]", disruptionScheme);
disruptionScheme.startDisrupting();
logger.info("--> executing esql query with disruption {} ", request.query());
if (randomBoolean()) {
request.allowPartialResults(randomBoolean());
}
ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
try {
return future.actionGet(2, TimeUnit.MINUTES);
var resp = future.actionGet(2, TimeUnit.MINUTES);
if (resp.isPartial() == false) {
return resp;
}
try (resp) {
assertTrue(request.allowPartialResults());
}
} catch (Exception ignored) {

} finally {
clearDisruption();
}
try {
return future.actionGet(2, TimeUnit.MINUTES);
var resp = future.actionGet(2, TimeUnit.MINUTES);
if (resp.isPartial() == false) {
return resp;
}
try (resp) {
assertTrue(request.allowPartialResults());
}
} catch (Exception e) {
logger.info(
"running tasks: {}",
Expand All @@ -113,8 +128,8 @@ private EsqlQueryResponse runQueryWithDisruption(EsqlQueryRequest request) {
ensureBlocksReleased();
logger.info("--> failed to execute esql query with disruption; retrying...", e);
EsqlTestUtils.assertEsqlFailure(e);
return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
}
return super.run(request);
}

private ServiceDisruptionScheme addRandomDisruptionScheme() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
import org.elasticsearch.test.FailingFieldPlugin;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

/**
* Make sure the failures on the data node come back as failures over the wire.
Expand All @@ -48,10 +52,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return settings;
}

/**
* Use a runtime field that fails when loading field values to fail the entire query.
*/
public void testFailureLoadingFields() throws IOException {
public Set<String> populateIndices() throws Exception {
XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
mapping.startObject("runtime");
{
Expand All @@ -63,17 +64,62 @@ public void testFailureLoadingFields() throws IOException {
mapping.endObject();
}
mapping.endObject();
client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();

int docCount = 50;
List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
for (int d = 0; d < docCount; d++) {
docs.add(client().prepareIndex("ok").setSource("foo", d));
client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
int okCount = between(1, 50);
Set<String> okIds = new HashSet<>();
List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
for (int d = 0; d < okCount; d++) {
String id = "ok-" + d;
okIds.add(id);
docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
}
int failCount = between(1, 50);
for (int d = 0; d < failCount; d++) {
docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
}
docs.add(client().prepareIndex("fail").setSource("foo", 0));
indexRandom(true, docs);
return okIds;
}

/**
* Use a runtime field that fails when loading field values to fail the entire query.
*/
public void testFailureLoadingFields() throws Exception {
populateIndices();
IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
assertThat(e.getMessage(), equalTo("Accessing failing field"));
}

/**
 * Queries the failing and healthy indices together with allow_partial_results
 * enabled, and verifies the response is flagged partial and contains at most
 * the documents of the healthy index.
 */
public void testPartialResults() throws Exception {
    Set<String> okIds = populateIndices();
    {
        EsqlQueryRequest request = new EsqlQueryRequest();
        request.query("FROM fail,ok | LIMIT 100");
        request.allowPartialResults(true);
        request.pragmas(randomPragmas());
        try (EsqlQueryResponse resp = run(request)) {
            assertTrue(resp.isPartial());
            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
            // Rows can only come from the "ok" index, so never more than okIds.
            assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
        }
    }
    {
        EsqlQueryRequest request = new EsqlQueryRequest();
        // KEEP _id lets us verify exactly which documents were returned.
        request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
        request.allowPartialResults(true);
        request.pragmas(randomPragmas());
        try (EsqlQueryResponse resp = run(request)) {
            assertTrue(resp.isPartial());
            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
            assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
            Set<String> actualIds = new HashSet<>();
            for (List<Object> row : rows) {
                // Each row is [_id, fail_me].
                assertThat(row.size(), equalTo(2));
                String id = (String) row.getFirst();
                // Every id must belong to the healthy index, with no duplicates.
                assertThat(id, in(okIds));
                assertTrue(actualIds.add(id));
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,12 @@ public enum Cap {
* Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
* were refactored.
*/
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),

/**
* Support partial_results
*/
SUPPORT_PARTIAL_RESULTS;

private final boolean enabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ public TimeValue planningTookTime() {
/**
 * Finalizes the overall took time for the query and, if not already set,
 * derives the query-level partial flag from the per-cluster execution state:
 * any PARTIAL or SKIPPED cluster, or any cluster reporting failed shards,
 * marks the whole query partial.
 */
public void markEndQuery() {
    assert relativeStartNanos != null : "Relative start time must be set when markEndQuery is called";
    overallTook = new TimeValue(System.nanoTime() - relativeStartNanos, TimeUnit.NANOSECONDS);
    if (isPartial == false) {
        // TODO: Mark individual clusters as partial if failed shards exist
        isPartial = clusterInfo.values()
            .stream()
            .anyMatch(
                // Explicit parentheses: the original relied on && binding tighter
                // than ||, which is easy to misread in a multi-line predicate.
                c -> c.status == Cluster.Status.PARTIAL
                    || c.status == Cluster.Status.SKIPPED
                    || (c.failedShards != null && c.failedShards > 0)
            );
    }
}

// for testing only - use markEndQuery in production code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
private boolean keepOnCompletion;
private boolean onSnapshotBuild = Build.current().isSnapshot();
private boolean acceptedPragmaRisks = false;
private boolean allowPartialResults = false;

/**
* "Tables" provided in the request for use with things like {@code LOOKUP}.
Expand Down Expand Up @@ -231,6 +232,14 @@ public Map<String, Map<String, Column>> tables() {
return tables;
}

/**
 * Returns whether this request permits a partial result set when part of the
 * query fails. Defaults to {@code false}.
 */
public boolean allowPartialResults() {
    return allowPartialResults;
}

/**
 * Sets whether this request permits partial results; also settable through
 * the {@code allow_partial_results} field in the request body.
 */
public void allowPartialResults(boolean allowPartialResults) {
    this.allowPartialResults = allowPartialResults;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// Pass the query as the description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ public boolean isAsync() {
return isRunning;
}

/**
 * Returns {@code true} if the execution info reports this response as partial.
 */
public boolean isPartial() {
    // No execution info means there is nothing to report as partial.
    if (executionInfo == null) {
        return false;
    }
    return executionInfo.isPartial();
}

public EsqlExecutionInfo getExecutionInfo() {
return executionInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ String fields() {
static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");

private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
Expand Down Expand Up @@ -114,6 +115,7 @@ private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser)
parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
}

private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public void execute(
transportService.getThreadPool(),
cancelQueryOnFailure,
computeListener.acquireCompute().delegateFailure((l, profiles) -> {
if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
var status = localClusterWasInterrupted.get()
? EsqlExecutionInfo.Cluster.Status.PARTIAL
Expand Down Expand Up @@ -252,16 +252,14 @@ public void execute(
cancelQueryOnFailure,
localListener.acquireCompute().map(r -> {
localClusterWasInterrupted.set(execInfo.isPartial());
if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
execInfo.swapCluster(
LOCAL_CLUSTER,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
.setSuccessfulShards(r.getSuccessfulShards())
.setSkippedShards(r.getSkippedShards())
.setFailedShards(r.getFailedShards())
.build()
);
}
execInfo.swapCluster(
LOCAL_CLUSTER,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
.setSuccessfulShards(r.getSuccessfulShards())
.setSkippedShards(r.getSkippedShards())
.setFailedShards(r.getFailedShards())
.build()
);
return r.getProfiles();
})
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ void startComputeOnDataNodes(
Runnable runOnTaskFailure,
ActionListener<ComputeResponse> outListener
) {
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask) {
final boolean allowPartialResults = configuration.allowPartialResults();
DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
@Override
protected void sendRequest(
DiscoveryNode node,
Expand Down Expand Up @@ -125,14 +126,28 @@ protected void sendRequest(
queryPragmas.exchangeBufferSize(),
esqlExecutor,
listener.delegateFailureAndWrap((l, unused) -> {
final Runnable onGroupFailure;
final CancellableTask groupTask;
if (allowPartialResults) {
groupTask = RemoteListenerGroup.createGroupTask(
transportService,
parentTask,
() -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
);
onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
} else {
groupTask = parentTask;
onGroupFailure = runOnTaskFailure;
}
final AtomicReference<DataNodeComputeResponse> nodeResponseRef = new AtomicReference<>();
try (
var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get()))
var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
) {
final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection);
final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
exchangeSource.addRemoteSink(
remoteSink,
true,
allowPartialResults == false,
pagesFetched::incrementAndGet,
queryPragmas.concurrentExchangeClients(),
computeListener.acquireAvoid()
Expand All @@ -153,7 +168,7 @@ protected void sendRequest(
connection,
ComputeService.DATA_ACTION_NAME,
dataNodeRequest,
parentTask,
groupTask,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> {
nodeResponseRef.set(r);
Expand Down Expand Up @@ -238,6 +253,7 @@ public void onFailure(Exception e) {
}
onResponse(List.of());
} else {
// TODO: add these to fatal failures so we can continue processing other shards.
try {
exchangeService.finishSinkHandler(request.sessionId(), e);
} finally {
Expand Down
Loading