Skip to content
117 changes: 93 additions & 24 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,25 @@ fork2 | 10087 | 5 | null
fork3 | 10081 | 2 | French
;

forkBranchWithLookupJoin
required_capability: fork_v8

FROM employees
| EVAL language_code = languages
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | LOOKUP JOIN languages_lookup ON language_code)
(WHERE emp_no == 10081 OR emp_no == 10087 | LOOKUP JOIN languages_lookup ON language_code)
(WHERE emp_no == 10081 | EVAL language_name = "Klingon" | LOOKUP JOIN languages_lookup ON language_code)
| KEEP _fork, emp_no, language_code, language_name
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | language_code:integer | language_name:keyword
fork1 | 10048 | 3 | Spanish
fork1 | 10081 | 2 | French
fork2 | 10081 | 2 | French
fork2 | 10087 | 5 | null
fork3 | 10081 | 2 | French
;

forkBeforeStats
required_capability: fork_v8
Expand Down Expand Up @@ -351,17 +370,21 @@ German | null | fork2
Spanish | null | fork2
;

forkBranchWithDrop-Ignore
forkBranchWithDrop
required_capability: fork_v8

FROM languages
| FORK ( DROP language_code | WHERE language_name == "English" | EVAL x = 1 )
| FORK ( EVAL x = 1 | DROP language_code | WHERE language_name == "English" | DROP x )
( WHERE language_name != "English" )
| SORT _fork, language_name
| KEEP language_name, language_code, _fork
;

language_name:keyword | language_code:integer | x:integer | _fork:keyword
a | 1 | 2
language_name:keyword | language_code:integer | _fork:keyword
English | null | fork1
French | 2 | fork2
German | 4 | fork2
Spanish | 3 | fork2
;


Expand All @@ -381,6 +404,22 @@ French | fork1
English | fork2
;

forkBranchWithKeep
required_capability: fork_v8

FROM languages
| FORK ( WHERE language_name == "English" | KEEP language_name, language_code )
( WHERE language_name != "English" )
| SORT _fork, language_name
;

language_name:keyword | language_code:integer | _fork:keyword
English | 1 | fork1
French | 2 | fork2
German | 4 | fork2
Spanish | 3 | fork2
;

forkBeforeRename
required_capability: fork_v8

Expand All @@ -397,19 +436,39 @@ code:integer | language_name:keyword | _fork:keyword
1 | English | fork2
;

forkBranchWithRenameAs-Ignore
forkBranchWithRenameAs
required_capability: fork_v8

FROM languages
| FORK (RENAME language_code AS code | WHERE code == 1 OR code == 2)
(WHERE language_code == 1 | RENAME language_code AS x)
| SORT _fork, language_name
| KEEP code, language_name, x, _fork
;

code:integer | language_name:keyword | x:integer | _fork:keyword
1 | English | null | fork1
2 | French | null | fork1
null | English | 1 | fork2
;

forkBranchWithRenameEquals
required_capability: fork_v8

FROM languages
| FORK (RENAME code = language_code | WHERE code == 1 OR code == 2)
(WHERE language_code == 1 | RENAME x = language_code)
| SORT _fork, language_name
| KEEP code, language_name, x, _fork
;

language_name:keyword | x:integer | code:integer | _fork:keyword
1 | 2 | 3 | 4
code:integer | language_name:keyword | x:integer | _fork:keyword
1 | English | null | fork1
2 | French | null | fork1
null | English | 1 | fork2
;


forkAfterRename
required_capability: fork_v8

Expand Down Expand Up @@ -502,19 +561,24 @@ Japan | Tokyo | null
Japan | Tokyo | null | fork2
;

forkBranchWithEnrich-Ignore
forkBranchWithEnrich
required_capability: fork_v8

FROM addresses
| KEEP city.country.continent.planet.name, city.country.name, city.name
| EVAL city.name = REPLACE(city.name, "San Francisco", "South San Francisco")
| FORK (ENRICH city_names ON city.name WITH city.country.continent.planet.name = airport)
(WHERE city.country.name == "Japan")
(ENRICH city_names ON city.name WITH city.country.continent.planet.name = airport)
| SORT _fork, city.name
;

