Skip to content

Commit f27f746

Browse files
authored
ES|QL async queries: Partial result on demand (#118122)
Add capability to stop async query on demand The theory: - User initiates async search request - User sends the stop request (POST _query/async/<ID>/stop) - If the async is finished by that time, it's like regular async get - If it's not finished, the sinks are closed and the request is forcefully finished
1 parent 9a9bc69 commit f27f746

File tree

46 files changed

+1777
-658
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1777
-658
lines changed

docs/changelog/118122.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118122
2+
summary: "ES|QL: Partial result on demand for async queries"
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/reference/esql/esql-across-clusters.asciidoc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ Which returns:
210210
{
211211
"is_running": false,
212212
"took": 42, <1>
213+
"is_partial": false, <7>
213214
"columns" : [
214215
{
215216
"name" : "COUNT(http.response.status_code)",
@@ -275,8 +276,9 @@ Which returns:
275276
<2> This section of counters shows all possible cluster search states and how many cluster
276277
searches are currently in that state. The clusters can have one of the following statuses: *running*,
277278
*successful* (searches on all shards were successful), *skipped* (the search
278-
failed on a cluster marked with `skip_unavailable`=`true`) or *failed* (the search
279-
failed on a cluster marked with `skip_unavailable`=`false`).
279+
failed on a cluster marked with `skip_unavailable`=`true`), *failed* (the search
280+
failed on a cluster marked with `skip_unavailable`=`false`) or **partial** (the search was
281+
<<esql-async-query-stop-api, interrupted>> before finishing).
280282
<3> The `_clusters/details` section shows metadata about the search on each cluster.
281283
<4> If you included indices from the local cluster you sent the request to in your {ccs},
282284
it is identified as "(local)".
@@ -285,6 +287,8 @@ which clusters have slower response times than others.
285287
<6> The shard details for the search on that cluster, including a count of shards that were
286288
skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
287289
and therefore are not included in the full ES|QL query.
290+
<7> The `is_partial` field is set to `true` if the search has partial results for any reason,
291+
for example if it was interrupted before finishing using the <<esql-async-query-stop-api,async query stop API>>.
288292

289293

290294
The cross-cluster metadata can be used to determine whether any data came back from a cluster.
@@ -314,6 +318,7 @@ Which returns:
314318
{
315319
"is_running": false,
316320
"took": 55,
321+
"is_partial": false,
317322
"columns": [
318323
... // not shown
319324
],

docs/reference/esql/esql-apis.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ overview of {esql} and related tutorials, see <<esql>>.
1717
* <<esql-async-query-api>>
1818
* <<esql-async-query-get-api>>
1919
* <<esql-async-query-delete-api>>
20+
* <<esql-async-query-stop-api>>
2021

2122

2223
include::esql-query-api.asciidoc[]
@@ -26,3 +27,5 @@ include::esql-async-query-api.asciidoc[]
2627
include::esql-async-query-get-api.asciidoc[]
2728

2829
include::esql-async-query-delete-api.asciidoc[]
30+
31+
include::esql-async-query-stop-api.asciidoc[]

docs/reference/esql/esql-async-query-api.asciidoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,10 @@ API>> to get the current status and available results for the query.
170170
(Boolean)
171171
If `true`, the query request is still executing.
172172
--
173+
174+
`is_partial`::
175+
+
176+
--
177+
(Boolean)
178+
If `true`, the query has partial results - for example, as a result of using the <<esql-async-query-stop-api, async query stop API>>.
179+
--
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
[[esql-async-query-stop-api]]
2+
=== {esql} async query stop API
3+
++++
4+
<titleabbrev>{esql} async query stop API</titleabbrev>
5+
++++
6+
7+
.New API reference
8+
[sidebar]
9+
--
10+
For the most up-to-date API details, refer to {api-es}/group/endpoint-esql[ES|QL APIs].
11+
--
12+
13+
The <<esql,{esql}>> async query stop API is used to manually stop an async query. Once the stop command is issued,
14+
the query stops processing new data and returns the results that have been already processed. Note that due to the pipelined
15+
nature of {esql} queries, the stop operation is not immediate and may take time to return results.
16+
17+
The results are returned in <<esql-query-api-response-body,the same format>> as the
18+
<<esql-async-query-get-api,{esql} async query get API>>.
19+
If the query has been finished by the time the stop command is issued, the results are returned immediately.
20+
21+
If the query processing has not finished by the time the stop command is issued, the response will have the `is_partial`
22+
field set to `true`.
23+
24+
[source,console]
25+
----
26+
POST /query/async/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM=/stop
27+
----
28+
// TEST[skip: no access to query ID]
29+
30+
[[esql-async-query-stop-api-request]]
31+
==== {api-request-title}
32+
33+
`POST /_query/async/<query_id>/stop`
34+
35+
[[esql-async-query-stop-api-prereqs]]
36+
==== {api-prereq-title}
37+
38+
* If the {es} {security-features} are enabled, only the authenticated user that submitted the original query request
39+
can stop the query.
40+
41+
[[esql-async-query-stop-api-path-params]]
42+
==== {api-path-parms-title}
43+
44+
`<query_id>`::
45+
(Required, string)
46+
Identifier for the query to stop.
47+
+
48+
A query ID is provided in the <<esql-async-query-api,{esql} async query API>>'s
49+
response for a query that does not complete in the awaited time.

docs/reference/esql/esql-rest.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ Which returns:
193193
----
194194
{
195195
"took": 28,
196+
"is_partial": false,
196197
"columns": [
197198
{"name": "author", "type": "text"},
198199
{"name": "name", "type": "text"},

docs/reference/esql/multivalued-fields.asciidoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ Multivalued fields come back as a JSON array:
2727
----
2828
{
2929
"took": 28,
30+
"is_partial": false,
3031
"columns": [
3132
{ "name": "a", "type": "long"},
3233
{ "name": "b", "type": "long"}
@@ -78,6 +79,7 @@ And {esql} sees that removal:
7879
----
7980
{
8081
"took": 28,
82+
"is_partial": false,
8183
"columns": [
8284
{ "name": "a", "type": "long"},
8385
{ "name": "b", "type": "keyword"}
@@ -122,6 +124,7 @@ And {esql} also sees that:
122124
----
123125
{
124126
"took": 28,
127+
"is_partial": false,
125128
"columns": [
126129
{ "name": "a", "type": "long"},
127130
{ "name": "b", "type": "long"}
@@ -165,6 +168,7 @@ POST /_query
165168
----
166169
{
167170
"took": 28,
171+
"is_partial": false,
168172
"columns": [
169173
{ "name": "a", "type": "long"},
170174
{ "name": "b", "type": "keyword"}
@@ -198,6 +202,7 @@ POST /_query
198202
----
199203
{
200204
"took": 28,
205+
"is_partial": false,
201206
"columns": [
202207
{ "name": "a", "type": "long"},
203208
],
@@ -241,6 +246,7 @@ POST /_query
241246
----
242247
{
243248
"took": 28,
249+
"is_partial": false,
244250
"columns": [
245251
{ "name": "a", "type": "long"},
246252
{ "name": "b", "type": "long"},
@@ -278,6 +284,7 @@ POST /_query
278284
----
279285
{
280286
"took": 28,
287+
"is_partial": false,
281288
"columns": [
282289
{ "name": "a", "type": "long"},
283290
{ "name": "b", "type": "long"},
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"esql.async_query_stop": {
3+
"documentation": {
4+
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/esql-async-query-stop-api.html",
5+
"description": "Stops a previously submitted async query request given its ID and collects the results."
6+
},
7+
"stability": "stable",
8+
"visibility": "public",
9+
"headers": {
10+
"accept": [
11+
"application/json"
12+
]
13+
},
14+
"url": {
15+
"paths": [
16+
{
17+
"path": "/_query/async/{id}/stop",
18+
"methods": [
19+
"POST"
20+
],
21+
"parts": {
22+
"id": {
23+
"type": "string",
24+
"description": "The async query ID"
25+
}
26+
}
27+
}
28+
]
29+
}
30+
}
31+
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ static TransportVersion def(int id) {
163163
public static final TransportVersion RESOLVE_CLUSTER_NO_INDEX_EXPRESSION = def(8_829_00_0);
164164
public static final TransportVersion ML_ROLLOVER_LEGACY_INDICES = def(8_830_00_0);
165165
public static final TransportVersion ADD_INCLUDE_FAILURE_INDICES_OPTION = def(8_831_00_0);
166+
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_832_00_0);
166167

167168
/*
168169
* STOP! READ THIS FIRST! No, really,

test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void skipOnAborted() {
8686
public void testSortByManyLongsSuccess() throws IOException {
8787
initManyLongs();
8888
Response response = sortByManyLongs(500);
89-
Map<?, ?> map = responseAsMap(response);
89+
Map<String, Object> map = responseAsMap(response);
9090
ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
9191
.item(matchesMap().entry("name", "b").entry("type", "long"));
9292
ListMatcher values = matchesList();
@@ -95,8 +95,7 @@ public void testSortByManyLongsSuccess() throws IOException {
9595
values = values.item(List.of(0, b));
9696
}
9797
}
98-
MapMatcher mapMatcher = matchesMap();
99-
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
98+
assertResultMap(map, columns, values);
10099
}
101100

102101
/**
@@ -236,11 +235,10 @@ private StringBuilder makeSortByManyLongs(int count) {
236235
public void testGroupOnSomeLongs() throws IOException {
237236
initManyLongs();
238237
Response resp = groupOnManyLongs(200);
239-
Map<?, ?> map = responseAsMap(resp);
238+
Map<String, Object> map = responseAsMap(resp);
240239
ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long"));
241240
ListMatcher values = matchesList().item(List.of(9));
242-
MapMatcher mapMatcher = matchesMap();
243-
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
241+
assertResultMap(map, columns, values);
244242
}
245243

246244
/**
@@ -249,11 +247,10 @@ public void testGroupOnSomeLongs() throws IOException {
249247
public void testGroupOnManyLongs() throws IOException {
250248
initManyLongs();
251249
Response resp = groupOnManyLongs(5000);
252-
Map<?, ?> map = responseAsMap(resp);
250+
Map<String, Object> map = responseAsMap(resp);
253251
ListMatcher columns = matchesList().item(matchesMap().entry("name", "MAX(a)").entry("type", "long"));
254252
ListMatcher values = matchesList().item(List.of(9));
255-
MapMatcher mapMatcher = matchesMap();
256-
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
253+
assertResultMap(map, columns, values);
257254
}
258255

259256
private Response groupOnManyLongs(int count) throws IOException {
@@ -279,12 +276,11 @@ private StringBuilder makeManyLongs(int count) {
279276
public void testSmallConcat() throws IOException {
280277
initSingleDocIndex();
281278
Response resp = concat(2);
282-
Map<?, ?> map = responseAsMap(resp);
279+
Map<String, Object> map = responseAsMap(resp);
283280
ListMatcher columns = matchesList().item(matchesMap().entry("name", "a").entry("type", "long"))
284281
.item(matchesMap().entry("name", "str").entry("type", "keyword"));
285282
ListMatcher values = matchesList().item(List.of(1, "1".repeat(100)));
286-
MapMatcher mapMatcher = matchesMap();
287-
assertMap(map, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
283+
assertResultMap(map, columns, values);
288284
}
289285

290286
public void testHugeConcat() throws IOException {
@@ -465,7 +461,7 @@ private void assertManyStrings(Response resp, int strings) throws IOException {
465461
public void testManyEval() throws IOException {
466462
initManyLongs();
467463
Response resp = manyEval(1);
468-
Map<?, ?> map = responseAsMap(resp);
464+
Map<String, Object> map = responseAsMap(resp);
469465
ListMatcher columns = matchesList();
470466
columns = columns.item(matchesMap().entry("name", "a").entry("type", "long"));
471467
columns = columns.item(matchesMap().entry("name", "b").entry("type", "long"));
@@ -475,8 +471,7 @@ public void testManyEval() throws IOException {
475471
for (int i = 0; i < 20; i++) {
476472
columns = columns.item(matchesMap().entry("name", "i0" + i).entry("type", "long"));
477473
}
478-
MapMatcher mapMatcher = matchesMap();
479-
assertMap(map, mapMatcher.entry("columns", columns).entry("values", hasSize(10_000)).entry("took", greaterThanOrEqualTo(0)));
474+
assertResultMap(map, columns, hasSize(10_000));
480475
}
481476

482477
public void testTooManyEval() throws IOException {

0 commit comments

Comments
 (0)