Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
462a8e5
Async request in txt mode returns the async parameters as headers
kanoshiou Jul 19, 2024
33e8b1f
Add changelog
kanoshiou Jul 19, 2024
f0285ea
Typo
kanoshiou Jul 30, 2024
f77b893
Change listener of `/_query/async/{id}` to `EsqlResponseListener` to …
kanoshiou Jul 30, 2024
e81487a
Change HTTP header names access modifiers to private
kanoshiou Jul 31, 2024
c27716f
Revert "Change HTTP header names access modifiers to private"
kanoshiou Jul 31, 2024
9a010a5
Response media type defaults to JSON if there is no specific media ty…
kanoshiou Jul 31, 2024
12ef64d
Add tests for async query in text mode
kanoshiou Jul 31, 2024
b774b61
Get requests do not need checkNonNullMediaType
kanoshiou Jul 31, 2024
c3c1396
Update doc
kanoshiou Jul 31, 2024
e94872a
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Aug 1, 2024
3f48c01
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Sep 5, 2024
d6bd2e2
Merge remote-tracking branch 'origin/main' into async-parameters-retu…
kanoshiou Oct 10, 2024
cc9a40b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
6f86392
small changes
kanoshiou Nov 28, 2024
e8484c8
Refactor `runEsqlAsync` to support text format
kanoshiou Nov 28, 2024
106b114
Refactor `runEsqlSync` to support text format
kanoshiou Nov 28, 2024
76d3be7
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
7f9cece
Overload `runEsql` to support text formats
kanoshiou Nov 29, 2024
c01d53b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 1, 2024
e0786a7
Replace async headers
kanoshiou Dec 1, 2024
3a3912c
Revert
kanoshiou Dec 1, 2024
4f505fe
Add a private constructor for `EsqlResponseListener`
kanoshiou Dec 1, 2024
ae03113
Check non-null media type
kanoshiou Dec 1, 2024
7fc2bbb
Refactor `runEsqlAsTextWithFormat` to support async mode
kanoshiou Dec 1, 2024
cb50b12
Remove static map `TEXT_FORMATS`
kanoshiou Dec 1, 2024
3bf781d
Update 111104.yaml
kanoshiou Dec 2, 2024
b519248
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 2, 2024
eb9e49d
Update 111104.yaml
kanoshiou Dec 2, 2024
5ed973f
Update 111104.yaml
kanoshiou Dec 2, 2024
2904aa2
Update
kanoshiou Dec 4, 2024
b0da5b5
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 4, 2024
444ed89
Allow an ASYNC GET with no media type spec
bpintea Dec 5, 2024
389dd8a
Merge branch 'main' into fork/kanoshiou/async-parameters-returns-as-h…
bpintea Dec 5, 2024
5b02528
Testcases for request with no Content-type header
kanoshiou Dec 6, 2024
201a45d
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 7, 2024
0fb5625
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/111104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111104
summary: "ESQL: Enable async get to support formatting"
area: ES|QL
type: feature
issues:
- 110926
4 changes: 4 additions & 0 deletions docs/reference/esql/esql-async-query-get-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ parameter is `true`.
[[esql-async-query-get-api-query-params]]
==== {api-query-parms-title}

The API accepts the same parameters as the synchronous
<<esql-query-api-query-params,query API>>, along with the following
parameters:

`wait_for_completion_timeout`::
(Optional, <<time-units,time value>>)
Timeout duration to wait for the request to finish. Defaults to no timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
ActionListener<Response> listener
) {
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
// This is will performed in case of timeout
// This will be performed in case of timeout
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
if (acquiredListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,21 @@ public void testTextMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
}

public void testCSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
}

public void testTSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
}

public void testCSVNoHeaderMode() throws IOException {
Expand Down Expand Up @@ -1003,53 +1003,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
}

static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
if (mode == ASYNC) {
return runEsqlAsync(requestObject, assertWarnings);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
} else {
return runEsqlSync(requestObject, assertWarnings);
}
}

public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
requestObject.build();
Request request = prepareRequest(SYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
Request request = prepareRequestWithOptions(requestObject, SYNC);

HttpEntity entity = performRequest(request, assertWarnings);
return entityToMap(entity, requestObject.contentType());
}

public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
addAsyncParameters(requestObject);
requestObject.build();
Request request = prepareRequest(ASYNC);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
}

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
public static Map<String, Object> runEsqlAsync(
RequestObjectBuilder requestObject,
boolean keepOnCompletion,
AssertWarnings assertWarnings
) throws IOException {
addAsyncParameters(requestObject, keepOnCompletion);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
Expand All @@ -1061,7 +1043,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
Object initialColumns = null;
Object initialValues = null;
var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json);
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
String id = (String) json.get("id");

