Skip to content

Commit 15480b6

Browse files
Handle fork references correctly
1 parent 594d76a commit 15480b6

File tree

3 files changed

+81
-51
lines changed

3 files changed

+81
-51
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/tree/Node.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,28 @@ public List<T> children() {
6565
}
6666

6767
@SuppressWarnings("unchecked")
68-
public void forEachDown(Consumer<? super T> action) {
69-
action.accept((T) this);
68+
public boolean forEachDownMayReturnEarly(Function<? super T, Boolean> action) {
69+
if (action.apply((T) this) == false) {
70+
// Early return.
71+
return false;
72+
}
7073
// please do not refactor this into a for-each loop: that would
7174
// allocate an iterator that performs concurrent modification checks and add extra stack frames
7275
for (int c = 0, size = children.size(); c < size; c++) {
73-
children.get(c).forEachDown(action);
76+
if (children.get(c).forEachDownMayReturnEarly(action) == false) {
77+
return false;
78+
}
7479
}
80+
return true;
81+
}
82+
83+
@SuppressWarnings("unchecked")
84+
public void forEachDown(Consumer<? super T> action) {
85+
forEachDownMayReturnEarly(p -> {
86+
action.accept(p);
87+
// No early return.
88+
return true;
89+
});
7590
}
7691

7792
@SuppressWarnings("unchecked")

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
import java.util.List;
121121
import java.util.Map;
122122
import java.util.Set;
123+
import java.util.function.Function;
123124
import java.util.stream.Collectors;
124125
import java.util.stream.Stream;
125126

@@ -851,33 +852,7 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
851852
return result.withFieldNames(IndexResolver.ALL_FIELDS);
852853
}
853854

854-
Holder<Boolean> projectAfterFork = new Holder<>(false);
855-
Holder<Boolean> hasFork = new Holder<>(false);
856-
857-
parsed.forEachDown(plan -> {
858-
if (projectAll.get()) {
859-
return;
860-
}
861-
862-
if (hasFork.get() == false && shouldCollectReferencedFields(plan, inlinestatsAggs)) {
863-
projectAfterFork.set(true);
864-
}
865-
866-
if (plan instanceof Fork fork && projectAfterFork.get() == false) {
867-
hasFork.set(true);
868-
fork.children().forEach(child -> {
869-
if (child.anyMatch(p -> shouldCollectReferencedFields(p, inlinestatsAggs)) == false) {
870-
projectAll.set(true);
871-
}
872-
});
873-
}
874-
});
875-
876-
if (projectAll.get()) {
877-
return result.withFieldNames(IndexResolver.ALL_FIELDS);
878-
}
879-
880-
var referencesBuilder = AttributeSet.builder();
855+
var referencesBuilder = new Holder<>(AttributeSet.builder());
881856
// "keep" and "drop" attributes are special whenever a wildcard is used in their name, as the wildcard can cover some
882857
// attributes ("lookup join" generated columns among others); steps like removal of Aliases should ignore fields matching the
883858
// wildcards.
@@ -894,19 +869,35 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
894869
// lookup indices where we request "*" because we may require all their fields
895870
Set<String> wildcardJoinIndices = new java.util.HashSet<>();
896871

