77
88package org .elasticsearch .xpack .esql .approximate ;
99
10+ import org .apache .logging .log4j .LogManager ;
11+ import org .apache .logging .log4j .Logger ;
1012import org .elasticsearch .action .ActionListener ;
1113import org .elasticsearch .compute .data .LongBlock ;
1214import org .elasticsearch .xpack .esql .core .InvalidArgumentException ;
3537import java .util .Locale ;
3638import java .util .Set ;
3739
40+ /**
41+ * This class computes approximate and fast results for certain classes of
42+ * ES|QL queries.
43+ * <p>
44+ * A query is suitable for approximation if it contains at least one
45+ * {@code STATS} command, and all commands between the source and the leftmost
46+ * {@code STATS} command can be swapped with {@code SAMPLE}. A command can be
47+ * swapped with {@code SAMPLE} if it is either mapping one row to one row (e.g.
48+ * {@code EVAL} or {@code GROK}), or if it is filtering rows (e.g. {@code FILTER}
49+ * or {@code SAMPLE}). This is verified by {@link Approximate#verifyPlan}.
50+ * <p>
51+ * If this is the case, the {@code STATS} can be replaced by {@code SAMPLE} and
52+ * a {@code STATS} with sample correction terms, and the {@code SAMPLE} can be
53+ * moved to the source and executed inside Lucene. This new logical plan is
54+ * generated by {@link Approximate#approximatePlan}.
55+ * <p>
56+ * To compute the appropriate sample probability, first a target number of rows
57+ * is set. For now this is a fixed number ({@link Approximate#SAMPLE_ROW_COUNT}).
58+ * <p>
59+ * Next, the total number of rows in the source index is counted via the plan
60+ * {@link Approximate#sourceCountPlan}. This plan should execute fast. When
61+ * there are no filter commands, the sample probability can be directly
62+ * computed as a ratio of the target number of rows and this total number.
63+ * <p>
64+ * In the presence of filters commands, another step is needed. The initial
65+ * sample probability is set to the ratio above and the number of rows is
66+ * sampled with the plan {@link Approximate#countPlan}. As long as the sampled
67+ * number of rows is smaller than intended, the probability is scaled up until
68+ * a good probability is reached. This final probability is then used for
69+ * approximating the original plan.
70+ */
3871public class Approximate {
3972
4073 public interface LogicalPlanRunner {
4174 void run (LogicalPlan plan , ActionListener <Result > listener );
4275 }
4376
44- private static final Set <Class <? extends LogicalPlan >> SWAPPABLE_WITH_SAMPLE = Set .of (
77+ private static final Set <Class <? extends LogicalPlan >> ONE_TO_ONE_COMMANDS = Set .of (
4578 Dissect .class ,
4679 Drop .class ,
4780 Eval .class ,
48- Filter .class ,
4981 Grok .class ,
5082 Keep .class ,
5183 OrderBy .class ,
52- Rename .class ,
53- Sample .class
84+ Rename .class
5485 );
5586
56- // TODO: not a good value
87+ private static final Set <Class <? extends LogicalPlan >> FILTER_COMMANDS = Set .of (Filter .class , Sample .class );
88+
89+ // TODO: find a good default value, or alternative ways of setting it
5790 private static final int SAMPLE_ROW_COUNT = 1000 ;
5891
92+ private static final Logger logger = LogManager .getLogger (Approximate .class );
93+
5994 private final LogicalPlan logicalPlan ;
95+ private final boolean hasFilters ;
6096
6197 public Approximate (LogicalPlan logicalPlan ) {
6298 this .logicalPlan = logicalPlan ;
63- verifyPlan ();
99+ this . hasFilters = verifyPlan ();
64100 }
65101
66102 /**
67- * Computes approximate results for the given logical plan.
68- *
69- * This works by first executing a plan that counts the number of rows
70- * getting to the aggregation. That count is used to compute a sample
71- * probability, which is then used to sample approximately 1000 rows
72- * to aggregate over and approximate the aggregation.
103+ * Computes approximate results for the logical plan.
73104 */
74105 public void approximate (LogicalPlanRunner runner , ActionListener <Result > listener ) {
75- runner .run (
76- countPlan (),
77- listener .delegateFailureAndWrap (
78- (countListener , countResult ) -> runner .run (approximatePlan (sampleProbability (countResult )), listener )
79- )
80- );
106+ runner .run (sourceCountPlan (), sourceCountListener (runner , listener ));
81107 }
82108
83109 /**
84110 * Verifies that a plan is suitable for approximation.
85111 *
86- * To be so, the plan must contain at least one STATS function, and all
87- * functions between the source and the leftmost STATS function must be
88- * swappable with SAMPLE.
89- *
90- * In that case, the STATS can be replaced by SAMPLE, STATS with sample
91- * correction terms, and the SAMPLE can be moved to the source and
92- * executed inside Lucene.
112+ * @return whether the plan contains filters commands
93113 */
94- private void verifyPlan () {
114+ private boolean verifyPlan () {
95115 if (logicalPlan .preOptimized () == false ) {
96116 throw new IllegalStateException ("Expected pre-optimized plan" );
97117 }
@@ -101,39 +121,88 @@ private void verifyPlan() {
101121 }
102122
103123 Holder <Boolean > encounteredStats = new Holder <>(false );
124+ Holder <Boolean > hasFilters = new Holder <>(false );
104125 logicalPlan .transformUp (plan -> {
126+ // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
105127 if (plan instanceof LeafPlan ) {
106128 encounteredStats .set (false );
107129 } else if (encounteredStats .get () == false ) {
108130 if (plan instanceof Aggregate ) {
109131 encounteredStats .set (true );
110- } else if (SWAPPABLE_WITH_SAMPLE .contains (plan .getClass ()) == false ) {
132+ } else if (ONE_TO_ONE_COMMANDS . contains ( plan . getClass ()) == false && FILTER_COMMANDS .contains (plan .getClass ()) == false ) {
111133 throw new InvalidArgumentException (
112134 "query with [" + plan .nodeName ().toUpperCase (Locale .ROOT ) + "] before [STATS] function cannot be approximated"
113135 );
136+ } else if (FILTER_COMMANDS .contains (plan .getClass ())) {
137+ hasFilters .set (true );
114138 }
115139 }
116140 return plan ;
117141 });
142+
143+ return hasFilters .get ();
144+ }
145+
146+ /**
147+ * Plan that counts the number of rows in the source index.
148+ * This is the ES|QL query {@code FROM index | STATS COUNT(*)}.
149+ */
150+ private LogicalPlan sourceCountPlan () {
151+ LogicalPlan sourceCountPlan = logicalPlan .transformUp (plan -> {
152+ // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
153+ if (plan instanceof LeafPlan ) {
154+ plan = new Aggregate (
155+ Source .EMPTY ,
156+ plan ,
157+ List .of (),
158+ List .of (new Alias (Source .EMPTY , "approximate-count" , new Count (Source .EMPTY , Literal .keyword (Source .EMPTY , "*" ))))
159+ );
160+ } else {
161+ plan = plan .children ().getFirst ();
162+ }
163+ return plan ;
164+ });
165+
166+ sourceCountPlan .setPreOptimized ();
167+ return sourceCountPlan ;
168+ }
169+
170+ /**
171+ * Receives the total number of rows, and runs either the
172+ * {@link Approximate#approximatePlan} or {@link Approximate#countPlan}
173+ * depending on whether filter commands are present.
174+ */
175+ private ActionListener <Result > sourceCountListener (LogicalPlanRunner runner , ActionListener <Result > listener ) {
176+ return listener .delegateFailureAndWrap ((countListener , countResult ) -> {
177+ logger .debug ("sourceCountPlan result: {} rows" , rowCount (countResult ));
178+ double sampleProbability = sampleProbability (countResult );
179+ if (hasFilters ) {
180+ runner .run (countPlan (sampleProbability ), countListener (runner , sampleProbability , listener ));
181+ } else {
182+ runner .run (approximatePlan (sampleProbability ), listener );
183+ }
184+ });
118185 }
119186
120187 /**
121- * Returns a plan that counts the number of rows of the original plan that
122- * would reach the leftmost STATS function. So it's the original plan cut
123- * off at the leftmost STATS function, followed by "| STATS COUNT(*)".
124- * This value can be used to pick a good sample probability.
188+ * Plan that counts the number of rows reaching the leftmost STATS function.
189+ * This is number is approximated to speed up the query execution.
190+ * This is the ES|QL query {@code FROM index | (...) | SAMPLE p | STATS COUNT(*) / p}.
125191 */
126- private LogicalPlan countPlan () {
192+ private LogicalPlan countPlan (double sampleProbability ) {
127193 Holder <Boolean > encounteredStats = new Holder <>(false );
128194 LogicalPlan countPlan = logicalPlan .transformUp (plan -> {
195+ // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
129196 if (plan instanceof LeafPlan ) {
130197 encounteredStats .set (false );
131198 } else if (encounteredStats .get () == false ) {
132199 if (plan instanceof Aggregate aggregate ) {
133200 encounteredStats .set (true );
201+ Expression sampleProbabilityExpr = new Literal (Source .EMPTY , sampleProbability , DataType .DOUBLE );
202+ Sample sample = new Sample (Source .EMPTY , sampleProbabilityExpr , aggregate .child ());
134203 plan = new Aggregate (
135204 Source .EMPTY ,
136- aggregate . child () ,
205+ sample ,
137206 List .of (),
138207 List .of (new Alias (Source .EMPTY , "approximate-count" , new Count (Source .EMPTY , Literal .keyword (Source .EMPTY , "*" ))))
139208 );
@@ -148,22 +217,54 @@ private LogicalPlan countPlan() {
148217 return countPlan ;
149218 }
150219
220+ /**
221+ * Receives the sampled number of rows reaching the leftmost STATS function.
222+ * Runs either the {@link Approximate#approximatePlan} or a next iteration
223+ * {@link Approximate#countPlan} depending on whether the current count is
224+ * sufficient.
225+ */
226+ private ActionListener <Result > countListener (LogicalPlanRunner runner , double probability , ActionListener <Result > listener ) {
227+ return listener .delegateFailureAndWrap ((countListener , countResult ) -> {
228+ long rowCount = rowCount (countResult );
229+ logger .debug ("countPlan result (p={}):{} rows" , probability , rowCount );
230+ double newProbability = probability * SAMPLE_ROW_COUNT / Math .max (1 , rowCount );
231+ if (rowCount <= SAMPLE_ROW_COUNT / 2 && newProbability < 1.0 ) {
232+ runner .run (countPlan (newProbability ), countListener (runner , newProbability , listener ));
233+ } else {
234+ runner .run (approximatePlan (newProbability ), listener );
235+ }
236+ });
237+ }
238+
151239 /**
152240 * Returns a sample probability based on the total number of rows.
153241 */
154242 private double sampleProbability (Result countResult ) {
155- long rowCount = (( LongBlock ) ( countResult . pages (). getFirst (). getBlock ( 0 ))). getLong ( 0 );
243+ long rowCount = rowCount ( countResult );
156244 return rowCount <= SAMPLE_ROW_COUNT ? 1.0 : (double ) SAMPLE_ROW_COUNT / rowCount ;
157245 }
158246
247+ /**
248+ * Returns the row count in the result.
249+ */
250+ private long rowCount (Result countResult ) {
251+ return ((LongBlock ) (countResult .pages ().getFirst ().getBlock (0 ))).getLong (0 );
252+ }
253+
159254 /**
160255 * Returns a plan that approximates the original plan. It consists of the
161256 * original plan, with the leftmost STATS function replaced by:
162257 * "SAMPLE probability | STATS sample_corrected_aggs".
163258 */
164259 private LogicalPlan approximatePlan (double sampleProbability ) {
260+ if (sampleProbability >= 1.0 ) {
261+ logger .debug ("using original plan (too few rows)" );
262+ return logicalPlan ;
263+ }
264+ logger .debug ("generating approximate plan (p={})" , sampleProbability );
165265 Holder <Boolean > encounteredStats = new Holder <>(false );
166266 LogicalPlan approximatePlan = logicalPlan .transformUp (plan -> {
267+ // TODO: check/fix for JOIN / FORK / INLINESTATS / ...
167268 if (plan instanceof LeafPlan ) {
168269 encounteredStats .set (false );
169270 } else if (encounteredStats .get () == false ) {
0 commit comments