1818import org .elasticsearch .index .query .RangeQueryBuilder ;
1919import org .elasticsearch .index .shard .ShardId ;
2020import org .elasticsearch .plugins .Plugin ;
21- import org .elasticsearch .test .hamcrest .ElasticsearchAssertions ;
2221import org .elasticsearch .test .transport .MockTransportService ;
2322import org .elasticsearch .transport .TransportService ;
2423import org .elasticsearch .xpack .esql .action .AbstractEsqlIntegTestCase ;
3029import java .util .Map ;
3130import java .util .Set ;
3231
32+ import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
3333import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
3434import static org .elasticsearch .xpack .esql .EsqlTestUtils .getValuesList ;
3535import static org .hamcrest .Matchers .containsString ;
@@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
4848 * Make sure that we don't send data-node requests to the target shards which won't match the query
4949 */
5050 public void testCanMatch () {
51- ElasticsearchAssertions . assertAcked (
51+ assertAcked (
5252 client ().admin ()
5353 .indices ()
5454 .prepareCreate ("events_2022" )
@@ -60,9 +60,7 @@ public void testCanMatch() {
6060 .add (new IndexRequest ().source ("@timestamp" , "2022-05-02" , "uid" , "u1" ))
6161 .add (new IndexRequest ().source ("@timestamp" , "2022-12-15" , "uid" , "u1" ))
6262 .get ();
63- ElasticsearchAssertions .assertAcked (
64- client ().admin ().indices ().prepareCreate ("events_2023" ).setMapping ("@timestamp" , "type=date" , "uid" , "type=keyword" )
65- );
63+ assertAcked (client ().admin ().indices ().prepareCreate ("events_2023" ).setMapping ("@timestamp" , "type=date" , "uid" , "type=keyword" ));
6664 client ().prepareBulk ("events_2023" )
6765 .setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE )
6866 .add (new IndexRequest ().source ("@timestamp" , "2023-01-15" , "uid" , "u2" ))
@@ -72,15 +70,17 @@ public void testCanMatch() {
7270 .get ();
7371 try {
7472 Set <String > queriedIndices = ConcurrentCollections .newConcurrentSet ();
75- for (TransportService ts : internalCluster ().getInstances (TransportService .class )) {
76- MockTransportService transportService = (MockTransportService ) ts ;
77- transportService .addRequestHandlingBehavior (ComputeService .DATA_ACTION_NAME , (handler , request , channel , task ) -> {
78- DataNodeRequest dataNodeRequest = (DataNodeRequest ) request ;
79- for (ShardId shardId : dataNodeRequest .shardIds ()) {
80- queriedIndices .add (shardId .getIndexName ());
73+ for (TransportService transportService : internalCluster ().getInstances (TransportService .class )) {
74+ as (transportService , MockTransportService .class ).addRequestHandlingBehavior (
75+ ComputeService .DATA_ACTION_NAME ,
76+ (handler , request , channel , task ) -> {
77+ DataNodeRequest dataNodeRequest = (DataNodeRequest ) request ;
78+ for (ShardId shardId : dataNodeRequest .shardIds ()) {
79+ queriedIndices .add (shardId .getIndexName ());
80+ }
81+ handler .messageReceived (request , channel , task );
8182 }
82- handler .messageReceived (request , channel , task );
83- });
83+ );
8484 }
8585 try (EsqlQueryResponse resp = run ("from events_*" , randomPragmas (), new RangeQueryBuilder ("@timestamp" ).gte ("2023-01-01" ))) {
8686 assertThat (getValuesList (resp ), hasSize (4 ));
@@ -118,14 +118,14 @@ public void testCanMatch() {
118118 queriedIndices .clear ();
119119 }
120120 } finally {
121- for (TransportService ts : internalCluster ().getInstances (TransportService .class )) {
122- (( MockTransportService ) ts ).clearAllRules ();
121+ for (TransportService transportService : internalCluster ().getInstances (TransportService .class )) {
122+ as ( transportService , MockTransportService . class ).clearAllRules ();
123123 }
124124 }
125125 }
126126
127127 public void testAliasFilters () {
128- ElasticsearchAssertions . assertAcked (
128+ assertAcked (
129129 client ().admin ()
130130 .indices ()
131131 .prepareCreate ("employees" )
@@ -141,7 +141,7 @@ public void testAliasFilters() {
141141 .add (new IndexRequest ().source ("emp_no" , 106 , "dept" , "sales" , "hired" , "2012-08-09" , "salary" , 30.1 ))
142142 .get ();
143143
144- ElasticsearchAssertions . assertAcked (
144+ assertAcked (
145145 client ().admin ()
146146 .indices ()
147147 .prepareAliases (TEST_REQUEST_TIMEOUT , TEST_REQUEST_TIMEOUT )
@@ -209,11 +209,10 @@ public void testAliasFilters() {
209209 }
210210 }
211211
212- @ AwaitsFix (bugUrl = "https://github.com/elastic/elasticsearch/issues/103749" )
213212 public void testFailOnUnavailableShards () throws Exception {
214213 internalCluster ().ensureAtLeastNumDataNodes (2 );
215214 String logsOnlyNode = internalCluster ().startDataOnlyNode ();
216- ElasticsearchAssertions . assertAcked (
215+ assertAcked (
217216 client ().admin ()
218217 .indices ()
219218 .prepareCreate ("events" )
@@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
230229 .add (new IndexRequest ().source ("timestamp" , 2 , "message" , "b" ))
231230 .add (new IndexRequest ().source ("timestamp" , 3 , "message" , "c" ))
232231 .get ();
233- ElasticsearchAssertions . assertAcked (
232+ assertAcked (
234233 client ().admin ()
235234 .indices ()
236235 .prepareCreate ("logs" )
@@ -246,13 +245,17 @@ public void testFailOnUnavailableShards() throws Exception {
246245 .add (new IndexRequest ().source ("timestamp" , 10 , "message" , "aa" ))
247246 .add (new IndexRequest ().source ("timestamp" , 11 , "message" , "bb" ))
248247 .get ();
248+
249+ // when all shards available
249250 try (EsqlQueryResponse resp = run ("from events,logs | KEEP timestamp,message" )) {
250251 assertThat (getValuesList (resp ), hasSize (5 ));
251- internalCluster ().stopNode (logsOnlyNode );
252- ensureClusterSizeConsistency ();
253- Exception error = expectThrows (Exception .class , () -> run ("from events,logs | KEEP timestamp,message" ));
254- assertThat (error .getMessage (), containsString ("no shard copies found" ));
255252 }
253+ internalCluster ().stopNode (logsOnlyNode );
254+ ensureClusterSizeConsistency ();
255+
256+ // when one shard is unavailable
257+ expectThrows (Exception .class , containsString ("no shard copies found" ), () -> run ("from events,logs | KEEP timestamp,message" ));
258+ expectThrows (Exception .class , containsString ("no shard copies found" ), () -> run ("from * | KEEP timestamp,message" ));
256259 }
257260
258261 public void testSkipOnIndexName () {
@@ -261,9 +264,7 @@ public void testSkipOnIndexName() {
261264 Map <String , Integer > indexToNumDocs = new HashMap <>();
262265 for (int i = 0 ; i < numIndices ; i ++) {
263266 String index = "events-" + i ;
264- ElasticsearchAssertions .assertAcked (
265- client ().admin ().indices ().prepareCreate (index ).setMapping ("timestamp" , "type=long" , "message" , "type=keyword" )
266- );
267+ assertAcked (client ().admin ().indices ().prepareCreate (index ).setMapping ("timestamp" , "type=long" , "message" , "type=keyword" ));
267268 BulkRequestBuilder bulk = client ().prepareBulk (index ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
268269 int docs = between (1 , 5 );
269270 long timestamp = 1 ;
@@ -274,15 +275,17 @@ public void testSkipOnIndexName() {
274275 indexToNumDocs .put (index , docs );
275276 }
276277 Set <String > queriedIndices = ConcurrentCollections .newConcurrentSet ();
277- for (TransportService ts : internalCluster ().getInstances (TransportService .class )) {
278- MockTransportService mockTransportService = as (ts , MockTransportService .class );
279- mockTransportService .addRequestHandlingBehavior (ComputeService .DATA_ACTION_NAME , (handler , request , channel , task ) -> {
280- DataNodeRequest dataNodeRequest = (DataNodeRequest ) request ;
281- for (ShardId shardId : dataNodeRequest .shardIds ()) {
282- queriedIndices .add (shardId .getIndexName ());
278+ for (TransportService transportService : internalCluster ().getInstances (TransportService .class )) {
279+ as (transportService , MockTransportService .class ).addRequestHandlingBehavior (
280+ ComputeService .DATA_ACTION_NAME ,
281+ (handler , request , channel , task ) -> {
282+ DataNodeRequest dataNodeRequest = (DataNodeRequest ) request ;
283+ for (ShardId shardId : dataNodeRequest .shardIds ()) {
284+ queriedIndices .add (shardId .getIndexName ());
285+ }
286+ handler .messageReceived (request , channel , task );
283287 }
284- handler .messageReceived (request , channel , task );
285- });
288+ );
286289 }
287290 try {
288291 for (int i = 0 ; i < numIndices ; i ++) {
@@ -294,9 +297,8 @@ public void testSkipOnIndexName() {
294297 assertThat (queriedIndices , equalTo (Set .of (index )));
295298 }
296299 } finally {
297- for (TransportService ts : internalCluster ().getInstances (TransportService .class )) {
298- MockTransportService mockTransportService = as (ts , MockTransportService .class );
299- mockTransportService .clearAllRules ();
300+ for (TransportService transportService : internalCluster ().getInstances (TransportService .class )) {
301+ as (transportService , MockTransportService .class ).clearAllRules ();
300302 }
301303 }
302304 }
0 commit comments