Skip to content

Commit 000c996

Browse files
authored
[8.18] ESQL: Fix async query inconsistent headers (#135078) (#135293)
* ESQL: Fix async query inconsistent headers (#135078) Fixes #135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header * Restore some wrongly backported tests
1 parent b8bd94d commit 000c996

File tree

6 files changed

+150
-44
lines changed

6 files changed

+150
-44
lines changed

docs/changelog/135078.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135078
2+
summary: Fix async get results with inconsistent headers
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135042

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 106 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.RequestOptions;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.RestClient;
1920
import org.elasticsearch.client.WarningsHandler;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.io.Streams;
@@ -43,6 +44,7 @@
4344
import java.io.InputStream;
4445
import java.io.InputStreamReader;
4546
import java.io.OutputStream;
47+
import java.io.UncheckedIOException;
4648
import java.nio.charset.StandardCharsets;
4749
import java.time.ZoneId;
4850
import java.util.ArrayList;
@@ -53,6 +55,8 @@
5355
import java.util.Locale;
5456
import java.util.Map;
5557
import java.util.Set;
58+
import java.util.concurrent.ConcurrentHashMap;
59+
import java.util.concurrent.ConcurrentMap;
5660
import java.util.function.IntFunction;
5761

5862
import static java.util.Collections.emptySet;
@@ -65,6 +69,7 @@
6569
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
6670
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
6771
import static org.hamcrest.Matchers.any;
72+
import static org.hamcrest.Matchers.anyOf;
6873
import static org.hamcrest.Matchers.containsString;
6974
import static org.hamcrest.Matchers.either;
7075
import static org.hamcrest.Matchers.emptyOrNullString;
@@ -371,7 +376,9 @@ public void testCSVNoHeaderMode() throws IOException {
371376
options.addHeader("Content-Type", mediaType);
372377
options.addHeader("Accept", "text/csv; header=absent");
373378
request.setOptions(options);
374-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
379+
Response response = performRequest(request);
380+
assertWarnings(response, new AssertWarnings.NoWarnings());
381+
HttpEntity entity = response.getEntity();
375382
String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
376383
assertEquals("keyword0,0\r\n", actual);
377384
}
@@ -1053,8 +1060,17 @@ static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWar
10531060
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
10541061
Request request = prepareRequestWithOptions(requestObject, SYNC);
10551062

1056-
HttpEntity entity = performRequest(request, assertWarnings);
1057-
return entityToMap(entity, requestObject.contentType());
1063+
Response response = performRequest(request);
1064+
HttpEntity entity = response.getEntity();
1065+
Map<String, Object> json = entityToMap(entity, requestObject.contentType());
1066+
1067+
var supportsAsyncHeadersFix = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1068+
if (supportsAsyncHeadersFix) {
1069+
assertNoAsyncHeaders(response);
1070+
}
1071+
assertWarnings(response, assertWarnings);
1072+
1073+
return json;
10581074
}
10591075

10601076
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
@@ -1082,16 +1098,18 @@ public static Map<String, Object> runEsqlAsync(
10821098
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
10831099
String id = (String) json.get("id");
10841100

1085-
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
1101+
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers_fix"));
1102+
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
1103+
1104+
// Check headers on initial query call
1105+
if (supportsAsyncHeaders) {
1106+
assertAsyncHeaders(response, id, (boolean) json.get("is_running"));
1107+
}
10861108

10871109
if (id == null) {
10881110
// no id returned from an async call, must have completed immediately and without keep_on_completion
10891111
assertThat(requestObject.keepOnCompletion(), either(nullValue()).or(is(false)));
10901112
assertThat((boolean) json.get("is_running"), is(false));
1091-
if (supportsAsyncHeaders) {
1092-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1093-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is("?0"));
1094-
}
10951113
assertWarnings(response, assertWarnings);
10961114
json.remove("is_running"); // remove this to not mess up later map assertions
10971115
return Collections.unmodifiableMap(json);
@@ -1112,11 +1130,6 @@ public static Map<String, Object> runEsqlAsync(
11121130
assertThat(json.get("pages"), nullValue());
11131131
}
11141132

1115-
if (supportsAsyncHeaders) {
1116-
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), is(id));
1117-
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), is(isRunning ? "?1" : "?0"));
1118-
}
1119-
11201133
// issue a second request to "async get" the results
11211134
Request getRequest = prepareAsyncGetRequest(id);
11221135
getRequest.setOptions(request.getOptions());
@@ -1126,9 +1139,21 @@ public static Map<String, Object> runEsqlAsync(
11261139

11271140
var result = entityToMap(entity, requestObject.contentType());
11281141

1142+
// Check headers on get call
1143+
if (supportsAsyncHeaders) {
1144+
assertAsyncHeaders(response, id, (boolean) result.get("is_running"));
1145+
}
1146+
11291147
// assert initial contents, if any, are the same as async get contents
11301148
if (initialColumns != null) {
1131-
assertEquals(initialColumns, result.get("columns"));
1149+
if (supportsSuggestedCast == false) {
1150+
assertEquals(
1151+
removeOriginalTypesAndSuggestedCast(initialColumns),
1152+
removeOriginalTypesAndSuggestedCast(result.get("columns"))
1153+
);
1154+
} else {
1155+
assertEquals(initialColumns, result.get("columns"));
1156+
}
11321157
assertEquals(initialValues, result.get("values"));
11331158
}
11341159

@@ -1137,6 +1162,45 @@ public static Map<String, Object> runEsqlAsync(
11371162
return removeAsyncProperties(result);
11381163
}
11391164

1165+
record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}
1166+
1167+
/**
1168+
* Cache of capabilities.
1169+
*/
1170+
private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();
1171+
1172+
public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
1173+
if (requiredCapabilities.isEmpty()) {
1174+
return true;
1175+
}
1176+
return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
1177+
try {
1178+
return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
1179+
} catch (IOException e) {
1180+
throw new UncheckedIOException(e);
1181+
}
1182+
});
1183+
}
1184+
1185+
private static Object removeOriginalTypesAndSuggestedCast(Object response) {
1186+
if (response instanceof ArrayList<?> columns) {
1187+
var newColumns = new ArrayList<>();
1188+
for (var column : columns) {
1189+
if (column instanceof Map<?, ?> columnMap) {
1190+
var newMap = new HashMap<>(columnMap);
1191+
newMap.remove("original_types");
1192+
newMap.remove("suggested_cast");
1193+
newColumns.add(newMap);
1194+
} else {
1195+
newColumns.add(column);
1196+
}
1197+
}
1198+
return newColumns;
1199+
} else {
1200+
return response;
1201+
}
1202+
}
1203+
11401204
public void testAsyncGetWithoutContentType() throws IOException {
11411205
int count = randomIntBetween(0, 100);
11421206
bulkLoadTestData(count);
@@ -1278,7 +1342,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
12781342
}
12791343

