1616import org .elasticsearch .client .RequestOptions ;
1717import org .elasticsearch .client .Response ;
1818import org .elasticsearch .client .ResponseException ;
19+ import org .elasticsearch .client .RestClient ;
1920import org .elasticsearch .client .WarningsHandler ;
2021import org .elasticsearch .common .bytes .BytesArray ;
2122import org .elasticsearch .common .io .Streams ;
4142import java .io .IOException ;
4243import java .io .InputStreamReader ;
4344import java .io .OutputStream ;
45+ import java .io .UncheckedIOException ;
4446import java .nio .charset .StandardCharsets ;
4547import java .time .ZoneId ;
4648import java .util .ArrayList ;
5153import java .util .Locale ;
5254import java .util .Map ;
5355import java .util .Set ;
56+ import java .util .concurrent .ConcurrentHashMap ;
57+ import java .util .concurrent .ConcurrentMap ;
5458import java .util .function .IntFunction ;
5559
5660import static java .util .Collections .emptySet ;
6064import static org .elasticsearch .test .MapMatcher .assertMap ;
6165import static org .elasticsearch .test .MapMatcher .matchesMap ;
6266import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
63- import static org .elasticsearch .xpack .esql .qa .rest .EsqlSpecTestCase .assertNotPartial ;
6467import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .ASYNC ;
6568import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
6669import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
6770import static org .hamcrest .Matchers .any ;
71+ import static org .hamcrest .Matchers .anyOf ;
6872import static org .hamcrest .Matchers .containsString ;
6973import static org .hamcrest .Matchers .either ;
7074import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
396400 options .addHeader ("Content-Type" , mediaType );
397401 options .addHeader ("Accept" , "text/csv; header=absent" );
398402 request .setOptions (options );
399- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
403+ Response response = performRequest (request );
404+ assertWarnings (response , new AssertWarnings .NoWarnings ());
405+ HttpEntity entity = response .getEntity ();
400406 String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
401407 assertEquals ("keyword0,0\r \n " , actual );
402408 }
@@ -1091,7 +1097,7 @@ public void testComplexFieldNames() throws IOException {
10911097 }
10921098
10931099 /**
1094- * INLINESTATS <strong>can</strong> group on {@code NOW()}. It's a little silly, but
1100+ * INLINE STATS <strong>can</strong> group on {@code NOW()}. It's a little silly, but
10951101 * doing something like {@code DATE_TRUNC(1 YEAR, NOW() - 1970-01-01T00:00:00Z)} is
10961102 * much more sensible. But just grouping on {@code NOW()} is enough to test this.
10971103 * <p>
@@ -1101,11 +1107,11 @@ public void testComplexFieldNames() throws IOException {
11011107 */
11021108 @ AwaitsFix (bugUrl = "Disabled temporarily until JOIN implementation is completed" )
11031109 public void testInlineStatsNow () throws IOException {
1104- assumeTrue ("INLINESTATS only available on snapshots" , Build .current ().isSnapshot ());
1110+ assumeTrue ("INLINE STATS only available on snapshots" , Build .current ().isSnapshot ());
11051111 indexTimestampData (1 );
11061112
11071113 RequestObjectBuilder builder = requestObjectBuilder ().query (
1108- fromIndex () + " | EVAL now=NOW() | INLINESTATS AVG(value) BY now | SORT value ASC"
1114+ fromIndex () + " | EVAL now=NOW() | INLINE STATS AVG(value) BY now | SORT value ASC"
11091115 );
11101116 Map <String , Object > result = runEsql (builder );
11111117 ListMatcher values = matchesList ();
@@ -1258,7 +1264,10 @@ public static Map<String, Object> runEsql(
12581264 var results = mode == ASYNC
12591265 ? runEsqlAsync (requestObject , randomBoolean (), assertWarnings )
12601266 : runEsqlSync (requestObject , assertWarnings );
1261- return checkPartialResults ? assertNotPartial (results ) : results ;
1267+ if (checkPartialResults ) {
1268+ assertNotPartial (results );
1269+ }
1270+ return results ;
12621271 }
12631272
12641273 public static Map <String , Object > runEsql (RequestObjectBuilder requestObject , AssertWarnings assertWarnings , Mode mode )
@@ -1269,8 +1278,13 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
12691278 public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
12701279 Request request = prepareRequestWithOptions (requestObject , SYNC );
12711280
1272- HttpEntity entity = performRequest (request , assertWarnings );
1273- return entityToMap (entity , requestObject .contentType ());
1281+ Response response = performRequest (request );
1282+ HttpEntity entity = response .getEntity ();
1283+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1284+
1285+ assertWarnings (response , assertWarnings );
1286+
1287+ return json ;
12741288 }
12751289
12761290 public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1298,8 +1312,8 @@ public static Map<String, Object> runEsqlAsync(
12981312 checkKeepOnCompletion (requestObject , json , keepOnCompletion );
12991313 String id = (String ) json .get ("id" );
13001314
1301- var supportsAsyncHeaders = clusterHasCapability ( "POST" , "/_query" , List . of (), List .of ("async_query_status_headers" )). orElse ( false );
1302- var supportsSuggestedCast = clusterHasCapability ( "POST" , "/_query" , List . of (), List .of ("suggested_cast" )). orElse ( false );
1315+ var supportsAsyncHeaders = hasCapabilities ( adminClient (), List .of ("async_query_status_headers" ));
1316+ var supportsSuggestedCast = hasCapabilities ( adminClient (), List .of ("suggested_cast" ));
13031317
13041318 if (id == null ) {
13051319 // no id returned from an async call, must have completed immediately and without keep_on_completion
@@ -1361,6 +1375,26 @@ public static Map<String, Object> runEsqlAsync(
13611375 return removeAsyncProperties (result );
13621376 }
13631377
1378+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1379+
1380+ /**
1381+ * Cache of capabilities.
1382+ */
1383+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1384+
1385+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1386+ if (requiredCapabilities .isEmpty ()) {
1387+ return true ;
1388+ }
1389+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1390+ try {
1391+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1392+ } catch (IOException e ) {
1393+ throw new UncheckedIOException (e );
1394+ }
1395+ });
1396+ }
1397+
13641398 private static Object removeOriginalTypesAndSuggestedCast (Object response ) {
13651399 if (response instanceof ArrayList <?> columns ) {
13661400 var newColumns = new ArrayList <>();
@@ -1589,7 +1623,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
15891623 }
15901624
15911625 Response response = performRequest (request );
1592- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1626+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1627+ HttpEntity entity = response .getEntity ();
15931628
15941629 // get the content, it could be empty because the request might have not completed
15951630 String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1642,7 +1677,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
16421677 // if `addParam` is false, `options` will already have an `Accept` header
16431678 getRequest .setOptions (options );
16441679 response = performRequest (getRequest );
1645- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1680+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1681+ entity = response .getEntity ();
16461682 }
16471683 String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
16481684
@@ -1681,10 +1717,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
16811717 return mediaType ;
16821718 }
16831719
1684- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1685- return assertWarnings (performRequest (request ), assertWarnings );
1686- }
1687-
16881720 protected static Response performRequest (Request request ) throws IOException {
16891721 Response response = client ().performRequest (request );
16901722 if (shouldLog ()) {
@@ -1695,14 +1727,19 @@ protected static Response performRequest(Request request) throws IOException {
16951727 return response ;
16961728 }
16971729
1698- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1730+ static void assertNotPartial (Map <String , Object > answer ) {
1731+ var clusters = answer .get ("_clusters" );
1732+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1733+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1734+ }
1735+
1736+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
16991737 List <String > warnings = new ArrayList <>(response .getWarnings ());
17001738 warnings .removeAll (mutedWarnings ());
17011739 if (shouldLog ()) {
17021740 LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
17031741 }
17041742 assertWarnings .assertWarnings (warnings );
1705- return response .getEntity ();
17061743 }
17071744
17081745 private static Set <String > mutedWarnings () {
0 commit comments