Skip to content

Commit e95971d

Browse files
authored
ES|QL: Set FUSE to execute on coordinator (elastic#135515)
1 parent 618bf1d commit e95971d

File tree

4 files changed

+37
-2
lines changed

4 files changed

+37
-2
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/fuse.csv-spec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,7 @@ required_capability: fuse_v6
762762

763763
ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
764764
| MV_EXPAND _id
765+
| LIMIT 10
765766
| FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "l2_norm" }
766767
| SORT _id
767768
| KEEP _id, _score
@@ -779,6 +780,7 @@ required_capability: fuse_v6
779780

780781
ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
781782
| MV_EXPAND _id
783+
| LIMIT 10
782784
| FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "minmax" }
783785
| SORT _id
784786
| KEEP _id, _score

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/Fuse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1919
import org.elasticsearch.xpack.esql.core.tree.Source;
2020
import org.elasticsearch.xpack.esql.core.type.DataType;
21+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2122
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
2223
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
2324

@@ -26,7 +27,7 @@
2627

2728
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2829

29-
public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware {
30+
public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
3031
private final Attribute score;
3132
private final Attribute discriminator;
3233
private final List<NamedExpression> keys;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2727
import org.elasticsearch.xpack.esql.core.tree.Source;
2828
import org.elasticsearch.xpack.esql.core.type.DataType;
29+
import org.elasticsearch.xpack.esql.core.util.Holder;
30+
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
2931
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
32+
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
3033
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
3134

3235
import java.io.IOException;
@@ -36,7 +39,12 @@
3639
import java.util.Map;
3740
import java.util.Objects;
3841

39-
public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalysisVerificationAware {
42+
public class FuseScoreEval extends UnaryPlan
43+
implements
44+
LicenseAware,
45+
PostAnalysisVerificationAware,
46+
ExecutesOn.Coordinator,
47+
PipelineBreaker {
4048
private final Attribute discriminatorAttr;
4149
private final Attribute scoreAttr;
4250
private final Fuse.FuseType fuseType;
@@ -122,6 +130,7 @@ public boolean licenseCheck(XPackLicenseState state) {
122130

123131
@Override
124132
public void postAnalysisVerification(Failures failures) {
133+
validateLimitedInput(failures);
125134
if (options == null) {
126135
return;
127136
}
@@ -132,6 +141,24 @@ public void postAnalysisVerification(Failures failures) {
132141
}
133142
}
134143

144+
private void validateLimitedInput(Failures failures) {
145+
var myself = this;
146+
Holder<Boolean> hasLimitedInput = new Holder<>(false);
147+
this.forEachUp(LogicalPlan.class, plan -> {
148+
if (plan == myself) {
149+
return;
150+
}
151+
152+
if (plan instanceof PipelineBreaker) {
153+
hasLimitedInput.set(true);
154+
}
155+
});
156+
157+
if (hasLimitedInput.get() == false) {
158+
failures.add(new Failure(this, "FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE."));
159+
}
160+
}
161+
135162
private void validateRrfOptions(Failures failures) {
136163
options.keyFoldedMap().forEach((key, value) -> {
137164
if (key.equals(RrfConfig.RANK_CONSTANT)) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2681,6 +2681,11 @@ public void testFuse() {
26812681
error(queryPrefix + " | fuse linear KEY BY _score"),
26822682
containsString("expected KEY BY field [_score] to be KEYWORD or TEXT, not DOUBLE")
26832683
);
2684+
2685+
assertThat(
2686+
error("FROM test METADATA _index, _score, _id | EVAL _fork = \"fork1\" | FUSE"),
2687+
containsString("FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE.")
2688+
);
26842689
}
26852690

26862691
public void testNoMetricInStatsByClause() {

0 commit comments

Comments
 (0)