Commit 4bb963e

Enrich after main field caps (#134290)
1 parent 4d79a59 commit 4bb963e

8 files changed: +78 -119 lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/AttributeMap.java

Lines changed: 0 additions & 1 deletion

@@ -244,7 +244,6 @@ private E delete(Object key) {
 
     public Set<String> attributeNames() {
         Set<String> s = Sets.newLinkedHashSetWithExpectedSize(size());
-
         for (AttributeWrapper aw : delegate.keySet()) {
             s.add(aw.attr.name());
         }

x-pack/plugin/esql/qa/testFixtures/src/main/resources/enrich.csv-spec

Lines changed: 5 additions & 0 deletions

@@ -651,6 +651,11 @@ fieldsInOtherIndicesBug
 required_capability: enrich_load
 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
 
+// from * accidentally selects columns with dense_vector field type.
+// This is not properly handled when the query is planned by newer node and executed by an older one.
+// see https://github.com/elastic/elasticsearch/issues/135193
+required_capability: dense_vector_field_type
+
 from *
 | keep author.keyword, book_no, scalerank, street, bytes_in, @timestamp, abbrev, city_location, distance, description, birth_date, language_code, intersects, client_ip, event_duration, version
 | enrich languages_policy on author.keyword

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 5 additions & 0 deletions

@@ -1674,6 +1674,11 @@ enrichLookupStatsBug
 required_capability: join_lookup_v12
 required_capability: fix_replace_missing_field_with_null_duplicate_name_id_in_layout
 
+// from * accidentally selects columns with dense_vector field type.
+// This is not properly handled when the query is planned by newer node and executed by an older one.
+// see https://github.com/elastic/elasticsearch/issues/135193
+required_capability: dense_vector_field_type
+
 from *
 | enrich languages_policy on cluster
 | rename languages.byte as language_code

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/EnrichResolution.java

Lines changed: 1 addition & 9 deletions

@@ -11,7 +11,6 @@
 import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.plan.logical.Enrich;
 
-import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -28,11 +27,6 @@ public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mod
         return resolvedPolicies.get(new Key(policyName, mode));
     }
 
-    public Collection<ResolvedEnrichPolicy> resolvedEnrichPolicies() {
-        return resolvedPolicies.values();
-
-    }
-
     public String getError(String policyName, Enrich.Mode mode) {
         final String error = errors.get(new Key(policyName, mode));
         if (error != null) {
@@ -51,7 +45,5 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
         errors.putIfAbsent(new Key(policyName, mode), reason);
     }
 
-    private record Key(String policyName, Enrich.Mode mode) {
-
-    }
+    private record Key(String policyName, Enrich.Mode mode) {}
 }
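
Side note on the Key record this class keeps: records derive equals and hashCode from their components, which is what makes Key(policyName, mode) work as a composite map key for both the resolved policies and the errors. A minimal standalone sketch of that pattern, using simplified stand-in types rather than the real EnrichResolution and Enrich.Mode classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch: a record as a composite map key, mirroring the Key(policyName, mode)
// pattern kept by this diff. The enum and class names below are simplified stand-ins,
// not the real ESQL types.
public class RecordKeySketch {
    enum Mode { ANY, COORDINATOR, REMOTE }

    private record Key(String policyName, Mode mode) {}

    private final Map<Key, String> resolvedPolicies = new ConcurrentHashMap<>();

    void addResolvedPolicy(String policyName, Mode mode, String resolved) {
        resolvedPolicies.putIfAbsent(new Key(policyName, mode), resolved);
    }

    String getResolvedPolicy(String policyName, Mode mode) {
        // records generate equals/hashCode from their components, so two Keys built from
        // the same policy name and mode hit the same map entry
        return resolvedPolicies.get(new Key(policyName, mode));
    }

    public static void main(String[] args) {
        RecordKeySketch sketch = new RecordKeySketch();
        sketch.addResolvedPolicy("languages_policy", Mode.ANY, "resolved");
        System.out.println(sketch.getResolvedPolicy("languages_policy", Mode.ANY)); // resolved
    }
}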

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolver.java

Lines changed: 4 additions & 3 deletions

@@ -59,6 +59,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.esql.expression.Foldables.stringLiteralValueOf;
 import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;
 
@@ -121,7 +122,7 @@ public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionIn
         }
 
         doResolvePolicies(
-            new HashSet<>(executionInfo.getClusters().keySet()),
+            executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()),
             enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
             executionInfo,
             listener
@@ -310,7 +311,7 @@ private void lookupPolicies(
         Set<String> remotePolicies = unresolvedPolicies.stream()
             .filter(u -> u.mode != Enrich.Mode.COORDINATOR)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         // remote clusters
         if (remotePolicies.isEmpty() == false) {
             for (String cluster : remoteClusters) {
@@ -342,7 +343,7 @@ public void onFailure(Exception e) {
         Set<String> localPolicies = unresolvedPolicies.stream()
             .filter(u -> includeLocal || u.mode != Enrich.Mode.REMOTE)
             .map(u -> u.name)
-            .collect(Collectors.toSet());
+            .collect(toSet());
         if (localPolicies.isEmpty() == false) {
             transportService.sendRequest(
                 transportService.getLocalNode(),
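
For context on the two collect(toSet()) call sites touched above: the surrounding code splits the unresolved policy names into a remote set (everything except COORDINATOR-mode policies) and a local set (everything except REMOTE-mode policies, unless includeLocal is set). A rough standalone sketch of that split under those assumptions, with simplified stand-ins for Enrich.Mode and the UnresolvedPolicy inner class:

import java.util.List;
import java.util.Set;

import static java.util.stream.Collectors.toSet;

// Standalone sketch of the remote/local policy-name split visible in the context lines of
// this hunk, using the statically imported toSet() the diff switches to.
public class PolicySplitSketch {
    enum Mode { ANY, COORDINATOR, REMOTE }

    record UnresolvedPolicy(String name, Mode mode) {}

    public static void main(String[] args) {
        List<UnresolvedPolicy> unresolved = List.of(
            new UnresolvedPolicy("languages_policy", Mode.ANY),
            new UnresolvedPolicy("hosts_policy", Mode.COORDINATOR),   // hypothetical example data
            new UnresolvedPolicy("clients_policy", Mode.REMOTE)
        );
        boolean includeLocal = false;

        // remote clusters get everything except COORDINATOR-only policies
        Set<String> remotePolicies = unresolved.stream()
            .filter(u -> u.mode() != Mode.COORDINATOR)
            .map(UnresolvedPolicy::name)
            .collect(toSet());

        // the local cluster skips REMOTE-only policies unless explicitly included
        Set<String> localPolicies = unresolved.stream()
            .filter(u -> includeLocal || u.mode() != Mode.REMOTE)
            .map(UnresolvedPolicy::name)
            .collect(toSet());

        System.out.println(remotePolicies); // languages_policy, clients_policy (set order unspecified)
        System.out.println(localPolicies);  // languages_policy, hosts_policy (set order unspecified)
    }
}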

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 38 additions & 54 deletions

@@ -394,14 +394,27 @@ public void analyzedPlan(
         }
 
         var preAnalysis = preAnalyzer.preAnalyze(parsed);
+        var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);
+
         EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);
 
-        SubscribableListener. //
-            <EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
-            .<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
-            .<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
-            .<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
+        SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
+            .andThenApply(r -> {
+                if (r.indices.isValid()
+                    && executionInfo.isCrossClusterSearch()
+                    && executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
+                    LOGGER.debug("No more clusters to search, ending analysis stage");
+                    throw new NoClustersToSearchException();
+                }
+                return r;
+            })
             .<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
+            .<PreAnalysisResult>andThen((l, r) -> {
+                enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
+            })
+            .<PreAnalysisResult>andThen((l, r) -> {
+                inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
+            })
             .<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
             .addListener(logicalPlanListener);
     }
@@ -638,9 +651,7 @@ private void preAnalyzeMainIndices(
             if (indexExpressionToResolve.isEmpty()) {
                 // if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
                 listener.onResponse(
-                    result.withIndexResolution(
-                        IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
-                    )
+                    result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
                 );
             } else {
                 indexResolver.resolveAsMergedMapping(
@@ -657,14 +668,15 @@ private void preAnalyzeMainIndices(
                        default -> requestFilter;
                    },
                    preAnalysis.indexMode() == IndexMode.TIME_SERIES,
-                   listener.delegateFailure((l, indexResolution) -> {
-                       l.onResponse(result.withIndexResolution(indexResolution));
+                   listener.delegateFailureAndWrap((l, indexResolution) -> {
+                       EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
+                       l.onResponse(result.withIndices(indexResolution));
                    })
                );
            }
        } else {
            // occurs when dealing with local relations (row a = 1)
-           listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
+           listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]")));
        }
    }
 
@@ -676,21 +688,8 @@ private void analyzeWithRetry(
         PreAnalysisResult result,
         ActionListener<LogicalPlan> listener
     ) {
-        if (result.indices.isValid()) {
-            EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
-            if (executionInfo.isCrossClusterSearch()
-                && executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
-                // for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
-                // to let the LogicalPlanActionListener decide how to proceed
-                LOGGER.debug("No more clusters to search, ending analysis stage");
-                listener.onFailure(new NoClustersToSearchException());
-                return;
-            }
-        }
-
         var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
         LOGGER.debug("Analyzing the plan ({})", description);
-
         try {
             if (result.indices.isValid() || requestFilter != null) {
                 // We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
@@ -715,7 +714,6 @@
                 try {
                     // the order here is tricky - if the cluster has been filtered and later became unavailable,
                     // do we want to declare it successful or skipped? For now, unavailability takes precedence.
-                    EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
                     EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
                     LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
                     LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
@@ -730,10 +728,6 @@
             }
         }
 
-    private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
-        inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
-    }
-
     private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
         PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
         physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
@@ -793,43 +787,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
     }
 
     public record PreAnalysisResult(
+        Set<String> fieldNames,
+        Set<String> wildcardJoinIndices,
         IndexResolution indices,
         Map<String, IndexResolution> lookupIndices,
         EnrichResolution enrichResolution,
-        Set<String> fieldNames,
-        Set<String> wildcardJoinIndices,
         InferenceResolution inferenceResolution
     ) {
 
-        public PreAnalysisResult(EnrichResolution enrichResolution, Set<String> fieldNames, Set<String> wildcardJoinIndices) {
-            this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
+        public PreAnalysisResult(Set<String> fieldNames, Set<String> wildcardJoinIndices) {
+            this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY);
         }
 
-        PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {
-            return new PreAnalysisResult(
-                indices(),
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                newInferenceResolution
-            );
+        PreAnalysisResult withIndices(IndexResolution indices) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
        }
 
-        PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) {
-            return new PreAnalysisResult(
-                newIndexResolution,
-                lookupIndices(),
-                enrichResolution(),
-                fieldNames(),
-                wildcardJoinIndices(),
-                inferenceResolution()
-            );
+        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) {
+            lookupIndices.put(index, indexResolution);
+            return this;
        }
 
-        PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
-            lookupIndices.put(index, newIndexResolution);
-            return this;
+        PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
+        }
+
+        PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) {
+            return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
        }
    }
 }
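
The reshuffled PreAnalysisResult is now threaded through the listener chain as an immutable record that each step copies with one more component filled in (withIndices, then withEnrichResolution, then withInferenceResolution). A minimal sketch of that "wither" style outside the ESQL codebase, with a simplified record and plain strings standing in for the real resolution types:

import java.util.Set;

// Standalone sketch of the "wither" pattern this diff moves PreAnalysisResult to: each
// resolution step returns a copy of the record with one component replaced, so the
// partially built state can be passed through an async chain without mutation.
// The record below is a simplified stand-in, not the real PreAnalysisResult.
public class WitherSketch {
    record PreAnalysis(Set<String> fieldNames, String indices, String enrich, String inference) {

        PreAnalysis withIndices(String indices) {
            return new PreAnalysis(fieldNames, indices, enrich, inference);
        }

        PreAnalysis withEnrich(String enrich) {
            return new PreAnalysis(fieldNames, indices, enrich, inference);
        }

        PreAnalysis withInference(String inference) {
            return new PreAnalysis(fieldNames, indices, enrich, inference);
        }
    }

    public static void main(String[] args) {
        // field names are resolved up front (before field_caps); the rest is filled in step by step
        PreAnalysis result = new PreAnalysis(Set.of("author.keyword", "author.keyword.*"), null, null, null)
            .withIndices("field caps of the main indices")
            .withEnrich("resolved enrich policies")
            .withInference("resolved inference ids");
        System.out.println(result);
    }
}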

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java

Lines changed: 16 additions & 23 deletions

@@ -8,7 +8,6 @@
 package org.elasticsearch.xpack.esql.session;
 
 import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
 import org.elasticsearch.xpack.esql.core.expression.Alias;
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
@@ -20,7 +19,6 @@
 import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
 import org.elasticsearch.xpack.esql.core.expression.UnresolvedStar;
 import org.elasticsearch.xpack.esql.core.util.Holder;
-import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
 import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern;
 import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
 import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
@@ -52,21 +50,16 @@
 import java.util.Locale;
 import java.util.Set;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toSet;
 import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
 
 public class FieldNameUtils {
 
     private static final Set<String> FUNCTIONS_REQUIRING_TIMESTAMP = Set.of(TBucket.NAME.toLowerCase(Locale.ROOT));
 
-    public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution) {
-
-        // we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
-        Set<String> enrichPolicyMatchFields = enrichResolution.resolvedEnrichPolicies()
-            .stream()
-            .map(ResolvedEnrichPolicy::matchField)
-            .collect(Collectors.toSet());
+    public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean hasEnriches) {
 
         // get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
         List<LogicalPlan> inlinestats = parsed.collect(InlineStats.class::isInstance);
@@ -78,7 +71,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         if (false == parsed.anyMatch(p -> shouldCollectReferencedFields(p, inlinestatsAggs))) {
             // no explicit columns selection, for example "from employees"
             // also, inlinestats only adds columns to the existent output, its Aggregate shouldn't interfere with potentially using "*"
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         Holder<Boolean> projectAll = new Holder<>(false);
@@ -90,7 +83,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         });
 
         if (projectAll.get()) {
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         var referencesBuilder = new Holder<>(AttributeSet.builder());
@@ -232,7 +225,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         parsed.forEachDownMayReturnEarly(forEachDownProcessor.get());
 
         if (projectAll.get()) {
-            return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
         }
 
         // Add JOIN ON column references afterward to avoid Alias removal
@@ -244,17 +237,21 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
         referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
         Set<String> fieldNames = referencesBuilder.get().build().names();
 
-        if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
+        if (hasEnriches) {
+            // we do not know names of the enrich policy match fields beforehand. We need to resolve all fields in this case
+            return new PreAnalysisResult(IndexResolver.ALL_FIELDS, wildcardJoinIndices);
+        } else if (fieldNames.isEmpty()) {
             // there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index
-            return new PreAnalysisResult(enrichResolution, IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
+            return new PreAnalysisResult(IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
         } else {
-            fieldNames.addAll(subfields(fieldNames));
-            fieldNames.addAll(enrichPolicyMatchFields);
-            fieldNames.addAll(subfields(enrichPolicyMatchFields));
-            return new PreAnalysisResult(enrichResolution, fieldNames, wildcardJoinIndices);
+            return new PreAnalysisResult(fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), wildcardJoinIndices);
        }
    }
 
+    private static Stream<String> withSubfields(String name) {
+        return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*");
+    }
+
     /**
      * Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps.
      */
@@ -297,8 +294,4 @@ private static boolean matchByName(Attribute attr, String other, boolean skipIfP
         var name = attr.name();
         return isPattern ? Regex.simpleMatch(name, other) : name.equals(other);
     }
-
-    private static Set<String> subfields(Set<String> names) {
-        return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet());
-    }
 }
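
The new withSubfields helper replaces the old subfields(Set) pass: every concrete field name is requested together with its "<name>.*" subfields so field_caps covers keyword/multi-field variants, while names that already end in a wildcard are passed through untouched. A small standalone sketch of the expansion, assuming WILDCARD is "*" as in StringUtils.WILDCARD:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Standalone sketch of the withSubfields expansion introduced in this diff; the field
// names in main() are illustrative example data, not taken from a real mapping.
public class WithSubfieldsSketch {
    private static final String WILDCARD = "*";

    private static Stream<String> withSubfields(String name) {
        // concrete names are expanded to themselves plus "<name>.*"; wildcard patterns pass through
        return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*");
    }

    public static void main(String[] args) {
        Set<String> fieldNames = Set.of("author.keyword", "client_ip", "lang*");
        Set<String> forFieldCaps = fieldNames.stream()
            .flatMap(WithSubfieldsSketch::withSubfields)
            .collect(Collectors.toSet());
        // author.keyword, author.keyword.*, client_ip, client_ip.*, lang* (set order unspecified)
        System.out.println(forFieldCaps);
    }
}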
