Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e860a12
Grammar changes
ioanatia Feb 6, 2025
f0a64c8
Generate grammar changes
ioanatia Feb 6, 2025
6398ece
Fork planning
ioanatia Feb 6, 2025
3c66945
Fix field resolution
ioanatia Feb 6, 2025
af7ce2a
Cleanup
ioanatia Feb 6, 2025
d6b627c
Add CSV tests
ioanatia Feb 6, 2025
7c557ec
Update docs/changelog/121948.yaml
ioanatia Feb 6, 2025
b19cb8c
[CI] Auto commit changes from spotless
Feb 6, 2025
3d652a4
fix forbidden apis
ChrisHegarty Feb 8, 2025
f944fde
Merge branch 'main' into fork
ChrisHegarty Feb 8, 2025
4bfaaed
javadoc
ChrisHegarty Feb 8, 2025
ca7e8d0
remove serialization of fork and Merge
ChrisHegarty Feb 8, 2025
41bcae9
fix equality
ChrisHegarty Feb 9, 2025
94a33eb
fix EsqlNodeSubclassTests
ChrisHegarty Feb 9, 2025
033a76a
add statement parser tests
ChrisHegarty Feb 9, 2025
f380d37
remove unnecessary serialization
ChrisHegarty Feb 9, 2025
aa74bfd
automatic fork branch ids start at 1
ChrisHegarty Feb 9, 2025
31caf30
add analyzer test
ChrisHegarty Feb 9, 2025
e6fe497
more tests
ChrisHegarty Feb 10, 2025
862f018
more tests
ChrisHegarty Feb 10, 2025
ed34a58
Merge branch 'main' into fork
ChrisHegarty Feb 10, 2025
b575a9c
minor itr
ChrisHegarty Feb 10, 2025
51203bf
Merge branch 'main' into fork
ChrisHegarty Feb 10, 2025
20c0a51
replace [] with ()
ChrisHegarty Feb 10, 2025
90ebc7b
Merge branch 'main' into fork
ChrisHegarty Feb 10, 2025
f444947
move fork eval to initial logical plan
ChrisHegarty Feb 10, 2025
c064318
Merge branch 'main' into fork
ChrisHegarty Feb 10, 2025
a7a23b5
simplify MergeOperator finished state
ChrisHegarty Feb 11, 2025
38c0577
enable CVS tests
ChrisHegarty Feb 11, 2025
001e00c
rework Fork to use StubRelation and Merge to be Nary
ChrisHegarty Feb 18, 2025
0e0ef78
reverts
ChrisHegarty Feb 18, 2025
057381a
fail hard if not LocalSourceExec
ChrisHegarty Feb 18, 2025
a990ff3
spotless
ChrisHegarty Feb 18, 2025
675ed14
Merge branch 'main' into fork
ChrisHegarty Feb 18, 2025
85dfe2e
Merge branch 'main' into fork
ChrisHegarty Feb 18, 2025
1a1aa80
no fork in fork yet
ChrisHegarty Feb 18, 2025
fb8f2e1
Merge branch 'main' into fork
ChrisHegarty Feb 18, 2025
f04dea5
Merge branch 'main' into fork
ChrisHegarty Feb 19, 2025
9cde990
itr
ChrisHegarty Feb 19, 2025
da136b4
itr
ChrisHegarty Feb 19, 2025
401c50d
itr
ChrisHegarty Feb 19, 2025
9141739
fix EsqlNodeSubclassTests
ChrisHegarty Feb 19, 2025
80ce2e7
Merge branch 'main' into fork
ChrisHegarty Feb 19, 2025
96d00c2
more tests and restrict NESTED_XX to snapshot
ChrisHegarty Feb 19, 2025
cb4d29d
fix method name
ChrisHegarty Feb 19, 2025
0d7e44b
Merge branch 'main' into fork
ChrisHegarty Feb 19, 2025
5a14086
check for fork cap before testing ForkIT
ChrisHegarty Feb 19, 2025
e062fed
Merge branch 'main' into fork
ChrisHegarty Feb 19, 2025
0e8e78d
Merge remote-tracking branch 'elasticsearch/main' into fork
ioanatia Feb 20, 2025
2a1100a
Move fork id alias logic to parser
ioanatia Feb 20, 2025
9d48ba0
Merge branch 'main' into fork
ioanatia Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121948.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 121948
summary: Add initial grammar and changes for FORK
area: ES|QL
type: feature
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;