var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
Expand Down Expand Up @@ -1101,7 +1083,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec

// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
getRequest.setOptions(options);
getRequest.setOptions(request.getOptions());
response = performRequest(getRequest);
entity = response.getEntity();
}
Expand All @@ -1119,6 +1101,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
return removeAsyncProperties(result);
}

public void testAsyncGetWithoutContentType() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");

addAsyncParameters(requestObject, true);
Request request = prepareRequestWithOptions(requestObject, ASYNC);

if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = response.getEntity();

var json = entityToMap(entity, requestObject.contentType());
checkKeepOnCompletion(requestObject, json, true);
String id = (String) json.get("id");
// results won't be returned since keepOnCompletion is true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: if they're not returned, it's because the waitForCompletion() is provided a very small interval (see addAsyncParameters()), so ES won't have the time to query and respond with results in time.

assertThat(id, is(not(emptyOrNullString())));

// issue an "async get" request with no Content-Type
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Request getRequest = prepareAsyncGetRequest(id);
response = performRequest(getRequest);
entity = response.getEntity();
var result = entityToMap(entity, XContentType.JSON);

ListMatcher values = matchesList();
for (int i = 0; i < count; i++) {
values = values.item(matchesList().item("keyword" + i).item(i));
}
assertMap(
result,
matchesMap().entry(
"columns",
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
.item(matchesMap().entry("name", "integer").entry("type", "integer"))
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
);

}

static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
options.addHeader("Accept", mediaType);
} else {
request.addParameter("format", requestObject.contentType().queryParameter());
}
request.setOptions(options);
return request;
}

// Removes async properties, otherwise consuming assertions would need to handle sync and async differences
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
Map<String, Object> copy = new HashMap<>(map);
Expand All @@ -1139,17 +1181,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType
}
}

static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
// deliberately short in order to frequently trigger return without results
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
requestObject.keepOnCompletion(randomBoolean());
requestObject.keepOnCompletion(keepOnCompletion);
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
}

// If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
if (requestObject.keepOnCompletion()) {
assertTrue(keepOnCompletion);
assertThat((String) json.get("id"), not(emptyOrNullString()));
} else {
assertFalse(keepOnCompletion);
}
}

Expand All @@ -1167,14 +1212,19 @@ static void deleteNonExistent(Request request) throws IOException {
assertEquals(404, response.getStatusLine().getStatusCode());
}

static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
Request request = prepareRequest(SYNC);
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
throws IOException {
Request request = prepareRequest(mode);
if (mode == ASYNC) {
addAsyncParameters(builder, randomBoolean());
}
String mediaType = attachBody(builder.build(), request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
boolean addParam = randomBoolean();
if (addParam) {
request.addParameter("format", format);
} else {
switch (format) {
Expand All @@ -1188,8 +1238,75 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
}
request.setOptions(options);

HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
if (shouldLog()) {
LOGGER.info("REQUEST={}", request);
}

Response response = performRequest(request);
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
String id = response.getHeader("X-Elasticsearch-Async-Id");

if (mode == SYNC) {
assertThat(id, is(emptyOrNullString()));
return initialValue;
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
assertNull(response.getHeader("is_running"));
// the content cant be empty
assertThat(initialValue, not(emptyOrNullString()));
return initialValue;
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
if ("?0".equals(isRunning)) {
// must have completed immediately so keep_on_completion must be true
assertThat(builder.keepOnCompletion(), is(true));
} else {
// did not return results immediately, so we will need an async get
// Also, different format modes return different results.
switch (format) {
case "txt" -> assertThat(initialValue, emptyOrNullString());
case "csv" -> {
assertEquals(initialValue, "\r\n");
initialValue = "";
}
case "tsv" -> {
assertEquals(initialValue, "\n");
initialValue = "";
}
}
}
// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
if (delimiter != null) {
getRequest.addParameter("delimiter", String.valueOf(delimiter));
}
// If the `format` parameter is not added, the GET request will return a response
// with the `Content-Type` type due to the lack of an `Accept` header.
if (addParam) {
getRequest.addParameter("format", format);
}
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

// assert initial contents, if any, are the same as async get contents
if (initialValue != null && initialValue.isEmpty() == false) {
assertEquals(initialValue, newValue);
}

assertDeletable(id);
return newValue;
}

private static Request prepareRequest(Mode mode) {
Expand Down
Loading