12801344
Response response = performRequest(request);
1281-
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1345+
assertWarnings(response, new AssertWarnings.NoWarnings());
1346+
HttpEntity entity = response.getEntity();
12821347

12831348
// get the content, it could be empty because the request might have not completed
12841349
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
@@ -1331,7 +1396,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
13311396
// if `addParam` is false, `options` will already have an `Accept` header
13321397
getRequest.setOptions(options);
13331398
response = performRequest(getRequest);
1334-
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1399+
assertWarnings(response, new AssertWarnings.NoWarnings());
1400+
entity = response.getEntity();
13351401
}
13361402
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
13371403

@@ -1345,21 +1411,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
13451411
}
13461412

13471413
private static Request prepareRequest(Mode mode) {
1348-
Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""));
1349-
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
1350-
request.addParameter("pretty", "true"); // Improves error reporting readability
1351-
return request;
1414+
return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")));
13521415
}
13531416

13541417
private static Request prepareAsyncGetRequest(String id) {
1355-
Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s");
1356-
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
1357-
request.addParameter("pretty", "true"); // Improves error reporting readability
1358-
return request;
1418+
return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=6000s"));
13591419
}
13601420

13611421
private static Request prepareAsyncDeleteRequest(String id) {
1362-
Request request = new Request("DELETE", "/_query/async/" + id);
1422+
return finishRequest(new Request("DELETE", "/_query/async/" + id));
1423+
}
1424+
1425+
private static Request finishRequest(Request request) {
13631426
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
13641427
request.addParameter("pretty", "true"); // Improves error reporting readability
13651428
return request;
@@ -1373,11 +1436,7 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
13731436
return mediaType;
13741437
}
13751438

