Merged
Changes from 14 commits
@@ -244,7 +244,6 @@ private E delete(Object key) {

public Set<String> attributeNames() {
Set<String> s = Sets.newLinkedHashSetWithExpectedSize(size());

for (AttributeWrapper aw : delegate.keySet()) {
s.add(aw.attr.name());
}
@@ -11,7 +11,6 @@
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;

import java.util.Collection;
import java.util.Map;

/**
@@ -28,11 +27,6 @@ public ResolvedEnrichPolicy getResolvedPolicy(String policyName, Enrich.Mode mode) {
return resolvedPolicies.get(new Key(policyName, mode));
}

public Collection<ResolvedEnrichPolicy> resolvedEnrichPolicies() {
return resolvedPolicies.values();

}

public String getError(String policyName, Enrich.Mode mode) {
final String error = errors.get(new Key(policyName, mode));
if (error != null) {
@@ -51,7 +45,5 @@ public void addError(String policyName, Enrich.Mode mode, String reason) {
errors.putIfAbsent(new Key(policyName, mode), reason);
}

private record Key(String policyName, Enrich.Mode mode) {

}
private record Key(String policyName, Enrich.Mode mode) {}
}
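
The compacted Key record above doubles as a composite map key: records come with value-based equals and hashCode for free, which is exactly what the resolvedPolicies and errors maps need. A minimal standalone sketch (illustrative values, not part of this PR):

    private record Key(String policyName, Enrich.Mode mode) {}

    Map<Key, String> errors = new HashMap<>();
    errors.put(new Key("hosts-policy", Enrich.Mode.ANY), "policy not found");
    // Value-based equality: a freshly constructed Key finds the existing entry.
    assert errors.containsKey(new Key("hosts-policy", Enrich.Mode.ANY));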
@@ -59,6 +59,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.xpack.esql.expression.Foldables.stringLiteralValueOf;
import static org.elasticsearch.xpack.esql.session.EsqlCCSUtils.markClusterWithFinalStateAndNoShards;

Expand Down Expand Up @@ -121,7 +122,7 @@ public void resolvePolicies(List<Enrich> enriches, EsqlExecutionInfo executionIn
}

doResolvePolicies(
new HashSet<>(executionInfo.getClusters().keySet()),
executionInfo.clusterInfo.isEmpty() ? new HashSet<>() : executionInfo.getRunningClusterAliases().collect(toSet()),
Contributor:
Why do you need this check? Wouldn't the stream take care of it anyway?

Contributor (author):
executionInfo.getRunningClusterAliases() calls getClusterStates(Cluster.Status.RUNNING), which contains the following assertion:

public Stream<Cluster> getClusterStates(Cluster.Status status) {
    assert clusterInfo.isEmpty() == false : "ClusterMap in EsqlExecutionInfo must not be empty";
    return clusterInfo.values().stream().filter(cluster -> cluster.getStatus() == status);
}

I assume that forces us to perform an "is in CCS" check first. Please let me know if this could be done more simply.
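
A minimal sketch of how that guard could be factored out instead of inlining the ternary (hypothetical helper name, not part of this PR; it mirrors the expression in the diff above):

    // Returns the running cluster aliases, or an empty set when the cluster
    // map was never populated (e.g. a purely local, non-CCS query). Calling
    // getRunningClusterAliases() in that state would trip the assertion above.
    private static Set<String> runningClustersOrEmpty(EsqlExecutionInfo executionInfo) {
        return executionInfo.clusterInfo.isEmpty()
            ? new HashSet<>()
            : executionInfo.getRunningClusterAliases().collect(toSet());
    }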

enriches.stream().map(EnrichPolicyResolver.UnresolvedPolicy::from).toList(),
executionInfo,
listener
@@ -310,7 +311,7 @@ private void lookupPolicies(
Set<String> remotePolicies = unresolvedPolicies.stream()
.filter(u -> u.mode != Enrich.Mode.COORDINATOR)
.map(u -> u.name)
.collect(Collectors.toSet());
.collect(toSet());
// remote clusters
if (remotePolicies.isEmpty() == false) {
for (String cluster : remoteClusters) {
@@ -342,7 +343,7 @@ public void onFailure(Exception e) {
Set<String> localPolicies = unresolvedPolicies.stream()
.filter(u -> includeLocal || u.mode != Enrich.Mode.REMOTE)
.map(u -> u.name)
.collect(Collectors.toSet());
.collect(toSet());
if (localPolicies.isEmpty() == false) {
transportService.sendRequest(
transportService.getLocalNode(),
@@ -394,14 +394,27 @@ public void analyzedPlan(
}

var preAnalysis = preAnalyzer.preAnalyze(parsed);
var result = FieldNameUtils.resolveFieldNames(parsed, preAnalysis.enriches().isEmpty() == false);

EsqlCCSUtils.initCrossClusterState(indicesExpressionGrouper, verifier.licenseState(), preAnalysis.indexPattern(), executionInfo);

SubscribableListener. //
<EnrichResolution>newForked(l -> enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l))
.<PreAnalysisResult>andThenApply(enrichResolution -> FieldNameUtils.resolveFieldNames(parsed, enrichResolution))
.<PreAnalysisResult>andThen((l, r) -> resolveInferences(parsed, r, l))
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeMainIndices(preAnalysis, executionInfo, r, requestFilter, l))
SubscribableListener.<PreAnalysisResult>newForked(l -> preAnalyzeMainIndices(preAnalysis, executionInfo, result, requestFilter, l))
.andThenApply(r -> {
if (r.indices.isValid()
&& executionInfo.isCrossClusterSearch()
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
LOGGER.debug("No more clusters to search, ending analysis stage");
throw new NoClustersToSearchException();
}
return r;
Contributor (author):
This is pulled to an earlier stage from analyzeWithRetry.
No need to resolve anything else (such as lookup, enrich, or inference) if the query cannot be executed anyway.

})
.<PreAnalysisResult>andThen((l, r) -> preAnalyzeLookupIndices(preAnalysis.lookupIndices().iterator(), r, executionInfo, l))
.<PreAnalysisResult>andThen((l, r) -> {
Contributor:
nit: before this change, the resolution order was: enrich, inference, main, lookup
now it is: main, lookup, inference, enrich

Are there any reasons why we didn't keep the original order (apart from main, of course), i.e. main, enrich, inference, lookup?

Contributor (author):

No particular reason, actually. It could be any order as long as main is first.

Contributor:

I'd maybe put inference last, because enrich & lookup deal with remote clusters (and inference currently doesn't), where there's a high chance something may go wrong; if it does, there's no need to even spend time on inference.

enrichPolicyResolver.resolvePolicies(preAnalysis.enriches(), executionInfo, l.map(r::withEnrichResolution));
})
.<PreAnalysisResult>andThen((l, r) -> {
inferenceService.inferenceResolver().resolveInferenceIds(parsed, l.map(r::withInferenceResolution));
})
.<LogicalPlan>andThen((l, r) -> analyzeWithRetry(parsed, requestFilter, preAnalysis, executionInfo, r, l))
Contributor:

Here I am a bit worried about this situation:

  1. Let's say we have two clusters, Local and Remote, and a filter.
  2. The first call to preAnalyzeMainIndices filters out all Remote indices, so we consider it only for Local and do all the resolutions only for Local.
  3. Now analysis fails, and we retry it without filter. This time the list of indices comes in with both Local and Remote.
  4. Because of that, we're going to send the request to both Local and Remote. But we did not check lookup indices or policies there. It is true that Remote does not actually need them because of the filter, but is the filter applied early enough? What if the planner on Remote needs something about some index or policy and it's not there? I'm not sure what would happen...

Contributor (author) @idegtiarenko, Sep 23, 2025:

That is a valid concern. I believe in such a case we need to retry the entire analysis, including index resolution. I opened ES-12978 for this. It should not be a problem until we have flat resolution; for now, the list of remotes is still known beforehand.

.addListener(logicalPlanListener);
}
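
For readers unfamiliar with the chaining API used above: each andThen step receives the previous step's result plus a listener to complete, and any failure short-circuits straight to the final listener. A stripped-down sketch of the new flow, with hypothetical step names standing in for the calls shown in the diff:

    SubscribableListener
        .<PreAnalysisResult>newForked(l -> resolveMainIndices(result, l))   // main indices first
        .andThenApply(r -> failFastIfNoClustersLeft(r))                     // may throw NoClustersToSearchException
        .<PreAnalysisResult>andThen((l, r) -> resolveLookupIndices(r, l))
        .<PreAnalysisResult>andThen((l, r) -> resolveEnrichPolicies(r, l))
        .<PreAnalysisResult>andThen((l, r) -> resolveInferenceIds(r, l))
        .<LogicalPlan>andThen((l, r) -> analyzeWithRetry(r, l))
        .addListener(logicalPlanListener);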
@@ -638,9 +651,7 @@ private void preAnalyzeMainIndices(
if (indexExpressionToResolve.isEmpty()) {
// if this was a pure remote CCS request (no local indices) and all remotes are offline, return an empty IndexResolution
listener.onResponse(
result.withIndexResolution(
IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of()))
)
result.withIndices(IndexResolution.valid(new EsIndex(preAnalysis.indexPattern().indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMapping(
Expand All @@ -657,14 +668,15 @@ private void preAnalyzeMainIndices(
default -> requestFilter;
},
preAnalysis.indexMode() == IndexMode.TIME_SERIES,
listener.delegateFailure((l, indexResolution) -> {
l.onResponse(result.withIndexResolution(indexResolution));
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.failures());
Contributor (author):
This is pulled to an earlier stage from analyzeWithRetry.
It is required to record failures into executionInfo so that the following steps (lookup and enrich resolution) are aware of failed clusters and can skip them.

l.onResponse(result.withIndices(indexResolution));
})
);
}
} else {
// occurs when dealing with local relations (row a = 1)
listener.onResponse(result.withIndexResolution(IndexResolution.invalid("[none specified]")));
listener.onResponse(result.withIndices(IndexResolution.invalid("[none specified]")));
}
}

@@ -676,21 +688,8 @@ private void analyzeWithRetry(
PreAnalysisResult result,
ActionListener<LogicalPlan> listener
) {
if (result.indices.isValid()) {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, result.indices.failures());
if (executionInfo.isCrossClusterSearch()
&& executionInfo.getClusterStates(EsqlExecutionInfo.Cluster.Status.RUNNING).findAny().isEmpty()) {
// for a CCS, if all clusters have been marked as SKIPPED, nothing to search so send a sentinel Exception
// to let the LogicalPlanActionListener decide how to proceed
LOGGER.debug("No more clusters to search, ending analysis stage");
listener.onFailure(new NoClustersToSearchException());
return;
}
}

var description = requestFilter == null ? "the only attempt without filter" : "first attempt with filter";
LOGGER.debug("Analyzing the plan ({})", description);

try {
if (result.indices.isValid() || requestFilter != null) {
// We won't run this check with no filter and no valid indices since this may lead to false positive - missing index report
Expand All @@ -715,7 +714,6 @@ private void analyzeWithRetry(
try {
// the order here is tricky - if the cluster has been filtered and later became unavailable,
// do we want to declare it successful or skipped? For now, unavailability takes precedence.
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, r.indices.failures());
EsqlCCSUtils.updateExecutionInfoWithClustersWithNoMatchingIndices(executionInfo, r.indices, false);
LogicalPlan plan = analyzedPlan(parsed, r, executionInfo);
LOGGER.debug("Analyzed plan (second attempt without filter):\n{}", plan);
Expand All @@ -730,10 +728,6 @@ private void analyzeWithRetry(
}
}

private void resolveInferences(LogicalPlan plan, PreAnalysisResult preAnalysisResult, ActionListener<PreAnalysisResult> l) {
inferenceService.inferenceResolver().resolveInferenceIds(plan, l.map(preAnalysisResult::withInferenceResolution));
}

private PhysicalPlan logicalPlanToPhysicalPlan(LogicalPlan optimizedPlan, EsqlQueryRequest request) {
PhysicalPlan physicalPlan = optimizedPhysicalPlan(optimizedPlan);
physicalPlan = physicalPlan.transformUp(FragmentExec.class, f -> {
@@ -793,43 +787,33 @@ public PhysicalPlan optimizedPhysicalPlan(LogicalPlan optimizedPlan) {
}

public record PreAnalysisResult(
Set<String> fieldNames,
Set<String> wildcardJoinIndices,
IndexResolution indices,
Map<String, IndexResolution> lookupIndices,
EnrichResolution enrichResolution,
Set<String> fieldNames,
Set<String> wildcardJoinIndices,
InferenceResolution inferenceResolution
) {

public PreAnalysisResult(EnrichResolution enrichResolution, Set<String> fieldNames, Set<String> wildcardJoinIndices) {
this(null, new HashMap<>(), enrichResolution, fieldNames, wildcardJoinIndices, InferenceResolution.EMPTY);
public PreAnalysisResult(Set<String> fieldNames, Set<String> wildcardJoinIndices) {
this(fieldNames, wildcardJoinIndices, null, new HashMap<>(), null, InferenceResolution.EMPTY);
}

PreAnalysisResult withInferenceResolution(InferenceResolution newInferenceResolution) {
return new PreAnalysisResult(
indices(),
lookupIndices(),
enrichResolution(),
fieldNames(),
wildcardJoinIndices(),
newInferenceResolution
);
PreAnalysisResult withIndices(IndexResolution indices) {
return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
}

PreAnalysisResult withIndexResolution(IndexResolution newIndexResolution) {
return new PreAnalysisResult(
newIndexResolution,
lookupIndices(),
enrichResolution(),
fieldNames(),
wildcardJoinIndices(),
inferenceResolution()
);
PreAnalysisResult addLookupIndexResolution(String index, IndexResolution indexResolution) {
lookupIndices.put(index, indexResolution);
return this;
}

PreAnalysisResult addLookupIndexResolution(String index, IndexResolution newIndexResolution) {
lookupIndices.put(index, newIndexResolution);
return this;
PreAnalysisResult withEnrichResolution(EnrichResolution enrichResolution) {
return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
}

PreAnalysisResult withInferenceResolution(InferenceResolution inferenceResolution) {
return new PreAnalysisResult(fieldNames, wildcardJoinIndices, indices, lookupIndices, enrichResolution, inferenceResolution);
}
}
}
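
The record above is threaded through the listener chain via copy-on-write "wither" methods: each with* call returns a fresh immutable instance with exactly one component replaced. A hedged usage sketch (resolution values invented for illustration):

    PreAnalysisResult r = new PreAnalysisResult(Set.of("emp_no"), Set.of());
    r = r.withIndices(indexResolution)              // new instance, indices set
         .withEnrichResolution(enrichResolution)    // new instance, enrich set
         .withInferenceResolution(inferenceResolution);

Note the asymmetry: addLookupIndexResolution mutates the shared lookupIndices map and returns this rather than copying.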
@@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
Expand All @@ -20,7 +19,6 @@
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedStar;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.enrich.ResolvedEnrichPolicy;
import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern;
import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
import org.elasticsearch.xpack.esql.expression.function.grouping.TBucket;
@@ -52,21 +50,16 @@
import java.util.Locale;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;

public class FieldNameUtils {

private static final Set<String> FUNCTIONS_REQUIRING_TIMESTAMP = Set.of(TBucket.NAME.toLowerCase(Locale.ROOT));

public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichResolution enrichResolution) {

// we need the match_fields names from enrich policies and THEN, with an updated list of fields, we call field_caps API
Set<String> enrichPolicyMatchFields = enrichResolution.resolvedEnrichPolicies()
.stream()
.map(ResolvedEnrichPolicy::matchField)
.collect(Collectors.toSet());
public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, boolean hasEnriches) {

// get the field names from the parsed plan combined with the ENRICH match fields from the ENRICH policy
List<LogicalPlan> inlinestats = parsed.collect(InlineStats.class::isInstance);
Expand All @@ -78,7 +71,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
if (false == parsed.anyMatch(p -> shouldCollectReferencedFields(p, inlinestatsAggs))) {
// no explicit columns selection, for example "from employees"
// also, inlinestats only adds columns to the existent output, its Aggregate shouldn't interfere with potentially using "*"
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
}

Holder<Boolean> projectAll = new Holder<>(false);
Expand All @@ -90,7 +83,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
});

if (projectAll.get()) {
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
}

var referencesBuilder = new Holder<>(AttributeSet.builder());
Expand Down Expand Up @@ -232,7 +225,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
parsed.forEachDownMayReturnEarly(forEachDownProcessor.get());

if (projectAll.get()) {
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
return new PreAnalysisResult(IndexResolver.ALL_FIELDS, Set.of());
}

// Add JOIN ON column references afterward to avoid Alias removal
Expand All @@ -244,17 +237,21 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
Set<String> fieldNames = referencesBuilder.get().build().names();

if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
if (hasEnriches) {
// we do not know names of the enrich policy match fields before hand. We need to resolve all fields in thisc ase
return new PreAnalysisResult(IndexResolver.ALL_FIELDS, wildcardJoinIndices);
Contributor (commenting on lines 240 to 242):
This could have an impact on performance, but I don't think we have alternatives.

Nit: in the comment in thisc ase -> in this case

} else if (fieldNames.isEmpty()) {
// there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index
return new PreAnalysisResult(enrichResolution, IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
return new PreAnalysisResult(IndexResolver.INDEX_METADATA_FIELD, wildcardJoinIndices);
} else {
fieldNames.addAll(subfields(fieldNames));
fieldNames.addAll(enrichPolicyMatchFields);
fieldNames.addAll(subfields(enrichPolicyMatchFields));
return new PreAnalysisResult(enrichResolution, fieldNames, wildcardJoinIndices);
return new PreAnalysisResult(fieldNames.stream().flatMap(FieldNameUtils::withSubfields).collect(toSet()), wildcardJoinIndices);
}
}

private static Stream<String> withSubfields(String name) {
return name.endsWith(WILDCARD) ? Stream.of(name) : Stream.of(name, name + ".*");
}
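
Concretely, withSubfields leaves wildcard patterns untouched and pairs every concrete field name with its subfield pattern. A small worked example, usable inside the class (field names are hypothetical):

    // {"first_name", "language*"} expands to {"first_name", "first_name.*", "language*"}
    Set<String> expanded = Stream.of("first_name", "language*")
        .flatMap(FieldNameUtils::withSubfields)
        .collect(toSet());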

/**
* Indicates whether the given plan gives an exact list of fields that we need to collect from field_caps.
*/
Expand Down Expand Up @@ -297,8 +294,4 @@ private static boolean matchByName(Attribute attr, String other, boolean skipIfP
var name = attr.name();
return isPattern ? Regex.simpleMatch(name, other) : name.equals(other);
}

private static Set<String> subfields(Set<String> names) {
return names.stream().filter(name -> name.endsWith(WILDCARD) == false).map(name -> name + ".*").collect(Collectors.toSet());
}
}