Skip to content

Commit 34118cf

Browse files
authored
ES|QL: Check for unsupported fields in FUSE (#135530)
1 parent e426757 commit 34118cf

File tree

3 files changed

+38
-3
lines changed

3 files changed

+38
-3
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,7 +1054,13 @@ private LogicalPlan resolveFuse(Fuse fuse, List<Attribute> childrenOutput) {
10541054
if (attr.name().equals(score.name())) {
10551055
continue;
10561056
}
1057-
aggregates.add(new Alias(source, attr.name(), new Values(source, attr, aggFilter)));
1057+
var valuesAgg = new Values(source, attr, aggFilter);
1058+
// Use VALUES only on supported fields.
1059+
// FuseScoreEval will check that the input contains only columns with supported data types
1060+
// and will fail with an appropriate error message if it doesn't.
1061+
if (valuesAgg.resolved()) {
1062+
aggregates.add(new Alias(source, attr.name(), valuesAgg));
1063+
}
10581064
}
10591065

10601066
return resolveAggregate(new Aggregate(source, scoreEval, new ArrayList<>(keys), aggregates), childrenOutput);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.xpack.esql.core.tree.Source;
2828
import org.elasticsearch.xpack.esql.core.type.DataType;
2929
import org.elasticsearch.xpack.esql.core.util.Holder;
30+
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
3031
import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn;
3132
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3233
import org.elasticsearch.xpack.esql.plan.logical.PipelineBreaker;
@@ -130,7 +131,8 @@ public boolean licenseCheck(XPackLicenseState state) {
130131

131132
@Override
132133
public void postAnalysisVerification(Failures failures) {
133-
validateLimitedInput(failures);
134+
validateInput(failures);
135+
validatePipelineBreakerBeforeFuse(failures);
134136
if (options == null) {
135137
return;
136138
}
@@ -141,7 +143,27 @@ public void postAnalysisVerification(Failures failures) {
141143
}
142144
}
143145

144-
private void validateLimitedInput(Failures failures) {
146+
private void validateInput(Failures failures) {
147+
// Since we use STATS BY to merge rows together, we need to make sure that all columns can be used in STATS BY.
148+
// When the input of FUSE contains unsupported columns, we don't want to fail with a STATS BY validation error,
149+
// but with an error specific to FUSE.
150+
Expression aggFilter = new Literal(source(), true, DataType.BOOLEAN);
151+
152+
for (Attribute attr : child().output()) {
153+
var valuesAgg = new Values(source(), attr, aggFilter);
154+
155+
if (valuesAgg.resolved() == false) {
156+
failures.add(
157+
new Failure(
158+
this,
159+
"cannot use [" + attr.name() + "] as an input of FUSE. Consider using [DROP " + attr.name() + "] before FUSE."
160+
)
161+
);
162+
}
163+
}
164+
}
165+
166+
private void validatePipelineBreakerBeforeFuse(Failures failures) {
145167
var myself = this;
146168
Holder<Boolean> hasLimitedInput = new Holder<>(false);
147169
this.forEachUp(LogicalPlan.class, plan -> {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,13 @@ public void testUnsupportedAndMultiTypedFields() {
303303
)
304304
);
305305
}
306+
307+
if (EsqlCapabilities.Cap.FUSE_V6.isEnabled()) {
308+
assertEquals(
309+
"1:76: cannot use [double] as an input of FUSE. Consider using [DROP double] before FUSE.",
310+
error("from test* METADATA _id, _index, _score | FORK (where true) (where true) | FUSE", analyzer)
311+
);
312+
}
306313
}
307314

308315
public void testRoundFunctionInvalidInputs() {

0 commit comments

Comments
 (0)