Skip to content
138 changes: 138 additions & 0 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,141 @@ fork1 | 10052
fork2 | 10099
fork2 | 10100
;

forkWithEvals
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL x = "def" | EVAL z = 2)
| KEEP _fork, emp_no, x, y, z
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:integer | z:integer
fork1 | 10048 | abc | 1 | null
fork1 | 10081 | abc | 1 | null
fork2 | 10081 | null | null | 2
fork2 | 10087 | null | null | 2
;

forkWithKeep-Ignore
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | KEEP emp_no, first_name)
(WHERE emp_no == 10081 OR emp_no == 10087 | KEEP emp_no)
| SORT _fork, emp_no, first_name
;

_fork:keyword | emp_no:integer | first_name:keyword
1 | 2 | 3
;


forkWithStats
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
(WHERE emp_no == 10081 OR emp_no == 10087)
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no))
(STATS x = COUNT(*), y = MIN(emp_no))
| KEEP _fork, emp_no, x, y, z
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
fork1 | 10048 | null | null | null
fork1 | 10081 | null | null | null
fork2 | 10081 | null | null | null
fork2 | 10087 | null | null | null
fork3 | null | 100 | 10100 | 10001
fork4 | null | null | null | null
;

forkWithDrop-Ignore
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | DROP last_name )
(WHERE emp_no == 10081 OR emp_no == 10087 | DROP first_name )
| KEEP _fork, emp_no, first_name, last_name
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | first_name:keyword | last_name:keyword
1 | 2 | 3 | 4
;

forkWithRename-Ignore
required_capability: fork

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | RENAME first_name AS x, last_name AS y)
(WHERE emp_no == 10081 OR emp_no == 10087 | RENAME last_name AS x)
| KEEP _fork, emp_no, first_name, last_name
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | first_name:keyword | last_name:keyword
1 | 2 | 3 | 4
;

forkWithGrok-Ignore
required_capability: fork

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK (EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| GROK a "%{WORD:x} %{INTEGER:y} %{WORD:z}")
(EVAL b = CONCAT(last_name, " ", emp_no::keyword, " ", first_name)
| GROK b "%{WORD:x} %{INTEGER:y} %{WORD:w}")
| KEEP _fork, emp_no, x, y, z, w
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:integer | z:keyword | w:keyword
1 | 2 | 3 | 4 | 5 | 6
;

forkWithDissect
required_capability: fork

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK (EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}" )
(EVAL b = CONCAT(last_name, " ", emp_no::keyword, " ", first_name)
| DISSECT b "%{x} %{y} %{w}" )
| KEEP _fork, emp_no, x, y, z, w
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:keyword | z:keyword | w:keyword
fork1 | 10048 | Florian | 10048 | Syrotiuk | null
fork1 | 10081 | Zhongwei | 10081 | Rosen | null
fork2 | 10048 | null | null | null | Florian
fork2 | 10081 | null | null | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name)
| DISSECT a "%{x} %{y} %{z}"
| EVAL y = y::integer )
( STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no) )
( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name )
( EVAL x = "abc" | EVAL y = 1 )
| KEEP _fork, emp_no, x, y, z, a
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:keyword | y:integer | z:keyword | a:keyword
fork1 | 10048 | Florian | 10048 | Syrotiuk | Florian 10048 Syrotiuk
fork1 | 10081 | Zhongwei | 10081 | Rosen | Zhongwei 10081 Rosen
fork2 | null | null | null | null | null
fork3 | 10048 | null | null | null | null
fork3 | 10081 | null | null | null | null
fork4 | 10048 | null | null | null | null
fork4 | 10081 | null | null | null | null
;
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.junit.Before;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
Expand Down Expand Up @@ -501,6 +502,72 @@ public void testSubqueryWithoutLimitOnly() { // this should
}
}

public void testWithEvalSimple() {
    // Both FORK branches add the same column via EVAL, so the merged
    // output is just that column plus the _fork discriminator.
    var esql = """
        FROM test
        | FORK ( EVAL a = 1 )
        ( EVAL a = 2 )
        | KEEP a, _fork
        """;

    try (var resp = run(esql)) {
        assertColumnNames(resp.columns(), List.of("a", "_fork"));
    }
}

public void testWithEvalDifferentOutputs() {
    // Branches produce different columns; FORK unions the branch outputs,
    // so the result schema contains both a and b alongside _fork.
    var esql = """
        FROM test
        | FORK ( EVAL a = 1 )
        ( EVAL b = 2 )
        | KEEP a, b, _fork
        """;

    try (var response = run(esql)) {
        assertColumnNames(response.columns(), List.of("a", "b", "_fork"));
    }
}

