16
16
import org .elasticsearch .client .RequestOptions ;
17
17
import org .elasticsearch .client .Response ;
18
18
import org .elasticsearch .client .ResponseException ;
19
+ import org .elasticsearch .client .RestClient ;
19
20
import org .elasticsearch .client .WarningsHandler ;
20
21
import org .elasticsearch .common .bytes .BytesArray ;
21
22
import org .elasticsearch .common .io .Streams ;
41
42
import java .io .IOException ;
42
43
import java .io .InputStreamReader ;
43
44
import java .io .OutputStream ;
45
+ import java .io .UncheckedIOException ;
44
46
import java .nio .charset .StandardCharsets ;
45
47
import java .time .ZoneId ;
46
48
import java .util .ArrayList ;
51
53
import java .util .Locale ;
52
54
import java .util .Map ;
53
55
import java .util .Set ;
56
+ import java .util .concurrent .ConcurrentHashMap ;
57
+ import java .util .concurrent .ConcurrentMap ;
54
58
import java .util .function .IntFunction ;
55
59
56
60
import static java .util .Collections .emptySet ;
60
64
import static org .elasticsearch .test .MapMatcher .assertMap ;
61
65
import static org .elasticsearch .test .MapMatcher .matchesMap ;
62
66
import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
63
- import static org .elasticsearch .xpack .esql .qa .rest .EsqlSpecTestCase .assertNotPartial ;
64
67
import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .ASYNC ;
65
68
import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
66
69
import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
67
70
import static org .hamcrest .Matchers .any ;
71
+ import static org .hamcrest .Matchers .anyOf ;
68
72
import static org .hamcrest .Matchers .containsString ;
69
73
import static org .hamcrest .Matchers .either ;
70
74
import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -396,7 +400,9 @@ public void testCSVNoHeaderMode() throws IOException {
396
400
options .addHeader ("Content-Type" , mediaType );
397
401
options .addHeader ("Accept" , "text/csv; header=absent" );
398
402
request .setOptions (options );
399
- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
403
+ Response response = performRequest (request );
404
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
405
+ HttpEntity entity = response .getEntity ();
400
406
String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
401
407
assertEquals ("keyword0,0\r \n " , actual );
402
408
}
@@ -1258,7 +1264,10 @@ public static Map<String, Object> runEsql(
1258
1264
var results = mode == ASYNC
1259
1265
? runEsqlAsync (requestObject , randomBoolean (), assertWarnings )
1260
1266
: runEsqlSync (requestObject , assertWarnings );
1261
- return checkPartialResults ? assertNotPartial (results ) : results ;
1267
+ if (checkPartialResults ) {
1268
+ assertNotPartial (results );
1269
+ }
1270
+ return results ;
1262
1271
}
1263
1272
1264
1273
public static Map <String , Object > runEsql (RequestObjectBuilder requestObject , AssertWarnings assertWarnings , Mode mode )
@@ -1269,8 +1278,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
1269
1278
public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
1270
1279
Request request = prepareRequestWithOptions (requestObject , SYNC );
1271
1280
1272
- HttpEntity entity = performRequest (request , assertWarnings );
1273
- return entityToMap (entity , requestObject .contentType ());
1281
+ Response response = performRequest (request );
1282
+ HttpEntity entity = response .getEntity ();
1283
+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1284
+
1285
+ var supportsAsyncHeadersFix = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1286
+ if (supportsAsyncHeadersFix ) {
1287
+ assertNoAsyncHeaders (response );
1288
+ }
1289
+ assertWarnings (response , assertWarnings );
1290
+
1291
+ return json ;
1274
1292
}
1275
1293
1276
1294
public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1298,17 +1316,18 @@ public static Map<String, Object> runEsqlAsync(
1298
1316
checkKeepOnCompletion (requestObject , json , keepOnCompletion );
1299
1317
String id = (String ) json .get ("id" );
1300
1318
1301
- var supportsAsyncHeaders = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("async_query_status_headers" )).orElse (false );
1302
- var supportsSuggestedCast = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("suggested_cast" )).orElse (false );
1319
+ var supportsAsyncHeaders = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1320
+ var supportsSuggestedCast = hasCapabilities (adminClient (), List .of ("suggested_cast" ));
1321
+
1322
+ // Check headers on initial query call
1323
+ if (supportsAsyncHeaders ) {
1324
+ assertAsyncHeaders (response , id , (boolean ) json .get ("is_running" ));
1325
+ }
1303
1326
1304
1327
if (id == null ) {
1305
1328
// no id returned from an async call, must have completed immediately and without keep_on_completion
1306
1329
assertThat (requestObject .keepOnCompletion (), either (nullValue ()).or (is (false )));
1307
1330
assertThat ((boolean ) json .get ("is_running" ), is (false ));
1308
- if (supportsAsyncHeaders ) {
1309
- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1310
- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is ("?0" ));
1311
- }
1312
1331
assertWarnings (response , assertWarnings );
1313
1332
json .remove ("is_running" ); // remove this to not mess up later map assertions
1314
1333
return Collections .unmodifiableMap (json );
@@ -1329,11 +1348,6 @@ public static Map<String, Object> runEsqlAsync(
1329
1348
assertThat (json .get ("pages" ), nullValue ());
1330
1349
}
1331
1350
1332
- if (supportsAsyncHeaders ) {
1333
- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), is (id ));
1334
- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is (isRunning ? "?1" : "?0" ));
1335
- }
1336
-
1337
1351
// issue a second request to "async get" the results
1338
1352
Request getRequest = prepareAsyncGetRequest (id );
1339
1353
getRequest .setOptions (request .getOptions ());
@@ -1343,6 +1357,11 @@ public static Map<String, Object> runEsqlAsync(
1343
1357
1344
1358
var result = entityToMap (entity , requestObject .contentType ());
1345
1359
1360
+ // Check headers on get call
1361
+ if (supportsAsyncHeaders ) {
1362
+ assertAsyncHeaders (response , id , (boolean ) result .get ("is_running" ));
1363
+ }
1364
+
1346
1365
// assert initial contents, if any, are the same as async get contents
1347
1366
if (initialColumns != null ) {
1348
1367
if (supportsSuggestedCast == false ) {
@@ -1361,6 +1380,26 @@ public static Map<String, Object> runEsqlAsync(
1361
1380
return removeAsyncProperties (result );
1362
1381
}
1363
1382
1383
+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1384
+
1385
+ /**
1386
+ * Cache of capabilities.
1387
+ */
1388
+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1389
+
1390
+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1391
+ if (requiredCapabilities .isEmpty ()) {
1392
+ return true ;
1393
+ }
1394
+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1395
+ try {
1396
+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1397
+ } catch (IOException e ) {
1398
+ throw new UncheckedIOException (e );
1399
+ }
1400
+ });
1401
+ }
1402
+
1364
1403
private static Object removeOriginalTypesAndSuggestedCast (Object response ) {
1365
1404
if (response instanceof ArrayList <?> columns ) {
1366
1405
var newColumns = new ArrayList <>();
@@ -1589,7 +1628,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
1589
1628
}
1590
1629
1591
1630
Response response = performRequest (request );
1592
- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1631
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1632
+ HttpEntity entity = response .getEntity ();
1593
1633
1594
1634
// get the content, it could be empty because the request might have not completed
1595
1635
String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1642,7 +1682,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
1642
1682
// if `addParam` is false, `options` will already have an `Accept` header
1643
1683
getRequest .setOptions (options );
1644
1684
response = performRequest (getRequest );
1645
- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1685
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1686
+ entity = response .getEntity ();
1646
1687
}
1647
1688
String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
1648
1689
@@ -1681,10 +1722,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
1681
1722
return mediaType ;
1682
1723
}
1683
1724
1684
- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1685
- return assertWarnings (performRequest (request ), assertWarnings );
1686
- }
1687
-
1688
1725
protected static Response performRequest (Request request ) throws IOException {
1689
1726
Response response = client ().performRequest (request );
1690
1727
if (shouldLog ()) {
@@ -1695,14 +1732,19 @@ protected static Response performRequest(Request request) throws IOException {
1695
1732
return response ;
1696
1733
}
1697
1734
1698
- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1735
+ static void assertNotPartial (Map <String , Object > answer ) {
1736
+ var clusters = answer .get ("_clusters" );
1737
+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1738
+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1739
+ }
1740
+
1741
+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
1699
1742
List <String > warnings = new ArrayList <>(response .getWarnings ());
1700
1743
warnings .removeAll (mutedWarnings ());
1701
1744
if (shouldLog ()) {
1702
1745
LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
1703
1746
}
1704
1747
assertWarnings .assertWarnings (warnings );
1705
- return response .getEntity ();
1706
1748
}
1707
1749
1708
1750
private static Set <String > mutedWarnings () {
@@ -1813,6 +1855,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
1813
1855
assertEquals (200 , client ().performRequest (request ).getStatusLine ().getStatusCode ());
1814
1856
}
1815
1857
1858
+ private static void assertAsyncHeaders (Response response , @ Nullable String asyncId , boolean isRunning ) {
1859
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), asyncId == null ? nullValue () : equalTo (asyncId ));
1860
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), isRunning ? is ("?1" ) : is ("?0" ));
1861
+ }
1862
+
1863
+ private static void assertNoAsyncHeaders (Response response ) {
1864
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1865
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), nullValue ());
1866
+ }
1867
+
1816
1868
public static RequestObjectBuilder requestObjectBuilder () throws IOException {
1817
1869
return new RequestObjectBuilder ();
1818
1870
}
0 commit comments