Skip to content

Commit 5a6509a

Browse files
authored
Stream result pages from sub plans for FORK (#126705)
1 parent e13173c commit 5a6509a

File tree

14 files changed

+179
-158
lines changed

14 files changed

+179
-158
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MergeOperator.java

Lines changed: 0 additions & 85 deletions
This file was deleted.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/RrfScoreEvalOperator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,11 @@ protected Page process(Page page) {
6868
for (int i = 0; i < page.getBlockCount() - 1; i++) {
6969
projections[i] = i == scorePosition ? page.getBlockCount() - 1 : i;
7070
}
71-
72-
return page.projectBlocks(projections);
71+
try {
72+
return page.projectBlocks(projections);
73+
} finally {
74+
page.releaseBlocks();
75+
}
7376
}
7477

7578
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//
44

55
simpleFork
6-
required_capability: fork
6+
required_capability: fork_v3
77

88
FROM employees
99
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
1818
;
1919

2020
forkWithWhereSortAndLimit
21-
required_capability: fork
21+
required_capability: fork_v3
2222

2323
FROM employees
2424
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
3838
;
3939

4040
fiveFork
41-
required_capability: fork
41+
required_capability: fork_v3
4242

4343
FROM employees
4444
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
5959
;
6060

6161
forkWithWhereSortDescAndLimit
62-
required_capability: fork
62+
required_capability: fork_v3
6363

6464
FROM employees
6565
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
7676
;
7777

7878
forkWithCommonPrefilter
79-
required_capability: fork
79+
required_capability: fork_v3
8080

8181
FROM employees
8282
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
9494
;
9595

9696
forkWithSemanticSearchAndScore
97-
required_capability: fork
97+
required_capability: fork_v3
9898
required_capability: semantic_text_field_caps
9999
required_capability: metadata_score
100100

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
114114
;
115115

116116
forkWithEvals
117-
required_capability: fork_v2
117+
required_capability: fork_v3
118118

119119
FROM employees
120120
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
131131
;
132132

133133
forkWithStats
134-
required_capability: fork_v2
134+
required_capability: fork_v3
135135

136136
FROM employees
137137
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
152152
;
153153

154154
forkWithDissect
155-
required_capability: fork_v2
155+
required_capability: fork_v3
156156

157157
FROM employees
158158
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
172172
;
173173

174174
forkWithMixOfCommands
175-
required_capability: fork_v2
175+
required_capability: fork_v3
176176

177177
FROM employees
178178
| WHERE emp_no == 10048 OR emp_no == 10081

x-pack/plugin/esql/qa/testFixtures/src/main/resources/rrf.csv-spec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ FROM books METADATA _id, _index, _score
7676
( WHERE author:"Ursula K. Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3)
7777
| RRF
7878
| STATS count_fork=COUNT(*) BY _fork
79+
| SORT _fork
7980
;
8081

8182
count_fork:long | _fork:keyword
@@ -120,6 +121,7 @@ FROM semantic_text METADATA _id, _score, _index
120121
( WHERE semantic_text_field:"something else" | SORT _score DESC | LIMIT 2)
121122
| RRF
122123
| EVAL _score = round(_score, 4)
124+
| EVAL _fork = mv_sort(_fork)
123125
| KEEP _fork, _score, _id, semantic_text_field
124126
;
125127

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,29 @@ private void testSimpleImpl(String query) {
7272
}
7373
}
7474

75+
public void testRow() {
76+
var query = """
77+
ROW a = [1, 2, 3, 4], b = 100
78+
| MV_EXPAND a
79+
| FORK (WHERE a % 2 == 1)
80+
(WHERE a % 2 == 0)
81+
| SORT _fork, a
82+
""";
83+
84+
try (var resp = run(query)) {
85+
assertColumnNames(resp.columns(), List.of("a", "b", "_fork"));
86+
assertColumnTypes(resp.columns(), List.of("integer", "integer", "keyword"));
87+
88+
Iterable<Iterable<Object>> expectedValues = List.of(
89+
List.of(1, 100, "fork1"),
90+
List.of(3, 100, "fork1"),
91+
List.of(2, 100, "fork2"),
92+
List.of(4, 100, "fork2")
93+
);
94+
assertValues(resp.values(), expectedValues);
95+
}
96+
}
97+
7598
public void testSortAndLimitInFirstSubQuery() {
7699
var query = """
77100
FROM test
@@ -216,13 +239,15 @@ public void testWhereSortOnlyInFork() {
216239
( WHERE content:"fox" | SORT id )
217240
( WHERE content:"dog" | SORT id )
218241
| KEEP _fork, id, content
242+
| SORT _fork, id
219243
""";
220244
var queryWithMatchFunction = """
221245
FROM test
222246
| FORK
223247
( WHERE match(content, "fox") | SORT id )
224248
( WHERE match(content, "dog") | SORT id )
225249
| KEEP _fork, id, content
250+
| SORT _fork, id
226251
""";
227252
for (var query : List.of(queryWithMatchOperator, queryWithMatchFunction)) {
228253
try (var resp = run(query)) {
@@ -509,6 +534,7 @@ public void testWithEvalSimple() {
509534
| FORK ( EVAL a = 1 )
510535
( EVAL a = 2 )
511536
| KEEP a, _fork, id, content
537+
| SORT _fork
512538
""";
513539

514540
try (var resp = run(query)) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/RrfIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public void testRrf() {
4040
( WHERE content:"fox" | SORT _score, _id DESC )
4141
( WHERE content:"dog" | SORT _score, _id DESC )
4242
| RRF
43+
| EVAL _fork = mv_sort(_fork)
4344
| EVAL _score = round(_score, 4)
4445
| KEEP id, content, _score, _fork
4546
""";

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -983,9 +983,9 @@ public enum Cap {
983983
MAX_OVER_TIME(Build.current().isSnapshot()),
984984

985985
/**
986-
* Support STATS/EVAL/DISSECT in Fork branches
986+
* Support streaming of sub plan results
987987
*/
988-
FORK_V2(Build.current().isSnapshot()),
988+
FORK_V3(Build.current().isSnapshot()),
989989

990990
/**
991991
* Support for the {@code leading_zeros} named parameter.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/ProjectAwayColumns.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.xpack.esql.plan.logical.Project;
2020
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
2121
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
22+
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
2223
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2324
import org.elasticsearch.xpack.esql.rule.Rule;
2425

@@ -45,6 +46,10 @@ public PhysicalPlan apply(PhysicalPlan plan) {
4546

4647
// This will require updating should we choose to have non-unary execution plans in the future.
4748
return plan.transformDown(currentPlanNode -> {
49+
if (currentPlanNode instanceof MergeExec) {
50+
keepTraversing.set(FALSE);
51+
}
52+
4853
if (keepTraversing.get() == false) {
4954
return currentPlanNode;
5055
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.compute.operator.LimitOperator;
3232
import org.elasticsearch.compute.operator.LocalSourceOperator;
3333
import org.elasticsearch.compute.operator.LocalSourceOperator.LocalSourceFactory;
34-
import org.elasticsearch.compute.operator.MergeOperator;
3534
import org.elasticsearch.compute.operator.MvExpandOperator;
3635
import org.elasticsearch.compute.operator.Operator;
3736
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
@@ -103,7 +102,6 @@
103102
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
104103
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
105104
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
106-
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
107105
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
108106
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
109107
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -281,8 +279,6 @@ else if (node instanceof OutputExec outputExec) {
281279
return planOutput(outputExec, context);
282280
} else if (node instanceof ExchangeSinkExec exchangeSink) {
283281
return planExchangeSink(exchangeSink, context);
284-
} else if (node instanceof MergeExec mergeExec) {
285-
return planMerge(mergeExec, context);
286282
} else if (node instanceof RrfScoreEvalExec rrf) {
287283
return planRrfScoreEvalExec(rrf, context);
288284
}
@@ -804,13 +800,6 @@ private PhysicalOperation planChangePoint(ChangePointExec changePoint, LocalExec
804800
);
805801
}
806802

807-
private PhysicalOperation planMerge(MergeExec mergeExec, LocalExecutionPlannerContext context) {
808-
Layout.Builder layout = new Layout.Builder();
809-
layout.append(mergeExec.output());
810-
MergeOperator.BlockSuppliers suppliers = () -> mergeExec.suppliers().stream().map(s -> s.get()).toList();
811-
return PhysicalOperation.fromSource(new MergeOperator.MergeOperatorFactory(suppliers), layout.build());
812-
}
813-
814803
/**
815804
* Immutable physical operation.
816805
*/

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
import org.elasticsearch.xpack.esql.plan.QueryPlan;
3737
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
3838
import org.elasticsearch.xpack.esql.plan.logical.Filter;
39+
import org.elasticsearch.xpack.esql.plan.logical.Project;
3940
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4041
import org.elasticsearch.xpack.esql.plan.physical.EsSourceExec;
4142
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
4243
import org.elasticsearch.xpack.esql.plan.physical.ExchangeExec;
4344
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSinkExec;
4445
import org.elasticsearch.xpack.esql.plan.physical.ExchangeSourceExec;
4546
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
47+
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
4648
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
4749
import org.elasticsearch.xpack.esql.planner.mapper.LocalMapper;
4850
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
@@ -67,6 +69,34 @@
6769

6870
public class PlannerUtils {
6971

72+
/**
73+
* When the plan contains children like {@code MergeExec} resulted from the planning of commands such as FORK,
74+
* we need to break the plan into sub plans and a main coordinator plan.
75+
* The result pages from each sub plan will be funneled to the main coordinator plan.
76+
* To achieve this, we wire each sub plan with a {@code ExchangeSinkExec} and add a {@code ExchangeSourceExec}
77+
* to the main coordinator plan.
78+
* There is an additional split of each sub plan into a data node plan and coordinator plan.
79+
* This split is not done here, but as part of {@code PlannerUtils#breakPlanBetweenCoordinatorAndDataNode}.
80+
*/
81+
public static Tuple<List<PhysicalPlan>, PhysicalPlan> breakPlanIntoSubPlansAndMainPlan(PhysicalPlan plan) {
82+
var subplans = new Holder<List<PhysicalPlan>>();
83+
PhysicalPlan mainPlan = plan.transformUp(MergeExec.class, me -> {
84+
subplans.set(me.children().stream().map(child -> {
85+
// TODO: we are adding a Project plan to force InsertFieldExtraction - we should remove this transformation
86+
child = child.transformUp(FragmentExec.class, f -> {
87+
var logicalFragment = f.fragment();
88+
logicalFragment = new Project(logicalFragment.source(), logicalFragment, logicalFragment.output());
89+
return new FragmentExec(logicalFragment);
90+
});
91+
92+
return (PhysicalPlan) new ExchangeSinkExec(child.source(), child.output(), false, child);
93+
}).toList());
94+
return new ExchangeSourceExec(me.source(), me.output(), false);
95+
});
96+
97+
return new Tuple<>(subplans.get(), mainPlan);
98+
}
99+
70100
public static Tuple<PhysicalPlan, PhysicalPlan> breakPlanBetweenCoordinatorAndDataNode(PhysicalPlan plan, Configuration config) {
71101
var dataNodePlan = new Holder<PhysicalPlan>();
72102

0 commit comments

Comments
 (0)