Skip to content

Commit 319d98d

Browse files
committed
iteratively get sample probability
1 parent 1b9d35f commit 319d98d

File tree

2 files changed

+136
-34
lines changed

2 files changed

+136
-34
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
4848
private boolean includeCCSMetadata;
4949
private Locale locale;
5050
private QueryBuilder filter;
51+
// TODO: discuss how to wire the approximation functionality in the API
5152
private boolean approximate;
5253
private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY);
5354
private QueryParams params = new QueryParams();

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 135 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.esql.approximate;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.compute.data.LongBlock;
1214
import org.elasticsearch.xpack.esql.core.InvalidArgumentException;
@@ -35,63 +37,81 @@
3537
import java.util.Locale;
3638
import java.util.Set;
3739

40+
/**
41+
* This class computes approximate and fast results for certain classes of
42+
* ES|QL queries.
43+
* <p>
44+
* A query is suitable for approximation if it contains at least one
45+
* {@code STATS} command, and all commands between the source and the leftmost
46+
* {@code STATS} command can be swapped with {@code SAMPLE}. A command can be
47+
* swapped with {@code SAMPLE} if it is either mapping one row to one row (e.g.
48+
* {@code EVAL} or {@code GROK}), or if it is filtering rows (e.g. {@code FILTER}
49+
* or {@code SAMPLE}). This is verified by {@link Approximate#verifyPlan}.
50+
* <p>
51+
* If this is the case, the {@code STATS} can be replaced by {@code SAMPLE} and
52+
* a {@code STATS} with sample correction terms, and the {@code SAMPLE} can be
53+
* moved to the source and executed inside Lucene. This new logical plan is
54+
* generated by {@link Approximate#approximatePlan}.
55+
* <p>
56+
* To compute the appropriate sample probability, first a target number of rows
57+
* is set. For now this is a fixed number ({@link Approximate#SAMPLE_ROW_COUNT}).
58+
* <p>
59+
* Next, the total number of rows in the source index is counted via the plan
60+
* {@link Approximate#sourceCountPlan}. This plan should execute fast. When
61+
* there are no filter commands, the sample probability can be directly
62+
* computed as a ratio of the target number of rows and this total number.
63+
* <p>
64+
* In the presence of filter commands, another step is needed. The initial
65+
* sample probability is set to the ratio above and the number of rows is
66+
* sampled with the plan {@link Approximate#countPlan}. As long as the sampled
67+
* number of rows is smaller than intended, the probability is scaled up until
68+
* a good probability is reached. This final probability is then used for
69+
* approximating the original plan.
70+
*/
3871
public class Approximate {
3972

4073
public interface LogicalPlanRunner {
4174
void run(LogicalPlan plan, ActionListener<Result> listener);
4275
}
4376

44-
private static final Set<Class<? extends LogicalPlan>> SWAPPABLE_WITH_SAMPLE = Set.of(
77+
private static final Set<Class<? extends LogicalPlan>> ONE_TO_ONE_COMMANDS = Set.of(
4578
Dissect.class,
4679
Drop.class,
4780
Eval.class,
48-
Filter.class,
4981
Grok.class,
5082
Keep.class,
5183
OrderBy.class,
52-
Rename.class,
53-
Sample.class
84+
Rename.class
5485
);
5586

56-
// TODO: not a good value
87+
private static final Set<Class<? extends LogicalPlan>> FILTER_COMMANDS = Set.of(Filter.class, Sample.class);
88+
89+
// TODO: find a good default value, or alternative ways of setting it
5790
private static final int SAMPLE_ROW_COUNT = 1000;
5891

92+
private static final Logger logger = LogManager.getLogger(Approximate.class);
93+
5994
private final LogicalPlan logicalPlan;
95+
private final boolean hasFilters;
6096

6197
public Approximate(LogicalPlan logicalPlan) {
6298
this.logicalPlan = logicalPlan;
63-
verifyPlan();
99+
this.hasFilters = verifyPlan();
64100
}
65101

66102
/**
67-
* Computes approximate results for the given logical plan.
68-
*
69-
* This works by first executing a plan that counts the number of rows
70-
* getting to the aggregation. That count is used to compute a sample
71-
* probability, which is then used to sample approximately 1000 rows
72-
* to aggregate over and approximate the aggregation.
103+
* Computes approximate results for the logical plan.
73104
*/
74105
public void approximate(LogicalPlanRunner runner, ActionListener<Result> listener) {
75-
runner.run(
76-
countPlan(),
77-
listener.delegateFailureAndWrap(
78-
(countListener, countResult) -> runner.run(approximatePlan(sampleProbability(countResult)), listener)
79-
)
80-
);
106+
runner.run(sourceCountPlan(), sourceCountListener(runner, listener));
81107
}
82108

83109
/**
84110
* Verifies that a plan is suitable for approximation.
85111
*
86-
* To be so, the plan must contain at least one STATS function, and all
87-
* functions between the source and the leftmost STATS function must be
88-
* swappable with SAMPLE.
89-
*
90-
* In that case, the STATS can be replaced by SAMPLE, STATS with sample
91-
* correction terms, and the SAMPLE can be moved to the source and
92-
* executed inside Lucene.
112+
* @return whether the plan contains filter commands
93113
*/
94-
private void verifyPlan() {
114+
private boolean verifyPlan() {
95115
if (logicalPlan.preOptimized() == false) {
96116
throw new IllegalStateException("Expected pre-optimized plan");
97117
}
@@ -101,39 +121,88 @@ private void verifyPlan() {
101121
}
102122

103123
Holder<Boolean> encounteredStats = new Holder<>(false);
124+
Holder<Boolean> hasFilters = new Holder<>(false);
104125
logicalPlan.transformUp(plan -> {
126+
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
105127
if (plan instanceof LeafPlan) {
106128
encounteredStats.set(false);
107129
} else if (encounteredStats.get() == false) {
108130
if (plan instanceof Aggregate) {
109131
encounteredStats.set(true);
110-
} else if (SWAPPABLE_WITH_SAMPLE.contains(plan.getClass()) == false) {
132+
} else if (ONE_TO_ONE_COMMANDS.contains(plan.getClass()) == false && FILTER_COMMANDS.contains(plan.getClass()) == false) {
111133
throw new InvalidArgumentException(
112134
"query with [" + plan.nodeName().toUpperCase(Locale.ROOT) + "] before [STATS] function cannot be approximated"
113135
);
136+
} else if (FILTER_COMMANDS.contains(plan.getClass())) {
137+
hasFilters.set(true);
114138
}
115139
}
116140
return plan;
117141
});
142+
143+
return hasFilters.get();
144+
}
145+
146+
/**
147+
* Plan that counts the number of rows in the source index.
148+
* This is the ES|QL query {@code FROM index | STATS COUNT(*)}.
149+
*/
150+
private LogicalPlan sourceCountPlan() {
151+
LogicalPlan sourceCountPlan = logicalPlan.transformUp(plan -> {
152+
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
153+
if (plan instanceof LeafPlan) {
154+
plan = new Aggregate(
155+
Source.EMPTY,
156+
plan,
157+
List.of(),
158+
List.of(new Alias(Source.EMPTY, "approximate-count", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*"))))
159+
);
160+
} else {
161+
plan = plan.children().getFirst();
162+
}
163+
return plan;
164+
});
165+
166+
sourceCountPlan.setPreOptimized();
167+
return sourceCountPlan;
168+
}
169+
170+
/**
171+
* Receives the total number of rows, and runs either the
172+
* {@link Approximate#approximatePlan} or {@link Approximate#countPlan}
173+
* depending on whether filter commands are present.
174+
*/
175+
private ActionListener<Result> sourceCountListener(LogicalPlanRunner runner, ActionListener<Result> listener) {
176+
return listener.delegateFailureAndWrap((countListener, countResult) -> {
177+
logger.debug("sourceCountPlan result: {} rows", rowCount(countResult));
178+
double sampleProbability = sampleProbability(countResult);
179+
if (hasFilters) {
180+
runner.run(countPlan(sampleProbability), countListener(runner, sampleProbability, listener));
181+
} else {
182+
runner.run(approximatePlan(sampleProbability), listener);
183+
}
184+
});
118185
}
119186

120187
/**
121-
* Returns a plan that counts the number of rows of the original plan that
122-
* would reach the leftmost STATS function. So it's the original plan cut
123-
* off at the leftmost STATS function, followed by "| STATS COUNT(*)".
124-
* This value can be used to pick a good sample probability.
188+
* Plan that counts the number of rows reaching the leftmost STATS function.
189+
* This number is approximated to speed up the query execution.
190+
* This is the ES|QL query {@code FROM index | (...) | SAMPLE p | STATS COUNT(*)}.
125191
*/
126-
private LogicalPlan countPlan() {
192+
private LogicalPlan countPlan(double sampleProbability) {
127193
Holder<Boolean> encounteredStats = new Holder<>(false);
128194
LogicalPlan countPlan = logicalPlan.transformUp(plan -> {
195+
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
129196
if (plan instanceof LeafPlan) {
130197
encounteredStats.set(false);
131198
} else if (encounteredStats.get() == false) {
132199
if (plan instanceof Aggregate aggregate) {
133200
encounteredStats.set(true);
201+
Expression sampleProbabilityExpr = new Literal(Source.EMPTY, sampleProbability, DataType.DOUBLE);
202+
Sample sample = new Sample(Source.EMPTY, sampleProbabilityExpr, aggregate.child());
134203
plan = new Aggregate(
135204
Source.EMPTY,
136-
aggregate.child(),
205+
sample,
137206
List.of(),
138207
List.of(new Alias(Source.EMPTY, "approximate-count", new Count(Source.EMPTY, Literal.keyword(Source.EMPTY, "*"))))
139208
);
@@ -148,22 +217,54 @@ private LogicalPlan countPlan() {
148217
return countPlan;
149218
}
150219

220+
/**
221+
* Receives the sampled number of rows reaching the leftmost STATS function.
222+
* Runs either the {@link Approximate#approximatePlan} or a next iteration
223+
* {@link Approximate#countPlan} depending on whether the current count is
224+
* sufficient.
225+
*/
226+
private ActionListener<Result> countListener(LogicalPlanRunner runner, double probability, ActionListener<Result> listener) {
227+
return listener.delegateFailureAndWrap((countListener, countResult) -> {
228+
long rowCount = rowCount(countResult);
229+
logger.debug("countPlan result (p={}):{} rows", probability, rowCount);
230+
double newProbability = probability * SAMPLE_ROW_COUNT / Math.max(1, rowCount);
231+
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newProbability < 1.0) {
232+
runner.run(countPlan(newProbability), countListener(runner, newProbability, listener));
233+
} else {
234+
runner.run(approximatePlan(newProbability), listener);
235+
}
236+
});
237+
}
238+
151239
/**
152240
* Returns a sample probability based on the total number of rows.
153241
*/
154242
private double sampleProbability(Result countResult) {
155-
long rowCount = ((LongBlock) (countResult.pages().getFirst().getBlock(0))).getLong(0);
243+
long rowCount = rowCount(countResult);
156244
return rowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double) SAMPLE_ROW_COUNT / rowCount;
157245
}
158246

247+
/**
248+
* Returns the row count in the result.
249+
*/
250+
private long rowCount(Result countResult) {
251+
return ((LongBlock) (countResult.pages().getFirst().getBlock(0))).getLong(0);
252+
}
253+
159254
/**
160255
* Returns a plan that approximates the original plan. It consists of the
161256
* original plan, with the leftmost STATS function replaced by:
162257
* "SAMPLE probability | STATS sample_corrected_aggs".
163258
*/
164259
private LogicalPlan approximatePlan(double sampleProbability) {
260+
if (sampleProbability >= 1.0) {
261+
logger.debug("using original plan (too few rows)");
262+
return logicalPlan;
263+
}
264+
logger.debug("generating approximate plan (p={})", sampleProbability);
165265
Holder<Boolean> encounteredStats = new Holder<>(false);
166266
LogicalPlan approximatePlan = logicalPlan.transformUp(plan -> {
267+
// TODO: check/fix for JOIN / FORK / INLINESTATS / ...
167268
if (plan instanceof LeafPlan) {
168269
encounteredStats.set(false);
169270
} else if (encounteredStats.get() == false) {

0 commit comments

Comments
 (0)