Skip to content

Commit b333dcc

Browse files
committed
refactor
1 parent 234b196 commit b333dcc

File tree

3 files changed

+107
-39
lines changed

3 files changed

+107
-39
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.core.expression.Alias;
1717
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1818
import org.elasticsearch.xpack.esql.core.expression.Expression;
19+
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
1920
import org.elasticsearch.xpack.esql.core.expression.Literal;
2021
import org.elasticsearch.xpack.esql.core.expression.NameId;
2122
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
@@ -46,6 +47,7 @@
4647
import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNotNull;
4748
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div;
4849
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
50+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
4951
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
5052
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
5153
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
@@ -71,6 +73,9 @@
7173
import org.elasticsearch.xpack.esql.plan.logical.inference.Completion;
7274
import org.elasticsearch.xpack.esql.plan.logical.inference.Rerank;
7375
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
76+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
77+
import org.elasticsearch.xpack.esql.session.Configuration;
78+
import org.elasticsearch.xpack.esql.session.EsqlSession;
7479
import org.elasticsearch.xpack.esql.session.Result;
7580

7681
import java.util.ArrayList;
@@ -79,6 +84,7 @@
7984
import java.util.Locale;
8085
import java.util.Map;
8186
import java.util.Set;
87+
import java.util.function.Function;
8288
import java.util.stream.Collectors;
8389

