Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas)
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
return run(esqlCommands, pragmas, filter, null);
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter, Boolean allowPartialResults) {
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query(esqlCommands);
if (pragmas != null) {
Expand All @@ -174,6 +178,9 @@ protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, Query
if (filter != null) {
request.filter(filter);
}
if (allowPartialResults != null) {
request.allowPartialResults(allowPartialResults);
}
return run(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
Expand All @@ -30,6 +29,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
Expand All @@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Make sure that we don't send data-node requests to the target shards which won't match the query
*/
public void testCanMatch() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events_2022")
Expand All @@ -60,9 +60,7 @@ public void testCanMatch() {
.add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1"))
.add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1"))
.get();
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword"));
client().prepareBulk("events_2023")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2"))
Expand All @@ -72,15 +70,17 @@ public void testCanMatch() {
.get();
try {
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService transportService = (MockTransportService) ts;
transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) {
assertThat(getValuesList(resp), hasSize(4));
Expand Down Expand Up @@ -118,14 +118,14 @@ public void testCanMatch() {
queriedIndices.clear();
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
((MockTransportService) ts).clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}

public void testAliasFilters() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("employees")
Expand All @@ -141,7 +141,7 @@ public void testAliasFilters() {
.add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1))
.get();

ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareAliases()
Expand Down Expand Up @@ -209,11 +209,10 @@ public void testAliasFilters() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749")
public void testFailOnUnavailableShards() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
String logsOnlyNode = internalCluster().startDataOnlyNode();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events")
Expand All @@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 2, "message", "b"))
.add(new IndexRequest().source("timestamp", 3, "message", "c"))
.get();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("logs")
Expand All @@ -246,12 +245,28 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 10, "message", "aa"))
.add(new IndexRequest().source("timestamp", 11, "message", "bb"))
.get();

// when all shards available
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) {
assertThat(getValuesList(resp), hasSize(5));
internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();
Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message"));
assertThat(error.getMessage(), containsString("no shard copies found"));
}

internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();

// when one shard is unavailable
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from events,logs | KEEP timestamp,message")
);
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from * | KEEP timestamp,message")
);
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message", null, null, true)) {
assertThat(getValuesList(resp), hasSize(3));
}
}

Expand All @@ -261,9 +276,7 @@ public void testSkipOnIndexName() {
Map<String, Integer> indexToNumDocs = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String index = "events-" + i;
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword"));
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int docs = between(1, 5);
long timestamp = 1;
Expand All @@ -274,15 +287,17 @@ public void testSkipOnIndexName() {
indexToNumDocs.put(index, docs);
}
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try {
for (int i = 0; i < numIndices; i++) {
Expand All @@ -294,9 +309,8 @@ public void testSkipOnIndexName() {
assertThat(queriedIndices, equalTo(Set.of(index)));
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
*/
package org.elasticsearch.xpack.esql.index;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -19,30 +19,33 @@ public final class IndexResolution {
/**
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableShards Set of shards that were unavailable during index resolution
* @param unavailableClusters Remote clusters that could not be contacted during planning
* @return valid IndexResolution
*/
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Collections.emptyMap());
return valid(index, index.concreteIndices(), Set.of(), Map.of());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
}

public static IndexResolution notFound(String name) {
Expand All @@ -56,18 +59,21 @@ public static IndexResolution notFound(String name) {

// all indices found by field-caps
private final Set<String> resolvedIndices;
private final Set<NoShardAvailableActionException> unavailableShards;
// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;

private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.unavailableShards = unavailableShards;
this.unavailableClusters = unavailableClusters;
}

Expand Down Expand Up @@ -109,6 +115,13 @@ public Set<String> resolvedIndices() {
return resolvedIndices;
}

/**
* @return set of unavailable shards during index resolution
*/
public Set<NoShardAvailableActionException> getUnavailableShards() {
return unavailableShards;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,13 @@ private void preAnalyzeIndices(
indexExpressionToResolve,
result.fieldNames,
requestFilter,
listener.map(indexResolution -> result.withIndexResolution(indexResolution))
listener.delegateFailure((l, indexResolution) -> {
if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) {
l.onFailure(indexResolution.getUnavailableShards().iterator().next());
} else {
l.onResponse(result.withIndexResolution(indexResolution));
}
})
);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
Expand Down Expand Up @@ -144,6 +145,13 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
fieldCapsResponse.getFailures()
);

Set<NoShardAvailableActionException> unavailableShards = new HashSet<>();
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
if (failure.getException() instanceof NoShardAvailableActionException e) {
unavailableShards.add(e);
}
}

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
Expand All @@ -153,11 +161,9 @@ public IndexResolution mergedMappings(String indexPattern, FieldCapabilitiesResp
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
allEmpty &= ir.get().isEmpty();
}
if (allEmpty) {
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, Map.of()), concreteIndices.keySet(), unavailableRemotes);
}
return IndexResolution.valid(new EsIndex(indexPattern, rootFields, concreteIndices), concreteIndices.keySet(), unavailableRemotes);
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes);
}

private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {
Expand Down
Loading