897-
boolean[] canRemoveAliases = new boolean[] { true };
872+
var canRemoveAliases = new Holder<>(true);
873+
874+
var processingLambda = new Holder<Function<LogicalPlan, Boolean>>();
875+
processingLambda.set((LogicalPlan p) -> {// go over each plan top-down
876+
if (p instanceof Fork fork) {
877+
// Return early from forEachDownMayReturnEarly (via the `return false` below); the Fork's children are iterated manually here instead.
878+
var forkRefsResult = AttributeSet.builder();
879+
forkRefsResult.addAll(referencesBuilder.get());
880+
881+
for (var child : fork.children()) {
882+
referencesBuilder.set(AttributeSet.builder());
883+
var return_result = child.forEachDownMayReturnEarly(processingLambda.get());
884+
// Nested Forks are not supported for now, so traversing a Fork branch must never return early.
885+
assert return_result;
886+
forkRefsResult.addAll(referencesBuilder.get());
887+
}
898888

899-
parsed.forEachDown(p -> {// go over each plan top-down
900-
if (p instanceof RegexExtract re) { // for Grok and Dissect
889+
referencesBuilder.set(forkRefsResult);
890+
return false;
891+
} else if (p instanceof RegexExtract re) { // for Grok and Dissect
901892
// keep the inputs needed by Grok/Dissect
902-
referencesBuilder.addAll(re.input().references());
893+
referencesBuilder.get().addAll(re.input().references());
903894
} else if (p instanceof Enrich enrich) {
904895
AttributeSet enrichFieldRefs = Expressions.references(enrich.enrichFields());
905896
AttributeSet.Builder enrichRefs = enrichFieldRefs.combine(enrich.matchField().references()).asBuilder();
906897
// Enrich adds an EmptyAttribute if no match field is specified
907898
// The exact name of the field will be added later as part of enrichPolicyMatchFields Set
908899
enrichRefs.removeIf(attr -> attr instanceof EmptyAttribute);
909-
referencesBuilder.addAll(enrichRefs);
900+
referencesBuilder.get().addAll(enrichRefs);
910901
} else if (p instanceof LookupJoin join) {
911902
if (join.config().type() instanceof JoinTypes.UsingJoinType usingJoinType) {
912903
joinRefs.addAll(usingJoinType.columns());
@@ -919,15 +910,15 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
919910
joinRefs.addAll(keepRefs);
920911
}
921912
} else {
922-
referencesBuilder.addAll(p.references());
913+
referencesBuilder.get().addAll(p.references());
923914
if (p instanceof UnresolvedRelation ur && ur.indexMode() == IndexMode.TIME_SERIES) {
924915
// METRICS aggs generally rely on @timestamp without the user having to mention it.
925-
referencesBuilder.add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
916+
referencesBuilder.get().add(new UnresolvedAttribute(ur.source(), MetadataAttribute.TIMESTAMP_FIELD));
926917
}
927918
// special handling for UnresolvedPattern (which is not an UnresolvedAttribute)
928919
p.forEachExpression(UnresolvedNamePattern.class, up -> {
929920
var ua = new UnresolvedAttribute(up.source(), up.name());
930-
referencesBuilder.add(ua);
921+
referencesBuilder.get().add(ua);
931922
if (p instanceof Keep) {
932923
keepRefs.add(ua);
933924
} else if (p instanceof Drop) {
@@ -952,10 +943,10 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
952943
//
953944
// and ips_policy enriches the results with the same name ip field),
954945
// these aliases should be kept in the list of fields.
955-
if (canRemoveAliases[0] && p.anyMatch(EsqlSession::couldOverrideAliases)) {
956-
canRemoveAliases[0] = false;
946+
if (canRemoveAliases.get() && p.anyMatch(EsqlSession::couldOverrideAliases)) {
947+
canRemoveAliases.set(false);
957948
}
958-
if (canRemoveAliases[0]) {
949+
if (canRemoveAliases.get()) {
959950
// remove any already discovered UnresolvedAttributes that are in fact aliases defined later down in the tree
960951
// for example "from test | eval x = salary | stats max = max(x) by gender"
961952
// remove the UnresolvedAttribute "x", since that is an Alias defined in "eval"
@@ -971,24 +962,27 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
971962
if (fieldNames.contains(ne.name())) {
972963
return;
973964
}
974-
referencesBuilder.removeIf(
975-
attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr))
976-
);
965+
referencesBuilder.get()
966+
.removeIf(attr -> matchByName(attr, ne.name(), keepRefs.contains(attr) || dropWildcardRefs.contains(attr)));
977967
});
978968
}
969+
970+
// No early return.
971+
return true;
979972
});
973+
parsed.forEachDownMayReturnEarly(processingLambda.get());
980974

981975
// Add JOIN ON column references afterward to avoid Alias removal
982-
referencesBuilder.addAll(joinRefs);
976+
referencesBuilder.get().addAll(joinRefs);
983977
// If any JOIN commands need wildcard field-caps calls, persist the index names
984978
if (wildcardJoinIndices.isEmpty() == false) {
985979
result = result.withWildcardJoinIndices(wildcardJoinIndices);
986980
}
987981

988982
// remove valid metadata attributes because they will be filtered out by the IndexResolver anyway
989983
// otherwise, in some edge cases, we will fail to ask for "*" (all fields) instead
990-
referencesBuilder.removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
991-
Set<String> fieldNames = referencesBuilder.build().names();
984+
referencesBuilder.get().removeIf(a -> a instanceof MetadataAttribute || MetadataAttribute.isSupported(a.name()));
985+
Set<String> fieldNames = referencesBuilder.get().build().names();
992986

993987
if (fieldNames.isEmpty() && enrichPolicyMatchFields.isEmpty()) {
994988
// the list of fields cannot be empty; we'll ask for the simplest and lightest one instead: _index

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/IndexResolverFieldNamesTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2080,7 +2080,7 @@ public void testForkFieldsWithKeepAfterFork() {
20802080
(WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
20812081
| WHERE x > y
20822082
| KEEP a, b, c, d, x
2083-
""", Set.of("a", "x", "y", "d", "e", "e.*", "d.*", "y.*", "x.*", "a.*"));
2083+
""", Set.of("a", "x", "y", "c", "d", "e", "e.*", "d.*", "y.*", "x.*", "a.*", "c.*"));
20842084
}
20852085

20862086
public void testForkFieldsWithKeepBeforeFork() {
@@ -2114,7 +2114,7 @@ public void testForkFieldsWithStatsInOneBranch() {
21142114
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
21152115
(STATS x = count(*), y=min(z))
21162116
| WHERE x > y
2117-
""", ALL_FIELDS);
2117+
""", Set.of("x", "y", "a", "c", "z", "y.*", "x.*", "z.*", "a.*", "c.*"));
21182118
}
21192119

21202120
public void testForkFieldsWithEnrichAndLookupJoins() {
@@ -2164,11 +2164,32 @@ public void testForkWithStatsInAllBranches() {
21642164
(EVAL z = a * b | STATS m = max(z))
21652165
(STATS x = count(*), y=min(z))
21662166
| WHERE x > y
2167-
""", Set.of("c", "a", "z", "z.*", "a.*", "c.*"));
2167+
""", Set.of("x", "y", "c", "a", "z", "y.*", "x.*", "z.*", "a.*", "c.*"));
2168+
}
2169+
2170+
public void testForkWithStatsInAllBranches1() {
2171+
assertFieldNames("""
2172+
FROM employees
2173+
| FORK
2174+
( STATS x = min(last_name))
2175+
( EVAL last_name = first_name | STATS y = max(last_name))
2176+
""", Set.of("first_name", "last_name", "first_name.*", "last_name.*"));
2177+
}
2178+
2179+
public void testForkWithStatsInAllBranches2() {
2180+
assertFieldNames("""
2181+
FROM employees
2182+
| FORK
2183+
( EVAL last_name = first_name | STATS y = VALUES(last_name))
2184+
( STATS x = VALUES(last_name))
2185+
""", Set.of("first_name", "last_name", "first_name.*", "last_name.*"));
21682186
}
21692187

21702188
public void testForkWithStatsAndWhere() {
2171-
assertFieldNames(" FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )", ALL_FIELDS);
2189+
assertFieldNames(
2190+
" FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )",
2191+
Set.of("gender", "salary", "gender.*", "salary.*")
2192+
);
21722193
}
21732194

21742195
private Set<String> fieldNames(String query, Set<String> enrichPolicyMatchFields) {

0 commit comments

Comments
 (0)