Skip to content

Commit a26b596

Browse files
ioanatiaelasticsearchmachineChrisHegarty
authored
ES|QL: Initial grammar and changes for FORK (snapshot) (#121948)
* Grammar changes * Generate grammar changes * Fork planning * Fix field resolution * Cleanup * Add CSV tests * Update docs/changelog/121948.yaml * [CI] Auto commit changes from spotless * fix forbidden apis * javadoc * remove serialization of fork and Merge * fix equality * fix EsqlNodeSubclassTests * add statement parser tests * remove unnecessary serialization * automatic fork branch ids start at 1 * add analyzer test * more tests * more tests * minor itr * replace [] with () * move fork eval to initial logical plan * simplify MergeOperator finished state * enable CVS tests * rework Fork to use StubRelation and Merge to be Nary * reverts * fail hard if not LocalSourceExec * spotless * no fork in fork yet * itr * itr * itr * fix EsqlNodeSubclassTests * more tests and restrict NESTED_XX to snapshot * fix method name * check for fork cap before testing ForkIT * Move fork id alias logic to parser --------- Co-authored-by: elasticsearchmachine <[email protected]> Co-authored-by: ChrisHegarty <[email protected]> Co-authored-by: Chris Hegarty <[email protected]>
1 parent cdaa5dd commit a26b596

File tree

32 files changed

+4690
-2357
lines changed

32 files changed

+4690
-2357
lines changed

docs/changelog/121948.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121948
2+
summary: Add initial grammar and changes for FORK
3+
area: ES|QL
4+
type: feature
5+
issues: []
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator;
9+
10+
import org.elasticsearch.compute.data.Block;
11+
import org.elasticsearch.compute.data.BlockFactory;
12+
import org.elasticsearch.compute.data.Page;
13+
import org.elasticsearch.core.Releasables;
14+
15+
import java.util.List;
16+
import java.util.ListIterator;
17+
import java.util.function.Supplier;
18+
19+
/**
20+
* A merge operator is effectively a "fan-in" operator - accepts input
21+
* from several sources and provides it in a single output.
22+
*/
23+
public class MergeOperator extends SourceOperator {
24+
25+
public record MergeOperatorFactory(BlockSuppliers suppliers) implements SourceOperatorFactory {
26+
@Override
27+
public String describe() {
28+
return "MergeOperator[suppliers=" + suppliers + "]";
29+
}
30+
31+
@Override
32+
public SourceOperator get(DriverContext driverContext) {
33+
return new MergeOperator(driverContext.blockFactory(), suppliers);
34+
}
35+
}
36+
37+
private final BlockFactory blockFactory;
38+
private final BlockSuppliers suppliers;
39+
private boolean finished;
40+
private ListIterator<Block[]> subPlanBlocks;
41+
42+
public MergeOperator(BlockFactory blockFactory, BlockSuppliers suppliers) {
43+
super();
44+
this.blockFactory = blockFactory;
45+
this.suppliers = suppliers;
46+
}
47+
48+
public interface BlockSuppliers extends Supplier<List<Block[]>> {};
49+
50+
@Override
51+
public void finish() {}
52+
53+
@Override
54+
public boolean isFinished() {
55+
return finished;
56+
}
57+
58+
@Override
59+
public Page getOutput() {
60+
if (subPlanBlocks == null) {
61+
subPlanBlocks = suppliers.get().listIterator();
62+
}
63+
64+
if (subPlanBlocks.hasNext()) {
65+
return new Page(subPlanBlocks.next());
66+
}
67+
finished = true;
68+
return null;
69+
}
70+
71+
@Override
72+
public void close() {
73+
// release blocks from any subplan not fully consumed.
74+
if (subPlanBlocks != null) {
75+
while (subPlanBlocks.hasNext()) {
76+
Releasables.close(subPlanBlocks.next());
77+
}
78+
}
79+
}
80+
81+
@Override
82+
public String toString() {
83+
return "MergeOperator[subPlanBlocks=" + subPlanBlocks + "]";
84+
}
85+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
//
2+
// CSV spec for FORK command
3+
//
4+
5+
simpleFork
6+
required_capability: fork
7+
8+
FROM employees
9+
| FORK ( WHERE emp_no == 10001 )
10+
( WHERE emp_no == 10002 )
11+
| KEEP emp_no, _fork
12+
| SORT emp_no
13+
;
14+
15+
emp_no:integer | _fork:keyword
16+
10001 | fork1
17+
10002 | fork2
18+
;
19+
20+
forkWithWhereSortAndLimit
21+
required_capability: fork
22+
23+
FROM employees
24+
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
25+
( WHERE hire_date < "1988-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
26+
| KEEP emp_no, first_name, _fork
27+
| SORT emp_no, _fork
28+
;
29+
30+
emp_no:integer | first_name:keyword | _fork:keyword
31+
10002 | Bezalel | fork2
32+
10009 | Sumant | fork1
33+
10014 | Berni | fork2
34+
10048 | Florian | fork1
35+
10058 | Berhard | fork2
36+
10060 | Breannda | fork2
37+
10094 | Arumugam | fork2
38+
;
39+
40+
fiveFork
41+
required_capability: fork
42+
43+
FROM employees
44+
| FORK ( WHERE emp_no == 10005 )
45+
( WHERE emp_no == 10004 )
46+
( WHERE emp_no == 10003 )
47+
( WHERE emp_no == 10002 )
48+
( WHERE emp_no == 10001 )
49+
| KEEP _fork, emp_no
50+
| SORT _fork
51+
;
52+
53+
_fork:keyword | emp_no:integer
54+
fork1 | 10005
55+
fork2 | 10004
56+
fork3 | 10003
57+
fork4 | 10002
58+
fork5 | 10001
59+
;
60+
61+
forkWithWhereSortDescAndLimit
62+
required_capability: fork
63+
64+
FROM employees
65+
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
66+
( WHERE hire_date < "1988-03-01T00:00:00Z" | SORT first_name DESC NULLS LAST | LIMIT 2 )
67+
| KEEP _fork, emp_no, first_name
68+
| SORT _fork, first_name DESC
69+
;
70+
71+
_fork:keyword | emp_no:integer | first_name:keyword
72+
fork1 | 10009 | Sumant
73+
fork1 | 10048 | Florian
74+
fork2 | 10081 | Zhongwei
75+
fork2 | 10087 | Xinglin
76+
;
77+
78+
forkWithCommonPrefilter
79+
required_capability: fork
80+
81+
FROM employees
82+
| WHERE emp_no > 10050
83+
| FORK ( SORT emp_no ASC | LIMIT 2 )
84+
( SORT emp_no DESC NULLS LAST | LIMIT 2 )
85+
| KEEP _fork, emp_no
86+
| SORT _fork, emp_no
87+
;
88+
89+
_fork:keyword | emp_no:integer
90+
fork1 | 10051
91+
fork1 | 10052
92+
fork2 | 10099
93+
fork2 | 10100
94+
;

0 commit comments

Comments
 (0)