Skip to content

Commit 9b6ce86

Browse files
authored
FORK - allow EVAL/DISSECT/STATS in branches (#125937)
1 parent 4fe2fb5 commit 9b6ce86

File tree

12 files changed

+1116
-582
lines changed

12 files changed

+1116
-582
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,86 @@ fork1 | 5.603396578413904E18 | 2 | all we have to decide is w
112112
fork2 | 2.3447541759648727E18 | 3 | be excellent to each other
113113
fork2 | 6.093784261960139E18 | 2 | all we have to decide is what to do with the time that is given to us
114114
;
115+
116+
forkWithEvals
117+
required_capability: fork_v2
118+
119+
FROM employees
120+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
121+
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL x = "def" | EVAL z = 2)
122+
| KEEP _fork, emp_no, x, y, z
123+
| SORT _fork, emp_no
124+
;
125+
126+
_fork:keyword | emp_no:integer | x:keyword | y:integer | z:integer
127+
fork1 | 10048 | abc | 1 | null
128+
fork1 | 10081 | abc | 1 | null
129+
fork2 | 10081 | def | null | 2
130+
fork2 | 10087 | def | null | 2
131+
;
132+
133+
forkWithStats
134+
required_capability: fork_v2
135+
136+
FROM employees
137+
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
138+
(WHERE emp_no == 10081 OR emp_no == 10087)
139+
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no))
140+
(STATS x = COUNT(*), y = MIN(emp_no))
141+
| KEEP _fork, emp_no, x, y, z
142+
| SORT _fork, emp_no
143+
;
144+
145+
_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
146+
fork1 | 10048 | null | null | null
147+
fork1 | 10081 | null | null | null
148+
fork2 | 10081 | null | null | null
149+
fork2 | 10087 | null | null | null
150+
fork3 | null | 100 | 10100 | 10001
151+
fork4 | null | 100 | 10001 | null
152+
;
153+
154+
forkWithDissect
155+
required_capability: fork_v2
156+
157+
FROM employees
158+
| WHERE emp_no == 10048 OR emp_no == 10081
159+
| FORK (EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
160+
| DISSECT a "%{x} %{y} %{z}" )
161+
(EVAL b = CONCAT(last_name, " ", emp_no::keyword, " ", first_name)
162+
| DISSECT b "%{x} %{y} %{w}" )
163+
| KEEP _fork, emp_no, x, y, z, w
164+
| SORT _fork, emp_no
165+
;
166+
167+
_fork:keyword | emp_no:integer | x:keyword | y:keyword | z:keyword | w:keyword
168+
fork1 | 10048 | Florian | 10048 | Syrotiuk | null
169+
fork1 | 10081 | Zhongwei | 10081 | Rosen | null
170+
fork2 | 10048 | Syrotiuk | 10048 | null | Florian
171+
fork2 | 10081 | Rosen | 10081 | null | Zhongwei
172+
;
173+
174+
forkWithMixOfCommands
175+
required_capability: fork_v2
176+
177+
FROM employees
178+
| WHERE emp_no == 10048 OR emp_no == 10081
179+
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
180+
| DISSECT a "%{x} %{y} %{z}"
181+
| EVAL y = y::keyword )
182+
( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword )
183+
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
184+
( EVAL x = "abc" | EVAL y = "aaa" )
185+
| KEEP _fork, emp_no, x, y, z, a
186+
| SORT _fork, emp_no
187+
;
188+
189+
_fork:keyword | emp_no:integer | x:keyword | y:keyword | z:keyword | a:keyword
190+
fork1 | 10048 | Florian | 10048 | Syrotiuk | Florian 10048 Syrotiuk
191+
fork1 | 10081 | Zhongwei | 10081 | Rosen | Zhongwei 10081 Rosen
192+
fork2 | null | 2 | 10081 | 10048 | null
193+
fork3 | 10048 | Syrotiuk | null | null | null
194+
fork3 | 10081 | Rosen | null | null | null
195+
fork4 | 10048 | abc | aaa | null | null
196+
fork4 | 10081 | abc | aaa | null | null
197+
;

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.parser.ParsingException;
1515
import org.junit.Before;
1616

17+
import java.util.Arrays;
1718
import java.util.Iterator;
1819
import java.util.List;
1920
import java.util.function.Predicate;
@@ -501,6 +502,75 @@ public void testSubqueryWithoutLimitOnly() { // this should
501502
}
502503
}
503504

