Skip to content

Commit 58ccb18

Browse files
committed
More tests
1 parent 8345aa1 commit 58ccb18

File tree

9 files changed

+126
-85
lines changed

9 files changed

+126
-85
lines changed

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.RequestOptions;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.ResponseListener;
1920
import org.elasticsearch.client.WarningsHandler;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.io.Streams;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.logging.LogManager;
2829
import org.elasticsearch.logging.Logger;
30+
import org.elasticsearch.tasks.TaskId;
2931
import org.elasticsearch.test.ListMatcher;
3032
import org.elasticsearch.test.rest.ESRestTestCase;
3133
import org.elasticsearch.xcontent.ToXContent;
@@ -44,6 +46,7 @@
4446
import java.io.InputStreamReader;
4547
import java.io.OutputStream;
4648
import java.nio.charset.StandardCharsets;
49+
import java.time.Duration;
4750
import java.time.ZoneId;
4851
import java.util.ArrayList;
4952
import java.util.Arrays;
@@ -70,6 +73,7 @@
7073
import static org.hamcrest.Matchers.emptyOrNullString;
7174
import static org.hamcrest.Matchers.equalTo;
7275
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
76+
import static org.hamcrest.Matchers.hasKey;
7377
import static org.hamcrest.Matchers.is;
7478
import static org.hamcrest.Matchers.not;
7579
import static org.hamcrest.Matchers.nullValue;
@@ -1345,12 +1349,56 @@ public void testAsyncGetWithoutContentType() throws IOException {
13451349

13461350
}
13471351

1348-
public void testListApi_noRunningQueries_returnsAnEmptyArray() throws Exception {
1352+
public void testListApi_noRunningQueries_returnsAnObject() throws Exception {
13491353
Request request = prepareListQueriesRequest();
13501354
Response response = performRequest(request);
13511355
assertThat(entityToMap(response.getEntity(), XContentType.JSON), is(Map.of("queries", Map.of())));
13521356
}
13531357

1358+
public void testListApi_runningQuery_returnsQueriesObject() throws Exception {
1359+
bulkLoadTestData(1);
1360+
String query = fromIndex() + " | keep keyword, integer | where delay(10s) | limit 100 ";
1361+
var builder = requestObjectBuilder().query(query);
1362+
Request request = prepareRequest(SYNC);
1363+
String mediaType = attachBody(builder.build(), request);
1364+
RequestOptions.Builder options = request.getOptions().toBuilder();
1365+
options.addHeader("Content-Type", mediaType);
1366+
options.addHeader("Accept", mediaType);
1367+
request.setOptions(options);
1368+
client().performRequestAsync(request, new ResponseListener() {
1369+
@Override
1370+
public void onSuccess(Response response) {}
1371+
1372+
@Override
1373+
public void onFailure(Exception exception) {}
1374+
});
1375+
Thread.sleep(Duration.ofSeconds(5));
1376+
Response response = performRequest(prepareListQueriesRequest());
1377+
@SuppressWarnings("unchecked")
1378+
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
1379+
entityToMap(response.getEntity(), XContentType.JSON).values()
1380+
);
1381+
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
1382+
var queryFromListResult = EsqlTestUtils.singleValue(listResult.values());
1383+
assertThat(queryFromListResult.get("id"), is((int) taskId.getId()));
1384+
assertThat(queryFromListResult.get("node"), is(taskId.getNodeId()));
1385+
assertThat(queryFromListResult.get("query"), is(query));
1386+
assertThat(queryFromListResult, hasKey("start_time_millis"));
1387+
assertThat(queryFromListResult, hasKey("running_time_nanos"));
1388+
1389+
response = performRequest(prepareGetQueryRequest(taskId));
1390+
@SuppressWarnings("unchecked")
1391+
Map<String, Object> getQueryResult = entityToMap(response.getEntity(), XContentType.JSON);
1392+
assertThat(getQueryResult.get("id"), is((int) taskId.getId()));
1393+
assertThat(getQueryResult.get("node"), is(taskId.getNodeId()));
1394+
assertThat(getQueryResult.get("query"), is(query));
1395+
assertThat(getQueryResult.get("start_time_millis"), is(queryFromListResult.get("start_time_millis")));
1396+
assertThat(getQueryResult, hasKey("running_time_nanos"));
1397+
assertThat(getQueryResult, hasKey("coordinating_node"));
1398+
assertThat(getQueryResult, hasKey("data_nodes"));
1399+
Thread.sleep(Duration.ofSeconds(5));
1400+
}
1401+
13541402
protected static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
13551403
requestObject.build();
13561404
Request request = prepareRequest(mode);
@@ -1536,6 +1584,10 @@ private static Request prepareListQueriesRequest() {
15361584
return finishRequest(new Request("GET", "/_query/queries/"));
15371585
}
15381586

