diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java
index 9a3e487bca3f9..473882d90dad7 100644
--- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java
+++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java
@@ -244,7 +244,6 @@ private E delete(Object key) {
     public Set<String> attributeNames() {
         Set<String> s = Sets.newLinkedHashSetWithExpectedSize(size());
-
         for (AttributeWrapper aw : delegate.keySet()) {
             s.add(aw.attr.name());
         }
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec
index c263ce4326865..1075f474f823e 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec
@@ -651,6 +651,11 @@ fieldsInOtherIndicesBug
 required_capability: enrich_load
 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
+// from * accidentally selects columns with the dense_vector field type.
+// This is not properly handled when the query is planned by a newer node and executed by an older one.
+// see https://github.com/elastic/elasticsearch/issues/135193
+required_capability: dense_vector_field_type
+
 from *
 | keep author.keyword, book_no, scalerank, street, bytes_in, @timestamp, abbrev, city_location, distance, description, birth_date, language_code, intersects, client_ip, event_duration, version
 | enrich languages_policy on author.keyword
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
index 2aa88f47a0a96..7ca77696138a3 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec
@@ -1674,6 +1674,11 @@ enrichLookupStatsBug
 required_capability: join_lookup_v12
 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
+// from * accidentally selects columns with the dense_vector field type.
+// This is not properly handled when the query is planned by a newer node and executed by an older one.
+// see https://github.com/elastic/elasticsearch/issues/135193
+required_capability: dense_vector_field_type
+
 from *
 | enrich languages_policy on cluster
 | rename languages.byte as language_code
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java
index 7fb279f18b1dc..be3a0b08336e6 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java
@@ -11,7 +11,6 @@
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 
-import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -28,11 +27,6 @@ public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mod
         return resolvedPolicies.get(new Key(policyName, mode));
     }
 
-    public Collection<ResolvedEnrichPolicy> resolvedEnrichPolicies() {
-        return resolvedPolicies.values();
-
-    }
-
     public String getError(String policyName, Enrich.Mode mode) {
         final String error = errors.get(new Key(policyName, mode));
         if (error != null) {
@@ -51,7 +45,5 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
         errors.putIfAbsent(new Key(policyName, mode), reason);
     }
 
-    private record Key(String policyName, Enrich.Mode mode) {
-
-    }
+    private record Key(String policyName, Enrich.Mode mode) {}
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java
index d9314f1a0611d..49275f90193a1 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java
@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.esql.expression.Foldables.stringLiteralValueOf;
 import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
 
@@ -121,7 +122,7 @@ public void resolvePolicies(List enriches, EsqlExecutionInfo executionIn
         }
 
         doResolvePolicies(
-            new HashSet<>(executionInfo.getClusters().keySet()),
+            executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()),
             enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
             executionInfo,
             listener
@@ -310,7 +311,7 @@ private void lookupPolicies(
         Set<String> remotePolicies = unresolvedPolicies.stream()
             .filter(u -> u.mode != Enrich.Mode.COORDINATOR)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         // remote clusters
         if (remotePolicies.isEmpty() == false) {
             for (String cluster : remoteClusters) {
@@ -342,7 +343,7 @@ public void onFailure(Exception e) {
         Set<String> localPolicies = unresolvedPolicies.stream()
             .filter(u -> includeLocal || u.mode != Enrich.Mode.REMOTE)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         if (localPolicies.isEmpty() == false) {
             transportService.sendRequest(
                 transportService.getLocalNode(),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
index b62da0bce2564..fa14952bf1b50 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java
@@ -394,14 +394,27 @@ public void analyzedPlan(
         }
 
         var preAnalysis = preAnalyzer.preAnalyze(parsed);
+        var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);
+
         EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);
 
-        SubscribableListener. //
-            newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
-            .andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
-            .andThen((l, r) -> resolveInferences(parsed, r, l))
-            .andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
+        SubscribableListener.newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
+            .andThenApply(r -> {
+                if (r.indices.isValid()
+                    && executionInfo.isCrossClusterSearch()
+                    && executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
+                    LOGGER.debug("No more clusters to search, ending analysis stage");
+                    throw new NoClustersToSearchException();
+                }
+                return r;
+            })
             .andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
+            .andThen((l, r) -> {
+                enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
+            })
+            .andThen((l, r) -> {
+                inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
+            })
             .andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
             .addListener(logicalPlanListener);
     }
@@ -638,9 +651,7 @@ private void preAnalyzeMainIndices(
             if (indexExpressionToResolve.isEmpty()) {
                 // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
                 listener.onResponse(
-                    result.withIndexResolution(
-                        IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
-                    )
+                    result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
                 );
             } else {
                 indexResolver.resolveAsMergedMapping(
@@ -657,14 +668,15 @@
                         default -> requestFilter;
                     },
                     preAnalysis.indexMode() == IndexMode.TIME_SERIES,
-                    listener.delegateFailure((l, indexResolution) -> {
-                        l.onResponse(result.withIndexResolution(indexResolution));
+                    listener.delegateFailureAndWrap((l, indexResolution) -> {
+                        EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
+                        l.onResponse(result.withIndices(indexResolution));
                     })
                 );
             }
         } else {
             // occurs when dealing with local relations (row a = 1)
-            listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
+            listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]")));
         }
     }
 
@@ -676,21 +688,8 @@ private void analyzeWithRetry(
         PreAnalysisResult result,
         ActionListener<LogicalPlan> listener
     ) {
-        if (result.indices.isValid()) {
-            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
-            if (executionInfo.isCrossClusterSearch()
-                && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
-                // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
-                // to let the LogicalPlanActionListener decide how to proceed
-                LOGGER.debug("No more clusters to search, ending analysis stage");
-                listener.onFailure(new NoClustersToSearchException());
-                return;
-            }
-        }
-
         var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
         LOGGER.debug("Analyzing the plan ({})", description);
-
         try {
             if (result.indices.isValid() || requestFilter != null) {
                 // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
@@ -715,7 +714,6 @@ private void analyzeWithRetry(
                     try {
                         // the order here is tricky - if the cluster has been filtered and later became unavailable,
                         // do we want to declare it successful or skipped? For now, unavailability takes precedence.
-                        EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
                         EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
                         LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
                         LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
@@ -730,10 +728,6 @@
             }
         }
 
-    private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
-        inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
-    }
-
     private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
         PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
         physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
@@ -793,43 +787,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
     }
 
     public record PreAnalysisResult(
+        Set<String> fieldNames,
+        Set<String> wildcardJoinIndices,
         IndexResolution indices,
         Map<String, IndexResolution> lookupIndices,
         EnrichResolution enrichResolution,
-        Set<String> fieldNames,
-        Set<String> wildcardJoinIndices,
         InferenceResolution inferenceResolution
     ) {
-        public PreAnalysisResult(EnrichResolution enrichResolution, Set<String> fieldNames, Set<String> wildcardJoinIndices) {
-            this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
+        public PreAnalysisResult(Set<String> fieldNames, Set<String> wildcardJoinIndices) {
+            this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY);
         }
 
-        PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {
-            return new PreAnalysisResult(
-                indices(),
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                newInferenceResolution
-            );
+        PreAnalysisResult withIndices(IndexResolution indices) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
        }
 
-        PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) {
-            return new PreAnalysisResult(
-                newIndexResolution,
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                inferenceResolution()
-            );
+        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) {
+            lookupIndices.put(index, indexResolution);
+            return this;
         }
 
-        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
-            lookupIndices.put(index, newIndexResolution);
-            return this;
+        PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
+        }
+
+        PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
         }
     }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java
index c8f948303e22b..9bd498d489c84 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java
@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.esql.session;
 
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
@@ -20,7 +19,6 @@
 import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
 import org.elasticsearch.xpack.esql.core.expression.UnresolvedStar;
 import org.elasticsearch.xpack.esql.core.util.Holder;
-import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern;
 import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
 import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
@@ -52,21 +50,16 @@
 import java.util.Locale;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
 
 public class FieldNameUtils {
 
     private static final Set<String> FUNCTIONS_REQUIRING_TIMESTAMP = Set.of(TBucket.NAME.toLowerCase(Locale.ROOT));
 
-    public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution) {
-
-        // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
-        Set<String> enrichPolicyMatchFields = enrichResolution.resolvedEnrichPolicies()
-            .stream()
-            .map(ResolvedEnrichPolicy::matchField)
-            .collect(Collectors.toSet());
+    public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean hasEnriches) {
 
         // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
         List<LogicalPlan> inlinestats = parsed.collect(InlineStats.class::isInstance);
@@ -78,7 +71,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         if (false == parsed.anyMatch(p -> shouldCollectReferencedFields(p, inlinestatsAggs))) {
             // no explicit columns selection, for example "from employees"
             // also, inlinestats only adds columns to the existent output, its Aggregate shouldn't interfere with potentially using "*"
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         Holder<Boolean> projectAll = new Holder<>(false);
@@ -90,7 +83,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         });
 
         if (projectAll.get()) {
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         var referencesBuilder = new Holder<>(AttributeSet.builder());
@@ -232,7 +225,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         parsed.forEachDownMayReturnEarly(forEachDownProcessor.get());
 
         if (projectAll.get()) {
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         // Add JOIN ON column references afterward to avoid Alias removal
@@ -244,17 +237,21 @@
         referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
         Set<String> fieldNames = referencesBuilder.get().build().names();
 
-        if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
+        if (hasEnriches) {
+            // we do not know the names of the enrich policy match fields beforehand. We need to resolve all fields in this case.
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, wildcardJoinIndices);
+        } else if (fieldNames.isEmpty()) {
             // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index
-            return new PreAnalysisResult(enrichResolution, IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
+            return new PreAnalysisResult(IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
         } else {
-            fieldNames.addAll(subfields(fieldNames));
-            fieldNames.addAll(enrichPolicyMatchFields);
-            fieldNames.addAll(subfields(enrichPolicyMatchFields));
-            return new PreAnalysisResult(enrichResolution, fieldNames, wildcardJoinIndices);
+            return new PreAnalysisResult(fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), wildcardJoinIndices);
         }
     }
 
+    private static Stream<String> withSubfields(String name) {
+        return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*");
+    }
+
     /**
      * Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps.
      */
@@ -297,8 +294,4 @@ private static boolean matchByName(Attribute attr, String other, boolean skipIfP
         var name = attr.name();
         return isPattern ? Regex.simpleMatch(name, other) : name.equals(other);
     }
-
-    private static Set<String> subfields(Set<String> names) {
-        return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet());
-    }
 }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
index eeb9d602bd271..b7dcad059824c 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/FieldNameUtilsTests.java
@@ -10,13 +10,8 @@
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
-import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
-import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.parser.EsqlParser;
-import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import static org.elasticsearch.xpack.esql.session.IndexResolver.ALL_FIELDS;
@@ -1487,15 +1482,10 @@ public void testCountStar() {
     }
 
     public void testEnrichOnDefaultFieldWithKeep() {
-        assertFieldNames(
-            """
-                from employees
-                | enrich languages_policy
-                | keep emp_no""",
-            enrichResolutionWith("language_name"),
-            Set.of("emp_no", "emp_no.*", "language_name", "language_name.*"),
-            Set.of()
-        );
+        assertFieldNames("""
+            from employees
+            | enrich languages_policy
+            | keep emp_no""", true, Set.of("*"), Set.of());
     }
 
     public void testDissectOverwriteName() {
@@ -1599,7 +1589,7 @@ public void testAvoidGrokAttributesRemoval5() {
     public void testEnrichOnDefaultField() {
         assertFieldNames("""
             from employees
-            | enrich languages_policy""", enrichResolutionWith("language_name"), ALL_FIELDS, Set.of());
+            | enrich languages_policy""", true, ALL_FIELDS, Set.of());
     }
 
     public void testMetrics() {
@@ -3047,26 +3037,16 @@ public void testStatsChainingWithTimestampCarriedForwardAsByKey() {
     }
 
     private void assertFieldNames(String query, Set<String> expected) {
-        assertFieldNames(query, new EnrichResolution(), expected, Set.of());
+        assertFieldNames(query, false, expected, Set.of());
     }
 
     private void assertFieldNames(String query, Set<String> expected, Set<String> wildCardIndices) {
-        assertFieldNames(query, new EnrichResolution(), expected, wildCardIndices);
+        assertFieldNames(query, false, expected, wildCardIndices);
     }
 
-    private void assertFieldNames(String query, EnrichResolution enrichResolution, Set<String> expected, Set<String> wildCardIndices) {
-        var preAnalysisResult = FieldNameUtils.resolveFieldNames(parser.createStatement(query, EsqlTestUtils.TEST_CFG), enrichResolution);
+    private void assertFieldNames(String query, boolean hasEnriches, Set<String> expected, Set<String> wildCardIndices) {
+        var preAnalysisResult = FieldNameUtils.resolveFieldNames(parser.createStatement(query, EsqlTestUtils.TEST_CFG), hasEnriches);
         assertThat("Query-wide field names", preAnalysisResult.fieldNames(), equalTo(expected));
         assertThat("Lookup Indices that expect wildcard lookups", preAnalysisResult.wildcardJoinIndices(), equalTo(wildCardIndices));
     }
-
-    private static EnrichResolution enrichResolutionWith(String enrichPolicyMatchField) {
-        var enrichResolution = new EnrichResolution();
-        enrichResolution.addResolvedPolicy(
-            "policy",
-            Enrich.Mode.ANY,
-            new ResolvedEnrichPolicy(enrichPolicyMatchField, null, List.of(), Map.of(), Map.of())
-        );
-        return enrichResolution;
-    }
 }
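
Reviewer note (not part of the patch): a minimal, self-contained sketch of the subfield expansion performed by the new FieldNameUtils.withSubfields helper, which replaces the old subfields() method. The class name and sample field names below are made up for illustration; only the expansion rule mirrors the change above.

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WithSubfieldsSketch {
    private static final String WILDCARD = "*";

    // Same rule as FieldNameUtils.withSubfields: names ending in a wildcard are kept as-is,
    // concrete field names additionally request their subfields via "name.*".
    static Stream<String> withSubfields(String name) {
        return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*");
    }

    public static void main(String[] args) {
        Set<String> fieldNames = Set.of("emp_no", "language_name", "first_name*");
        Set<String> expanded = fieldNames.stream().flatMap(WithSubfieldsSketch::withSubfields).collect(Collectors.toSet());
        // Prints a set containing: emp_no, emp_no.*, language_name, language_name.*, first_name*
        System.out.println(expanded);
    }
}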