Skip to content

Commit 0b3d23a

Browse files
authored
Resolve indices using original index pattern and retry the entire resolution (#136009)
1 parent 50a2c72 commit 0b3d23a

File tree

11 files changed

+125
-154
lines changed

11 files changed

+125
-154
lines changed

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RequestIndexFilteringTestCase.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,26 @@ public void testIndicesDontExist() throws IOException {
204204
ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo"))));
205205
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
206206
assertThat(e.getMessage(), containsString("verification_exception"));
207-
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo]"), containsString("Unknown index [remote_cluster:foo]")));
207+
assertThat(
208+
e.getMessage(),
209+
anyOf(
210+
containsString("Unknown index [foo]"),
211+
containsString("Unknown index [*:foo]"),
212+
containsString("Unknown index [remote_cluster:foo]")
213+
)
214+
);
208215

209216
e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo*"))));
210217
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
211218
assertThat(e.getMessage(), containsString("verification_exception"));
212-
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo*]"), containsString("Unknown index [remote_cluster:foo*]")));
219+
assertThat(
220+
e.getMessage(),
221+
anyOf(
222+
containsString("Unknown index [foo*]"),
223+
containsString("Unknown index [*:foo*]"),
224+
containsString("Unknown index [remote_cluster:foo*]")
225+
)
226+
);
213227

214228
e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo, test1")));
215229
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());

x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.breaker.CircuitBreaker;
2222
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
2323
import org.elasticsearch.common.bytes.BytesReference;
24+
import org.elasticsearch.common.collect.Iterators;
2425
import org.elasticsearch.common.lucene.BytesRefs;
2526
import org.elasticsearch.common.regex.Regex;
2627
import org.elasticsearch.common.settings.Settings;
@@ -574,23 +575,19 @@ public static List<List<Object>> getValuesList(EsqlQueryResponse results) {
574575
}
575576

576577
public static List<List<Object>> getValuesList(Iterator<Iterator<Object>> values) {
577-
var valuesList = new ArrayList<List<Object>>();
578-
values.forEachRemaining(row -> {
579-
var rowValues = new ArrayList<>();
580-
row.forEachRemaining(rowValues::add);
581-
valuesList.add(rowValues);
582-
});
583-
return valuesList;
578+
return toList(Iterators.map(values, EsqlTestUtils::toList));
584579
}
585580

586581
public static List<List<Object>> getValuesList(Iterable<Iterable<Object>> values) {
587-
var valuesList = new ArrayList<List<Object>>();
588-
values.iterator().forEachRemaining(row -> {
589-
var rowValues = new ArrayList<>();
590-
row.iterator().forEachRemaining(rowValues::add);
591-
valuesList.add(rowValues);
592-
});
593-
return valuesList;
582+
return toList(Iterators.map(values.iterator(), row -> toList(row.iterator())));
583+
}
584+
585+
private static <T> List<T> toList(Iterator<T> iterator) {
586+
var list = new ArrayList<T>();
587+
while (iterator.hasNext()) {
588+
list.add(iterator.next());
589+
}
590+
return list;
594591
}
595592

596593
public static List<String> withDefaultLimitWarning(List<String> warnings) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinIT.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
1212
import org.elasticsearch.client.internal.Client;
1313
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.index.IndexMode;
15+
import org.elasticsearch.index.IndexSettings;
16+
import org.elasticsearch.index.query.TermQueryBuilder;
1417
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1518
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
1619
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
@@ -28,6 +31,7 @@
2831

2932
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3033
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
34+
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
3135
import static org.hamcrest.Matchers.containsString;
3236
import static org.hamcrest.Matchers.empty;
3337
import static org.hamcrest.Matchers.equalTo;
@@ -557,6 +561,57 @@ public void testLookupJoinFieldTypes() throws IOException {
557561
}
558562
}
559563

