Skip to content

Commit 63e51f6

Browse files
authored
ESQL: Fix async query inconsistent headers (elastic#135078) (elastic#135294)
Fixes elastic#135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header
1 parent 8a1ca6b commit 63e51f6

File tree

6 files changed

+130
-54
lines changed

6 files changed

+130
-54
lines changed

docs/changelog/135078.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135078
2+
summary: Fix async get results with inconsistent headers
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135042

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 86 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
import org.elasticsearch.client.RequestOptions;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.RestClient;
1920
import org.elasticsearch.client.WarningsHandler;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.io.Streams;
2223
import org.elasticsearch.common.settings.Settings;
23-
import org.elasticsearch.common.xcontent.XContentHelper;
2424
import org.elasticsearch.core.CheckedConsumer;
2525
import org.elasticsearch.core.Nullable;
2626
import org.elasticsearch.core.TimeValue;
@@ -40,9 +40,9 @@
4040

4141
import java.io.ByteArrayOutputStream;
4242
import java.io.IOException;
43-
import java.io.InputStream;
4443
import java.io.InputStreamReader;
4544
import java.io.OutputStream;
45+
import java.io.UncheckedIOException;
4646
import java.nio.charset.StandardCharsets;
4747
import java.time.ZoneId;
4848
import java.util.ArrayList;
@@ -53,6 +53,8 @@
5353
import java.util.Locale;
5454
import java.util.Map;
5555
import java.util.Set;
56+
import java.util.concurrent.ConcurrentHashMap;
57+
import java.util.concurrent.ConcurrentMap;
5658
import java.util.function.IntFunction;
5759

5860
import static java.util.Collections.emptySet;
@@ -62,11 +64,11 @@
6264
import static org.elasticsearch.test.MapMatcher.assertMap;
6365
import static org.elasticsearch.test.MapMatcher.matchesMap;
6466
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
65-
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial;
6667
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
6768
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
6869
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
6970
import static org.hamcrest.Matchers.any;
71+
import static org.hamcrest.Matchers.anyOf;
7072
import static org.hamcrest.Matchers.containsString;
7173
import static org.hamcrest.Matchers.either;
7274
import static org.hamcrest.Matchers.emptyOrNullString;
@@ -390,7 +392,9 @@ public void testCSVNoHeaderMode() throws IOException {
390392
options.addHeader("Content-Type", mediaType);
391393
options.addHeader("Accept", "text/csv; header=absent");
392394
request.setOptions(options);
393-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
395+
Response response = performRequest(request);
396+
assertWarnings(response, new AssertWarnings.NoWarnings());
397+
HttpEntity entity = response.getEntity();
394398
String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
395399
assertEquals("keyword0,0\r\n", actual);
396400
}
@@ -1252,7 +1256,10 @@ public static Map<String, Object> runEsql(
12521256
var results = mode == ASYNC
12531257
? runEsqlAsync(requestObject, randomBoolean(), assertWarnings)
12541258
: runEsqlSync(requestObject, assertWarnings);
1255-
return checkPartialResults ? assertNotPartial(results) : results;
1259+
if (checkPartialResults) {
1260+
assertNotPartial(results);
1261+
}
1262+
return results;
12561263
}
12571264

12581265
public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
@@ -1263,8 +1270,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
12631270
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
12641271
Request request = prepareRequestWithOptions(requestObject, SYNC);
12651272

1266-
HttpEntity entity = performRequest(request, assertWarnings);
1267-
return entityToMap(entity, requestObject.contentType());
1273+
Response response = performRequest(request);
1274+
HttpEntity entity = response.getEntity();
1275+
Map<String, Object> json = entityToMap(entity, requestObject.contentType());
1276+
1277+
var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1278+
if (supportsAsyncHeadersFix) {
1279+
assertNoAsyncHeaders(response);
1280+
}
1281+
assertWarnings(response, assertWarnings);
1282+
1283+
return json;
12681284
}
12691285