1376-
private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
1377-
return assertWarnings(performRequest(request), assertWarnings);
1378-
}
1379-
1380-
private static Response performRequest(Request request) throws IOException {
1439+
protected static Response performRequest(Request request) throws IOException {
13811440
Response response = client().performRequest(request);
13821441
if (shouldLog()) {
13831442
LOGGER.info("RESPONSE={}", response);
@@ -1387,14 +1446,19 @@ private static Response performRequest(Request request) throws IOException {
13871446
return response;
13881447
}
13891448

1390-
private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
1449+
static void assertNotPartial(Map<String, Object> answer) {
1450+
var clusters = answer.get("_clusters");
1451+
var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
1452+
assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
1453+
}
1454+
1455+
private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
13911456
List<String> warnings = new ArrayList<>(response.getWarnings());
13921457
warnings.removeAll(mutedWarnings());
13931458
if (shouldLog()) {
13941459
LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
13951460
}
13961461
assertWarnings.assertWarnings(warnings);
1397-
return response.getEntity();
13981462
}
13991463

14001464
private static Set<String> mutedWarnings() {
@@ -1505,6 +1569,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
15051569
assertEquals(200, client().performRequest(request).getStatusLine().getStatusCode());
15061570
}
15071571

1572+
private static void assertAsyncHeaders(Response response, @Nullable String asyncId, boolean isRunning) {
1573+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), asyncId == null ? nullValue() : equalTo(asyncId));
1574+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), isRunning ? is("?1") : is("?0"));
1575+
}
1576+
1577+
private static void assertNoAsyncHeaders(Response response) {
1578+
assertThat(response.getHeader("X-Elasticsearch-Async-Id"), nullValue());
1579+
assertThat(response.getHeader("X-Elasticsearch-Async-Is-Running"), nullValue());
1580+
}
1581+
15081582
public static RequestObjectBuilder requestObjectBuilder() throws IOException {
15091583
return new RequestObjectBuilder();
15101584
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,11 @@ public enum Cap {
444444
*/
445445
ASYNC_QUERY_STATUS_HEADERS,
446446

447+
/**
448+
* Fix async headers not being sent on "get" requests
449+
*/
450+
ASYNC_QUERY_STATUS_HEADERS_FIX,
451+
447452
/**
448453
* Consider the upper bound when computing the interval in BUCKET auto mode.
449454
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public boolean isRunning() {
177177
}
178178

179179
public boolean isAsync() {
180-
return isRunning;
180+
return isAsync;
181181
}
182182

183183
public EsqlExecutionInfo getExecutionInfo() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.tasks.Task;
2323
import org.elasticsearch.threadpool.ThreadPool;
2424
import org.elasticsearch.transport.TransportService;
25+
import org.elasticsearch.xpack.core.async.AsyncExecutionId;
2526
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
2627
import org.elasticsearch.xpack.esql.VerificationException;
2728
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
@@ -34,6 +35,7 @@
3435
public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
3536

3637
private final BlockFactory blockFactory;
38+
private final ThreadPool threadPool;
3739

3840
@Inject
3941
public TransportEsqlAsyncGetResultsAction(
@@ -42,9 +44,9 @@ public TransportEsqlAsyncGetResultsAction(
4244
ClusterService clusterService,
4345
NamedWriteableRegistry registry,
4446
Client client,
45-
ThreadPool threadPool,
4647
BigArrays bigArrays,
47-
BlockFactory blockFactory
48+
BlockFactory blockFactory,
49+
ThreadPool threadPool
4850
) {
4951
super(
5052
EsqlAsyncGetResultAction.NAME,
@@ -58,11 +60,12 @@ public TransportEsqlAsyncGetResultsAction(
5860
EsqlQueryTask.class
5961
);
6062
this.blockFactory = blockFactory;
63+
this.threadPool = threadPool;
6164
}
6265

6366
@Override
6467
protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<EsqlQueryResponse> listener) {
65-
super.doExecute(task, request, unwrapListener(listener));
68+
super.doExecute(task, request, unwrapListener(request.getId(), listener));
6669
}
6770

6871
@Override
@@ -74,14 +77,21 @@ public Writeable.Reader<EsqlQueryResponse> responseReader() {
7477
static final String VERIFY_EX_NAME = ElasticsearchException.getExceptionName(new VerificationException(""));
7578

7679
/**
77-
* Unwraps the exception in the case of failure. This keeps the exception types
78-
* the same as the sync API, namely ParsingException and VerificationException.
80+
* Adds async headers, and unwraps the exception in the case of failure.
81+
* <p>
82+
* This keeps the exception types the same as the sync API, namely ParsingException and VerificationException.
83+
* </p>
7984
*/
80-
static <R> ActionListener<R> unwrapListener(ActionListener<R> listener) {
85+
ActionListener<EsqlQueryResponse> unwrapListener(String asyncExecutionId, ActionListener<EsqlQueryResponse> listener) {
8186
return new ActionListener<>() {
8287
@Override
83-
public void onResponse(R o) {
84-
listener.onResponse(o);
88+
public void onResponse(EsqlQueryResponse response) {
89+
boolean isRunning = response.isRunning();
90+
threadPool.getThreadContext()
91+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
92+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
93+
94+
listener.onResponse(response);
8595
}
8696

8797
@Override

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,20 @@ private void innerExecute(Task task, EsqlQueryRequest request, ActionListener<Es
227227
queryBuilderResolver,
228228
ActionListener.wrap(result -> {
229229
recordCCSTelemetry(task, executionInfo, request, null);
230-
listener.onResponse(toResponse(task, request, configuration, result));
230+
var response = toResponse(task, request, configuration, result);
231+
assert response.isAsync() == request.async() : "The response must be async if the request was async";
232+
233+
if (response.isAsync()) {
234+
if (response.asyncExecutionId().isPresent()) {
235+
String asyncExecutionId = response.asyncExecutionId().get();
236+
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
237+
}
238+
boolean isRunning = response.isRunning();
239+
threadPool.getThreadContext()
240+
.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, isRunning ? "?1" : "?0");
241+
}
242+
243+
listener.onResponse(response);
231244
}, ex -> {
232245
recordCCSTelemetry(task, executionInfo, request, ex);
233246
listener.onFailure(ex);
@@ -301,10 +314,8 @@ private EsqlExecutionInfo createEsqlExecutionInfo(EsqlQueryRequest request) {
301314
private EsqlQueryResponse toResponse(Task task, EsqlQueryRequest request, Configuration configuration, Result result) {
302315
List<ColumnInfoImpl> columns = result.schema().stream().map(c -> new ColumnInfoImpl(c.name(), c.dataType().outputType())).toList();
303316
EsqlQueryResponse.Profile profile = configuration.profile() ? new EsqlQueryResponse.Profile(result.profiles()) : null;
304-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, "?0");
305317
if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
306318
String asyncExecutionId = asyncTask.getExecutionId().getEncoded();
307-
threadPool.getThreadContext().addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, asyncExecutionId);
308319
return new EsqlQueryResponse(
309320
columns,
310321
result.pages(),

0 commit comments

Comments
 (0)