Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0383960
ESQL: Change queries ID to be the same as the async
GalLalouche Apr 24, 2025
a463ba5
Update docs/changelog/127472.yaml
GalLalouche Apr 28, 2025
0948424
Fix compilation error
GalLalouche Apr 28, 2025
25fee06
Merge branch 'feature/apis' of github.com:GalLalouche/elasticsearch i…
GalLalouche Apr 28, 2025
9f2e297
Merge branch 'main' into feature/apis
GalLalouche Apr 28, 2025
ec7f125
Refactor test
GalLalouche Apr 28, 2025
79a9669
Fix failing test
GalLalouche Apr 28, 2025
e3958c0
Fix failing test
GalLalouche Apr 29, 2025
de60192
Fix failing test (third time's the chard?)
GalLalouche Apr 29, 2025
9011486
Merge branch 'main' into feature/apis
GalLalouche Apr 29, 2025
6dfa323
Fourth time then
GalLalouche Apr 29, 2025
24b654e
Added wire test
GalLalouche May 5, 2025
6791493
Changed status to contain AsyncExecutionId
GalLalouche May 6, 2025
b1eedc6
Merge branch 'main' into feature/apis
GalLalouche May 6, 2025
c6a717a
Merge branch 'main' into feature/apis
GalLalouche May 26, 2025
5a017be
Update accept hearders
GalLalouche May 26, 2025
62e421e
Fix missing import
GalLalouche May 26, 2025
de56665
Review fixes
GalLalouche May 29, 2025
fcdb7a8
Merge branch 'main' into feature/apis
GalLalouche May 29, 2025
398ba4c
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
f46d1fa
Added a TODO for createTask Nullable
GalLalouche Jun 11, 2025
45b4e8e
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
e61f293
Fix failing test
GalLalouche Jun 11, 2025
bbd87ac
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
7adbc29
Merge branch 'main' into feature/apis
GalLalouche Jun 11, 2025
2abb63d
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
84dcd7c
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
46f7494
Merge branch 'main' into feature/apis
GalLalouche Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127472.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127472
summary: Change queries ID to be the same as the async
area: ES|QL
type: feature
issues:
- 127187
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [],
"content_type": [
"application/json"
]
"accept": ["application/json"],
"content_type": ["application/json"]
},
"url": {
"paths": [
Expand Down
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.tasks;

import org.elasticsearch.core.Nullable;

import java.util.Map;

/**
Expand Down Expand Up @@ -52,6 +54,21 @@ default Task createTask(long id, String type, String action, TaskId parentTaskId
return new Task(id, type, action, getDescription(), parentTaskId, headers);
}

/**
* Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID.
*/
// TODO remove the above overload, use only this one.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a TODO you are planning to followup with?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I wanted to keep this PR cleaner.

default Task createTask(
@Nullable String localNodeId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this really be null? I don't think that is valid.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, localNodeId is @Nullable in DiscoveryNodes, so at least at a very superficial static analysis level, yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't be null in practice. I asked @DaveCTurner about this and he surmised this is a holdover from the days of TransportClient where there was no local "node".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like @rjernst is going to be away for a well deserve two weeks. A ton of tests rely on this being null. Let's leave it laid out like this for now, get this in, and poke at the tests for a bit. Hopefully it's big bug mechanical. We can decide from there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify: it's not that tests rely on this being null, rather they don't make sure it's not null.

long id,
String type,
String action,
TaskId parentTaskId,
Map<String, String> headers
) {
return createTask(id, type, action, parentTaskId, headers);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps localNodeId and id should be combined into a TaskId as a parameter to this like parentTaskId?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is assuming that localNodeId can never be null. If it can't, then I'll add an assert in TaskManager and replace this with a non-nullable TaskId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P.S. This would make it slightly more involved to remove that above overload, since all the overriders would need to extract the node ID from the TaskId, but it's not that big of a deal.

}

/**
* Returns optional description of the request to be displayed by the task manager
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,14 @@ public Task register(String type, String action, TaskAwareRequest request, boole
headers.put(key, httpHeader);
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
Task task = request.createTask(
lastDiscoveryNodes.getLocalNodeId(),
taskIdGenerator.incrementAndGet(),
type,
action,
request.getParentTask(),
headers
);
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
Expand All @@ -20,7 +22,7 @@
/**
* A class that contains all information related to a submitted async execution.
*/
public final class AsyncExecutionId {
public final class AsyncExecutionId implements Writeable {
public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id";
public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running";

Expand Down Expand Up @@ -115,4 +117,13 @@ public static AsyncExecutionId decode(String id) {
}
return new AsyncExecutionId(docId, new TaskId(taskId), id);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(getEncoded());
}

public static AsyncExecutionId readFrom(StreamInput input) throws IOException {
return decode(input.readString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding an AbstractWireTestCase subclass for this then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.async;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.AbstractWireSerializingTestCase;

import java.io.IOException;

public class AsyncExecutionIdWireTests extends AbstractWireSerializingTestCase<AsyncExecutionId> {
@Override
protected Writeable.Reader<AsyncExecutionId> instanceReader() {
return AsyncExecutionId::readFrom;
}

@Override
protected AsyncExecutionId createTestInstance() {
return new AsyncExecutionId(randomAlphaOfLength(15), new TaskId(randomAlphaOfLength(10), randomLong()));
}

@Override
protected AsyncExecutionId mutateInstance(AsyncExecutionId instance) throws IOException {
return new AsyncExecutionId(
instance.getDocId(),
new TaskId(instance.getTaskId().getNodeId(), instance.getTaskId().getId() * 12345)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -781,19 +781,23 @@ public void testListQueryForbidden() throws Exception {

public void testGetQueryAllowed() throws Exception {
// This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges.
Request request = new Request("GET", "_query/queries/foo:1234");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404)));
setUser(GET_QUERY_REQUEST, "user_with_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(403)));
}

public void testGetQueryForbidden() throws Exception {
Request request = new Request("GET", "_query/queries/foo:1234");
setUser(request, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
setUser(GET_QUERY_REQUEST, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]"));
}

private static final Request GET_QUERY_REQUEST = new Request(
"GET",
"_query/queries/FmJKWHpFRi1OU0l5SU1YcnpuWWhoUWcZWDFuYUJBeW1TY0dKM3otWUs2bDJudzo1Mg=="
);

private void createEnrichPolicy() throws Exception {
createIndex("songs", Settings.EMPTY, """
"properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
"id" : 5326,
"type" : "transport",
"action" : "indices:data/read/esql",
"status" : {
"request_id" : "Ks5ApyqMTtWj5LrKigmCjQ"
},
"description" : "FROM test | STATS MAX(d) by a, b", <1>
"start_time" : "2023-07-31T15:46:32.328Z",
"start_time_in_millis" : 1690818392328,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {

protected static final Semaphore scriptPermits = new Semaphore(0);
// Incremented onWait. Can be used to check if the onWait process has been reached.
protected static final Semaphore scriptWaits = new Semaphore(0);

protected int pageSize = -1;

Expand Down Expand Up @@ -98,6 +100,7 @@ public void setupIndex() throws IOException {
public static class PausableFieldPlugin extends AbstractPauseFieldPlugin {
@Override
protected boolean onWait() throws InterruptedException {
scriptWaits.release();
return scriptPermits.tryAcquire(1, TimeUnit.MINUTES);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.IntOrLongMatcher;
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.esql.EsqlTestUtils;

import java.util.List;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.core.TimeValue.timeValueSeconds;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;

public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase {
private static final String QUERY = "from test | stats sum(pause_me)";
Expand All @@ -45,31 +42,10 @@ public void testRunningQueries() throws Exception {
try (var initialResponse = sendAsyncQuery()) {
id = initialResponse.asyncExecutionId().get();

assertRunningQueries();
var getResultsRequest = new GetAsyncResultRequest(id);
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1));
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close();
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
@SuppressWarnings("unchecked")
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
jsonEntityToMap(listResponse.getEntity()).values()
);
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
MapMatcher basicMatcher = MapMatcher.matchesMap()
.entry("node", is(taskId.getNodeId()))
.entry("id", IntOrLongMatcher.matches(taskId.getId()))
.entry("query", is(QUERY))
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);

Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId));
MapMatcher.assertMap(
jsonEntityToMap(getQueryResponse.getEntity()),
basicMatcher.entry("coordinating_node", isA(String.class))
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class))))
.entry("documents_found", IntOrLongMatcher.isIntOrLong())
.entry("values_loaded", IntOrLongMatcher.isIntOrLong())
);
} finally {
if (id != null) {
// Finish the query.
Expand All @@ -82,9 +58,44 @@ public void testRunningQueries() throws Exception {
}
}

public void testRunningQueriesSync() throws Exception {
var future = sendSyncQueryAsyncly();
try {
scriptWaits.acquire();
assertRunningQueries();
} finally {
scriptPermits.release(numberOfDocs());
future.actionGet(timeValueSeconds(60)).close();
}
}

private static void assertRunningQueries() throws IOException {
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
@SuppressWarnings("unchecked")
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(jsonEntityToMap(listResponse.getEntity()).values());
String queryId = EsqlTestUtils.singleValue(listResult.keySet());
MapMatcher basicMatcher = MapMatcher.matchesMap()
.entry("query", is(QUERY))
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);

Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId));
MapMatcher.assertMap(
jsonEntityToMap(getQueryResponse.getEntity()),
basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong())
);
}

private EsqlQueryResponse sendAsyncQuery() {
scriptPermits.drainPermits();
scriptPermits.release(between(1, 5));
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS);
}

private ActionFuture<EsqlQueryResponse> sendSyncQueryAsyncly() {
scriptPermits.drainPermits();
scriptPermits.release(between(1, 5));
return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,33 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;

import java.io.IOException;

public class EsqlGetQueryRequest extends LegacyActionRequest {
private final TaskId id;
public class EsqlGetQueryRequest extends ActionRequest {
private final AsyncExecutionId asyncExecutionId;

public EsqlGetQueryRequest(TaskId id) {
this.id = id;
public EsqlGetQueryRequest(AsyncExecutionId asyncExecutionId) {
this.asyncExecutionId = asyncExecutionId;
}

public TaskId id() {
return id;
public AsyncExecutionId id() {
return asyncExecutionId;
}

public EsqlGetQueryRequest(StreamInput streamInput) throws IOException {
super(streamInput);
id = TaskId.readFromStream(streamInput);
asyncExecutionId = AsyncExecutionId.decode(streamInput.readString());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeWriteable(id);
out.writeWriteable(asyncExecutionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.CompositeIndicesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -20,8 +21,10 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
import org.elasticsearch.xpack.esql.Column;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.io.IOException;
Expand Down Expand Up @@ -242,9 +245,32 @@ public EsqlQueryRequest allowPartialResults(boolean allowPartialResults) {
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
// Pass the query as the description
return new CancellableTask(id, type, action, query, parentTaskId, headers);
public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(localNodeId, id)));
return new EsqlQueryRequestTask(query, id, type, action, parentTaskId, headers, status);
}

private static class EsqlQueryRequestTask extends CancellableTask {
private final Status status;

EsqlQueryRequestTask(
String query,
long id,
String type,
String action,
TaskId parentTaskId,
Map<String, String> headers,
EsqlQueryStatus status
) {
// Pass the query as the description
super(id, type, action, query, parentTaskId, headers);
this.status = status;
}

@Override
public Status getStatus() {
return status;
}
}

// Setter for tests
Expand Down
Loading
Loading