Skip to content

Commit 0383960

Browse files
committed
ESQL: Change queries ID to be the same as the async
Also the individual ID, node, and data node tags. GitHub: Resolves #127187
1 parent 45d321d commit 0383960

File tree

12 files changed

+128
-47
lines changed

12 files changed

+128
-47
lines changed

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {
3131

3232
protected static final Semaphore scriptPermits = new Semaphore(0);
33+
// Incremented onWait. Can be used to check if the onWait process has been reached.
34+
protected static final Semaphore scriptWaits = new Semaphore(0);
3335

3436
protected int pageSize = -1;
3537

@@ -98,6 +100,7 @@ public void setupIndex() throws IOException {
98100
public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
99101
@Override
100102
protected boolean onWait() throws InterruptedException {
103+
scriptWaits.release();
101104
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
102105
}
103106
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,20 @@
77

88
package org.elasticsearch.xpack.esql.action;
99

10+
import org.elasticsearch.action.ActionFuture;
1011
import org.elasticsearch.client.Request;
1112
import org.elasticsearch.client.Response;
12-
import org.elasticsearch.tasks.TaskId;
1313
import org.elasticsearch.test.IntOrLongMatcher;
1414
import org.elasticsearch.test.MapMatcher;
1515
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
1616
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1717

18-
import java.util.List;
1918
import java.util.Map;
2019
import java.util.concurrent.TimeUnit;
2120

2221
import static org.elasticsearch.core.TimeValue.timeValueSeconds;
2322
import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap;
24-
import static org.hamcrest.Matchers.allOf;
25-
import static org.hamcrest.Matchers.everyItem;
2623
import static org.hamcrest.Matchers.is;
27-
import static org.hamcrest.Matchers.isA;
2824

2925
public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase {
3026
private static final String QUERY = "from test | stats sum(pause_me)";
@@ -53,22 +49,17 @@ public void testRunningQueries() throws Exception {
5349
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
5450
jsonEntityToMap(listResponse.getEntity()).values()
5551
);
56-
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
52+
String queryId = EsqlTestUtils.singleValue(listResult.keySet());
5753
MapMatcher basicMatcher = MapMatcher.matchesMap()
58-
.entry("node", is(taskId.getNodeId()))
59-
.entry("id", IntOrLongMatcher.matches(taskId.getId()))
6054
.entry("query", is(QUERY))
6155
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
6256
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
6357
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);
6458

65-
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId));
59+
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId));
6660
MapMatcher.assertMap(
6761
jsonEntityToMap(getQueryResponse.getEntity()),
68-
basicMatcher.entry("coordinating_node", isA(String.class))
69-
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class))))
70-
.entry("documents_found", IntOrLongMatcher.isIntOrLong())
71-
.entry("values_loaded", IntOrLongMatcher.isIntOrLong())
62+
basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong())
7263
);
7364
} finally {
7465
if (id != null) {
@@ -82,9 +73,42 @@ public void testRunningQueries() throws Exception {
8273
}
8374
}
8475

76+
public void testRunningQueriesSync() throws Exception {
77+
var future = sendSyncQueryAsyncly();
78+
try {
79+
scriptWaits.acquire();
80+
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
81+
@SuppressWarnings("unchecked")
82+
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
83+
jsonEntityToMap(listResponse.getEntity()).values()
84+
);
85+
String queryId = EsqlTestUtils.singleValue(listResult.keySet());
86+
MapMatcher basicMatcher = MapMatcher.matchesMap()
87+
.entry("query", is(QUERY))
88+
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
89+
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
90+
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);
91+
92+
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId));
93+
MapMatcher.assertMap(
94+
jsonEntityToMap(getQueryResponse.getEntity()),
95+
basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong())
96+
);
97+
} finally {
98+
scriptPermits.release(numberOfDocs());
99+
future.actionGet(timeValueSeconds(60)).close();
100+
}
101+
}
102+
85103
private EsqlQueryResponse sendAsyncQuery() {
86104
scriptPermits.drainPermits();
87105
scriptPermits.release(between(1, 5));
88106
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS);
89107
}
108+
109+
private ActionFuture<EsqlQueryResponse> sendSyncQueryAsyncly() {
110+
scriptPermits.drainPermits();
111+
scriptPermits.release(between(1, 5));
112+
return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute();
113+
}
90114
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,29 +11,29 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14-
import org.elasticsearch.tasks.TaskId;
14+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
1515

