Skip to content

Commit 510ca5d

Browse files
bpinteakanoshiou
andauthored
ESQL: Enable async get to support formatting (#111104) (#118257)
I've updated the listener for GET /_query/async/{id} to EsqlResponseListener, so it now accepts parameters (delimiter, drop_null_columns and format) like the POST /_query API. Additionally, I have added tests to verify the correctness of the code. You can now set the format in the request parameters to specify the return style. Closes #110926 Co-authored-by: kanoshiou <[email protected]>
1 parent 02fd597 commit 510ca5d

File tree

8 files changed

+236
-74
lines changed

8 files changed

+236
-74
lines changed

docs/changelog/111104.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 111104
2+
summary: "ESQL: Enable async get to support formatting"
3+
area: ES|QL
4+
type: feature
5+
issues:
6+
- 110926

docs/reference/esql/esql-async-query-get-api.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ parameter is `true`.
3939
[[esql-async-query-get-api-query-params]]
4040
==== {api-query-parms-title}
4141

42+
The API accepts the same parameters as the synchronous
43+
<<esql-query-api-query-params,query API>>, along with the following
44+
parameters:
45+
4246
`wait_for_completion_timeout`::
4347
(Optional, <<time-units,time value>>)
4448
Timeout duration to wait for the request to finish. Defaults to no timeout,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
208208
ActionListener<Response> listener
209209
) {
210210
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
211-
// This is will performed in case of timeout
211+
// This will be performed in case of timeout
212212
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
213213
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
214214
if (acquiredListener != null) {

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 160 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -350,21 +350,21 @@ public void testTextMode() throws IOException {
350350
int count = randomIntBetween(0, 100);
351351
bulkLoadTestData(count);
352352
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
353-
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
353+
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
354354
}
355355

356356
public void testCSVMode() throws IOException {
357357
int count = randomIntBetween(0, 100);
358358
bulkLoadTestData(count);
359359
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
360-
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
360+
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
361361
}
362362

363363
public void testTSVMode() throws IOException {
364364
int count = randomIntBetween(0, 100);
365365
bulkLoadTestData(count);
366366
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
367-
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
367+
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
368368
}
369369

370370
public void testCSVNoHeaderMode() throws IOException {
@@ -1004,53 +1004,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject
10041004
}
10051005

10061006
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
1007-
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
1007+
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
10081008
}
10091009

10101010
static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
10111011
if (mode == ASYNC) {
1012-
return runEsqlAsync(requestObject, assertWarnings);
1012+
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
10131013
} else {
10141014
return runEsqlSync(requestObject, assertWarnings);
10151015
}
10161016
}
10171017

10181018
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1019-
requestObject.build();
1020-
Request request = prepareRequest(SYNC);
1021-
String mediaType = attachBody(requestObject, request);
1022-
1023-
RequestOptions.Builder options = request.getOptions().toBuilder();
1024-
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1025-
options.addHeader("Content-Type", mediaType);
1026-
1027-
if (randomBoolean()) {
1028-
options.addHeader("Accept", mediaType);
1029-
} else {
1030-
request.addParameter("format", requestObject.contentType().queryParameter());
1031-
}
1032-
request.setOptions(options);
1019+
Request request = prepareRequestWithOptions(requestObject, SYNC);
10331020

10341021
HttpEntity entity = performRequest(request, assertWarnings);
10351022
return entityToMap(entity, requestObject.contentType());
10361023
}
10371024

10381025
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1039-
addAsyncParameters(requestObject);
1040-
requestObject.build();
1041-
Request request = prepareRequest(ASYNC);
1042-
String mediaType = attachBody(requestObject, request);
1043-
1044-
RequestOptions.Builder options = request.getOptions().toBuilder();
1045-
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1046-
options.addHeader("Content-Type", mediaType);
1026+
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
1027+
}
10471028