1587+
private static Request prepareGetQueryRequest(TaskId id) {
1588+
return finishRequest(new Request("GET", "/_query/queries/" + id));
1589+
}
1590+
15391591
private static Request finishRequest(Request request) {
15401592
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
15411593
request.addParameter("pretty", "true"); // Improves error reporting readability

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
public class EsqlGetQueryAction extends ActionType<EsqlGetQueryResponse> {
1414
public static final EsqlGetQueryAction INSTANCE = new EsqlGetQueryAction();
15-
public static final String NAME = "cluster:data/read/esql/query";
15+
public static final String NAME = "cluster:monitor/data/read/esql/query";
1616

1717
private EsqlGetQueryAction() {
1818
super(NAME);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,29 @@
1111
import org.elasticsearch.action.ActionRequestValidationException;
1212
import org.elasticsearch.common.io.stream.StreamInput;
1313
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.tasks.TaskId;
1415

1516
import java.io.IOException;
1617

1718
public class EsqlGetQueryRequest extends ActionRequest {
18-
private final String id;
19+
private final TaskId id;
1920

20-
public EsqlGetQueryRequest(String id) {
21+
public EsqlGetQueryRequest(TaskId id) {
2122
this.id = id;
2223
}
2324

24-
public String id() {
25+
public TaskId id() {
2526
return id;
2627
}
2728

2829
public EsqlGetQueryRequest(StreamInput streamInput) throws IOException {
2930
super(streamInput);
30-
id = streamInput.readString();
31+
id = TaskId.readFromStream(streamInput);
3132
}
3233

3334
@Override
3435
public void writeTo(StreamOutput out) throws IOException {
35-
out.writeString(id);
36+
out.writeWriteable(id);
3637
}
3738

3839
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
public class EsqlListQueriesAction extends ActionType<EsqlListQueriesResponse> {
1414
public static final EsqlListQueriesAction INSTANCE = new EsqlListQueriesAction();
15-
public static final String NAME = "cluster:data/read/esql/queries";
15+
public static final String NAME = "cluster:monitor/data/read/esql/queries";
1616

1717
private EsqlListQueriesAction() {
1818
super(NAME);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.rest.Scope;
1616
import org.elasticsearch.rest.ServerlessScope;
1717
import org.elasticsearch.rest.action.RestToXContentListener;
18+
import org.elasticsearch.tasks.TaskId;
1819

1920
import java.io.IOException;
2021
import java.util.List;
@@ -45,7 +46,11 @@ private static RestChannelConsumer restChannelConsumer(RestRequest request, Node
4546

4647
String id = request.param("id");
4748
return id != null
48-
? (channel -> client.execute(EsqlGetQueryAction.INSTANCE, new EsqlGetQueryRequest(id), new RestToXContentListener<>(channel)))
49+
? (channel -> client.execute(
50+
EsqlGetQueryAction.INSTANCE,
51+
new EsqlGetQueryRequest(new TaskId(id)),
52+
new RestToXContentListener<>(channel)
53+
))
4954
: (channel -> client.execute(
5055
EsqlListQueriesAction.INSTANCE,
5156
new EsqlListQueriesRequest(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import org.elasticsearch.action.ActionResponse;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12-
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.tasks.TaskId;
1313
import org.elasticsearch.xcontent.ToXContentObject;
1414
import org.elasticsearch.xcontent.XContentBuilder;
1515

@@ -19,32 +19,23 @@
1919
public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject {
2020
// This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task.
2121
public record DetailedQuery(
22-
String id,
22+
TaskId id,
2323
long startTimeMillis,
2424
long runningTimeNanos,
2525
String query,
2626
String coordinatingNode,
2727
List<String> dataNodes
28-
) implements Writeable, ToXContentObject {
29-
@Override
30-
public void writeTo(StreamOutput out) throws IOException {
31-
out.writeString(id);
32-
out.writeLong(startTimeMillis);
33-
out.writeLong(runningTimeNanos);
34-
out.writeString(query);
35-
out.writeString(coordinatingNode);
36-
out.writeStringCollection(dataNodes);
37-
}
38-
28+
) implements ToXContentObject {
3929
@Override
4030
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
4131
builder.startObject();
42-
builder.field("id", id);
43-
builder.field("startTimeMillis", startTimeMillis);
44-
builder.field("runningTimeNanos", runningTimeNanos);
32+
builder.field("id", id.getId());
33+
builder.field("node", id.getNodeId());
34+
builder.field("start_time_millis", startTimeMillis);
35+
builder.field("running_time_nanos", runningTimeNanos);
4536
builder.field("query", query);
46-
builder.field("coordinatingNode", coordinatingNode);
47-
builder.field("dataNodes", dataNodes);
37+
builder.field("coordinating_node", coordinatingNode);
38+
builder.field("data_nodes", dataNodes);
4839
builder.endObject();
4940
return builder;
5041
}
@@ -58,7 +49,7 @@ public EsqlGetQueryResponse(DetailedQuery query) {
5849

5950
@Override
6051
public void writeTo(StreamOutput out) throws IOException {
61-
out.writeWriteable(query);
52+
throw new AssertionError("should not reach here");
6253
}
6354

6455
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.elasticsearch.action.ActionResponse;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
12-
import org.elasticsearch.common.io.stream.Writeable;
1312
import org.elasticsearch.xcontent.ToXContentObject;
1413
import org.elasticsearch.xcontent.XContentBuilder;
1514

@@ -19,21 +18,16 @@
1918
public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject {
2019
private final List<Query> queries;
2120

22-
public record Query(String id, long startTimeMillis, long runningTimeNanos, String query) implements Writeable, ToXContentObject {
23-
@Override
24-
public void writeTo(StreamOutput out) throws IOException {
25-
out.writeString(id);
26-
out.writeLong(startTimeMillis);
27-
out.writeLong(runningTimeNanos);
28-
out.writeString(query);
29-
}
30-
21+
public record Query(org.elasticsearch.tasks.TaskId taskId, long startTimeMillis, long runningTimeNanos, String query)
22+
implements
23+
ToXContentObject {
3124
@Override
3225
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
33-
builder.startObject();
34-
builder.field("id", id);
35-
builder.field("startTimeMillis", startTimeMillis);
36-
builder.field("runningTimeNanos", runningTimeNanos);
26+
builder.startObject(taskId.toString());
27+
builder.field("id", taskId.getId());
28+
builder.field("node", taskId.getNodeId());
29+
builder.field("start_time_millis", startTimeMillis);
30+
builder.field("running_time_nanos", runningTimeNanos);
3731
builder.field("query", query);
3832
builder.endObject();
3933
return builder;
@@ -46,16 +40,16 @@ public EsqlListQueriesResponse(List<Query> queries) {
4640

4741
@Override
4842
public void writeTo(StreamOutput out) throws IOException {
49-
out.writeCollection(queries);
43+
throw new AssertionError("should not reach here");
5044
}
5145

5246
@Override
5347
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
5448
builder.startObject();
5549
builder.startObject("queries");
56-
// for (Query query : queries) {
57-
// query.toXContent(builder, params);
58-
// }
50+
for (Query query : queries) {
51+
query.toXContent(builder, params);
52+
}
5953
builder.endObject();
6054
builder.endObject();
6155
return builder;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.compute.operator.DriverTaskRunner;
2020
import org.elasticsearch.injection.guice.Inject;
2121
import org.elasticsearch.tasks.Task;
22-
import org.elasticsearch.tasks.TaskId;
2322
import org.elasticsearch.tasks.TaskInfo;
2423
import org.elasticsearch.transport.TransportService;
2524
import org.elasticsearch.xpack.esql.action.EsqlGetQueryAction;
@@ -36,43 +35,42 @@ public TransportEsqlGetQueryAction(TransportService transportService, NodeClient
3635

3736
@Override
3837
protected void doExecute(Task task, EsqlGetQueryRequest request, ActionListener<EsqlGetQueryResponse> listener) {
39-
new GetTaskRequestBuilder(nodeClient).setTaskId(new TaskId(nodeClient.getLocalNodeId(), Long.valueOf(request.id())))
40-
.execute(new ActionListener<>() {
41-
@Override
42-
public void onResponse(GetTaskResponse response) {
43-
TaskInfo task = response.getTask().getTask();
44-
String node = task.node();
45-
new ListTasksRequestBuilder(nodeClient).setDetailed(true)
46-
.setActions(DriverTaskRunner.ACTION_NAME)
47-
.setTargetParentTaskId(new TaskId(node, Long.valueOf(request.id())))
48-
.execute(new ActionListener<>() {
49-
@Override
50-
public void onResponse(ListTasksResponse response) {
51-
listener.onResponse(
52-
new EsqlGetQueryResponse(
53-
new EsqlGetQueryResponse.DetailedQuery(
54-
request.id(),
55-
task.startTime(),
56-
task.runningTimeNanos(),
57-
task.description(),
58-
task.node(),
59-
response.getTasks().stream().map(t -> t.node()).distinct().toList()
60-
)
38+
new GetTaskRequestBuilder(nodeClient).setTaskId(request.id()).execute(new ActionListener<>() {
39+
@Override
40+
public void onResponse(GetTaskResponse response) {
41+
TaskInfo task = response.getTask().getTask();
42+
new ListTasksRequestBuilder(nodeClient).setDetailed(true)
43+
.setActions(DriverTaskRunner.ACTION_NAME)
44+
.setTargetParentTaskId(request.id())
45+
.execute(new ActionListener<>() {
46+
@Override
47+
public void onResponse(ListTasksResponse response) {
48+
listener.onResponse(
49+
new EsqlGetQueryResponse(
50+
new EsqlGetQueryResponse.DetailedQuery(
51+
request.id(),
52+
task.startTime(),
53+
task.runningTimeNanos(),
54+
task.description(),
55+
// FIXME(gal, NOCOMMIT) This should be the coordinating node... how do I get that?
56+
task.node(),
57+
response.getTasks().stream().map(TaskInfo::node).distinct().toList()
6158
)
62-
);
63-
}
59+
)
60+
);
61+
}
6462

65-
@Override
66-
public void onFailure(Exception e) {
67-
listener.onFailure(e);
68-
}
69-
});
70-
}
63+
@Override
64+
public void onFailure(Exception e) {
65+
listener.onFailure(e);
66+
}
67+
});
68+
}
7169

72-
@Override
73-
public void onFailure(Exception e) {
74-
listener.onFailure(e);
75-
}
76-
});
70+
@Override
71+
public void onFailure(Exception e) {
72+
listener.onFailure(e);
73+
}
74+
});
7775
}
7876
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlListQueriesAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ protected void doExecute(Task task, EsqlListQueriesRequest request, ActionListen
4646
public void onResponse(ListTasksResponse response) {
4747
List<EsqlListQueriesResponse.Query> queries = response.getTasks()
4848
.stream()
49-
.map(TransportEsqlListQueriesAction::getQuery)
49+
.map(TransportEsqlListQueriesAction::toQuery)
5050
.toList();
5151
listener.onResponse(new EsqlListQueriesResponse(queries));
5252
}
@@ -58,9 +58,9 @@ public void onFailure(Exception e) {
5858
});
5959
}
6060

61-
private static EsqlListQueriesResponse.Query getQuery(TaskInfo taskInfo) {
61+
private static EsqlListQueriesResponse.Query toQuery(TaskInfo taskInfo) {
6262
return new EsqlListQueriesResponse.Query(
63-
String.valueOf(taskInfo.id()),
63+
taskInfo.taskId(),
6464
taskInfo.startTime(),
6565
taskInfo.runningTimeNanos(),
6666
taskInfo.description()

0 commit comments

Comments
 (0)