diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 083fcf04bf150..e5deb7e2bf007 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -798,12 +798,12 @@ public enum Cap { /** * This enables 60_usage.yml "Basic ESQL usage....snapshot" version test. See also the next capability. */ - SNAPSHOT_TEST_FOR_TELEMETRY(Build.current().isSnapshot()), + SNAPSHOT_TEST_FOR_TELEMETRY_V2(Build.current().isSnapshot()), /** * This enables 60_usage.yml "Basic ESQL usage....non-snapshot" version test. See also the previous capability. */ - NON_SNAPSHOT_TEST_FOR_TELEMETRY(Build.current().isSnapshot() == false), + NON_SNAPSHOT_TEST_FOR_TELEMETRY_V2(Build.current().isSnapshot() == false), /** * Support simplified syntax for named parameters for field and function names. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 17491dedf495f..a9aa039debe47 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -65,6 +65,8 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.MinOverTime; import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum; import org.elasticsearch.xpack.esql.expression.function.aggregate.SumOverTime; +import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode; +import org.elasticsearch.xpack.esql.expression.function.aggregate.Values; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction; import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case; @@ -93,7 +95,6 @@ import org.elasticsearch.xpack.esql.parser.ParsingException; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.Drop; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; @@ -107,8 +108,9 @@ import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Rename; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; +import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse; +import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; @@ -525,12 +527,8 @@ protected LogicalPlan rule(LogicalPlan plan, AnalyzerContext context) { return resolveInsist(i, childrenOutput, context.indexResolution()); } - if (plan instanceof Dedup dedup) { - return resolveDedup(dedup, childrenOutput); - } - - if (plan instanceof RrfScoreEval rrf) { - return resolveRrfScoreEval(rrf, childrenOutput); + if (plan instanceof Fuse fuse) { + return resolveFuse(fuse, childrenOutput); } if (plan instanceof Rerank r) { @@ -929,52 +927,44 @@ private static FieldAttribute insistKeyword(Attribute attribute) { ); } - private LogicalPlan resolveDedup(Dedup dedup, List childrenOutput) { - List aggregates = dedup.finalAggs(); - List groupings = dedup.groupings(); - List newAggs = new ArrayList<>(); - List newGroupings = new ArrayList<>(); - - for (NamedExpression agg : aggregates) { - var newAgg = (NamedExpression) agg.transformUp(UnresolvedAttribute.class, ua -> { - Expression ne = ua; - Attribute maybeResolved = maybeResolveAttribute(ua, childrenOutput); - if (maybeResolved != null) { - ne = maybeResolved; - } - return ne; - }); - newAggs.add(newAgg); + private LogicalPlan resolveFuse(Fuse fuse, List childrenOutput) { + Source source = fuse.source(); + Attribute score = fuse.score(); + if (score instanceof UnresolvedAttribute) { + score = maybeResolveAttribute((UnresolvedAttribute) score, childrenOutput); } - for (Attribute attr : groupings) { - if (attr instanceof UnresolvedAttribute ua) { - newGroupings.add(resolveAttribute(ua, childrenOutput)); - } else { - newGroupings.add(attr); - } + Attribute discriminator = fuse.discriminator(); + if (discriminator instanceof UnresolvedAttribute) { + discriminator = maybeResolveAttribute((UnresolvedAttribute) discriminator, childrenOutput); } - return new Dedup(dedup.source(), dedup.child(), newAggs, newGroupings); - } - - private LogicalPlan resolveRrfScoreEval(RrfScoreEval rrf, List childrenOutput) { - Attribute scoreAttr = rrf.scoreAttribute(); - Attribute forkAttr = rrf.forkAttribute(); + List groupings = fuse.groupings() + .stream() + .map(attr -> attr instanceof UnresolvedAttribute ? maybeResolveAttribute((UnresolvedAttribute) attr, childrenOutput) : attr) + .toList(); - if (scoreAttr instanceof UnresolvedAttribute ua) { - scoreAttr = resolveAttribute(ua, childrenOutput); + // some attributes were unresolved - we return Fuse here so that the Verifier can raise an error message + if (score instanceof UnresolvedAttribute || discriminator instanceof UnresolvedAttribute) { + return new Fuse(fuse.source(), fuse.child(), score, discriminator, groupings, fuse.fuseType()); } - if (forkAttr instanceof UnresolvedAttribute ua) { - forkAttr = resolveAttribute(ua, childrenOutput); - } + LogicalPlan scoreEval = new FuseScoreEval(source, fuse.child(), score, discriminator); + + // create aggregations + Expression aggFilter = new Literal(source, true, DataType.BOOLEAN); - if (forkAttr != rrf.forkAttribute() || scoreAttr != rrf.scoreAttribute()) { - return new RrfScoreEval(rrf.source(), rrf.child(), scoreAttr, forkAttr); + List aggregates = new ArrayList<>(); + aggregates.add(new Alias(source, score.name(), new Sum(source, score, aggFilter, SummationMode.COMPENSATED_LITERAL))); + + for (Attribute attr : childrenOutput) { + if (attr.name().equals(score.name())) { + continue; + } + aggregates.add(new Alias(source, attr.name(), new Values(source, attr, aggFilter))); } - return rrf; + return resolveAggregate(new Aggregate(source, scoreEval, new ArrayList<>(groupings), aggregates), childrenOutput); } private Attribute maybeResolveAttribute(UnresolvedAttribute ua, List childrenOutput) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java index d82bd4f446df0..d59bf721ccf30 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java @@ -21,7 +21,6 @@ import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.io.IOException; @@ -174,9 +173,7 @@ public boolean equals(Object obj) { @Override public BiConsumer postAnalysisPlanVerification() { return (p, failures) -> { - // `dedup` for now is not exposed as a command, - // so allowing aggregate functions for dedup explicitly is just an internal implementation detail - if ((p instanceof Aggregate) == false && (p instanceof Dedup) == false) { + if ((p instanceof Aggregate) == false) { p.expressions().forEach(x -> x.forEachDown(AggregateFunction.class, af -> { failures.add(fail(af, "aggregate function [{}] not allowed outside STATS command", af.sourceText())); })); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java index e23d22f26a1e9..bf4d2a372cd7f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/parser/LogicalPlanBuilder.java @@ -44,12 +44,9 @@ import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern; import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction; -import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum; -import org.elasticsearch.xpack.esql.expression.function.aggregate.SummationMode; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.ChangePoint; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Drop; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -68,10 +65,10 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.plan.logical.Rename; import org.elasticsearch.xpack.esql.plan.logical.Row; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; +import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.InferencePlan; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; @@ -777,19 +774,14 @@ public PlanFactory visitFuseCommand(EsqlBaseParser.FuseCommandContext ctx) { Source source = source(ctx); return input -> { Attribute scoreAttr = new UnresolvedAttribute(source, MetadataAttribute.SCORE); - Attribute forkAttr = new UnresolvedAttribute(source, Fork.FORK_FIELD); + Attribute discriminatorAttr = new UnresolvedAttribute(source, Fork.FORK_FIELD); Attribute idAttr = new UnresolvedAttribute(source, IdFieldMapper.NAME); Attribute indexAttr = new UnresolvedAttribute(source, MetadataAttribute.INDEX); - List aggregates = List.of( - new Alias( - source, - MetadataAttribute.SCORE, - new Sum(source, scoreAttr, new Literal(source, true, DataType.BOOLEAN), SummationMode.COMPENSATED_LITERAL) - ) - ); - List groupings = List.of(idAttr, indexAttr); - return new Dedup(source, new RrfScoreEval(source, input, scoreAttr, forkAttr), aggregates, groupings); + List groupings = List.of(idAttr, indexAttr); + Fuse.FuseType fuseType = Fuse.FuseType.RRF; + + return new Fuse(source, input, scoreAttr, discriminatorAttr, groupings, fuseType); }; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Dedup.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Dedup.java deleted file mode 100644 index d4474e1dd15a9..0000000000000 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Dedup.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -package org.elasticsearch.xpack.esql.plan.logical; - -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xpack.esql.core.capabilities.Resolvables; -import org.elasticsearch.xpack.esql.core.expression.Alias; -import org.elasticsearch.xpack.esql.core.expression.Attribute; -import org.elasticsearch.xpack.esql.core.expression.Expression; -import org.elasticsearch.xpack.esql.core.expression.Literal; -import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.core.tree.NodeInfo; -import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; -import org.elasticsearch.xpack.esql.expression.NamedExpressions; -import org.elasticsearch.xpack.esql.expression.function.aggregate.Values; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -/** - * Removes the rows that contain the same values for a list specified fields. - * Dedup also receives a list of aggregates similar to {@link Aggregate STATS}. - * In the current implementation Dedup implements {@link SurrogateLogicalPlan} and actually expands to {@link Aggregate STATS}. - * At the moment this is only used in the planning of the RRF command, but could evolve as a standalone command. - */ -public class Dedup extends UnaryPlan implements SurrogateLogicalPlan { - private final List aggregates; - private final List groupings; - private List lazyOutput; - private List lazyFinalAggs; - - public Dedup(Source source, LogicalPlan child, List aggregates, List groupings) { - super(source, child); - this.aggregates = aggregates; - this.groupings = groupings; - } - - @Override - public String getWriteableName() { - throw new UnsupportedOperationException("not serialized"); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - throw new UnsupportedOperationException("not serialized"); - } - - @Override - protected NodeInfo info() { - return NodeInfo.create(this, Dedup::new, child(), aggregates, groupings); - } - - @Override - public boolean expressionsResolved() { - return Resolvables.resolved(aggregates) && Resolvables.resolved(groupings); - } - - @Override - public UnaryPlan replaceChild(LogicalPlan newChild) { - return new Dedup(source(), newChild, aggregates, groupings); - } - - @Override - public LogicalPlan surrogate() { - return new Aggregate(source(), child(), new ArrayList<>(groupings), finalAggs()); - } - - public List aggregates() { - return aggregates; - } - - public List groupings() { - return groupings; - } - - public List finalAggs() { - if (lazyFinalAggs == null) { - lazyFinalAggs = new ArrayList<>(aggregates); - - Set names = new HashSet<>(aggregates.stream().map(att -> att.name()).toList()); - Expression aggFilter = new Literal(source(), true, DataType.BOOLEAN); - - for (Attribute attr : child().output()) { - if (names.contains(attr.name())) { - continue; - } - - lazyFinalAggs.add(new Alias(source(), attr.name(), new Values(source(), attr, aggFilter))); - } - } - - return lazyFinalAggs; - } - - @Override - public List output() { - if (lazyOutput == null) { - lazyOutput = NamedExpressions.mergeOutputAttributes(finalAggs(), child().output()); - } - return lazyOutput; - } -} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java new file mode 100644 index 0000000000000..7bf45ff3a7370 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.logical.fuse; + +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.esql.capabilities.TelemetryAware; +import org.elasticsearch.xpack.esql.core.expression.Attribute; +import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; + +import java.io.IOException; +import java.util.List; + +public class Fuse extends UnaryPlan implements TelemetryAware { + private final Attribute score; + private final Attribute discriminator; + private final List groupings; + private final FuseType fuseType; + + public enum FuseType { + RRF, + LINEAR + }; + + public Fuse( + Source source, + LogicalPlan child, + Attribute score, + Attribute discriminator, + List groupings, + FuseType fuseType + ) { + super(source, child); + this.score = score; + this.discriminator = discriminator; + this.groupings = groupings; + this.fuseType = fuseType; + + } + + @Override + public String getWriteableName() { + throw new UnsupportedOperationException("not serialized"); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("not serialized"); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, Fuse::new, child(), score, discriminator, groupings, fuseType); + } + + @Override + public UnaryPlan replaceChild(LogicalPlan newChild) { + return new Fuse(source(), newChild, score, discriminator, groupings, fuseType); + } + + public List groupings() { + return groupings; + } + + public Attribute discriminator() { + return discriminator; + } + + public Attribute score() { + return score; + } + + public FuseType fuseType() { + return fuseType; + } + + @Override + public boolean expressionsResolved() { + return score.resolved() && discriminator.resolved() && groupings.stream().allMatch(Expression::resolved); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RrfScoreEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java similarity index 64% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RrfScoreEval.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java index 9fc16ac615231..d5fb25e7156a9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/RrfScoreEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java @@ -5,7 +5,7 @@ * 2.0. */ -package org.elasticsearch.xpack.esql.plan.logical; +package org.elasticsearch.xpack.esql.plan.logical.fuse; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.license.License; @@ -14,18 +14,20 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import java.io.IOException; import java.util.Objects; -public class RrfScoreEval extends UnaryPlan implements LicenseAware { - private final Attribute forkAttr; +public class FuseScoreEval extends UnaryPlan implements LicenseAware { + private final Attribute discriminatorAttr; private final Attribute scoreAttr; - public RrfScoreEval(Source source, LogicalPlan child, Attribute scoreAttr, Attribute forkAttr) { + public FuseScoreEval(Source source, LogicalPlan child, Attribute scoreAttr, Attribute discriminatorAttr) { super(source, child); this.scoreAttr = scoreAttr; - this.forkAttr = forkAttr; + this.discriminatorAttr = discriminatorAttr; } @Override @@ -40,30 +42,30 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, RrfScoreEval::new, child(), scoreAttr, forkAttr); + return NodeInfo.create(this, FuseScoreEval::new, child(), scoreAttr, discriminatorAttr); } @Override public boolean expressionsResolved() { - return scoreAttr.resolved() && forkAttr.resolved(); + return scoreAttr.resolved() && discriminatorAttr.resolved(); } @Override public UnaryPlan replaceChild(LogicalPlan newChild) { - return new RrfScoreEval(source(), newChild, scoreAttr, forkAttr); + return new FuseScoreEval(source(), newChild, scoreAttr, discriminatorAttr); } - public Attribute scoreAttribute() { + public Attribute score() { return scoreAttr; } - public Attribute forkAttribute() { - return forkAttr; + public Attribute discriminator() { + return discriminatorAttr; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), scoreAttr, forkAttr); + return Objects.hash(super.hashCode(), scoreAttr, discriminatorAttr); } @Override @@ -75,8 +77,8 @@ public boolean equals(Object obj) { return false; } - RrfScoreEval rrf = (RrfScoreEval) obj; - return child().equals(rrf.child()) && scoreAttr.equals(rrf.scoreAttribute()) && forkAttr.equals(forkAttribute()); + FuseScoreEval rrf = (FuseScoreEval) obj; + return child().equals(rrf.child()) && scoreAttr.equals(rrf.score()) && discriminatorAttr.equals(discriminator()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/RrfScoreEvalExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FuseScoreEvalExec.java similarity index 65% rename from x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/RrfScoreEvalExec.java rename to x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FuseScoreEvalExec.java index 8b9b203af1715..f55a594646456 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/RrfScoreEvalExec.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/FuseScoreEvalExec.java @@ -15,14 +15,14 @@ import java.io.IOException; -public class RrfScoreEvalExec extends UnaryExec { +public class FuseScoreEvalExec extends UnaryExec { private final Attribute scoreAttr; - private final Attribute forkAttr; + private final Attribute discriminatorAttr; - public RrfScoreEvalExec(Source source, PhysicalPlan child, Attribute scoreAttr, Attribute forkAttr) { + public FuseScoreEvalExec(Source source, PhysicalPlan child, Attribute scoreAttr, Attribute discriminatorAttr) { super(source, child); this.scoreAttr = scoreAttr; - this.forkAttr = forkAttr; + this.discriminatorAttr = discriminatorAttr; } @Override @@ -37,16 +37,24 @@ public void writeTo(StreamOutput out) throws IOException { @Override protected NodeInfo info() { - return NodeInfo.create(this, RrfScoreEvalExec::new, child(), scoreAttr, forkAttr); + return NodeInfo.create(this, FuseScoreEvalExec::new, child(), scoreAttr, discriminatorAttr); } @Override public UnaryExec replaceChild(PhysicalPlan newChild) { - return new RrfScoreEvalExec(source(), newChild, scoreAttr, forkAttr); + return new FuseScoreEvalExec(source(), newChild, scoreAttr, discriminatorAttr); + } + + public Attribute score() { + return scoreAttr; + } + + public Attribute discriminator() { + return discriminatorAttr; } @Override protected AttributeSet computeReferences() { - return AttributeSet.of(scoreAttr, forkAttr); + return AttributeSet.of(scoreAttr, discriminatorAttr); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 4bac4d4b0aae1..259184714b40f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -93,7 +93,6 @@ import org.elasticsearch.xpack.esql.inference.completion.CompletionOperator; import org.elasticsearch.xpack.esql.inference.rerank.RerankOperator; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; -import org.elasticsearch.xpack.esql.plan.logical.Fork; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec; @@ -108,6 +107,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.FragmentExec; +import org.elasticsearch.xpack.esql.plan.physical.FuseScoreEvalExec; import org.elasticsearch.xpack.esql.plan.physical.GrokExec; import org.elasticsearch.xpack.esql.plan.physical.HashJoinExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; @@ -118,7 +118,6 @@ import org.elasticsearch.xpack.esql.plan.physical.ParallelExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; -import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec; import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.ShowExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; @@ -313,8 +312,8 @@ else if (node instanceof OutputExec outputExec) { return planOutput(outputExec, context); } else if (node instanceof ExchangeSinkExec exchangeSink) { return planExchangeSink(exchangeSink, context); - } else if (node instanceof RrfScoreEvalExec rrf) { - return planRrfScoreEvalExec(rrf, context); + } else if (node instanceof FuseScoreEvalExec fuse) { + return planFuseScoreEvalExec(fuse, context); } throw new EsqlIllegalArgumentException("unknown physical plan node [" + node.nodeName() + "]"); @@ -333,17 +332,18 @@ private PhysicalOperation planCompletion(CompletionExec completion, LocalExecuti return source.with(new CompletionOperator.Factory(inferenceService, inferenceId, promptEvaluatorFactory), outputLayout); } - private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecutionPlannerContext context) { - PhysicalOperation source = plan(rrf.child(), context); + private PhysicalOperation planFuseScoreEvalExec(FuseScoreEvalExec fuse, LocalExecutionPlannerContext context) { + PhysicalOperation source = plan(fuse.child(), context); int scorePosition = -1; - int forkPosition = -1; + int discriminatorPosition = -1; int pos = 0; - for (Attribute attr : rrf.child().output()) { - if (attr.name().equals(Fork.FORK_FIELD)) { - forkPosition = pos; + + for (Attribute attr : fuse.child().output()) { + if (attr.name().equals(fuse.discriminator().name())) { + discriminatorPosition = pos; } - if (attr.name().equals(MetadataAttribute.SCORE)) { + if (attr.name().equals(fuse.score().name())) { scorePosition = pos; } @@ -351,13 +351,13 @@ private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecut } if (scorePosition == -1) { - throw new IllegalStateException("can't find _score attribute position"); + throw new IllegalStateException("can't find score attribute position"); } - if (forkPosition == -1) { - throw new IllegalStateException("can'find _fork attribute position"); + if (discriminatorPosition == -1) { + throw new IllegalStateException("can'find discriminator attribute position"); } - return source.with(new RrfScoreEvalOperator.Factory(forkPosition, scorePosition), source.layout); + return source.with(new RrfScoreEvalOperator.Factory(discriminatorPosition, scorePosition), source.layout); } private PhysicalOperation planAggregation(AggregateExec aggregate, LocalExecutionPlannerContext context) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java index 5cbf8f4844b2d..2c4bc56c15880 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/MapperUtils.java @@ -22,10 +22,10 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.MvExpand; import org.elasticsearch.xpack.esql.plan.logical.Project; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; +import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation; @@ -36,12 +36,12 @@ import org.elasticsearch.xpack.esql.plan.physical.EnrichExec; import org.elasticsearch.xpack.esql.plan.physical.EvalExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; +import org.elasticsearch.xpack.esql.plan.physical.FuseScoreEvalExec; import org.elasticsearch.xpack.esql.plan.physical.GrokExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; -import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec; import org.elasticsearch.xpack.esql.plan.physical.SampleExec; import org.elasticsearch.xpack.esql.plan.physical.ShowExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; @@ -135,8 +135,8 @@ static PhysicalPlan mapUnary(UnaryPlan p, PhysicalPlan child) { ); } - if (p instanceof RrfScoreEval rrf) { - return new RrfScoreEvalExec(rrf.source(), child, rrf.scoreAttribute(), rrf.forkAttribute()); + if (p instanceof FuseScoreEval rrf) { + return new FuseScoreEvalExec(rrf.source(), child, rrf.score(), rrf.discriminator()); } if (p instanceof Sample sample) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/telemetry/FeatureMetric.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/telemetry/FeatureMetric.java index 639a48d8b54da..f7ebaed247e6e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/telemetry/FeatureMetric.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/telemetry/FeatureMetric.java @@ -10,7 +10,6 @@ import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.ChangePoint; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Drop; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -31,9 +30,10 @@ import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Rename; import org.elasticsearch.xpack.esql.plan.logical.Row; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.Sample; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; +import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse; +import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin; @@ -67,10 +67,9 @@ public enum FeatureMetric { CHANGE_POINT(ChangePoint.class::isInstance), INLINESTATS(InlineStats.class::isInstance), RERANK(Rerank.class::isInstance), - DEDUP(Dedup.class::isInstance), INSIST(Insist.class::isInstance), FORK(Fork.class::isInstance), - RRF(RrfScoreEval.class::isInstance), + FUSE(Fuse.class::isInstance), COMPLETION(Completion.class::isInstance), SAMPLE(Sample.class::isInstance); @@ -81,7 +80,8 @@ public enum FeatureMetric { UnresolvedRelation.class, EsqlProject.class, Project.class, - Limit.class // LIMIT is managed in another way, see above + Limit.class, // LIMIT is managed in another way, see above + FuseScoreEval.class ); private Predicate planCheck; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index a2d6f770e8915..439c1bc189e3b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -74,7 +74,6 @@ import org.elasticsearch.xpack.esql.parser.QueryParams; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Enrich; import org.elasticsearch.xpack.esql.plan.logical.EsRelation; @@ -88,8 +87,8 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Row; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; +import org.elasticsearch.xpack.esql.plan.logical.fuse.FuseScoreEval; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject; @@ -3505,17 +3504,16 @@ public void testValidFuse() { Limit limit = as(plan, Limit.class); - Dedup dedup = as(limit.child(), Dedup.class); - assertThat(dedup.groupings().size(), equalTo(2)); - assertThat(dedup.aggregates().size(), equalTo(15)); + Aggregate aggregate = as(limit.child(), Aggregate.class); + assertThat(aggregate.groupings().size(), equalTo(2)); - RrfScoreEval rrf = as(dedup.child(), RrfScoreEval.class); - assertThat(rrf.scoreAttribute(), instanceOf(ReferenceAttribute.class)); - assertThat(rrf.scoreAttribute().name(), equalTo("_score")); - assertThat(rrf.forkAttribute(), instanceOf(ReferenceAttribute.class)); - assertThat(rrf.forkAttribute().name(), equalTo("_fork")); + FuseScoreEval scoreEval = as(aggregate.child(), FuseScoreEval.class); + assertThat(scoreEval.score(), instanceOf(ReferenceAttribute.class)); + assertThat(scoreEval.score().name(), equalTo("_score")); + assertThat(scoreEval.discriminator(), instanceOf(ReferenceAttribute.class)); + assertThat(scoreEval.discriminator().name(), equalTo("_fork")); - assertThat(rrf.child(), instanceOf(Fork.class)); + assertThat(scoreEval.child(), instanceOf(Fork.class)); } public void testFuseError() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java index d985937544436..49dce0645de6a 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/parser/StatementParserTests.java @@ -45,7 +45,6 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual; import org.elasticsearch.xpack.esql.plan.IndexPattern; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; -import org.elasticsearch.xpack.esql.plan.logical.Dedup; import org.elasticsearch.xpack.esql.plan.logical.Dissect; import org.elasticsearch.xpack.esql.plan.logical.Drop; import org.elasticsearch.xpack.esql.plan.logical.Enrich; @@ -64,9 +63,9 @@ import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.Rename; import org.elasticsearch.xpack.esql.plan.logical.Row; -import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval; import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation; +import org.elasticsearch.xpack.esql.plan.logical.fuse.Fuse; import org.elasticsearch.xpack.esql.plan.logical.inference.Completion; import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank; import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes; @@ -3973,22 +3972,17 @@ public void testValidFuse() { | FUSE """); - var dedup = as(plan, Dedup.class); - assertThat(dedup.groupings().size(), equalTo(2)); - assertThat(dedup.groupings().get(0), instanceOf(UnresolvedAttribute.class)); - assertThat(dedup.groupings().get(0).name(), equalTo("_id")); - assertThat(dedup.groupings().get(1), instanceOf(UnresolvedAttribute.class)); - assertThat(dedup.groupings().get(1).name(), equalTo("_index")); - assertThat(dedup.aggregates().size(), equalTo(1)); - assertThat(dedup.aggregates().get(0), instanceOf(Alias.class)); - - var rrfScoreEval = as(dedup.child(), RrfScoreEval.class); - assertThat(rrfScoreEval.scoreAttribute(), instanceOf(UnresolvedAttribute.class)); - assertThat(rrfScoreEval.scoreAttribute().name(), equalTo("_score")); - assertThat(rrfScoreEval.forkAttribute(), instanceOf(UnresolvedAttribute.class)); - assertThat(rrfScoreEval.forkAttribute().name(), equalTo("_fork")); - - assertThat(rrfScoreEval.child(), instanceOf(Fork.class)); + var fuse = as(plan, Fuse.class); + assertThat(fuse.groupings().size(), equalTo(2)); + assertThat(fuse.groupings().get(0), instanceOf(UnresolvedAttribute.class)); + assertThat(fuse.groupings().get(0).name(), equalTo("_id")); + assertThat(fuse.groupings().get(1), instanceOf(UnresolvedAttribute.class)); + assertThat(fuse.groupings().get(1).name(), equalTo("_index")); + assertThat(fuse.discriminator().name(), equalTo("_fork")); + assertThat(fuse.score().name(), equalTo("_score")); + assertThat(fuse.fuseType(), equalTo(Fuse.FuseType.RRF)); + + assertThat(fuse.child(), instanceOf(Fork.class)); } public void testDoubleParamsForIdentifier() { diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml index 886ca8f6b0b6d..f670b7e639764 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/esql/60_usage.yml @@ -34,20 +34,20 @@ setup: parameters: [] # A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise. capabilities: - - snapshot_test_for_telemetry + - snapshot_test_for_telemetry_v2 - fn_byte_length - match_function_options - first_over_time - sum_over_time - count_over_time - - distinct_over_time + - count_distinct_over_time - cosine_vector_similarity_function reason: "Test that should only be executed on snapshot versions" - do: {xpack.usage: {}} - match: { esql.available: true } - match: { esql.enabled: true } - - length: { esql.features: 27 } + - length: { esql.features: 26 } - set: {esql.features.dissect: dissect_counter} - set: {esql.features.drop: drop_counter} - set: {esql.features.eval: eval_counter} @@ -68,10 +68,9 @@ setup: - set: {esql.features.change_point: change_point_counter} - set: {esql.features.inlinestats: inlinestats_counter} - set: {esql.features.rerank: rerank_counter} - - set: {esql.features.dedup: dedup_counter} - set: {esql.features.insist: insist_counter} - set: {esql.features.fork: fork_counter} - - set: {esql.features.rrf: rrf_counter} + - set: {esql.features.fuse: fuse_counter} - set: {esql.features.completion: completion_counter} - set: {esql.features.sample: sample_counter} - length: { esql.queries: 3 } @@ -112,10 +111,9 @@ setup: - match: {esql.features.change_point: $change_point_counter} - match: {esql.features.inlinestats: $inlinestats_counter} - match: {esql.features.rerank: $rerank_counter} - - match: {esql.features.dedup: $dedup_counter} - match: {esql.features.insist: $insist_counter} - match: {esql.features.fork: $fork_counter} - - match: {esql.features.rrf: $rrf_counter} + - match: {esql.features.fuse: $fuse_counter} - match: {esql.features.completion: $completion_counter} - match: {esql.features.sample: $sample_counter} - gt: {esql.queries.rest.total: $rest_total_counter} @@ -131,7 +129,7 @@ setup: - match: {esql.functions.coalesce: $functions_coalesce} - gt: {esql.functions.categorize: $functions_categorize} # Testing for the entire function set isn't feasible, so we just check that we return the correct count as an approximation. - - length: {esql.functions: 157} # check the "sister" test below for a likely update to the same esql.functions length check + - length: {esql.functions: 171} # check the "sister" test below for a likely update to the same esql.functions length check --- "Basic ESQL usage output (telemetry) non-snapshot version": @@ -141,13 +139,13 @@ setup: - method: POST path: /_query parameters: [] - capabilities: [ non_snapshot_test_for_telemetry, fn_contains ] + capabilities: [ non_snapshot_test_for_telemetry_v2, fn_contains ] reason: "Test that should only be executed on release versions" - do: {xpack.usage: {}} - match: { esql.available: true } - match: { esql.enabled: true } - - length: { esql.features: 27 } + - length: { esql.features: 26 } - set: {esql.features.dissect: dissect_counter} - set: {esql.features.drop: drop_counter} - set: {esql.features.eval: eval_counter} @@ -168,10 +166,9 @@ setup: - set: {esql.features.change_point: change_point_counter} - set: {esql.features.inlinestats: inlinestats_counter} - set: {esql.features.rerank: rerank_counter} - - set: {esql.features.dedup: dedup_counter} - set: {esql.features.insist: insist_counter} - set: {esql.features.fork: fork_counter} - - set: {esql.features.rrf: rrf_counter} + - set: {esql.features.fuse: fuse_counter} - set: {esql.features.completion: completion_counter} - set: {esql.features.sample: sample_counter} - length: { esql.queries: 3 } @@ -212,10 +209,9 @@ setup: - match: {esql.features.change_point: $change_point_counter} - match: {esql.features.inlinestats: $inlinestats_counter} - match: {esql.features.rerank: $rerank_counter} - - match: {esql.features.dedup: $dedup_counter} - match: {esql.features.insist: $insist_counter} - match: {esql.features.fork: $fork_counter} - - match: {esql.features.rrf: $rrf_counter} + - match: {esql.features.fuse: $fuse_counter} - match: {esql.features.completion: $completion_counter} - gt: {esql.queries.rest.total: $rest_total_counter} - match: {esql.queries.rest.failed: $rest_failed_counter}