1048-
if (randomBoolean()) {
1049-
options.addHeader("Accept", mediaType);
1050-
} else {
1051-
request.addParameter("format", requestObject.contentType().queryParameter());
1052-
}
1053-
request.setOptions(options);
1029+
public static Map<String, Object> runEsqlAsync(
1030+
RequestObjectBuilder requestObject,
1031+
boolean keepOnCompletion,
1032+
AssertWarnings assertWarnings
1033+
) throws IOException {
1034+
addAsyncParameters(requestObject, keepOnCompletion);
1035+
Request request = prepareRequestWithOptions(requestObject, ASYNC);
10541036

10551037
if (shouldLog()) {
10561038
LOGGER.info("REQUEST={}", request);
@@ -1062,7 +1044,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
10621044
Object initialColumns = null;
10631045
Object initialValues = null;
10641046
var json = entityToMap(entity, requestObject.contentType());
1065-
checkKeepOnCompletion(requestObject, json);
1047+
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
10661048
String id = (String) json.get("id");
10671049

10681050
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
@@ -1102,7 +1084,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
11021084

11031085
// issue a second request to "async get" the results
11041086
Request getRequest = prepareAsyncGetRequest(id);
1105-
getRequest.setOptions(options);
1087+
getRequest.setOptions(request.getOptions());
11061088
response = performRequest(getRequest);
11071089
entity = response.getEntity();
11081090
}
@@ -1120,6 +1102,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
11201102
return removeAsyncProperties(result);
11211103
}
11221104

1105+
public void testAsyncGetWithoutContentType() throws IOException {
1106+
int count = randomIntBetween(0, 100);
1107+
bulkLoadTestData(count);
1108+
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
1109+
1110+
addAsyncParameters(requestObject, true);
1111+
Request request = prepareRequestWithOptions(requestObject, ASYNC);
1112+
1113+
if (shouldLog()) {
1114+
LOGGER.info("REQUEST={}", request);
1115+
}
1116+
1117+
Response response = performRequest(request);
1118+
HttpEntity entity = response.getEntity();
1119+
1120+
var json = entityToMap(entity, requestObject.contentType());
1121+
checkKeepOnCompletion(requestObject, json, true);
1122+
String id = (String) json.get("id");
1123+
// results won't be returned since keepOnCompletion is true
1124+
assertThat(id, is(not(emptyOrNullString())));
1125+
1126+
// issue an "async get" request with no Content-Type
1127+
Request getRequest = prepareAsyncGetRequest(id);
1128+
response = performRequest(getRequest);
1129+
entity = response.getEntity();
1130+
var result = entityToMap(entity, XContentType.JSON);
1131+
1132+
ListMatcher values = matchesList();
1133+
for (int i = 0; i < count; i++) {
1134+
values = values.item(matchesList().item("keyword" + i).item(i));
1135+
}
1136+
assertMap(
1137+
result,
1138+
matchesMap().entry(
1139+
"columns",
1140+
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
1141+
.item(matchesMap().entry("name", "integer").entry("type", "integer"))
1142+
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
1143+
);
1144+
1145+
}
1146+
1147+
static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
1148+
requestObject.build();
1149+
Request request = prepareRequest(mode);
1150+
String mediaType = attachBody(requestObject, request);
1151+
1152+
RequestOptions.Builder options = request.getOptions().toBuilder();
1153+
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1154+
options.addHeader("Content-Type", mediaType);
1155+
1156+
if (randomBoolean()) {
1157+
options.addHeader("Accept", mediaType);
1158+
} else {
1159+
request.addParameter("format", requestObject.contentType().queryParameter());
1160+
}
1161+
request.setOptions(options);
1162+
return request;
1163+
}
1164+
11231165
// Removes async properties, otherwise consuming assertions would need to handle sync and async differences
11241166
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
11251167
Map<String, Object> copy = new HashMap<>(map);
@@ -1140,17 +1182,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType
11401182
}
11411183
}
11421184

1143-
static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
1185+
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
11441186
// deliberately short in order to frequently trigger return without results
11451187
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
1146-
requestObject.keepOnCompletion(randomBoolean());
1188+
requestObject.keepOnCompletion(keepOnCompletion);
11471189
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
11481190
}
11491191

