Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e0f5989
resolve original index pattern
idegtiarenko Sep 5, 2025
9e56546
Merge branch 'main' into es-12487
idegtiarenko Sep 8, 2025
e172c13
fix csv tests
idegtiarenko Sep 8, 2025
62cb8e9
Merge branch 'main' into es-12487
idegtiarenko Sep 10, 2025
d653020
Merge branch 'main' into es-12487
idegtiarenko Sep 17, 2025
21ccd63
returnLocalAll=false
idegtiarenko Sep 17, 2025
24b7a4c
move execution info updates
idegtiarenko Sep 17, 2025
b1e75f4
Merge branch 'main' into es-12487
idegtiarenko Sep 23, 2025
3029bfb
resolve using original index pattern
idegtiarenko Sep 23, 2025
58c8230
assert original index expression in error
idegtiarenko Sep 24, 2025
ee777a4
[CI] Update transport version definitions
Sep 24, 2025
06bfcfa
upd
idegtiarenko Sep 24, 2025
3c9a40b
Merge branch 'main' into es-12487
idegtiarenko Sep 24, 2025
cf31115
fix flakiness
idegtiarenko Sep 24, 2025
e9a0e29
Merge branch 'main' into es-12487
idegtiarenko Sep 25, 2025
880f169
Merge branch 'main' into es-12487
idegtiarenko Sep 25, 2025
444217d
Merge branch 'main' into es-12487
idegtiarenko Sep 29, 2025
e5254ec
update comment
idegtiarenko Oct 1, 2025
685830f
Merge branch 'main' into es-12487
idegtiarenko Oct 1, 2025
adaa593
Merge branch 'main' into es-12487
idegtiarenko Oct 6, 2025
ef36c2e
Retry the entire index resolution without filter
idegtiarenko Oct 6, 2025
8640d59
add an integration test
idegtiarenko Oct 6, 2025
6561b92
Merge branch 'main' into retry_complete_resolution
idegtiarenko Oct 6, 2025
028469d
Merge branch 'main' into retry_complete_resolution
idegtiarenko Oct 7, 2025
fc72d1e
additional test case
idegtiarenko Oct 8, 2025
2e2021c
Merge branch 'main' into retry_complete_resolution
idegtiarenko Oct 8, 2025
a996b25
use mutable list
idegtiarenko Oct 8, 2025
2f5dbce
Merge branch 'main' into retry_complete_resolution
idegtiarenko Oct 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,26 @@ public void testIndicesDontExist() throws IOException {
ResponseException e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo]"), containsString("Unknown index [remote_cluster:foo]")));
assertThat(
e.getMessage(),
anyOf(
containsString("Unknown index [foo]"),
containsString("Unknown index [*:foo]"),
containsString("Unknown index [remote_cluster:foo]")
)
);

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query(from("foo*"))));
assertEquals(400, e.getResponse().getStatusLine().getStatusCode());
assertThat(e.getMessage(), containsString("verification_exception"));
assertThat(e.getMessage(), anyOf(containsString("Unknown index [foo*]"), containsString("Unknown index [remote_cluster:foo*]")));
assertThat(
e.getMessage(),
anyOf(
containsString("Unknown index [foo*]"),
containsString("Unknown index [*:foo*]"),
containsString("Unknown index [remote_cluster:foo*]")
)
);

e = expectThrows(ResponseException.class, () -> runEsql(timestampFilter("gte", "2020-01-01").query("FROM foo, test1")));
assertEquals(404, e.getResponse().getStatusLine().getStatusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
Expand All @@ -28,6 +31,7 @@

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
import static org.elasticsearch.xpack.esql.action.EsqlQueryRequest.syncEsqlQueryRequest;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -557,6 +561,42 @@ public void testLookupJoinFieldTypes() throws IOException {
}
}

public void testLookupJoinRetryAnalysis() throws IOException {
setupClusters(3);
setSkipUnavailable(REMOTE_CLUSTER_1, false);
setSkipUnavailable(REMOTE_CLUSTER_2, false);

var defaultSettings = Settings.builder();
createIndexWithDocument(LOCAL_CLUSTER, "data", defaultSettings, Map.of("key", 1, "f1", 1));
createIndexWithDocument(REMOTE_CLUSTER_1, "data", defaultSettings, Map.of("key", 2, "f2", 2));
createIndexWithDocument(REMOTE_CLUSTER_2, "data", defaultSettings, Map.of("key", 3, "f3", 3));

var lookupSettings = Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOOKUP);
createIndexWithDocument(LOCAL_CLUSTER, "lookup", lookupSettings, Map.of("key", 1, "location", "local"));
createIndexWithDocument(REMOTE_CLUSTER_1, "lookup", lookupSettings, Map.of("key", 2, "location", "remote-1"));
// lookup is intentionally absent on REMOTE_CLUSTER_2