public void testWithStatsSimple() {
    // One branch aggregates (columns x, y), the other filters raw rows (id);
    // each branch's rows carry nulls for the columns it did not produce.
    var query = """
        FROM test
        | FORK (STATS x=COUNT(*), y=VALUES(id))
        (WHERE id == 2)
        | KEEP _fork, x, y, id
        """;
    try (var resp = run(query)) {
        assertColumnNames(resp.columns(), List.of("_fork", "x", "y", "id"));
        // Arrays.asList (not List.of) because the expected rows contain nulls,
        // which List.of rejects; it replaces the roundabout
        // Arrays.stream(new Object[]{...}).toList() construction.
        Iterable<Iterable<Object>> expectedValues = List.of(
            Arrays.asList("fork1", 6L, List.of(1, 2, 3, 4, 5, 6), null),
            Arrays.asList("fork2", null, null, 2)
        );
        assertValues(resp.values(), expectedValues);
    }
}

public void testWithMultipleCommandsAndBranches() {
    // NOTE(review): this test is currently a no-op — the query is built but
    // never executed because, at the time of writing, MV_EXPAND inside a
    // FORK branch tripped a parser error (see the commented-out run below).
    // TODO: re-enable once the FORK grammar accepts MV_EXPAND sub-commands;
    // the grammar appears to have gained mvExpandCommand — confirm and run.
    var query = """
        FROM test METADATA _score
        | FORK (STATS x=COUNT(*), y=VALUES(id) | MV_EXPAND y )
        (WHERE content:"fox" | EVAL new_score = _score + 20 | SORT new_score DESC)
        (WHERE content:"dog" | EVAL new_score = _score + 1 | SORT new_score DESC | LIMIT 2 )
        | KEEP _fork, new_score, _score, x, y
        """;
    // fails with a syntax error, does not like MV_EXPAND
    // run(query);
}

public void testWithEvalWithConflictingTypes() {
    // Column a resolves to an integer in one branch and a keyword in the
    // other; FORK must reject the query at verification time.
    var conflictingQuery = """
        FROM test
        | FORK ( EVAL a = 1 )
        ( EVAL a = "aaaa" )
        | KEEP a, _fork
        """;

    VerificationException failure = expectThrows(VerificationException.class, () -> run(conflictingQuery));
    assertTrue(failure.getMessage().contains("Column [a] has conflicting data types"));
}

