Skip to content

Commit 36a55ec

Browse files
committed
"Fix" JOIN/FORK/INLINESTATS
1 parent a197833 commit 36a55ec

File tree

1 file changed

+28
-9
lines changed
  • x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate

1 file changed

+28
-9
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import org.elasticsearch.xpack.esql.plan.logical.Drop;
2525
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2626
import org.elasticsearch.xpack.esql.plan.logical.Filter;
27+
import org.elasticsearch.xpack.esql.plan.logical.Fork;
2728
import org.elasticsearch.xpack.esql.plan.logical.Grok;
2829
import org.elasticsearch.xpack.esql.plan.logical.Keep;
2930
import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
3031
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3132
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
3233
import org.elasticsearch.xpack.esql.plan.logical.Rename;
3334
import org.elasticsearch.xpack.esql.plan.logical.Sample;
35+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
3436
import org.elasticsearch.xpack.esql.session.Result;
3537

3638
import java.util.List;
@@ -74,6 +76,9 @@ public interface LogicalPlanRunner {
7476
void run(LogicalPlan plan, ActionListener<Result> listener);
7577
}
7678

79+
/**
80+
* Commands that map one row to one row. These commands can be swapped with {@code SAMPLE}.
81+
*/
7782
private static final Set<Class<? extends LogicalPlan>> ONE_TO_ONE_COMMANDS = Set.of(
7883
Dissect.class,
7984
Drop.class,
@@ -84,8 +89,16 @@ public interface LogicalPlanRunner {
8489
Rename.class
8590
);
8691

92+
/**
93+
* Commands that map one row to one or zero rows. These commands can be swapped with {@code SAMPLE}.
94+
*/
8795
private static final Set<Class<? extends LogicalPlan>> FILTER_COMMANDS = Set.of(Filter.class, Sample.class);
8896

97+
/**
98+
* Commands that cannot be used anywhere in an approximated query.
99+
*/
100+
private static final Set<Class<? extends LogicalPlan>> INCOMPATIBLE_COMMANDS = Set.of(Fork.class, Join.class);
101+
89102
// TODO: find a good default value, or alternative ways of setting it
90103
private static final int SAMPLE_ROW_COUNT = 1000;
91104

@@ -115,23 +128,32 @@ private boolean verifyPlan() {
115128
if (logicalPlan.preOptimized() == false) {
116129
throw new IllegalStateException("Expected pre-optimized plan");
117130
}
118-
119131
if (logicalPlan.anyMatch(plan -> plan instanceof Aggregate) == false) {
120-
throw new InvalidArgumentException("query without [STATS] function cannot be approximated");
132+
throw new InvalidArgumentException("query without [STATS] command cannot be approximated");
121133
}
134+
logicalPlan.forEachUp(plan -> {
135+
if (INCOMPATIBLE_COMMANDS.contains(plan.getClass())) {
136+
throw new InvalidArgumentException(
137+
"query with [" + plan.nodeName().toUpperCase(Locale.ROOT) + "] command cannot be approximated"
138+
);
139+
}
140+
});
122141

123142
Holder<Boolean> encounteredStats = new Holder<>(false);
124143
Holder<Boolean> hasFilters = new Holder<>(false);
125144
logicalPlan.transformUp(plan -> {
126-
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
127-
if (plan instanceof LeafPlan) {
145+
if (INCOMPATIBLE_COMMANDS.contains(plan.getClass())) {
146+
throw new InvalidArgumentException(
147+
"query with [" + plan.nodeName().toUpperCase(Locale.ROOT) + "] command cannot be approximated"
148+
);
149+
} else if (plan instanceof LeafPlan) {
128150
encounteredStats.set(false);
129151
} else if (encounteredStats.get() == false) {
130152
if (plan instanceof Aggregate) {
131153
encounteredStats.set(true);
132154
} else if (ONE_TO_ONE_COMMANDS.contains(plan.getClass()) == false && FILTER_COMMANDS.contains(plan.getClass()) == false) {
133155
throw new InvalidArgumentException(
134-
"query with [" + plan.nodeName().toUpperCase(Locale.ROOT) + "] before [STATS] function cannot be approximated"
156+
"query with [" + plan.nodeName().toUpperCase(Locale.ROOT) + "] before [STATS] command cannot be approximated"
135157
);
136158
} else if (FILTER_COMMANDS.contains(plan.getClass())) {
137159
hasFilters.set(true);
@@ -149,7 +171,6 @@ private boolean verifyPlan() {
149171
*/
150172
private LogicalPlan sourceCountPlan() {
151173
LogicalPlan sourceCountPlan = logicalPlan.transformUp(plan -> {
152-
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
153174
if (plan instanceof LeafPlan) {
154175
plan = new Aggregate(
155176
Source.EMPTY,
@@ -192,7 +213,6 @@ private ActionListener<Result> sourceCountListener(LogicalPlanRunner runner, Act
192213
private LogicalPlan countPlan(double sampleProbability) {
193214
Holder<Boolean> encounteredStats = new Holder<>(false);
194215
LogicalPlan countPlan = logicalPlan.transformUp(plan -> {
195-
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
196216
if (plan instanceof LeafPlan) {
197217
encounteredStats.set(false);
198218
} else if (encounteredStats.get() == false) {
@@ -226,7 +246,7 @@ private LogicalPlan countPlan(double sampleProbability) {
226246
private ActionListener<Result> countListener(LogicalPlanRunner runner, double probability, ActionListener<Result> listener) {
227247
return listener.delegateFailureAndWrap((countListener, countResult) -> {
228248
long rowCount = rowCount(countResult);
229-
logger.debug("countPlan result (p={}):{} rows", probability, rowCount);
249+
logger.debug("countPlan result (p={}): {} rows", probability, rowCount);
230250
double newProbability = probability * SAMPLE_ROW_COUNT / Math.max(1, rowCount);
231251
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newProbability < 1.0) {
232252
runner.run(countPlan(newProbability), countListener(runner, newProbability, listener));
@@ -264,7 +284,6 @@ private LogicalPlan approximatePlan(double sampleProbability) {
264284
logger.debug("generating approximate plan (p={})", sampleProbability);
265285
Holder<Boolean> encounteredStats = new Holder<>(false);
266286
LogicalPlan approximatePlan = logicalPlan.transformUp(plan -> {
267-
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
268287
if (plan instanceof LeafPlan) {
269288
encounteredStats.set(false);
270289
} else if (encounteredStats.get() == false) {

0 commit comments

Comments
 (0)