Skip to content

Commit 8e3ab37

Browse files
ES|QL: Fix Fork field reference tracking (#131723)
Addresses #127208. Handle fork references correctly.
1 parent 9f5908e commit 8e3ab37

File tree

4 files changed

+922
-39
lines changed

4 files changed

+922
-39
lines changed

docs/changelog/131723.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 131723
2+
summary: Tests for FORK's evaluation of field names used in `field_caps` resolve calls
3+
area: Search
4+
type: bug
5+
issues: []

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88

99
import org.elasticsearch.common.io.stream.NamedWriteable;
1010
import org.elasticsearch.xpack.esql.core.QlIllegalArgumentException;
11+
import org.elasticsearch.xpack.esql.core.util.Holder;
1112

1213
import java.util.ArrayList;
1314
import java.util.BitSet;
1415
import java.util.Iterator;
1516
import java.util.List;
1617
import java.util.Objects;
18+
import java.util.function.BiConsumer;
1719
import java.util.function.Consumer;
1820
import java.util.function.Function;
1921
import java.util.function.Predicate;
@@ -74,6 +76,33 @@ public void forEachDown(Consumer<? super T> action) {
7476
}
7577
}
7678

79+
/**
80+
* Same as forEachDown, but can end the traverse early, by setting the boolean argument in the action.
81+
*/
82+
public boolean forEachDownMayReturnEarly(BiConsumer<? super T, Holder<Boolean>> action) {
83+
var breakEarly = new Holder<>(false);
84+
forEachDownMayReturnEarly(action, breakEarly);
85+
return breakEarly.get();
86+
}
87+
88+
@SuppressWarnings("unchecked")
89+
void forEachDownMayReturnEarly(BiConsumer<? super T, Holder<Boolean>> action, Holder<Boolean> breakEarly) {
90+
action.accept((T) this, breakEarly);
91+
if (breakEarly.get()) {
92+
// Early return.
93+
return;
94+
}
95+
// please do not refactor it to a for-each loop to avoid
96+
// allocating iterator that performs concurrent modification checks and extra stack frames
97+
for (int c = 0, size = children.size(); c < size; c++) {
98+
children.get(c).forEachDownMayReturnEarly(action, breakEarly);
99+
if (breakEarly.get()) {
100+
// Early return.
101+
return;
102+
}
103+
}
104+
}
105+
77106
@SuppressWarnings("unchecked")
78107
public <E extends T> void forEachDown(Class<E> typeToken, Consumer<? super E> action) {
79108
forEachDown(t -> {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/FieldNameUtils.java

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.HashSet;
5050
import java.util.List;
5151
import java.util.Set;
52+
import java.util.function.BiConsumer;
5253
import java.util.stream.Collectors;
5354

5455
import static org.elasticsearch.xpack.esql.core.util.StringUtils.WILDCARD;
@@ -76,11 +77,6 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
7677
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
7778
}
7879

79-
// TODO: Improve field resolution for FORK - right now we request all fields
80-
if (parsed.anyMatch(p -> p instanceof Fork)) {
81-
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
82-
}
83-
8480
Holder<Boolean> projectAll = new Holder<>(false);
8581
parsed.forEachExpressionDown(UnresolvedStar.class, us -> {// explicit "*" fields selection
8682
if (projectAll.get()) {
@@ -93,7 +89,7 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
9389
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
9490
}
9591

96-
var referencesBuilder = AttributeSet.builder();
92+
var referencesBuilder = new Holder<>(AttributeSet.builder());
9793
// "keep" and "drop" attributes are special whenever a wildcard is used in their name, as the wildcard can cover some
9894
// attributes ("lookup join" generated columns among others); steps like removal of Aliases should ignore fields matching the
9995
// wildcards.
@@ -110,19 +106,49 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
110106
// lookup indices where we request "*" because we may require all their fields
111107
Set<String> wildcardJoinIndices = new java.util.HashSet<>();
112108

113-
boolean[] canRemoveAliases = new boolean[] { true };
109+
var canRemoveAliases = new Holder<>(true);
110+
111+
var forEachDownProcessor = new Holder<BiConsumer<LogicalPlan, Holder<Boolean>>>();
112+
forEachDownProcessor.set((LogicalPlan p, Holder<Boolean> breakEarly) -> {// go over each plan top-down
113+
if (p instanceof Fork fork) {
114+
// Early return from forEachDown. We will iterate over the children manually and end the recursion via forEachDown early.
115+
var forkRefsResult = AttributeSet.builder();
116+
forkRefsResult.addAll(referencesBuilder.get());
117+
118+
for (var forkBranch : fork.children()) {
119+
referencesBuilder.set(AttributeSet.builder());
120+
var isNestedFork = forkBranch.forEachDownMayReturnEarly(forEachDownProcessor.get());
121+
// This assert is just for good measure. FORKs within FORKs is yet not supported.
122+
assert isNestedFork == false : "Nested FORKs are not yet supported";
123+
// This is a safety measure for fork where the list of fields returned is empty.
124+
// It can be empty for a branch that does need all the fields. For example "fork (where true) (where a is not null)"
125+
// but it can also be empty for queries where NO fields are needed from ES,
126+
// for example "fork (eval x = 1 | keep x) (eval y = 1 | keep y)" but we cannot establish this yet.
127+
if (referencesBuilder.get().isEmpty()) {
128+
projectAll.set(true);
129+
// Return early, we'll be returning all references no matter what the remainder of the query is.
130+
breakEarly.set(true);
131+
return;
132+
}
133+
forkRefsResult.addAll(referencesBuilder.get());
134+
}
135+
136+
forkRefsResult.removeIf(attr -> attr.name().equals(Fork.FORK_FIELD));
137+
referencesBuilder.set(forkRefsResult);
114138

115-
parsed.forEachDown(p -> {// go over each plan top-down
116-
if (p instanceof RegexExtract re) { // for Grok and Dissect
139+
// Return early, we've already explored all fork branches.
140+
breakEarly.set(true);
141+
return;
142+
} else if (p instanceof RegexExtract re) { // for Grok and Dissect
117143
// keep the inputs needed by Grok/Dissect
118-
referencesBuilder.addAll(re.input().references());
144+
referencesBuilder.get().addAll(re.input().references());
119145
} else if (p instanceof Enrich enrich) {
120146
AttributeSet enrichFieldRefs = Expressions.references(enrich.enrichFields());
121147
AttributeSet.Builder enrichRefs = enrichFieldRefs.combine(enrich.matchField().references()).asBuilder();
122148
// Enrich adds an EmptyAttribute if no match field is specified
123149
// The exact name of the field will be added later as part of enrichPolicyMatchFields Set
124150
enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute);
125-
referencesBuilder.addAll(enrichRefs);
151+
referencesBuilder.get().addAll(enrichRefs);
126152
} else if (p instanceof LookupJoin join) {
127153
if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) {
128154
joinRefs.addAll(usingJoinType.columns());
@@ -135,15 +161,15 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
135161
joinRefs.addAll(keepRefs);
136162
}
137163
} else {
138-
referencesBuilder.addAll(p.references());
164+
referencesBuilder.get().addAll(p.references());
139165
if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) {
140166
// METRICS aggs generally rely on @timestamp without the user having to mention it.
141-
referencesBuilder.add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
167+
referencesBuilder.get().add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
142168
}
143169
// special handling for UnresolvedPattern (which is not an UnresolvedAttribute)
144170
p.forEachExpression(UnresolvedNamePattern.class, up -> {
145171
var ua = new UnresolvedAttribute(up.source(), up.name());
146-
referencesBuilder.add(ua);
172+
referencesBuilder.get().add(ua);
147173
if (p instanceof Keep) {
148174
keepRefs.add(ua);
149175
} else if (p instanceof Drop) {
@@ -168,10 +194,10 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
168194
//
169195
// and ips_policy enriches the results with the same name ip field),
170196
// these aliases should be kept in the list of fields.
171-
if (canRemoveAliases[0] && p.anyMatch(FieldNameUtils::couldOverrideAliases)) {
172-
canRemoveAliases[0] = false;
197+
if (canRemoveAliases.get() && p.anyMatch(FieldNameUtils::couldOverrideAliases)) {
198+
canRemoveAliases.set(false);
173199
}
174-
if (canRemoveAliases[0]) {
200+
if (canRemoveAliases.get()) {
175201
// remove any already discovered UnresolvedAttributes that are in fact aliases defined later down in the tree
176202
// for example "from test | eval x = salary | stats max = max(x) by gender"
177203
// remove the UnresolvedAttribute "x", since that is an Alias defined in "eval"
@@ -187,21 +213,25 @@ public static PreAnalysisResult resolveFieldNames(LogicalPlan parsed, EnrichReso
187213
if (fieldNames.contains(ne.name())) {
188214
return;
189215
}
190-
referencesBuilder.removeIf(
191-
attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr))
192-
);
216+
referencesBuilder.get()
217+
.removeIf(attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr)));
193218
});
194219
}
195220
});
221+
parsed.forEachDownMayReturnEarly(forEachDownProcessor.get());
222+
223+
if (projectAll.get()) {
224+
return new PreAnalysisResult(enrichResolution, IndexResolver.ALL_FIELDS, Set.of());
225+
}
196226

197227
// Add JOIN ON column references afterward to avoid Alias removal
198-
referencesBuilder.addAll(joinRefs);
228+
referencesBuilder.get().addAll(joinRefs);
199229
// If any JOIN commands need wildcard field-caps calls, persist the index names
200230

201231
// remove valid metadata attributes because they will be filtered out by the IndexResolver anyway
202232
// otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead
203-
referencesBuilder.removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
204-
Set<String> fieldNames = referencesBuilder.build().names();
233+
referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
234+
Set<String> fieldNames = referencesBuilder.get().build().names();
205235

206236
if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
207237
// there cannot be an empty list of fields, we'll ask the simplest and lightest one instead: _index

0 commit comments

Comments
 (0)