// The following query uses filter f2=2 that narrows down execution only to REMOTE_CLUSTER_1 index however,
// later it uses `WHERE f1 == 1` esql condition that to an attribute present only on the local cluster index.
// This causes analysis to fail and retry the entire query without a filter.
// The second analysis executes against all cluster indices and should discover that lookup is absent on REMOTE_CLUSTER_2.
expectThrows(
VerificationException.class,
containsString("lookup index [lookup] is not available in remote cluster [remote-b]"),
() -> runQuery(
syncEsqlQueryRequest().query("FROM data,*:data | LOOKUP JOIN lookup ON key | WHERE f1 == 1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: please add the other use case without join we've discussed.

.filter(new TermQueryBuilder("f2", 2))
)
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test would fail if we only retry previously resolved subset of clusters opposed to ones resolved to all clusters without the filter.

}

private void createIndexWithDocument(String clusterAlias, String indexName, Settings.Builder settings, Map<String, Object> source) {
var client = client(clusterAlias);
client.admin().indices().prepareCreate(indexName).setSettings(settings).get();
client.prepareIndex(indexName).setSource(source).get();
client.admin().indices().prepareRefresh(indexName).get();
}

protected Map<String, Object> setupClustersAndLookups() throws IOException {
var setupData = setupClusters(2);
populateLookupIndex(LOCAL_CLUSTER, "values_lookup", 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,19 @@ public void testQueryAgainstNonMatchingClusterWildcardPattern() {

// since this wildcarded expression does not resolve to a valid remote cluster, it is not considered
// a cross-cluster search and thus should not throw a license error
String q = "FROM xremote*:events";
{
String limit1 = q + " | STATS count(*)";
try (EsqlQueryResponse resp = runQuery(limit1, requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | STATS count(*)", requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}

String limit0 = q + " | LIMIT 0";
try (EsqlQueryResponse resp = runQuery(limit0, requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
assertThat(getValuesList(resp).size(), equalTo(0));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
try (EsqlQueryResponse resp = runQuery("FROM xremote*:events | LIMIT 0", requestIncludeMeta)) {
assertThat(resp.columns().size(), equalTo(1));
assertThat(getValuesList(resp).size(), equalTo(0));
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
assertThat(executionInfo.isCrossClusterSearch(), is(false));
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,24 +383,24 @@ protected void testSearchesAgainstNonMatchingIndices(boolean exceptionWithSkipUn
// an error is thrown if there is a concrete index that does not match
{
String q = "FROM nomatch*,cluster-a:nomatch";
String expectedError = "Unknown index [cluster-a:nomatch,nomatch*]";
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}

// an error is thrown if there are no matching indices at all - local with wildcard, remote with wildcard
{
String q = "FROM nomatch*,cluster-a:nomatch*";
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch*]";
String expectedError = "Unknown index [nomatch*,cluster-a:nomatch*]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}
{
String q = "FROM nomatch,cluster-a:nomatch";
String expectedError = "Unknown index [cluster-a:nomatch,nomatch]";
String expectedError = "Unknown index [nomatch,cluster-a:nomatch]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}
{
String q = "FROM nomatch,cluster-a:nomatch*";
String expectedError = "Unknown index [cluster-a:nomatch*,nomatch]";
String expectedError = "Unknown index [nomatch,cluster-a:nomatch*]";
expectVerificationExceptionForQuery(q, expectedError, requestIncludeMeta);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {

protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
Holder<IndexMode> indexMode = new Holder<>();
Holder<IndexPattern> index = new Holder<>();
Holder<IndexPattern> indexPattern = new Holder<>();
List<IndexPattern> lookupIndices = new ArrayList<>();
plan.forEachUp(UnresolvedRelation.class, p -> {
if (p.indexMode() == IndexMode.LOOKUP) {
lookupIndices.add(p.indexPattern());
} else if (indexMode.get() == null || indexMode.get() == p.indexMode()) {
indexMode.set(p.indexMode());
index.set(p.indexPattern());
indexPattern.set(p.indexPattern());
} else {
throw new IllegalStateException("index mode is already set");
}
Expand Down Expand Up @@ -92,7 +92,7 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {

return new PreAnalysis(
indexMode.get(),
index.get(),
indexPattern.get(),
unresolvedEnriches,
lookupIndices,
indexMode.get() == IndexMode.TIME_SERIES || supportsAggregateMetricDouble.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,30 +155,6 @@ static void updateExecutionInfoToReturnEmptyResult(EsqlExecutionInfo executionIn
}
}

static String createIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo) {
StringBuilder sb = new StringBuilder();
for (String clusterAlias : executionInfo.clusterAliases()) {
EsqlExecutionInfo.Cluster cluster = executionInfo.getCluster(clusterAlias);
// Exclude clusters which are either skipped or have no indices matching wildcard, or filtered out.
if (cluster.getStatus() != Cluster.Status.SKIPPED && cluster.getStatus() != Cluster.Status.SUCCESSFUL) {
if (cluster.getClusterAlias().equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
sb.append(executionInfo.getCluster(clusterAlias).getIndexExpression()).append(',');
} else {
String indexExpression = executionInfo.getCluster(clusterAlias).getIndexExpression();
for (String index : indexExpression.split(",")) {
sb.append(clusterAlias).append(':').append(index).append(',');
}
}
}
}

if (sb.length() > 0) {
return sb.substring(0, sb.length() - 1);
} else {
return "";
}
}

static String createQualifiedLookupIndexExpressionFromAvailableClusters(EsqlExecutionInfo executionInfo, String localPattern) {
if (executionInfo.getClusters().isEmpty()) {
return localPattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,20 @@ public void analyzedPlan(

var preAnalysis = preAnalyzer.preAnalyze(parsed);
var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";

resolveIndices(parsed, executionInfo, description, requestFilter, preAnalysis, result, logicalPlanListener);
}

private void resolveIndices(
LogicalPlan parsed,
EsqlExecutionInfo executionInfo,
String description,
QueryBuilder requestFilter,
PreAnalyzer.PreAnalysis preAnalysis,
PreAnalysisResult result,
ActionListener<LogicalPlan> logicalPlanListener
) {
EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);

SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
Expand All @@ -459,7 +472,7 @@ public void analyzedPlan(
.<PreAnalysisResult>andThen((l, r) -> {
inferenceService.inferenceResolver(functionRegistry).resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
})
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, executionInfo, description, requestFilter, preAnalysis, r, l))
.addListener(logicalPlanListener);
}

Expand Down Expand Up @@ -695,15 +708,14 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SYSTEM_READ
);
if (preAnalysis.indexPattern() != null) {
String indexExpressionToResolve = EsqlCCSUtils.createIndexExpressionFromAvailableClusters(executionInfo);
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(
result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMapping(
indexExpressionToResolve,
preAnalysis.indexPattern().indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (preAnalysis.indexMode()) {
Expand Down Expand Up @@ -732,13 +744,13 @@ private void preAnalyzeMainIndices(

private void analyzeWithRetry(
LogicalPlan parsed,
EsqlExecutionInfo executionInfo,
String description,
QueryBuilder requestFilter,
PreAnalyzer.PreAnalysis preAnalysis,
EsqlExecutionInfo executionInfo,
PreAnalysisResult result,
ActionListener<LogicalPlan> listener
) {
var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
LOGGER.debug("Analyzing the plan ({})", description);
try {
if (result.indices.isValid() || requestFilter != null) {
Expand All @@ -756,20 +768,9 @@ private void analyzeWithRetry(
// if the initial request didn't have a filter, then just pass the exception back to the user
listener.onFailure(ve);
} else {
// retrying and make the index resolution work without any index filtering.
preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> {
LOGGER.debug("Analyzing the plan (second attempt, without filter)");
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
l.onResponse(plan);
} catch (Exception e) {
l.onFailure(e);
}
}));
// retrying the index resolution without index filtering.
executionInfo.clusterInfo.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

resolveIndices(parsed, executionInfo, "second attempt, without filter", null, preAnalysis, result, listener);
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ private static FieldCapabilitiesRequest createFieldCapsRequest(
req.fields(fieldNames.toArray(String[]::new));
req.includeUnmapped(true);
req.indexFilter(requestFilter);
req.returnLocalAll(false);
// lenient because we throw our own errors looking at the response e.g. if something was not resolved
// also because this way security doesn't throw authorization exceptions but rather honors ignore_unavailable
req.indicesOptions(FIELD_CAPS_INDICES_OPTIONS);
Expand Down
Loading