564+
public void testAlwaysAppliesTheFilter() throws IOException {
565+
setupClusters(3);
566+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
567+
setSkipUnavailable(REMOTE_CLUSTER_2, false);
568+
569+
var defaultSettings = Settings.builder();
570+
createIndexWithDocument(LOCAL_CLUSTER, "data", defaultSettings, Map.of("key", 1, "f1", 1));
571+
createIndexWithDocument(REMOTE_CLUSTER_1, "data", defaultSettings, Map.of("key", 2, "f2", 2));
572+
createIndexWithDocument(REMOTE_CLUSTER_2, "data", defaultSettings, Map.of("key", 3, "f3", 3));
573+
574+
try (var r = runQuery(syncEsqlQueryRequest().query("FROM data,*:data | WHERE f1 == 1").filter(new TermQueryBuilder("f2", 2)))) {
575+
assertThat(getValuesList(r), hasSize(0));
576+
}
577+
}
578+
579+
public void testLookupJoinRetryAnalysis() throws IOException {
580+
setupClusters(3);
581+
setSkipUnavailable(REMOTE_CLUSTER_1, false);
582+
setSkipUnavailable(REMOTE_CLUSTER_2, false);
583+
584+
var defaultSettings = Settings.builder();
585+
createIndexWithDocument(LOCAL_CLUSTER, "data", defaultSettings, Map.of("key", 1, "f1", 1));
586+
createIndexWithDocument(REMOTE_CLUSTER_1, "data", defaultSettings, Map.of("key", 2, "f2", 2));
587+
createIndexWithDocument(REMOTE_CLUSTER_2, "data", defaultSettings, Map.of("key", 3, "f3", 3));
588+
589+
var lookupSettings = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP);
590+
createIndexWithDocument(LOCAL_CLUSTER, "lookup", lookupSettings, Map.of("key", 1, "location", "local"));
591+
createIndexWithDocument(REMOTE_CLUSTER_1, "lookup", lookupSettings, Map.of("key", 2, "location", "remote-1"));
592+
// lookup is intentionally absent on REMOTE_CLUSTER_2
593+
594+
// The following query uses filter f2=2 that narrows down execution only to REMOTE_CLUSTER_1 index however,
595+
// later it uses `WHERE f1 == 1` esql condition that to an attribute present only on the local cluster index.
596+
// This causes analysis to fail and retry the entire query without a filter.
597+
// The second analysis executes against all cluster indices and should discover that lookup is absent on REMOTE_CLUSTER_2.
598+
expectThrows(
599+
VerificationException.class,
600+
containsString("lookup index [lookup] is not available in remote cluster [remote-b]"),
601+
() -> runQuery(
602+
syncEsqlQueryRequest().query("FROM data,*:data | LOOKUP JOIN lookup ON key | WHERE f1 == 1")
603+
.filter(new TermQueryBuilder("f2", 2))
604+
)
605+
);
606+
}
607+
608+
private void createIndexWithDocument(String clusterAlias, String indexName, Settings.Builder settings, Map<String, Object> source) {
609+
var client = client(clusterAlias);
610+
client.admin().indices().prepareCreate(indexName).setSettings(settings).get();
611+
client.prepareIndex(indexName).setSource(source).get();
612+
client.admin().indices().prepareRefresh(indexName).get();
613+
}
614+
560615
protected Map<String, Object> setupClustersAndLookups() throws IOException {
561616
var setupData = setupClusters(2);
562617
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueriesWithInvalidLicenseIT.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,24 +65,19 @@ public void testQueryAgainstNonMatchingClusterWildcardPattern() {
6565

6666
// since this wildcarded expression does not resolve to a valid remote cluster, it is not considered
6767
// a cross-cluster search and thus should not throw a license error
68-
String q = "FROM xremote*:events";
69-
{
70-
String limit1 = q + " | STATS count(*)";
71-
try (EsqlQueryResponse resp = runQuery(limit1, requestIncludeMeta)) {
72-
assertThat(resp.columns().size(), equalTo(1));
73-
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
74-
assertThat(executionInfo.isCrossClusterSearch(), is(false));
75-
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
76-
}
68+
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | STATS count(*)", requestIncludeMeta)) {
69+
assertThat(resp.columns().size(), equalTo(1));
70+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
71+
assertThat(executionInfo.isCrossClusterSearch(), is(false));
72+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
73+
}
7774

78-
String limit0 = q + " | LIMIT 0";
79-
try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
80-
assertThat(resp.columns().size(), equalTo(1));
81-
assertThat(getValuesList(resp).size(), equalTo(0));
82-
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
83-
assertThat(executionInfo.isCrossClusterSearch(), is(false));
84-
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
85-
}
75+
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | LIMIT 0", requestIncludeMeta)) {
76+
assertThat(resp.columns().size(), equalTo(1));
77+
assertThat(getValuesList(resp).size(), equalTo(0));
78+
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
79+
assertThat(executionInfo.isCrossClusterSearch(), is(false));
80+
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
8681
}
8782
}
8883

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -383,24 +383,24 @@ protected void testSearchesAgainstNonMatchingIndices(boolean exceptionWithSkipUn
383383
// an error is thrown if there is a concrete index that does not match
384384
{
385385
String q = "FROM nomatch*,cluster-a:nomatch";
386-
String expectedError = "Unknown index [cluster-a:nomatch,nomatch*]";
386+
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch]";
387387
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
388388
}
389389

