Skip to content

Commit 5e689d8

Browse files
authored
[9.1] ESQL: Fix async query inconsistent headers (#135273)
* ESQL: Fix async query inconsistent headers (#135078) Fixes #135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header * Revert rest tests file * Added main changes on test class * Backported test for the headers fix * Remove unrelated changes
1 parent 429e219 commit 5e689d8

File tree

6 files changed

+120
-36
lines changed

6 files changed

+120
-36
lines changed

docs/changelog/135078.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135078
2+
summary: Fix async get results with inconsistent headers
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135042

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 76 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.RequestOptions;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.RestClient;
1920
import org.elasticsearch.client.WarningsHandler;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.io.Streams;
@@ -41,6 +42,7 @@
4142
import java.io.IOException;
4243
import java.io.InputStreamReader;
4344
import java.io.OutputStream;
45+
import java.io.UncheckedIOException;
4446
import java.nio.charset.StandardCharsets;
4547
import java.time.ZoneId;
4648
import java.util.ArrayList;
@@ -51,6 +53,8 @@
5153
import java.util.Locale;
5254
import java.util.Map;
5355
import java.util.Set;
56+
import java.util.concurrent.ConcurrentHashMap;
57+
import java.util.concurrent.ConcurrentMap;
5458
import java.util.function.IntFunction;
5559

5660
import static java.util.Collections.emptySet;
@@ -60,11 +64,11 @@
6064
import static org.elasticsearch.test.MapMatcher.assertMap;
6165
import static org.elasticsearch.test.MapMatcher.matchesMap;
6266
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
63-
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial;
6467
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
6568
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
6669
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
6770
import static org.hamcrest.Matchers.any;
71+
import static org.hamcrest.Matchers.anyOf;
6872
import static org.hamcrest.Matchers.containsString;
6973
import static org.hamcrest.Matchers.either;
7074
import static org.hamcrest.Matchers.emptyOrNullString;
@@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
396400
options.addHeader("Content-Type", mediaType);
397401
options.addHeader("Accept", "text/csv; header=absent");
398402
request.setOptions(options);
399-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
403+
Response response = performRequest(request);
404+
assertWarnings(response, new AssertWarnings.NoWarnings());
405+
HttpEntity entity = response.getEntity();
400406
String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
401407
assertEquals("keyword0,0\r\n", actual);
402408
}
@@ -1258,7 +1264,10 @@ public static Map<String, Object> runEsql(
12581264
var results = mode == ASYNC
12591265
? runEsqlAsync(requestObject, randomBoolean(), assertWarnings)
12601266
: runEsqlSync(requestObject, assertWarnings);
1261-
return checkPartialResults ? assertNotPartial(results) : results;
1267+
if (checkPartialResults) {
1268+
assertNotPartial(results);
1269+
}
1270+
return results;
12621271
}
12631272

12641273
public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
@@ -1269,8 +1278,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
12691278
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
12701279
Request request = prepareRequestWithOptions(requestObject, SYNC);
12711280

1272-
HttpEntity entity = performRequest(request, assertWarnings);
1273-
return entityToMap(entity, requestObject.contentType());
1281+
Response response = performRequest(request);
1282+
HttpEntity entity = response.getEntity();
1283+
Map<String, Object> json = entityToMap(entity, requestObject.contentType());
1284+
1285+
var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1286+
if (supportsAsyncHeadersFix) {
1287+
assertNoAsyncHeaders(response);
1288+
}
1289+
assertWarnings(response, assertWarnings);
1290+
1291+
return json;
12741292
}
12751293

12761294
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
@@ -1298,17 +1316,18 @@ public static Map<String, Object> runEsqlAsync(
12981316
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
12991317
String id = (String) json.get("id");
13001318

1301-
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
1302-
var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false);
1319+
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1320+
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
1321+
1322+
// Check headers on initial query call
1323+
if (supportsAsyncHeaders) {
1324+
assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
1325+
}
13031326

