Skip to content
Merged
Show file tree
Hide file tree
Changes from 80 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
ff822ef
Add capability to stop async query on demand
smalyshev Dec 3, 2024
e2e761b
Add test skeleton
smalyshev Dec 5, 2024
932d0f9
Add security check
smalyshev Dec 6, 2024
beb7b02
Allow close exchange early
dnhatn Dec 5, 2024
f7e1d9c
Fix tests
dnhatn Dec 6, 2024
bada285
Add query interruption
smalyshev Dec 6, 2024
867a46c
spotless
smalyshev Dec 6, 2024
ccd6d34
code cleanup
smalyshev Dec 6, 2024
406d1a0
Add action to non-op list
smalyshev Dec 6, 2024
188dedb
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 6, 2024
124e070
Wait for the listener to complete
smalyshev Dec 6, 2024
0ce939b
Allow early termination in Driver
dnhatn Dec 7, 2024
516abbe
Capture partial status
smalyshev Dec 9, 2024
b53e1f1
Merge branch 'pr/118211' into partial-result-on-demand
smalyshev Dec 9, 2024
514e22d
Ensure remote pipeline early termination
smalyshev Dec 12, 2024
49c768e
fix tests
smalyshev Dec 12, 2024
85220a7
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 13, 2024
1a93f9f
more test fixes
smalyshev Dec 13, 2024
ab08912
test fix
smalyshev Dec 13, 2024
4e5b3de
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 17, 2024
b4729eb
More tests
smalyshev Dec 18, 2024
ddb8794
Plugin refactoring
smalyshev Dec 18, 2024
02a440c
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 18, 2024
ebb4cd1
More tests
smalyshev Dec 19, 2024
93c5543
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 19, 2024
bbdcdec
More tests
smalyshev Dec 19, 2024
cb3620a
Update docs/changelog/118122.yaml
smalyshev Dec 19, 2024
1b462b7
More tests
smalyshev Dec 19, 2024
a6185d7
test fix
smalyshev Dec 19, 2024
a6da813
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 21, 2024
10c0d02
Update 118122.yaml
smalyshev Dec 22, 2024
e928e10
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 22, 2024
ab37d10
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 26, 2024
4a94c15
Feedback
smalyshev Dec 26, 2024
605a5ee
[CI] Auto commit changes from spotless
Dec 26, 2024
8035e41
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 26, 2024
a488391
Improve handling of PARTIAL results
smalyshev Dec 27, 2024
6b949fe
Merge branch 'main' into partial-result-on-demand
smalyshev Dec 27, 2024
6d41758
Fix tests - we do need to serialize isPartial in exec info
smalyshev Dec 27, 2024
d310ba4
Add checks for delay() usage
smalyshev Dec 27, 2024
8bd844f
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 2, 2025
a120670
add comments
smalyshev Jan 2, 2025
812322b
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 2, 2025
f781e7a
[CI] Auto commit changes from spotless
Jan 2, 2025
fc4d476
Add JSON defs for stop endpoint
smalyshev Jan 2, 2025
d65f173
Add docs
smalyshev Jan 2, 2025
9098c82
fix tests
smalyshev Jan 2, 2025
6f58544
fix API name
smalyshev Jan 2, 2025
a44da7b
add is_partial back, somehow it got dropped
smalyshev Jan 3, 2025
0db9d98
Add async stop telemetry test
smalyshev Jan 3, 2025
9712367
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 3, 2025
145a07e
Fix bad id test
smalyshev Jan 6, 2025
3cb2527
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 6, 2025
391253f
Delete async search at the end of the test
smalyshev Jan 6, 2025
080e1ba
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 6, 2025
d86acb4
Remove DriverEarlyTerminationException
smalyshev Jan 7, 2025
e9559fe
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 7, 2025
7ccc749
Cleanup partial status handling
smalyshev Jan 7, 2025
a67234c
Refine skipped checks
smalyshev Jan 7, 2025
9ab0207
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 8, 2025
65bf5a5
Cleanup & some docs
smalyshev Jan 8, 2025
d1160b6
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 9, 2025
e739670
Remove onFinishEarly and use EsqlQueryListener instead
smalyshev Jan 9, 2025
ebeeaf4
remove micro-optimization
smalyshev Jan 9, 2025
8900a9e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 9, 2025
4e9a17a
Use new listener class
smalyshev Jan 10, 2025
c66b630
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 13, 2025
c7a3e3d
Add test to ensure we early terminate
smalyshev Jan 13, 2025
c5bf7c3
[CI] Auto commit changes from spotless
Jan 13, 2025
291394e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
c49262f
Remove setSource as superceded by https://github.com/elastic/elastics…
smalyshev Jan 14, 2025
765b8ff
[CI] Auto commit changes from spotless
Jan 15, 2025
ba0c337
Pull feedback
smalyshev Jan 15, 2025
18bb7d5
fix test
smalyshev Jan 15, 2025
c24721a
Docs improvements
smalyshev Jan 15, 2025
3b2cf6a
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
7416227
Remove timeout
smalyshev Jan 15, 2025
6c98a2c
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 15, 2025
280edcd
[CI] Auto commit changes from spotless
Jan 16, 2025
1665d43
Restore the wait time
smalyshev Jan 16, 2025
2bfad31
Merge branch 'main' into partial-result-on-demand
Jan 21, 2025
8825fe5
Move is_partial to the top dir
smalyshev Jan 21, 2025
be1badf
Fix esql tests to allow is_partial in result
smalyshev Jan 21, 2025
d74b79e
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
c83f913
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
5d9056e
test fixes
smalyshev Jan 21, 2025
58d1b98
test fixes
smalyshev Jan 21, 2025
3814c2f
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 21, 2025
7fb5e63
test fix
smalyshev Jan 21, 2025
94e4e07
test fix
smalyshev Jan 22, 2025
bea45e3
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
ed26482
Fix doc tests
smalyshev Jan 22, 2025
10da662
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
dc31690
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 22, 2025
3d82200
pull feedback
smalyshev Jan 22, 2025
bcfca2b
Merge remote-tracking branch 'elastic/main' into fork/smalyshev/parti…
dnhatn Jan 22, 2025
d08313a
cluster status
dnhatn Jan 23, 2025
fb33318
better status
dnhatn Jan 23, 2025
058705f
wait for local cluster
dnhatn Jan 23, 2025
58ea6c7
Merge remote-tracking branch 'elastic/main' into fork/smalyshev/parti…
dnhatn Jan 23, 2025
8d27af8
NPE
dnhatn Jan 23, 2025
99d5eb4
fix tests
dnhatn Jan 23, 2025
80087f5
restore the stop query check
smalyshev Jan 23, 2025
cd5c4a7
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 23, 2025
9d97471
Merge branch 'main' into partial-result-on-demand
smalyshev Jan 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/118122.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118122
summary: "ES|QL: Partial result on demand for async queries"
area: ES|QL
type: enhancement
issues: []
9 changes: 7 additions & 2 deletions docs/reference/esql/esql-across-clusters.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ Which returns:
"skipped": 0,
"partial": 0,
"failed": 0,
"is_partial": false, <7>
"details": { <3>
"(local)": { <4>
"status": "successful",
Expand Down Expand Up @@ -275,8 +276,9 @@ Which returns:
<2> This section of counters shows all possible cluster search states and how many cluster
searches are currently in that state. The clusters can have one of the following statuses: *running*,
*successful* (searches on all shards were successful), *skipped* (the search
failed on a cluster marked with `skip_unavailable`=`true`) or *failed* (the search
failed on a cluster marked with `skip_unavailable`=`false`).
failed on a cluster marked with `skip_unavailable`=`true`), *failed* (the search
failed on a cluster marked with `skip_unavailable`=`false`) or **partial** (the search was
<<esql-async-query-stop-api, interrupted>> before finishing).
<3> The `_clusters/details` section shows metadata about the search on each cluster.
<4> If you included indices from the local cluster you sent the request to in your {ccs},
it is identified as "(local)".
Expand All @@ -285,6 +287,8 @@ which clusters have slower response times than others.
<6> The shard details for the search on that cluster, including a count of shards that were
skipped due to the can-match phase results. Shards are skipped when they cannot have any matching data
and therefore are not included in the full ES|QL query.
<7> The `is_partial` field is set to `true` if the search was interrupted before finishing using
<<esql-async-query-stop-api,async stop API>>.


The cross-cluster metadata can be used to determine whether any data came back from a cluster.
Expand Down Expand Up @@ -327,6 +331,7 @@ Which returns:
"skipped": 0,
"partial": 0,
"failed": 0,
"is_partial": false,
"details": {
"cluster_one": {
"status": "successful",
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/esql/esql-apis.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ overview of {esql} and related tutorials, see <<esql>>.
* <<esql-async-query-api>>
* <<esql-async-query-get-api>>
* <<esql-async-query-delete-api>>
* <<esql-async-query-stop-api>>


include::esql-query-api.asciidoc[]
Expand All @@ -26,3 +27,5 @@ include::esql-async-query-api.asciidoc[]
include::esql-async-query-get-api.asciidoc[]

include::esql-async-query-delete-api.asciidoc[]

include::esql-async-query-stop-api.asciidoc[]
7 changes: 7 additions & 0 deletions docs/reference/esql/esql-async-query-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,10 @@ API>> to get the current status and available results for the query.
(Boolean)
If `true`, the query request is still executing.
--

`is_partial`::
+
--
(Boolean)
If `true`, the query has partial results - for example, as a result of using the <<esql-async-query-stop-api, query stop API>>.
--
49 changes: 49 additions & 0 deletions docs/reference/esql/esql-async-query-stop-api.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[[esql-async-query-stop-api]]
=== {esql} async query stop API
++++
<titleabbrev>{esql} async query stop API</titleabbrev>
++++

.New API reference
[sidebar]
--
For the most up-to-date API details, refer to {api-es}/group/endpoint-esql[ES|QL APIs].
--

The <<esql,{esql}>> async query stop API is used to manually stop an async query. Once stop command is issued,
the query stops processing new data and returns the results that have been already processed. Note that due to the pipelined
nature of {esql} queries, the stop operation is not immediate and may take time to return results.

The results are returned in <<esql-query-api-response-body,the same format>> as for
<<esql-async-query-get-api,{esql} async query get API>>.
If the query has been finished by the time the stop command is issued, the results are returned immediately.

If the query processing has not been finished by the time the stop command is issued, the result data will have
`is_partial` field set to `true`.

[source,console]
----
POST /query/async/FkpMRkJGS1gzVDRlM3g4ZzMyRGlLbkEaTXlJZHdNT09TU2VTZVBoNDM3cFZMUToxMDM=/stop
----
// TEST[skip: no access to query ID]

[[esql-async-query-stop-api-request]]
==== {api-request-title}

`POST /_query/async/<query_id>/stop`

[[esql-async-query-stop-api-prereqs]]
==== {api-prereq-title}

* If the {es} {security-features} are enabled, only the authenticated user that submitted the original query request
can stop the query.

[[esql-async-query-stop-api-path-params]]
==== {api-path-parms-title}

`<query_id>`::
(Required, string)
Identifier for the query to stop.
+
A query ID is provided in the <<esql-async-query-api,{esql} async query API>>'s
response for a query that does not complete in the awaited time.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"esql.async_query_stop": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/esql-async-query-stop-api.html",
"description": "Stops a previously submitted async query request given its ID and collects the results."
},
"stability": "stable",
"visibility": "public",
"headers": {
"accept": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/async/{id}/stop",
"methods": [
"POST"
],
"parts": {
"id": {
"type": "string",
"description": "The async query ID"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0);
public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0);
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
public static final TransportVersion ESQL_RESPONSE_PARTIAL = def(8_825_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.async;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Objects;

/**
* Request for TransportEsqlAsyncStopAction action.
*/
public class AsyncStopRequest extends ActionRequest {
private final String id;

/**
* Creates a new request
*
* @param id The id of the search progress request.
*/
public AsyncStopRequest(String id) {
this.id = id;
}

public AsyncStopRequest(StreamInput in) throws IOException {
super(in);
this.id = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* Returns the id of the async search.
*/
public String getId() {
return id;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AsyncStopRequest request = (AsyncStopRequest) o;
return Objects.equals(id, request.id);
}

@Override
public int hashCode() {
return Objects.hash(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@
*/
public class EsqlAsyncActionNames {
public static final String ESQL_ASYNC_GET_RESULT_ACTION_NAME = "indices:data/read/esql/async/get";
public static final String ESQL_ASYNC_STOP_ACTION_NAME = "indices:data/read/esql/async/stop";
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public final class ExchangeService extends AbstractLifecycleComponent {
private final BlockFactory blockFactory;

private final Map<String, ExchangeSinkHandler> sinks = ConcurrentCollections.newConcurrentMap();
private final Map<String, ExchangeSourceHandler> exchangeSources = ConcurrentCollections.newConcurrentMap();

public ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -172,6 +173,32 @@ public static void openExchange(
);
}

/**
* Remember the exchange source handler for the given session ID.
* This can be used for async/stop requests.
*/
public void addExchangeSourceHandler(String sessionId, ExchangeSourceHandler sourceHandler) {
exchangeSources.put(sessionId, sourceHandler);
}

public ExchangeSourceHandler removeExchangeSourceHandler(String sessionId) {
return exchangeSources.remove(sessionId);
}

/**
* Finishes the session early, i.e., before all sources are finished.
* It is called by async/stop API and should be called on the node that coordinates the async request.
* It will close all sources and return the results - unlike cancel, this does not discard the results.
*/
public void finishSessionEarly(String sessionId, ActionListener<Void> listener) {
ExchangeSourceHandler exchangeSource = removeExchangeSourceHandler(sessionId);
if (exchangeSource != null) {
exchangeSource.finishEarly(false, listener);
} else {
listener.onResponse(null);
}
}

private static class OpenExchangeRequest extends TransportRequest {
private final String sessionId;
private final int exchangeBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasKey;

@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
Expand Down Expand Up @@ -260,8 +261,8 @@ public void testUngroupedAggs() throws Exception {
private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteOnly) {
@SuppressWarnings("unchecked")
Map<String, Object> clusters = (Map<String, Object>) result.get("_clusters");
assertThat(clusters.size(), equalTo(7));
assertThat(clusters.keySet(), equalTo(Set.of("total", "successful", "running", "skipped", "partial", "failed", "details")));
assertThat(clusters.size(), greaterThanOrEqualTo(7));
assertThat(clusters.keySet(), hasItems("total", "successful", "running", "skipped", "partial", "failed", "details"));
int expectedNumClusters = remoteOnly ? 1 : 2;
Set<String> expectedClusterAliases = remoteOnly ? Set.of("remote_cluster") : Set.of("remote_cluster", "(local)");

Expand All @@ -271,6 +272,10 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO
assertThat(clusters.get("skipped"), equalTo(0));
assertThat(clusters.get("partial"), equalTo(0));
assertThat(clusters.get("failed"), equalTo(0));
if (clusters.containsKey("is_partial")) {
// for some BWC tests, the is_partial key may not be present
assertThat(clusters.get("is_partial"), equalTo(false));
}

@SuppressWarnings("unchecked")
Map<String, Object> details = (Map<String, Object>) clusters.get("details");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws
return getTelemetrySnapshot(queryNode);
}

private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
protected CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName);
return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ protected void onStartExecute() {}
// Called when the engine needs to wait for further execution to be allowed.
protected abstract boolean onWait() throws InterruptedException;

protected String scriptTypeName() {
return "pause";
}

@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ScriptEngine() {
@Override
public String getType() {
return "pause";
return scriptTypeName();
}

@Override
Expand Down
Loading