diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec index fe7788beea1b8..97a8f6345adb0 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec @@ -762,6 +762,7 @@ required_capability: fuse_v6 ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo" | MV_EXPAND _id +| LIMIT 10 | FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "l2_norm" } | SORT _id | KEEP _id, _score @@ -779,6 +780,7 @@ required_capability: fuse_v6 ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo" | MV_EXPAND _id +| LIMIT 10 | FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "minmax" } | SORT _id | KEEP _id, _score diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java index 170998d1b5f6f..da657a703d6e4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; @@ -26,7 +27,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail; -public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware { +public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator { private final Attribute score; private final Attribute discriminator; private final List keys; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java index 9da9899c9bb6f..9863f9fba4af7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java @@ -26,7 +26,10 @@ import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import java.io.IOException; @@ -36,7 +39,12 @@ import java.util.Map; import java.util.Objects; -public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalysisVerificationAware { +public class FuseScoreEval extends UnaryPlan + implements + LicenseAware, + PostAnalysisVerificationAware, + ExecutesOn.Coordinator, + PipelineBreaker { private final Attribute discriminatorAttr; private final Attribute scoreAttr; private final Fuse.FuseType fuseType; @@ -122,6 +130,7 @@ public boolean licenseCheck(XPackLicenseState state) { @Override public void postAnalysisVerification(Failures failures) { + validateLimitedInput(failures); if (options == null) { return; } @@ -132,6 +141,24 @@ public void postAnalysisVerification(Failures failures) { } } + private void validateLimitedInput(Failures failures) { + var myself = this; + Holder hasLimitedInput = new Holder<>(false); + this.forEachUp(LogicalPlan.class, plan -> { + if (plan == myself) { + return; + } + + if (plan instanceof PipelineBreaker) { + hasLimitedInput.set(true); + } + }); + + if (hasLimitedInput.get() == false) { + failures.add(new Failure(this, "FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE.")); + } + } + private void validateRrfOptions(Failures failures) { options.keyFoldedMap().forEach((key, value) -> { if (key.equals(RrfConfig.RANK_CONSTANT)) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java index bdf824443d0de..6c559bfb77146 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java @@ -2681,6 +2681,11 @@ public void testFuse() { error(queryPrefix + " | fuse linear KEY BY _score"), containsString("expected KEY BY field [_score] to be KEYWORD or TEXT, not DOUBLE") ); + + assertThat( + error("FROM test METADATA _index, _score, _id | EVAL _fork = \"fork1\" | FUSE"), + containsString("FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE.") + ); } public void testNoMetricInStatsByClause() {