13041327
if (id == null) {
13051328
// no id returned from an async call, must have completed immediately and without keep_on_completion
13061329
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
13071330
assertThat((boolean) json.get("is_running"), is(false));
1308-
if (supportsAsyncHeaders) {
1309-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1310-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
1311-
}
13121331
assertWarnings(response, assertWarnings);
13131332
json.remove("is_running"); // remove this to not mess up later map assertions
13141333
return Collections.unmodifiableMap(json);
@@ -1329,11 +1348,6 @@ public static Map<String, Object> runEsqlAsync(
13291348
assertThat(json.get("pages"), nullValue());
13301349
}
13311350

1332-
if (supportsAsyncHeaders) {
1333-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
1334-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
1335-
}
1336-
13371351
// issue a second request to "async get" the results
13381352
Request getRequest = prepareAsyncGetRequest(id);
13391353
getRequest.setOptions(request.getOptions());
@@ -1343,6 +1357,11 @@ public static Map<String, Object> runEsqlAsync(
13431357

13441358
var result = entityToMap(entity, requestObject.contentType());
13451359

1360+
// Check headers on get call
1361+
if (supportsAsyncHeaders) {
1362+
assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
1363+
}
1364+
13461365
// assert initial contents, if any, are the same as async get contents
13471366
if (initialColumns != null) {
13481367
if (supportsSuggestedCast == false) {
@@ -1361,6 +1380,26 @@ public static Map<String, Object> runEsqlAsync(
13611380
return removeAsyncProperties(result);
13621381
}
13631382

1383+
record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}
1384+
1385+
/**
1386+
* Cache of capabilities.
1387+
*/
1388+
private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();
1389+
1390+
public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
1391+
if (requiredCapabilities.isEmpty()) {
1392+
return true;
1393+
}
1394+
return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
1395+
try {
1396+
return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
1397+
} catch (IOException e) {
1398+
throw new UncheckedIOException(e);
1399+
}
1400+
});
1401+
}
1402+
13641403
private static Object removeOriginalTypesAndSuggestedCast(Object response) {
13651404
if (response instanceof ArrayList<?> columns) {
13661405
var newColumns = new ArrayList<>();
@@ -1589,7 +1628,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15891628
}
15901629

15911630
Response response = performRequest(request);
1592-
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1631+
assertWarnings(response, new AssertWarnings.NoWarnings());
1632+
HttpEntity entity = response.getEntity();
15931633

15941634
// get the content, it could be empty because the request might have not completed
15951635
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
@@ -1642,7 +1682,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
16421682
// if `addParam` is false, `options` will already have an `Accept` header
16431683
getRequest.setOptions(options);
16441684
response = performRequest(getRequest);
1645-
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1685+
assertWarnings(response, new AssertWarnings.NoWarnings());
1686+
entity = response.getEntity();
16461687
}
16471688
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
16481689

@@ -1681,10 +1722,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16811722
return mediaType;
16821723
}
16831724

