Skip to content

Commit 1b9d35f

Browse files
committed
refactor a bit
1 parent dd092aa commit 1b9d35f

File tree

2 files changed

+41
-28
lines changed

2 files changed

+41
-28
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.esql.approximate;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.compute.data.LongBlock;
1112
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
1213
import org.elasticsearch.xpack.esql.core.expression.Alias;
@@ -36,6 +37,10 @@
3637

3738
public class Approximate {
3839

40+
public interface LogicalPlanRunner {
41+
void run(LogicalPlan plan, ActionListener<Result> listener);
42+
}
43+
3944
private static final Set<Class<? extends LogicalPlan>> SWAPPABLE_WITH_SAMPLE = Set.of(
4045
Dissect.class,
4146
Drop.class,
@@ -58,12 +63,29 @@ public Approximate(LogicalPlan logicalPlan) {
5863
verifyPlan();
5964
}
6065

66+
/**
67+
* Computes approximate results for the given logical plan.
68+
*
69+
* This works by first executing a plan that counts the number of rows
70+
* getting to the aggregation. That count is used to compute a sample
71+
* probability, which is then used to sample approximately 1000 rows
72+
* to aggregate over and approximate the aggregation.
73+
*/
74+
public void approximate(LogicalPlanRunner runner, ActionListener<Result> listener) {
75+
runner.run(
76+
countPlan(),
77+
listener.delegateFailureAndWrap(
78+
(countListener, countResult) -> runner.run(approximatePlan(sampleProbability(countResult)), listener)
79+
)
80+
);
81+
}
82+
6183
/**
6284
* Verifies that a plan is suitable for approximation.
6385
*
6486
* To be so, the plan must contain at least one STATS function, and all
6587
* functions between the source and the leftmost STATS function must be
66-
* swappable with STATS.
88+
* swappable with SAMPLE.
6789
*
6890
* In that case, the STATS can be replaced by SAMPLE, STATS with sample
6991
* correction terms, and the SAMPLE can be moved to the source and
@@ -101,7 +123,7 @@ private void verifyPlan() {
101123
* off at the leftmost STATS function, followed by "| STATS COUNT(*)".
102124
* This value can be used to pick a good sample probability.
103125
*/
104-
public LogicalPlan countPlan() {
126+
private LogicalPlan countPlan() {
105127
Holder<Boolean> encounteredStats = new Holder<>(false);
106128
LogicalPlan countPlan = logicalPlan.transformUp(plan -> {
107129
if (plan instanceof LeafPlan) {
@@ -126,21 +148,20 @@ public LogicalPlan countPlan() {
126148
return countPlan;
127149
}
128150

151+
/**
152+
* Returns a sample probability based on the total number of rows.
153+
*/
154+
private double sampleProbability(Result countResult) {
155+
long rowCount = ((LongBlock) (countResult.pages().getFirst().getBlock(0))).getLong(0);
156+
return rowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double) SAMPLE_ROW_COUNT / rowCount;
157+
}
158+
129159
/**
130160
* Returns a plan that approximates the original plan. It consists of the
131161
* original plan, with the leftmost STATS function replaced by:
132162
* "SAMPLE probability | STATS sample_corrected_aggs".
133-
*
134-
* The sample probability is based on the total row count that would reach
135-
* the STATS function, which is obtained by executing the countPlan.
136163
*/
137-
public LogicalPlan approximatePlan(Result countResult) {
138-
long rowCount = ((LongBlock) (countResult.pages().getFirst().getBlock(0))).getLong(0);
139-
if (rowCount <= SAMPLE_ROW_COUNT) {
140-
return logicalPlan;
141-
}
142-
double sampleProbability = (double) SAMPLE_ROW_COUNT / rowCount;
143-
164+
private LogicalPlan approximatePlan(double sampleProbability) {
144165
Holder<Boolean> encounteredStats = new Holder<>(false);
145166
LogicalPlan approximatePlan = logicalPlan.transformUp(plan -> {
146167
if (plan instanceof LeafPlan) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -242,23 +242,15 @@ private void executeSubPlans(
242242
if (subPlan != null) {
243243
// code-path to execute subplans
244244
executeSubPlan(new DriverCompletionInfo.Accumulator(), optimizedPlan, subPlan, executionInfo, runner, request, listener);
245+
} else if (request.approximate()) {
246+
new Approximate(optimizedPlan).approximate(
247+
(p, l) -> runner.run(logicalPlanToPhysicalPlan(optimizedPlan(p), request), l),
248+
listener
249+
);
245250
} else {
246-
if (request.approximate()) {
247-
Approximate approximate = new Approximate(optimizedPlan);
248-
runner.run(
249-
logicalPlanToPhysicalPlan(optimizedPlan(approximate.countPlan()), request),
250-
listener.delegateFailureAndWrap(
251-
(countListener, countResult) -> runner.run(
252-
logicalPlanToPhysicalPlan(optimizedPlan(approximate.approximatePlan(countResult)), request),
253-
listener
254-
)
255-
)
256-
);
257-
} else {
258-
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
259-
// execute main plan
260-
runner.run(physicalPlan, listener);
261-
}
251+
PhysicalPlan physicalPlan = logicalPlanToPhysicalPlan(optimizedPlan, request);
252+
// execute main plan
253+
runner.run(physicalPlan, listener);
262254
}
263255
}
264256

0 commit comments

Comments
 (0)