390390
// an error is thrown if there are no matching indices at all - local with wildcard, remote with wildcard
391391
{
392392
String q = "FROM nomatch*,cluster-a:nomatch*";
393-
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch*]";
393+
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch*]";
394394
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
395395
}
396396
{
397397
String q = "FROM nomatch,cluster-a:nomatch";
398-
String expectedError = "Unknown index [cluster-a:nomatch,nomatch]";
398+
String expectedError = "Unknown index [nomatch,cluster-a:nomatch]";
399399
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
400400
}
401401
{
402402
String q = "FROM nomatch,cluster-a:nomatch*";
403-
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch]";
403+
String expectedError = "Unknown index [nomatch,cluster-a:nomatch*]";
404404
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
405405
}
406406

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {
4444

4545
protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
4646
Holder<IndexMode> indexMode = new Holder<>();
47-
Holder<IndexPattern> index = new Holder<>();
47+
Holder<IndexPattern> indexPattern = new Holder<>();
4848
List<IndexPattern> lookupIndices = new ArrayList<>();
4949
plan.forEachUp(UnresolvedRelation.class, p -> {
5050
if (p.indexMode() == IndexMode.LOOKUP) {
5151
lookupIndices.add(p.indexPattern());
5252
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
5353
indexMode.set(p.indexMode());
54-
index.set(p.indexPattern());
54+
indexPattern.set(p.indexPattern());
5555
} else {
5656
throw new IllegalStateException("index mode is already set");
5757
}
@@ -92,7 +92,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
9292

9393
return new PreAnalysis(
9494
indexMode.get(),
95-
index.get(),
95+
indexPattern.get(),
9696
unresolvedEnriches,
9797
lookupIndices,
9898
indexMode.get() == IndexMode.TIME_SERIES || supportsAggregateMetricDouble.get(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlCCSUtils.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -155,30 +155,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
155155
}
156156
}
157157

158-
static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
159-
StringBuilder sb = new StringBuilder();
160-
for (String clusterAlias : executionInfo.clusterAliases()) {
161-
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
162-
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
163-
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
164-
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
165-
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
166-
} else {
167-
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
168-
for (String index : indexExpression.split(",")) {
169-
sb.append(clusterAlias).append(':').append(index).append(',');
170-
}
171-
}
172-
}
173-
}
174-
175-
if (sb.length() > 0) {
176-
return sb.substring(0, sb.length() - 1);
177-
} else {
178-
return "";
179-
}
180-
}
181-
182158
static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo, String localPattern) {
183159
if (executionInfo.getClusters().isEmpty()) {
184160
return localPattern;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,20 @@ public void analyzedPlan(
439439

440440
var preAnalysis = preAnalyzer.preAnalyze(parsed);
441441
var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);
442+
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
443+
444+
resolveIndices(parsed, executionInfo, description, requestFilter, preAnalysis, result, logicalPlanListener);
445+
}
442446

447+
private void resolveIndices(
448+
LogicalPlan parsed,
449+
EsqlExecutionInfo executionInfo,
450+
String description,
451+
QueryBuilder requestFilter,
452+
PreAnalyzer.PreAnalysis preAnalysis,
453+
PreAnalysisResult result,
454+
ActionListener<LogicalPlan> logicalPlanListener
455+
) {
443456
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);
444457

445458
SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
@@ -459,7 +472,7 @@ public void analyzedPlan(
459472
.<PreAnalysisResult>andThen((l, r) -> {
460473
inferenceService.inferenceResolver(functionRegistry).resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
461474
})
462-
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
475+
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, executionInfo, description, requestFilter, preAnalysis, r, l))
463476
.addListener(logicalPlanListener);
464477
}
465478

