Skip to content

Commit bd0a75d

Browse files
authored
Make Fork n-ary (#126074)
1 parent 3affea3 commit bd0a75d

File tree

12 files changed

+67
-263
lines changed

12 files changed

+67
-263
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 2 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,7 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
154154
);
155155

156156
private static final List<Batch<LogicalPlan>> RULES = List.of(
157-
new Batch<>(
158-
"Initialize",
159-
Limiter.ONCE,
160-
new ResolveTable(),
161-
new ResolveEnrich(),
162-
new ResolveLookupTables(),
163-
new ResolveFunctions(),
164-
new ResolveForkFunctions()
165-
),
157+
new Batch<>("Initialize", Limiter.ONCE, new ResolveTable(), new ResolveEnrich(), new ResolveLookupTables(), new ResolveFunctions()),
166158
new Batch<>(
167159
"Resolution",
168160
/*
@@ -171,7 +163,6 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
171163
* attempts to resolve this reference.
172164
*/
173165
new ImplicitCasting(),
174-
new ImplicitForkCasting(),
175166
new ResolveRefs(),
176167
new ResolveUnionTypes() // Must be after ResolveRefs, so union types can be found
177168
),
@@ -492,10 +483,6 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
492483
return resolveInsist(i, childrenOutput, context.indexResolution());
493484
}
494485

495-
if (plan instanceof Fork f) {
496-
return resolveFork(f, context);
497-
}
498-
499486
if (plan instanceof Dedup dedup) {
500487
return resolveDedup(dedup, childrenOutput);
501488
}
@@ -683,16 +670,6 @@ private Join resolveLookupJoin(LookupJoin join) {
683670
return join;
684671
}
685672

686-
private LogicalPlan resolveFork(Fork fork, AnalyzerContext context) {
687-
List<LogicalPlan> subPlans = fork.subPlans();
688-
689-
List<LogicalPlan> newSubPlans = new ArrayList<>();
690-
for (var logicalPlan : subPlans) {
691-
newSubPlans.add(logicalPlan.transformUp(LogicalPlan.class, p -> p.childrenResolved() == false ? p : rule(p, context)));
692-
}
693-
return new Fork(fork.source(), fork.child(), newSubPlans);
694-
}
695-
696673
private List<Attribute> resolveUsingColumns(List<Attribute> cols, List<Attribute> output, String side) {
697674
List<Attribute> resolved = new ArrayList<>(cols.size());
698675
for (Attribute col : cols) {
@@ -1154,23 +1131,6 @@ public static org.elasticsearch.xpack.esql.core.expression.function.Function res
11541131
}
11551132
}
11561133

