Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
462a8e5
Async request in txt mode returns the async parameters as headers
kanoshiou Jul 19, 2024
33e8b1f
Add changelog
kanoshiou Jul 19, 2024
f0285ea
Typo
kanoshiou Jul 30, 2024
f77b893
Change listener of `/_query/async/{id}` to `EsqlResponseListener` to …
kanoshiou Jul 30, 2024
e81487a
Change HTTP header names access modifiers to private
kanoshiou Jul 31, 2024
c27716f
Revert "Change HTTP header names access modifiers to private"
kanoshiou Jul 31, 2024
9a010a5
Response media type defaults to JSON if there is no specific media ty…
kanoshiou Jul 31, 2024
12ef64d
Add tests for async query in text mode
kanoshiou Jul 31, 2024
b774b61
Get requests do not need checkNonNullMediaType
kanoshiou Jul 31, 2024
c3c1396
Update doc
kanoshiou Jul 31, 2024
e94872a
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Aug 1, 2024
3f48c01
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Sep 5, 2024
d6bd2e2
Merge remote-tracking branch 'origin/main' into async-parameters-retu…
kanoshiou Oct 10, 2024
cc9a40b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
6f86392
small changes
kanoshiou Nov 28, 2024
e8484c8
Refactor `runEsqlAsync` to support text format
kanoshiou Nov 28, 2024
106b114
Refactor `runEsqlSync` to support text format
kanoshiou Nov 28, 2024
76d3be7
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
7f9cece
Overload `runEsql` to support text formats
kanoshiou Nov 29, 2024
c01d53b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 1, 2024
e0786a7
Replace async headers
kanoshiou Dec 1, 2024
3a3912c
Revert
kanoshiou Dec 1, 2024
4f505fe
Add a private constructor for `EsqlResponseListener`
kanoshiou Dec 1, 2024
ae03113
Check non-null media type
kanoshiou Dec 1, 2024
7fc2bbb
Refactor `runEsqlAsTextWithFormat` to support async mode
kanoshiou Dec 1, 2024
cb50b12
Remove static map `TEXT_FORMATS`
kanoshiou Dec 1, 2024
3bf781d
Update 111104.yaml
kanoshiou Dec 2, 2024
b519248
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 2, 2024
eb9e49d
Update 111104.yaml
kanoshiou Dec 2, 2024
5ed973f
Update 111104.yaml
kanoshiou Dec 2, 2024
2904aa2
Update
kanoshiou Dec 4, 2024
b0da5b5
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 4, 2024
444ed89
Allow an ASYNC GET with no media type spec
bpintea Dec 5, 2024
389dd8a
Merge branch 'main' into fork/kanoshiou/async-parameters-returns-as-h…
bpintea Dec 5, 2024
5b02528
Testcases for request with no Content-type header
kanoshiou Dec 6, 2024
201a45d
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 7, 2024
0fb5625
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/111104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111104
summary: Async request in txt mode returns the async parameters as headers
area: Analysis
type: bug
issues:
- 110926
4 changes: 4 additions & 0 deletions docs/reference/esql/esql-async-query-get-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ parameter is `true`.
[[esql-async-query-get-api-query-params]]
==== {api-query-parms-title}

The API accepts the same parameters as the synchronous
<<esql-query-api-query-params,query API>>, along with the following
parameters:

`wait_for_completion_timeout`::
(Optional, <<time-units,time value>>)
Timeout duration to wait for the request to finish. Defaults to no timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private GetResult innerGetFetch(
Mapper fieldMapper = mappingLookup.getMapper(field);
if (fieldMapper == null) {
if (mappingLookup.objectMappers().get(field) != null) {
// Only fail if we know it is a object field, missing paths / fields shouldn't fail.
// Only fail if we know it is an object field, missing paths / fields shouldn't fail.
throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
ActionListener<Response> listener
) {
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
// This is will performed in case of timeout
// This will be performed in case of timeout
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
if (acquiredListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.IntFunction;
import java.util.regex.Pattern;
Expand All @@ -60,6 +61,8 @@
import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.action.EsqlResponseListener.HEADER_NAME_ASYNC_ID;
import static org.elasticsearch.xpack.esql.action.EsqlResponseListener.HEADER_NAME_ASYNC_RUNNING;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
Expand Down Expand Up @@ -342,21 +345,33 @@ public void testTextMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
if (mode == ASYNC) {
assertEquals(expectedTextBody("txt", count, null), runEsqlAsyncAsTextWithFormat(builder, "txt", null));
} else {
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
}
}

public void testCSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
if (mode == ASYNC) {
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsyncAsTextWithFormat(builder, "csv", '|'));
} else {
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
}
}

public void testTSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
if (mode == ASYNC) {
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsyncAsTextWithFormat(builder, "tsv", null));
} else {
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
}
}

public void testCSVNoHeaderMode() throws IOException {
Expand Down Expand Up @@ -1078,6 +1093,95 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
}

