1616import org .elasticsearch .client .RequestOptions ;
1717import org .elasticsearch .client .Response ;
1818import org .elasticsearch .client .ResponseException ;
19+ import org .elasticsearch .client .RestClient ;
1920import org .elasticsearch .client .WarningsHandler ;
2021import org .elasticsearch .common .bytes .BytesArray ;
2122import org .elasticsearch .common .io .Streams ;
4142import java .io .IOException ;
4243import java .io .InputStreamReader ;
4344import java .io .OutputStream ;
45+ import java .io .UncheckedIOException ;
4446import java .nio .charset .StandardCharsets ;
4547import java .time .ZoneId ;
4648import java .util .ArrayList ;
5153import java .util .Locale ;
5254import java .util .Map ;
5355import java .util .Set ;
56+ import java .util .concurrent .ConcurrentHashMap ;
57+ import java .util .concurrent .ConcurrentMap ;
5458import java .util .function .IntFunction ;
5559
5660import static java .util .Collections .emptySet ;
6064import static org .elasticsearch .test .MapMatcher .assertMap ;
6165import static org .elasticsearch .test .MapMatcher .matchesMap ;
6266import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
63- import static org .elasticsearch .xpack .esql .qa .rest .EsqlSpecTestCase .assertNotPartial ;
6467import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .ASYNC ;
6568import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
6669import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
6770import static org .hamcrest .Matchers .any ;
71+ import static org .hamcrest .Matchers .anyOf ;
6872import static org .hamcrest .Matchers .containsString ;
6973import static org .hamcrest .Matchers .either ;
7074import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
396400 options .addHeader ("Content-Type" , mediaType );
397401 options .addHeader ("Accept" , "text/csv; header=absent" );
398402 request .setOptions (options );
399- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
403+ Response response = performRequest (request );
404+ assertWarnings (response , new AssertWarnings .NoWarnings ());
405+ HttpEntity entity = response .getEntity ();
400406 String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
401407 assertEquals ("keyword0,0\r \n " , actual );
402408 }
@@ -1258,7 +1264,10 @@ public static Map<String, Object> runEsql(
12581264 var results = mode == ASYNC
12591265 ? runEsqlAsync (requestObject , randomBoolean (), assertWarnings )
12601266 : runEsqlSync (requestObject , assertWarnings );
1261- return checkPartialResults ? assertNotPartial (results ) : results ;
1267+ if (checkPartialResults ) {
1268+ assertNotPartial (results );
1269+ }
1270+ return results ;
12621271 }
12631272
12641273 public static Map <String , Object > runEsql (RequestObjectBuilder requestObject , AssertWarnings assertWarnings , Mode mode )
@@ -1269,8 +1278,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
12691278 public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
12701279 Request request = prepareRequestWithOptions (requestObject , SYNC );
12711280
1272- HttpEntity entity = performRequest (request , assertWarnings );
1273- return entityToMap (entity , requestObject .contentType ());
1281+ Response response = performRequest (request );
1282+ HttpEntity entity = response .getEntity ();
1283+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1284+
1285+ var supportsAsyncHeadersFix = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1286+ if (supportsAsyncHeadersFix ) {
1287+ assertNoAsyncHeaders (response );
1288+ }
1289+ assertWarnings (response , assertWarnings );
1290+
1291+ return json ;
12741292 }
12751293
12761294 public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1298,17 +1316,18 @@ public static Map<String, Object> runEsqlAsync(
12981316 checkKeepOnCompletion (requestObject , json , keepOnCompletion );
12991317 String id = (String ) json .get ("id" );
13001318
1301- var supportsAsyncHeaders = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("async_query_status_headers" )).orElse (false );
1302- var supportsSuggestedCast = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("suggested_cast" )).orElse (false );
1319+ var supportsAsyncHeaders = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1320+ var supportsSuggestedCast = hasCapabilities (adminClient (), List .of ("suggested_cast" ));
1321+
1322+ // Check headers on initial query call
1323+ if (supportsAsyncHeaders ) {
1324+ assertAsyncHeaders (response , id , (boolean ) json .get ("is_running" ));
1325+ }
13031326
13041327 if (id == null ) {
13051328 // no id returned from an async call, must have completed immediately and without keep_on_completion
13061329 assertThat (requestObject .keepOnCompletion (), either (nullValue ()).or (is (false )));
13071330 assertThat ((boolean ) json .get ("is_running" ), is (false ));
1308- if (supportsAsyncHeaders ) {
1309- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1310- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is ("?0" ));
1311- }
13121331 assertWarnings (response , assertWarnings );
13131332 json .remove ("is_running" ); // remove this to not mess up later map assertions
13141333 return Collections .unmodifiableMap (json );
@@ -1329,11 +1348,6 @@ public static Map<String, Object> runEsqlAsync(
13291348 assertThat (json .get ("pages" ), nullValue ());
13301349 }
13311350
1332- if (supportsAsyncHeaders ) {
1333- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), is (id ));
1334- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is (isRunning ? "?1" : "?0" ));
1335- }
1336-
13371351 // issue a second request to "async get" the results
13381352 Request getRequest = prepareAsyncGetRequest (id );
13391353 getRequest .setOptions (request .getOptions ());
@@ -1343,6 +1357,11 @@ public static Map<String, Object> runEsqlAsync(
13431357
13441358 var result = entityToMap (entity , requestObject .contentType ());
13451359
1360+ // Check headers on get call
1361+ if (supportsAsyncHeaders ) {
1362+ assertAsyncHeaders (response , id , (boolean ) result .get ("is_running" ));
1363+ }
1364+
13461365 // assert initial contents, if any, are the same as async get contents
13471366 if (initialColumns != null ) {
13481367 if (supportsSuggestedCast == false ) {
@@ -1361,6 +1380,26 @@ public static Map<String, Object> runEsqlAsync(
13611380 return removeAsyncProperties (result );
13621381 }
13631382
1383+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1384+
1385+ /**
1386+ * Cache of capabilities.
1387+ */
1388+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1389+
1390+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1391+ if (requiredCapabilities .isEmpty ()) {
1392+ return true ;
1393+ }
1394+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1395+ try {
1396+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1397+ } catch (IOException e ) {
1398+ throw new UncheckedIOException (e );
1399+ }
1400+ });
1401+ }
1402+
13641403 private static Object removeOriginalTypesAndSuggestedCast (Object response ) {
13651404 if (response instanceof ArrayList <?> columns ) {
13661405 var newColumns = new ArrayList <>();
@@ -1589,7 +1628,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15891628 }
15901629
15911630 Response response = performRequest (request );
1592- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1631+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1632+ HttpEntity entity = response .getEntity ();
15931633
15941634 // get the content, it could be empty because the request might have not completed
15951635 String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1642,7 +1682,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
16421682 // if `addParam` is false, `options` will already have an `Accept` header
16431683 getRequest .setOptions (options );
16441684 response = performRequest (getRequest );
1645- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1685+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1686+ entity = response .getEntity ();
16461687 }
16471688 String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
16481689
@@ -1681,10 +1722,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16811722 return mediaType ;
16821723 }
16831724
1684- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1685- return assertWarnings (performRequest (request ), assertWarnings );
1686- }
1687-
16881725 protected static Response performRequest (Request request ) throws IOException {
16891726 Response response = client ().performRequest (request );
16901727 if (shouldLog ()) {
@@ -1695,14 +1732,19 @@ protected static Response performRequest(Request request) throws IOException {
16951732 return response ;
16961733 }
16971734
1698- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1735+ static void assertNotPartial (Map <String , Object > answer ) {
1736+ var clusters = answer .get ("_clusters" );
1737+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1738+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1739+ }
1740+
1741+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
16991742 List <String > warnings = new ArrayList <>(response .getWarnings ());
17001743 warnings .removeAll (mutedWarnings ());
17011744 if (shouldLog ()) {
17021745 LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
17031746 }
17041747 assertWarnings .assertWarnings (warnings );
1705- return response .getEntity ();
17061748 }
17071749
17081750 private static Set <String > mutedWarnings () {
@@ -1813,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
18131855 assertEquals (200 , client ().performRequest (request ).getStatusLine ().getStatusCode ());
18141856 }
18151857
1858+ private static void assertAsyncHeaders (Response response , @ Nullable String asyncId , boolean isRunning ) {
1859+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), asyncId == null ? nullValue () : equalTo (asyncId ));
1860+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), isRunning ? is ("?1" ) : is ("?0" ));
1861+ }
1862+
1863+ private static void assertNoAsyncHeaders (Response response ) {
1864+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1865+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), nullValue ());
1866+ }
1867+
18161868 public static RequestObjectBuilder requestObjectBuilder () throws IOException {
18171869 return new RequestObjectBuilder ();
18181870 }
0 commit comments