505+
public void testWithEvalSimple() {
506+
var query = """
507+
FROM test
508+
| WHERE content:"cat"
509+
| FORK ( EVAL a = 1 )
510+
( EVAL a = 2 )
511+
| KEEP a, _fork, id, content
512+
""";
513+
514+
try (var resp = run(query)) {
515+
assertColumnNames(resp.columns(), List.of("a", "_fork", "id", "content"));
516+
517+
Iterable<Iterable<Object>> expectedValues = List.of(
518+
List.of(1, "fork1", 5, "There is also a white cat"),
519+
List.of(2, "fork2", 5, "There is also a white cat")
520+
);
521+
assertValues(resp.values(), expectedValues);
522+
}
523+
}
524+
525+
public void testWithEvalDifferentOutputs() {
526+
var query = """
527+
FROM test
528+
| WHERE id == 2
529+
| FORK ( EVAL a = 1 )
530+
( EVAL b = 2 )
531+
| KEEP a, b, _fork
532+
| SORT _fork, a
533+
""";
534+
try (var resp = run(query)) {
535+
assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
536+
Iterable<Iterable<Object>> expectedValues = List.of(
537+
Arrays.stream(new Object[] { 1, null, "fork1" }).toList(),
538+
Arrays.stream(new Object[] { null, 2, "fork2" }).toList()
539+
);
540+
assertValues(resp.values(), expectedValues);
541+
}
542+
}
543+
544+
public void testWithStatsSimple() {
545+
var query = """
546+
FROM test
547+
| FORK (STATS x=COUNT(*), y=VALUES(id))
548+
(WHERE id == 2)
549+
| KEEP _fork, x, y, id
550+
| SORT _fork, id
551+
""";
552+
try (var resp = run(query)) {
553+
assertColumnNames(resp.columns(), List.of("_fork", "x", "y", "id"));
554+
Iterable<Iterable<Object>> expectedValues = List.of(
555+
Arrays.stream(new Object[] { "fork1", 6L, List.of(1, 2, 3, 4, 5, 6), null }).toList(),
556+
Arrays.stream(new Object[] { "fork2", null, null, 2 }).toList()
557+
);
558+
assertValues(resp.values(), expectedValues);
559+
}
560+
}
561+
562+
public void testWithEvalWithConflictingTypes() {
563+
var query = """
564+
FROM test
565+
| FORK ( EVAL a = 1 )
566+
( EVAL a = "aaaa" )
567+
| KEEP a, _fork
568+
""";
569+
570+
var e = expectThrows(VerificationException.class, () -> run(query));
571+
assertTrue(e.getMessage().contains("Column [a] has conflicting data types"));
572+
}
573+
504574
public void testSubqueryWithUnknownField() {
505575
var query = """
506576
FROM test
@@ -565,6 +635,19 @@ public void testSubqueryWithUnknownFieldInSort() {
565635
assertTrue(e.getMessage().contains("Unknown column [bar]"));
566636
}
567637

638+
public void testSubqueryWithUnknownFieldInEval() {
639+
var query = """
640+
FROM test
641+
| FORK
642+
( EVAL x = baz + 1)
643+
( WHERE content:"cat" )
644+
| KEEP _fork, id, content
645+
| SORT _fork, id
646+
""";
647+
var e = expectThrows(VerificationException.class, () -> run(query));
648+
assertTrue(e.getMessage().contains("Unknown column [baz]"));
649+
}
650+
568651
public void testOneSubQuery() {
569652
var query = """
570653
FROM test

x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,12 @@ forkSubQueryCommand
281281
;
282282

283283
forkSubQueryProcessingCommand
284-
: whereCommand
285-
| sortCommand
284+
: evalCommand
285+
| whereCommand
286286
| limitCommand
287+
| statsCommand
288+
| sortCommand
289+
| dissectCommand
287290
;
288291

289292
rrfCommand

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -964,7 +964,12 @@ public enum Cap {
964964
/**
965965
* Support max_over_time aggregation
966966
*/
967-
MAX_OVER_TIME(Build.current().isSnapshot());
967+
MAX_OVER_TIME(Build.current().isSnapshot()),
968+
969+
/**
970+
* Support STATS/EVAL/DISSECT in Fork branches
971+
*/
972+
FORK_V2(Build.current().isSnapshot());
968973

969974
private final boolean enabled;
970975

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,9 @@
153153
public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerContext> {
154154
// marker list of attributes for plans that do not have any concrete fields to return, but have other computed columns to return
155155
// ie from test | stats c = count(*)
156+
public static final String NO_FIELDS_NAME = "<no-fields>";
156157
public static final List<Attribute> NO_FIELDS = List.of(
157-
new ReferenceAttribute(Source.EMPTY, "<no-fields>", DataType.NULL, Nullability.TRUE, null, true)
158+
new ReferenceAttribute(Source.EMPTY, NO_FIELDS_NAME, DataType.NULL, Nullability.TRUE, null, true)
158159
);
159160

160161
private static final List<Batch<LogicalPlan>> RULES = List.of(
@@ -499,6 +500,10 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
499500
return resolveKeep(p, childrenOutput);
500501
}
501502

503+
if (plan instanceof Fork f) {
504+
return resolveFork(f, context);
505+
}
506+
502507
if (plan instanceof Eval p) {
503508
return resolveEval(p, childrenOutput);
504509
}
@@ -714,6 +719,56 @@ private Join resolveLookupJoin(LookupJoin join) {
714719
return join;
715720
}
716721

722+
private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
723+
// we align the outputs of the sub plans such that they have the same columns
724+
boolean changed = false;
725+
List<LogicalPlan> newSubPlans = new ArrayList<>();
726+
Set<String> forkColumns = fork.outputSet().names();
727+
728+
for (LogicalPlan logicalPlan : fork.children()) {
729+
Source source = logicalPlan.source();
730+
731+
// find the missing columns
732+
List<Attribute> missing = new ArrayList<>();
733+
Set<String> currentNames = logicalPlan.outputSet().names();
734+
for (Attribute attr : fork.outputSet()) {
735+
if (currentNames.contains(attr.name()) == false) {
736+
missing.add(attr);
737+
}
738+
}
739+
740+
List<Alias> aliases = missing.stream().map(attr -> new Alias(source, attr.name(), Literal.of(attr, null))).toList();
741+
742+
// add the missing columns
743+
if (aliases.size() > 0) {
744+
logicalPlan = new Eval(source, logicalPlan, aliases);
745+
changed = true;
746+
}
747+
748+
List<String> subPlanColumns = logicalPlan.output().stream().map(Attribute::name).toList();
749+
// We need to add an explicit Keep even if the outputs align
750+
// This is because at the moment the sub plans are executed and optimized separately and the output might change
751+
// during optimizations. Once we add streaming we might not need to add a Keep when the outputs already align.
752+
// Note that until we add explicit support for KEEP in FORK branches, this condition will always be true.
753+
if (logicalPlan instanceof Keep == false || subPlanColumns.equals(forkColumns) == false) {
754+
changed = true;
755+
List<Attribute> newOutput = new ArrayList<>();
756+
for (String attrName : forkColumns) {
757+
for (Attribute subAttr : logicalPlan.output()) {
758+
if (attrName.equals(subAttr.name())) {
759+
newOutput.add(subAttr);
760+
}
761+
}
762+
}
763+
logicalPlan = new Keep(logicalPlan.source(), logicalPlan, newOutput);
764+
}
765+
766+
newSubPlans.add(logicalPlan);
767+
}
768+
769+
return changed ? new Fork(fork.source(), newSubPlans) : fork;
770+
}
771+
717772
private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {
718773
List<Alias> newFields = new ArrayList<>();
719774
boolean changed = false;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PruneColumns.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
1515
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1616
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
17+
import org.elasticsearch.xpack.esql.core.util.Holder;
1718
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
1819
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
1920
import org.elasticsearch.xpack.esql.plan.logical.Eval;
21+
import org.elasticsearch.xpack.esql.plan.logical.Fork;
2022
import org.elasticsearch.xpack.esql.plan.logical.Limit;
2123
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2224
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
@@ -36,6 +38,8 @@ public final class PruneColumns extends Rule<LogicalPlan, LogicalPlan> {
3638
public LogicalPlan apply(LogicalPlan plan) {
3739
// track used references
3840
var used = plan.outputSet().asBuilder();
41+
Holder<Boolean> forkPresent = new Holder<>(false);
42+
3943
// while going top-to-bottom (upstream)
4044
var pl = plan.transformDown(p -> {
4145
// Note: It is NOT required to do anything special for binary plans like JOINs. It is perfectly fine that transformDown descends
@@ -50,6 +54,14 @@ public LogicalPlan apply(LogicalPlan plan) {
5054
return p;
5155
}
5256

57+
if (p instanceof Fork) {
58+
forkPresent.set(true);
59+
}
60+
// pruning columns for Fork branches can have the side effect of having misaligned outputs
61+
if (forkPresent.get()) {
62+
return p;
63+
}
64+
5365
// remember used
5466
boolean recheck;
5567
// analyze the unused items against dedicated 'producer' nodes such as Eval and Aggregate

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/EsqlBaseParser.interp

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)