12701286
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
@@ -1292,16 +1308,18 @@ public static Map<String, Object> runEsqlAsync(
12921308
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
12931309
String id = (String) json.get("id");
12941310

1295-
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
1311+
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1312+
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
1313+
1314+
// Check headers on initial query call
1315+
if (supportsAsyncHeaders) {
1316+
assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
1317+
}
12961318

12971319
if (id == null) {
12981320
// no id returned from an async call, must have completed immediately and without keep_on_completion
12991321
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
13001322
assertThat((boolean) json.get("is_running"), is(false));
1301-
if (supportsAsyncHeaders) {
1302-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1303-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
1304-
}
13051323
assertWarnings(response, assertWarnings);
13061324
json.remove("is_running"); // remove this to not mess up later map assertions
13071325
return Collections.unmodifiableMap(json);
@@ -1322,11 +1340,6 @@ public static Map<String, Object> runEsqlAsync(
13221340
assertThat(json.get("pages"), nullValue());
13231341
}
13241342

1325-
if (supportsAsyncHeaders) {
1326-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
1327-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
1328-
}
1329-
13301343
// issue a second request to "async get" the results
13311344
Request getRequest = prepareAsyncGetRequest(id);
13321345
getRequest.setOptions(request.getOptions());
@@ -1336,6 +1349,11 @@ public static Map<String, Object> runEsqlAsync(
13361349

13371350
var result = entityToMap(entity, requestObject.contentType());
13381351

1352+
// Check headers on get call
1353+
if (supportsAsyncHeaders) {
1354+
assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
1355+
}
1356+
13391357
// assert initial contents, if any, are the same as async get contents
13401358
if (initialColumns != null) {
13411359
assertEquals(initialColumns, result.get("columns"));
@@ -1347,6 +1365,26 @@ public static Map<String, Object> runEsqlAsync(
13471365
return removeAsyncProperties(result);
13481366
}
13491367

1368+
record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}
1369+
1370+
/**
1371+
* Cache of capabilities.
1372+
*/
1373+
private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();
1374+
1375+
public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
1376+
if (requiredCapabilities.isEmpty()) {
1377+
return true;
1378+
}
1379+
return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
1380+
try {
1381+
return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
1382+
} catch (IOException e) {
1383+
throw new UncheckedIOException(e);
1384+
}
1385+
});
1386+
}
1387+
13501388
public void testAsyncGetWithoutContentType() throws IOException {
13511389
int count = randomIntBetween(0, 100);
13521390
bulkLoadTestData(count);
@@ -1447,15 +1485,11 @@ static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
14471485
}
14481486

14491487
protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
1450-
try (InputStream content = entity.getContent()) {
1451-
XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
1452-
assertEquals(expectedContentType, xContentType);
1453-
var map = XContentHelper.convertToMap(xContentType.xContent(), content, false);
1454-
if (shouldLog()) {
1455-
LOGGER.info("entity={}", map);
1456-
}
1457-
return map;
1488+
var result = EsqlTestUtils.entityToMap(entity, expectedContentType);
1489+
if (shouldLog()) {
1490+
LOGGER.info("entity={}", result);
14581491
}
1492+
return result;
14591493
}
14601494

14611495
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
@@ -1520,7 +1554,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15201554
}
15211555

15221556
Response response = performRequest(request);
1523-
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1557+
assertWarnings(response, new AssertWarnings.NoWarnings());
1558+
HttpEntity entity = response.getEntity();
15241559

15251560
// get the content, it could be empty because the request might have not completed
15261561
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
@@ -1573,7 +1608,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15731608
// if `addParam` is false, `options` will already have an `Accept` header
15741609
getRequest.setOptions(options);
15751610
response = performRequest(getRequest);
1576-
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1611+
assertWarnings(response, new AssertWarnings.NoWarnings());
1612+
entity = response.getEntity();
15771613
}
15781614
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
15791615

@@ -1587,21 +1623,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15871623
}
15881624

15891625
private static Request prepareRequest(Mode mode) {
1590-
Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""));
1591-
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
1592-
request.addParameter("pretty", "true"); // Improves error reporting readability
1593-
return request;
1626+
return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")));
15941627
}
15951628

15961629
private static Request prepareAsyncGetRequest(String id) {
1597-
Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s");
1598-
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
1599-
request.addParameter("pretty", "true"); // Improves error reporting readability
1600-
return request;
1630+
return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=6000s"));
16011631
}
16021632

16031633
private static Request prepareAsyncDeleteRequest(String id) {
1604-
Request request = new Request("DELETE", "/_query/async/" + id);
1634+
return finishRequest(new Request("DELETE", "/_query/async/" + id));
1635+
}
1636+
1637+
private static Request finishRequest(Request request) {
16051638
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
16061639
request.addParameter("pretty", "true"); // Improves error reporting readability
16071640
return request;
@@ -1615,10 +1648,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16151648
return mediaType;
16161649
}
16171650