11501192
// If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
1151-
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
1193+
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
11521194
if (requestObject.keepOnCompletion()) {
1195+
assertTrue(keepOnCompletion);
11531196
assertThat((String) json.get("id"), not(emptyOrNullString()));
1197+
} else {
1198+
assertFalse(keepOnCompletion);
11541199
}
11551200
}
11561201

@@ -1168,14 +1213,19 @@ static void deleteNonExistent(Request request) throws IOException {
11681213
assertEquals(404, response.getStatusLine().getStatusCode());
11691214
}
11701215

1171-
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
1172-
Request request = prepareRequest(SYNC);
1216+
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
1217+
throws IOException {
1218+
Request request = prepareRequest(mode);
1219+
if (mode == ASYNC) {
1220+
addAsyncParameters(builder, randomBoolean());
1221+
}
11731222
String mediaType = attachBody(builder.build(), request);
11741223

11751224
RequestOptions.Builder options = request.getOptions().toBuilder();
11761225
options.addHeader("Content-Type", mediaType);
11771226

1178-
if (randomBoolean()) {
1227+
boolean addParam = randomBoolean();
1228+
if (addParam) {
11791229
request.addParameter("format", format);
11801230
} else {
11811231
switch (format) {
@@ -1189,8 +1239,75 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
11891239
}
11901240
request.setOptions(options);
11911241

1192-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
1193-
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1242+
if (shouldLog()) {
1243+
LOGGER.info("REQUEST={}", request);
1244+
}
1245+
1246+
Response response = performRequest(request);
1247+
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1248+
1249+
// get the content, it could be empty because the request might have not completed
1250+
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1251+
String id = response.getHeader("X-Elasticsearch-Async-Id");
1252+
1253+
if (mode == SYNC) {
1254+
assertThat(id, is(emptyOrNullString()));
1255+
return initialValue;
1256+
}
1257+
1258+
if (id == null) {
1259+
// no id returned from an async call, must have completed immediately and without keep_on_completion
1260+
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
1261+
assertNull(response.getHeader("is_running"));
1262+
// the content cant be empty
1263+
assertThat(initialValue, not(emptyOrNullString()));
1264+
return initialValue;
1265+
} else {
1266+
// async may not return results immediately, so may need an async get
1267+
assertThat(id, is(not(emptyOrNullString())));
1268+
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
1269+
if ("?0".equals(isRunning)) {
1270+
// must have completed immediately so keep_on_completion must be true
1271+
assertThat(builder.keepOnCompletion(), is(true));
1272+
} else {
1273+
// did not return results immediately, so we will need an async get
1274+
// Also, different format modes return different results.
1275+
switch (format) {
1276+
case "txt" -> assertThat(initialValue, emptyOrNullString());
1277+
case "csv" -> {
1278+
assertEquals(initialValue, "\r\n");
1279+
initialValue = "";
1280+
}
1281+
case "tsv" -> {
1282+
assertEquals(initialValue, "\n");
1283+
initialValue = "";
1284+
}
1285+
}
1286+
}
1287+
// issue a second request to "async get" the results
1288+
Request getRequest = prepareAsyncGetRequest(id);
1289+
if (delimiter != null) {
1290+
getRequest.addParameter("delimiter", String.valueOf(delimiter));
1291+
}
1292+
// If the `format` parameter is not added, the GET request will return a response
1293+
// with the `Content-Type` type due to the lack of an `Accept` header.
1294+
if (addParam) {
1295+
getRequest.addParameter("format", format);
1296+
}
1297+
// if `addParam` is false, `options` will already have an `Accept` header
1298+
getRequest.setOptions(options);
1299+
response = performRequest(getRequest);
1300+
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1301+
}
1302+
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1303+
1304+
// assert initial contents, if any, are the same as async get contents
1305+
if (initialValue != null && initialValue.isEmpty() == false) {
1306+
assertEquals(initialValue, newValue);
1307+
}
1308+
1309+
assertDeletable(id);
1310+
return newValue;
11941311
}
11951312

11961313
private static Request prepareRequest(Mode mode) {

0 commit comments

Comments
 (0)