@@ -166,6 +166,10 @@ protected final EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas)
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
return run(esqlCommands, pragmas, filter, null);
}

protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter, Boolean allowPartialResults) {
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
request.query(esqlCommands);
if (pragmas != null) {
@@ -174,6 +178,9 @@ protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, Query
if (filter != null) {
request.filter(filter);
}
if (allowPartialResults != null) {
request.allowPartialResults(allowPartialResults);
}
return run(request);
}

@@ -18,7 +18,6 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
@@ -30,6 +29,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
@@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Make sure that we don't send data-node requests to the target shards which won't match the query
*/
public void testCanMatch() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events_2022")
@@ -60,9 +60,7 @@ public void testCanMatch() {
.add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1"))
.add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1"))
.get();
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword"));
client().prepareBulk("events_2023")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2"))
@@ -72,15 +70,17 @@ public void testCanMatch() {
.get();
try {
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService transportService = (MockTransportService) ts;
transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) {
assertThat(getValuesList(resp), hasSize(4));
@@ -118,14 +118,14 @@ public void testCanMatch() {
queriedIndices.clear();
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
((MockTransportService) ts).clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}

public void testAliasFilters() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("employees")
@@ -141,7 +141,7 @@ public void testAliasFilters() {
.add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1))
.get();

ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
@@ -209,11 +209,10 @@ public void testAliasFilters() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749")
public void testFailOnUnavailableShards() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
String logsOnlyNode = internalCluster().startDataOnlyNode();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events")
@@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 2, "message", "b"))
.add(new IndexRequest().source("timestamp", 3, "message", "c"))
.get();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("logs")
@@ -246,12 +245,28 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 10, "message", "aa"))
.add(new IndexRequest().source("timestamp", 11, "message", "bb"))
.get();

// when all shards available
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) {
assertThat(getValuesList(resp), hasSize(5));
internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();
Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message"));
assertThat(error.getMessage(), containsString("no shard copies found"));
}

internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();

// when one shard is unavailable
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from events,logs | KEEP timestamp,message")
);
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from * | KEEP timestamp,message")
);
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message", null, null, true)) {
assertThat(getValuesList(resp), hasSize(3));
}
}

@@ -261,9 +276,7 @@ public void testSkipOnIndexName() {
Map<String, Integer> indexToNumDocs = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String index = "events-" + i;
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword"));
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int docs = between(1, 5);
long timestamp = 1;
@@ -274,15 +287,17 @@ public void testSkipOnIndexName() {
indexToNumDocs.put(index, docs);
}
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try {
for (int i = 0; i < numIndices; i++) {
@@ -294,9 +309,8 @@ public void testSkipOnIndexName() {
assertThat(queriedIndices, equalTo(Set.of(index)));
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}
@@ -6,10 +6,10 @@
*/
package org.elasticsearch.xpack.esql.index;

import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -19,30 +19,33 @@ public final class IndexResolution {
/**
* @param index EsIndex encapsulating requested index expression, resolved mappings and index modes from field-caps.
* @param resolvedIndices Set of concrete indices resolved by field-caps. (This information is not always present in the EsIndex).
* @param unavailableShards Set of shards that were unavailable during index resolution
* @param unavailableClusters Remote clusters that could not be contacted during planning
* @return valid IndexResolution
*/
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(unavailableShards, "unavailableShards must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
return new IndexResolution(index, null, resolvedIndices, unavailableShards, unavailableClusters);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Collections.emptyMap());
return valid(index, index.concreteIndices(), Set.of(), Map.of());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
return new IndexResolution(null, invalid, Set.of(), Set.of(), Map.of());
}

public static IndexResolution notFound(String name) {
@@ -56,18 +59,21 @@ public static IndexResolution notFound(String name) {

// all indices found by field-caps
private final Set<String> resolvedIndices;
private final Set<NoShardAvailableActionException> unavailableShards;
// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;

private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
Set<NoShardAvailableActionException> unavailableShards,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.unavailableShards = unavailableShards;
this.unavailableClusters = unavailableClusters;
}

@@ -109,6 +115,13 @@ public Set<String> resolvedIndices() {
return resolvedIndices;
}

/**
* @return set of unavailable shards during index resolution
*/
public Set<NoShardAvailableActionException> getUnavailableShards() {
return unavailableShards;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
@@ -482,7 +482,13 @@ private void preAnalyzeIndices(
indexExpressionToResolve,
result.fieldNames,
requestFilter,
listener.map(indexResolution -> result.withIndexResolution(indexResolution))
listener.delegateFailure((l, indexResolution) -> {
if (configuration.allowPartialResults() == false && indexResolution.getUnavailableShards().isEmpty() == false) {
l.onFailure(indexResolution.getUnavailableShards().iterator().next());
} else {
l.onResponse(result.withIndexResolution(indexResolution));
}
})

Contributor Author: This change completes the listener early, immediately after index resolution with a failure if there are failures.

);
}
} else {
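To make the author's note above concrete, here is a minimal, self-contained sketch of the early-completion pattern. It deliberately uses hand-rolled stand-ins (Listener, Resolution, onIndexResolution) rather than Elasticsearch's real ActionListener and IndexResolution types, so the names and shapes below are illustrative assumptions only.

import java.util.Set;

// Illustrative stand-in for an async callback; not Elasticsearch's ActionListener.
interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

// Illustrative stand-in for a resolution result carrying unavailable-shard failures.
record Resolution(Set<Exception> unavailableShards) {}

final class EarlyCompletionSketch {
    // Fail the downstream listener immediately when shards are unavailable and
    // partial results are not allowed; otherwise pass the resolution through.
    static void onIndexResolution(Resolution resolution, boolean allowPartialResults, Listener<Resolution> downstream) {
        if (allowPartialResults == false && resolution.unavailableShards().isEmpty() == false) {
            downstream.onFailure(resolution.unavailableShards().iterator().next());
        } else {
            downstream.onResponse(resolution);
        }
    }
}
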
@@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@@ -152,6 +153,13 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
fieldCapsResponse.getFailures()
);

Set<NoShardAvailableActionException> unavailableShards = new HashSet<>();
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {

Member: I think this can mistakenly use failures from the remote clusters. Can we extend determineUnavailableRemoteClusters to include local failures and extract from there instead?

Contributor Author: I think you are right. I have replaced this with filtering for NoShardAvailableActionException. I would like to merge this fix since it fixes the handling of local unavailable shards, but we will likely need one more follow-up iteration focused on CCS (does an unavailable remote cluster mean the result is partial? can we detect unavailable shards on remote clusters?).

if (failure.getException() instanceof NoShardAvailableActionException e) {

Member: I think this is restrictive and could be fragile. The field-caps API can fail for reasons other than a NoShardAvailableActionException. Should we handle all exceptions? I'm fine if you plan to address this in a follow-up.

Contributor Author: This case is happening when running testFailOnUnavailableShards, but I agree, we should expand this list as we find more cases. For now, do you believe there are other cases I could add?

Member: I think we should detect the index pattern for non-remote scenarios and handle all exceptions.

unavailableShards.add(e);
}
}

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
@@ -163,7 +171,7 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
}
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableRemotes);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableShards, unavailableRemotes);
}

private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {
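As a rough, hypothetical sketch of the follow-up suggested in the review thread above, one could collect every local field-caps failure rather than only NoShardAvailableActionException, skipping failures that belong to remote clusters. This is an illustration only, not part of this change; in particular the FieldCapabilitiesFailure#getIndices() accessor and the "cluster:index" prefix check are assumptions.

// Hypothetical follow-up sketch (not in this PR): keep all local failures.
Set<Exception> localFailures = new HashSet<>();
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
    // Assumption: indices on remote clusters are reported with a "cluster:index" prefix.
    boolean remote = Arrays.stream(failure.getIndices()).anyMatch(index -> index.contains(":"));
    if (remote == false) {
        localFailures.add(failure.getException());
    }
}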