16
16
import org .elasticsearch .client .RequestOptions ;
17
17
import org .elasticsearch .client .Response ;
18
18
import org .elasticsearch .client .ResponseException ;
19
+ import org .elasticsearch .client .RestClient ;
19
20
import org .elasticsearch .client .WarningsHandler ;
20
21
import org .elasticsearch .common .bytes .BytesArray ;
21
22
import org .elasticsearch .common .io .Streams ;
22
23
import org .elasticsearch .common .settings .Settings ;
23
- import org .elasticsearch .common .xcontent .XContentHelper ;
24
24
import org .elasticsearch .core .CheckedConsumer ;
25
25
import org .elasticsearch .core .Nullable ;
26
26
import org .elasticsearch .core .TimeValue ;
40
40
41
41
import java .io .ByteArrayOutputStream ;
42
42
import java .io .IOException ;
43
- import java .io .InputStream ;
44
43
import java .io .InputStreamReader ;
45
44
import java .io .OutputStream ;
45
+ import java .io .UncheckedIOException ;
46
46
import java .nio .charset .StandardCharsets ;
47
47
import java .time .ZoneId ;
48
48
import java .util .ArrayList ;
53
53
import java .util .Locale ;
54
54
import java .util .Map ;
55
55
import java .util .Set ;
56
+ import java .util .concurrent .ConcurrentHashMap ;
57
+ import java .util .concurrent .ConcurrentMap ;
56
58
import java .util .function .IntFunction ;
57
59
58
60
import static java .util .Collections .emptySet ;
62
64
import static org .elasticsearch .test .MapMatcher .assertMap ;
63
65
import static org .elasticsearch .test .MapMatcher .matchesMap ;
64
66
import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
65
- import static org .elasticsearch .xpack .esql .qa .rest .EsqlSpecTestCase .assertNotPartial ;
66
67
import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .ASYNC ;
67
68
import static org .elasticsearch .xpack .esql .qa .rest .RestEsqlTestCase .Mode .SYNC ;
68
69
import static org .elasticsearch .xpack .esql .type .EsqlDataTypeConverter .dateTimeToString ;
69
70
import static org .hamcrest .Matchers .any ;
71
+ import static org .hamcrest .Matchers .anyOf ;
70
72
import static org .hamcrest .Matchers .containsString ;
71
73
import static org .hamcrest .Matchers .either ;
72
74
import static org .hamcrest .Matchers .emptyOrNullString ;
@@ -390,7 +392,9 @@ public void testCSVNoHeaderMode() throws IOException {
390
392
options .addHeader ("Content-Type" , mediaType );
391
393
options .addHeader ("Accept" , "text/csv; header=absent" );
392
394
request .setOptions (options );
393
- HttpEntity entity = performRequest (request , new AssertWarnings .NoWarnings ());
395
+ Response response = performRequest (request );
396
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
397
+ HttpEntity entity = response .getEntity ();
394
398
String actual = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
395
399
assertEquals ("keyword0,0\r \n " , actual );
396
400
}
@@ -1252,7 +1256,10 @@ public static Map<String, Object> runEsql(
1252
1256
var results = mode == ASYNC
1253
1257
? runEsqlAsync (requestObject , randomBoolean (), assertWarnings )
1254
1258
: runEsqlSync (requestObject , assertWarnings );
1255
- return checkPartialResults ? assertNotPartial (results ) : results ;
1259
+ if (checkPartialResults ) {
1260
+ assertNotPartial (results );
1261
+ }
1262
+ return results ;
1256
1263
}
1257
1264
1258
1265
public static Map <String , Object > runEsql (RequestObjectBuilder requestObject , AssertWarnings assertWarnings , Mode mode )
@@ -1263,8 +1270,17 @@ public static Map<String, Object> runEsql(RequestObjectBuilder requestObject, As
1263
1270
public static Map <String , Object > runEsqlSync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
1264
1271
Request request = prepareRequestWithOptions (requestObject , SYNC );
1265
1272
1266
- HttpEntity entity = performRequest (request , assertWarnings );
1267
- return entityToMap (entity , requestObject .contentType ());
1273
+ Response response = performRequest (request );
1274
+ HttpEntity entity = response .getEntity ();
1275
+ Map <String , Object > json = entityToMap (entity , requestObject .contentType ());
1276
+
1277
+ var supportsAsyncHeadersFix = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1278
+ if (supportsAsyncHeadersFix ) {
1279
+ assertNoAsyncHeaders (response );
1280
+ }
1281
+ assertWarnings (response , assertWarnings );
1282
+
1283
+ return json ;
1268
1284
}
1269
1285
1270
1286
public static Map <String , Object > runEsqlAsync (RequestObjectBuilder requestObject , AssertWarnings assertWarnings ) throws IOException {
@@ -1292,16 +1308,18 @@ public static Map<String, Object> runEsqlAsync(
1292
1308
checkKeepOnCompletion (requestObject , json , keepOnCompletion );
1293
1309
String id = (String ) json .get ("id" );
1294
1310
1295
- var supportsAsyncHeaders = clusterHasCapability ("POST" , "/_query" , List .of (), List .of ("async_query_status_headers" )).orElse (false );
1311
+ var supportsAsyncHeaders = hasCapabilities (adminClient (), List .of ("async_query_status_headers_fix" ));
1312
+ var supportsSuggestedCast = hasCapabilities (adminClient (), List .of ("suggested_cast" ));
1313
+
1314
+ // Check headers on initial query call
1315
+ if (supportsAsyncHeaders ) {
1316
+ assertAsyncHeaders (response , id , (boolean ) json .get ("is_running" ));
1317
+ }
1296
1318
1297
1319
if (id == null ) {
1298
1320
// no id returned from an async call, must have completed immediately and without keep_on_completion
1299
1321
assertThat (requestObject .keepOnCompletion (), either (nullValue ()).or (is (false )));
1300
1322
assertThat ((boolean ) json .get ("is_running" ), is (false ));
1301
- if (supportsAsyncHeaders ) {
1302
- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1303
- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is ("?0" ));
1304
- }
1305
1323
assertWarnings (response , assertWarnings );
1306
1324
json .remove ("is_running" ); // remove this to not mess up later map assertions
1307
1325
return Collections .unmodifiableMap (json );
@@ -1322,11 +1340,6 @@ public static Map<String, Object> runEsqlAsync(
1322
1340
assertThat (json .get ("pages" ), nullValue ());
1323
1341
}
1324
1342
1325
- if (supportsAsyncHeaders ) {
1326
- assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), is (id ));
1327
- assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), is (isRunning ? "?1" : "?0" ));
1328
- }
1329
-
1330
1343
// issue a second request to "async get" the results
1331
1344
Request getRequest = prepareAsyncGetRequest (id );
1332
1345
getRequest .setOptions (request .getOptions ());
@@ -1336,6 +1349,11 @@ public static Map<String, Object> runEsqlAsync(
1336
1349
1337
1350
var result = entityToMap (entity , requestObject .contentType ());
1338
1351
1352
+ // Check headers on get call
1353
+ if (supportsAsyncHeaders ) {
1354
+ assertAsyncHeaders (response , id , (boolean ) result .get ("is_running" ));
1355
+ }
1356
+
1339
1357
// assert initial contents, if any, are the same as async get contents
1340
1358
if (initialColumns != null ) {
1341
1359
assertEquals (initialColumns , result .get ("columns" ));
@@ -1347,6 +1365,26 @@ public static Map<String, Object> runEsqlAsync(
1347
1365
return removeAsyncProperties (result );
1348
1366
}
1349
1367
1368
+ record CapabilitesCacheKey (RestClient client , List <String > capabilities ) {}
1369
+
1370
+ /**
1371
+ * Cache of capabilities.
1372
+ */
1373
+ private static final ConcurrentMap <CapabilitesCacheKey , Boolean > capabilities = new ConcurrentHashMap <>();
1374
+
1375
+ public static boolean hasCapabilities (RestClient client , List <String > requiredCapabilities ) {
1376
+ if (requiredCapabilities .isEmpty ()) {
1377
+ return true ;
1378
+ }
1379
+ return capabilities .computeIfAbsent (new CapabilitesCacheKey (client , requiredCapabilities ), r -> {
1380
+ try {
1381
+ return clusterHasCapability (client , "POST" , "/_query" , List .of (), requiredCapabilities ).orElse (false );
1382
+ } catch (IOException e ) {
1383
+ throw new UncheckedIOException (e );
1384
+ }
1385
+ });
1386
+ }
1387
+
1350
1388
public void testAsyncGetWithoutContentType () throws IOException {
1351
1389
int count = randomIntBetween (0 , 100 );
1352
1390
bulkLoadTestData (count );
@@ -1447,15 +1485,11 @@ static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
1447
1485
}
1448
1486
1449
1487
protected static Map <String , Object > entityToMap (HttpEntity entity , XContentType expectedContentType ) throws IOException {
1450
- try (InputStream content = entity .getContent ()) {
1451
- XContentType xContentType = XContentType .fromMediaType (entity .getContentType ().getValue ());
1452
- assertEquals (expectedContentType , xContentType );
1453
- var map = XContentHelper .convertToMap (xContentType .xContent (), content , false );
1454
- if (shouldLog ()) {
1455
- LOGGER .info ("entity={}" , map );
1456
- }
1457
- return map ;
1488
+ var result = EsqlTestUtils .entityToMap (entity , expectedContentType );
1489
+ if (shouldLog ()) {
1490
+ LOGGER .info ("entity={}" , result );
1458
1491
}
1492
+ return result ;
1459
1493
}
1460
1494
1461
1495
static void addAsyncParameters (RequestObjectBuilder requestObject , boolean keepOnCompletion ) throws IOException {
@@ -1520,7 +1554,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
1520
1554
}
1521
1555
1522
1556
Response response = performRequest (request );
1523
- HttpEntity entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1557
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1558
+ HttpEntity entity = response .getEntity ();
1524
1559
1525
1560
// get the content, it could be empty because the request might have not completed
1526
1561
String initialValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
@@ -1573,7 +1608,8 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
1573
1608
// if `addParam` is false, `options` will already have an `Accept` header
1574
1609
getRequest .setOptions (options );
1575
1610
response = performRequest (getRequest );
1576
- entity = assertWarnings (response , new AssertWarnings .NoWarnings ());
1611
+ assertWarnings (response , new AssertWarnings .NoWarnings ());
1612
+ entity = response .getEntity ();
1577
1613
}
1578
1614
String newValue = Streams .copyToString (new InputStreamReader (entity .getContent (), StandardCharsets .UTF_8 ));
1579
1615
@@ -1587,21 +1623,18 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
1587
1623
}
1588
1624
1589
1625
private static Request prepareRequest (Mode mode ) {
1590
- Request request = new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" ));
1591
- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1592
- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1593
- return request ;
1626
+ return finishRequest (new Request ("POST" , "/_query" + (mode == ASYNC ? "/async" : "" )));
1594
1627
}
1595
1628
1596
1629
private static Request prepareAsyncGetRequest (String id ) {
1597
- Request request = new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=60s" );
1598
- request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1599
- request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1600
- return request ;
1630
+ return finishRequest (new Request ("GET" , "/_query/async/" + id + "?wait_for_completion_timeout=6000s" ));
1601
1631
}
1602
1632
1603
1633
private static Request prepareAsyncDeleteRequest (String id ) {
1604
- Request request = new Request ("DELETE" , "/_query/async/" + id );
1634
+ return finishRequest (new Request ("DELETE" , "/_query/async/" + id ));
1635
+ }
1636
+
1637
+ private static Request finishRequest (Request request ) {
1605
1638
request .addParameter ("error_trace" , "true" ); // Helps with debugging in case something crazy happens on the server.
1606
1639
request .addParameter ("pretty" , "true" ); // Improves error reporting readability
1607
1640
return request ;
@@ -1615,10 +1648,6 @@ private static String attachBody(RequestObjectBuilder requestObject, Request req
1615
1648
return mediaType ;
1616
1649
}
1617
1650
1618
- private static HttpEntity performRequest (Request request , AssertWarnings assertWarnings ) throws IOException {
1619
- return assertWarnings (performRequest (request ), assertWarnings );
1620
- }
1621
-
1622
1651
protected static Response performRequest (Request request ) throws IOException {
1623
1652
Response response = client ().performRequest (request );
1624
1653
if (shouldLog ()) {
@@ -1629,14 +1658,19 @@ protected static Response performRequest(Request request) throws IOException {
1629
1658
return response ;
1630
1659
}
1631
1660
1632
- private static HttpEntity assertWarnings (Response response , AssertWarnings assertWarnings ) {
1661
+ static void assertNotPartial (Map <String , Object > answer ) {
1662
+ var clusters = answer .get ("_clusters" );
1663
+ var reason = "unexpected partial results" + (clusters != null ? ": _clusters=" + clusters : "" );
1664
+ assertThat (reason , answer .get ("is_partial" ), anyOf (nullValue (), is (false )));
1665
+ }
1666
+
1667
+ private static void assertWarnings (Response response , AssertWarnings assertWarnings ) {
1633
1668
List <String > warnings = new ArrayList <>(response .getWarnings ());
1634
1669
warnings .removeAll (mutedWarnings ());
1635
1670
if (shouldLog ()) {
1636
1671
LOGGER .info ("RESPONSE warnings (after muted)={}" , warnings );
1637
1672
}
1638
1673
assertWarnings .assertWarnings (warnings );
1639
- return response .getEntity ();
1640
1674
}
1641
1675
1642
1676
private static Set <String > mutedWarnings () {
@@ -1747,6 +1781,16 @@ private static void createIndex(String indexName, boolean lookupMode, String map
1747
1781
assertEquals (200 , client ().performRequest (request ).getStatusLine ().getStatusCode ());
1748
1782
}
1749
1783
1784
+ private static void assertAsyncHeaders (Response response , @ Nullable String asyncId , boolean isRunning ) {
1785
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), asyncId == null ? nullValue () : equalTo (asyncId ));
1786
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), isRunning ? is ("?1" ) : is ("?0" ));
1787
+ }
1788
+
1789
+ private static void assertNoAsyncHeaders (Response response ) {
1790
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Id" ), nullValue ());
1791
+ assertThat (response .getHeader ("X-Elasticsearch-Async-Is-Running" ), nullValue ());
1792
+ }
1793
+
1750
1794
public static RequestObjectBuilder requestObjectBuilder () throws IOException {
1751
1795
return new RequestObjectBuilder ();
1752
1796
}
0 commit comments