1616
import java.io.IOException;
1717

1818
public class EsqlGetQueryRequest extends ActionRequest {
19-
private final TaskId id;
19+
private final AsyncExecutionId asyncExecutionId;
2020

21-
public EsqlGetQueryRequest(TaskId id) {
22-
this.id = id;
21+
public EsqlGetQueryRequest(AsyncExecutionId asyncExecutionId) {
22+
this.asyncExecutionId = asyncExecutionId;
2323
}
2424

25-
public TaskId id() {
26-
return id;
25+
public AsyncExecutionId id() {
26+
return asyncExecutionId;
2727
}
2828

2929
public EsqlGetQueryRequest(StreamInput streamInput) throws IOException {
3030
super(streamInput);
31-
id = TaskId.readFromStream(streamInput);
31+
asyncExecutionId = AsyncExecutionId.decode(streamInput.readString());
3232
}
3333

3434
@Override
3535
public void writeTo(StreamOutput out) throws IOException {
36-
out.writeWriteable(id);
36+
out.writeWriteable(asyncExecutionId);
3737
}
3838

3939
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.action.CompositeIndicesRequest;
1313
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.common.UUIDs;
1415
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.settings.Settings;
@@ -22,6 +23,7 @@
2223
import org.elasticsearch.tasks.TaskId;
2324
import org.elasticsearch.xpack.esql.Column;
2425
import org.elasticsearch.xpack.esql.parser.QueryParams;
26+
import org.elasticsearch.xpack.esql.plugin.EsqlDocIdStatus;
2527
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
2628

2729
import java.io.IOException;
@@ -243,8 +245,14 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
243245

244246
@Override
245247
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
248+
EsqlDocIdStatus status = new EsqlDocIdStatus(UUIDs.randomBase64UUID());
246249
// Pass the query as the description
247-
return new CancellableTask(id, type, action, query, parentTaskId, headers);
250+
return new CancellableTask(id, type, action, query, parentTaskId, headers) {
251+
@Override
252+
public Status getStatus() {
253+
return status;
254+
}
255+
};
248256
}
249257

250258
// Setter for tests

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
import org.elasticsearch.rest.Scope;
1616
import org.elasticsearch.rest.ServerlessScope;
1717
import org.elasticsearch.rest.action.RestToXContentListener;
18-
import org.elasticsearch.tasks.TaskId;
18+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
1919

20-
import java.io.IOException;
2120
import java.util.List;
2221

2322
import static org.elasticsearch.rest.RestRequest.Method.GET;
@@ -37,7 +36,7 @@ public List<Route> routes() {
3736
}
3837

3938
@Override
40-
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
39+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
4140
return restChannelConsumer(request, client);
4241
}
4342

@@ -46,7 +45,7 @@ private static RestChannelConsumer restChannelConsumer(RestRequest request, Node
4645

4746
String id = request.param("id");
4847
var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE;
49-
var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest();
48+
var actionRequest = id != null ? new EsqlGetQueryRequest(AsyncExecutionId.decode(id)) : new EsqlListQueriesRequest();
5049

5150
return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel));
5251
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plugin;
9+
10+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.tasks.Task;
14+
import org.elasticsearch.tasks.TaskId;
15+
import org.elasticsearch.xcontent.XContentBuilder;
16+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
17+
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Keeps track of the doc ID, which is itself used for {@link AsyncExecutionId}.
23+
*
24+
* The reason this contains just the doc ID and not the entire {@link AsyncExecutionId} is that during the creation of
25+
* {@link EsqlQueryAction}, we don't have access to the node ID yet, thus we can't create a {@link TaskId} yet.
26+
*/
27+
public record EsqlDocIdStatus(String id) implements Task.Status {
28+
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
29+
Task.Status.class,
30+
"EsqlDocIdStatus",
31+
EsqlDocIdStatus::new
32+
);
33+
34+
@Override
35+
public String getWriteableName() {
36+
return ENTRY.name;
37+
}
38+
39+
private EsqlDocIdStatus(StreamInput stream) throws IOException {
40+
this(stream.readString());
41+
}
42+
43+
@Override
44+
public void writeTo(StreamOutput out) throws IOException {
45+
out.writeString(id);
46+
}
47+
48+
@Override
49+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
50+
return builder.rawValue(id);
51+
}
52+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.xcontent.XContentBuilder;
1515

