Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public CommandDescription generate(
}
}

int n = randomIntBetween(2, 10);
int n = randomIntBetween(2, 8);

String cmd = " | FORK " + "( WHERE true ) ".repeat(n) + " | WHERE _fork == \"fork" + randomIntBetween(1, n) + "\" | DROP _fork";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ public void testOneSubQuery() {
( WHERE content:"fox" )
""";
var e = expectThrows(ParsingException.class, () -> run(query));
assertTrue(e.getMessage().contains("Fork requires at least two branches"));
assertTrue(e.getMessage().contains("Fork requires at least 2 branches"));
}

public void testForkWithinFork() {
Expand Down Expand Up @@ -1047,6 +1047,17 @@ public void testProfile() {
}
}

public void testWithTooManySubqueries() {
var query = """
FROM test
| FORK (WHERE true) (WHERE true) (WHERE true) (WHERE true) (WHERE true)
(WHERE true) (WHERE true) (WHERE true) (WHERE true)
""";
var e = expectThrows(ParsingException.class, () -> run(query));
assertTrue(e.getMessage().contains("Fork requires less than 8 branches"));

}

private void createAndPopulateIndices() {
var indexName = "test";
var client = client().admin().indices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,9 +651,13 @@ private void checkForRemoteClusters(LogicalPlan plan, Source source, String comm
@SuppressWarnings("unchecked")
public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
List<PlanFactory> subQueries = visitForkSubQueries(ctx.forkSubQueries());
if (subQueries.size() < 2) {
throw new ParsingException(source(ctx), "Fork requires at least two branches");
if (subQueries.size() < Fork.MIN_BRANCHES) {
throw new ParsingException(source(ctx), "Fork requires at least " + Fork.MIN_BRANCHES + " branches");
}
if (subQueries.size() > Fork.MAX_BRANCHES) {
throw new ParsingException(source(ctx), "Fork requires less than " + Fork.MAX_BRANCHES + " branches");
}

return input -> {
checkForRemoteClusters(input, source(ctx), "FORK");
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,19 @@
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware, TelemetryAware {

public static final String FORK_FIELD = "_fork";
public static final int MAX_BRANCHES = 8;
public static final int MIN_BRANCHES = 2;
private final List<Attribute> output;

public Fork(Source source, List<LogicalPlan> children, List<Attribute> output) {
super(source, children);
if (children.size() < 2) {
throw new IllegalArgumentException("requires more than two subqueries, got:" + children.size());
if (children.size() < MIN_BRANCHES) {
throw new IllegalArgumentException("FORK requires more than " + MIN_BRANCHES + " branches, got: " + children.size());
}
if (children.size() > MAX_BRANCHES) {
throw new IllegalArgumentException("FORK requires less than " + MAX_BRANCHES + " subqueries, got: " + children.size());
}

this.output = output;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3354,7 +3354,15 @@ public void testForkAllReleasedCommands() {
( EVAL xyz = ( (a/b) * (b/a)) )
( WHERE a < 1 )
( KEEP a )
( DROP b )
| KEEP a
""";

var plan = statement(query);
assertThat(plan, instanceOf(Keep.class));

query = """
FROM foo*
| FORK
( RENAME a as c )
( MV_EXPAND a )
( CHANGE_POINT a on b )
Expand All @@ -3365,7 +3373,7 @@ public void testForkAllReleasedCommands() {
| KEEP a
""";

var plan = statement(query);
plan = statement(query);
assertThat(plan, instanceOf(Keep.class));
}

Expand All @@ -3383,7 +3391,15 @@ public void testForkAllCommands() {
( EVAL xyz = ( (a/b) * (b/a)) )
( WHERE a < 1 )
( KEEP a )
( DROP b )
| KEEP a
""";
var plan = statement(query);
assertThat(plan, instanceOf(Keep.class));

query = """
FROM foo*
| FORK
( RENAME a as c )
( MV_EXPAND a )
( CHANGE_POINT a on b )
Expand All @@ -3392,22 +3408,36 @@ public void testForkAllCommands() {
( FORK ( WHERE a:"baz" ) ( EVAL x = [ 1, 2, 3 ] ) )
( COMPLETION a = b WITH c )
( SAMPLE 0.99 )
| KEEP a
""";
plan = statement(query);
assertThat(plan, instanceOf(Keep.class));

query = """
FROM foo*
| FORK
( INLINESTATS x = MIN(a), y = MAX(b) WHERE d > 1000 )
( INSIST_🐔 a )
( LOOKUP_🐔 a on b )
| KEEP a
""";

var plan = statement(query);
plan = statement(query);
assertThat(plan, instanceOf(Keep.class));
}

public void testInvalidFork() {
expectError("FROM foo* | FORK (WHERE a:\"baz\")", "line 1:13: Fork requires at least two branches");
expectError("FROM foo* | FORK (LIMIT 10)", "line 1:13: Fork requires at least two branches");
expectError("FROM foo* | FORK (SORT a)", "line 1:13: Fork requires at least two branches");
expectError("FROM foo* | FORK (WHERE x>1 | LIMIT 5)", "line 1:13: Fork requires at least two branches");
expectError("FROM foo* | WHERE x>1 | FORK (WHERE a:\"baz\")", "Fork requires at least two branches");
expectError("FROM foo* | FORK (WHERE a:\"baz\")", "line 1:13: Fork requires at least 2 branches");
expectError("FROM foo* | FORK (LIMIT 10)", "line 1:13: Fork requires at least 2 branches");
expectError("FROM foo* | FORK (SORT a)", "line 1:13: Fork requires at least 2 branches");
expectError("FROM foo* | FORK (WHERE x>1 | LIMIT 5)", "line 1:13: Fork requires at least 2 branches");
expectError("FROM foo* | WHERE x>1 | FORK (WHERE a:\"baz\")", "Fork requires at least 2 branches");

expectError("""
FROM foo*
| FORK (where true) (where true) (where true) (where true)
(where true) (where true) (where true) (where true)
(where true)
""", "Fork requires less than 8 branches");

expectError("FROM foo* | FORK ( x+1 ) ( WHERE y>2 )", "line 1:20: mismatched input 'x+1'");
expectError("FROM foo* | FORK ( LIMIT 10 ) ( y+2 )", "line 1:33: mismatched input 'y+2'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ private Object makeMap(Class<? extends Node<?>> toBuildClass, ParameterizedType

private int randomSizeForCollection(Class<? extends Node<?>> toBuildClass) {
int minCollectionLength = 0;
int maxCollectionLength = 10;
int maxCollectionLength = 8;
Copy link
Contributor Author

@ioanatia ioanatia Jun 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting this to 8 for now which is the max number of FORK branches - I will follow up and improve this check so it's dependent on toBuildClass


if (hasAtLeastTwoChildren(toBuildClass)) {
minCollectionLength = 2;
Expand Down