8490
/**
@@ -128,10 +134,6 @@ public class Approximate {
128134

129135
public record QueryProperties(boolean hasNonCountAllAgg, boolean preservesRows) {}
130136

131-
public interface LogicalPlanRunner {
132-
void run(LogicalPlan plan, ActionListener<Result> listener);
133-
}
134-
135137
/**
136138
* These processing commands are supported.
137139
*/
@@ -236,27 +238,23 @@ public interface LogicalPlanRunner {
236238

237239
private final LogicalPlan logicalPlan;
238240
private final QueryProperties queryProperties;
239-
private final LogicalPlanRunner runner;
241+
private final EsqlSession.PlanRunner runner;
242+
private final LogicalPlanOptimizer logicalPlanOptimizer;
243+
private final Function<LogicalPlan, PhysicalPlan> toPhysicalPlan;
244+
private final Configuration configuration;
245+
private final FoldContext foldContext;
240246

241247
private long sourceRowCount;
242248

243-
public Approximate(LogicalPlan logicalPlan, LogicalPlanRunner logicalPlanRunner) {
249+
250+
public Approximate(LogicalPlan logicalPlan, LogicalPlanOptimizer logicalPlanOptimizer, Function<LogicalPlan, PhysicalPlan> toPhysicalPlan, EsqlSession.PlanRunner runner, Configuration configuration, FoldContext foldContext) {
244251
this.logicalPlan = logicalPlan;
245252
this.queryProperties = verifyPlan(logicalPlan);
246-
this.runner = logicalPlanRunner;
247-
}
248-
249-
/**
250-
* Computes approximate results for the logical plan.
251-
*/
252-
public void approximate(ActionListener<Result> listener) {
253-
if (queryProperties.hasNonCountAllAgg || queryProperties.preservesRows == false) {
254-
runner.run(sourceCountPlan(), sourceCountListener(listener));
255-
} else {
256-
// Counting all rows is fast for queries that preserve all rows, as it's returned from
257-
// Lucene's metadata. Approximation would only slow things down in this case.
258-
runner.run(logicalPlan, listener);
259-
}
253+
this.logicalPlanOptimizer = logicalPlanOptimizer;
254+
this.toPhysicalPlan = toPhysicalPlan;
255+
this.runner = runner;
256+
this.configuration = configuration;
257+
this.foldContext = foldContext;
260258
}
261259

262260
/**
@@ -331,6 +329,19 @@ public static QueryProperties verifyPlan(LogicalPlan logicalPlan) throws Verific
331329
return new QueryProperties(hasNonCountAllAgg.get(), preservesRows.get());
332330
}
333331

332+
/**
333+
* Computes approximate results for the logical plan.
334+
*/
335+
public void approximate(ActionListener<Result> listener) {
336+
if (queryProperties.hasNonCountAllAgg || queryProperties.preservesRows == false) {
337+
runner.run(toPhysicalPlan.apply(sourceCountPlan()), configuration, foldContext, sourceCountListener(listener));
338+
} else {
339+
// Counting all rows is fast for queries that preserve all rows, as it's returned from
340+
// Lucene's metadata. Approximation would only slow things down in this case.
341+
runner.run(toPhysicalPlan.apply(logicalPlan), configuration, foldContext, listener);
342+
}
343+
}
344+
334345
/**
335346
* Plan that counts the number of rows in the source index.
336347
* This is the ES|QL query:
@@ -351,7 +362,7 @@ private LogicalPlan sourceCountPlan() {
351362
});
352363

353364
sourceCountPlan.setPreOptimized();
354-
return sourceCountPlan;
365+
return logicalPlanOptimizer.optimize(sourceCountPlan);
355366
}
356367

357368
/**
@@ -365,9 +376,9 @@ private ActionListener<Result> sourceCountListener(ActionListener<Result> listen
365376
logger.debug("sourceCountPlan result: {} rows", sourceRowCount);
366377
double sampleProbability = sourceRowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double) SAMPLE_ROW_COUNT / sourceRowCount;
367378
if (queryProperties.preservesRows || sampleProbability == 1.0) {
368-
runner.run(approximatePlan(sampleProbability), listener);
379+
runner.run(toPhysicalPlan.apply(approximatePlan(sampleProbability)), configuration, foldContext, listener);
369380
} else {
370-
runner.run(countPlan(sampleProbability), countListener(sampleProbability, listener));
381+
runner.run(toPhysicalPlan.apply(countPlan(sampleProbability)), configuration, foldContext, countListener(sampleProbability, listener));
371382
}
372383
});
373384
}
@@ -405,7 +416,7 @@ private LogicalPlan countPlan(double sampleProbability) {
405416
});
406417

407418
countPlan.setPreOptimized();
408-
return countPlan;
419+
return logicalPlanOptimizer.optimize(countPlan);
409420
}
410421

411422
/**
@@ -420,9 +431,9 @@ private ActionListener<Result> countListener(double sampleProbability, ActionLis
420431
logger.debug("countPlan result (p={}): {} rows", sampleProbability, rowCount);
421432
double newSampleProbability = sampleProbability * SAMPLE_ROW_COUNT / Math.max(1, rowCount);
422433
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newSampleProbability < 1.0) {
423-
runner.run(countPlan(newSampleProbability), countListener(newSampleProbability, listener));
434+
runner.run(toPhysicalPlan.apply(countPlan(newSampleProbability)), configuration, foldContext, countListener(newSampleProbability, listener));
424435
} else {
425-
runner.run(approximatePlan(newSampleProbability), listener);
436+
runner.run(toPhysicalPlan.apply(approximatePlan(newSampleProbability)), configuration, foldContext, listener);
426437
}
427438
});
428439
}
@@ -711,7 +722,7 @@ private LogicalPlan approximatePlan(double sampleProbability) {
711722

712723
logger.debug("approximate plan:\n{}", approximatePlan);
713724

714-
return approximatePlan;
725+
return logicalPlanOptimizer.optimize(approximatePlan);
715726
}
716727

717728
/**

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.xpack.esql.core.expression.Attribute;
5252
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
5353
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
54+
import org.elasticsearch.xpack.esql.core.expression.function.Function;
5455
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
5556
import org.elasticsearch.xpack.esql.core.tree.Source;
5657
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -205,7 +206,8 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
205206
System.nanoTime(),
206207
request.allowPartialResults(),
207208
clusterSettings.timeseriesResultTruncationMaxSize(),
208-
clusterSettings.timeseriesResultTruncationDefaultSize()
209+
clusterSettings.timeseriesResultTruncationDefaultSize(),
210+
true
209211
);
210212
FoldContext foldContext = configuration.newFoldContext();
211213

@@ -355,13 +357,14 @@ private void executeSubPlans(
355357
ActionListener.runAfter(listener, executionInfo::finishSubPlans)
356358
);
357359
} else if (request.approximate()) {
358-
Approximate.LogicalPlanRunner logicalPlanRunner = (p, l) -> runner.run(
359-
logicalPlanToPhysicalPlan(optimizedPlan(p, logicalPlanOptimizer), request, physicalPlanOptimizer),
360+
new Approximate(
361+
optimizedPlan,
362+
logicalPlanOptimizer,
363+
p -> logicalPlanToPhysicalPlan(optimizedPlan(p, logicalPlanOptimizer), request, physicalPlanOptimizer),
364+
runner,
360365
configuration,
361-
foldContext,
362-
l
363-
);
364-
new Approximate(optimizedPlan, logicalPlanRunner).approximate(listener);
366+
foldContext
367+
).approximate(listener);
365368
} else {
366369
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request, physicalPlanOptimizer);
367370
// execute main plan

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximate/ApproximateTests.java

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,19 @@
1919
import org.elasticsearch.compute.data.Page;
2020
import org.elasticsearch.compute.test.MockBlockFactory;
2121
import org.elasticsearch.test.ESTestCase;
22+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
2223
import org.elasticsearch.xpack.esql.VerificationException;
2324
import org.elasticsearch.xpack.esql.analysis.AnalyzerTestUtils;
2425
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
2526
import org.elasticsearch.xpack.esql.core.expression.Literal;
2627
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
28+
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
2729
import org.elasticsearch.xpack.esql.core.tree.Source;
2830
import org.elasticsearch.xpack.esql.expression.Foldables;
31+
import org.elasticsearch.xpack.esql.expression.function.scalar.approximate.ConfidenceInterval;
2932
import org.elasticsearch.xpack.esql.inference.InferenceService;
33+
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
34+
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
3035
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanPreOptimizer;
3136
import org.elasticsearch.xpack.esql.optimizer.LogicalPreOptimizerContext;
3237
import org.elasticsearch.xpack.esql.parser.EsqlParser;
@@ -35,15 +40,25 @@
3540
import org.elasticsearch.xpack.esql.plan.logical.Filter;
3641
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
3742
import org.elasticsearch.xpack.esql.plan.logical.Sample;
43+
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
44+
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
45+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
46+
import org.elasticsearch.xpack.esql.session.Configuration;
47+
import org.elasticsearch.xpack.esql.session.EsqlSession;
3848
import org.elasticsearch.xpack.esql.session.Result;
3949
import org.hamcrest.Description;
4050
import org.hamcrest.Matcher;
4151
import org.hamcrest.TypeSafeMatcher;
4252

4353
import java.util.ArrayList;
54+
import java.util.Arrays;
55+
import java.util.HashMap;
4456
import java.util.List;
57+
import java.util.Map;
58+
import java.util.function.Function;
4559
import java.util.function.Predicate;
4660

61+
import static java.lang.Double.NaN;
4762
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
4863
import static org.hamcrest.CoreMatchers.allOf;
4964
import static org.hamcrest.CoreMatchers.not;
@@ -73,13 +88,19 @@ public class ApproximateTests extends ESTestCase {
7388
* sampling in the query, the returned number of rows is multiplied by
7489
* the sampling probability.
7590
* <p>
76-
* The runner collects all its invocations.
91+
* The runner also provides the LogicalPlan to PhysicalPlan conversion,
92+
* but it does not return a realistic PhysicalPlan. When running a
93+
* PhysicalPlan is invoked, it maps it back to the original LogicalPlan,
94+
* because LogicalPlans are easier to analyze in tests.
95+
* <p>
96+
* The runner collects the LogicalPlans of its invocations.
7797
*/
78-
private static class TestRunner implements Approximate.LogicalPlanRunner {
98+
private static class TestRunner implements Function<LogicalPlan, PhysicalPlan>, EsqlSession.PlanRunner {
7999

80100
private final long totalRows;
81101
private final long filteredRows;
82102
private final List<LogicalPlan> invocations;
103+
private final Map<PhysicalPlan, LogicalPlan> toLogicalPlan = new HashMap<>();
83104

84105
static ActionListener<Result> resultCloser = ActionListener.wrap(result -> result.pages().getFirst().close(), e -> {});
85106

@@ -90,7 +111,20 @@ private static class TestRunner implements Approximate.LogicalPlanRunner {
90111
}
91112

92113
@Override
93-
public void run(LogicalPlan logicalPlan, ActionListener<Result> listener) {
114+
public PhysicalPlan apply(LogicalPlan logicalPlan) {
115+
// Return a dummy PhysicalPlan that can be mapped back to the LogicalPlan.
116+
PhysicalPlan physicalPlan = new LocalSourceExec(
117+
Source.EMPTY,
118+
List.of(new ReferenceAttribute(Source.EMPTY, null, "id", null)),
119+
EmptyLocalSupplier.EMPTY
120+
);
121+
toLogicalPlan.put(physicalPlan, logicalPlan);
122+
return physicalPlan;
123+
}
124+
125+
@Override
126+
public void run(PhysicalPlan physicalPlan, Configuration configuration, FoldContext foldContext, ActionListener<Result> listener) {
127+
LogicalPlan logicalPlan = toLogicalPlan.get(physicalPlan);
94128
invocations.add(logicalPlan);
95129
List<LogicalPlan> filters = logicalPlan.collect(plan -> plan instanceof Filter);
96130
long numResults = filters.isEmpty() ? totalRows : filteredRows;
@@ -101,6 +135,7 @@ public void run(LogicalPlan logicalPlan, ActionListener<Result> listener) {
101135
LongBlock block = blockFactory.newConstantLongBlockWith(numResults, 1);
102136
listener.onResponse(new Result(null, List.of(new Page(block)), null, null));
103137
}
138+
104139
}
105140

106141
@Override
@@ -358,8 +393,15 @@ private void verify(String query) throws Exception {
358393
Approximate.verifyPlan(getLogicalPlan(query));
359394
}
360395

361-
private Approximate createApproximate(String query, Approximate.LogicalPlanRunner runner) throws Exception {
362-
return new Approximate(getLogicalPlan(query), runner);
396+
private Approximate createApproximate(String query, TestRunner runner) throws Exception {
397+
return new Approximate(
398+
getLogicalPlan(query),
399+
new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), EsqlTestUtils.randomMinimumVersion())),
400+
runner,
401+
runner,
402+
EsqlTestUtils.TEST_CFG,
403+
FoldContext.small()
404+
);
363405
}
364406

365407
private LogicalPlan getLogicalPlan(String query) throws Exception {
@@ -374,4 +416,16 @@ private LogicalPlan getLogicalPlan(String query) throws Exception {
374416
}
375417
return resultHolder.get();
376418
}
419+
420+
public void test() {
421+
double bestEstimate = 17600.0;
422+
double[] estimates = new double[] {
423+
NaN, NaN, NaN, 93768.0, NaN, NaN, NaN, 93916.0, NaN, NaN, NaN, NaN, NaN, NaN, 93916.0, NaN,
424+
93916.0, NaN, NaN, NaN, NaN, NaN, 93768.0, NaN, NaN, NaN, NaN, NaN, NaN, 93916.0, NaN, NaN,
425+
93916.0, NaN, NaN, NaN, NaN, NaN, NaN, NaN, 93768.0, 93916.0, NaN, NaN, NaN, NaN, NaN, NaN
426+
};
427+
int trialCount=3;
428+
int bucketCount=16;
429+
System.out.println(Arrays.toString(ConfidenceInterval.computeConfidenceInterval(bestEstimate, estimates, trialCount, bucketCount, 0.9)));
430+
}
377431
}

0 commit comments

Comments
 (0)