Skip to content

Commit 2bf6d54

Browse files
authored
ES|QL: Fix constant folding when using FORK (#128276)
1 parent ef06bc8 commit 2bf6d54

File tree

11 files changed

+187
-42
lines changed

11 files changed

+187
-42
lines changed

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
4747
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
4848
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
49-
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V3;
49+
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V4;
5050
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
5151
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
5252
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7;
@@ -132,7 +132,7 @@ protected void shouldSkipTest(String testName) throws IOException {
132132
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
133133
// Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented.
134134
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
135-
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V3.capabilityName()));
135+
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V4.capabilityName()));
136136
}
137137

138138
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44

55
simpleFork
6-
required_capability: fork_v3
6+
required_capability: fork_v4
77

88
FROM employees
99
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
1818
;
1919

2020
forkWithWhereSortAndLimit
21-
required_capability: fork_v3
21+
required_capability: fork_v4
2222

2323
FROM employees
2424
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
3838
;
3939

4040
fiveFork
41-
required_capability: fork_v3
41+
required_capability: fork_v4
4242

4343
FROM employees
4444
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
5959
;
6060

6161
forkWithWhereSortDescAndLimit
62-
required_capability: fork_v3
62+
required_capability: fork_v4
6363

6464
FROM employees
6565
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
7676
;
7777

7878
forkWithCommonPrefilter
79-
required_capability: fork_v3
79+
required_capability: fork_v4
8080

8181
FROM employees
8282
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
9494
;
9595

9696
forkWithSemanticSearchAndScore
97-
required_capability: fork_v3
97+
required_capability: fork_v4
9898
required_capability: semantic_text_field_caps
9999
required_capability: metadata_score
100100

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
114114
;
115115

116116
forkWithEvals
117-
required_capability: fork_v3
117+
required_capability: fork_v4
118118

119119
FROM employees
120120
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
131131
;
132132

133133
forkWithStats
134-
required_capability: fork_v3
134+
required_capability: fork_v4
135135

136136
FROM employees
137137
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
152152
;
153153

154154
forkWithDissect
155-
required_capability: fork_v3
155+
required_capability: fork_v4
156156

157157
FROM employees
158158
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
172172
;
173173

174174
forkWithMixOfCommands
175-
required_capability: fork_v3
175+
required_capability: fork_v4
176176

177177
FROM employees
178178
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -195,3 +195,24 @@ fork3 | 10081 | Rosen | null | null | null
195195
fork4 | 10048 | abc | aaa | null | null
196196
fork4 | 10081 | abc | aaa | null | null
197197
;
198+
199+
forkWithFiltersOnConstantValues
200+
required_capability: fork_v4
201+
202+
FROM employees
203+
| EVAL z = 1
204+
| WHERE z == 1
205+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | WHERE z - 1 == 0)
206+
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL a = "x" )
207+
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no) | EVAL a = "y" )
208+
(STATS x = COUNT(*), y = MIN(emp_no))
209+
| WHERE _fork == "fork2" OR a == "y"
210+
| KEEP _fork, emp_no, x, y, z
211+
| SORT _fork, emp_no
212+
;
213+
214+
_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
215+
fork2 | 10081 | null | null | 1
216+
fork2 | 10087 | null | null | 1
217+
fork3 | null | 100 | 10100 | 10001
218+
;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,51 @@ public void testWithStatsSimple() {
585585
}
586586
}
587587

