Changes from 2 commits
@@ -18,7 +18,6 @@
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase;
@@ -30,6 +29,7 @@
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.hamcrest.Matchers.containsString;
@@ -48,7 +48,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
* Make sure that we don't send data-node requests to the target shards which won't match the query
*/
public void testCanMatch() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events_2022")
@@ -60,9 +60,7 @@ public void testCanMatch() {
.add(new IndexRequest().source("@timestamp", "2022-05-02", "uid", "u1"))
.add(new IndexRequest().source("@timestamp", "2022-12-15", "uid", "u1"))
.get();
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate("events_2023").setMapping("@timestamp", "type=date", "uid", "type=keyword"));
client().prepareBulk("events_2023")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.add(new IndexRequest().source("@timestamp", "2023-01-15", "uid", "u2"))
@@ -72,15 +70,17 @@ public void testCanMatch() {
.get();
try {
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService transportService = (MockTransportService) ts;
transportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try (EsqlQueryResponse resp = run("from events_*", randomPragmas(), new RangeQueryBuilder("@timestamp").gte("2023-01-01"))) {
assertThat(getValuesList(resp), hasSize(4));
@@ -118,14 +118,14 @@ public void testCanMatch() {
queriedIndices.clear();
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
((MockTransportService) ts).clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}

public void testAliasFilters() {
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("employees")
@@ -141,7 +141,7 @@ public void testAliasFilters() {
.add(new IndexRequest().source("emp_no", 106, "dept", "sales", "hired", "2012-08-09", "salary", 30.1))
.get();

ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareAliases(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT)
@@ -209,11 +209,10 @@ public void testAliasFilters() {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/103749")
public void testFailOnUnavailableShards() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
String logsOnlyNode = internalCluster().startDataOnlyNode();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("events")
@@ -230,7 +229,7 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 2, "message", "b"))
.add(new IndexRequest().source("timestamp", 3, "message", "c"))
.get();
ElasticsearchAssertions.assertAcked(
assertAcked(
client().admin()
.indices()
.prepareCreate("logs")
@@ -246,13 +245,26 @@ public void testFailOnUnavailableShards() throws Exception {
.add(new IndexRequest().source("timestamp", 10, "message", "aa"))
.add(new IndexRequest().source("timestamp", 11, "message", "bb"))
.get();

// when all shards available
try (EsqlQueryResponse resp = run("from events,logs | KEEP timestamp,message")) {
assertThat(getValuesList(resp), hasSize(5));
internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();
Exception error = expectThrows(Exception.class, () -> run("from events,logs | KEEP timestamp,message"));
assertThat(error.getMessage(), containsString("no shard copies found"));
}

internalCluster().stopNode(logsOnlyNode);
ensureClusterSizeConsistency();

// when one shard is unavailable
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from events,logs | KEEP timestamp,message")
);
expectThrows(
Exception.class,
containsString("index [logs] has no active shard copy"),
() -> run("from * | KEEP timestamp,message")
);
}

public void testSkipOnIndexName() {
@@ -261,9 +273,7 @@ public void testSkipOnIndexName() {
Map<String, Integer> indexToNumDocs = new HashMap<>();
for (int i = 0; i < numIndices; i++) {
String index = "events-" + i;
ElasticsearchAssertions.assertAcked(
client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword")
);
assertAcked(client().admin().indices().prepareCreate(index).setMapping("timestamp", "type=long", "message", "type=keyword"));
BulkRequestBuilder bulk = client().prepareBulk(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
int docs = between(1, 5);
long timestamp = 1;
@@ -274,15 +284,17 @@ public void testSkipOnIndexName() {
indexToNumDocs.put(index, docs);
}
Set<String> queriedIndices = ConcurrentCollections.newConcurrentSet();
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.addRequestHandlingBehavior(ComputeService.DATA_ACTION_NAME, (handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).addRequestHandlingBehavior(
ComputeService.DATA_ACTION_NAME,
(handler, request, channel, task) -> {
DataNodeRequest dataNodeRequest = (DataNodeRequest) request;
for (ShardId shardId : dataNodeRequest.shardIds()) {
queriedIndices.add(shardId.getIndexName());
}
handler.messageReceived(request, channel, task);
}
handler.messageReceived(request, channel, task);
});
);
}
try {
for (int i = 0; i < numIndices; i++) {
@@ -294,9 +306,8 @@ public void testSkipOnIndexName() {
assertThat(queriedIndices, equalTo(Set.of(index)));
}
} finally {
for (TransportService ts : internalCluster().getInstances(TransportService.class)) {
MockTransportService mockTransportService = as(ts, MockTransportService.class);
mockTransportService.clearAllRules();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
as(transportService, MockTransportService.class).clearAllRules();
}
}
}
@@ -9,7 +9,6 @@
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.core.Nullable;

import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -25,24 +24,26 @@ public final class IndexResolution {
public static IndexResolution valid(
EsIndex index,
Set<String> resolvedIndices,
FieldCapabilitiesFailure localResolutionFailure,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
Objects.requireNonNull(index, "index must not be null if it was found");
Objects.requireNonNull(resolvedIndices, "resolvedIndices must not be null");
Objects.requireNonNull(resolvedIndices, "resolutionFailures must not be null");
Objects.requireNonNull(unavailableClusters, "unavailableClusters must not be null");
return new IndexResolution(index, null, resolvedIndices, unavailableClusters);
return new IndexResolution(index, null, resolvedIndices, localResolutionFailure, unavailableClusters);
}

/**
* Use this method only if the set of concrete resolved indices is the same as EsIndex#concreteIndices().
*/
public static IndexResolution valid(EsIndex index) {
return valid(index, index.concreteIndices(), Collections.emptyMap());
return valid(index, index.concreteIndices(), null, Map.of());
}

public static IndexResolution invalid(String invalid) {
Objects.requireNonNull(invalid, "invalid must not be null to signal that the index is invalid");
return new IndexResolution(null, invalid, Collections.emptySet(), Collections.emptyMap());
return new IndexResolution(null, invalid, Set.of(), null, Map.of());
}

public static IndexResolution notFound(String name) {
@@ -56,18 +57,22 @@ public static IndexResolution notFound(String name) {

// all indices found by field-caps
private final Set<String> resolvedIndices;
@Nullable
private final FieldCapabilitiesFailure localResolutionFailure;
[Review comment] Member: Can we merge this with unavailableClusters?

[Reply] Contributor Author: I do not think so. I just updated the implementation to look for unavailable shards rather than generic failures and I would like to keep unavailableClusters remote only as documented:

    // remote clusters included in the user's index expression that could not be connected to

// remote clusters included in the user's index expression that could not be connected to
private final Map<String, FieldCapabilitiesFailure> unavailableClusters;

private IndexResolution(
EsIndex index,
@Nullable String invalid,
Set<String> resolvedIndices,
@Nullable FieldCapabilitiesFailure localResolutionFailure,
Map<String, FieldCapabilitiesFailure> unavailableClusters
) {
this.index = index;
this.invalid = invalid;
this.resolvedIndices = resolvedIndices;
this.localResolutionFailure = localResolutionFailure;
this.unavailableClusters = unavailableClusters;
}

@@ -109,6 +114,14 @@ public Set<String> resolvedIndices() {
return resolvedIndices;
}

/**
* @return local cluster index resolution failure if present
*/
@Nullable
public FieldCapabilitiesFailure getLocalResolutionFailure() {
return localResolutionFailure;
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
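
To make the separation discussed in the review thread above concrete, here is a minimal, hypothetical consumer-side sketch. Only getLocalResolutionFailure() comes from this diff; the class, the method, and the unavailableClusters() accessor name are assumptions for illustration, not code from the PR.

import java.util.Map;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;

// Illustrative consumer of the new accessor; the IndexResolution import (ES|QL-internal
// package) is omitted and assumed to be in scope.
final class ResolutionFailureHandlingSketch {

    // Fails the request up front on a local resolution failure, but only records
    // per-cluster remote connectivity problems instead of failing the whole request.
    static <T> boolean failFastOnLocalFailure(IndexResolution resolution, ActionListener<T> listener) {
        FieldCapabilitiesFailure localFailure = resolution.getLocalResolutionFailure();
        if (localFailure != null) {
            // Local shards could not be resolved (e.g. no active shard copy).
            listener.onFailure(localFailure.getException());
            return true;
        }
        // Accessor name assumed here: remote failures stay keyed by cluster alias.
        Map<String, FieldCapabilitiesFailure> unavailableRemotes = resolution.unavailableClusters();
        unavailableRemotes.forEach((clusterAlias, remoteFailure) -> {
            // e.g. mark the cluster as skipped in the execution info (omitted in this sketch)
        });
        return false;
    }
}
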
@@ -482,7 +482,13 @@ private void preAnalyzeIndices(
indexExpressionToResolve,
result.fieldNames,
requestFilter,
listener.map(indexResolution -> result.withIndexResolution(indexResolution))
listener.delegateFailure((l, indexResolution) -> {
if (configuration.allowPartialResults() == false && indexResolution.getLocalResolutionFailure() != null) {
l.onFailure(indexResolution.getLocalResolutionFailure().getException());
} else {
l.onResponse(result.withIndexResolution(indexResolution));
}
})
[Review comment] Contributor Author: This change completes the listener early, immediately after index resolution, with a failure if there are failures.

);
}
} else {
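
A side note on why the wrapper above uses listener.delegateFailure(...) where the old code used listener.map(...): map can only transform a successful response, while delegateFailure hands the wrapped listener to the lambda so it can choose between onResponse and onFailure. The toy example below is a self-contained sketch of that difference and is not taken from the PR; the types and values are made up.

import org.elasticsearch.action.ActionListener;

// Toy illustration only; the real code wraps the ES|QL pre-analysis listener.
final class ListenerWrappingDemo {
    public static void main(String[] args) {
        ActionListener<String> upstream = ActionListener.wrap(
            value -> System.out.println("ok: " + value),
            e -> System.out.println("failed: " + e.getMessage())
        );

        // map(): transforms the response on the success path only.
        ActionListener<Integer> mapped = upstream.map(i -> "value=" + i);
        mapped.onResponse(42);      // prints "ok: value=42"

        // delegateFailure(): the lambda owns the success path and may fail the request directly.
        ActionListener<Integer> delegated = upstream.delegateFailure((l, i) -> {
            if (i < 0) {
                l.onFailure(new IllegalArgumentException("negative input"));
            } else {
                l.onResponse("value=" + i);
            }
        });
        delegated.onResponse(-1);   // prints "failed: negative input"
    }
}
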
@@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesIndexResponse;
@@ -87,7 +88,10 @@ public void resolveAsMergedMapping(
client.execute(
EsqlResolveFieldsAction.TYPE,
createFieldCapsRequest(indexWildcard, fieldNames, requestFilter),
listener.delegateFailureAndWrap((l, response) -> l.onResponse(mergedMappings(indexWildcard, response)))
listener.delegateFailureAndWrap((l, response) -> {

l.onResponse(mergedMappings(indexWildcard, response));
})
);
}

@@ -152,6 +156,14 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
fieldCapsResponse.getFailures()
);

FieldCapabilitiesFailure localResolutionFailure = null;
for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
[Review comment] Member: I think this can mistakenly use failures from the remote clusters. Can we extend determineUnavailableRemoteClusters to include local failures and extract from there instead?

[Reply] Contributor Author: I think you are right. I have replaced this with filtering for NoShardAvailableActionException. I would like to merge this fix as it fixes the handling of local unavailable shards, but we will likely need one more follow-up iteration focused on CCS (does an unavailable remote cluster mean the result is partial? can we detect remote-cluster unavailable shards?).

if (ExceptionsHelper.isRemoteUnavailableException(failure.getException()) == false) {
localResolutionFailure = failure;
break;
}
}

Map<String, IndexMode> concreteIndices = Maps.newMapWithExpectedSize(fieldCapsResponse.getIndexResponses().size());
for (FieldCapabilitiesIndexResponse ir : fieldCapsResponse.getIndexResponses()) {
concreteIndices.put(ir.getIndexName(), ir.getIndexMode());
@@ -163,7 +175,7 @@ public static IndexResolution mergedMappings(String indexPattern, FieldCapabilit
}
// If all the mappings are empty we return an empty set of resolved indices to line up with QL
var index = new EsIndex(indexPattern, rootFields, allEmpty ? Map.of() : concreteIndices, partiallyUnmappedFields);
return IndexResolution.valid(index, concreteIndices.keySet(), unavailableRemotes);
return IndexResolution.valid(index, concreteIndices.keySet(), localResolutionFailure, unavailableRemotes);
}

private static Map<String, List<IndexFieldCapabilities>> collectFieldCaps(FieldCapabilitiesResponse fieldCapsResponse) {
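
The reply above says the generic "non-remote failure" check shown in this hunk was later replaced with a filter for NoShardAvailableActionException; that follow-up is not in the diff shown here. Below is a hedged sketch of what such a filter might look like, under the assumption that missing local shard copies are the only failures that should fail the request; it is not the code merged in this PR.

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;

// Sketch of the follow-up described in the review reply above.
final class LocalFailureFilterSketch {

    // Returns the first failure caused by a missing local shard copy, or null if none.
    static FieldCapabilitiesFailure firstLocalShardFailure(FieldCapabilitiesResponse fieldCapsResponse) {
        for (FieldCapabilitiesFailure failure : fieldCapsResponse.getFailures()) {
            // Only count missing shard copies as a local resolution failure; remote
            // connectivity problems keep flowing into unavailableClusters.
            if (ExceptionsHelper.unwrapCause(failure.getException()) instanceof NoShardAvailableActionException) {
                return failure;
            }
        }
        return null;
    }
}
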
@@ -254,7 +254,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
)
);

IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), Map.of());
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), null, Map.of());

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

@@ -298,7 +298,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
)
);
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of();
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), null, unavailableClusters);

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

@@ -340,7 +340,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), null, unavailableClusters);

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);

@@ -383,7 +383,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {

var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), null, unavailableClusters);
VerificationException ve = expectThrows(
VerificationException.class,
() -> EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution)
@@ -417,7 +417,7 @@ public void testUpdateExecutionInfoWithClustersWithNoMatchingIndices() {
// remote1 is unavailable
var failure = new FieldCapabilitiesFailure(new String[] { "logs-a" }, new NoSeedNodeLeftException("unable to connect"));
Map<String, FieldCapabilitiesFailure> unavailableClusters = Map.of(REMOTE1_ALIAS, failure);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), unavailableClusters);
IndexResolution indexResolution = IndexResolution.valid(esIndex, esIndex.concreteIndices(), null, unavailableClusters);

EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, indexResolution);