1684-
private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
1685-
return assertWarnings(performRequest(request), assertWarnings);
1686-
}
1687-
16881725
protected static Response performRequest(Request request) throws IOException {
16891726
Response response = client().performRequest(request);
16901727
if (shouldLog()) {
@@ -1695,14 +1732,19 @@ protected static Response performRequest(Request request) throws IOException {
16951732
return response;
16961733
}
16971734

1698-
private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
1735+
static void assertNotPartial(Map<String, Object> answer) {
1736+
var clusters = answer.get("_clusters");
1737+
var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
1738+
assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
1739+
}
1740+
1741+
private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
16991742
List<String> warnings = new ArrayList<>(response.getWarnings());
17001743
warnings.removeAll(mutedWarnings());
17011744
if (shouldLog()) {
17021745
LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
17031746
}
17041747
assertWarnings.assertWarnings(warnings);
1705-
return response.getEntity();
17061748
}
17071749

17081750
private static Set<String> mutedWarnings() {
@@ -1813,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
18131855
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
18141856
}
18151857

1858+
private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
1859+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
1860+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
1861+
}
1862+
1863+
private static void assertNoAsyncHeaders(Response response) {
1864+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1865+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
1866+
}
1867+
18161868
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
18171869
return new RequestObjectBuilder();
18181870
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,11 @@ public enum Cap {
628628
*/
629629
ASYNC_QUERY_STATUS_HEADERS,
630630

631+
/**
632+
* Fix async headers not being sent on "get" requests
633+
*/
634+
ASYNC_QUERY_STATUS_HEADERS_FIX,
635+
631636
/**
632637
* Consider the upper bound when computing the interval in BUCKET auto mode.
633638
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public boolean isRunning() {
222222
}
223223

224224
public boolean isAsync() {
225-
return isRunning;
225+
return isAsync;
226226
}
227227

228228
public boolean isPartial() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.tasks.Task;
2424
import org.elasticsearch.threadpool.ThreadPool;
2525
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
2627
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2728
import org.elasticsearch.xpack.esql.VerificationException;
2829
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
@@ -35,6 +36,7 @@
3536
public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
3637

3738
private final BlockFactory blockFactory;
39+
private final ThreadPool threadPool;
3840

3941
@Inject
4042
public TransportEsqlAsyncGetResultsAction(
@@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction(
4345
ClusterService clusterService,
4446
NamedWriteableRegistry registry,
4547
Client client,
46-
ThreadPool threadPool,
4748
BigArrays bigArrays,
48-
BlockFactoryProvider blockFactoryProvider
49+
BlockFactoryProvider blockFactoryProvider,
50+
ThreadPool threadPool
4951
) {
5052
super(
5153
EsqlAsyncGetResultAction.NAME,
@@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction(
5961
EsqlQueryTask.class
6062
);
6163
this.blockFactory = blockFactoryProvider.blockFactory();
64+
this.threadPool = threadPool;
6265
}
6366

6467
@Override
6568
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
66-
super.doExecute(task, request, unwrapListener(listener));
69+
super.doExecute(task, request, unwrapListener(request.getId(), listener));
6770
}
6871

6972
@Override
@@ -75,14 +78,21 @@ public Writeable.Reader<EsqlQueryResponse> responseReader() {
7578
static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));
7679

7780
/**
78-
* Unwraps the exception in the case of failure. This keeps the exception types
79-
* the same as the sync API, namely ParsingException and VerificationException.
81+
* Adds async headers, and unwraps the exception in the case of failure.
82+
* <p>
83+
* This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
84+
* </p>
8085
*/
81-
static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
86+
ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
8287
return new ActionListener<>() {
8388
@Override
84-
public void onResponse(R o) {
85-
listener.onResponse(o);
89+
public void onResponse(EsqlQueryResponse response) {
90+
boolean isRunning = response.isRunning();
91+
threadPool.getThreadContext()
92+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
93+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
94+
95+
listener.onResponse(response);
8696
}
8797

8898
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
252252
ActionListener.wrap(result -> {
253253
recordCCSTelemetry(task, executionInfo, request, null);
254254
planExecutor.metrics().recordTook(executionInfo.overallTook().millis());
255-
listener.onResponse(toResponse(task, request, configuration, result));
255+
var response = toResponse(task, request, configuration, result);
256+
assert response.isAsync() == request.async() : "The response must be async if the request was async";
257+
258+
if (response.isAsync()) {
259+
if (response.asyncExecutionId().isPresent()) {
260+
String asyncExecutionId = response.asyncExecutionId().get();
261+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
262+
}
263+
boolean isRunning = response.isRunning();
264+
threadPool.getThreadContext()
265+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
266+
}
267+
268+
listener.onResponse(response);
256269
}, ex -> {
257270
recordCCSTelemetry(task, executionInfo, request, ex);
258271
listener.onFailure(ex);
@@ -338,10 +351,8 @@ private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Config
338351
EsqlQueryResponse.Profile profile = configuration.profile()
339352
? new EsqlQueryResponse.Profile(result.completionInfo().driverProfiles(), result.completionInfo().planProfiles())
340353
: null;
341-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
342354
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
343355
String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
344-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
345356
return new EsqlQueryResponse(
346357
columns,
347358
result.pages(),

0 commit comments

Comments
 (0)