city.country.name:keyword | city.name:keyword | city.country.continent.planet.name:text | _fork:keyword
Netherlands | Amsterdam | null | a
Netherlands | Amsterdam | null | fork1
United States of America | South San Francisco | San Francisco Int'l | fork1
Japan | Tokyo | null | fork1
Netherlands | Amsterdam | null | fork2
United States of America | South San Francisco | San Francisco Int'l | fork2
Japan | Tokyo | null | fork2
;

forkBeforeEnrich
Expand Down Expand Up @@ -556,7 +620,7 @@ emp_no:integer | job_positions:keyword | _fork:keyword
10087 | Junior Developer | fork2
;

forkBranchWithMvExpand-Ignore
forkBranchWithMvExpand
required_capability: fork_v8

FROM employees
Expand All @@ -566,8 +630,12 @@ FROM employees
| SORT _fork, emp_no, job_positions
;

emp_no:integer | job_positions:keyword | _fork:keyword
1 | 2 | 3
emp_no:integer | job_positions:keyword | _fork:keyword
10048 | Internship | fork1
10081 | Accountant | fork1
10081 | Internship | fork1
10081 | [Accountant, Internship] | fork2
10087 | [Internship, Junior Developer] | fork2
;

forkAfterMvExpand
Expand Down Expand Up @@ -673,17 +741,18 @@ required_capability: fork_v8
FROM employees
| KEEP emp_no, salary
| FORK (EVAL salary=CASE(emp_no==10042, 1000000, salary)
| CHANGE_POINT salary ON emp_no
| STATS COUNT() by type)
(WHERE emp_no == 10081 OR emp_no == 10087)
| SORT _fork, type, emp_no
;

COUNT():long | type:keyword | _fork:keyword | emp_no:integer | salary:long
1 | spike | fork1 | null | null
99 | null | fork1 | null | null
null | null | fork2 | 10081 | 50128
null | null | fork2 | 10087 | 32272
| CHANGE_POINT salary ON emp_no)
(EVAL salary=CASE(emp_no==10087, 1000000, salary)
| CHANGE_POINT salary ON emp_no)
| STATS COUNT() by type, _fork
| SORT _fork, type
;

COUNT():long | type:keyword | _fork:keyword
1 | spike | fork1
99 | null | fork1
1 | spike | fork2
99 | null | fork2
;

forkAfterChangePoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,53 @@ public void testWithUnionTypesInBranches() {
}
}

