@@ -46,7 +46,7 @@
import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled;
import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V3;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V4;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2;
import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V6;
@@ -132,7 +132,7 @@ protected void shouldSkipTest(String testName) throws IOException {
assumeFalse("LOOKUP JOIN not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName()));
// Unmapped fields require a correct capability response from every cluster, which isn't currently implemented.
assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V3.capabilityName()));
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V4.capabilityName()));
}

@Override
41 changes: 31 additions & 10 deletions x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec
@@ -3,7 +3,7 @@
//

simpleFork
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK ( WHERE emp_no == 10001 )
@@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword
;

forkWithWhereSortAndLimit
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 )
@@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword
;

fiveFork
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK ( WHERE emp_no == 10005 )
@@ -59,7 +59,7 @@ fork5 | 10001
;

forkWithWhereSortDescAndLimit
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 )
@@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin
;

forkWithCommonPrefilter
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| WHERE emp_no > 10050
@@ -94,7 +94,7 @@ fork2 | 10100
;

forkWithSemanticSearchAndScore
required_capability: fork_v3
required_capability: fork_v4
required_capability: semantic_text_field_caps
required_capability: metadata_score

@@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w
;

forkWithEvals
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1)
@@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2
;

forkWithStats
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| FORK (WHERE emp_no == 10048 OR emp_no == 10081)
@@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null
;

forkWithDissect
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei
;

forkWithMixOfCommands
required_capability: fork_v3
required_capability: fork_v4

FROM employees
| WHERE emp_no == 10048 OR emp_no == 10081
@@ -195,3 +195,24 @@ fork3 | 10081 | Rosen | null | null | null
fork4 | 10048 | abc | aaa | null | null
fork4 | 10081 | abc | aaa | null | null
;

forkWithFiltersOnConstantValues
required_capability: fork_v4

FROM employees
| EVAL z = 1
| WHERE z == 1
| FORK (WHERE emp_no == 10048 OR emp_no == 10081 | WHERE z - 1 == 0)
(WHERE emp_no == 10081 OR emp_no == 10087 | EVAL a = "x" )
(STATS x = COUNT(*), y = MAX(emp_no), z = MIN(emp_no) | EVAL a = "y" )
(STATS x = COUNT(*), y = MIN(emp_no))
| WHERE _fork == "fork2" OR a == "y"
| KEEP _fork, emp_no, x, y, z
| SORT _fork, emp_no
;

_fork:keyword | emp_no:integer | x:long | y:integer | z:integer
fork2 | 10081 | null | null | 1
fork2 | 10087 | null | null | 1
fork3 | null | 100 | 10100 | 10001
;
@@ -585,6 +585,51 @@ public void testWithStatsSimple() {
}
}

public void testWithConditionOnForkField() {
var query = """
FROM test
| FORK ( WHERE content:"fox" | EVAL a = 1)
( WHERE content:"cat" | EVAL b = 2 )
( WHERE content:"dog" | EVAL c = 3 )
| WHERE _fork == "fork2"
| KEEP _fork, id, content, a, b, c
| SORT _fork
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a", "b", "c"));

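// Arrays.stream(...).toList() rather than List.of(...): the expected row contains nulls, which List.of rejects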
Iterable<Iterable<Object>> expectedValues = List.of(
Arrays.stream(new Object[] { "fork2", 5, "There is also a white cat", null, 2, null }).toList()
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithFilteringOnConstantColumn() {
var query = """
FROM test
| FORK ( WHERE content:"fox" | EVAL a = 1)
( WHERE content:"cat" | EVAL a = 2 )
( WHERE content:"dog" | EVAL a = 3 )
| WHERE a == 3
| KEEP _fork, id, content, a
| SORT id
| LIMIT 3
""";

try (var resp = run(query)) {
assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a"));

Iterable<Iterable<Object>> expectedValues = List.of(
List.of("fork3", 2, "This is a brown dog", 3),
List.of("fork3", 3, "This dog is really brown", 3),
List.of("fork3", 4, "The dog is brown but this document is very very long", 3)
);
assertValues(resp.values(), expectedValues);
}
}

public void testWithEvalWithConflictingTypes() {
var query = """
FROM test
@@ -1013,7 +1013,7 @@ public enum Cap {
/**
* Support streaming of sub plan results
*/
FORK_V3(Build.current().isSnapshot()),
FORK_V4(Build.current().isSnapshot()),

/**
* Support for the {@code leading_zeros} named parameter.
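Bumping the constant also renames the string capability that the spec files and harnesses gate on. A small fragment sketching that link, inferred from the test changes in this diff rather than from any documented capabilityName() contract (testCase reuses the name from the CCS test at the top of the diff):

// Assumption based on this diff: the constant surfaces as its lower-cased
// snake_case name, which `required_capability:` lines and the assumeFalse
// guards match against.
String cap = EsqlCapabilities.Cap.FORK_V4.capabilityName(); // presumably "fork_v4"
assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(cap));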
@@ -752,15 +752,16 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
// we align the outputs of the sub plans such that they have the same columns
boolean changed = false;
List<LogicalPlan> newSubPlans = new ArrayList<>();
Set<String> forkColumns = fork.outputSet().names();
List<Attribute> outputUnion = Fork.outputUnion(fork.children());
List<String> forkColumns = outputUnion.stream().map(Attribute::name).toList();

for (LogicalPlan logicalPlan : fork.children()) {
Source source = logicalPlan.source();

// find the missing columns
List<Attribute> missing = new ArrayList<>();
Set<String> currentNames = logicalPlan.outputSet().names();
for (Attribute attr : fork.outputSet()) {
for (Attribute attr : outputUnion) {
if (currentNames.contains(attr.name()) == false) {
missing.add(attr);
}
@@ -795,7 +796,19 @@ private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
newSubPlans.add(logicalPlan);
}

return changed ? new Fork(fork.source(), newSubPlans) : fork;
if (changed == false) {
return fork;
}

List<Attribute> newOutput = new ArrayList<>();

// We don't want to reuse the exact attribute instances that the FORK branches output.
// Sharing those instances can have unintended side effects when optimizations like constant folding are applied.
for (Attribute attr : newSubPlans.getFirst().output()) {
newOutput.add(new ReferenceAttribute(attr.source(), attr.name(), attr.dataType()));
}

return new Fork(fork.source(), newSubPlans, newOutput);
}

private LogicalPlan resolveRerank(Rerank rerank, List<Attribute> childrenOutput) {
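The fresh ReferenceAttribute copies are what keep a post-FORK filter on a branch-local constant working per row. A hypothetical variant of the testWithFilteringOnConstantColumn case earlier in this diff (it assumes the same `test` index and the run/assertColumnNames/assertValues helpers shown there) sketches what could break without them:

public void testConstantEvalIsNotFoldedAcrossBranches() {
    // fork1 binds a to the constant 1, fork2 binds a to 2. If FORK reused the
    // branch attribute instances, constant folding could decide `a == 2` once
    // for the whole plan instead of per row; with detached ReferenceAttributes,
    // a remains an ordinary column and only the fork2 row survives.
    var query = """
        FROM test
        | FORK ( WHERE content:"fox" | EVAL a = 1 )
               ( WHERE content:"cat" | EVAL a = 2 )
        | WHERE a == 2
        | KEEP _fork, id, content, a
        """;

    try (var resp = run(query)) {
        assertColumnNames(resp.columns(), List.of("_fork", "id", "content", "a"));

        Iterable<Iterable<Object>> expectedValues = List.of(List.of("fork2", 5, "There is also a white cat", 2));
        assertValues(resp.values(), expectedValues);
    }
}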
@@ -12,9 +12,17 @@
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.plan.QueryPlan;
import org.elasticsearch.xpack.esql.plan.logical.BinaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.physical.BinaryExec;
import org.elasticsearch.xpack.esql.plan.physical.MergeExec;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.esql.common.Failure.fail;

@@ -44,6 +52,8 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
binaryExec.right().outputSet(),
failures
);
} else if (p instanceof Fork || p instanceof MergeExec) {
checkMissingFork(p, failures);
} else {
checkMissing(p, p.references(), p.inputSet(), "missing references", failures);
}
@@ -59,6 +69,41 @@ public static void checkPlan(QueryPlan<?> p, Failures failures) {
}
}

private static void checkMissingFork(QueryPlan<?> plan, Failures failures) {
for (QueryPlan<?> child : plan.children()) {
checkMissingForkBranch(child, plan.outputSet(), failures);
}
}

private static void checkMissingForkBranch(QueryPlan<?> plan, AttributeSet forkOutputSet, Failures failures) {
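// assumes unique attribute names in the fork output (Collectors.toMap throws on duplicate keys)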
Map<String, DataType> attributeTypes = forkOutputSet.stream().collect(Collectors.toMap(Attribute::name, Attribute::dataType));
AttributeSet missing = AttributeSet.of();

Set<String> commonAttrs = new HashSet<>();

// get the missing attributes from the sub plan
plan.output().forEach(attribute -> {
var attrType = attributeTypes.get(attribute.name());
if (attrType == null || attrType != attribute.dataType()) {
missing.add(attribute);
}
commonAttrs.add(attribute.name());
});

// get the missing attributes from the fork output
forkOutputSet.forEach(attribute -> {
if (commonAttrs.contains(attribute.name()) == false) {
missing.add(attribute);
}
});

if (missing.isEmpty() == false) {
failures.add(
fail(plan, "Plan [{}] optimized incorrectly due to missing attributes in subplans", plan.nodeString(), missing.toString())
);
}
}

private static void checkMissingBinary(
QueryPlan<?> plan,
AttributeSet leftReferences,
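The rule that checkMissingForkBranch enforces is symmetric: a branch attribute is missing when the fork output lacks its name or declares a different type for it, and a fork output attribute is missing when the branch no longer produces it. A self-contained sketch of that rule, with a hypothetical Attr record standing in for the real Attribute and AttributeSet classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ForkConsistencySketch {
    record Attr(String name, String type) {}

    static List<Attr> missing(List<Attr> branchOutput, List<Attr> forkOutput) {
        Map<String, String> forkTypes = new HashMap<>();
        forkOutput.forEach(a -> forkTypes.put(a.name(), a.type()));
        List<Attr> missing = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (Attr attr : branchOutput) {
            String forkType = forkTypes.get(attr.name());
            // missing: name absent from the fork output, or present with another type
            if (forkType == null || forkType.equals(attr.type()) == false) {
                missing.add(attr);
            }
            seen.add(attr.name());
        }
        for (Attr attr : forkOutput) {
            // missing: fork output column that the branch no longer produces
            if (seen.contains(attr.name()) == false) {
                missing.add(attr);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        var fork = List.of(new Attr("emp_no", "integer"), new Attr("a", "keyword"));
        var branch = List.of(new Attr("emp_no", "long")); // type drifted, and "a" was dropped
        // reports both emp_no (type mismatch) and a (absent from the branch)
        System.out.println(missing(branch, fork));
    }
}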
@@ -637,7 +637,7 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
}
return input -> {
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
return new Fork(source(ctx), subPlans);
return new Fork(source(ctx), subPlans, List.of());
};
}

@@ -35,18 +35,19 @@
public class Fork extends LogicalPlan implements PostAnalysisPlanVerificationAware {

public static final String FORK_FIELD = "_fork";
List<Attribute> lazyOutput;
private final List<Attribute> output;

public Fork(Source source, List<LogicalPlan> children) {
public Fork(Source source, List<LogicalPlan> children, List<Attribute> output) {
super(source, children);
if (children.size() < 2) {
throw new IllegalArgumentException("requires at least two subqueries, got: " + children.size());
}
this.output = output;
}

@Override
public LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
return new Fork(source(), newChildren);
return new Fork(source(), newChildren, output);
}

@Override
@@ -65,7 +66,8 @@ public boolean expressionsResolved() {
return false;
}

if (children().stream().anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME))) {
if (children().stream()
.anyMatch(p -> p.outputSet().names().contains(Analyzer.NO_FIELDS_NAME) || output.size() != p.output().size())) {
return false;
}

@@ -85,26 +87,23 @@ public boolean expressionsResolved() {

@Override
protected NodeInfo<? extends LogicalPlan> info() {
return NodeInfo.create(this, Fork::new, children());
return NodeInfo.create(this, Fork::new, children(), output);
}

public Fork replaceSubPlans(List<LogicalPlan> subPlans) {
return new Fork(source(), subPlans);
return new Fork(source(), subPlans, output);
}

@Override
public List<Attribute> output() {
if (lazyOutput == null) {
lazyOutput = lazyOutput();
}
return lazyOutput;
return output;
}

private List<Attribute> lazyOutput() {
public static List<Attribute> outputUnion(List<LogicalPlan> subplans) {
List<Attribute> output = new ArrayList<>();
Set<String> names = new HashSet<>();

for (var subPlan : children()) {
for (var subPlan : subplans) {
for (var attr : subPlan.output()) {
if (names.contains(attr.name()) == false && attr.name().equals(Analyzer.NO_FIELDS_NAME) == false) {
names.add(attr.name());
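Fork.outputUnion supplies the aligned column list that resolveFork (above) uses as forkColumns: names in first-seen order across branches, with duplicates and the analyzer's NO_FIELDS marker skipped. Reduced to plain strings, the ordering behavior looks like this:

import java.util.LinkedHashSet;
import java.util.List;

public class OutputUnionSketch {
    public static void main(String[] args) {
        // e.g. one branch ends with EVAL a = "x", another with STATS x = ..., y = ...
        List<List<String>> branchOutputs = List.of(
            List.of("emp_no", "a"),
            List.of("emp_no", "x", "y")
        );
        // LinkedHashSet mirrors the names/output pair in outputUnion:
        // insertion order is kept, repeats are dropped
        LinkedHashSet<String> union = new LinkedHashSet<>();
        branchOutputs.forEach(union::addAll);
        System.out.println(union); // [emp_no, a, x, y]
    }
}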
@@ -310,7 +310,7 @@ public final void test() throws Throwable {
);
assumeFalse(
"CSV tests cannot currently handle FORK",
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V3.capabilityName())
testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V4.capabilityName())
);
assumeFalse(
"CSV tests cannot currently handle multi_match function that depends on Lucene",