import java.util.List;
import java.util.ListIterator;
import java.util.function.Supplier;

/**
* A merge operator is effectively a "fan-in" operator - accepts input
* from several sources and provides it in a single output.
*/
public class MergeOperator extends SourceOperator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This operator is somewhat temporary, until we stream the subquery results.


public record MergeOperatorFactory(BlockSuppliers suppliers) implements SourceOperatorFactory {
@Override
public String describe() {
return "MergeOperator[suppliers=" + suppliers + "]";
}

@Override
public SourceOperator get(DriverContext driverContext) {
return new MergeOperator(driverContext.blockFactory(), suppliers);
}
}

private final BlockFactory blockFactory;
private final BlockSuppliers suppliers;
private boolean finished;
private ListIterator<Block[]> subPlanBlocks;

public MergeOperator(BlockFactory blockFactory, BlockSuppliers suppliers) {
super();
this.blockFactory = blockFactory;
this.suppliers = suppliers;
}

public interface BlockSuppliers extends Supplier<List<Block[]>> {};

@Override
public void finish() {}

@Override
public boolean isFinished() {
return finished;
}

@Override
public Page getOutput() {
if (subPlanBlocks == null) {
subPlanBlocks = suppliers.get().listIterator();
}

if (subPlanBlocks.hasNext()) {
return new Page(subPlanBlocks.next());
}
finished = true;
return null;
}

@Override
public void close() {
// release blocks from any subplan not fully consumed.
if (subPlanBlocks != null) {
while (subPlanBlocks.hasNext()) {
Releasables.close(subPlanBlocks.next());
}
}
}

@Override
public String toString() {
return "MergeOperator[subPlanBlocks=" + subPlanBlocks + "]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
//
// CSV spec for FORK command
//

simpleFork
required_capability: fork

FROM employees
| FORK ( WHERE emp_no == 10001 )
( WHERE emp_no == 10002 )
| KEEP emp_no, _fork
| SORT emp_no
;

emp_no:integer | _fork:keyword
10001 | fork1
10002 | fork2
;

forkWithWhereSortAndLimit
required_capability: fork

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
( WHERE hire_date < "1988-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
| KEEP emp_no, first_name, _fork
| SORT emp_no, _fork
;

emp_no:integer | first_name:keyword | _fork:keyword
10002 | Bezalel | fork2
10009 | Sumant | fork1
10014 | Berni | fork2
10048 | Florian | fork1
10058 | Berhard | fork2
10060 | Breannda | fork2
10094 | Arumugam | fork2
;

fiveFork
required_capability: fork

FROM employees
| FORK ( WHERE emp_no == 10005 )
( WHERE emp_no == 10004 )
( WHERE emp_no == 10003 )
( WHERE emp_no == 10002 )
( WHERE emp_no == 10001 )
| KEEP _fork, emp_no
| SORT _fork
;

_fork:keyword | emp_no:integer
fork1 | 10005
fork2 | 10004
fork3 | 10003
fork4 | 10002
fork5 | 10001
;

forkWithWhereSortDescAndLimit
required_capability: fork

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
( WHERE hire_date < "1988-03-01T00:00:00Z" | SORT first_name DESC NULLS LAST | LIMIT 2 )
| KEEP _fork, emp_no, first_name
| SORT _fork, first_name DESC
;

_fork:keyword | emp_no:integer | first_name:keyword
fork1 | 10009 | Sumant
fork1 | 10048 | Florian
fork2 | 10081 | Zhongwei
fork2 | 10087 | Xinglin
;

forkWithCommonPrefilter
required_capability: fork

FROM employees
| WHERE emp_no > 10050
| FORK ( SORT emp_no ASC | LIMIT 2 )
( SORT emp_no DESC NULLS LAST | LIMIT 2 )
| KEEP _fork, emp_no
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer
fork1 | 10051
fork1 | 10052
fork2 | 10099
fork2 | 10100
;
Loading