Skip to content

Commit 8d38e64

Browse files
committed
error on seed in sampling operator
1 parent 4ba3655 commit 8d38e64

File tree

7 files changed

+91
-19
lines changed

7 files changed

+91
-19
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.capabilities;
9+
10+
import org.elasticsearch.xpack.esql.common.Failures;
11+
12+
/**
13+
* Interface implemented by physical plan nodes and expressions that require validation after physical optimization.
14+
*/
15+
public interface PostPhysicalOptimizationVerificationAware {
16+
17+
/**
18+
* Validates the implementing plan node or expression - discovered failures are added to the given
19+
* {@link Failures} instance.
20+
*/
21+
void postPhysicalOptimizationVerification(Failures failures);
22+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
11-
import org.elasticsearch.xpack.esql.common.Failure;
11+
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.EnableSpatialDistancePushdown;
1313
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.InsertFieldExtraction;
1414
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource;
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.xpack.esql.rule.Rule;
2525

2626
import java.util.ArrayList;
27-
import java.util.Collection;
2827
import java.util.List;
2928

3029
/**
@@ -46,8 +45,8 @@ public PhysicalPlan localOptimize(PhysicalPlan plan) {
4645
}
4746

4847
PhysicalPlan verify(PhysicalPlan plan) {
49-
Collection<Failure> failures = verifier.verify(plan);
50-
if (failures.isEmpty() == false) {
48+
Failures failures = verifier.verify(plan);
49+
if (failures.hasFailures()) {
5150
throw new VerificationException(failures);
5251
}
5352
return plan;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,13 @@
88
package org.elasticsearch.xpack.esql.optimizer;
99

1010
import org.elasticsearch.xpack.esql.VerificationException;
11-
import org.elasticsearch.xpack.esql.common.Failure;
11+
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
1313
import org.elasticsearch.xpack.esql.plan.physical.FragmentExec;
1414
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1515
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
1616
import org.elasticsearch.xpack.esql.rule.RuleExecutor;
1717

18-
import java.util.Collection;
1918
import java.util.List;
2019

2120
/**
@@ -39,8 +38,8 @@ public PhysicalPlan optimize(PhysicalPlan plan) {
3938
}
4039

4140
PhysicalPlan verify(PhysicalPlan plan) {
42-
Collection<Failure> failures = verifier.verify(plan);
43-
if (failures.isEmpty() == false) {
41+
Failures failures = verifier.verify(plan);
42+
if (failures.hasFailures()) {
4443
throw new VerificationException(failures);
4544
}
4645
return plan;

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/PhysicalVerifier.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.optimizer;
99

10-
import org.elasticsearch.xpack.esql.common.Failure;
10+
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
1111
import org.elasticsearch.xpack.esql.common.Failures;
1212
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1313
import org.elasticsearch.xpack.esql.core.expression.Expressions;
@@ -17,10 +17,6 @@
1717
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
1818
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
1919

20-
import java.util.Collection;
21-
import java.util.LinkedHashSet;
22-
import java.util.Set;
23-
2420
import static org.elasticsearch.xpack.esql.common.Failure.fail;
2521

2622
/** Physical plan verifier. */
@@ -31,8 +27,8 @@ public final class PhysicalVerifier {
3127
private PhysicalVerifier() {}
3228

3329
/** Verifies the physical plan. */
34-
public Collection<Failure> verify(PhysicalPlan plan) {
35-
Set<Failure> failures = new LinkedHashSet<>();
30+
public Failures verify(PhysicalPlan plan) {
31+
Failures failures = new Failures();
3632
Failures depFailures = new Failures();
3733

3834
// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
@@ -56,6 +52,17 @@ public Collection<Failure> verify(PhysicalPlan plan) {
5652
}
5753
}
5854
PlanConsistencyChecker.checkPlan(p, depFailures);
55+
56+
if (failures.hasFailures() == false) {
57+
if (p instanceof PostPhysicalOptimizationVerificationAware va) {
58+
va.postPhysicalOptimizationVerification(failures);
59+
}
60+
p.forEachExpression(ex -> {
61+
if (ex instanceof PostPhysicalOptimizationVerificationAware va) {
62+
va.postPhysicalOptimizationVerification(failures);
63+
}
64+
});
65+
}
5966
});
6067

6168
if (depFailures.hasFailures()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/SampleExec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.core.Nullable;
14+
import org.elasticsearch.xpack.esql.capabilities.PostPhysicalOptimizationVerificationAware;
15+
import org.elasticsearch.xpack.esql.common.Failures;
1416
import org.elasticsearch.xpack.esql.core.expression.Expression;
1517
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
1618
import org.elasticsearch.xpack.esql.core.tree.Source;
@@ -19,7 +21,9 @@
1921
import java.io.IOException;
2022
import java.util.Objects;
2123

22-
public class SampleExec extends UnaryExec {
24+
import static org.elasticsearch.xpack.esql.common.Failure.fail;
25+
26+
public class SampleExec extends UnaryExec implements PostPhysicalOptimizationVerificationAware {
2327
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
2428
PhysicalPlan.class,
2529
"SampleExec",
@@ -96,4 +100,15 @@ public boolean equals(Object obj) {
96100

97101
return Objects.equals(child(), other.child()) && Objects.equals(probability, other.probability) && Objects.equals(seed, other.seed);
98102
}
103+
104+
@Override
105+
public void postPhysicalOptimizationVerification(Failures failures) {
106+
// It's currently impossible in ES|QL to handle all data in deterministic order, therefore
107+
// a fixed random seed in the sample operator doesn't work as intended and is disallowed.
108+
// TODO: fix this.
109+
if (seed != null) {
110+
// TODO: what should the error message here be? This doesn't seem right.
111+
failures.add(fail(seed, "Seed not supported when sampling can't be pushed down to Lucene"));
112+
}
113+
}
99114
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7732,6 +7732,8 @@ public void testPruneRedundantOrderBy() {
77327732
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
77337733
*/
77347734
public void testSampleMerged() {
7735+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7736+
77357737
var query = """
77367738
FROM TEST
77377739
| SAMPLE .3 5
@@ -7752,6 +7754,8 @@ public void testSampleMerged() {
77527754
}
77537755

77547756
public void testSamplePushDown() {
7757+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7758+
77557759
for (var command : List.of(
77567760
"ENRICH languages_idx on first_name",
77577761
"EVAL x = 1",
@@ -7776,6 +7780,8 @@ public void testSamplePushDown() {
77767780
}
77777781

77787782
public void testSamplePushDown_sort() {
7783+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7784+
77797785
var query = "FROM TEST | WHERE emp_no > 0 | SAMPLE 0.5 | LIMIT 100";
77807786
var optimized = optimizedPlan(query);
77817787

@@ -7789,6 +7795,8 @@ public void testSamplePushDown_sort() {
77897795
}
77907796

77917797
public void testSamplePushDown_where() {
7798+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7799+
77927800
var query = "FROM TEST | SORT emp_no | SAMPLE 0.5 | LIMIT 100";
77937801
var optimized = optimizedPlan(query);
77947802

@@ -7801,6 +7809,8 @@ public void testSamplePushDown_where() {
78017809
}
78027810

78037811
public void testSampleNoPushDown() {
7812+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7813+
78047814
for (var command : List.of("LIMIT 100", "MV_EXPAND languages", "STATS COUNT()")) {
78057815
var query = "FROM TEST | " + command + " | SAMPLE .5";
78067816
var optimized = optimizedPlan(query);
@@ -7820,7 +7830,9 @@ public void testSampleNoPushDown() {
78207830
* | \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
78217831
* \_EsRelation[languages_lookup][LOOKUP][language_code{f}#17, language_name{f}#18]
78227832
*/
7823-
public void testRandomSampleNoPushDownLookupJoin() {
7833+
public void testSampleNoPushDownLookupJoin() {
7834+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7835+
78247836
var query = """
78257837
FROM TEST
78267838
| EVAL language_code = emp_no
@@ -7845,6 +7857,8 @@ public void testRandomSampleNoPushDownLookupJoin() {
78457857
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
78467858
*/
78477859
public void testSampleNoPushDownChangePoint() {
7860+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7861+
78487862
var query = """
78497863
FROM TEST
78507864
| CHANGE_POINT emp_no ON hire_date

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7802,7 +7802,9 @@ public void testReductionPlanForAggs() {
78027802
* query[{"bool":{"filter":[{"sampling":{"probability":0.1,"seed":234,"hash":0}}],"boost":1.0}}]
78037803
* [_doc{f}#24], limit[1000], sort[] estimatedRowSize[332]
78047804
*/
7805-
public void testRandomSamplePushDown() {
7805+
public void testSamplePushDown() {
7806+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7807+
78067808
var plan = physicalPlan("""
78077809
FROM test
78087810
| SAMPLE +0.1 -234
@@ -7823,6 +7825,20 @@ public void testRandomSamplePushDown() {
78237825
assertThat(randomSampling.hash(), equalTo(0));
78247826
}
78257827

7828+
public void testSample_seedNotSupportedInOperator() {
7829+
assumeTrue("sample must be enabled", EsqlCapabilities.Cap.SAMPLE.isEnabled());
7830+
7831+
optimizedPlan(physicalPlan("FROM test | SAMPLE 0.1"));
7832+
optimizedPlan(physicalPlan("FROM test | SAMPLE 0.1 42"));
7833+
optimizedPlan(physicalPlan("FROM test | MV_EXPAND first_name | SAMPLE 0.1"));
7834+
7835+
VerificationException e = expectThrows(
7836+
VerificationException.class,
7837+
() -> optimizedPlan(physicalPlan("FROM test | MV_EXPAND first_name | SAMPLE 0.1 42"))
7838+
);
7839+
assertThat(e.getMessage(), equalTo("Found 1 problem\nline 1:47: Seed not supported when sampling can't be pushed down to Lucene"));
7840+
}
7841+
78267842
@SuppressWarnings("SameParameterValue")
78277843
private static void assertFilterCondition(
78287844
Filter filter,
@@ -8006,7 +8022,7 @@ private PhysicalPlan physicalPlan(String query, TestDataSource dataSource, boole
80068022
var logical = logicalOptimizer.optimize(dataSource.analyzer.analyze(parser.createStatement(query)));
80078023
// System.out.println("Logical\n" + logical);
80088024
var physical = mapper.map(logical);
8009-
// System.out.println(physical);
8025+
// System.out.println("Physical\n" + physical);
80108026
if (assertSerialization) {
80118027
assertSerialization(physical);
80128028
}

0 commit comments

Comments
 (0)