588+
public void testWithConditionOnForkField() {
589+
var query = """
590+
FROM test
591+
| FORK ( WHERE content:"fox" | EVAL a = 1)
592+
( WHERE content:"cat" | EVAL b = 2 )
593+
( WHERE content:"dog" | EVAL c = 3 )
594+
| WHERE _fork == "fork2"
595+
| KEEP _fork, id, content, a, b, c
596+
| SORT _fork
597+
""";
598+
599+
try (var resp = run(query)) {
600+
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a", "b", "c"));
601+
602+
Iterable<Iterable<Object>> expectedValues = List.of(
603+
Arrays.stream(new Object[] { "fork2", 5, "There is also a white cat", null, 2, null }).toList()
604+
);
605+
assertValues(resp.values(), expectedValues);
606+
}
607+
}
608+
609+
public void testWithFilteringOnConstantColumn() {
610+
var query = """
611+
FROM test
612+
| FORK ( WHERE content:"fox" | EVAL a = 1)
613+
( WHERE content:"cat" | EVAL a = 2 )
614+
( WHERE content:"dog" | EVAL a = 3 )
615+
| WHERE a == 3
616+
| KEEP _fork, id, content, a
617+
| SORT id
618+
| LIMIT 3
619+
""";
620+
621+
try (var resp = run(query)) {
622+
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a"));
623+
624+
Iterable<Iterable<Object>> expectedValues = List.of(
625+
List.of("fork3", 2, "This is a brown dog", 3),
626+
List.of("fork3", 3, "This dog is really brown", 3),
627+
List.of("fork3", 4, "The dog is brown but this document is very very long", 3)
628+
);
629+
assertValues(resp.values(), expectedValues);
630+
}
631+
}
632+
588633
public void testWithEvalWithConflictingTypes() {
589634
var query = """
590635
FROM test

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,7 +1013,7 @@ public enum Cap {
10131013
/**
10141014
* Support streaming of sub plan results
10151015
*/
1016-
FORK_V3(Build.current().isSnapshot()),
1016+
FORK_V4(Build.current().isSnapshot()),
10171017

10181018
/**
10191019
* Support for the {@code leading_zeros} named parameter.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -752,15 +752,16 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
752752
// we align the outputs of the sub plans such that they have the same columns
753753
boolean changed = false;
754754
List<LogicalPlan> newSubPlans = new ArrayList<>();
755-
Set<String> forkColumns = fork.outputSet().names();
755+
List<Attribute> outputUnion = Fork.outputUnion(fork.children());
756+
List<String> forkColumns = outputUnion.stream().map(Attribute::name).toList();
756757

757758
for (LogicalPlan logicalPlan : fork.children()) {
758759
Source source = logicalPlan.source();
759760

760761
// find the missing columns
761762
List<Attribute> missing = new ArrayList<>();
762763
Set<String> currentNames = logicalPlan.outputSet().names();
763-
for (Attribute attr : fork.outputSet()) {
764+
for (Attribute attr : outputUnion) {
764765
if (currentNames.contains(attr.name()) == false) {
765766
missing.add(attr);
766767
}
@@ -795,7 +796,19 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
795796
newSubPlans.add(logicalPlan);
796797
}
797798

798-
return changed ? new Fork(fork.source(), newSubPlans) : fork;
799+
if (changed == false) {
800+
return fork;
801+
}
802+
803+
List<Attribute> newOutput = new ArrayList<>();
804+
805+
// We don't want to keep the same attributes that are outputted by the FORK branches.
806+
// Keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
807+
for (Attribute attr : newSubPlans.getFirst().output()) {
808+
newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType()));
809+
}
810+
811+
return changed ? new Fork(fork.source(), newSubPlans, newOutput) : fork;
799812
}
800813

801814
private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,17 @@
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1414
import org.elasticsearch.xpack.esql.core.expression.NameId;
15+
import org.elasticsearch.xpack.esql.core.type.DataType;
1516
import org.elasticsearch.xpack.esql.plan.QueryPlan;
1617
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
18+
import org.elasticsearch.xpack.esql.plan.logical.Fork;
1719
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
20+
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
21+
22+
import java.util.HashSet;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
1826

1927
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2028

@@ -44,6 +52,8 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
4452
binaryExec.right().outputSet(),
4553
failures
4654
);
55+
} else if (p instanceof Fork || p instanceof MergeExec) {
56+
checkMissingFork(p, failures);
4757
} else {
4858
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
4959
}
@@ -59,6 +69,41 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
5969
}
6070
}
6171

72+
private static void checkMissingFork(QueryPlan<?> plan, Failures failures) {
73+
for (QueryPlan<?> child : plan.children()) {
74+
checkMissingForkBranch(child, plan.outputSet(), failures);
75+
}
76+
}
77+
78+
private static void checkMissingForkBranch(QueryPlan<?> plan, AttributeSet forkOutputSet, Failures failures) {
79+
Map<String, DataType> attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType));
80+
AttributeSet missing = AttributeSet.of();
81+
82+
Set<String> commonAttrs = new HashSet<>();
83+
84+
// get the missing attributes from the sub plan
85+
plan.output().forEach(attribute -> {
86+
var attrType = attributeTypes.get(attribute.name());
87+
if (attrType == null || attrType != attribute.dataType()) {
88+
missing.add(attribute);
89+
}
90+
commonAttrs.add(attribute.name());
91+
});
92+
93+
// get the missing attributes from the fork output
94+
forkOutputSet.forEach(attribute -> {
95+
if (commonAttrs.contains(attribute.name()) == false) {
96+
missing.add(attribute);
97+
}
98+
});
99+
100+
if (missing.isEmpty() == false) {
101+
failures.add(
102+
fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString())
103+
);
104+
}
105+
}
106+
62107
private static void checkMissingBinary(
63108
QueryPlan<?> plan,
64109
AttributeSet leftReferences,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
637637
}
638638
return input -> {
639639
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
640-
return new Fork(source(ctx), subPlans);
640+
return new Fork(source(ctx), subPlans, List.of());
641641
};
642642
}
643643

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,19 @@
3535
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware {
3636

3737
public static final String FORK_FIELD = "_fork";
38-
List<Attribute> lazyOutput;
38+
private final List<Attribute> output;
3939

40-
public Fork(Source source, List<LogicalPlan> children) {
40+
public Fork(Source source, List<LogicalPlan> children, List<Attribute> output) {
4141
super(source, children);
4242
if (children.size() < 2) {
4343
throw new IllegalArgumentException("requires more than two subqueries, got:" + children.size());
4444
}
45+
this.output = output;
4546
}
4647

4748
@Override
4849
public LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
49-
return new Fork(source(), newChildren);
50+
return new Fork(source(), newChildren, output);
5051
}
5152

5253
@Override
@@ -65,7 +66,8 @@ public boolean expressionsResolved() {
6566
return false;
6667
}
6768

68-
if (children().stream().anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME))) {
69+
if (children().stream()
70+
.anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME) || output.size() != p.output().size())) {
6971
return false;
7072
}
7173

@@ -85,26 +87,23 @@ public boolean expressionsResolved() {
8587

8688
@Override
8789
protected NodeInfo<? extends LogicalPlan> info() {
88-
return NodeInfo.create(this, Fork::new, children());
90+
return NodeInfo.create(this, Fork::new, children(), output);
8991
}
9092

9193
public Fork replaceSubPlans(List<LogicalPlan> subPlans) {
92-
return new Fork(source(), subPlans);
94+
return new Fork(source(), subPlans, output);
9395
}
9496

9597
@Override
9698
public List<Attribute> output() {
97-
if (lazyOutput == null) {
98-
lazyOutput = lazyOutput();
99-
}
100-
return lazyOutput;
99+
return output;
101100
}
102101

103-
private List<Attribute> lazyOutput() {
102+
public static List<Attribute> outputUnion(List<LogicalPlan> subplans) {
104103
List<Attribute> output = new ArrayList<>();
105104
Set<String> names = new HashSet<>();
106105

107-
for (var subPlan : children()) {
106+
for (var subPlan : subplans) {
108107
for (var attr : subPlan.output()) {
109108
if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) {
110109
names.add(attr.name());

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public final void test() throws Throwable {
310310
);
311311
assumeFalse(
312312
"CSV tests cannot currently handle FORK",
313-
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V3.capabilityName())
313+
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V4.capabilityName())
314314
);
315315
assumeFalse(
316316
"CSV tests cannot currently handle multi_match function that depends on Lucene",

0 commit comments

Comments
 (0)