From 0fe67dc22e796deb24ebe46220f7c9280e10f21d Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 8 Sep 2025 11:54:46 +0200 Subject: [PATCH 01/14] resolve all fields when enrich is used --- .../esql/core/expression/AttributeMap.java | 1 - .../xpack/esql/session/FieldNameUtils.java | 32 +++++++++---------- .../esql/session/FieldNameUtilsTests.java | 13 +++----- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java index 9a3e487bca3f9..473882d90dad7 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java @@ -244,7 +244,6 @@ private E delete(Object key) { public Set attributeNames() { Set s = Sets.newLinkedHashSetWithExpectedSize(size()); - for (AttributeWrapper aw : delegate.keySet()) { s.add(aw.attr.name()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java index c8f948303e22b..f3389a7a7bfa9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java @@ -20,7 +20,6 @@ import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute; import org.elasticsearch.xpack.esql.core.expression.UnresolvedStar; import org.elasticsearch.xpack.esql.core.util.Holder; -import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern; import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction; import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket; @@ -52,8 +51,9 @@ import java.util.Locale; import java.util.Set; import java.util.function.BiConsumer; -import java.util.stream.Collectors; +import java.util.stream.Stream; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD; public class FieldNameUtils { @@ -62,12 +62,6 @@ public class FieldNameUtils { public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution) { - // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API - Set enrichPolicyMatchFields = enrichResolution.resolvedEnrichPolicies() - .stream() - .map(ResolvedEnrichPolicy::matchField) - .collect(Collectors.toSet()); - // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy List inlinestats = parsed.collect(InlineStats.class::isInstance); Set inlinestatsAggs = new HashSet<>(); @@ -244,17 +238,25 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name())); Set fieldNames = referencesBuilder.get().build().names(); - if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) { + if (enrichResolution.resolvedEnrichPolicies().isEmpty() == false) { + // we do not know names of the enrich policy match fields before hand. We need to resolve all fields in thisc ase + return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, wildcardJoinIndices); + } else if (fieldNames.isEmpty()) { // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index return new PreAnalysisResult(enrichResolution, IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices); } else { - fieldNames.addAll(subfields(fieldNames)); - fieldNames.addAll(enrichPolicyMatchFields); - fieldNames.addAll(subfields(enrichPolicyMatchFields)); - return new PreAnalysisResult(enrichResolution, fieldNames, wildcardJoinIndices); + return new PreAnalysisResult( + enrichResolution, + fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), + wildcardJoinIndices + ); } } + private static Stream withSubfields(String name) { + return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*"); + } + /** * Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps. */ @@ -297,8 +299,4 @@ private static boolean matchByName(Attribute attr, String other, boolean skipIfP var name = attr.name(); return isPattern ? Regex.simpleMatch(name, other) : name.equals(other); } - - private static Set subfields(Set names) { - return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet()); - } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java index 10cde6f9297ec..06afc31fe277a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java @@ -1489,15 +1489,10 @@ public void testCountStar() { } public void testEnrichOnDefaultFieldWithKeep() { - assertFieldNames( - """ - from employees - | enrich languages_policy - | keep emp_no""", - enrichResolutionWith("language_name"), - Set.of("emp_no", "emp_no.*", "language_name", "language_name.*"), - Set.of() - ); + assertFieldNames(""" + from employees + | enrich languages_policy + | keep emp_no""", enrichResolutionWith("language_name"), Set.of("*"), Set.of()); } public void testDissectOverwriteName() { From 3c1ce1fb0acc0ecb3d8bd99602973dc60fb9d1ba Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 8 Sep 2025 12:09:50 +0200 Subject: [PATCH 02/14] resolve enriches after main indices --- .../xpack/esql/analysis/EnrichResolution.java | 10 +-- .../xpack/esql/session/EsqlSession.java | 62 ++++++++----------- .../xpack/esql/session/FieldNameUtils.java | 21 +++---- 3 files changed, 35 insertions(+), 58 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java index 7fb279f18b1dc..be3a0b08336e6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java @@ -11,7 +11,6 @@ import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import java.util.Collection; import java.util.Map; /** @@ -28,11 +27,6 @@ public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mod return resolvedPolicies.get(new Key(policyName, mode)); } - public Collection resolvedEnrichPolicies() { - return resolvedPolicies.values(); - - } - public String getError(String policyName, Enrich.Mode mode) { final String error = errors.get(new Key(policyName, mode)); if (error != null) { @@ -51,7 +45,5 @@ public void addError(String policyName, Enrich.Mode mode, String reason) { errors.putIfAbsent(new Key(policyName, mode), reason); } - private record Key(String policyName, Enrich.Mode mode) { - - } + private record Key(String policyName, Enrich.Mode mode) {} } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index a8078feec4f8b..410f5f3add071 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -373,14 +373,18 @@ public void analyzedPlan( } var preAnalysis = preAnalyzer.preAnalyze(parsed); + var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false); + EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.index(), executionInfo); - SubscribableListener. // - newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l)) - .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution)) - .andThen((l, r) -> resolveInferences(parsed, r, l)) + SubscribableListener.newForked(l -> { + inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(result::withInferenceResolution)); + }) .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) + .andThen((l, r) -> { + enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); + }) .andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l)) .addListener(logicalPlanListener); } @@ -637,7 +641,7 @@ private void preAnalyzeMainIndices( if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( - result.withIndexResolution(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of()))) + result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of()))) ); } else { boolean includeAllDimensions = false; @@ -658,13 +662,13 @@ private void preAnalyzeMainIndices( requestFilter, includeAllDimensions, listener.delegateFailure((l, indexResolution) -> { - l.onResponse(result.withIndexResolution(indexResolution)); + l.onResponse(result.withIndices(indexResolution)); }) ); } } else { // occurs when dealing with local relations (row a = 1) - listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); + listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]"))); } } @@ -730,10 +734,6 @@ private void analyzeWithRetry( } } - private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener l) { - inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution)); - } - private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) { PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan); physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> { @@ -793,43 +793,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { } public record PreAnalysisResult( + Set fieldNames, + Set wildcardJoinIndices, IndexResolution indices, Map lookupIndices, EnrichResolution enrichResolution, - Set fieldNames, - Set wildcardJoinIndices, InferenceResolution inferenceResolution ) { - public PreAnalysisResult(EnrichResolution enrichResolution, Set fieldNames, Set wildcardJoinIndices) { - this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY); + public PreAnalysisResult(Set fieldNames, Set wildcardJoinIndices) { + this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY); } - PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) { - return new PreAnalysisResult( - indices(), - lookupIndices(), - enrichResolution(), - fieldNames(), - wildcardJoinIndices(), - newInferenceResolution - ); + PreAnalysisResult withIndices(IndexResolution indices) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); } - PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) { - return new PreAnalysisResult( - newIndexResolution, - lookupIndices(), - enrichResolution(), - fieldNames(), - wildcardJoinIndices(), - inferenceResolution() - ); + PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) { + lookupIndices.put(index, indexResolution); + return this; } - PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) { - lookupIndices.put(index, newIndexResolution); - return this; + PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); + } + + PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); } } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java index f3389a7a7bfa9..e98676f7e5a24 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.esql.session; import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.xpack.esql.analysis.EnrichResolution; import org.elasticsearch.xpack.esql.core.expression.Alias; import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.AttributeSet; @@ -60,7 +59,7 @@ public class FieldNameUtils { private static final Set FUNCTIONS_REQUIRING_TIMESTAMP = Set.of(TBucket.NAME.toLowerCase(Locale.ROOT)); - public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution) { + public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean hasEnriches) { // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy List inlinestats = parsed.collect(InlineStats.class::isInstance); @@ -72,7 +71,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso if (false == parsed.anyMatch(p -> shouldCollectReferencedFields(p, inlinestatsAggs))) { // no explicit columns selection, for example "from employees" // also, inlinestats only adds columns to the existent output, its Aggregate shouldn't interfere with potentially using "*" - return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of()); + return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of()); } Holder projectAll = new Holder<>(false); @@ -84,7 +83,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso }); if (projectAll.get()) { - return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of()); + return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of()); } var referencesBuilder = new Holder<>(AttributeSet.builder()); @@ -226,7 +225,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso parsed.forEachDownMayReturnEarly(forEachDownProcessor.get()); if (projectAll.get()) { - return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of()); + return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of()); } // Add JOIN ON column references afterward to avoid Alias removal @@ -238,18 +237,14 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name())); Set fieldNames = referencesBuilder.get().build().names(); - if (enrichResolution.resolvedEnrichPolicies().isEmpty() == false) { + if (hasEnriches) { // we do not know names of the enrich policy match fields before hand. We need to resolve all fields in thisc ase - return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, wildcardJoinIndices); + return new PreAnalysisResult(IndexResolver.ALL_FIELDS, wildcardJoinIndices); } else if (fieldNames.isEmpty()) { // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index - return new PreAnalysisResult(enrichResolution, IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices); + return new PreAnalysisResult(IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices); } else { - return new PreAnalysisResult( - enrichResolution, - fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), - wildcardJoinIndices - ); + return new PreAnalysisResult(fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), wildcardJoinIndices); } } From 6c37a8fbf7e379668b5e96b53b6d517b0c1be209 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 8 Sep 2025 12:11:56 +0200 Subject: [PATCH 03/14] comment data dependency --- .../elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index d9314f1a0611d..675ae825cb9de 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -121,7 +121,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn } doResolvePolicies( - new HashSet<>(executionInfo.getClusters().keySet()), + new HashSet<>(executionInfo.getClusters().keySet()), // executionInfo.getClusters() is populated by prior main index resolution enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), executionInfo, listener From 5582140e3e35478f58299eb29bfa5b7f49865980 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 8 Sep 2025 13:28:07 +0200 Subject: [PATCH 04/14] fix unit test --- .../esql/session/FieldNameUtilsTests.java | 27 +++++-------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java index 06afc31fe277a..f03372872a32e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java @@ -10,14 +10,9 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.action.EsqlCapabilities; -import org.elasticsearch.xpack.esql.analysis.EnrichResolution; -import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy; import org.elasticsearch.xpack.esql.parser.EsqlParser; import org.elasticsearch.xpack.esql.parser.ParsingException; -import org.elasticsearch.xpack.esql.plan.logical.Enrich; -import java.util.List; -import java.util.Map; import java.util.Set; import static org.elasticsearch.xpack.esql.session.IndexResolver.ALL_FIELDS; @@ -1492,7 +1487,7 @@ public void testEnrichOnDefaultFieldWithKeep() { assertFieldNames(""" from employees | enrich languages_policy - | keep emp_no""", enrichResolutionWith("language_name"), Set.of("*"), Set.of()); + | keep emp_no""", true, Set.of("*"), Set.of()); } public void testDissectOverwriteName() { @@ -1596,7 +1591,7 @@ public void testAvoidGrokAttributesRemoval5() { public void testEnrichOnDefaultField() { assertFieldNames(""" from employees - | enrich languages_policy""", enrichResolutionWith("language_name"), ALL_FIELDS, Set.of()); + | enrich languages_policy""", true, ALL_FIELDS, Set.of()); } public void testMetrics() { @@ -3049,26 +3044,16 @@ public void testStatsChainingWithTimestampCarriedForwardAsByKey() { } private void assertFieldNames(String query, Set expected) { - assertFieldNames(query, new EnrichResolution(), expected, Set.of()); + assertFieldNames(query, false, expected, Set.of()); } private void assertFieldNames(String query, Set expected, Set wildCardIndices) { - assertFieldNames(query, new EnrichResolution(), expected, wildCardIndices); + assertFieldNames(query, false, expected, wildCardIndices); } - private void assertFieldNames(String query, EnrichResolution enrichResolution, Set expected, Set wildCardIndices) { - var preAnalysisResult = FieldNameUtils.resolveFieldNames(parser.createStatement(query, EsqlTestUtils.TEST_CFG), enrichResolution); + private void assertFieldNames(String query, boolean hasEnriches, Set expected, Set wildCardIndices) { + var preAnalysisResult = FieldNameUtils.resolveFieldNames(parser.createStatement(query, EsqlTestUtils.TEST_CFG), hasEnriches); assertThat("Query-wide field names", preAnalysisResult.fieldNames(), equalTo(expected)); assertThat("Lookup Indices that expect wildcard lookups", preAnalysisResult.wildcardJoinIndices(), equalTo(wildCardIndices)); } - - private static EnrichResolution enrichResolutionWith(String enrichPolicyMatchField) { - var enrichResolution = new EnrichResolution(); - enrichResolution.addResolvedPolicy( - "policy", - Enrich.Mode.ANY, - new ResolvedEnrichPolicy(enrichPolicyMatchField, null, List.of(), Map.of(), Map.of()) - ); - return enrichResolution; - } } From 1362fa98266dd975184398a9109b9b466bf0dee7 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 18 Sep 2025 11:18:35 +0200 Subject: [PATCH 05/14] connect to only running clusters when resolving enrich --- .../xpack/esql/action/EsqlExecutionInfo.java | 4 + .../esql/enrich/EnrichPolicyResolver.java | 8 +- .../xpack/esql/session/EsqlSession.java | 75 ++++++------------- 3 files changed, 30 insertions(+), 57 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java index 5fdf94474fa80..32d9b583ff7d7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java @@ -319,6 +319,10 @@ public Stream getClusterStates(Cluster.Status status) { return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status); } + public Stream getRunningClusterAliases() { + return getClusterStates(Cluster.Status.RUNNING).map(Cluster::getClusterAlias); + } + @Override public String toString() { return "EsqlExecutionInfo{" diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index 675ae825cb9de..be06a4b1b5ae7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -53,12 +53,12 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static java.util.stream.Collectors.toSet; import static org.elasticsearch.xpack.esql.expression.Foldables.stringLiteralValueOf; import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards; @@ -121,7 +121,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn } doResolvePolicies( - new HashSet<>(executionInfo.getClusters().keySet()), // executionInfo.getClusters() is populated by prior main index resolution + executionInfo.getRunningClusterAliases().collect(toSet()), enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), executionInfo, listener @@ -310,7 +310,7 @@ private void lookupPolicies( Set remotePolicies = unresolvedPolicies.stream() .filter(u -> u.mode != Enrich.Mode.COORDINATOR) .map(u -> u.name) - .collect(Collectors.toSet()); + .collect(toSet()); // remote clusters if (remotePolicies.isEmpty() == false) { for (String cluster : remoteClusters) { @@ -342,7 +342,7 @@ public void onFailure(Exception e) { Set localPolicies = unresolvedPolicies.stream() .filter(u -> includeLocal || u.mode != Enrich.Mode.REMOTE) .map(u -> u.name) - .collect(Collectors.toSet()); + .collect(toSet()); if (localPolicies.isEmpty() == false) { transportService.sendRequest( transportService.getLocalNode(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 8a0e756b14031..18ef3ad729a73 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -85,7 +85,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.stream.Collectors.joining; import static java.util.stream.Collectors.toSet; @@ -564,10 +563,8 @@ private PreAnalysisResult receiveLookupIndexResolution( }); // These are clusters that are still in the running, we need to have the index on all of them - Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); // Verify that all active clusters have the lookup index resolved - clusters.forEach(cluster -> { - String clusterAlias = cluster.getClusterAlias(); + executionInfo.getRunningClusterAliases().forEach(clusterAlias -> { if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) { // Missing cluster resolution skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias)); @@ -626,9 +623,7 @@ private IndexResolution checkSingleIndex( * concrete indices aliased to the same index name. */ private void validateRemoteVersions(EsqlExecutionInfo executionInfo) { - Stream clusters = executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING); - clusters.forEach(cluster -> { - String clusterAlias = cluster.getClusterAlias(); + executionInfo.getRunningClusterAliases().forEach(clusterAlias -> { if (clusterAlias.equals(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY) == false) { // No need to check local, obviously var connection = remoteClusterService.getConnection(clusterAlias); @@ -667,27 +662,25 @@ private void preAnalyzeMainIndices( result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of()))) ); } else { - boolean includeAllDimensions = false; - // call the EsqlResolveFieldsAction (field-caps) to resolve indices and get field types - if (preAnalysis.indexMode() == IndexMode.TIME_SERIES) { - includeAllDimensions = true; - // TODO: Maybe if no indices are returned, retry without index mode and provide a clearer error message. - var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); - if (requestFilter != null) { - requestFilter = new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter); - } else { - requestFilter = indexModeFilter; + indexResolver.resolveAsMergedMapping(indexExpressionToResolve, result.fieldNames, switch (preAnalysis.indexMode()) { + case IndexMode.TIME_SERIES -> { + var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); + yield requestFilter != null + ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) + : indexModeFilter; } - } - indexResolver.resolveAsMergedMapping( - indexExpressionToResolve, - result.fieldNames, - requestFilter, - includeAllDimensions, - listener.delegateFailure((l, indexResolution) -> { - l.onResponse(result.withIndices(indexResolution)); - }) - ); + default -> requestFilter; + }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, listener.delegateFailure((l, mainIindexResolution) -> { + // the order here is tricky - if the cluster has been filtered and later became unavailable, + // do we want to declare it successful or skipped? For now, unavailability takes precedence. + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, mainIindexResolution.failures()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( + executionInfo, + mainIindexResolution, + requestFilter != null + ); + l.onResponse(result.withIndices(mainIindexResolution)); + })); } } else { // occurs when dealing with local relations (row a = 1) @@ -703,29 +696,9 @@ private void analyzeWithRetry( PreAnalysisResult result, ActionListener listener ) { - if (result.indices.isValid()) { - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures()); - if (executionInfo.isCrossClusterSearch() - && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { - // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception - // to let the LogicalPlanActionListener decide how to proceed - LOGGER.debug("No more clusters to search, ending analysis stage"); - listener.onFailure(new NoClustersToSearchException()); - return; - } - } - var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter"; - LOGGER.debug("Analyzing the plan ({})", description); - try { - if (result.indices.isValid() || requestFilter != null) { - // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report - // when the resolution result is not valid for a different reason. - if (executionInfo.clusterInfo.isEmpty() == false) { - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, result.indices, requestFilter != null); - } - } + LOGGER.debug("Analyzing the plan ({})", description); LogicalPlan plan = analyzedPlan(parsed, result, executionInfo); LOGGER.debug("Analyzed plan ({}):\n{}", description, plan); // the analysis succeeded from the first attempt, irrespective if it had a filter or not, just continue with the planning @@ -738,12 +711,8 @@ private void analyzeWithRetry( } else { // retrying and make the index resolution work without any index filtering. preAnalyzeMainIndices(preAnalysis, executionInfo, result, null, listener.delegateFailure((l, r) -> { - LOGGER.debug("Analyzing the plan (second attempt, without filter)"); try { - // the order here is tricky - if the cluster has been filtered and later became unavailable, - // do we want to declare it successful or skipped? For now, unavailability takes precedence. - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures()); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false); + LOGGER.debug("Analyzing the plan (second attempt, without filter)"); LogicalPlan plan = analyzedPlan(parsed, r, executionInfo); LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan); l.onResponse(plan); From 4613736234010f93d6a05ffdb4c6e84ae1af6fa2 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 18 Sep 2025 11:36:59 +0200 Subject: [PATCH 06/14] fix exception propagation --- .../xpack/esql/session/EsqlSession.java | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index 18ef3ad729a73..b85713f90fe2b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -662,25 +662,31 @@ private void preAnalyzeMainIndices( result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.index().indexPattern(), Map.of(), Map.of()))) ); } else { - indexResolver.resolveAsMergedMapping(indexExpressionToResolve, result.fieldNames, switch (preAnalysis.indexMode()) { - case IndexMode.TIME_SERIES -> { - var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); - yield requestFilter != null - ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) - : indexModeFilter; - } - default -> requestFilter; - }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, listener.delegateFailure((l, mainIindexResolution) -> { - // the order here is tricky - if the cluster has been filtered and later became unavailable, - // do we want to declare it successful or skipped? For now, unavailability takes precedence. - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, mainIindexResolution.failures()); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - mainIindexResolution, - requestFilter != null - ); - l.onResponse(result.withIndices(mainIindexResolution)); - })); + indexResolver.resolveAsMergedMapping( + indexExpressionToResolve, // + result.fieldNames, + switch (preAnalysis.indexMode()) { + case IndexMode.TIME_SERIES -> { + var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName()); + yield requestFilter != null + ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) + : indexModeFilter; + } + default -> requestFilter; + }, + preAnalysis.indexMode() == IndexMode.TIME_SERIES, + listener.delegateFailureAndWrap((l, mainIndexResolution) -> { + // the order here is tricky - if the cluster has been filtered and later became unavailable, + // do we want to declare it successful or skipped? For now, unavailability takes precedence. + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, mainIndexResolution.failures()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( + executionInfo, + mainIndexResolution, + requestFilter != null + ); + l.onResponse(result.withIndices(mainIndexResolution)); + }) + ); } } else { // occurs when dealing with local relations (row a = 1) From d4446bd6c9c343f3e8a57b483c1b38c82cc9821e Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 18 Sep 2025 14:31:06 +0200 Subject: [PATCH 07/14] fix enrich with row --- .../elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java index be06a4b1b5ae7..49275f90193a1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java @@ -53,6 +53,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -121,7 +122,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn } doResolvePolicies( - executionInfo.getRunningClusterAliases().collect(toSet()), + executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()), enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(), executionInfo, listener From 43171a6d428ee0b7db6a5ebbc2e7164314f0643a Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 18 Sep 2025 15:43:24 +0200 Subject: [PATCH 08/14] only update if resolution is valid --- .../xpack/esql/session/EsqlSession.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b85713f90fe2b..221088ab04c9e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -678,12 +678,14 @@ private void preAnalyzeMainIndices( listener.delegateFailureAndWrap((l, mainIndexResolution) -> { // the order here is tricky - if the cluster has been filtered and later became unavailable, // do we want to declare it successful or skipped? For now, unavailability takes precedence. - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, mainIndexResolution.failures()); - EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( - executionInfo, - mainIndexResolution, - requestFilter != null - ); + if (mainIndexResolution.isValid()) { + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, mainIndexResolution.failures()); + EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices( + executionInfo, + mainIndexResolution, + requestFilter != null + ); + } l.onResponse(result.withIndices(mainIndexResolution)); }) ); From fe40176a1f3b351af6dcc4c1465887d7259cd106 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 19 Sep 2025 10:25:53 +0200 Subject: [PATCH 09/14] fix merge --- .../xpack/esql/session/EsqlSession.java | 64 ++++++++----------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index b62da0bce2564..bf7e30e008220 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -394,14 +394,18 @@ public void analyzedPlan( } var preAnalysis = preAnalyzer.preAnalyze(parsed); + var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false); + EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); - SubscribableListener. // - newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l)) - .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution)) - .andThen((l, r) -> resolveInferences(parsed, r, l)) + SubscribableListener.newForked(l -> { + inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(result::withInferenceResolution)); + }) .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) + .andThen((l, r) -> { + enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); + }) .andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l)) .addListener(logicalPlanListener); } @@ -638,9 +642,7 @@ private void preAnalyzeMainIndices( if (indexExpressionToResolve.isEmpty()) { // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution listener.onResponse( - result.withIndexResolution( - IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())) - ) + result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))) ); } else { indexResolver.resolveAsMergedMapping( @@ -658,13 +660,13 @@ private void preAnalyzeMainIndices( }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, listener.delegateFailure((l, indexResolution) -> { - l.onResponse(result.withIndexResolution(indexResolution)); + l.onResponse(result.withIndices(indexResolution)); }) ); } } else { // occurs when dealing with local relations (row a = 1) - listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]"))); + listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]"))); } } @@ -730,10 +732,6 @@ private void analyzeWithRetry( } } - private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener l) { - inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution)); - } - private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) { PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan); physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> { @@ -793,43 +791,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) { } public record PreAnalysisResult( + Set fieldNames, + Set wildcardJoinIndices, IndexResolution indices, Map lookupIndices, EnrichResolution enrichResolution, - Set fieldNames, - Set wildcardJoinIndices, InferenceResolution inferenceResolution ) { - public PreAnalysisResult(EnrichResolution enrichResolution, Set fieldNames, Set wildcardJoinIndices) { - this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY); + public PreAnalysisResult(Set fieldNames, Set wildcardJoinIndices) { + this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY); } - PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) { - return new PreAnalysisResult( - indices(), - lookupIndices(), - enrichResolution(), - fieldNames(), - wildcardJoinIndices(), - newInferenceResolution - ); + PreAnalysisResult withIndices(IndexResolution indices) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); } - PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) { - return new PreAnalysisResult( - newIndexResolution, - lookupIndices(), - enrichResolution(), - fieldNames(), - wildcardJoinIndices(), - inferenceResolution() - ); + PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) { + lookupIndices.put(index, indexResolution); + return this; } - PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) { - lookupIndices.put(index, newIndexResolution); - return this; + PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); + } + + PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) { + return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution); } } } From f87d225ae4d24aca52205b421d1fdd694c09c8c5 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 19 Sep 2025 10:38:48 +0200 Subject: [PATCH 10/14] reorder --- .../xpack/esql/session/EsqlSession.java | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index bf7e30e008220..eb2f5e1d10b4f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -400,8 +400,17 @@ public void analyzedPlan( SubscribableListener.newForked(l -> { inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(result::withInferenceResolution)); - }) + }) // .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) + .andThenApply(r -> { + if (r.indices.isValid() + && executionInfo.isCrossClusterSearch() + && executionInfo.getRunningClusterAliases().findAny().isEmpty()) { + LOGGER.debug("No more clusters to search, ending analysis stage"); + throw new NoClustersToSearchException(); + } + return r; + }) .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l)) .andThen((l, r) -> { enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); @@ -659,7 +668,8 @@ private void preAnalyzeMainIndices( default -> requestFilter; }, preAnalysis.indexMode() == IndexMode.TIME_SERIES, - listener.delegateFailure((l, indexResolution) -> { + listener.delegateFailureAndWrap((l, indexResolution) -> { + EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures()); l.onResponse(result.withIndices(indexResolution)); }) ); @@ -678,21 +688,8 @@ private void analyzeWithRetry( PreAnalysisResult result, ActionListener listener ) { - if (result.indices.isValid()) { - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures()); - if (executionInfo.isCrossClusterSearch() - && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) { - // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception - // to let the LogicalPlanActionListener decide how to proceed - LOGGER.debug("No more clusters to search, ending analysis stage"); - listener.onFailure(new NoClustersToSearchException()); - return; - } - } - var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter"; LOGGER.debug("Analyzing the plan ({})", description); - try { if (result.indices.isValid() || requestFilter != null) { // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report @@ -717,7 +714,6 @@ private void analyzeWithRetry( try { // the order here is tricky - if the cluster has been filtered and later became unavailable, // do we want to declare it successful or skipped? For now, unavailability takes precedence. - EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures()); EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false); LogicalPlan plan = analyzedPlan(parsed, r, executionInfo); LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan); From 5489c5b9582928d6f2aab2206ffa5df636040211 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 19 Sep 2025 11:52:26 +0200 Subject: [PATCH 11/14] postpone inferenceService resolution --- .../org/elasticsearch/xpack/esql/session/EsqlSession.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java index eb2f5e1d10b4f..fa14952bf1b50 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java @@ -398,10 +398,7 @@ public void analyzedPlan( EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo); - SubscribableListener.newForked(l -> { - inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(result::withInferenceResolution)); - }) // - .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l)) + SubscribableListener.newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l)) .andThenApply(r -> { if (r.indices.isValid() && executionInfo.isCrossClusterSearch() @@ -415,6 +412,9 @@ public void analyzedPlan( .andThen((l, r) -> { enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution)); }) + .andThen((l, r) -> { + inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(r::withInferenceResolution)); + }) .andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l)) .addListener(logicalPlanListener); } From 146e3ecd0c6f6b53077d90263f55c121ef33f3ce Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Fri, 19 Sep 2025 15:00:19 +0200 Subject: [PATCH 12/14] add required capabilities --- .../esql/qa/testFixtures/src/main/resources/enrich.csv-spec | 1 + .../esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec | 1 + 2 files changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec index c263ce4326865..bf625815259ae 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec @@ -650,6 +650,7 @@ IDR | Indore | POINT(75.8472 22.7167) | India | POINT(75.8 fieldsInOtherIndicesBug required_capability: enrich_load required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout +required_capability: dense_vector_field_type from * | keep author.keyword, book_no, scalerank, street, bytes_in, @timestamp, abbrev, city_location, distance, description, birth_date, language_code, intersects, client_ip, event_duration, version diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 2aa88f47a0a96..9ca2fc78e2cd6 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1673,6 +1673,7 @@ null |1952-02-27T00:00:00.000Z enrichLookupStatsBug required_capability: join_lookup_v12 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout +required_capability: dense_vector_field_type from * | enrich languages_policy on cluster From 4957da30240ae49de5889c047eef8f71ae919b04 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Mon, 22 Sep 2025 11:45:54 +0200 Subject: [PATCH 13/14] fix typo --- .../org/elasticsearch/xpack/esql/session/FieldNameUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java index e98676f7e5a24..9bd498d489c84 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java @@ -238,7 +238,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean ha Set fieldNames = referencesBuilder.get().build().names(); if (hasEnriches) { - // we do not know names of the enrich policy match fields before hand. We need to resolve all fields in thisc ase + // we do not know names of the enrich policy match fields beforehand. We need to resolve all fields in this case return new PreAnalysisResult(IndexResolver.ALL_FIELDS, wildcardJoinIndices); } else if (fieldNames.isEmpty()) { // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index From b00fd5972969e61e1f9728ad46abd03154dc2679 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Tue, 23 Sep 2025 09:17:52 +0200 Subject: [PATCH 14/14] add comment to the tests --- .../esql/qa/testFixtures/src/main/resources/enrich.csv-spec | 4 ++++ .../qa/testFixtures/src/main/resources/lookup-join.csv-spec | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec index bf625815259ae..1075f474f823e 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec @@ -650,6 +650,10 @@ IDR | Indore | POINT(75.8472 22.7167) | India | POINT(75.8 fieldsInOtherIndicesBug required_capability: enrich_load required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout + +// from * accidentally selects columns with dense_vector field type. +// This is not properly handled when the query is planned by newer node and executed by an older one. +// see https://github.com/elastic/elasticsearch/issues/135193 required_capability: dense_vector_field_type from * diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec index 9ca2fc78e2cd6..7ca77696138a3 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec @@ -1673,6 +1673,10 @@ null |1952-02-27T00:00:00.000Z enrichLookupStatsBug required_capability: join_lookup_v12 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout + +// from * accidentally selects columns with dense_vector field type. +// This is not properly handled when the query is planned by newer node and executed by an older one. +// see https://github.com/elastic/elasticsearch/issues/135193 required_capability: dense_vector_field_type from *