Skip to content

Commit c9db2c0

Browse files
authored
ES|QL: Refactor FUSE planning (#134038)
1 parent 231aeca commit c9db2c0

File tree

14 files changed

+219
-263
lines changed

14 files changed

+219
-263
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,12 +798,12 @@ public enum Cap {
798798
/**
799799
* This enables 60_usage.yml "Basic ESQL usage....snapshot" version test. See also the next capability.
800800
*/
801-
SNAPSHOT_TEST_FOR_TELEMETRY(Build.current().isSnapshot()),
801+
SNAPSHOT_TEST_FOR_TELEMETRY_V2(Build.current().isSnapshot()),
802802

803803
/**
804804
* This enables 60_usage.yml "Basic ESQL usage....non-snapshot" version test. See also the previous capability.
805805
*/
806-
NON_SNAPSHOT_TEST_FOR_TELEMETRY(Build.current().isSnapshot() == false),
806+
NON_SNAPSHOT_TEST_FOR_TELEMETRY_V2(Build.current().isSnapshot() == false),
807807

808808
/**
809809
* Support simplified syntax for named parameters for field and function names.

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@
6565
import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime;
6666
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
6767
import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime;
68+
import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode;
69+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
6870
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
6971
import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
7072
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
@@ -93,7 +95,6 @@
9395
import org.elasticsearch.xpack.esql.parser.ParsingException;
9496
import org.elasticsearch.xpack.esql.plan.IndexPattern;
9597
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
96-
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
9798
import org.elasticsearch.xpack.esql.plan.logical.Drop;
9899
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
99100
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
@@ -107,8 +108,9 @@
107108
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
108109
import org.elasticsearch.xpack.esql.plan.logical.Project;
109110
import org.elasticsearch.xpack.esql.plan.logical.Rename;
110-
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
111111
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
112+
import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse;
113+
import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval;
112114
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
113115
import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan;
114116
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
@@ -525,12 +527,8 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) {
525527
return resolveInsist(i, childrenOutput, context.indexResolution());
526528
}
527529

528-
if (plan instanceof Dedup dedup) {
529-
return resolveDedup(dedup, childrenOutput);
530-
}
531-
532-
if (plan instanceof RrfScoreEval rrf) {
533-
return resolveRrfScoreEval(rrf, childrenOutput);
530+
if (plan instanceof Fuse fuse) {
531+
return resolveFuse(fuse, childrenOutput);
534532
}
535533

536534
if (plan instanceof Rerank r) {
@@ -929,52 +927,44 @@ private static FieldAttribute insistKeyword(Attribute attribute) {
929927
);
930928
}
931929

932-
private LogicalPlan resolveDedup(Dedup dedup, List<Attribute> childrenOutput) {
933-
List<NamedExpression> aggregates = dedup.finalAggs();
934-
List<Attribute> groupings = dedup.groupings();
935-
List<NamedExpression> newAggs = new ArrayList<>();
936-
List<Attribute> newGroupings = new ArrayList<>();
937-
938-
for (NamedExpression agg : aggregates) {
939-
var newAgg = (NamedExpression) agg.transformUp(UnresolvedAttribute.class, ua -> {
940-
Expression ne = ua;
941-
Attribute maybeResolved = maybeResolveAttribute(ua, childrenOutput);
942-
if (maybeResolved != null) {
943-
ne = maybeResolved;
944-
}
945-
return ne;
946-
});
947-
newAggs.add(newAgg);
930+
private LogicalPlan resolveFuse(Fuse fuse, List<Attribute> childrenOutput) {
931+
Source source = fuse.source();
932+
Attribute score = fuse.score();
933+
if (score instanceof UnresolvedAttribute) {
934+
score = maybeResolveAttribute((UnresolvedAttribute) score, childrenOutput);
948935
}
949936

950-
for (Attribute attr : groupings) {
951-
if (attr instanceof UnresolvedAttribute ua) {
952-
newGroupings.add(resolveAttribute(ua, childrenOutput));
953-
} else {
954-
newGroupings.add(attr);
955-
}
937+
Attribute discriminator = fuse.discriminator();
938+
if (discriminator instanceof UnresolvedAttribute) {
939+
discriminator = maybeResolveAttribute((UnresolvedAttribute) discriminator, childrenOutput);
956940
}
957941

958-
return new Dedup(dedup.source(), dedup.child(), newAggs, newGroupings);
959-
}
960-
961-
private LogicalPlan resolveRrfScoreEval(RrfScoreEval rrf, List<Attribute> childrenOutput) {
962-
Attribute scoreAttr = rrf.scoreAttribute();
963-
Attribute forkAttr = rrf.forkAttribute();
942+
List<NamedExpression> groupings = fuse.groupings()
943+
.stream()
944+
.map(attr -> attr instanceof UnresolvedAttribute ? maybeResolveAttribute((UnresolvedAttribute) attr, childrenOutput) : attr)
945+
.toList();
964946

965-
if (scoreAttr instanceof UnresolvedAttribute ua) {
966-
scoreAttr = resolveAttribute(ua, childrenOutput);
947+
// some attributes were unresolved - we return Fuse here so that the Verifier can raise an error message
948+
if (score instanceof UnresolvedAttribute || discriminator instanceof UnresolvedAttribute) {
949+
return new Fuse(fuse.source(), fuse.child(), score, discriminator, groupings, fuse.fuseType());
967950
}
968951

969-
if (forkAttr instanceof UnresolvedAttribute ua) {
970-
forkAttr = resolveAttribute(ua, childrenOutput);
971-
}
952+
LogicalPlan scoreEval = new FuseScoreEval(source, fuse.child(), score, discriminator);
953+
954+
// create aggregations
955+
Expression aggFilter = new Literal(source, true, DataType.BOOLEAN);
972956

973-
if (forkAttr != rrf.forkAttribute() || scoreAttr != rrf.scoreAttribute()) {
974-
return new RrfScoreEval(rrf.source(), rrf.child(), scoreAttr, forkAttr);
957+
List<NamedExpression> aggregates = new ArrayList<>();
958+
aggregates.add(new Alias(source, score.name(), new Sum(source, score, aggFilter, SummationMode.COMPENSATED_LITERAL)));
959+
960+
for (Attribute attr : childrenOutput) {
961+
if (attr.name().equals(score.name())) {
962+
continue;
963+
}
964+
aggregates.add(new Alias(source, attr.name(), new Values(source, attr, aggFilter)));
975965
}
976966

977-
return rrf;
967+
return resolveAggregate(new Aggregate(source, scoreEval, new ArrayList<>(groupings), aggregates), childrenOutput);
978968
}
979969

980970
private Attribute maybeResolveAttribute(UnresolvedAttribute ua, List<Attribute> childrenOutput) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
2222
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
2323
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
24-
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
2524
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2625

2726
import java.io.IOException;
@@ -174,9 +173,7 @@ public boolean equals(Object obj) {
174173
@Override
175174
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
176175
return (p, failures) -> {
177-
// `dedup` for now is not exposed as a command,
178-
// so allowing aggregate functions for dedup explicitly is just an internal implementation detail
179-
if ((p instanceof Aggregate) == false && (p instanceof Dedup) == false) {
176+
if ((p instanceof Aggregate) == false) {
180177
p.expressions().forEach(x -> x.forEachDown(AggregateFunction.class, af -> {
181178
failures.add(fail(af, "aggregate function [{}] not allowed outside STATS command", af.sourceText()));
182179
}));

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,9 @@
4444
import org.elasticsearch.xpack.esql.expression.Order;
4545
import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern;
4646
import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
47-
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
48-
import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode;
4947
import org.elasticsearch.xpack.esql.plan.IndexPattern;
5048
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
5149
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
52-
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
5350
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
5451
import org.elasticsearch.xpack.esql.plan.logical.Drop;
5552
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@@ -68,10 +65,10 @@
6865
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
6966
import org.elasticsearch.xpack.esql.plan.logical.Rename;
7067
import org.elasticsearch.xpack.esql.plan.logical.Row;
71-
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
7268
import org.elasticsearch.xpack.esql.plan.logical.Sample;
7369
import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate;
7470
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
71+
import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse;
7572
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
7673
import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan;
7774
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
@@ -777,19 +774,14 @@ public PlanFactory visitFuseCommand(EsqlBaseParser.FuseCommandContext ctx) {
777774
Source source = source(ctx);
778775
return input -> {
779776
Attribute scoreAttr = new UnresolvedAttribute(source, MetadataAttribute.SCORE);
780-
Attribute forkAttr = new UnresolvedAttribute(source, Fork.FORK_FIELD);
777+
Attribute discriminatorAttr = new UnresolvedAttribute(source, Fork.FORK_FIELD);
781778
Attribute idAttr = new UnresolvedAttribute(source, IdFieldMapper.NAME);
782779
Attribute indexAttr = new UnresolvedAttribute(source, MetadataAttribute.INDEX);
783-
List<NamedExpression> aggregates = List.of(
784-
new Alias(
785-
source,
786-
MetadataAttribute.SCORE,
787-
new Sum(source, scoreAttr, new Literal(source, true, DataType.BOOLEAN), SummationMode.COMPENSATED_LITERAL)
788-
)
789-
);
790-
List<Attribute> groupings = List.of(idAttr, indexAttr);
791780

792-
return new Dedup(source, new RrfScoreEval(source, input, scoreAttr, forkAttr), aggregates, groupings);
781+
List<NamedExpression> groupings = List.of(idAttr, indexAttr);
782+
Fuse.FuseType fuseType = Fuse.FuseType.RRF;
783+
784+
return new Fuse(source, input, scoreAttr, discriminatorAttr, groupings, fuseType);
793785
};
794786
}
795787

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Dedup.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)