Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ required_capability: fuse_v6

ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
| MV_EXPAND _id
| LIMIT 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - I understand why this is needed, but probably makes no sense in practice. Maybe we can check that there's a ROW at the top and avoid the need for LIMIT in that case, as input will be already limited?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the usage of ROW is limited in practice, especially in combination with FUSE, but I added a note in #123389 and will definitely look into this as a follow up.

| FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "l2_norm" }
| SORT _id
| KEEP _id, _score
Expand All @@ -779,6 +780,7 @@ required_capability: fuse_v6

ROW _id = ["a", "b", "c"], _score = 0.0, _index = "my_index", my_fork = "foo"
| MV_EXPAND _id
| LIMIT 10
| FUSE LINEAR GROUP BY my_fork WITH { "normalizer": "minmax" }
| SORT _id
| KEEP _id, _score
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

Expand All @@ -26,7 +27,7 @@

import static org.elasticsearch.xpack.esql.common.Failure.fail;

public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware {
public class Fuse extends UnaryPlan implements TelemetryAware, PostAnalysisVerificationAware, ExecutesOn.Coordinator {
private final Attribute score;
private final Attribute discriminator;
private final List<NamedExpression> keys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;

import java.io.IOException;
Expand All @@ -36,7 +39,12 @@
import java.util.Map;
import java.util.Objects;

public class FuseScoreEval extends UnaryPlan implements LicenseAware, PostAnalysisVerificationAware {
public class FuseScoreEval extends UnaryPlan
implements
LicenseAware,
PostAnalysisVerificationAware,
ExecutesOn.Coordinator,
PipelineBreaker {
private final Attribute discriminatorAttr;
private final Attribute scoreAttr;
private final Fuse.FuseType fuseType;
Expand Down Expand Up @@ -122,6 +130,7 @@ public boolean licenseCheck(XPackLicenseState state) {

@Override
public void postAnalysisVerification(Failures failures) {
validateLimitedInput(failures);
if (options == null) {
return;
}
Expand All @@ -132,6 +141,24 @@ public void postAnalysisVerification(Failures failures) {
}
}

private void validateLimitedInput(Failures failures) {
var myself = this;
Holder<Boolean> hasLimitedInput = new Holder<>(false);
this.forEachUp(LogicalPlan.class, plan -> {
if (plan == myself) {
return;
}

if (plan instanceof PipelineBreaker) {
hasLimitedInput.set(true);
}
});

if (hasLimitedInput.get() == false) {
failures.add(new Failure(this, "FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE."));
}
}

private void validateRrfOptions(Failures failures) {
options.keyFoldedMap().forEach((key, value) -> {
if (key.equals(RrfConfig.RANK_CONSTANT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2681,6 +2681,11 @@ public void testFuse() {
error(queryPrefix + " | fuse linear KEY BY _score"),
containsString("expected KEY BY field [_score] to be KEYWORD or TEXT, not DOUBLE")
);

assertThat(
error("FROM test METADATA _index, _score, _id | EVAL _fork = \"fork1\" | FUSE"),
containsString("FUSE can only be used on a limited number of rows. Consider adding a LIMIT before FUSE.")
);
}

public void testNoMetricInStatsByClause() {
Expand Down