Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/118122.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118122
summary: "ES|QL: Partial result on demand for async queries"
area: ES|QL
type: enhancement
issues: []
9 changes: 7 additions & 2 deletions docs/reference/esql/esql-across-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ Which returns:
{
"is_running": false,
"took": 42, <1>
"is_partial": false, <7>
"columns" : [
{
"name" : "COUNT(http.response.status_code)",
Expand Down Expand Up @@ -280,8 +281,9 @@ Which returns:
<2> This section of counters shows all possible cluster search states and how many cluster
searches are currently in that state. The clusters can have one of the following statuses: *running*,
*successful* (searches on all shards were successful), *skipped* (the search
failed on a cluster marked with `skip_unavailable`=`true`) or *failed* (the search
failed on a cluster marked with `skip_unavailable`=`false`).
failed on a cluster marked with `skip_unavailable`=`true`), *failed* (the search
failed on a cluster marked with `skip_unavailable`=`false`) or **partial** (the search was
<<esql-async-query-stop-api, interrupted>> before finishing).
<3> The `_clusters/details` section shows metadata about the search on each cluster.
<4> If you included indices from the local cluster you sent the request to in your {ccs},
it is identified as "(local)".
Expand All @@ -290,6 +292,8 @@ which clusters have slower response times than others.
<6> The shard details for the search on that cluster, including a count of shards that were
skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
and therefore are not included in the full ES|QL query.
<7> The `is_partial` field is set to `true` if the search has partial results for any reason,
for example if it was interrupted before finishing using the <<esql-async-query-stop-api,async query stop API>>.


The cross-cluster metadata can be used to determine whether any data came back from a cluster.
Expand Down Expand Up @@ -319,6 +323,7 @@ Which returns:
{
"is_running": false,
"took": 55,
"is_partial": false,
"columns": [
... // not shown
],
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/esql/esql-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ overview of {esql} and related tutorials, see <<esql>>.
* <<esql-async-query-api>>
* <<esql-async-query-get-api>>
* <<esql-async-query-delete-api>>
* <<esql-async-query-stop-api>>


include::esql-query-api.asciidoc[]
Expand All @@ -26,3 +27,5 @@ include::esql-async-query-api.asciidoc[]
include::esql-async-query-get-api.asciidoc[]

include::esql-async-query-delete-api.asciidoc[]

include::esql-async-query-stop-api.asciidoc[]
7 changes: 7 additions & 0 deletions docs/reference/esql/esql-async-query-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,10 @@ API>> to get the current status and available results for the query.
(Boolean)
If `true`, the query request is still executing.
--

`is_partial`::
+
--
(Boolean)
If `true`, the query has partial results - for example, as a result of using the <<esql-async-query-stop-api, async query stop API>>.
--
49 changes: 49 additions & 0 deletions docs/reference/esql/esql-async-query-stop-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[[esql-async-query-stop-api]]
=== {esql} async query stop API
++++
<titleabbrev>{esql} async query stop API</titleabbrev>
++++

.New API reference
[sidebar]
--
For the most up-to-date API details, refer to {api-es}/group/endpoint-esql[ES|QL APIs].
--

The <<esql,{esql}>> async query stop API is used to manually stop an async query. Once the stop command is issued,
the query stops processing new data and returns the results that have been already processed. Note that due to the pipelined
nature of {esql} queries, the stop operation is not immediate and may take time to return results.

The results are returned in <<esql-query-api-response-body,the same format>> as the
<<esql-async-query-get-api,{esql} async query get API>>.
If the query has been finished by the time the stop command is issued, the results are returned immediately.

If the query processing has not finished by the time the stop command is issued, the response will have the `is_partial`
field set to `true`.

[source,console]
----
POST /query/async/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM=/stop
----
// TEST[skip: no access to query ID]

[[esql-async-query-stop-api-request]]
==== {api-request-title}

`POST /_query/async/<query_id>/stop`

[[esql-async-query-stop-api-prereqs]]
==== {api-prereq-title}

* If the {es} {security-features} are enabled, only the authenticated user that submitted the original query request
can stop the query.

[[esql-async-query-stop-api-path-params]]
==== {api-path-parms-title}

`<query_id>`::
(Required, string)
Identifier for the query to stop.
+
A query ID is provided in the <<esql-async-query-api,{esql} async query API>>'s
response for a query that does not complete in the awaited time.
1 change: 1 addition & 0 deletions docs/reference/esql/esql-rest.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Which returns:
----
{
"took": 28,
"is_partial": false,
"columns": [
{"name": "author", "type": "text"},
{"name": "name", "type": "text"},
Expand Down
7 changes: 7 additions & 0 deletions docs/reference/esql/multivalued-fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Multivalued fields come back as a JSON array:
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "long"}
Expand Down Expand Up @@ -78,6 +79,7 @@ And {esql} sees that removal:
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "keyword"}
Expand Down Expand Up @@ -122,6 +124,7 @@ And {esql} also sees that:
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "long"}
Expand Down Expand Up @@ -165,6 +168,7 @@ POST /_query
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "keyword"}
Expand Down Expand Up @@ -198,6 +202,7 @@ POST /_query
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
],
Expand Down Expand Up @@ -241,6 +246,7 @@ POST /_query
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "long"},
Expand Down Expand Up @@ -278,6 +284,7 @@ POST /_query
----
{
"took": 28,
"is_partial": false,
"columns": [
{ "name": "a", "type": "long"},
{ "name": "b", "type": "long"},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"esql.async_query_stop": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/esql-async-query-stop-api.html",
"description": "Stops a previously submitted async query request given its ID and collects the results."
},
"stability": "stable",
"visibility": "public",
"headers": {
"accept": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/async/{id}/stop",
"methods": [
"POST"
],
"parts": {
"id": {
"type": "string",
"description": "The async query ID"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_829_00_0);
public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_830_00_0);
public static final TransportVersion ADD_INCLUDE_FAILURE_INDICES_OPTION = def(8_831_00_0);
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_832_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void skipOnAborted() {
public void testSortByManyLongsSuccess() throws IOException {
initManyLongs();
Response response = sortByManyLongs(500);
Map<?, ?> map = responseAsMap(response);
Map<String, Object> map = responseAsMap(response);
ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
.item(matchesMap().entry("name", "b").entry("type", "long"));
ListMatcher values = matchesList();
Expand All @@ -95,8 +95,7 @@ public void testSortByManyLongsSuccess() throws IOException {
values = values.item(List.of(0, b));
}
}
MapMatcher mapMatcher = matchesMap();
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
assertResultMap(map, columns, values);
}

/**
Expand Down Expand Up @@ -236,11 +235,10 @@ private StringBuilder makeSortByManyLongs(int count) {
public void testGroupOnSomeLongs() throws IOException {
initManyLongs();
Response resp = groupOnManyLongs(200);
Map<?, ?> map = responseAsMap(resp);
Map<String, Object> map = responseAsMap(resp);
ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long"));
ListMatcher values = matchesList().item(List.of(9));
MapMatcher mapMatcher = matchesMap();
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
assertResultMap(map, columns, values);
}

/**
Expand All @@ -249,11 +247,10 @@ public void testGroupOnSomeLongs() throws IOException {
public void testGroupOnManyLongs() throws IOException {
initManyLongs();
Response resp = groupOnManyLongs(5000);
Map<?, ?> map = responseAsMap(resp);
Map<String, Object> map = responseAsMap(resp);
ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long"));
ListMatcher values = matchesList().item(List.of(9));
MapMatcher mapMatcher = matchesMap();
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
assertResultMap(map, columns, values);
}

private Response groupOnManyLongs(int count) throws IOException {
Expand All @@ -279,12 +276,11 @@ private StringBuilder makeManyLongs(int count) {
public void testSmallConcat() throws IOException {
initSingleDocIndex();
Response resp = concat(2);
Map<?, ?> map = responseAsMap(resp);
Map<String, Object> map = responseAsMap(resp);
ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
.item(matchesMap().entry("name", "str").entry("type", "keyword"));
ListMatcher values = matchesList().item(List.of(1, "1".repeat(100)));
MapMatcher mapMatcher = matchesMap();
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
assertResultMap(map, columns, values);
}

public void testHugeConcat() throws IOException {
Expand Down Expand Up @@ -465,7 +461,7 @@ private void assertManyStrings(Response resp, int strings) throws IOException {
public void testManyEval() throws IOException {
initManyLongs();
Response resp = manyEval(1);
Map<?, ?> map = responseAsMap(resp);
Map<String, Object> map = responseAsMap(resp);
ListMatcher columns = matchesList();
columns = columns.item(matchesMap().entry("name", "a").entry("type", "long"));
columns = columns.item(matchesMap().entry("name", "b").entry("type", "long"));
Expand All @@ -475,8 +471,7 @@ public void testManyEval() throws IOException {
for (int i = 0; i < 20; i++) {
columns = columns.item(matchesMap().entry("name", "i0" + i).entry("type", "long"));
}
MapMatcher mapMatcher = matchesMap();
assertMap(map, mapMatcher.entry("columns", columns).entry("values", hasSize(10_000)).entry("took", greaterThanOrEqualTo(0)));
assertResultMap(map, columns, hasSize(10_000));
}

public void testTooManyEval() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.AbstractBroadcastResponseTestCase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand All @@ -84,6 +85,7 @@
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
Expand Down Expand Up @@ -133,12 +135,15 @@
import static org.elasticsearch.client.RestClient.IGNORE_RESPONSE_CODES_PARAM;
import static org.elasticsearch.cluster.ClusterState.VERSION_INTRODUCING_TRANSPORT_VERSIONS;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.test.rest.TestFeatureService.ALL_FEATURES;
import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.notNullValue;

Expand Down Expand Up @@ -2627,4 +2632,46 @@ public static Request newXContentRequest(HttpMethod method, String endpoint, ToX
addXContentBody(request, body);
return request;
}

protected static MapMatcher getResultMatcher(boolean includeMetadata, boolean includePartial) {
MapMatcher mapMatcher = matchesMap();
if (includeMetadata) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
// Older version may not have is_partial
if (includePartial) {
mapMatcher = mapMatcher.entry("is_partial", false);
}
return mapMatcher;
}

/**
* Create empty result matcher from result, taking into account all metadata items.
*/
protected static MapMatcher getResultMatcher(Map<String, Object> result) {
return getResultMatcher(result.containsKey("took"), result.containsKey("is_partial"));
}

/**
* Match result columns and values, with default matchers for metadata.
*/
protected static void assertResultMap(Map<String, Object> result, Matcher<?> columnMatcher, Matcher<?> valuesMatcher) {
assertMap(result, getResultMatcher(result).entry("columns", columnMatcher).entry("values", valuesMatcher));
}

protected static void assertResultMap(Map<String, Object> result, Object columnMatcher, Object valuesMatcher) {
assertMap(result, getResultMatcher(result).entry("columns", columnMatcher).entry("values", valuesMatcher));
}

/**
* Match result columns and values, with default matchers for metadata.
*/
protected static void assertResultMap(
Map<String, Object> result,
MapMatcher mapMatcher,
Matcher<?> columnMatcher,
Matcher<?> valuesMatcher
) {
assertMap(result, mapMatcher.entry("columns", columnMatcher).entry("values", valuesMatcher));
}
}
Loading