Skip to content

Commit 9683bb0

Browse files
committed
ESQL: Fix async query inconsistent headers (#135078)
Fixes #135042 This PR: - Fixes the get-result not always returning the expected headers - Fixes the non-async query incorrectly returning the "is running" async header
1 parent 32b90cf commit 9683bb0

File tree

6 files changed

+134
-54
lines changed

6 files changed

+134
-54
lines changed

docs/changelog/135078.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 135078
2+
summary: Fix async get results with inconsistent headers
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 135042

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 90 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.client.RequestOptions;
1717
import org.elasticsearch.client.Response;
1818
import org.elasticsearch.client.ResponseException;
19+
import org.elasticsearch.client.RestClient;
1920
import org.elasticsearch.client.WarningsHandler;
2021
import org.elasticsearch.common.bytes.BytesArray;
2122
import org.elasticsearch.common.io.Streams;
@@ -41,6 +42,7 @@
4142
import java.io.IOException;
4243
import java.io.InputStreamReader;
4344
import java.io.OutputStream;
45+
import java.io.UncheckedIOException;
4446
import java.nio.charset.StandardCharsets;
4547
import java.time.ZoneId;
4648
import java.util.ArrayList;
@@ -51,6 +53,8 @@
5153
import java.util.Locale;
5254
import java.util.Map;
5355
import java.util.Set;
56+
import java.util.concurrent.ConcurrentHashMap;
57+
import java.util.concurrent.ConcurrentMap;
5458
import java.util.function.IntFunction;
5559

5660
import static java.util.Collections.emptySet;
@@ -60,11 +64,11 @@
6064
import static org.elasticsearch.test.MapMatcher.assertMap;
6165
import static org.elasticsearch.test.MapMatcher.matchesMap;
6266
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
63-
import static org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase.assertNotPartial;
6467
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.ASYNC;
6568
import static org.elasticsearch.xpack.esql.qa.rest.RestEsqlTestCase.Mode.SYNC;
6669
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateTimeToString;
6770
import static org.hamcrest.Matchers.any;
71+
import static org.hamcrest.Matchers.anyOf;
6872
import static org.hamcrest.Matchers.containsString;
6973
import static org.hamcrest.Matchers.either;
7074
import static org.hamcrest.Matchers.emptyOrNullString;
@@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
396400
options.addHeader("Content-Type", mediaType);
397401
options.addHeader("Accept", "text/csv; header=absent");
398402
request.setOptions(options);
399-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
403+
Response response = performRequest(request);
404+
assertWarnings(response, new AssertWarnings.NoWarnings());
405+
HttpEntity entity = response.getEntity();
400406
String actual = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
401407
assertEquals("keyword0,0\r\n", actual);
402408
}
@@ -692,12 +698,12 @@ public void testNamedParamsForIdentifierAndIdentifierPatterns() throws IOExcepti
692698
bulkLoadTestData(10);
693699
// positive
694700
var query = requestObjectBuilder().query(
695-
format(
696-
null,
697-
"from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4",
698-
testIndexName()
701+
format(
702+
null,
703+
"from {} | eval x1 = ?n1 | where ?n2 == x1 | stats xx2 = ?fn1(?n3) by ?n4 | keep ?n4, ?n5 | sort ?n4",
704+
testIndexName()
705+
)
699706
)
700-
)
701707
.params(
702708
"[{\"n1\" : {\"identifier\" : \"integer\"}}, {\"n2\" : {\"identifier\" : \"short\"}}, "
703709
+ "{\"n3\" : {\"identifier\" : \"double\"}}, {\"n4\" : {\"identifier\" : \"boolean\"}}, "
@@ -832,12 +838,12 @@ public void testDoubleParamsForIdentifiers() throws IOException {
832838
// positive
833839
// named double parameters
834840
var query = requestObjectBuilder().query(
835-
format(
836-
null,
837-
"from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4",
838-
testIndexName()
841+
format(
842+
null,
843+
"from {} | eval x1 = ??n1 | where ??n2 == x1 | stats xx2 = ??fn1(??n3) by ??n4 | keep ??n4, ??n5 | sort ??n4",
844+
testIndexName()
845+
)
839846
)
840-
)
841847
.params(
842848
"[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, "
843849
+ "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]"
@@ -846,12 +852,12 @@ public void testDoubleParamsForIdentifiers() throws IOException {
846852

847853
// positional double parameters
848854
query = requestObjectBuilder().query(
849-
format(
850-
null,
851-
"from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4",
852-
testIndexName()
855+
format(
856+
null,
857+
"from {} | eval x1 = ??1 | where ??2 == x1 | stats xx2 = ??6(??3) by ??4 | keep ??4, ??5 | sort ??4",
858+
testIndexName()
859+
)
853860
)
854-
)
855861
.params(
856862
"[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, "
857863
+ "{\"n5\" : \"xx2\"}, {\"fn1\" : \"max\"}]"
@@ -869,8 +875,8 @@ public void testDoubleParamsForIdentifiers() throws IOException {
869875

870876
// anonymous double parameters
871877
query = requestObjectBuilder().query(
872-
format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName())
873-
)
878+
format(null, "from {} | eval x1 = ?? | where ?? == x1 | stats xx2 = ??(??) by ?? | keep ??, ?? | sort ??", testIndexName())
879+
)
874880
.params(
875881
"[{\"n1\" : \"integer\"}, {\"n2\" : \"short\"}, {\"fn1\" : \"max\"}, {\"n3\" : \"double\"}, {\"n4\" : \"boolean\"}, "
876882
+ "{\"n4\" : \"boolean\"}, {\"n5\" : \"xx2\"}, {\"n4\" : \"boolean\"}]"
@@ -1091,7 +1097,7 @@ public void testComplexFieldNames() throws IOException {
10911097
}
10921098

10931099
/**
1094-
* INLINESTATS <strong>can</strong> group on {@code NOW()}. It's a little silly, but
1100+
* INLINE STATS <strong>can</strong> group on {@code NOW()}. It's a little silly, but
10951101
* doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is
10961102
* much more sensible. But just grouping on {@code NOW()} is enough to test this.
10971103
* <p>
@@ -1101,11 +1107,11 @@ public void testComplexFieldNames() throws IOException {
11011107
*/
11021108
@AwaitsFix(bugUrl = "Disabled temporarily until JOIN implementation is completed")
11031109
public void testInlineStatsNow() throws IOException {
1104-
assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot());
1110+
assumeTrue("INLINE STATS only available on snapshots", Build.current().isSnapshot());
11051111
indexTimestampData(1);
11061112

11071113
RequestObjectBuilder builder = requestObjectBuilder().query(
1108-
fromIndex() + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC"
1114+
fromIndex() + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC"
11091115
);
11101116
Map<String, Object> result = runEsql(builder);
11111117
ListMatcher values = matchesList();
@@ -1115,8 +1121,8 @@ public void testInlineStatsNow() throws IOException {
11151121
.item("value" + i)
11161122
.item("value" + i)
11171123
.item(i)
1118-
.item(any(String.class))
11191124
.item(499.5)
1125+
.item(any(String.class))
11201126
);
11211127
}
11221128
assertResultMap(
@@ -1125,8 +1131,8 @@ public void testInlineStatsNow() throws IOException {
11251131
.item(matchesMap().entry("name", "test").entry("type", "text"))
11261132
.item(matchesMap().entry("name", "test.keyword").entry("type", "keyword"))
11271133
.item(matchesMap().entry("name", "value").entry("type", "long"))
1128-
.item(matchesMap().entry("name", "now").entry("type", "date"))
1129-
.item(matchesMap().entry("name", "AVG(value)").entry("type", "double")),
1134+
.item(matchesMap().entry("name", "AVG(value)").entry("type", "double"))
1135+
.item(matchesMap().entry("name", "now").entry("type", "date")),
11301136
values
11311137
);
11321138
}
@@ -1258,22 +1264,40 @@ public static Map<String, Object> runEsql(
12581264
var results = mode == ASYNC
12591265
? runEsqlAsync(requestObject, randomBoolean(), assertWarnings)
12601266
: runEsqlSync(requestObject, assertWarnings);
1261-
return checkPartialResults ? assertNotPartial(results) : results;
1267+
if (checkPartialResults) {
1268+
assertNotPartial(results);
1269+
}
1270+
return results;
12621271
}
12631272

1264-
public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode)
1265-
throws IOException {
1273+
public static Map<String, Object> runEsql(
1274+
RequestObjectBuilder requestObject,
1275+
AssertWarnings assertWarnings,
1276+
Mode mode
1277+
) throws IOException {
12661278
return runEsql(requestObject, assertWarnings, mode, true);
12671279
}
12681280

1269-
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1281+
public static Map<String, Object> runEsqlSync(
1282+
RequestObjectBuilder requestObject,
1283+
AssertWarnings assertWarnings
1284+
) throws IOException {
1285+
Boolean profileEnabled = requestObject.profile;
12701286
Request request = prepareRequestWithOptions(requestObject, SYNC);
12711287

1272-
HttpEntity entity = performRequest(request, assertWarnings);
1273-
return entityToMap(entity, requestObject.contentType());
1288+
Response response = performRequest(request);
1289+
HttpEntity entity = response.getEntity();
1290+
Map<String, Object> json = entityToMap(entity, requestObject.contentType());
1291+
1292+
assertWarnings(response, assertWarnings);
1293+
1294+
return json;
12741295
}
12751296

1276-
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1297+
public static Map<String, Object> runEsqlAsync(
1298+
RequestObjectBuilder requestObject,
1299+
AssertWarnings assertWarnings
1300+
) throws IOException {
12771301
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
12781302
}
12791303

@@ -1282,6 +1306,7 @@ public static Map<String, Object> runEsqlAsync(
12821306
boolean keepOnCompletion,
12831307
AssertWarnings assertWarnings
12841308
) throws IOException {
1309+
Boolean profileEnabled = requestObject.profile;
12851310
addAsyncParameters(requestObject, keepOnCompletion);
12861311
Request request = prepareRequestWithOptions(requestObject, ASYNC);
12871312

@@ -1298,8 +1323,8 @@ public static Map<String, Object> runEsqlAsync(
12981323
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
12991324
String id = (String) json.get("id");
13001325

1301-
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
1302-
var supportsSuggestedCast = clusterHasCapability("POST", "/_query", List.of(), List.of("suggested_cast")).orElse(false);
1326+
var supportsAsyncHeaders = hasCapabilities(adminClient(), List.of("async_query_status_headers"));
1327+
var supportsSuggestedCast = hasCapabilities(adminClient(), List.of("suggested_cast"));
13031328

13041329
if (id == null) {
13051330
// no id returned from an async call, must have completed immediately and without keep_on_completion
@@ -1361,6 +1386,26 @@ public static Map<String, Object> runEsqlAsync(
13611386
return removeAsyncProperties(result);
13621387
}
13631388

1389+
record CapabilitesCacheKey(RestClient client, List<String> capabilities) {}
1390+
1391+
/**
1392+
* Cache of capabilities.
1393+
*/
1394+
private static final ConcurrentMap<CapabilitesCacheKey, Boolean> capabilities = new ConcurrentHashMap<>();
1395+
1396+
public static boolean hasCapabilities(RestClient client, List<String> requiredCapabilities) {
1397+
if (requiredCapabilities.isEmpty()) {
1398+
return true;
1399+
}
1400+
return capabilities.computeIfAbsent(new CapabilitesCacheKey(client, requiredCapabilities), r -> {
1401+
try {
1402+
return clusterHasCapability(client, "POST", "/_query", List.of(), requiredCapabilities).orElse(false);
1403+
} catch (IOException e) {
1404+
throw new UncheckedIOException(e);
1405+
}
1406+
});
1407+
}
1408+
13641409
private static Object removeOriginalTypesAndSuggestedCast(Object response) {
13651410
if (response instanceof ArrayList<?> columns) {
13661411
var newColumns = new ArrayList<>();
@@ -1589,7 +1634,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15891634
}
15901635

15911636
Response response = performRequest(request);
1592-
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1637+
assertWarnings(response, new AssertWarnings.NoWarnings());
1638+
HttpEntity entity = response.getEntity();
15931639

15941640
// get the content, it could be empty because the request might have not completed
15951641
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
@@ -1642,7 +1688,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
16421688
// if `addParam` is false, `options` will already have an `Accept` header
16431689
getRequest.setOptions(options);
16441690
response = performRequest(getRequest);
1645-
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1691+
assertWarnings(response, new AssertWarnings.NoWarnings());
1692+
entity = response.getEntity();
16461693
}
16471694
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
16481695

@@ -1681,10 +1728,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16811728
return mediaType;
16821729
}
16831730

1684-
private static HttpEntity performRequest(Request request, AssertWarnings assertWarnings) throws IOException {
1685-
return assertWarnings(performRequest(request), assertWarnings);
1686-
}
1687-
16881731
protected static Response performRequest(Request request) throws IOException {
16891732
Response response = client().performRequest(request);
16901733
if (shouldLog()) {
@@ -1695,14 +1738,19 @@ protected static Response performRequest(Request request) throws IOException {
16951738
return response;
16961739
}
16971740

1698-
private static HttpEntity assertWarnings(Response response, AssertWarnings assertWarnings) {
1741+
static void assertNotPartial(Map<String, Object> answer) {
1742+
var clusters = answer.get("_clusters");
1743+
var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "");
1744+
assertThat(reason, answer.get("is_partial"), anyOf(nullValue(), is(false)));
1745+
}
1746+
1747+
private static void assertWarnings(Response response, AssertWarnings assertWarnings) {
16991748
List<String> warnings = new ArrayList<>(response.getWarnings());
17001749
warnings.removeAll(mutedWarnings());
17011750
if (shouldLog()) {
17021751
LOGGER.info("RESPONSE warnings (after muted)={}", warnings);
17031752
}
17041753
assertWarnings.assertWarnings(warnings);
1705-
return response.getEntity();
17061754
}
17071755

17081756
private static Set<String> mutedWarnings() {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,11 @@ public enum Cap {
628628
*/
629629
ASYNC_QUERY_STATUS_HEADERS,
630630

631+
/**
632+
* Fix async headers not being sent on "get" requests
633+
*/
634+
ASYNC_QUERY_STATUS_HEADERS_FIX,
635+
631636
/**
632637
* Consider the upper bound when computing the interval in BUCKET auto mode.
633638
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ public boolean isRunning() {
221221
}
222222

223223
public boolean isAsync() {
224-
return isRunning;
224+
return isAsync;
225225
}
226226

227227
public boolean isPartial() {

0 commit comments

Comments
 (0)