1157-
private static class ResolveForkFunctions extends ParameterizedAnalyzerRule<LogicalPlan, AnalyzerContext> {
1158-
private final ResolveFunctions resolveFunctions = new ResolveFunctions();
1159-
1160-
@Override
1161-
protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
1162-
return plan.transformUp(Fork.class, fork -> resolveFunctionsInForkSubQueries(fork, context));
1163-
}
1164-
1165-
private LogicalPlan resolveFunctionsInForkSubQueries(Fork fork, AnalyzerContext ctx) {
1166-
List<LogicalPlan> newSubPlans = new ArrayList<>();
1167-
for (var subPlan : fork.subPlans()) {
1168-
newSubPlans.add(resolveFunctions.apply(subPlan, ctx));
1169-
}
1170-
return fork.replaceSubPlans(newSubPlans);
1171-
}
1172-
}
1173-
11741134
private static class AddImplicitLimit extends ParameterizedRule<LogicalPlan, LogicalPlan, AnalyzerContext> {
11751135
@Override
11761136
public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
@@ -1200,7 +1160,7 @@ public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
12001160

12011161
private LogicalPlan addImplicitLimitToForkSubQueries(Fork fork, AnalyzerContext ctx) {
12021162
List<LogicalPlan> newSubPlans = new ArrayList<>();
1203-
for (var subPlan : fork.subPlans()) {
1163+
for (var subPlan : fork.children()) {
12041164
newSubPlans.add(addImplicitLimit.apply(subPlan, ctx));
12051165
}
12061166
return fork.replaceSubPlans(newSubPlans);
@@ -1458,23 +1418,6 @@ private static Expression castStringLiteral(Expression from, DataType target) {
14581418
}
14591419
}
14601420

1461-
private static class ImplicitForkCasting extends ParameterizedRule<LogicalPlan, LogicalPlan, AnalyzerContext> {
1462-
private final ImplicitCasting implicitCasting = new ImplicitCasting();
1463-
1464-
@Override
1465-
public LogicalPlan apply(LogicalPlan logicalPlan, AnalyzerContext context) {
1466-
return logicalPlan.transformUp(Fork.class, fork -> implicitCastForkSubQueries(fork, context));
1467-
}
1468-
1469-
private LogicalPlan implicitCastForkSubQueries(Fork fork, AnalyzerContext ctx) {
1470-
List<LogicalPlan> newSubPlans = new ArrayList<>();
1471-
for (var subPlan : fork.subPlans()) {
1472-
newSubPlans.add(implicitCasting.apply(subPlan, ctx));
1473-
}
1474-
return fork.replaceSubPlans(newSubPlans);
1475-
}
1476-
}
1477-
14781421
/**
14791422
* The EsqlIndexResolver will create InvalidMappedField instances for fields that are ambiguous (i.e. have multiple mappings).
14801423
* During {@link ResolveRefs} we do not convert these to UnresolvedAttribute instances, as we want to first determine if they can

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/PreAnalyzer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
1515

1616
import java.util.ArrayList;
17+
import java.util.HashSet;
1718
import java.util.List;
19+
import java.util.Set;
1820

1921
import static java.util.Collections.emptyList;
2022

@@ -46,7 +48,8 @@ public PreAnalysis preAnalyze(LogicalPlan plan) {
4648
}
4749

4850
protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
49-
List<IndexPattern> indices = new ArrayList<>();
51+
Set<IndexPattern> indices = new HashSet<>();
52+
5053
List<Enrich> unresolvedEnriches = new ArrayList<>();
5154
List<IndexPattern> lookupIndices = new ArrayList<>();
5255

@@ -56,6 +59,6 @@ protected PreAnalysis doPreAnalyze(LogicalPlan plan) {
5659
// mark plan as preAnalyzed (if it were marked, there would be no analysis)
5760
plan.forEachUp(LogicalPlan::setPreAnalyzed);
5861

59-
return new PreAnalysis(indices, unresolvedEnriches, lookupIndices);
62+
return new PreAnalysis(indices.stream().toList(), unresolvedEnriches, lookupIndices);
6063
}
6164
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Verifier.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals;
3030
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
3131
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
32-
import org.elasticsearch.xpack.esql.plan.logical.Fork;
3332
import org.elasticsearch.xpack.esql.plan.logical.Insist;
3433
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3534
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
@@ -179,11 +178,6 @@ else if (p instanceof Lookup lookup) {
179178
else {
180179
lookup.matchFields().forEach(unresolvedExpressions);
181180
}
182-
} else if (p instanceof Fork fork) {
183-
var subPlans = fork.subPlans();
184-
for (var subPlan : subPlans) {
185-
checkUnresolvedAttributes(subPlan, failures);
186-
}
187181
}
188182

189183
else {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/fulltext/QueryBuilderResolver.java

Lines changed: 6 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@
1212
import org.elasticsearch.index.query.QueryBuilder;
1313
import org.elasticsearch.index.query.QueryRewriteContext;
1414
import org.elasticsearch.index.query.Rewriteable;
15-
import org.elasticsearch.xpack.esql.core.expression.Expression;
1615
import org.elasticsearch.xpack.esql.core.util.Holder;
1716
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
18-
import org.elasticsearch.xpack.esql.plan.logical.Fork;
1917
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2018
import org.elasticsearch.xpack.esql.planner.TranslatorHandler;
2119
import org.elasticsearch.xpack.esql.plugin.TransportActionServices;
@@ -24,8 +22,6 @@
2422
import java.io.IOException;
2523
import java.util.HashSet;
2624
import java.util.Set;
27-
import java.util.function.Function;
28-
import java.util.stream.Collectors;
2925

3026
/**
3127
* Some {@link FullTextFunction} implementations such as {@link org.elasticsearch.xpack.esql.expression.function.fulltext.Match}
@@ -38,7 +34,11 @@ public final class QueryBuilderResolver {
3834
private QueryBuilderResolver() {}
3935

4036
public static void resolveQueryBuilders(LogicalPlan plan, TransportActionServices services, ActionListener<LogicalPlan> listener) {
41-
var hasFullTextFunctions = hasFullTextFunctions(plan);
37+
var hasFullTextFunctions = plan.anyMatch(p -> {
38+
Holder<Boolean> hasFullTextFunction = new Holder<>(false);
39+
p.forEachExpression(FullTextFunction.class, unused -> hasFullTextFunction.set(true));
40+
return hasFullTextFunction.get();
41+
});
4242
if (hasFullTextFunctions) {
4343
Rewriteable.rewriteAndFetch(
4444
new FullTextFunctionsRewritable(plan),
@@ -69,29 +69,12 @@ private static Set<String> indexNames(LogicalPlan plan) {
6969
return indexNames;
7070
}
7171

72-
private static boolean hasFullTextFunctions(LogicalPlan plan) {
73-
return plan.anyMatch(p -> {
74-
Holder<Boolean> hasFullTextFunction = new Holder<>(false);
75-
p.forEachExpression(FullTextFunction.class, unused -> hasFullTextFunction.set(true));
76-
77-
if (p instanceof Fork fork) {
78-
fork.subPlans().forEach(subPlan -> {
79-
if (hasFullTextFunctions(subPlan)) {
80-
hasFullTextFunction.set(true);
81-
}
82-
});
83-
}
84-
85-
return hasFullTextFunction.get();
86-
});
87-
}
88-
8972
private record FullTextFunctionsRewritable(LogicalPlan plan) implements Rewriteable<QueryBuilderResolver.FullTextFunctionsRewritable> {
9073
@Override
9174
public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOException {
9275
Holder<IOException> exceptionHolder = new Holder<>();
9376
Holder<Boolean> updated = new Holder<>(false);
94-
LogicalPlan newPlan = transformPlan(plan, f -> {
77+
LogicalPlan newPlan = plan.transformExpressionsDown(FullTextFunction.class, f -> {
9578
QueryBuilder builder = f.queryBuilder(), initial = builder;
9679
builder = builder == null ? f.asQuery(TranslatorHandler.TRANSLATOR_HANDLER).toQueryBuilder() : builder;
9780
try {
@@ -108,15 +91,5 @@ public FullTextFunctionsRewritable rewrite(QueryRewriteContext ctx) throws IOExc
10891
}
10992
return updated.get() ? new FullTextFunctionsRewritable(newPlan) : this;
11093
}
111-
112-
private LogicalPlan transformPlan(LogicalPlan plan, Function<FullTextFunction, ? extends Expression> rule) {
113-
return plan.transformExpressionsDown(FullTextFunction.class, rule).transformDown(Fork.class, fork -> {
114-
var subPlans = fork.subPlans()
115-
.stream()
116-
.map(subPlan -> subPlan.transformExpressionsDown(FullTextFunction.class, rule))
117-
.collect(Collectors.toList());
118-
return new Fork(fork.source(), fork.child(), subPlans);
119-
});
120-
}
12194
}
12295
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
6969
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
7070
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
71-
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
7271
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
7372
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
7473
import org.joni.exception.SyntaxException;
@@ -630,9 +629,8 @@ public PlanFactory visitForkCommand(EsqlBaseParser.ForkCommandContext ctx) {
630629
throw new ParsingException(source(ctx), "Fork requires at least two branches");
631630
}
632631
return input -> {
633-
var stub = StubRelation.EMPTY;
634-
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(stub)).toList();
635-
return new Fork(source(ctx), input, subPlans);
632+
List<LogicalPlan> subPlans = subQueries.stream().map(planFactory -> planFactory.apply(input)).toList();
633+
return new Fork(source(ctx), subPlans);
636634
};
637635
}
638636

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Fork.java

Lines changed: 18 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,90 +11,66 @@
1111
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1212
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1313
import org.elasticsearch.xpack.esql.core.tree.Source;
14-
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
1514

1615
import java.io.IOException;
1716
import java.util.List;
1817
import java.util.Objects;
19-
import java.util.stream.Stream;
2018

2119
/**
22-
* A Fork is a {@code Plan} with one child, but holds several logical subplans, e.g.
20+
* A Fork is a n-ary {@code Plan} where each child is a sub plan, e.g.
2321
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
2422
*/
25-
public class Fork extends UnaryPlan implements SurrogateLogicalPlan {
23+
public class Fork extends LogicalPlan {
2624
public static final String FORK_FIELD = "_fork";
27-
28-
private final List<LogicalPlan> subPlans;
2925
List<Attribute> lazyOutput;
3026

31-
public Fork(Source source, LogicalPlan child, List<LogicalPlan> subPlans) {
32-
super(source, child);
33-
if (subPlans.size() < 2) {
34-
throw new IllegalArgumentException("requires more than two subqueries, got:" + subPlans.size());
27+
public Fork(Source source, List<LogicalPlan> children) {
28+
super(source, children);
29+
if (children.size() < 2) {
30+
throw new IllegalArgumentException("requires more than two subqueries, got:" + children.size());
3531
}
36-
this.subPlans = subPlans;
3732
}
3833

3934
@Override
40-
public void writeTo(StreamOutput out) throws IOException {
41-
throw new UnsupportedOperationException("not serialized");
35+
public LogicalPlan replaceChildren(List<LogicalPlan> newChildren) {
36+
return new Fork(source(), newChildren);
4237
}
4338

4439
@Override
45-
public String getWriteableName() {
40+
public void writeTo(StreamOutput out) throws IOException {
4641
throw new UnsupportedOperationException("not serialized");
4742
}
4843

49-
public List<LogicalPlan> subPlans() {
50-
return subPlans;
51-
}
52-
5344
@Override
54-
public LogicalPlan surrogate() {
55-
var newChildren = subPlans.stream().map(p -> Merge.replaceStub(child(), p)).toList();
56-
return new Merge(source(), newChildren);
45+
public String getWriteableName() {
46+
throw new UnsupportedOperationException("not serialized");
5747
}
5848

5949
@Override
6050
public boolean expressionsResolved() {
61-
return child().resolved() && subPlans.stream().allMatch(LogicalPlan::resolved);
51+
return children().stream().allMatch(LogicalPlan::resolved);
6252
}
6353

6454
@Override
6555
protected NodeInfo<? extends LogicalPlan> info() {
66-
return NodeInfo.create(this, Fork::new, child(), subPlans);
67-
}
68-
69-
static List<LogicalPlan> replaceEmptyStubs(List<LogicalPlan> subQueries, LogicalPlan newChild) {
70-
List<Attribute> attributes = List.copyOf(newChild.output());
71-
return subQueries.stream().map(subquery -> {
72-
var newStub = new StubRelation(subquery.source(), attributes);
73-
return subquery.transformUp(StubRelation.class, stubRelation -> newStub);
74-
}).toList();
75-
}
76-
77-
@Override
78-
public Fork replaceChild(LogicalPlan newChild) {
79-
var newSubQueries = replaceEmptyStubs(subPlans, newChild);
80-
return new Fork(source(), newChild, newSubQueries);
56+
return NodeInfo.create(this, Fork::new, children());
8157
}
8258

8359
public Fork replaceSubPlans(List<LogicalPlan> subPlans) {
84-
return new Fork(source(), child(), subPlans);
60+
return new Fork(source(), subPlans);
8561
}
8662

8763
@Override
8864
public List<Attribute> output() {
8965
if (lazyOutput == null) {
90-
lazyOutput = Stream.concat(children().getFirst().output().stream(), subPlans.getFirst().output().stream()).distinct().toList();
66+
lazyOutput = children().getFirst().output();
9167
}
9268
return lazyOutput;
9369
}
9470

9571
@Override
9672
public int hashCode() {
97-
return Objects.hash(super.hashCode(), subPlans);
73+
return Objects.hash(Fork.class, children());
9874
}
9975

10076
@Override
@@ -105,10 +81,8 @@ public boolean equals(Object o) {
10581
if (o == null || getClass() != o.getClass()) {
10682
return false;
10783
}
108-
if (super.equals(o) == false) {
109-
return false;
110-
}
11184
Fork other = (Fork) o;
112-
return Objects.equals(subPlans, other.subPlans);
85+
86+
return Objects.equals(children(), other.children());
11387
}
11488
}

0 commit comments

Comments
 (0)