@@ -695,15 +708,14 @@ private void preAnalyzeMainIndices(
695708
ThreadPool.Names.SYSTEM_READ
696709
);
697710
if (preAnalysis.indexPattern() != null) {
698-
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
699-
if (indexExpressionToResolve.isEmpty()) {
700-
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
711+
if (executionInfo.clusterAliases().isEmpty()) {
712+
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
701713
listener.onResponse(
702714
result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
703715
);
704716
} else {
705717
indexResolver.resolveAsMergedMapping(
706-
indexExpressionToResolve,
718+
preAnalysis.indexPattern().indexPattern(),
707719
result.fieldNames,
708720
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
709721
switch (preAnalysis.indexMode()) {
@@ -732,13 +744,13 @@ private void preAnalyzeMainIndices(
732744

733745
private void analyzeWithRetry(
734746
LogicalPlan parsed,
747+
EsqlExecutionInfo executionInfo,
748+
String description,
735749
QueryBuilder requestFilter,
736750
PreAnalyzer.PreAnalysis preAnalysis,
737-
EsqlExecutionInfo executionInfo,
738751
PreAnalysisResult result,
739752
ActionListener<LogicalPlan> listener
740753
) {
741-
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
742754
LOGGER.debug("Analyzing the plan ({})", description);
743755
try {
744756
if (result.indices.isValid() || requestFilter != null) {
@@ -756,20 +768,9 @@ private void analyzeWithRetry(
756768
// if the initial request didn't have a filter, then just pass the exception back to the user
757769
listener.onFailure(ve);
758770
} else {
759-
// retrying and make the index resolution work without any index filtering.
760-
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
761-
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
762-
try {
763-
// the order here is tricky - if the cluster has been filtered and later became unavailable,
764-
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
765-
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
766-
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
767-
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
768-
l.onResponse(plan);
769-
} catch (Exception e) {
770-
l.onFailure(e);
771-
}
772-
}));
771+
// retrying the index resolution without index filtering.
772+
executionInfo.clusterInfo.clear();
773+
resolveIndices(parsed, executionInfo, "second attempt, without filter", null, preAnalysis, result, listener);
773774
}
774775
} catch (Exception e) {
775776
listener.onFailure(e);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/IndexResolver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ private static FieldCapabilitiesRequest createFieldCapsRequest(
329329
req.fields(fieldNames.toArray(String[]::new));
330330
req.includeUnmapped(true);
331331
req.indexFilter(requestFilter);
332+
req.returnLocalAll(false);
332333
// lenient because we throw our own errors looking at the response e.g. if something was not resolved
333334
// also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
334335
req.indicesOptions(FIELD_CAPS_INDICES_OPTIONS);

0 commit comments

Comments
 (0)