1616import org .elasticsearch .client .RequestOptions ;
1717import org .elasticsearch .client .Response ;
1818import org .elasticsearch .client .ResponseException ;
19+ import org .elasticsearch .client .RestClient ;
1920import org .elasticsearch .client .WarningsHandler ;
2021import org .elasticsearch .common .bytes .BytesArray ;
2122import org .elasticsearch .common .io .Streams ;
4344import java .io .InputStream ;
4445import java .io .InputStreamReader ;
4546import java .io .OutputStream ;
47+ import java .io .UncheckedIOException ;
4648import java .nio .charset .StandardCharsets ;
4749import java .time .ZoneId ;
4850import java .util .ArrayList ;
5355import java .util .Locale ;
5456import java .util .Map ;
5557import java .util .Set ;
58+ import java .util .concurrent .ConcurrentHashMap ;
59+ import java .util .concurrent .ConcurrentMap ;
5660import java .util .function .IntFunction ;
5761
5862import static java .util .Collections .emptySet ;
6569import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
6670import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
6771import static org .hamcrest .Matchers .any ;
72+ import static org .hamcrest .Matchers .anyOf ;
6873import static org .hamcrest .Matchers .containsString ;
6974import static org .hamcrest .Matchers .either ;
7075import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -371,7 +376,9 @@ public void testCSVNoHeaderMode() throws IOException {
371376 options .addHeader ("Content-Type" , mediaType );
372377 options .addHeader ("Accept" , "text/csv; header=absent" );
373378 request .setOptions (options );
374- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
379+ Response response = performRequest (request );
380+ assertWarnings (response , new AssertWarnings .NoWarnings ());
381+ HttpEntity entity = response .getEntity ();
375382 String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
376383 assertEquals ("keyword0,0\r \n " , actual );
377384 }
@@ -1053,8 +1060,17 @@ static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWar
10531060 public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
10541061 Request request = prepareRequestWithOptions (requestObject , SYNC );
10551062
1056- HttpEntity entity = performRequest (request , assertWarnings );
1057- return entityToMap (entity , requestObject .contentType ());
1063+ Response response = performRequest (request );
1064+ HttpEntity entity = response .getEntity ();
1065+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1066+
1067+ var supportsAsyncHeadersFix = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1068+ if (supportsAsyncHeadersFix ) {
1069+ assertNoAsyncHeaders (response );
1070+ }
1071+ assertWarnings (response , assertWarnings );
1072+
1073+ return json ;
10581074 }
10591075
10601076 public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1082,16 +1098,18 @@ public static Map<String, Object> runEsqlAsync(
10821098 checkKeepOnCompletion (requestObject , json , keepOnCompletion );
10831099 String id = (String ) json .get ("id" );
10841100
1085- var supportsAsyncHeaders = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("async_query_status_headers" )).orElse (false );
1101+ var supportsAsyncHeaders = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1102+ var supportsSuggestedCast = hasCapabilities (adminClient (), List .of ("suggested_cast" ));
1103+
1104+ // Check headers on initial query call
1105+ if (supportsAsyncHeaders ) {
1106+ assertAsyncHeaders (response , id , (boolean ) json .get ("is_running" ));
1107+ }
10861108
10871109 if (id == null ) {
10881110 // no id returned from an async call, must have completed immediately and without keep_on_completion
10891111 assertThat (requestObject .keepOnCompletion (), either (nullValue ()).or (is (false )));
10901112 assertThat ((boolean ) json .get ("is_running" ), is (false ));
1091- if (supportsAsyncHeaders ) {
1092- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1093- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is ("?0" ));
1094- }
10951113 assertWarnings (response , assertWarnings );
10961114 json .remove ("is_running" ); // remove this to not mess up later map assertions
10971115 return Collections .unmodifiableMap (json );
@@ -1112,11 +1130,6 @@ public static Map<String, Object> runEsqlAsync(
11121130 assertThat (json .get ("pages" ), nullValue ());
11131131 }
11141132
1115- if (supportsAsyncHeaders ) {
1116- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), is (id ));
1117- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is (isRunning ? "?1" : "?0" ));
1118- }
1119-
11201133 // issue a second request to "async get" the results
11211134 Request getRequest = prepareAsyncGetRequest (id );
11221135 getRequest .setOptions (request .getOptions ());
@@ -1126,9 +1139,21 @@ public static Map<String, Object> runEsqlAsync(
11261139
11271140 var result = entityToMap (entity , requestObject .contentType ());
11281141
1142+ // Check headers on get call
1143+ if (supportsAsyncHeaders ) {
1144+ assertAsyncHeaders (response , id , (boolean ) result .get ("is_running" ));
1145+ }
1146+
11291147 // assert initial contents, if any, are the same as async get contents
11301148 if (initialColumns != null ) {
1131- assertEquals (initialColumns , result .get ("columns" ));
1149+ if (supportsSuggestedCast == false ) {
1150+ assertEquals (
1151+ removeOriginalTypesAndSuggestedCast (initialColumns ),
1152+ removeOriginalTypesAndSuggestedCast (result .get ("columns" ))
1153+ );
1154+ } else {
1155+ assertEquals (initialColumns , result .get ("columns" ));
1156+ }
11321157 assertEquals (initialValues , result .get ("values" ));
11331158 }
11341159
@@ -1137,6 +1162,45 @@ public static Map<String, Object> runEsqlAsync(
11371162 return removeAsyncProperties (result );
11381163 }
11391164
1165+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1166+
1167+ /**
1168+ * Cache of capabilities.
1169+ */
1170+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1171+
1172+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1173+ if (requiredCapabilities .isEmpty ()) {
1174+ return true ;
1175+ }
1176+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1177+ try {
1178+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1179+ } catch (IOException e ) {
1180+ throw new UncheckedIOException (e );
1181+ }
1182+ });
1183+ }
1184+
1185+ private static Object removeOriginalTypesAndSuggestedCast (Object response ) {
1186+ if (response instanceof ArrayList <?> columns ) {
1187+ var newColumns = new ArrayList <>();
1188+ for (var column : columns ) {
1189+ if (column instanceof Map <?, ?> columnMap ) {
1190+ var newMap = new HashMap <>(columnMap );
1191+ newMap .remove ("original_types" );
1192+ newMap .remove ("suggested_cast" );
1193+ newColumns .add (newMap );
1194+ } else {
1195+ newColumns .add (column );
1196+ }
1197+ }
1198+ return newColumns ;
1199+ } else {
1200+ return response ;
1201+ }
1202+ }
1203+
11401204 public void testAsyncGetWithoutContentType () throws IOException {
11411205 int count = randomIntBetween (0 , 100 );
11421206 bulkLoadTestData (count );
@@ -1278,7 +1342,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
12781342 }
12791343
12801344 Response response = performRequest (request );
1281- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1345+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1346+ HttpEntity entity = response .getEntity ();
12821347
12831348 // get the content, it could be empty because the request might have not completed
12841349 String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1331,7 +1396,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
13311396 // if `addParam` is false, `options` will already have an `Accept` header
13321397 getRequest .setOptions (options );
13331398 response = performRequest (getRequest );
1334- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1399+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1400+ entity = response .getEntity ();
13351401 }
13361402 String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
13371403
@@ -1345,21 +1411,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
13451411 }
13461412
13471413 private static Request prepareRequest (Mode mode ) {
1348- Request request = new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" ));
1349- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1350- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1351- return request ;
1414+ return finishRequest (new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" )));
13521415 }
13531416
13541417 private static Request prepareAsyncGetRequest (String id ) {
1355- Request request = new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=60s" );
1356- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1357- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1358- return request ;
1418+ return finishRequest (new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=6000s" ));
13591419 }
13601420
13611421 private static Request prepareAsyncDeleteRequest (String id ) {
1362- Request request = new Request ("DELETE" , "/_query/async/" + id );
1422+ return finishRequest (new Request ("DELETE" , "/_query/async/" + id ));
1423+ }
1424+
1425+ private static Request finishRequest (Request request ) {
13631426 request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
13641427 request .addParameter ("pretty" , "true" ); // Improves error reporting readability
13651428 return request ;
@@ -1373,11 +1436,7 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
13731436 return mediaType ;
13741437 }
13751438
1376- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1377- return assertWarnings (performRequest (request ), assertWarnings );
1378- }
1379-
1380- private static Response performRequest (Request request ) throws IOException {
1439+ protected static Response performRequest (Request request ) throws IOException {
13811440 Response response = client ().performRequest (request );
13821441 if (shouldLog ()) {
13831442 LOGGER .info ("RESPONSE={}" , response );
@@ -1387,14 +1446,19 @@ private static Response performRequest(Request request) throws IOException {
13871446 return response ;
13881447 }
13891448
1390- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1449+ static void assertNotPartial (Map <String , Object > answer ) {
1450+ var clusters = answer .get ("_clusters" );
1451+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1452+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1453+ }
1454+
1455+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
13911456 List <String > warnings = new ArrayList <>(response .getWarnings ());
13921457 warnings .removeAll (mutedWarnings ());
13931458 if (shouldLog ()) {
13941459 LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
13951460 }
13961461 assertWarnings .assertWarnings (warnings );
1397- return response .getEntity ();
13981462 }
13991463
14001464 private static Set <String > mutedWarnings () {
@@ -1505,6 +1569,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
15051569 assertEquals (200 , client ().performRequest (request ).getStatusLine ().getStatusCode ());
15061570 }
15071571
1572+ private static void assertAsyncHeaders (Response response , @ Nullable String asyncId , boolean isRunning ) {
1573+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), asyncId == null ? nullValue () : equalTo (asyncId ));
1574+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), isRunning ? is ("?1" ) : is ("?0" ));
1575+ }
1576+
1577+ private static void assertNoAsyncHeaders (Response response ) {
1578+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1579+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), nullValue ());
1580+ }
1581+
15081582 public static RequestObjectBuilder requestObjectBuilder () throws IOException {
15091583 return new RequestObjectBuilder ();
15101584 }
0 commit comments