public void testSubqueryWithUnknownField() {
var query = """
FROM test
Expand Down Expand Up @@ -565,6 +632,19 @@ public void testSubqueryWithUnknownFieldInSort() {
assertTrue(e.getMessage().contains("Unknown column [bar]"));
}

public void testSubqueryWithUnknownFieldInEval() {
    // An unknown column referenced inside a FORK branch's EVAL must surface
    // the standard "Unknown column" verification error.
    var esql = """
        FROM test
        | FORK
        ( EVAL x = baz + 1)
        ( WHERE content:"cat" )
        | KEEP _fork, id, content
        | SORT _fork, id
        """;
    var failure = expectThrows(VerificationException.class, () -> run(esql));
    assertTrue(failure.getMessage().contains("Unknown column [baz]"));
}

public void testOneSubQuery() {
var query = """
FROM test
Expand Down
12 changes: 10 additions & 2 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,17 @@ forkSubQueryCommand
;

forkSubQueryProcessingCommand
: whereCommand
| sortCommand
: evalCommand
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use processingCommand and exclude the commands that are not supported by Fork.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually having a bit of trouble with the grammar.
Even if I use processingCommand here, there are some combinations that fail with a parsing exception:

this works:

ROW a=[1,2,3]
| FORK (EVAL a = [2,3 ] | MV_EXPAND a | WHERE a == 2)
       (MV_EXPAND a | WHERE a == 2 )

this fails with a parsing exception:

FROM search-movies
| FORK (STATS x = COUNT(*), y = VALUES(title) | MV_EXPAND y)
       (WHERE title:"Journey")

error:

{
  "error": {
    "root_cause": [
      {
        "type": "parsing_exception",
        "reason": "line 3:66: token recognition error at: ')'"
      }
    ],
    "type": "parsing_exception",
    "reason": "line 3:66: token recognition error at: ')'",
    "caused_by": {
      "type": "lexer_no_viable_alt_exception",
      "reason": null
    }
  },
  "status": 400
}

I am able to use WHERE/LIMIT/SORT/DISSECT/EVAL/STATS without issues.
But using commands like MV_EXPAND/KEEP/RENAME/DROP/GROK in FORK branches fails with a parsing errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that ultimately we should be able to effectively remove this list and replace it with processingCommand (that was my original intention when I added it), but this PR is a good step forward in that direction. Let's decouple this, as it will need even more extensive and new testing which is better in a subsequent PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am able to use WHERE/LIMIT/SORT/DISSECT/EVAL/STATS without issues. But using commands like MV_EXPAND/KEEP/RENAME/DROP/GROK in FORK branches fails with a parsing errors.

GROK does not throw ParsingException for me if it is added under forkSubQueryProcessingCommand. I wonder if it is related to the mode of the commands, the commands that can be recognized under FORK have EXPRESSION_MODE.

+ curl -u elastic:password -v -X POST 'localhost:9200/_query?format=txt&pretty' -H 'Content-Type: application/json' '-d
{
  "query": "from sample_data | fork (grok message \"%{WORD:x} %{WORD:y}\") (dissect message \"%{x} %{y}\") | keep message, x, y, _fork"
}

       message       |       x       |       y       |     _fork     
---------------------+---------------+---------------+---------------
Connected to 10.1.0.3|Connected      |to             |fork1          
Connected to 10.1.0.2|Connected      |to             |fork1          
Disconnected         |null           |null           |fork1          
Connection error     |Connection     |error          |fork1          
Connection error     |Connection     |error          |fork1          
Connection error     |Connection     |error          |fork1          
Connected to 10.1.0.1|Connected      |to             |fork1          
Connected to 10.1.0.3|Connected      |to 10.1.0.3    |fork2          
Connected to 10.1.0.2|Connected      |to 10.1.0.2    |fork2          
Disconnected         |null           |null           |fork2          
Connection error     |Connection     |error          |fork2          
Connection error     |Connection     |error          |fork2          
Connection error     |Connection     |error          |fork2          
Connected to 10.1.0.1|Connected      |to 10.1.0.1    |fork2   

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it is related to the mode of the commands

you are probably right - I'd like to follow up on the grammar issue separately if that's okay.
if I recall correctly for GROK I was hitting a parsing issue when the FORK subbranch contained multiple commands and not just GROK.

| whereCommand
| keepCommand
| limitCommand
| statsCommand
| sortCommand
| dropCommand
| renameCommand
| dissectCommand
| grokCommand
| mvExpandCommand
;

rrfCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.EmptyAttribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
Expand Down Expand Up @@ -695,10 +696,53 @@ private Join resolveLookupJoin(LookupJoin join) {
private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
List<LogicalPlan> subPlans = fork.subPlans();

List<LogicalPlan> newSubPlans = new ArrayList<>();
List<LogicalPlan> resolvedSubPlans = new ArrayList<>();
boolean unresolved = false;

// first resolve the subplans
for (var logicalPlan : subPlans) {
newSubPlans.add(logicalPlan.transformUp(LogicalPlan.class, p -> p.childrenResolved() == false ? p : rule(p, context)));
Source source = logicalPlan.source();
LogicalPlan newSubPlan = logicalPlan.transformUp(
LogicalPlan.class,
p -> p.childrenResolved() == false ? p : rule(p, context)
);
if (newSubPlan.resolved() == false) {
unresolved = true;
}
resolvedSubPlans.add(newSubPlan);
}

fork = new Fork(fork.source(), fork.child(), resolvedSubPlans);

// if any of the sub plans is still unresolved we should just return before
// we attempt to align the outputs of the sub plans
if (unresolved) {
return fork;
}

// we align the outputs of the sub plans such that they have the same columns
boolean changed = false;
List<LogicalPlan> newSubPlans = new ArrayList<>();
for (var logicalPlan : resolvedSubPlans) {
Source source = logicalPlan.source();

AttributeSet missing = fork.outputSet().subtract(logicalPlan.outputSet());
List<Alias> aliases = missing.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use missing.forEach() instead of heavier streams, but optional / preference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this as it is since I felt using map is more natural here - happy to change it if you have a strong preference to use forEach.

.map(attr -> new Alias(source, attr.name(), new Literal(source, null, attr.dataType())))
.collect(Collectors.toList());
;

if (aliases.size() > 0) {
logicalPlan = new Eval(source, logicalPlan, aliases);
changed = true;
}

newSubPlans.add(logicalPlan);
}
if (changed == false) {
return fork;
}

return new Fork(fork.source(), fork.child(), newSubPlans);
}

Expand Down

Large diffs are not rendered by default.

Loading