Skip to content

Commit 141c0dc

Browse files
committed
error on seed in sampling operator
1 parent 1e372a3 commit 141c0dc

File tree

7 files changed

+91
-19
lines changed

7 files changed

+91
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.capabilities;
9+
10+
import org.elasticsearch.xpack.esql.common.Failures;
11+
12+
/**
13+
* Interface implemented by expressions that require validation post physical optimization.
14+
*/
15+
public interface PostPhysicalOptimizationVerificationAware {
16+
17+
/**
18+
* Validates the implementing expression - discovered failures are reported to the given
19+
* {@link Failures} class.
20+
*/
21+
void postPhysicalOptimizationVerification(Failures failures);
22+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
11-
import org.elasticsearch.xpack.esql.common.Failure;
11+
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
1414
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.xpack.esql.rule.Rule;
2525

2626
import java.util.ArrayList;
27-
import java.util.Collection;
2827
import java.util.List;
2928

3029
/**
@@ -46,8 +45,8 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
4645
}
4746

4847
PhysicalPlan verify(PhysicalPlan plan) {
49-
Collection<Failure> failures = verifier.verify(plan);
50-
if (failures.isEmpty() == false) {
48+
Failures failures = verifier.verify(plan);
49+
if (failures.hasFailures()) {
5150
throw new VerificationException(failures);
5251
}
5352
return plan;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
11-
import org.elasticsearch.xpack.esql.common.Failure;
11+
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1313
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1414
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1515
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
1616
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
1717

18-
import java.util.Collection;
1918
import java.util.List;
2019

2120
/**
@@ -39,8 +38,8 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
3938
}
4039

4140
PhysicalPlan verify(PhysicalPlan plan) {
42-
Collection<Failure> failures = verifier.verify(plan);
43-
if (failures.isEmpty() == false) {
41+
Failures failures = verifier.verify(plan);
42+
if (failures.hasFailures()) {
4443
throw new VerificationException(failures);
4544
}
4645
return plan;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10-
import org.elasticsearch.xpack.esql.common.Failure;
10+
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.core.expression.Expressions;
@@ -17,10 +17,6 @@
1717
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
1818
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1919

20-
import java.util.Collection;
21-
import java.util.LinkedHashSet;
22-
import java.util.Set;
23-
2420
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2521

2622
/** Physical plan verifier. */
@@ -31,8 +27,8 @@ public final class PhysicalVerifier {
3127
private PhysicalVerifier() {}
3228

3329
/** Verifies the physical plan. */
34-
public Collection<Failure> verify(PhysicalPlan plan) {
35-
Set<Failure> failures = new LinkedHashSet<>();
30+
public Failures verify(PhysicalPlan plan) {
31+
Failures failures = new Failures();
3632
Failures depFailures = new Failures();
3733

3834
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
@@ -56,6 +52,17 @@ public Collection<Failure> verify(PhysicalPlan plan) {
5652
}
5753
}
5854
PlanConsistencyChecker.checkPlan(p, depFailures);
55+
56+
if (failures.hasFailures() == false) {
57+
if (p instanceof PostPhysicalOptimizationVerificationAware va) {
58+
va.postPhysicalOptimizationVerification(failures);
59+
}
60+
p.forEachExpression(ex -> {
61+
if (ex instanceof PostPhysicalOptimizationVerificationAware va) {
62+
va.postPhysicalOptimizationVerification(failures);
63+
}
64+
});
65+
}
5966
});
6067

6168
if (depFailures.hasFailures()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/SampleExec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
15+
import org.elasticsearch.xpack.esql.common.Failures;
1416
import org.elasticsearch.xpack.esql.core.expression.Expression;
1517
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1618
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -19,7 +21,9 @@
1921
import java.io.IOException;
2022
import java.util.Objects;
2123

22-
public class SampleExec extends UnaryExec {
24+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
25+
26+
public class SampleExec extends UnaryExec implements PostPhysicalOptimizationVerificationAware {
2327
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
2428
PhysicalPlan.class,
2529
"SampleExec",
@@ -96,4 +100,15 @@ public boolean equals(Object obj) {
96100

97101
return Objects.equals(child(), other.child()) && Objects.equals(probability, other.probability) && Objects.equals(seed, other.seed);
98102
}
103+
104+
@Override
105+
public void postPhysicalOptimizationVerification(Failures failures) {
106+
// It's currently impossible in ES|QL to handle all data in deterministic order, therefore
107+
// a fixed random seed in the sample operator doesn't work as intended and is disallowed.
108+
// TODO: fix this.
109+
if (seed != null) {
110+
// TODO: what should the error message here be? This doesn't seem right.
111+
failures.add(fail(seed, "Seed not supported when sampling can't be pushed down to Lucene"));
112+
}
113+
}
99114
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7701,6 +7701,8 @@ public void testPruneRedundantOrderBy() {
77017701
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
77027702
*/
77037703
public void testSampleMerged() {
7704+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7705+
77047706
var query = """
77057707
FROM TEST
77067708
| SAMPLE .3 5
@@ -7721,6 +7723,8 @@ public void testSampleMerged() {
77217723
}
77227724

77237725
public void testSamplePushDown() {
7726+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7727+
77247728
for (var command : List.of(
77257729
"ENRICH languages_idx on first_name",
77267730
"EVAL x = 1",
@@ -7745,6 +7749,8 @@ public void testSamplePushDown() {
77457749
}
77467750

77477751
public void testSamplePushDown_sort() {
7752+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7753+
77487754
var query = "FROM TEST | WHERE emp_no > 0 | SAMPLE 0.5 | LIMIT 100";
77497755
var optimized = optimizedPlan(query);
77507756

@@ -7758,6 +7764,8 @@ public void testSamplePushDown_sort() {
77587764
}
77597765

77607766
public void testSamplePushDown_where() {
7767+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7768+
77617769
var query = "FROM TEST | SORT emp_no | SAMPLE 0.5 | LIMIT 100";
77627770
var optimized = optimizedPlan(query);
77637771

@@ -7770,6 +7778,8 @@ public void testSamplePushDown_where() {
77707778
}
77717779

77727780
public void testSampleNoPushDown() {
7781+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7782+
77737783
for (var command : List.of("LIMIT 100", "MV_EXPAND languages", "STATS COUNT()")) {
77747784
var query = "FROM TEST | " + command + " | SAMPLE .5";
77757785
var optimized = optimizedPlan(query);
@@ -7789,7 +7799,9 @@ public void testSampleNoPushDown() {
77897799
* | \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
77907800
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#17, language_name{f}#18]
77917801
*/
7792-
public void testRandomSampleNoPushDownLookupJoin() {
7802+
public void testSampleNoPushDownLookupJoin() {
7803+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7804+
77937805
var query = """
77947806
FROM TEST
77957807
| EVAL language_code = emp_no
@@ -7814,6 +7826,8 @@ public void testRandomSampleNoPushDownLookupJoin() {
78147826
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
78157827
*/
78167828
public void testSampleNoPushDownChangePoint() {
7829+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7830+
78177831
var query = """
78187832
FROM TEST
78197833
| CHANGE_POINT emp_no ON hire_date

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7801,7 +7801,9 @@ public void testReductionPlanForAggs() {
78017801
* query[{"bool":{"filter":[{"sampling":{"probability":0.1,"seed":234,"hash":0}}],"boost":1.0}}]
78027802
* [_doc{f}#24], limit[1000], sort[] estimatedRowSize[332]
78037803
*/
7804-
public void testRandomSamplePushDown() {
7804+
public void testSamplePushDown() {
7805+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7806+
78057807
var plan = physicalPlan("""
78067808
FROM test
78077809
| SAMPLE +0.1 -234
@@ -7822,6 +7824,20 @@ public void testRandomSamplePushDown() {
78227824
assertThat(randomSampling.hash(), equalTo(0));
78237825
}
78247826

7827+
public void testSample_seedNotSupportedInOperator() {
7828+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7829+
7830+
optimizedPlan(physicalPlan("FROM test | SAMPLE 0.1"));
7831+
optimizedPlan(physicalPlan("FROM test | SAMPLE 0.1 42"));
7832+
optimizedPlan(physicalPlan("FROM test | MV_EXPAND first_name | SAMPLE 0.1"));
7833+
7834+
VerificationException e = expectThrows(
7835+
VerificationException.class,
7836+
() -> optimizedPlan(physicalPlan("FROM test | MV_EXPAND first_name | SAMPLE 0.1 42"))
7837+
);
7838+
assertThat(e.getMessage(), equalTo("Found 1 problem\nline 1:47: Seed not supported when sampling can't be pushed down to Lucene"));
7839+
}
7840+
78257841
@SuppressWarnings("SameParameterValue")
78267842
private static void assertFilterCondition(
78277843
Filter filter,
@@ -8005,7 +8021,7 @@ private PhysicalPlan physicalPlan(String query, TestDataSource dataSource, boole
80058021
var logical = logicalOptimizer.optimize(dataSource.analyzer.analyze(parser.createStatement(query)));
80068022
// System.out.println("Logical\n" + logical);
80078023
var physical = mapper.map(logical);
8008-
// System.out.println(physical);
8024+
// System.out.println("Physical\n" + physical);
80098025
if (assertSerialization) {
80108026
assertSerialization(physical);
80118027
}

0 commit comments

Comments
 (0)