1616
import java.io.IOException;
17-
import java.util.List;
1817

1918
public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject {
2019
// This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task.
@@ -24,22 +23,16 @@ public record DetailedQuery(
2423
long runningTimeNanos,
2524
long documentsFound,
2625
long valuesLoaded,
27-
String query,
28-
String coordinatingNode,
29-
List<String> dataNodes
26+
String query
3027
) implements ToXContentObject {
3128
@Override
3229
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
3330
builder.startObject();
34-
builder.field("id", id.getId());
35-
builder.field("node", id.getNodeId());
3631
builder.field("start_time_millis", startTimeMillis);
3732
builder.field("running_time_nanos", runningTimeNanos);
3833
builder.field("documents_found", documentsFound);
3934
builder.field("values_loaded", valuesLoaded);
4035
builder.field("query", query);
41-
builder.field("coordinating_node", coordinatingNode);
42-
builder.field("data_nodes", dataNodes);
4336
builder.endObject();
4437
return builder;
4538
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,21 @@
99

1010
import org.elasticsearch.action.ActionResponse;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12-
import org.elasticsearch.tasks.TaskId;
1312
import org.elasticsearch.xcontent.ToXContentFragment;
1413
import org.elasticsearch.xcontent.ToXContentObject;
1514
import org.elasticsearch.xcontent.XContentBuilder;
15+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
1616

1717
import java.io.IOException;
1818
import java.util.List;
1919

2020
public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject {
2121
private final List<Query> queries;
2222

23-
public record Query(TaskId taskId, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment {
23+
public record Query(AsyncExecutionId id, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment {
2424
@Override
2525
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
26-
builder.startObject(taskId.toString());
27-
builder.field("id", taskId.getId());
28-
builder.field("node", taskId.getNodeId());
26+
builder.startObject(id.getEncoded());
2927
builder.field("start_time_millis", startTimeMillis);
3028
builder.field("running_time_nanos", runningTimeNanos);
3129
builder.field("query", query);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
275275
entries.add(AbstractPageMappingOperator.Status.ENTRY);
276276
entries.add(AbstractPageMappingToIteratorOperator.Status.ENTRY);
277277
entries.add(AggregationOperator.Status.ENTRY);
278+
entries.add(EsqlDocIdStatus.ENTRY);
278279
entries.add(ExchangeSinkOperator.Status.ENTRY);
279280
entries.add(ExchangeSourceOperator.Status.ENTRY);
280281
entries.add(HashAggregationOperator.Status.ENTRY);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ protected void doExecute(Task task, EsqlGetQueryRequest request, ActionListener<
4949
nodeClient,
5050
ESQL_ORIGIN,
5151
TransportGetTaskAction.TYPE,
52-
new GetTaskRequest().setTaskId(request.id()),
52+
new GetTaskRequest().setTaskId(request.id().getTaskId()),
5353
new ActionListener<>() {
5454
@Override
5555
public void onResponse(GetTaskResponse response) {
@@ -64,7 +64,7 @@ public void onResponse(GetTaskResponse response) {
6464
TransportListTasksAction.TYPE,
6565
new ListTasksRequest().setDetailed(true)
6666
.setActions(DriverTaskRunner.ACTION_NAME)
67-
.setTargetParentTaskId(request.id()),
67+
.setTargetParentTaskId(request.id().getTaskId()),
6868
new ActionListener<>() {
6969
@Override
7070
public void onResponse(ListTasksResponse response) {
@@ -91,7 +91,6 @@ public void onFailure(Exception e) {
9191

9292
private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo main, ListTasksResponse sub) {
9393
String query = main.description();
94-
String coordinatingNode = main.node();
9594

9695
// TODO include completed drivers in documentsFound and valuesLoaded
9796
long documentsFound = 0;
@@ -110,9 +109,7 @@ private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo main,
110109
main.runningTimeNanos(),
111110
documentsFound,
112111
valuesLoaded,
113-
query,
114-
coordinatingNode,
115-
sub.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes
112+
query
116113
);
117114
}
118115
}

0 commit comments

Comments
 (0)