1616import org .elasticsearch .client .RequestOptions ;
1717import org .elasticsearch .client .Response ;
1818import org .elasticsearch .client .ResponseException ;
19+ import org .elasticsearch .client .RestClient ;
1920import org .elasticsearch .client .WarningsHandler ;
2021import org .elasticsearch .common .bytes .BytesArray ;
2122import org .elasticsearch .common .io .Streams ;
2223import org .elasticsearch .common .settings .Settings ;
23- import org .elasticsearch .common .xcontent .XContentHelper ;
2424import org .elasticsearch .core .CheckedConsumer ;
2525import org .elasticsearch .core .Nullable ;
2626import org .elasticsearch .core .TimeValue ;
4040
4141import java .io .ByteArrayOutputStream ;
4242import java .io .IOException ;
43- import java .io .InputStream ;
4443import java .io .InputStreamReader ;
4544import java .io .OutputStream ;
45+ import java .io .UncheckedIOException ;
4646import java .nio .charset .StandardCharsets ;
4747import java .time .ZoneId ;
4848import java .util .ArrayList ;
5353import java .util .Locale ;
5454import java .util .Map ;
5555import java .util .Set ;
56+ import java .util .concurrent .ConcurrentHashMap ;
57+ import java .util .concurrent .ConcurrentMap ;
5658import java .util .function .IntFunction ;
5759
5860import static java .util .Collections .emptySet ;
6264import static org .elasticsearch .test .MapMatcher .assertMap ;
6365import static org .elasticsearch .test .MapMatcher .matchesMap ;
6466import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
65- import static org .elasticsearch .xpack .esql .qa .rest .EsqlSpecTestCase .assertNotPartial ;
6667import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .ASYNC ;
6768import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
6869import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
6970import static org .hamcrest .Matchers .any ;
71+ import static org .hamcrest .Matchers .anyOf ;
7072import static org .hamcrest .Matchers .containsString ;
7173import static org .hamcrest .Matchers .either ;
7274import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -390,7 +392,9 @@ public void testCSVNoHeaderMode() throws IOException {
390392 options .addHeader ("Content-Type" , mediaType );
391393 options .addHeader ("Accept" , "text/csv; header=absent" );
392394 request .setOptions (options );
393- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
395+ Response response = performRequest (request );
396+ assertWarnings (response , new AssertWarnings .NoWarnings ());
397+ HttpEntity entity = response .getEntity ();
394398 String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
395399 assertEquals ("keyword0,0\r \n " , actual );
396400 }
@@ -1252,7 +1256,10 @@ public static Map<String, Object> runEsql(
12521256 var results = mode == ASYNC
12531257 ? runEsqlAsync (requestObject , randomBoolean (), assertWarnings )
12541258 : runEsqlSync (requestObject , assertWarnings );
1255- return checkPartialResults ? assertNotPartial (results ) : results ;
1259+ if (checkPartialResults ) {
1260+ assertNotPartial (results );
1261+ }
1262+ return results ;
12561263 }
12571264
12581265 public static Map <String , Object > runEsql (RequestObjectBuilder requestObject , AssertWarnings assertWarnings , Mode mode )
@@ -1263,8 +1270,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
12631270 public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
12641271 Request request = prepareRequestWithOptions (requestObject , SYNC );
12651272
1266- HttpEntity entity = performRequest (request , assertWarnings );
1267- return entityToMap (entity , requestObject .contentType ());
1273+ Response response = performRequest (request );
1274+ HttpEntity entity = response .getEntity ();
1275+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1276+
1277+ var supportsAsyncHeadersFix = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1278+ if (supportsAsyncHeadersFix ) {
1279+ assertNoAsyncHeaders (response );
1280+ }
1281+ assertWarnings (response , assertWarnings );
1282+
1283+ return json ;
12681284 }
12691285
12701286 public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1292,16 +1308,18 @@ public static Map<String, Object> runEsqlAsync(
12921308 checkKeepOnCompletion (requestObject , json , keepOnCompletion );
12931309 String id = (String ) json .get ("id" );
12941310
1295- var supportsAsyncHeaders = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("async_query_status_headers" )).orElse (false );
1311+ var supportsAsyncHeaders = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1312+ var supportsSuggestedCast = hasCapabilities (adminClient (), List .of ("suggested_cast" ));
1313+
1314+ // Check headers on initial query call
1315+ if (supportsAsyncHeaders ) {
1316+ assertAsyncHeaders (response , id , (boolean ) json .get ("is_running" ));
1317+ }
12961318
12971319 if (id == null ) {
12981320 // no id returned from an async call, must have completed immediately and without keep_on_completion
12991321 assertThat (requestObject .keepOnCompletion (), either (nullValue ()).or (is (false )));
13001322 assertThat ((boolean ) json .get ("is_running" ), is (false ));
1301- if (supportsAsyncHeaders ) {
1302- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1303- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is ("?0" ));
1304- }
13051323 assertWarnings (response , assertWarnings );
13061324 json .remove ("is_running" ); // remove this to not mess up later map assertions
13071325 return Collections .unmodifiableMap (json );
@@ -1322,11 +1340,6 @@ public static Map<String, Object> runEsqlAsync(
13221340 assertThat (json .get ("pages" ), nullValue ());
13231341 }
13241342
1325- if (supportsAsyncHeaders ) {
1326- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), is (id ));
1327- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is (isRunning ? "?1" : "?0" ));
1328- }
1329-
13301343 // issue a second request to "async get" the results
13311344 Request getRequest = prepareAsyncGetRequest (id );
13321345 getRequest .setOptions (request .getOptions ());
@@ -1336,6 +1349,11 @@ public static Map<String, Object> runEsqlAsync(
13361349
13371350 var result = entityToMap (entity , requestObject .contentType ());
13381351
1352+ // Check headers on get call
1353+ if (supportsAsyncHeaders ) {
1354+ assertAsyncHeaders (response , id , (boolean ) result .get ("is_running" ));
1355+ }
1356+
13391357 // assert initial contents, if any, are the same as async get contents
13401358 if (initialColumns != null ) {
13411359 assertEquals (initialColumns , result .get ("columns" ));
@@ -1347,6 +1365,26 @@ public static Map<String, Object> runEsqlAsync(
13471365 return removeAsyncProperties (result );
13481366 }
13491367
1368+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1369+
1370+ /**
1371+ * Cache of capabilities.
1372+ */
1373+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1374+
1375+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1376+ if (requiredCapabilities .isEmpty ()) {
1377+ return true ;
1378+ }
1379+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1380+ try {
1381+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1382+ } catch (IOException e ) {
1383+ throw new UncheckedIOException (e );
1384+ }
1385+ });
1386+ }
1387+
13501388 public void testAsyncGetWithoutContentType () throws IOException {
13511389 int count = randomIntBetween (0 , 100 );
13521390 bulkLoadTestData (count );
@@ -1447,15 +1485,11 @@ static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
14471485 }
14481486
14491487 protected static Map <String , Object > entityToMap (HttpEntity entity , XContentType expectedContentType ) throws IOException {
1450- try (InputStream content = entity .getContent ()) {
1451- XContentType xContentType = XContentType .fromMediaType (entity .getContentType ().getValue ());
1452- assertEquals (expectedContentType , xContentType );
1453- var map = XContentHelper .convertToMap (xContentType .xContent (), content , false );
1454- if (shouldLog ()) {
1455- LOGGER .info ("entity={}" , map );
1456- }
1457- return map ;
1488+ var result = EsqlTestUtils .entityToMap (entity , expectedContentType );
1489+ if (shouldLog ()) {
1490+ LOGGER .info ("entity={}" , result );
14581491 }
1492+ return result ;
14591493 }
14601494
14611495 static void addAsyncParameters (RequestObjectBuilder requestObject , boolean keepOnCompletion ) throws IOException {
@@ -1520,7 +1554,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15201554 }
15211555
15221556 Response response = performRequest (request );
1523- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1557+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1558+ HttpEntity entity = response .getEntity ();
15241559
15251560 // get the content, it could be empty because the request might have not completed
15261561 String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1573,7 +1608,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15731608 // if `addParam` is false, `options` will already have an `Accept` header
15741609 getRequest .setOptions (options );
15751610 response = performRequest (getRequest );
1576- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1611+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1612+ entity = response .getEntity ();
15771613 }
15781614 String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
15791615
@@ -1587,21 +1623,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15871623 }
15881624
15891625 private static Request prepareRequest (Mode mode ) {
1590- Request request = new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" ));
1591- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1592- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1593- return request ;
1626+ return finishRequest (new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" )));
15941627 }
15951628
15961629 private static Request prepareAsyncGetRequest (String id ) {
1597- Request request = new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=60s" );
1598- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1599- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1600- return request ;
1630+ return finishRequest (new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=6000s" ));
16011631 }
16021632
16031633 private static Request prepareAsyncDeleteRequest (String id ) {
1604- Request request = new Request ("DELETE" , "/_query/async/" + id );
1634+ return finishRequest (new Request ("DELETE" , "/_query/async/" + id ));
1635+ }
1636+
1637+ private static Request finishRequest (Request request ) {
16051638 request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
16061639 request .addParameter ("pretty" , "true" ); // Improves error reporting readability
16071640 return request ;
@@ -1615,10 +1648,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16151648 return mediaType ;
16161649 }
16171650
1618- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1619- return assertWarnings (performRequest (request ), assertWarnings );
1620- }
1621-
16221651 protected static Response performRequest (Request request ) throws IOException {
16231652 Response response = client ().performRequest (request );
16241653 if (shouldLog ()) {
@@ -1629,14 +1658,19 @@ protected static Response performRequest(Request request) throws IOException {
16291658 return response ;
16301659 }
16311660
1632- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1661+ static void assertNotPartial (Map <String , Object > answer ) {
1662+ var clusters = answer .get ("_clusters" );
1663+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1664+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1665+ }
1666+
1667+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
16331668 List <String > warnings = new ArrayList <>(response .getWarnings ());
16341669 warnings .removeAll (mutedWarnings ());
16351670 if (shouldLog ()) {
16361671 LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
16371672 }
16381673 assertWarnings .assertWarnings (warnings );
1639- return response .getEntity ();
16401674 }
16411675
16421676 private static Set <String > mutedWarnings () {
@@ -1747,6 +1781,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
17471781 assertEquals (200 , client ().performRequest (request ).getStatusLine ().getStatusCode ());
17481782 }
17491783
1784+ private static void assertAsyncHeaders (Response response , @ Nullable String asyncId , boolean isRunning ) {
1785+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), asyncId == null ? nullValue () : equalTo (asyncId ));
1786+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), isRunning ? is ("?1" ) : is ("?0" ));
1787+ }
1788+
1789+ private static void assertNoAsyncHeaders (Response response ) {
1790+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1791+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), nullValue ());
1792+ }
1793+
17501794 public static RequestObjectBuilder requestObjectBuilder () throws IOException {
17511795 return new RequestObjectBuilder ();
17521796 }
0 commit comments