Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
Expand All @@ -30,6 +29,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Make sure that we don't send data-node requests to the target shards which won't match the query
*/
public void testCanMatch() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events_2022")
Expand All @@ -60,9 +60,7 @@ public void testCanMatch() {
.add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1"))
.add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1"))
.get();
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword"));
client().prepareBulk("events_2023")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2"))
Expand All @@ -72,15 +70,17 @@ public void testCanMatch() {
.get();
try {
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService transportService = (MockTransportService) ts;
transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (var transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) {
assertThat(getValuesList(resp), hasSize(4));
Expand Down Expand Up @@ -118,14 +118,14 @@ public void testCanMatch() {
queriedIndices.clear();
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
((MockTransportService) ts).clearAllRules();
for (var transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}

public void testAliasFilters() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("employees")
Expand All @@ -141,7 +141,7 @@ public void testAliasFilters() {
.add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1))
.get();

ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
Expand Down Expand Up @@ -209,11 +209,10 @@ public void testAliasFilters() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749")
public void testFailOnUnavailableShards() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
String logsOnlyNode = internalCluster().startDataOnlyNode();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events")
Expand All @@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 2, "message", "b"))
.add(new IndexRequest().source("timestamp", 3, "message", "c"))
.get();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("logs")
Expand All @@ -246,13 +245,17 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 10, "message", "aa"))
.add(new IndexRequest().source("timestamp", 11, "message", "bb"))
.get();

// when all shards available
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) {
assertThat(getValuesList(resp), hasSize(5));
internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();
Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message"));
assertThat(error.getMessage(), containsString("no shard copies found"));
}

internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();

// when one shard is unavailable
expectThrows(Exception.class, containsString("no shard copies found"), () -> run("from events,logs | KEEP timestamp,message"));
}

public void testSkipOnIndexName() {
Expand All @@ -261,9 +264,7 @@ public void testSkipOnIndexName() {
Map<String, Integer> indexToNumDocs = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String index = "events-" + i;
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword"));
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int docs = between(1, 5);
long timestamp = 1;
Expand All @@ -274,15 +275,17 @@ public void testSkipOnIndexName() {
indexToNumDocs.put(index, docs);
}
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (var transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try {
for (int i = 0; i < numIndices; i++) {
Expand All @@ -294,9 +297,8 @@ public void testSkipOnIndexName() {
assertThat(queriedIndices, equalTo(Set.of(index)));
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.clearAllRules();
for (var transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,6 @@ void searchShards(
Map<ShardId, TargetShard> shards = new HashMap<>();
for (SearchShardsGroup group : resp.getGroups()) {
var shardId = group.shardId();
if (concreteIndices.contains(shardId.getIndexName()) == false) {
continue;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition was causing testFailOnUnavailableShards failure as concreteIndices did not contain unavailable index. It is present in one of the groups with empty allocatedNodes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not remove this. If a new index is created between field-caps and search_shards with conflicting mappings, the query can fail.

totalShards++;
if (group.skipped()) {
skippedShards++;
Expand Down