77
88package org .elasticsearch .xpack .esql .approximate ;
99
10+ import org .elasticsearch .action .ActionListener ;
1011import org .elasticsearch .compute .data .LongBlock ;
1112import org .elasticsearch .xpack .esql .core .InvalidArgumentException ;
1213import org .elasticsearch .xpack .esql .core .expression .Alias ;
3637
3738public class Approximate {
3839
40+ public interface LogicalPlanRunner {
41+ void run (LogicalPlan plan , ActionListener <Result > listener );
42+ }
43+
3944 private static final Set <Class <? extends LogicalPlan >> SWAPPABLE_WITH_SAMPLE = Set .of (
4045 Dissect .class ,
4146 Drop .class ,
@@ -58,12 +63,29 @@ public Approximate(LogicalPlan logicalPlan) {
5863 verifyPlan ();
5964 }
6065
66+ /**
67+ * Computes approximate results for the given logical plan.
68+ *
69+ * This works by first executing a plan that counts the number of rows
70+ * getting to the aggregation. That count is used to compute a sample
71+ * probability, which is then used to sample approximately 1000 rows
72+ * to aggregate over and approximate the aggregation.
73+ */
74+ public void approximate (LogicalPlanRunner runner , ActionListener <Result > listener ) {
75+ runner .run (
76+ countPlan (),
77+ listener .delegateFailureAndWrap (
78+ (countListener , countResult ) -> runner .run (approximatePlan (sampleProbability (countResult )), listener )
79+ )
80+ );
81+ }
82+
6183 /**
6284 * Verifies that a plan is suitable for approximation.
6385 *
6486 * To be so, the plan must contain at least one STATS function, and all
6587 * functions between the source and the leftmost STATS function must be
66- * swappable with STATS .
88+ * swappable with SAMPLE .
6789 *
6890 * In that case, the STATS can be replaced by SAMPLE, STATS with sample
6991 * correction terms, and the SAMPLE can be moved to the source and
@@ -101,7 +123,7 @@ private void verifyPlan() {
101123 * off at the leftmost STATS function, followed by "| STATS COUNT(*)".
102124 * This value can be used to pick a good sample probability.
103125 */
104- public LogicalPlan countPlan () {
126+ private LogicalPlan countPlan () {
105127 Holder <Boolean > encounteredStats = new Holder <>(false );
106128 LogicalPlan countPlan = logicalPlan .transformUp (plan -> {
107129 if (plan instanceof LeafPlan ) {
@@ -126,21 +148,20 @@ public LogicalPlan countPlan() {
126148 return countPlan ;
127149 }
128150
151+ /**
152+ * Returns a sample probability based on the total number of rows.
153+ */
154+ private double sampleProbability (Result countResult ) {
155+ long rowCount = ((LongBlock ) (countResult .pages ().getFirst ().getBlock (0 ))).getLong (0 );
156+ return rowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double ) SAMPLE_ROW_COUNT / rowCount ;
157+ }
158+
129159 /**
130160 * Returns a plan that approximates the original plan. It consists of the
131161 * original plan, with the leftmost STATS function replaced by:
132162 * "SAMPLE probability | STATS sample_corrected_aggs".
133- *
134- * The sample probability is based on the total row count that would reach
135- * the STATS function, which is obtained by executing the countPlan.
136163 */
137- public LogicalPlan approximatePlan (Result countResult ) {
138- long rowCount = ((LongBlock ) (countResult .pages ().getFirst ().getBlock (0 ))).getLong (0 );
139- if (rowCount <= SAMPLE_ROW_COUNT ) {
140- return logicalPlan ;
141- }
142- double sampleProbability = (double ) SAMPLE_ROW_COUNT / rowCount ;
143-
164+ private LogicalPlan approximatePlan (double sampleProbability ) {
144165 Holder <Boolean > encounteredStats = new Holder <>(false );
145166 LogicalPlan approximatePlan = logicalPlan .transformUp (plan -> {
146167 if (plan instanceof LeafPlan ) {
0 commit comments