Skip to content

Commit 8e4f8f3

Browse files
authored
ESQL: Fix async query inconsistent headers (elastic#135078)
Fixes elastic#135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header
1 parent 26937a7 commit 8e4f8f3

File tree

6 files changed

+70
-23
lines changed

6 files changed

+70
-23
lines changed

docs/changelog/135078.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135078
2+
summary: Fix async get results with inconsistent headers
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135042

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1411,6 +1411,10 @@ public static Map<String, Object> runEsqlSync(
14111411
profileLogger.extractProfile(json, profileEnabled);
14121412
}
14131413

1414+
var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1415+
if (supportsAsyncHeadersFix) {
1416+
assertNoAsyncHeaders(response);
1417+
}
14141418
assertWarnings(response, assertWarnings);
14151419

14161420
return json;
@@ -1448,17 +1452,18 @@ public static Map<String, Object> runEsqlAsync(
14481452
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
14491453
String id = (String) json.get("id");
14501454

1451-
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers"));
1455+
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
14521456
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
14531457

1458+
// Check headers on initial query call
1459+
if (supportsAsyncHeaders) {
1460+
assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
1461+
}
1462+
14541463
if (id == null) {
14551464
// no id returned from an async call, must have completed immediately and without keep_on_completion
14561465
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
14571466
assertThat((boolean) json.get("is_running"), is(false));
1458-
if (supportsAsyncHeaders) {
1459-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1460-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
1461-
}
14621467
if (profileLogger != null) {
14631468
profileLogger.extractProfile(json, profileEnabled);
14641469
}
@@ -1485,11 +1490,6 @@ public static Map<String, Object> runEsqlAsync(
14851490
assertThat(json.get("pages"), nullValue());
14861491
}
14871492

1488-
if (supportsAsyncHeaders) {
1489-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
1490-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
1491-
}
1492-
14931493
// issue a second request to "async get" the results
14941494
Request getRequest = prepareAsyncGetRequest(id);
14951495
getRequest.setOptions(request.getOptions());
@@ -1499,6 +1499,11 @@ public static Map<String, Object> runEsqlAsync(
14991499

15001500
var result = entityToMap(entity, requestObject.contentType());
15011501

1502+
// Check headers on get call
1503+
if (supportsAsyncHeaders) {
1504+
assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
1505+
}
1506+
15021507
// assert initial contents, if any, are the same as async get contents
15031508
if (initialColumns != null) {
15041509
if (supportsSuggestedCast == false) {
@@ -2005,6 +2010,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
20052010
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
20062011
}
20072012

2013+
private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
2014+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
2015+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
2016+
}
2017+
2018+
private static void assertNoAsyncHeaders(Response response) {
2019+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
2020+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
2021+
}
2022+
20082023
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
20092024
return new RequestObjectBuilder();
20102025
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,11 @@ public enum Cap {
669669
*/
670670
ASYNC_QUERY_STATUS_HEADERS,
671671

672+
/**
673+
* Fix async headers not being sent on "get" requests
674+
*/
675+
ASYNC_QUERY_STATUS_HEADERS_FIX,
676+
672677
/**
673678
* Consider the upper bound when computing the interval in BUCKET auto mode.
674679
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static Writeable.Reader<EsqlQueryResponse> reader(BlockFactory blockFacto
111111
}
112112

113113
static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
114-
String asyncExecutionId = asyncExecutionId = in.readOptionalString();
114+
String asyncExecutionId = in.readOptionalString();
115115
boolean isRunning = in.readBoolean();
116116
boolean isAsync = in.readBoolean();
117117
List<ColumnInfoImpl> columns = in.readCollectionAsList(ColumnInfoImpl::new);
@@ -208,7 +208,7 @@ public boolean isRunning() {
208208
}
209209

210210
public boolean isAsync() {
211-
return isRunning;
211+
return isAsync;
212212
}
213213

214214
public boolean isPartial() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.tasks.Task;
2424
import org.elasticsearch.threadpool.ThreadPool;
2525
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
2627
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2728
import org.elasticsearch.xpack.esql.VerificationException;
2829
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
@@ -35,6 +36,7 @@
3536
public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
3637

3738
private final BlockFactory blockFactory;
39+
private final ThreadPool threadPool;
3840

3941
@Inject
4042
public TransportEsqlAsyncGetResultsAction(
@@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction(
4345
ClusterService clusterService,
4446
NamedWriteableRegistry registry,
4547
Client client,
46-
ThreadPool threadPool,
4748
BigArrays bigArrays,
48-
BlockFactoryProvider blockFactoryProvider
49+
BlockFactoryProvider blockFactoryProvider,
50+
ThreadPool threadPool
4951
) {
5052
super(
5153
EsqlAsyncGetResultAction.NAME,
@@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction(
5961
EsqlQueryTask.class
6062
);
6163
this.blockFactory = blockFactoryProvider.blockFactory();
64+
this.threadPool = threadPool;
6265
}
6366

6467
@Override
6568
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
66-
super.doExecute(task, request, unwrapListener(listener));
69+
super.doExecute(task, request, unwrapListener(request.getId(), listener));
6770
}
6871

6972
@Override
@@ -75,14 +78,21 @@ public Writeable.Reader<EsqlQueryResponse> responseReader() {
7578
static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));
7679

7780
/**
78-
* Unwraps the exception in the case of failure. This keeps the exception types
79-
* the same as the sync API, namely ParsingException and VerificationException.
81+
* Adds async headers, and unwraps the exception in the case of failure.
82+
* <p>
83+
* This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
84+
* </p>
8085
*/
81-
static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
86+
ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
8287
return new ActionListener<>() {
8388
@Override
84-
public void onResponse(R o) {
85-
listener.onResponse(o);
89+
public void onResponse(EsqlQueryResponse response) {
90+
boolean isRunning = response.isRunning();
91+
threadPool.getThreadContext()
92+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
93+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
94+
95+
listener.onResponse(response);
8696
}
8797

8898
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
282282
ActionListener.wrap(result -> {
283283
recordCCSTelemetry(task, executionInfo, request, null);
284284
planExecutor.metrics().recordTook(executionInfo.overallTook().millis());
285-
listener.onResponse(toResponse(task, request, configuration, result));
285+
var response = toResponse(task, request, configuration, result);
286+
assert response.isAsync() == request.async() : "The response must be async if the request was async";
287+
288+
if (response.isAsync()) {
289+
if (response.asyncExecutionId().isPresent()) {
290+
String asyncExecutionId = response.asyncExecutionId().get();
291+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
292+
}
293+
boolean isRunning = response.isRunning();
294+
threadPool.getThreadContext()
295+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
296+
}
297+
298+
listener.onResponse(response);
286299
}, ex -> {
287300
recordCCSTelemetry(task, executionInfo, request, ex);
288301
listener.onFailure(ex);
@@ -380,10 +393,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
380393
EsqlQueryResponse.Profile profile = configuration.profile()
381394
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
382395
: null;
383-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
384396
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
385397
String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
386-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
387398
return new EsqlQueryResponse(
388399
columns,
389400
result.pages(),

0 commit comments

Comments
 (0)