Skip to content

Commit e305d70

Browse files
committed
Add different SAMPLE_ROW_COUNT_FOR_COUNT_ESTIMATION
1 parent 2ab1ee7 commit e305d70

File tree

2 files changed

+56
-20
lines changed

2 files changed

+56
-20
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,16 @@ public record QueryProperties(boolean canDecreaseRowCount, boolean canIncreaseRo
237237
*/
238238
private static final Set<Class<? extends EsqlScalarFunction>> MULTIVALUED_OUTPUT_FUNCTIONS = Set.of(MvAppend.class);
239239

240+
/**
241+
* The target number of sampled rows to estimate the total number of rows
242+
* reaching the STATS function. This is only relevant when there are commands
243+
* that can change the number of rows. This value leads to an error of about
244+
* 1% in the estimated count.
245+
*/
246+
private static final int SAMPLE_ROW_COUNT_FOR_COUNT_ESTIMATION = 10_000;
247+
240248
// TODO: set via a query setting; and find a good default value
241-
private static final int SAMPLE_ROW_COUNT = 100000;
249+
private static final int SAMPLE_ROW_COUNT = 100_000;
242250

243251
// TODO: set via a query setting
244252
private static final double CONFIDENCE_LEVEL = 0.90;
@@ -447,16 +455,22 @@ private ActionListener<Result> sourceCountListener(ActionListener<Result> listen
447455
return listener.delegateFailureAndWrap((countListener, countResult) -> {
448456
sourceRowCount = rowCount(countResult);
449457
logger.debug("sourceCountPlan result: {} rows", sourceRowCount);
450-
double sampleProbability = sourceRowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double) SAMPLE_ROW_COUNT / sourceRowCount;
451-
if (queryProperties.canIncreaseRowCount == false && sampleProbability > SAMPLE_PROBABILITY_THRESHOLD) {
452-
// If the query cannot increase the number of rows, and the sample probability is large,
453-
// we can directly run the original query without sampling.
458+
if (sourceRowCount == 0) {
459+
// If there are no rows, run the original query.
454460
runner.run(toPhysicalPlan.apply(logicalPlan), configuration, foldContext, listener);
455-
} else if (queryProperties.canIncreaseRowCount == false && queryProperties.canDecreaseRowCount == false) {
461+
return;
462+
}
463+
double sampleProbability = Math.min(1.0, (double) SAMPLE_ROW_COUNT / sourceRowCount);
464+
if (queryProperties.canIncreaseRowCount == false && queryProperties.canDecreaseRowCount == false) {
456465
// If the query preserves all rows, we can directly approximate with the sample probability.
457466
runner.run(toPhysicalPlan.apply(approximatePlan(sampleProbability)), configuration, foldContext, listener);
467+
} else if (queryProperties.canIncreaseRowCount == false && sampleProbability > SAMPLE_PROBABILITY_THRESHOLD) {
468+
// If the query cannot increase the number of rows, and the sample probability is large,
469+
// we can directly run the original query without sampling.
470+
runner.run(toPhysicalPlan.apply(logicalPlan), configuration, foldContext, listener);
458471
} else {
459472
// Otherwise, we need to sample the number of rows first to obtain a good sample probability.
473+
sampleProbability = Math.min(1.0, (double) SAMPLE_ROW_COUNT_FOR_COUNT_ESTIMATION / sourceRowCount);
460474
runner.run(
461475
toPhysicalPlan.apply(countPlan(sampleProbability)),
462476
configuration,
@@ -515,15 +529,20 @@ private ActionListener<Result> countListener(double sampleProbability, ActionLis
515529
return listener.delegateFailureAndWrap((countListener, countResult) -> {
516530
long rowCount = rowCount(countResult);
517531
logger.debug("countPlan result (p={}): {} rows", sampleProbability, rowCount);
518-
double newSampleProbability = Math.min(1.0, sampleProbability * SAMPLE_ROW_COUNT / Math.max(1, rowCount));
519-
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newSampleProbability < SAMPLE_PROBABILITY_THRESHOLD) {
532+
if (rowCount <= SAMPLE_ROW_COUNT_FOR_COUNT_ESTIMATION / 2 && sampleProbability < 1.0) {
533+
// Not enough rows are sampled yet; increase the sample probability and try again.
534+
double newSampleProbability = Math.min(
535+
1.0,
536+
sampleProbability * SAMPLE_ROW_COUNT_FOR_COUNT_ESTIMATION / Math.max(1, rowCount)
537+
);
520538
runner.run(
521539
toPhysicalPlan.apply(countPlan(newSampleProbability)),
522540
configuration,
523541
foldContext,
524542
countListener(newSampleProbability, listener)
525543
);
526544
} else {
545+
double newSampleProbability = Math.min(1.0, sampleProbability * SAMPLE_ROW_COUNT / rowCount);
527546
runner.run(toPhysicalPlan.apply(approximatePlan(newSampleProbability)), configuration, foldContext, listener);
528547
}
529548
});

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximate/ApproximateTests.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,20 @@ public void testVerify_incompatibleAggregation() {
224224
);
225225
}
226226

227+
public void testCountPlan_noData() throws Exception {
228+
TestRunner runner = new TestRunner(0, 0);
229+
Approximate approximate = createApproximate("FROM test | STATS SUM(emp_no)", runner);
230+
approximate.approximate(TestRunner.resultCloser);
231+
// This plan needs three passes:
232+
// - one pass to check whether it's an ES stats query (always no for the test runner)
233+
// - one pass to get the total number of rows (which is zero)
234+
// - one pass to execute the original query
235+
assertThat(runner.invocations, hasSize(3));
236+
assertThat(runner.invocations.get(0), allOf(not(hasSample())));
237+
assertThat(runner.invocations.get(1), allOf(not(hasSample())));
238+
assertThat(runner.invocations.get(2), allOf(not(hasSample())));
239+
}
240+
227241
public void testCountPlan_largeDataNoFilters() throws Exception {
228242
TestRunner runner = new TestRunner(1_000_000_000, 1_000_000_000);
229243
Approximate approximate = createApproximate("FROM test | STATS SUM(emp_no)", runner);
@@ -264,8 +278,8 @@ public void testCountPlan_largeDataAfterFiltering() throws Exception {
264278
assertThat(runner.invocations, hasSize(5));
265279
assertThat(runner.invocations.get(0), allOf(hasFilter("emp_no"), not(hasSample())));
266280
assertThat(runner.invocations.get(1), allOf(not(hasFilter("emp_no")), not(hasSample())));
267-
assertThat(runner.invocations.get(2), allOf(hasFilter("emp_no"), hasSample(1e-7)));
268-
assertThat(runner.invocations.get(3), allOf(hasFilter("emp_no"), hasSample(1e-4)));
281+
assertThat(runner.invocations.get(2), allOf(hasFilter("emp_no"), hasSample(1e-8)));
282+
assertThat(runner.invocations.get(3), allOf(hasFilter("emp_no"), hasSample(1e-5)));
269283
assertThat(runner.invocations.get(4), allOf(hasFilter("emp_no"), hasSample(1e-4)));
270284
}
271285

@@ -278,13 +292,15 @@ public void testCountPlan_smallDataAfterFiltering() throws Exception {
278292
// - one pass to get the total number of rows
279293
// - three passes to get the number of filtered rows (which is small)
280294
// - one pass to execute the original query
281-
assertThat(runner.invocations, hasSize(6));
295+
assertThat(runner.invocations, hasSize(8));
282296
assertThat(runner.invocations.get(0), allOf(hasFilter("emp_no"), not(hasSample())));
283297
assertThat(runner.invocations.get(1), allOf(not(hasFilter("emp_no")), not(hasSample())));
284-
assertThat(runner.invocations.get(2), allOf(hasFilter("emp_no"), hasSample(1e-13)));
285-
assertThat(runner.invocations.get(3), allOf(hasFilter("emp_no"), hasSample(1e-8)));
286-
assertThat(runner.invocations.get(4), allOf(hasFilter("emp_no"), hasSample(1e-3)));
287-
assertThat(runner.invocations.get(5), allOf(hasFilter("emp_no"), not(hasSample())));
298+
assertThat(runner.invocations.get(2), allOf(hasFilter("emp_no"), hasSample(1e-14)));
299+
assertThat(runner.invocations.get(3), allOf(hasFilter("emp_no"), hasSample(1e-10)));
300+
assertThat(runner.invocations.get(4), allOf(hasFilter("emp_no"), hasSample(1e-6)));
301+
assertThat(runner.invocations.get(5), allOf(hasFilter("emp_no"), hasSample(1e-2)));
302+
assertThat(runner.invocations.get(6), allOf(hasFilter("emp_no"), not(hasSample())));
303+
assertThat(runner.invocations.get(7), allOf(hasFilter("emp_no"), not(hasSample())));
288304
}
289305

290306
public void testCountPlan_smallDataBeforeFiltering() throws Exception {
@@ -345,7 +361,7 @@ public void testCountPlan_largeDataBeforeMvExpanding() throws Exception {
345361
assertThat(runner.invocations, hasSize(4));
346362
assertThat(runner.invocations.get(0), allOf(hasMvExpand("emp_no")));
347363
assertThat(runner.invocations.get(1), allOf(not(hasMvExpand("emp_no")), not(hasSample())));
348-
assertThat(runner.invocations.get(2), allOf(hasMvExpand("emp_no"), hasSample(1e-4)));
364+
assertThat(runner.invocations.get(2), allOf(hasMvExpand("emp_no"), hasSample(1e-5)));
349365
assertThat(runner.invocations.get(3), allOf(hasMvExpand("emp_no"), hasSample(1e-7)));
350366
}
351367

@@ -372,12 +388,13 @@ public void testCountPlan_sampleProbabilityThreshold_withFilter() throws Excepti
372388
// - one pass to get the total number of rows
373389
// - two passes to get the number of filtered rows (which determines the sample probability)
374390
// - one pass to execute the original query (because the sample probability is 50%)
375-
assertThat(runner.invocations, hasSize(5));
391+
assertThat(runner.invocations, hasSize(6));
376392
assertThat(runner.invocations.get(0), allOf(not(hasSample()), hasFilter("emp_no")));
377393
assertThat(runner.invocations.get(1), allOf(not(hasSample()), not(hasFilter("emp_no"))));
378-
assertThat(runner.invocations.get(2), allOf(hasSample(1e-7), hasFilter("emp_no")));
379-
assertThat(runner.invocations.get(3), allOf(hasSample(1e-2), hasFilter("emp_no")));
380-
assertThat(runner.invocations.get(4), allOf(not(hasSample()), hasFilter("emp_no")));
394+
assertThat(runner.invocations.get(2), allOf(hasSample(1e-8), hasFilter("emp_no")));
395+
assertThat(runner.invocations.get(3), allOf(hasSample(1e-4), hasFilter("emp_no")));
396+
assertThat(runner.invocations.get(4), allOf(hasSample(0.05), hasFilter("emp_no")));
397+
assertThat(runner.invocations.get(5), allOf(not(hasSample()), hasFilter("emp_no")));
381398
}
382399

383400
public void testApproximatePlan_createsConfidenceInterval_withoutGrouping() throws Exception {

0 commit comments

Comments
 (0)