public void testWithDrop() {
var query = """
FROM test
| WHERE id > 2
| FORK
( WHERE content:"fox" | DROP content)
( WHERE content:"dog" | DROP content)
| KEEP id, _fork
| SORT id, _fork
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("id", "_fork"));
assertColumnTypes(resp.columns(), List.of("integer", "keyword"));
Iterable<Iterable<Object>> expectedValues = List.of(
List.of(3, "fork2"),
List.of(4, "fork2"),
List.of(6, "fork1"),
List.of(6, "fork2")
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithKeep() {
var query = """
FROM test
| WHERE id > 2
| FORK
( WHERE content:"fox" | KEEP id)
( WHERE content:"dog" | KEEP id)
| SORT id, _fork
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("id", "_fork"));
assertColumnTypes(resp.columns(), List.of("integer", "keyword"));
Iterable<Iterable<Object>> expectedValues = List.of(
List.of(3, "fork2"),
List.of(4, "fork2"),
List.of(6, "fork1"),
List.of(6, "fork2")
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithEvalWithConflictingTypes() {
var query = """
FROM test
Expand Down Expand Up @@ -853,6 +900,16 @@ public void testOneSubQuery() {
assertTrue(e.getMessage().contains("Fork requires at least two branches"));
}

public void testForkWithinFork() {
var query = """
FROM test
| FORK ( FORK (WHERE true) (WHERE true) )
( FORK (WHERE true) (WHERE true) )
""";
var e = expectThrows(VerificationException.class, () -> run(query));
assertTrue(e.getMessage().contains("Only a single FORK command is allowed, but found multiple"));
}

public void testProfile() {
var query = """
FROM test
Expand Down
10 changes: 1 addition & 9 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,7 @@ forkSubQueryCommand
;

forkSubQueryProcessingCommand
: evalCommand
| whereCommand
| limitCommand
| statsCommand
| sortCommand
| dissectCommand
| changePointCommand
| completionCommand
| grokCommand
: processingCommand
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we added fork initially, it was a goal to get to this point - well done!

;

rrfCommand
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/ChangePoint.g4
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ CHANGE_POINT : 'change_point' -> pushMode(CHANGE_POINT_MODE);

mode CHANGE_POINT_MODE;
CHANGE_POINT_PIPE : PIPE -> type(PIPE), popMode;
CHANGE_POINT_RP : RP -> type(RP), popMode, popMode;
CHANGE_POINT_ON : ON -> type(ON);
CHANGE_POINT_AS : AS -> type(AS);
CHANGE_POINT_DOT: DOT -> type(DOT);
Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/Enrich.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ ENRICH : 'enrich' -> pushMode(ENRICH_MODE);

mode ENRICH_MODE;
ENRICH_PIPE : PIPE -> type(PIPE), popMode;
ENRICH_RP : RP -> type(RP), popMode, popMode;
ENRICH_OPENING_BRACKET : OPENING_BRACKET -> type(OPENING_BRACKET), pushMode(SETTING_MODE);

ENRICH_ON : ON -> type(ON), pushMode(ENRICH_FIELD_MODE);
Expand Down Expand Up @@ -49,6 +50,7 @@ ENRICH_WS
// submode for Enrich to allow different lexing between policy source (loose) and field identifiers
mode ENRICH_FIELD_MODE;
ENRICH_FIELD_PIPE : PIPE -> type(PIPE), popMode, popMode;
ENRICH_FIELD_RP : RP -> type(RP), popMode, popMode, popMode;
ENRICH_FIELD_ASSIGN : ASSIGN -> type(ASSIGN);
ENRICH_FIELD_COMMA : COMMA -> type(COMMA);
ENRICH_FIELD_DOT: DOT -> type(DOT);
Expand Down Expand Up @@ -82,6 +84,7 @@ ENRICH_FIELD_WS

mode SETTING_MODE;
SETTING_CLOSING_BRACKET : CLOSING_BRACKET -> type(CLOSING_BRACKET), popMode;
SETTING_RP : RP -> type(RP), popMode, popMode, popMode;

SETTING_COLON : COLON -> type(COLON);

Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/Fork.g4
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DEV_FORK : {this.isDevVersion()}? 'fork' -> pushMode(FORK_MODE);

mode FORK_MODE;
FORK_LP : LP -> type(LP), pushMode(DEFAULT_MODE);
FORK_RO : RP -> type(RP), popMode, popMode;
FORK_PIPE : PIPE -> type(PIPE), popMode;

FORK_WS : WS -> channel(HIDDEN);
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/MvExpand.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MV_EXPAND : 'mv_expand' -> pushMode(MVEXPAND_MODE);

mode MVEXPAND_MODE;
MVEXPAND_PIPE : PIPE -> type(PIPE), popMode;
MVEXPAND_RP : RP -> type(RP), popMode, popMode;
MVEXPAND_DOT: DOT -> type(DOT);
MVEXPAND_PARAM : PARAM -> type(PARAM);
MVEXPAND_NAMED_OR_POSITIONAL_PARAM : NAMED_OR_POSITIONAL_PARAM -> type(NAMED_OR_POSITIONAL_PARAM);
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/Project.g4
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ DEV_INSIST : {this.isDevVersion()}? 'insist_🐔' -> pushMode(PROJECT_

mode PROJECT_MODE;
PROJECT_PIPE : PIPE -> type(PIPE), popMode;
PROJECT_RP : RP -> type(RP), popMode, popMode;
PROJECT_DOT: DOT -> type(DOT);
PROJECT_COMMA : COMMA -> type(COMMA);
PROJECT_PARAM : PARAM -> type(PARAM);
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/esql/src/main/antlr/lexer/Rename.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RENAME : 'rename' -> pushMode(RENAME_MODE);

mode RENAME_MODE;
RENAME_PIPE : PIPE -> type(PIPE), popMode;
RENAME_RP : RP -> type(RP), popMode, popMode;
RENAME_ASSIGN : ASSIGN -> type(ASSIGN);
RENAME_COMMA : COMMA -> type(COMMA);
RENAME_DOT: DOT -> type(DOT);
Expand Down
Loading