From 1aad0159672c947c229e52f03822a95a157bc9e1 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 17 Mar 2025 13:58:36 +0100 Subject: [PATCH] Enable and fix testFailOnUnavailableShards --- .../xpack/esql/plugin/CanMatchIT.java | 78 ++++++++++--------- .../esql/plugin/DataNodeRequestSender.java | 3 - 2 files changed, 40 insertions(+), 41 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java index 3db862684aca3..f22946f74bf29 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/plugin/CanMatchIT.java @@ -18,7 +18,6 @@ import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.hamcrest.ElasticsearchAssertions; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase; @@ -30,6 +29,7 @@ import java.util.Map; import java.util.Set; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.EsqlTestUtils.as; import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList; import static org.hamcrest.Matchers.containsString; @@ -48,7 +48,7 @@ protected Collection> nodePlugins() { * Make sure that we don't send data-node requests to the target shards which won't match the query */ public void testCanMatch() { - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("events_2022") @@ -60,9 +60,7 @@ public void testCanMatch() { .add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1")) .add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1")) .get(); - ElasticsearchAssertions.assertAcked( - client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword") - ); + assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")); client().prepareBulk("events_2023") .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2")) @@ -72,15 +70,17 @@ public void testCanMatch() { .get(); try { Set queriedIndices = ConcurrentCollections.newConcurrentSet(); - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService transportService = (MockTransportService) ts; - transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { - DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (var transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).addRequestHandlingBehavior( + ComputeService.DATA_ACTION_NAME, + (handler, request, channel, task) -> { + DataNodeRequest dataNodeRequest = (DataNodeRequest) request; + for (ShardId shardId : dataNodeRequest.shardIds()) { + queriedIndices.add(shardId.getIndexName()); + } + handler.messageReceived(request, channel, task); } - handler.messageReceived(request, channel, task); - }); + ); } try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) { assertThat(getValuesList(resp), hasSize(4)); @@ -118,14 +118,14 @@ public void testCanMatch() { queriedIndices.clear(); } } finally { - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - ((MockTransportService) ts).clearAllRules(); + for (var transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).clearAllRules(); } } } public void testAliasFilters() { - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("employees") @@ -141,7 +141,7 @@ public void testAliasFilters() { .add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1)) .get(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT) @@ -209,11 +209,10 @@ public void testAliasFilters() { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749") public void testFailOnUnavailableShards() throws Exception { internalCluster().ensureAtLeastNumDataNodes(2); String logsOnlyNode = internalCluster().startDataOnlyNode(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("events") @@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception { .add(new IndexRequest().source("timestamp", 2, "message", "b")) .add(new IndexRequest().source("timestamp", 3, "message", "c")) .get(); - ElasticsearchAssertions.assertAcked( + assertAcked( client().admin() .indices() .prepareCreate("logs") @@ -246,13 +245,17 @@ public void testFailOnUnavailableShards() throws Exception { .add(new IndexRequest().source("timestamp", 10, "message", "aa")) .add(new IndexRequest().source("timestamp", 11, "message", "bb")) .get(); + + // when all shards available try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) { assertThat(getValuesList(resp), hasSize(5)); - internalCluster().stopNode(logsOnlyNode); - ensureClusterSizeConsistency(); - Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message")); - assertThat(error.getMessage(), containsString("no shard copies found")); } + + internalCluster().stopNode(logsOnlyNode); + ensureClusterSizeConsistency(); + + // when one shard is unavailable + expectThrows(Exception.class, containsString("no shard copies found"), () -> run("from events,logs | KEEP timestamp,message")); } public void testSkipOnIndexName() { @@ -261,9 +264,7 @@ public void testSkipOnIndexName() { Map indexToNumDocs = new HashMap<>(); for (int i = 0; i < numIndices; i++) { String index = "events-" + i; - ElasticsearchAssertions.assertAcked( - client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword") - ); + assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")); BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); int docs = between(1, 5); long timestamp = 1; @@ -274,15 +275,17 @@ public void testSkipOnIndexName() { indexToNumDocs.put(index, docs); } Set queriedIndices = ConcurrentCollections.newConcurrentSet(); - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService mockTransportService = as(ts, MockTransportService.class); - mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> { - DataNodeRequest dataNodeRequest = (DataNodeRequest) request; - for (ShardId shardId : dataNodeRequest.shardIds()) { - queriedIndices.add(shardId.getIndexName()); + for (var transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).addRequestHandlingBehavior( + ComputeService.DATA_ACTION_NAME, + (handler, request, channel, task) -> { + DataNodeRequest dataNodeRequest = (DataNodeRequest) request; + for (ShardId shardId : dataNodeRequest.shardIds()) { + queriedIndices.add(shardId.getIndexName()); + } + handler.messageReceived(request, channel, task); } - handler.messageReceived(request, channel, task); - }); + ); } try { for (int i = 0; i < numIndices; i++) { @@ -294,9 +297,8 @@ public void testSkipOnIndexName() { assertThat(queriedIndices, equalTo(Set.of(index))); } } finally { - for (TransportService ts : internalCluster().getInstances(TransportService.class)) { - MockTransportService mockTransportService = as(ts, MockTransportService.class); - mockTransportService.clearAllRules(); + for (var transportService : internalCluster().getInstances(TransportService.class)) { + as(transportService, MockTransportService.class).clearAllRules(); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java index 7abc0ba40af76..a6c4f59b4fc99 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java @@ -429,9 +429,6 @@ void searchShards( Map shards = new HashMap<>(); for (SearchShardsGroup group : resp.getGroups()) { var shardId = group.shardId(); - if (concreteIndices.contains(shardId.getIndexName()) == false) { - continue; - } totalShards++; if (group.skipped()) { skippedShards++;