Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0383960
ESQL: Change queries ID to be the same as the async
GalLalouche Apr 24, 2025
a463ba5
Update docs/changelog/127472.yaml
GalLalouche Apr 28, 2025
0948424
Fix compilation error
GalLalouche Apr 28, 2025
25fee06
Merge branch 'feature/apis' of github.com:GalLalouche/elasticsearch i…
GalLalouche Apr 28, 2025
9f2e297
Merge branch 'main' into feature/apis
GalLalouche Apr 28, 2025
ec7f125
Refactor test
GalLalouche Apr 28, 2025
79a9669
Fix failing test
GalLalouche Apr 28, 2025
e3958c0
Fix failing test
GalLalouche Apr 29, 2025
de60192
Fix failing test (third time's the chard?)
GalLalouche Apr 29, 2025
9011486
Merge branch 'main' into feature/apis
GalLalouche Apr 29, 2025
6dfa323
Fourth time then
GalLalouche Apr 29, 2025
24b654e
Added wire test
GalLalouche May 5, 2025
6791493
Changed status to contain AsyncExecutionId
GalLalouche May 6, 2025
b1eedc6
Merge branch 'main' into feature/apis
GalLalouche May 6, 2025
c6a717a
Merge branch 'main' into feature/apis
GalLalouche May 26, 2025
5a017be
Update accept hearders
GalLalouche May 26, 2025
62e421e
Fix missing import
GalLalouche May 26, 2025
de56665
Review fixes
GalLalouche May 29, 2025
fcdb7a8
Merge branch 'main' into feature/apis
GalLalouche May 29, 2025
398ba4c
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
f46d1fa
Added a TODO for createTask Nullable
GalLalouche Jun 11, 2025
45b4e8e
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
e61f293
Fix failing test
GalLalouche Jun 11, 2025
bbd87ac
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
7adbc29
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
2abb63d
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
84dcd7c
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
46f7494
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127472.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127472
summary: Change queries ID to be the same as the async
area: ES|QL
type: feature
issues:
- 127187
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
Expand All @@ -20,7 +22,7 @@
/**
* A class that contains all information related to a submitted async execution.
*/
public final class AsyncExecutionId {
public final class AsyncExecutionId implements Writeable {
public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id";
public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running";

Expand Down Expand Up @@ -115,4 +117,13 @@ public static AsyncExecutionId decode(String id) {
}
return new AsyncExecutionId(docId, new TaskId(taskId), id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(getEncoded());
}

public static AsyncExecutionId readFrom(StreamInput input) throws IOException {
return decode(input.readString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding an AbstractWireTestCase subclass for this then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -781,19 +781,23 @@ public void testListQueryForbidden() throws Exception {

public void testGetQueryAllowed() throws Exception {
// This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges.
Request request = new Request("GET", "_query/queries/foo:1234");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404)));
setUser(GET_QUERY_REQUEST, "user_with_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(403)));
}

public void testGetQueryForbidden() throws Exception {
Request request = new Request("GET", "_query/queries/foo:1234");
setUser(request, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
setUser(GET_QUERY_REQUEST, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]"));
}

private static final Request GET_QUERY_REQUEST = new Request(
"GET",
"_query/queries/FmJKWHpFRi1OU0l5SU1YcnpuWWhoUWcZWDFuYUJBeW1TY0dKM3otWUs2bDJudzo1Mg=="
);

private void createEnrichPolicy() throws Exception {
createIndex("songs", Settings.EMPTY, """
"properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"id" : 5326,
"type" : "transport",
"action" : "indices:data/read/esql",
"status" : "Ks5ApyqMTtWj5LrKigmCjQ",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having this so high in the PR really helped review it!

"description" : "FROM test | STATS MAX(d) by a, b", <1>
"start_time" : "2023-07-31T15:46:32.328Z",
"start_time_in_millis" : 1690818392328,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {

protected static final Semaphore scriptPermits = new Semaphore(0);
// Incremented onWait. Can be used to check if the onWait process has been reached.
protected static final Semaphore scriptWaits = new Semaphore(0);

protected int pageSize = -1;

Expand Down Expand Up @@ -98,6 +100,7 @@ public void setupIndex() throws IOException {
public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
@Override
protected boolean onWait() throws InterruptedException {
scriptWaits.release();
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.IntOrLongMatcher;
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.List;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.core.TimeValue.timeValueSeconds;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;

public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase {
private static final String QUERY = "from test | stats sum(pause_me)";
Expand All @@ -45,31 +42,10 @@ public void testRunningQueries() throws Exception {
try (var initialResponse = sendAsyncQuery()) {
id = initialResponse.asyncExecutionId().get();

assertRunningQueries();
var getResultsRequest = new GetAsyncResultRequest(id);
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1));
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close();
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
@SuppressWarnings("unchecked")
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
jsonEntityToMap(listResponse.getEntity()).values()
);
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
MapMatcher basicMatcher = MapMatcher.matchesMap()
.entry("node", is(taskId.getNodeId()))
.entry("id", IntOrLongMatcher.matches(taskId.getId()))
.entry("query", is(QUERY))
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);

Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId));
MapMatcher.assertMap(
jsonEntityToMap(getQueryResponse.getEntity()),
basicMatcher.entry("coordinating_node", isA(String.class))
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class))))
.entry("documents_found", IntOrLongMatcher.isIntOrLong())
.entry("values_loaded", IntOrLongMatcher.isIntOrLong())
);
} finally {
if (id != null) {
// Finish the query.
Expand All @@ -82,9 +58,44 @@ public void testRunningQueries() throws Exception {
}
}

public void testRunningQueriesSync() throws Exception {
var future = sendSyncQueryAsyncly();
try {
scriptWaits.acquire();
assertRunningQueries();
} finally {
scriptPermits.release(numberOfDocs());
future.actionGet(timeValueSeconds(60)).close();
}
}

private static void assertRunningQueries() throws IOException {
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
@SuppressWarnings("unchecked")
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(jsonEntityToMap(listResponse.getEntity()).values());
String queryId = EsqlTestUtils.singleValue(listResult.keySet());
MapMatcher basicMatcher = MapMatcher.matchesMap()
.entry("query", is(QUERY))
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);

Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId));
MapMatcher.assertMap(
jsonEntityToMap(getQueryResponse.getEntity()),
basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong())
);
}

private EsqlQueryResponse sendAsyncQuery() {
scriptPermits.drainPermits();
scriptPermits.release(between(1, 5));
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS);
}

private ActionFuture<EsqlQueryResponse> sendSyncQueryAsyncly() {
scriptPermits.drainPermits();
scriptPermits.release(between(1, 5));
return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;

import java.io.IOException;

public class EsqlGetQueryRequest extends ActionRequest {
private final TaskId id;
private final AsyncExecutionId asyncExecutionId;

public EsqlGetQueryRequest(TaskId id) {
this.id = id;
public EsqlGetQueryRequest(AsyncExecutionId asyncExecutionId) {
this.asyncExecutionId = asyncExecutionId;
}

public TaskId id() {
return id;
public AsyncExecutionId id() {
return asyncExecutionId;
}

public EsqlGetQueryRequest(StreamInput streamInput) throws IOException {
super(streamInput);
id = TaskId.readFromStream(streamInput);
asyncExecutionId = AsyncExecutionId.decode(streamInput.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeWriteable(id);
out.writeWriteable(asyncExecutionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -22,6 +23,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.esql.Column;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plugin.EsqlDocIdStatus;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.io.IOException;
Expand Down Expand Up @@ -243,8 +245,14 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
EsqlDocIdStatus status = new EsqlDocIdStatus(UUIDs.randomBase64UUID());
// Pass the query as the description
return new CancellableTask(id, type, action, query, parentTaskId, headers);
return new CancellableTask(id, type, action, query, parentTaskId, headers) {
@Override
public Status getStatus() {
return status;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably worth making this a concrete class with a name and stuff.

Could you put the doc id as a member and return a status that's xcontent renders like:

{
  "request_id": "<the encoded request id we'd get from the list api"
}

That way we can see the whole encoded blob in the tasks list api and folks won't wonder what that random thing means in the status?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we don't have the local node id here - but we could pass it in to this method. You'd have to modify the 30 implementers, but I guess that's ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

};
}

// Setter for tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.GET;
Expand All @@ -37,7 +36,7 @@ public List<Route> routes() {
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
return restChannelConsumer(request, client);
}

Expand All @@ -46,7 +45,7 @@ private static RestChannelConsumer restChannelConsumer(RestRequest request, Node

String id = request.param("id");
var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE;
var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest();
var actionRequest = id != null ? new EsqlGetQueryRequest(AsyncExecutionId.decode(id)) : new EsqlListQueriesRequest();

return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;

import java.io.IOException;

/**
* Keeps track of the doc ID, which is itself used for {@link AsyncExecutionId}.
*
* The reason this contains just the doc ID and not the entire {@link AsyncExecutionId} is that during the creation of
* {@link EsqlQueryAction}, we don't have access to the node ID yet, thus we can't create a {@link TaskId} yet.
*/
public record EsqlDocIdStatus(String id) implements Task.Status {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Task.Status.class,
"EsqlDocIdStatus",
EsqlDocIdStatus::new
);

@Override
public String getWriteableName() {
return ENTRY.name;
}

private EsqlDocIdStatus(StreamInput stream) throws IOException {
this(stream.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.rawValue(Strings.format("\"%s\"", id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;

public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject {
// This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task.
Expand All @@ -24,22 +23,16 @@ public record DetailedQuery(
long runningTimeNanos,
long documentsFound,
long valuesLoaded,
String query,
String coordinatingNode,
List<String> dataNodes
String query
) implements ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("id", id.getId());
builder.field("node", id.getNodeId());
builder.field("start_time_millis", startTimeMillis);
builder.field("running_time_nanos", runningTimeNanos);
builder.field("documents_found", documentsFound);
builder.field("values_loaded", valuesLoaded);
builder.field("query", query);
builder.field("coordinating_node", coordinatingNode);
builder.field("data_nodes", dataNodes);
builder.endObject();
return builder;
}
Expand Down
Loading
Loading