Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,29 @@ static PreAnalysisResult fieldNames(LogicalPlan parsed, Set<String> enrichPolicy
}
projectAll.set(true);
});

if (projectAll.get()) {
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}

Holder<Boolean> projectAfterFork = new Holder<>(false);
Holder<Boolean> hasFork = new Holder<>(false);

parsed.forEachDown(plan -> {
if (hasFork.get() == false && (plan instanceof Project || plan instanceof Aggregate)) {
projectAfterFork.set(true);
}

if (plan instanceof Fork fork && projectAfterFork.get() == false) {
hasFork.set(true);
fork.children().forEach(child -> {
if (child.anyMatch(p -> p instanceof Project || p instanceof Aggregate) == false) {
projectAll.set(true);
}
});
}
});

if (projectAll.get()) {
return result.withFieldNames(IndexResolver.ALL_FIELDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1796,7 +1796,7 @@ public void testDropWildcardFields_WithLookupJoin() {
);
}

public void testForkFieldsWithKeep() {
public void testForkFieldsWithKeepAfterFork() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
Expand All @@ -1810,6 +1810,20 @@ public void testForkFieldsWithKeep() {
""", Set.of("a", "a.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
}

public void testForkFieldsWithKeepBeforeFork() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| KEEP a, b, c, d, x
| WHERE a > 2000
| EVAL b = a + 100
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(WHERE d > 1000 AND e == "aaa" | EVAL c = a + 200)
| WHERE x > y
""", Set.of("a", "a.*", "b", "b.*", "c", "c.*", "d", "d.*", "e", "e.*", "x", "x.*", "y", "y.*"));
}

public void testForkFieldsWithNoProjection() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

Expand All @@ -1833,23 +1847,62 @@ public void testForkFieldsWithStatsInOneBranch() {
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(STATS x = count(*), y=min(z))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm - now that I am looking at this more - I think we should return all fields, because one branch is not bounded by any command like KEEP or STATS that resets the output to a known list of fields. will fix 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is one more query where I am confused, in part because of probably not knowing what is the expectation for fork.

FROM employees | FORK ( WHERE true | stats min(salary) by gender) ( WHERE true | LIMIT 3 )

This shows these columns:

min(salary) | gender | _fork | salary

Should these be a "union" kind of set of columns and fieldNames is only limiting it to what it "sees" in the fork's first branch? If that's true, this means that fieldNames should consider a union kind of field names from all the branches of fork. As a shortcut, the first branch that it finds that's the "widest" it should stop checking the rest.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The FORK output should be a union of the outputs of the fork branches.

If we detect that the same field exists in multiple fork branches outputs, but the field has different type, we fail with a validation error.

FORK will pad with null columns the output of FORK branches that are missing fields.
For example:

ROW a = [1, 2, 3], b = "foo"
| MV_EXPAND a
| FORK (WHERE true | LIMIT 2)
        (STATS x = count(*))
        (WHERE a % 2 == 0 | EVAL y = 2)
| SORT _fork, a
| KEEP _fork, a, b, x, y

Will produce:

     _fork     |       a       |       b       |       x       |       y       
---------------+---------------+---------------+---------------+---------------
fork1          |1              |foo            |null           |null           
fork1          |2              |foo            |null           |null           
fork2          |null           |null           |3              |null           
fork3          |2              |foo            |null           |2          

I think fixed the field resolution for this case.
In this test example here we should ask for all fields, since not all branches are bounded by an Aggregate or Project.

| WHERE x > y
""", Set.of("a", "a.*", "c", "c.*", "z", "z.*"));
""", ALL_FIELDS);
}

public void testForkFieldsWithEnrichAndLookupJoins() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());
assumeTrue("LOOKUP JOIN available as snapshot only", EsqlCapabilities.Cap.JOIN_LOOKUP_V12.isEnabled());

assertFieldNames(
"""
FROM test
| KEEP a, b, abc, def, z, xyz
| ENRICH enrich_policy ON abc
| EVAL b = a + 100
| LOOKUP JOIN my_lookup_index ON def
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
(STATS x = count(*), y=min(z))
| LOOKUP JOIN my_lookup_index ON xyz
| WHERE x > y OR _fork == "fork1"
""",
Set.of(
"x",
"y",
"_fork",
"a",
"c",
"abc",
"b",
"def",
"z",
"xyz",
"def.*",
"_fork.*",
"y.*",
"x.*",
"xyz.*",
"z.*",
"abc.*",
"a.*",
"c.*",
"b.*"
)
);
}

public void testForkWithStatsInAllBranches() {
assumeTrue("FORK available as snapshot only", EsqlCapabilities.Cap.FORK.isEnabled());

assertFieldNames("""
FROM test
| ENRICH enrich_policy ON abc
| WHERE a > 2000
| EVAL b = a + 100
| LOOKUP JOIN my_lookup_index ON def
| FORK (WHERE c > 1 AND a < 10000 | EVAL d = a + 500)
| FORK (WHERE c > 1 AND a < 10000 | STATS m = count(*))
(EVAL z = a * b | STATS m = max(z))
(STATS x = count(*), y=min(z))
| LOOKUP JOIN my_lookup_index ON xyz
| WHERE x > y
""", Set.of("a", "a.*", "abc", "abc.*", "c", "c.*", "def", "def.*", "x", "x.*", "xyz", "xyz.*", "y", "y.*", "z", "z.*"));
""", Set.of("a", "a.*", "b", "b.*", "c", "c.*", "z", "z.*"));
}

private Set<String> fieldNames(String query, Set<String> enrichPolicyMatchFields) {
Expand Down