1618-
private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
1619-
return assertWarnings(performRequest(request), assertWarnings);
1620-
}
1621-
16221651
protected static Response performRequest(Request request) throws IOException {
16231652
Response response = client().performRequest(request);
16241653
if (shouldLog()) {
@@ -1629,14 +1658,19 @@ protected static Response performRequest(Request request) throws IOException {
16291658
return response;
16301659
}
16311660

1632-
private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
1661+
static void assertNotPartial(Map<String, Object> answer) {
1662+
var clusters = answer.get("_clusters");
1663+
var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
1664+
assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
1665+
}
1666+
1667+
private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
16331668
List<String> warnings = new ArrayList<>(response.getWarnings());
16341669
warnings.removeAll(mutedWarnings());
16351670
if (shouldLog()) {
16361671
LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
16371672
}
16381673
assertWarnings.assertWarnings(warnings);
1639-
return response.getEntity();
16401674
}
16411675

16421676
private static Set<String> mutedWarnings() {
@@ -1747,6 +1781,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
17471781
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
17481782
}
17491783

1784+
private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
1785+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
1786+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
1787+
}
1788+
1789+
private static void assertNoAsyncHeaders(Response response) {
1790+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1791+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
1792+
}
1793+
17501794
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
17511795
return new RequestObjectBuilder();
17521796
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,11 @@ public enum Cap {
497497
*/
498498
ASYNC_QUERY_STATUS_HEADERS,
499499

500+
/**
501+
* Fix async headers not being sent on "get" requests
502+
*/
503+
ASYNC_QUERY_STATUS_HEADERS_FIX,
504+
500505
/**
501506
* Consider the upper bound when computing the interval in BUCKET auto mode.
502507
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public boolean isRunning() {
217217
}
218218

219219
public boolean isAsync() {
220-
return isRunning;
220+
return isAsync;
221221
}
222222

223223
public boolean isPartial() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.tasks.Task;
2424
import org.elasticsearch.threadpool.ThreadPool;
2525
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
2627
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2728
import org.elasticsearch.xpack.esql.VerificationException;
2829
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
@@ -35,6 +36,7 @@
3536
public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
3637

3738
private final BlockFactory blockFactory;
39+
private final ThreadPool threadPool;
3840

3941
@Inject
4042
public TransportEsqlAsyncGetResultsAction(
@@ -43,9 +45,9 @@ public TransportEsqlAsyncGetResultsAction(
4345
ClusterService clusterService,
4446
NamedWriteableRegistry registry,
4547
Client client,
46-
ThreadPool threadPool,
4748
BigArrays bigArrays,
48-
BlockFactoryProvider blockFactoryProvider
49+
BlockFactoryProvider blockFactoryProvider,
50+
ThreadPool threadPool
4951
) {
5052
super(
5153
EsqlAsyncGetResultAction.NAME,
@@ -59,11 +61,12 @@ public TransportEsqlAsyncGetResultsAction(
5961
EsqlQueryTask.class
6062
);
6163
this.blockFactory = blockFactoryProvider.blockFactory();
64+
this.threadPool = threadPool;
6265
}
6366

6467
@Override
6568
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
66-
super.doExecute(task, request, unwrapListener(listener));
69+
super.doExecute(task, request, unwrapListener(request.getId(), listener));
6770
}
6871

6972
@Override
@@ -75,14 +78,21 @@ public Writeable.Reader<EsqlQueryResponse> responseReader() {
7578
static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));
7679

7780
/**
78-
* Unwraps the exception in the case of failure. This keeps the exception types
79-
* the same as the sync API, namely ParsingException and VerificationException.
81+
* Adds async headers, and unwraps the exception in the case of failure.
82+
* <p>
83+
* This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
84+
* </p>
8085
*/
81-
static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
86+
ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
8287
return new ActionListener<>() {
8388
@Override
84-
public void onResponse(R o) {
85-
listener.onResponse(o);
89+
public void onResponse(EsqlQueryResponse response) {
90+
boolean isRunning = response.isRunning();
91+
threadPool.getThreadContext()
92+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
93+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
94+
95+
listener.onResponse(response);
8696
}
8797

8898
@Override

0 commit comments

Comments
 (0)