Skip to content

Commit 392fc8e

Browse files
committed
don't approximate stats queries
1 parent 5caca3c commit 392fc8e

File tree

5 files changed

+149
-42
lines changed

5 files changed

+149
-42
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/approximate/Approximate.java

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@
132132
*/
133133
public class Approximate {
134134

135-
public record QueryProperties(boolean hasNonCountAllAgg, boolean preservesRows) {}
135+
public record QueryProperties(boolean preservesRows) {}
136136

137137
/**
138138
* These processing commands are supported.
@@ -246,8 +246,14 @@ public record QueryProperties(boolean hasNonCountAllAgg, boolean preservesRows)
246246

247247
private long sourceRowCount;
248248

249-
250-
public Approximate(LogicalPlan logicalPlan, LogicalPlanOptimizer logicalPlanOptimizer, Function<LogicalPlan, PhysicalPlan> toPhysicalPlan, EsqlSession.PlanRunner runner, Configuration configuration, FoldContext foldContext) {
249+
public Approximate(
250+
LogicalPlan logicalPlan,
251+
LogicalPlanOptimizer logicalPlanOptimizer,
252+
Function<LogicalPlan, PhysicalPlan> toPhysicalPlan,
253+
EsqlSession.PlanRunner runner,
254+
Configuration configuration,
255+
FoldContext foldContext
256+
) {
251257
this.logicalPlan = logicalPlan;
252258
this.queryProperties = verifyPlan(logicalPlan);
253259
this.logicalPlanOptimizer = logicalPlanOptimizer;
@@ -284,15 +290,11 @@ public static QueryProperties verifyPlan(LogicalPlan logicalPlan) throws Verific
284290
});
285291

286292
Holder<Boolean> encounteredStats = new Holder<>(false);
287-
Holder<Boolean> hasNonCountAllAgg = new Holder<>(false);
288293
Holder<Boolean> preservesRows = new Holder<>(true);
289294

290295
logicalPlan.transformUp(plan -> {
291296
if (encounteredStats.get() == false) {
292297
if (plan instanceof Aggregate aggregate) {
293-
if (aggregate.groupings().isEmpty() == false) {
294-
hasNonCountAllAgg.set(true);
295-
}
296298
// Verify that the aggregate functions are supported.
297299
encounteredStats.set(true);
298300
plan.transformExpressionsOnly(AggregateFunction.class, aggFn -> {
@@ -308,9 +310,6 @@ public static QueryProperties verifyPlan(LogicalPlan logicalPlan) throws Verific
308310
)
309311
);
310312
}
311-
if (aggFn.equals(COUNT_ALL_ROWS) == false) {
312-
hasNonCountAllAgg.set(true);
313-
}
314313
return aggFn;
315314
});
316315
} else if (plan instanceof LeafPlan == false && ROW_PRESERVING_COMMANDS.contains(plan.getClass()) == false) {
@@ -326,20 +325,52 @@ public static QueryProperties verifyPlan(LogicalPlan logicalPlan) throws Verific
326325
return plan;
327326
});
328327

329-
return new QueryProperties(hasNonCountAllAgg.get(), preservesRows.get());
328+
return new QueryProperties(preservesRows.get());
330329
}
331330

332331
/**
333332
* Computes approximate results for the logical plan.
334333
*/
335334
public void approximate(ActionListener<Result> listener) {
336-
if (queryProperties.hasNonCountAllAgg || queryProperties.preservesRows == false) {
337-
runner.run(toPhysicalPlan.apply(sourceCountPlan()), configuration, foldContext, sourceCountListener(listener));
338-
} else {
339-
// Counting all rows is fast for queries that preserve all rows, as it's returned from
340-
// Lucene's metadata. Approximation would only slow things down in this case.
341-
runner.run(toPhysicalPlan.apply(logicalPlan), configuration, foldContext, listener);
342-
}
335+
// Try to execute the query if it translates to an ES stats query. Results for
336+
// them come from Lucene's metadata and are computed fast. Approximation would
337+
// only slow things down in that case. When the query is not an ES stats query,
338+
// an exception is thrown and approximation is attempted.
339+
runner.run(
340+
toPhysicalPlan.apply(logicalPlan),
341+
configuration.throwOnNonEsStatsQuery(true),
342+
foldContext,
343+
approximateListener(listener)
344+
);
345+
}
346+
347+
private ActionListener<Result> approximateListener(ActionListener<Result> listener) {
348+
return new ActionListener<>() {
349+
@Override
350+
public void onResponse(Result result) {
351+
assert result.executionInfo() != null;
352+
boolean esStatsQueryExecuted = result.executionInfo().clusterInfo.values()
353+
.stream()
354+
.noneMatch(
355+
cluster -> cluster.getFailures().stream().anyMatch(e -> e.getCause() instanceof UnsupportedOperationException)
356+
);
357+
if (esStatsQueryExecuted) {
358+
logger.debug("not approximating stats query");
359+
listener.onResponse(result);
360+
} else {
361+
runner.run(toPhysicalPlan.apply(sourceCountPlan()), configuration, foldContext, sourceCountListener(listener));
362+
}
363+
}
364+
365+
@Override
366+
public void onFailure(Exception e) {
367+
if (e instanceof UnsupportedOperationException) {
368+
runner.run(toPhysicalPlan.apply(sourceCountPlan()), configuration, foldContext, sourceCountListener(listener));
369+
} else {
370+
listener.onFailure(e);
371+
}
372+
}
373+
};
343374
}
344375

345376
/**
@@ -378,7 +409,12 @@ private ActionListener<Result> sourceCountListener(ActionListener<Result> listen
378409
if (queryProperties.preservesRows || sampleProbability == 1.0) {
379410
runner.run(toPhysicalPlan.apply(approximatePlan(sampleProbability)), configuration, foldContext, listener);
380411
} else {
381-
runner.run(toPhysicalPlan.apply(countPlan(sampleProbability)), configuration, foldContext, countListener(sampleProbability, listener));
412+
runner.run(
413+
toPhysicalPlan.apply(countPlan(sampleProbability)),
414+
configuration,
415+
foldContext,
416+
countListener(sampleProbability, listener)
417+
);
382418
}
383419
});
384420
}
@@ -431,7 +467,12 @@ private ActionListener<Result> countListener(double sampleProbability, ActionLis
431467
logger.debug("countPlan result (p={}): {} rows", sampleProbability, rowCount);
432468
double newSampleProbability = sampleProbability * SAMPLE_ROW_COUNT / Math.max(1, rowCount);
433469
if (rowCount <= SAMPLE_ROW_COUNT / 2 && newSampleProbability < 1.0) {
434-
runner.run(toPhysicalPlan.apply(countPlan(newSampleProbability)), configuration, foldContext, countListener(newSampleProbability, listener));
470+
runner.run(
471+
toPhysicalPlan.apply(countPlan(newSampleProbability)),
472+
configuration,
473+
foldContext,
474+
countListener(newSampleProbability, listener)
475+
);
435476
} else {
436477
runner.run(toPhysicalPlan.apply(approximatePlan(newSampleProbability)), configuration, foldContext, listener);
437478
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ protected PhysicalPlan rule(AggregateExec aggregateExec, LocalPhysicalOptimizerC
6868
);
6969
}
7070
}
71+
72+
// TODO: what is a good place to do something like this?
73+
if (context.configuration().throwOnNonEsStatsQuery() && plan instanceof EsStatsQueryExec == false) {
74+
throw new UnsupportedOperationException("not executing query of type [" + plan.getClass().getSimpleName() + "]");
75+
}
76+
7177
return plan;
7278
}
7379

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public class Configuration implements Writeable {
6363
private final Map<String, Map<String, Column>> tables;
6464
private final long queryStartTimeNanos;
6565

66+
// TODO: is this Configuration a good place for this flag?
67+
private final boolean throwOnNonEsStatsQuery;
68+
6669
public Configuration(
6770
ZoneId zi,
6871
Locale locale,
@@ -78,6 +81,42 @@ public Configuration(
7881
boolean allowPartialResults,
7982
int resultTruncationMaxSizeTimeseries,
8083
int resultTruncationDefaultSizeTimeseries
84+
) {
85+
this(
86+
zi,
87+
locale,
88+
username,
89+
clusterName,
90+
pragmas,
91+
resultTruncationMaxSizeRegular,
92+
resultTruncationDefaultSizeRegular,
93+
query,
94+
profile,
95+
tables,
96+
queryStartTimeNanos,
97+
allowPartialResults,
98+
resultTruncationMaxSizeTimeseries,
99+
resultTruncationDefaultSizeTimeseries,
100+
false
101+
);
102+
}
103+
104+
private Configuration(
105+
ZoneId zi,
106+
Locale locale,
107+
String username,
108+
String clusterName,
109+
QueryPragmas pragmas,
110+
int resultTruncationMaxSizeRegular,
111+
int resultTruncationDefaultSizeRegular,
112+
String query,
113+
boolean profile,
114+
Map<String, Map<String, Column>> tables,
115+
long queryStartTimeNanos,
116+
boolean allowPartialResults,
117+
int resultTruncationMaxSizeTimeseries,
118+
int resultTruncationDefaultSizeTimeseries,
119+
boolean throwOnNonEsStatsQuery
81120
) {
82121
this.zoneId = zi.normalized();
83122
this.now = ZonedDateTime.now(Clock.tick(Clock.system(zoneId), Duration.ofNanos(1)));
@@ -95,6 +134,7 @@ public Configuration(
95134
assert tables != null;
96135
this.queryStartTimeNanos = queryStartTimeNanos;
97136
this.allowPartialResults = allowPartialResults;
137+
this.throwOnNonEsStatsQuery = throwOnNonEsStatsQuery;
98138
}
99139

100140
public Configuration(BlockStreamInput in) throws IOException {
@@ -122,6 +162,8 @@ public Configuration(BlockStreamInput in) throws IOException {
122162
this.resultTruncationMaxSizeTimeseries = this.resultTruncationMaxSizeRegular;
123163
this.resultTruncationDefaultSizeTimeseries = this.resultTruncationDefaultSizeRegular;
124164
}
165+
// TODO: TransportVersion
166+
this.throwOnNonEsStatsQuery = in.readBoolean();
125167
}
126168

127169
@Override
@@ -147,6 +189,8 @@ public void writeTo(StreamOutput out) throws IOException {
147189
out.writeVInt(resultTruncationMaxSizeTimeseries);
148190
out.writeVInt(resultTruncationDefaultSizeTimeseries);
149191
}
192+
// TODO: TransportVersion
193+
out.writeBoolean(throwOnNonEsStatsQuery);
150194
}
151195

152196
public ZoneId zoneId() {
@@ -235,7 +279,28 @@ public Configuration withoutTables() {
235279
queryStartTimeNanos,
236280
allowPartialResults,
237281
resultTruncationMaxSizeTimeseries,
238-
resultTruncationDefaultSizeTimeseries
282+
resultTruncationDefaultSizeTimeseries,
283+
throwOnNonEsStatsQuery
284+
);
285+
}
286+
287+
public Configuration throwOnNonEsStatsQuery(boolean throwOnNonEsStatsQuery) {
288+
return new Configuration(
289+
zoneId,
290+
locale,
291+
username,
292+
clusterName,
293+
pragmas,
294+
resultTruncationMaxSizeRegular,
295+
resultTruncationDefaultSizeRegular,
296+
query,
297+
profile,
298+
tables,
299+
queryStartTimeNanos,
300+
allowPartialResults,
301+
resultTruncationMaxSizeTimeseries,
302+
resultTruncationDefaultSizeTimeseries,
303+
throwOnNonEsStatsQuery
239304
);
240305
}
241306

@@ -254,6 +319,14 @@ public boolean allowPartialResults() {
254319
return allowPartialResults;
255320
}
256321

322+
/**
323+
* Whether to throw an exception when a non-ES stats query is attempted to be executed.
324+
* This is used by query approximation, see {@link org.elasticsearch.xpack.esql.approximate.Approximate}.
325+
*/
326+
public boolean throwOnNonEsStatsQuery() {
327+
return throwOnNonEsStatsQuery;
328+
}
329+
257330
private static void writeQuery(StreamOutput out, String query) throws IOException {
258331
if (query.length() > QUERY_COMPRESS_THRESHOLD_CHARS) { // compare on chars to avoid UTF-8 encoding unless actually required
259332
out.writeBoolean(true);
@@ -293,7 +366,8 @@ public boolean equals(Object o) {
293366
&& Objects.equals(that.query, query)
294367
&& profile == that.profile
295368
&& tables.equals(that.tables)
296-
&& allowPartialResults == that.allowPartialResults;
369+
&& allowPartialResults == that.allowPartialResults
370+
&& throwOnNonEsStatsQuery == that.throwOnNonEsStatsQuery;
297371
}
298372

299373
@Override
@@ -312,7 +386,8 @@ public int hashCode() {
312386
tables,
313387
allowPartialResults,
314388
resultTruncationMaxSizeTimeseries,
315-
resultTruncationDefaultSizeTimeseries
389+
resultTruncationDefaultSizeTimeseries,
390+
throwOnNonEsStatsQuery
316391
);
317392
}
318393

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import org.elasticsearch.xpack.esql.core.expression.Attribute;
5252
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
5353
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
54-
import org.elasticsearch.xpack.esql.core.expression.function.Function;
5554
import org.elasticsearch.xpack.esql.core.tree.NodeUtils;
5655
import org.elasticsearch.xpack.esql.core.tree.Source;
5756
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -206,8 +205,7 @@ public void execute(EsqlQueryRequest request, EsqlExecutionInfo executionInfo, P
206205
System.nanoTime(),
207206
request.allowPartialResults(),
208207
clusterSettings.timeseriesResultTruncationMaxSize(),
209-
clusterSettings.timeseriesResultTruncationDefaultSize(),
210-
true
208+
clusterSettings.timeseriesResultTruncationDefaultSize()
211209
);
212210
FoldContext foldContext = configuration.newFoldContext();
213211

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/approximate/ApproximateTests.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
2929
import org.elasticsearch.xpack.esql.core.tree.Source;
3030
import org.elasticsearch.xpack.esql.expression.Foldables;
31-
import org.elasticsearch.xpack.esql.expression.function.scalar.approximate.ConfidenceInterval;
3231
import org.elasticsearch.xpack.esql.inference.InferenceService;
3332
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
3433
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
@@ -51,14 +50,12 @@
5150
import org.hamcrest.TypeSafeMatcher;
5251

5352
import java.util.ArrayList;
54-
import java.util.Arrays;
5553
import java.util.HashMap;
5654
import java.util.List;
5755
import java.util.Map;
5856
import java.util.function.Function;
5957
import java.util.function.Predicate;
6058

61-
import static java.lang.Double.NaN;
6259
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
6360
import static org.hamcrest.CoreMatchers.allOf;
6461
import static org.hamcrest.CoreMatchers.not;
@@ -396,7 +393,9 @@ private void verify(String query) throws Exception {
396393
private Approximate createApproximate(String query, TestRunner runner) throws Exception {
397394
return new Approximate(
398395
getLogicalPlan(query),
399-
new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), EsqlTestUtils.randomMinimumVersion())),
396+
new LogicalPlanOptimizer(
397+
new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG, FoldContext.small(), EsqlTestUtils.randomMinimumVersion())
398+
),
400399
runner,
401400
runner,
402401
EsqlTestUtils.TEST_CFG,
@@ -416,16 +415,4 @@ private LogicalPlan getLogicalPlan(String query) throws Exception {
416415
}
417416
return resultHolder.get();
418417
}
419-
420-
public void test() {
421-
double bestEstimate = 17600.0;
422-
double[] estimates = new double[] {
423-
NaN, NaN, NaN, 93768.0, NaN, NaN, NaN, 93916.0, NaN, NaN, NaN, NaN, NaN, NaN, 93916.0, NaN,
424-
93916.0, NaN, NaN, NaN, NaN, NaN, 93768.0, NaN, NaN, NaN, NaN, NaN, NaN, 93916.0, NaN, NaN,
425-
93916.0, NaN, NaN, NaN, NaN, NaN, NaN, NaN, 93768.0, 93916.0, NaN, NaN, NaN, NaN, NaN, NaN
426-
};
427-
int trialCount=3;
428-
int bucketCount=16;
429-
System.out.println(Arrays.toString(ConfidenceInterval.computeConfidenceInterval(bestEstimate, estimates, trialCount, bucketCount, 0.9)));
430-
}
431418
}

0 commit comments

Comments
 (0)