Skip to content

Commit 6ea61d6

Browse files
committed
Fix constant folding when using FORK
1 parent 1fe3b77 commit 6ea61d6

File tree

7 files changed

+138
-17
lines changed

7 files changed

+138
-17
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,3 +195,22 @@ fork3 | 10081 | Rosen | null | null | null
195195
fork4 | 10048 | abc | aaa | null | null
196196
fork4 | 10081 | abc | aaa | null | null
197197
;
198+
199+
forkWithFiltersOnConstantValues
200+
FROM employees
201+
| EVAL z = 1
202+
| WHERE z == 1
203+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | WHERE z - 1 == 0)
204+
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL a = "x" )
205+
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no) | EVAL a = "y" )
206+
(STATS x = COUNT(*), y = MIN(emp_no))
207+
| WHERE _fork == "fork2" OR a == "y"
208+
| KEEP _fork, emp_no, x, y, z
209+
| SORT _fork, emp_no
210+
;
211+
212+
_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
213+
fork2 | 10081 | null | null | 1
214+
fork2 | 10087 | null | null | 1
215+
fork3 | null | 100 | 10100 | 10001
216+
;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,51 @@ public void testWithStatsSimple() {
585585
}
586586
}
587587

588+
public void testWithConditionOnForkField() {
589+
var query = """
590+
FROM test
591+
| FORK ( WHERE content:"fox" | EVAL a = 1)
592+
( WHERE content:"cat" | EVAL b = 2 )
593+
( WHERE content:"dog" | EVAL c = 3 )
594+
| WHERE _fork == "fork2"
595+
| KEEP _fork, id, content, a, b, c
596+
| SORT _fork
597+
""";
598+
599+
try (var resp = run(query)) {
600+
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a", "b", "c"));
601+
602+
Iterable<Iterable<Object>> expectedValues = List.of(
603+
Arrays.stream(new Object[] { "fork2", 5, "There is also a white cat", null, 2, null }).toList()
604+
);
605+
assertValues(resp.values(), expectedValues);
606+
}
607+
}
608+
609+
public void testWithFilteringOnConstantColumn() {
610+
var query = """
611+
FROM test
612+
| FORK ( WHERE content:"fox" | EVAL a = 1)
613+
( WHERE content:"cat" | EVAL a = 2 )
614+
( WHERE content:"dog" | EVAL a = 3 )
615+
| WHERE a == 3
616+
| KEEP _fork, id, content, a
617+
| SORT id
618+
| LIMIT 3
619+
""";
620+
621+
try (var resp = run(query)) {
622+
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a"));
623+
624+
Iterable<Iterable<Object>> expectedValues = List.of(
625+
List.of("fork3", 2, "This is a brown dog", 3),
626+
List.of("fork3", 3, "This dog is really brown", 3),
627+
List.of("fork3", 4, "The dog is brown but this document is very very long", 3)
628+
);
629+
assertValues(resp.values(), expectedValues);
630+
}
631+
}
632+
588633
public void testWithEvalWithConflictingTypes() {
589634
var query = """
590635
FROM test

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -752,15 +752,16 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
752752
// we align the outputs of the sub plans such that they have the same columns
753753
boolean changed = false;
754754
List<LogicalPlan> newSubPlans = new ArrayList<>();
755-
Set<String> forkColumns = fork.outputSet().names();
755+
List<Attribute> outputUnion = Fork.outputUnion(fork.children());
756+
Set<String> forkColumns = outputUnion.stream().map(Attribute::name).collect(Collectors.toSet());
756757

757758
for (LogicalPlan logicalPlan : fork.children()) {
758759
Source source = logicalPlan.source();
759760

760761
// find the missing columns
761762
List<Attribute> missing = new ArrayList<>();
762763
Set<String> currentNames = logicalPlan.outputSet().names();
763-
for (Attribute attr : fork.outputSet()) {
764+
for (Attribute attr : outputUnion) {
764765
if (currentNames.contains(attr.name()) == false) {
765766
missing.add(attr);
766767
}
@@ -795,7 +796,19 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
795796
newSubPlans.add(logicalPlan);
796797
}
797798

798-
return changed ? new Fork(fork.source(), newSubPlans) : fork;
799+
if (!changed) {
800+
return fork;
801+
}
802+
803+
List<Attribute> newOutput = new ArrayList<>();
804+
805+
// We don't want to keep the same attributes that are outputted by the FORK branches.
806+
// Keeping the same attributes can have unintended side effects when applying optimizations like constant folding.
807+
for (Attribute attr : Fork.outputUnion(newSubPlans)) {
808+
newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType()));
809+
}
810+
811+
return changed ? new Fork(fork.source(), newSubPlans, newOutput) : fork;
799812
}
800813

801814
private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/PlanConsistencyChecker.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,17 @@
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
1414
import org.elasticsearch.xpack.esql.core.expression.NameId;
15+
import org.elasticsearch.xpack.esql.core.type.DataType;
1516
import org.elasticsearch.xpack.esql.plan.QueryPlan;
1617
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
18+
import org.elasticsearch.xpack.esql.plan.logical.Fork;
1719
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
20+
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
21+
22+
import java.util.HashSet;
23+
import java.util.Map;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
1826

1927
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2028

@@ -44,6 +52,8 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
4452
binaryExec.right().outputSet(),
4553
failures
4654
);
55+
} else if (p instanceof Fork || p instanceof MergeExec) {
56+
checkMissingFork(p, failures);
4757
} else {
4858
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
4959
}
@@ -59,6 +69,41 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
5969
}
6070
}
6171

72+
private static void checkMissingFork(QueryPlan<?> plan, Failures failures) {
73+
for (QueryPlan<?> child : plan.children()) {
74+
checkMissingForkBranch(child, plan.outputSet(), failures);
75+
}
76+
}
77+
78+
private static void checkMissingForkBranch(QueryPlan<?> plan, AttributeSet forkOutputSet, Failures failures) {
79+
Map<String, DataType> attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType));
80+
AttributeSet missing = AttributeSet.of();
81+
82+
Set<String> commonAttrs = new HashSet<>();
83+
84+
// get the missing attributes from the sub plan
85+
plan.output().forEach(attribute -> {
86+
var attrType = attributeTypes.get(attribute.name());
87+
if (attrType == null || attrType != attribute.dataType()) {
88+
missing.add(attribute);
89+
}
90+
commonAttrs.add(attribute.name());
91+
});
92+
93+
// get the missing attributes from the fork output
94+
forkOutputSet.forEach(attribute -> {
95+
if (commonAttrs.contains(attribute.name()) == false) {
96+
missing.add(attribute);
97+
}
98+
});
99+
100+
if (missing.isEmpty() == false) {
101+
failures.add(
102+
fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString())
103+
);
104+
}
105+
}
106+
62107
private static void checkMissingBinary(
63108
QueryPlan<?> plan,
64109
AttributeSet leftReferences,

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
637637
}
638638
return input -> {
639639
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
640-
return new Fork(source(ctx), subPlans);
640+
return new Fork(source(ctx), subPlans, List.of());
641641
};
642642
}
643643

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,18 +35,19 @@
3535
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware {
3636

3737
public static final String FORK_FIELD = "_fork";
38-
List<Attribute> lazyOutput;
38+
private final List<Attribute> output;
3939

40-
public Fork(Source source, List<LogicalPlan> children) {
40+
public Fork(Source source, List<LogicalPlan> children, List<Attribute> output) {
4141
super(source, children);
4242
if (children.size() < 2) {
4343
throw new IllegalArgumentException("requires more than two subqueries, got:" + children.size());
4444
}
45+
this.output = output;
4546
}
4647

4748
@Override
4849
public LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
49-
return new Fork(source(), newChildren);
50+
return new Fork(source(), newChildren, output);
5051
}
5152

5253
@Override
@@ -65,7 +66,8 @@ public boolean expressionsResolved() {
6566
return false;
6667
}
6768

68-
if (children().stream().anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME))) {
69+
if (children().stream()
70+
.anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME) || output.size() != p.output().size())) {
6971
return false;
7072
}
7173

@@ -85,26 +87,23 @@ public boolean expressionsResolved() {
8587

8688
@Override
8789
protected NodeInfo<? extends LogicalPlan> info() {
88-
return NodeInfo.create(this, Fork::new, children());
90+
return NodeInfo.create(this, Fork::new, children(), output);
8991
}
9092

9193
public Fork replaceSubPlans(List<LogicalPlan> subPlans) {
92-
return new Fork(source(), subPlans);
94+
return new Fork(source(), subPlans, output);
9395
}
9496

9597
@Override
9698
public List<Attribute> output() {
97-
if (lazyOutput == null) {
98-
lazyOutput = lazyOutput();
99-
}
100-
return lazyOutput;
99+
return output;
101100
}
102101

103-
private List<Attribute> lazyOutput() {
102+
public static List<Attribute> outputUnion(List<LogicalPlan> subplans) {
104103
List<Attribute> output = new ArrayList<>();
105104
Set<String> names = new HashSet<>();
106105

107-
for (var subPlan : children()) {
106+
for (var subPlan : subplans) {
108107
for (var attr : subPlan.output()) {
109108
if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) {
110109
names.add(attr.name());

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/CommandLicenseTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ private static LogicalPlan createInstance(Class<? extends LogicalPlan> clazz, Lo
156156
return new Grok(source, child, null, null, List.of());
157157
}
158158
case "Fork" -> {
159-
return new Fork(source, List.of(child, child));
159+
return new Fork(source, List.of(child, child), List.of());
160160
}
161161
case "Sample" -> {
162162
return new Sample(source, null, null, child);

0 commit comments

Comments
 (0)