-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ESQL: Enable async get to support formatting #111104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
bpintea
merged 37 commits into
elastic:main
from
kanoshiou:async-parameters-returns-as-headers-in-text-mode
Dec 9, 2024
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit
Hold shift + click to select a range
462a8e5
Async request in txt mode returns the async parameters as headers
kanoshiou 33e8b1f
Add changelog
kanoshiou f0285ea
Typo
kanoshiou f77b893
Change listener of `/_query/async/{id}` to `EsqlResponseListener` to …
kanoshiou e81487a
Change HTTP header names access modifiers to private
kanoshiou c27716f
Revert "Change HTTP header names access modifiers to private"
kanoshiou 9a010a5
Response media type defaults to JSON if there is no specific media ty…
kanoshiou 12ef64d
Add tests for async query in text mode
kanoshiou b774b61
Get requests do not need checkNonNullMediaType
kanoshiou c3c1396
Update doc
kanoshiou e94872a
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou 3f48c01
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou d6bd2e2
Merge remote-tracking branch 'origin/main' into async-parameters-retu…
kanoshiou cc9a40b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou 6f86392
small changes
kanoshiou e8484c8
Refactor `runEsqlAsync` to support text format
kanoshiou 106b114
Refactor `runEsqlSync` to support text format
kanoshiou 76d3be7
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou 7f9cece
Overload `runEsql` to support text formats
kanoshiou c01d53b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou e0786a7
Replace async headers
kanoshiou 3a3912c
Revert
kanoshiou 4f505fe
Add a private constructor for `EsqlResponseListener`
kanoshiou ae03113
Check non-null media type
kanoshiou 7fc2bbb
Refactor `runEsqlAsTextWithFormat` to support async mode
kanoshiou cb50b12
Remove static map `TEXT_FORMATS`
kanoshiou 3bf781d
Update 111104.yaml
kanoshiou b519248
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou eb9e49d
Update 111104.yaml
kanoshiou 5ed973f
Update 111104.yaml
kanoshiou 2904aa2
Update
kanoshiou b0da5b5
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea 444ed89
Allow an ASYNC GET with no media type spec
bpintea 389dd8a
Merge branch 'main' into fork/kanoshiou/async-parameters-returns-as-h…
bpintea 5b02528
Testcases for request with no Content-type header
kanoshiou 201a45d
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea 0fb5625
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -351,7 +351,6 @@ public void testTextMode() throws IOException { | |
bulkLoadTestData(count); | ||
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100"); | ||
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode)); | ||
|
||
} | ||
|
||
public void testCSVMode() throws IOException { | ||
|
@@ -1004,53 +1003,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject | |
} | ||
|
||
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException { | ||
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings()); | ||
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings()); | ||
} | ||
|
||
static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException { | ||
if (mode == ASYNC) { | ||
return runEsqlAsync(requestObject, assertWarnings); | ||
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings); | ||
} else { | ||
return runEsqlSync(requestObject, assertWarnings); | ||
} | ||
} | ||
|
||
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { | ||
requestObject.build(); | ||
Request request = prepareRequest(SYNC); | ||
String mediaType = attachBody(requestObject, request); | ||
|
||
RequestOptions.Builder options = request.getOptions().toBuilder(); | ||
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves | ||
options.addHeader("Content-Type", mediaType); | ||
|
||
if (randomBoolean()) { | ||
options.addHeader("Accept", mediaType); | ||
} else { | ||
request.addParameter("format", requestObject.contentType().queryParameter()); | ||
} | ||
request.setOptions(options); | ||
Request request = prepareRequestWithOptions(requestObject, SYNC); | ||
|
||
HttpEntity entity = performRequest(request, assertWarnings); | ||
return entityToMap(entity, requestObject.contentType()); | ||
} | ||
|
||
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException { | ||
addAsyncParameters(requestObject); | ||
requestObject.build(); | ||
Request request = prepareRequest(ASYNC); | ||
String mediaType = attachBody(requestObject, request); | ||
|
||
RequestOptions.Builder options = request.getOptions().toBuilder(); | ||
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves | ||
options.addHeader("Content-Type", mediaType); | ||
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings); | ||
} | ||
|
||
if (randomBoolean()) { | ||
options.addHeader("Accept", mediaType); | ||
} else { | ||
request.addParameter("format", requestObject.contentType().queryParameter()); | ||
} | ||
request.setOptions(options); | ||
public static Map<String, Object> runEsqlAsync( | ||
RequestObjectBuilder requestObject, | ||
boolean keepOnCompletion, | ||
AssertWarnings assertWarnings | ||
) throws IOException { | ||
addAsyncParameters(requestObject, keepOnCompletion); | ||
Request request = prepareRequestWithOptions(requestObject, ASYNC); | ||
|
||
if (shouldLog()) { | ||
LOGGER.info("REQUEST={}", request); | ||
|
@@ -1062,7 +1043,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec | |
Object initialColumns = null; | ||
Object initialValues = null; | ||
var json = entityToMap(entity, requestObject.contentType()); | ||
checkKeepOnCompletion(requestObject, json); | ||
checkKeepOnCompletion(requestObject, json, keepOnCompletion); | ||
String id = (String) json.get("id"); | ||
|
||
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false); | ||
|
@@ -1102,7 +1083,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec | |
|
||
// issue a second request to "async get" the results | ||
Request getRequest = prepareAsyncGetRequest(id); | ||
getRequest.setOptions(options); | ||
getRequest.setOptions(request.getOptions()); | ||
response = performRequest(getRequest); | ||
entity = response.getEntity(); | ||
} | ||
|
@@ -1120,6 +1101,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec | |
return removeAsyncProperties(result); | ||
} | ||
|
||
public void testAsyncGetWithoutContentType() throws IOException { | ||
int count = randomIntBetween(0, 100); | ||
bulkLoadTestData(count); | ||
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100"); | ||
|
||
addAsyncParameters(requestObject, true); | ||
Request request = prepareRequestWithOptions(requestObject, ASYNC); | ||
|
||
if (shouldLog()) { | ||
LOGGER.info("REQUEST={}", request); | ||
} | ||
|
||
Response response = performRequest(request); | ||
HttpEntity entity = response.getEntity(); | ||
|
||
var json = entityToMap(entity, requestObject.contentType()); | ||
checkKeepOnCompletion(requestObject, json, true); | ||
String id = (String) json.get("id"); | ||
// results won't be returned since keepOnCompletion is true | ||
assertThat(id, is(not(emptyOrNullString()))); | ||
|
||
// issue an "async get" request with no Content-Type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
Request getRequest = prepareAsyncGetRequest(id); | ||
response = performRequest(getRequest); | ||
entity = response.getEntity(); | ||
var result = entityToMap(entity, XContentType.JSON); | ||
|
||
ListMatcher values = matchesList(); | ||
for (int i = 0; i < count; i++) { | ||
values = values.item(matchesList().item("keyword" + i).item(i)); | ||
} | ||
assertMap( | ||
result, | ||
matchesMap().entry( | ||
"columns", | ||
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword")) | ||
.item(matchesMap().entry("name", "integer").entry("type", "integer")) | ||
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false) | ||
); | ||
|
||
} | ||
|
||
static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
requestObject.build(); | ||
Request request = prepareRequest(mode); | ||
String mediaType = attachBody(requestObject, request); | ||
|
||
RequestOptions.Builder options = request.getOptions().toBuilder(); | ||
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves | ||
options.addHeader("Content-Type", mediaType); | ||
|
||
if (randomBoolean()) { | ||
options.addHeader("Accept", mediaType); | ||
} else { | ||
request.addParameter("format", requestObject.contentType().queryParameter()); | ||
} | ||
request.setOptions(options); | ||
return request; | ||
} | ||
|
||
// Removes async properties, otherwise consuming assertions would need to handle sync and async differences | ||
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) { | ||
Map<String, Object> copy = new HashMap<>(map); | ||
|
@@ -1140,17 +1181,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType | |
} | ||
} | ||
|
||
static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException { | ||
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException { | ||
// deliberately short in order to frequently trigger return without results | ||
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100))); | ||
requestObject.keepOnCompletion(randomBoolean()); | ||
requestObject.keepOnCompletion(keepOnCompletion); | ||
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10))); | ||
} | ||
|
||
// If keep_on_completion is set then an id must always be present, regardless of the value of any other property. | ||
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) { | ||
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) { | ||
if (requestObject.keepOnCompletion()) { | ||
assertTrue(keepOnCompletion); | ||
assertThat((String) json.get("id"), not(emptyOrNullString())); | ||
} else { | ||
assertFalse(keepOnCompletion); | ||
} | ||
} | ||
|
||
|
@@ -1172,7 +1216,7 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma | |
throws IOException { | ||
Request request = prepareRequest(mode); | ||
if (mode == ASYNC) { | ||
addAsyncParameters(builder); | ||
addAsyncParameters(builder, randomBoolean()); | ||
} | ||
String mediaType = attachBody(builder.build(), request); | ||
|
||
|
@@ -1194,6 +1238,10 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma | |
} | ||
request.setOptions(options); | ||
|
||
if (shouldLog()) { | ||
LOGGER.info("REQUEST={}", request); | ||
} | ||
|
||
Response response = performRequest(request); | ||
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings()); | ||
|
||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: if they're not returned, it's because the
waitForCompletion()
is provided a very small interval (seeaddAsyncParameters()
), so ES won't have the time to query and respond with results in time.