static String runEsqlAsyncAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter)
throws IOException {
Request request = prepareRequest(ASYNC);
addAsyncParameters(builder);

String mediaType = attachBody(builder.build(), request);
RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Content-Type", mediaType);

boolean addParam = randomBoolean();
if (addParam) {
request.addParameter("format", format);
} else {
switch (format) {
case "txt" -> options.addHeader("Accept", "text/plain");
case "csv" -> options.addHeader("Accept", "text/csv");
case "tsv" -> options.addHeader("Accept", "text/tab-separated-values");
}
}
if (delimiter != null) {
request.addParameter("delimiter", String.valueOf(delimiter));
}
request.setOptions(options);
Response response = performRequest(request);
HttpEntity entity = response.getEntity();

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

String id = response.getHeader(HEADER_NAME_ASYNC_ID);
if (builder.keepOnCompletion()) {
assertThat(id, not(emptyOrNullString()));
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
assertNull(response.getHeader("is_running"));
// the content cant be empty
assertThat(initialValue, not(emptyOrNullString()));
return initialValue;
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
String isRunning = response.getHeader(HEADER_NAME_ASYNC_RUNNING);
if (Objects.equals(isRunning, "false")) {
// must have completed immediately so keep_on_completion must be true
assertThat(builder.keepOnCompletion(), is(true));
} else {
// did not return results immediately, so we will need an async get
// Also, different format modes return different results.
switch (format) {
case "txt" -> assertThat(initialValue, emptyOrNullString());
case "csv" -> {
assertEquals(initialValue, "\r\n");
initialValue = "";
}
case "tsv" -> {
assertEquals(initialValue, "\n");
initialValue = "";
}
}
}
// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
if (delimiter != null) {
getRequest.addParameter("delimiter", String.valueOf(delimiter));
}
// If the `format` parameter is not added, the GET request will return a response
// with the `Content-Type` type due to the lack of an `Accept` header.
if (addParam) {
getRequest.addParameter("format", format);
}
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = response.getEntity();
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

// assert initial contents, if any, are the same as async get contents
if (initialValue != null && initialValue.isEmpty() == false) {
assertEquals(initialValue, newValue);
}

assertDeletable(id);
return newValue;
}

private static Request prepareRequest(Mode mode) {
Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""));
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,14 @@ public TimeValue stop() {
}

private static final Logger LOGGER = LogManager.getLogger(EsqlResponseListener.class);
private static final String HEADER_NAME_TOOK_NANOS = "Took-nanos";

/**
* HTTP header names
*/
public static final String HEADER_NAME_TOOK_NANOS = "Took-nanos";
public static final String HEADER_NAME_ASYNC_ID = "Async-ID";
public static final String HEADER_NAME_ASYNC_RUNNING = "Async-running";

private final RestChannel channel;
private final RestRequest restRequest;
private final MediaType mediaType;
Expand All @@ -103,24 +110,23 @@ public EsqlResponseListener(RestChannel channel, RestRequest restRequest, EsqlQu
this.channel = channel;
this.restRequest = restRequest;
this.esqlQuery = esqlRequest.query();
mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);
this.mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);

/*
* Special handling for the "delimiter" parameter which should only be
* checked for being present or not in the case of CSV format. We cannot
* override {@link BaseRestHandler#responseParams()} because this
* parameter should only be checked for CSV, not other formats.
*/
if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
String message = String.format(
Locale.ROOT,
"parameter: [%s] can only be used with the format [%s] for request [%s]",
URL_PARAM_DELIMITER,
CSV.queryParameter(),
restRequest.path()
);
throw new IllegalArgumentException(message);
}
checkDelimiter();
}

/**
* Async query get API does not have an esqlRequest and mediaType defaults to JSON if no "format" in params
*/
public EsqlResponseListener(RestChannel channel, RestRequest restRequest) {
super(channel);

this.channel = channel;
this.restRequest = restRequest;
this.esqlQuery = null;
this.mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest);

checkDelimiter();
}

@Override
Expand All @@ -139,6 +145,10 @@ private RestResponse buildResponse(EsqlQueryResponse esqlResponse) throws IOExce
ChunkedRestResponseBodyPart.fromTextChunks(format.contentType(restRequest), format.format(restRequest, esqlResponse)),
releasable
);
if (esqlResponse.asyncExecutionId().isPresent()) {
restResponse.addHeader(HEADER_NAME_ASYNC_ID, esqlResponse.asyncExecutionId().get());
restResponse.addHeader(HEADER_NAME_ASYNC_RUNNING, String.valueOf(esqlResponse.isRunning()));
}
} else if (mediaType == ArrowFormat.INSTANCE) {
ArrowResponse arrowResponse = new ArrowResponse(
// Map here to avoid cyclic dependencies between the arrow subproject and its parent
Expand Down Expand Up @@ -213,4 +223,23 @@ static void logOnFailure(Throwable throwable) {
RestStatus status = ExceptionsHelper.status(throwable);
LOGGER.log(status.getStatus() >= 500 ? Level.WARN : Level.DEBUG, () -> "Request failed with status [" + status + "]: ", throwable);
}

/*
* Special handling for the "delimiter" parameter which should only be
* checked for being present or not in the case of CSV format. We cannot
* override {@link BaseRestHandler#responseParams()} because this
* parameter should only be checked for CSV, not other formats.
*/
private void checkDelimiter() {
if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
String message = String.format(
Locale.ROOT,
"parameter: [%s] can only be used with the format [%s] for request [%s]",
URL_PARAM_DELIMITER,
CSV.queryParameter(),
restRequest.path()
);
throw new IllegalArgumentException(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (request.hasParam("keep_alive")) {
get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));
}
return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new RestRefCountedChunkedToXContentListener<>(channel));
return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new EsqlResponseListener(channel, request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ public static MediaType getResponseMediaType(RestRequest request, EsqlQueryReque
return validateColumnarRequest(esqlRequest.columnar(), mediaType, request);
}

/*
* If requests have no media type in params nor headers, default to JSON
*/
public static MediaType getResponseMediaType(RestRequest request) {
MediaType mediaType;
if (request.hasParam(URL_PARAM_FORMAT)) {
mediaType = mediaTypeFromParams(request);
} else {
ParsedMediaType acceptType = request.getParsedAccept();
mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType();
}
if (mediaType == null) {
mediaType = XContentType.JSON;
}
return mediaType;
}

private static MediaType mediaTypeFromHeaders(RestRequest request) {
ParsedMediaType acceptType = request.getParsedAccept();
MediaType mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType();
Expand Down