Skip to content

Commit e3d7ba3

Browse files
authored
ESQL: Provide appropriate messages when can't execute INLINESTATS (#134201)
This adds the guards to reject queries that INLINESTATS cannot currently execute: - SORT with no LIMIT before INLINESTATS - CATEGORIZE with INLINESTATS. Closes #124725
1 parent 18037bb commit e3d7ba3

File tree

9 files changed

+178
-11
lines changed

9 files changed

+178
-11
lines changed

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/generative/GenerativeRestTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public abstract class GenerativeRestTest extends ESRestTestCase {
5050
"cannot sort on .*",
5151
"argument of \\[count.*\\] must",
5252
"Cannot use field \\[.*\\] with unsupported type \\[.*\\]",
53-
"Unbounded sort not supported yet",
53+
"Unbounded SORT not supported yet",
5454
"The field names are too complex to process", // field_caps problem
5555
"must be \\[any type except counter types\\]", // TODO refine the generation of count()
5656

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/capabilities/PostAnalysisPlanVerificationAware.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
* Interface implemented by expressions or plans that require validation after query plan analysis,
1919
* when the indices and references have been resolved, but before the plan is transformed further by optimizations.
2020
* The interface is similar to {@link PostAnalysisVerificationAware}, but focused on the tree structure, oftentimes covering semantic
21-
* checks.
21+
* checks. Generally, whenever one needs to check the plan structure leading to a certain node, which is the node of interest, this node's
22+
* class needs to implement this interface. Otherwise it may implement {@link PostAnalysisVerificationAware}, as more convenient.
2223
*/
2324
public interface PostAnalysisPlanVerificationAware {
2425

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.capabilities;
9+
10+
import org.elasticsearch.xpack.esql.common.Failures;
11+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
12+
13+
import java.util.function.BiConsumer;
14+
15+
/**
16+
* Interface implemented by expressions that require validation post logical optimization,
17+
* when the plan and references have been not just resolved but also replaced.
18+
* The interface is similar to {@link PostOptimizationVerificationAware}, but focused on individual expressions or plans, typically
19+
* covering semantic checks. Generally, whenever one needs to check the plan structure leading to a certain node, which is the node of
20+
* interest, this node's class needs to implement this interface. Otherwise it may implement {@link PostOptimizationVerificationAware},
21+
* as more convenient.
22+
*/
23+
public interface PostOptimizationPlanVerificationAware {
24+
25+
/**
26+
* Validates the implementing expression - discovered failures are reported to the given {@link Failures} class.
27+
*
28+
* <p>
29+
* Example: the SORT command, {@code OrderBy}, can only be executed currently if it can be associated with a LIMIT {@code Limit}
30+
* and together transformed into a {@code TopN} (which is executable). The replacement of the LIMIT+SORT into a TopN is done at
31+
* the end of the optimization phase. This means that any SORT still existing in the plan post optimization is an error.
32+
* However, there can be a LIMIT in the plan, but separated from the SORT by an INLINESTATS; in this case, the LIMIT cannot be
33+
* pushed down near the SORT. To inform the user how they need to modify the query so it can be run, we implement this:
34+
* <pre>
35+
* {@code
36+
*
37+
* @Override
38+
* public BiConsumer<LogicalPlan, Failures> postOptimizationPlanVerification() {
39+
* return (p, failures) -> {
40+
* if (p instanceof InlineJoin inlineJoin) {
41+
* inlineJoin.forEachUp(OrderBy.class, orderBy -> {
42+
* failures.add(
43+
* fail(
44+
* inlineJoin,
45+
* "unbounded sort [{}] not supported before inlinestats [{}], move the sort after the inlinestats",
46+
* orderBy.sourceText(),
47+
* inlineJoin.sourceText()
48+
* )
49+
* );
50+
* });
51+
* } else if (p instanceof OrderBy) {
52+
* failures.add(fail(p, "Unbounded SORT not supported yet [{}] please add a LIMIT", p.sourceText()));
53+
* }
54+
* };
55+
* }
56+
* }
57+
* </pre>
58+
* <p>
59+
* If we didn't need to check the structure of the plan, it would have sufficed to implement the
60+
* {@link PostOptimizationVerificationAware} interface, which would simply check if there is an instance of {@code OrderBy} in the
61+
* plan.
62+
*
63+
* @return a consumer that will receive a tree to check and an accumulator of failures found during inspection.
64+
*/
65+
BiConsumer<LogicalPlan, Failures> postOptimizationPlanVerification();
66+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Categorize.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import org.elasticsearch.license.XPackLicenseState;
1717
import org.elasticsearch.xpack.esql.LicenseAware;
1818
import org.elasticsearch.xpack.esql.SupportsObservabilityTier;
19+
import org.elasticsearch.xpack.esql.common.Failures;
1920
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
21+
import org.elasticsearch.xpack.esql.core.expression.Alias;
2022
import org.elasticsearch.xpack.esql.core.expression.Expression;
2123
import org.elasticsearch.xpack.esql.core.expression.MapExpression;
2224
import org.elasticsearch.xpack.esql.core.expression.Nullability;
@@ -33,6 +35,9 @@
3335
import org.elasticsearch.xpack.esql.expression.function.Options;
3436
import org.elasticsearch.xpack.esql.expression.function.Param;
3537
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
38+
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
39+
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
40+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3641
import org.elasticsearch.xpack.ml.MachineLearning;
3742

3843
import java.io.IOException;
@@ -41,11 +46,13 @@
4146
import java.util.Locale;
4247
import java.util.Map;
4348
import java.util.TreeMap;
49+
import java.util.function.BiConsumer;
4450

4551
import static java.util.Map.entry;
4652
import static org.elasticsearch.common.logging.LoggerMessageFormat.format;
4753
import static org.elasticsearch.compute.aggregation.blockhash.BlockHash.CategorizeDef.OutputFormat.REGEX;
4854
import static org.elasticsearch.xpack.esql.SupportsObservabilityTier.ObservabilityTier.COMPLETE;
55+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
4956
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
5057
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
5158
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isString;
@@ -248,4 +255,26 @@ public String toString() {
248255
public boolean licenseCheck(XPackLicenseState state) {
249256
return MachineLearning.CATEGORIZE_TEXT_AGG_FEATURE.check(state);
250257
}
258+
259+
@Override
260+
public BiConsumer<LogicalPlan, Failures> postAnalysisPlanVerification() {
261+
return (p, failures) -> {
262+
super.postAnalysisPlanVerification().accept(p, failures);
263+
264+
if (p instanceof InlineStats inlineStats && inlineStats.child() instanceof Aggregate aggregate) {
265+
aggregate.groupings().forEach(grp -> {
266+
if (grp instanceof Alias alias && alias.child() instanceof Categorize categorize) {
267+
failures.add(
268+
fail(
269+
categorize,
270+
"CATEGORIZE [{}] is not yet supported with INLINESTATS [{}]",
271+
categorize.sourceText(),
272+
inlineStats.sourceText()
273+
)
274+
);
275+
}
276+
});
277+
}
278+
};
279+
}
251280
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalVerifier.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,17 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10+
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationPlanVerificationAware;
1011
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
1112
import org.elasticsearch.xpack.esql.common.Failures;
1213
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
1314
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1415
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
1516

17+
import java.util.ArrayList;
18+
import java.util.List;
19+
import java.util.function.BiConsumer;
20+
1621
public final class LogicalVerifier extends PostOptimizationPhasePlanVerifier<LogicalPlan> {
1722

1823
public static final LogicalVerifier INSTANCE = new LogicalVerifier();
@@ -33,19 +38,26 @@ boolean skipVerification(LogicalPlan optimizedPlan, boolean skipRemoteEnrichVeri
3338

3439
@Override
3540
void checkPlanConsistency(LogicalPlan optimizedPlan, Failures failures, Failures depFailures) {
41+
List<BiConsumer<LogicalPlan, Failures>> checkers = new ArrayList<>();
42+
3643
optimizedPlan.forEachUp(p -> {
3744
PlanConsistencyChecker.checkPlan(p, depFailures);
3845

3946
if (failures.hasFailures() == false) {
4047
if (p instanceof PostOptimizationVerificationAware pova) {
4148
pova.postOptimizationVerification(failures);
4249
}
50+
if (p instanceof PostOptimizationPlanVerificationAware popva) {
51+
checkers.add(popva.postOptimizationPlanVerification());
52+
}
4353
p.forEachExpression(ex -> {
4454
if (ex instanceof PostOptimizationVerificationAware va) {
4555
va.postOptimizationVerification(failures);
4656
}
4757
});
4858
}
4959
});
60+
61+
optimizedPlan.forEachUp(p -> checkers.forEach(checker -> checker.accept(p, failures)));
5062
}
5163
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/OrderBy.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.elasticsearch.common.io.stream.StreamInput;
1111
import org.elasticsearch.common.io.stream.StreamOutput;
1212
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
13-
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationVerificationAware;
13+
import org.elasticsearch.xpack.esql.capabilities.PostOptimizationPlanVerificationAware;
1414
import org.elasticsearch.xpack.esql.capabilities.TelemetryAware;
1515
import org.elasticsearch.xpack.esql.common.Failures;
1616
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
@@ -19,17 +19,19 @@
1919
import org.elasticsearch.xpack.esql.core.type.DataType;
2020
import org.elasticsearch.xpack.esql.expression.Order;
2121
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
22+
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
2223

2324
import java.io.IOException;
2425
import java.util.List;
2526
import java.util.Objects;
27+
import java.util.function.BiConsumer;
2628

2729
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2830

2931
public class OrderBy extends UnaryPlan
3032
implements
3133
PostAnalysisVerificationAware,
32-
PostOptimizationVerificationAware,
34+
PostOptimizationPlanVerificationAware,
3335
TelemetryAware,
3436
SortAgnostic,
3537
PipelineBreaker {
@@ -118,7 +120,25 @@ public void postAnalysisVerification(Failures failures) {
118120
}
119121

120122
@Override
121-
public void postOptimizationVerification(Failures failures) {
122-
failures.add(fail(this, "Unbounded sort not supported yet [{}] please add a limit", this.sourceText()));
123+
public BiConsumer<LogicalPlan, Failures> postOptimizationPlanVerification() {
124+
return (p, failures) -> {
125+
if (p instanceof InlineJoin inlineJoin) {
126+
inlineJoin.left()
127+
.forEachUp(
128+
OrderBy.class,
129+
orderBy -> failures.add(
130+
fail(
131+
inlineJoin,
132+
"INLINESTATS [{}] cannot yet have an unbounded SORT [{}] before it : either move the SORT after it,"
133+
+ " or add a LIMIT before the SORT",
134+
inlineJoin.sourceText(),
135+
orderBy.sourceText()
136+
)
137+
)
138+
);
139+
} else if (p instanceof OrderBy) {
140+
failures.add(fail(p, "Unbounded SORT not supported yet [{}] please add a LIMIT", p.sourceText()));
141+
}
142+
};
123143
}
124144
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/VerifierTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,6 +2068,26 @@ public void testCategorizeOptionSimilarityThreshold() {
20682068
);
20692069
}
20702070

2071+
public void testCategorizeWithInlineStats() {
2072+
assertEquals(
2073+
"1:37: CATEGORIZE [CATEGORIZE(last_name, { \"similarity_threshold\": 1 })] is not yet supported with "
2074+
+ "INLINESTATS [INLINESTATS COUNT(*) BY CATEGORIZE(last_name, { \"similarity_threshold\": 1 })]",
2075+
error("FROM test | INLINESTATS COUNT(*) BY CATEGORIZE(last_name, { \"similarity_threshold\": 1 })")
2076+
);
2077+
2078+
assertEquals("""
2079+
3:35: CATEGORIZE [CATEGORIZE(gender)] is not yet supported with \
2080+
INLINESTATS [INLINESTATS SUM(salary) BY c3 = CATEGORIZE(gender)]
2081+
line 2:91: CATEGORIZE grouping function [CATEGORIZE(first_name)] can only be in the first grouping expression
2082+
line 2:32: CATEGORIZE [CATEGORIZE(last_name, { "similarity_threshold": 1 })] is not yet supported with \
2083+
INLINESTATS [INLINESTATS COUNT(*) BY c1 = CATEGORIZE(last_name, { "similarity_threshold": 1 }), \
2084+
c2 = CATEGORIZE(first_name)]""", error("""
2085+
FROM test
2086+
| INLINESTATS COUNT(*) BY c1 = CATEGORIZE(last_name, { "similarity_threshold": 1 }), c2 = CATEGORIZE(first_name)
2087+
| INLINESTATS SUM(salary) BY c3 = CATEGORIZE(gender)
2088+
"""));
2089+
}
2090+
20712091
public void testChangePoint() {
20722092
assumeTrue("change_point must be enabled", EsqlCapabilities.Cap.CHANGE_POINT.isEnabled());
20732093
var airports = AnalyzerTestUtils.analyzer(loadMapping("mapping-airports.json", "airports"));

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8269,7 +8269,7 @@ public void testUnboundedSortSimple() {
82698269
""";
82708270

82718271
VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
8272-
assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT y] please add a limit"));
8272+
assertThat(e.getMessage(), containsString("line 2:5: Unbounded SORT not supported yet [SORT y] please add a LIMIT"));
82738273
}
82748274

82758275
public void testUnboundedSortJoin() {
@@ -8281,7 +8281,7 @@ public void testUnboundedSortJoin() {
82818281
""";
82828282

82838283
VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
8284-
assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT y] please add a limit"));
8284+
assertThat(e.getMessage(), containsString("line 2:5: Unbounded SORT not supported yet [SORT y] please add a LIMIT"));
82858285
}
82868286

82878287
public void testUnboundedSortWithMvExpandAndFilter() {
@@ -8296,7 +8296,7 @@ public void testUnboundedSortWithMvExpandAndFilter() {
82968296
""";
82978297

82988298
VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
8299-
assertThat(e.getMessage(), containsString("line 4:3: Unbounded sort not supported yet [SORT language_name] please add a limit"));
8299+
assertThat(e.getMessage(), containsString("line 4:3: Unbounded SORT not supported yet [SORT language_name] please add a LIMIT"));
83008300
}
83018301

83028302
public void testUnboundedSortWithLookupJoinAndFilter() {
@@ -8311,7 +8311,7 @@ public void testUnboundedSortWithLookupJoinAndFilter() {
83118311
""";
83128312

83138313
VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
8314-
assertThat(e.getMessage(), containsString("line 5:3: Unbounded sort not supported yet [SORT foo] please add a limit"));
8314+
assertThat(e.getMessage(), containsString("line 5:3: Unbounded SORT not supported yet [SORT foo] please add a LIMIT"));
83158315
}
83168316

83178317
public void testUnboundedSortExpandFilter() {
@@ -8323,7 +8323,7 @@ public void testUnboundedSortExpandFilter() {
83238323
""";
83248324

83258325
VerificationException e = expectThrows(VerificationException.class, () -> plan(query));
8326-
assertThat(e.getMessage(), containsString("line 2:5: Unbounded sort not supported yet [SORT x] please add a limit"));
8326+
assertThat(e.getMessage(), containsString("line 2:5: Unbounded SORT not supported yet [SORT x] please add a LIMIT"));
83278327
}
83288328

83298329
public void testPruneRedundantOrderBy() {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/OptimizerVerificationTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.elasticsearch.xpack.esql.plugin.EsqlPlugin.INLINESTATS_FEATURE_FLAG;
3131
import static org.hamcrest.Matchers.containsString;
3232
import static org.hamcrest.Matchers.instanceOf;
33+
import static org.hamcrest.Matchers.is;
3334

3435
public class OptimizerVerificationTests extends AbstractLogicalPlanOptimizerTests {
3536

@@ -428,4 +429,22 @@ public void testRemoteEnrichAfterLookupJoinWithPipelineBreaker() {
428429
containsString("4:3: LOOKUP JOIN with remote indices can't be executed after [ENRICH _coordinator:languages_coord]@3:3")
429430
);
430431
}
432+
433+
public void testDanglingOrderByInInlineStats() {
434+
var analyzer = AnalyzerTestUtils.analyzer(loadMapping("mapping-default.json", "test"));
435+
436+
var err = error("""
437+
FROM test
438+
| SORT languages
439+
| INLINESTATS count(*) BY languages
440+
| INLINESTATS s = sum(salary) BY first_name
441+
""", analyzer);
442+
443+
assertThat(err, is("""
444+
2:3: Unbounded SORT not supported yet [SORT languages] please add a LIMIT
445+
line 3:3: INLINESTATS [INLINESTATS count(*) BY languages] cannot yet have an unbounded SORT [SORT languages] before\
446+
it : either move the SORT after it, or add a LIMIT before the SORT
447+
line 4:3: INLINESTATS [INLINESTATS s = sum(salary) BY first_name] cannot yet have an unbounded SORT [SORT languages]\
448+
before it : either move the SORT after it, or add a LIMIT before the SORT"""));
449+
}
431450
}

0 commit comments

Comments
 (0)