Skip to content

Commit 1c70e36

Browse files
add ROUNDTO_PUSHDOWN_THRESHOLD in QueryParagmas and EsqlFlags to control the pushdown behavior at query and cluster level
1 parent 72ad539 commit 1c70e36

File tree

7 files changed

+190
-21
lines changed

7 files changed

+190
-21
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,11 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
7575
// execute the rules multiple times to improve the chances of things being pushed down
7676
@SuppressWarnings("unchecked")
7777
var pushdown = new Batch<PhysicalPlan>("Push to ES", esSourceRules.toArray(Rule[]::new));
78-
List<Rule<?, PhysicalPlan>> substitutionRules = new ArrayList<>(1);
79-
if (optimizeForEsSource) {
80-
substitutionRules.add(new ReplaceRoundToWithQueryAndTags());
81-
}
78+
8279
// execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate
8380
// multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are
8481
// done, and it should be executed only once.
85-
@SuppressWarnings("unchecked")
86-
var substituteRoundToWithQueryAndTags = new Batch<PhysicalPlan>(
87-
"Substitute RoundTo with QueryAndTags",
88-
Limiter.ONCE,
89-
substitutionRules.toArray(Rule[]::new)
90-
);
82+
var substitutionRules = new Batch<>("Substitute RoundTo with QueryAndTags", Limiter.ONCE, new ReplaceRoundToWithQueryAndTags());
9183

9284
// add the field extraction in just one pass
9385
// add it at the end after all the other rules have ran
@@ -99,6 +91,6 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
9991
new SpatialShapeBoundsExtraction(),
10092
new ParallelizeTimeSeriesSource()
10193
);
102-
return List.of(pushdown, substituteRoundToWithQueryAndTags, fieldExtraction);
94+
return optimizeForEsSource ? List.of(pushdown, substitutionRules, fieldExtraction) : List.of(pushdown, fieldExtraction);
10395
}
10496
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTags.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import static org.elasticsearch.xpack.esql.core.type.DataTypeConverter.safeToLong;
4040
import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;
41+
import static org.elasticsearch.xpack.esql.plugin.QueryPragmas.ROUNDTO_PUSHDOWN_THRESHOLD;
4142

4243
/**
4344
* {@code ReplaceRoundToWithQueryAndTags} builds a list of ranges and associated tags base on the rounding points defined in a
@@ -255,9 +256,6 @@ public class ReplaceRoundToWithQueryAndTags extends PhysicalOptimizerRules.Param
255256
EvalExec,
256257
LocalPhysicalOptimizerContext> {
257258

258-
// this is the maximum number of rounding points supported by this rule,
259-
// it is the same as the maximum number of buckets used in Rounding.
260-
public static final int MAX_NUM_POINTS = 127;
261259
private static final Logger logger = LogManager.getLogger(ReplaceRoundToWithQueryAndTags.class);
262260

263261
@Override
@@ -277,12 +275,13 @@ protected PhysicalPlan rule(EvalExec evalExec, LocalPhysicalOptimizerContext ctx
277275
if (roundTos.size() == 1) {
278276
RoundTo roundTo = roundTos.get(0);
279277
int count = roundTo.points().size();
280-
if (count > MAX_NUM_POINTS) {
278+
int roundingPointsUpperLimit = roundingPointsThreshold(ctx);
279+
if (count > roundingPointsUpperLimit) {
281280
logger.debug(
282281
"Skipping RoundTo push down for [{}], as it has [{}] points, which is more than [{}]",
283282
roundTo.source(),
284283
count,
285-
MAX_NUM_POINTS
284+
roundingPointsUpperLimit
286285
);
287286
return evalExec;
288287
}
@@ -469,4 +468,21 @@ private static EsQueryExec.QueryBuilderAndTags buildCombinedQueryAndTags(
469468
QueryBuilder combinedQuery = Queries.combine(clause, mainQuery != null ? List.of(mainQuery, newQuery) : List.of(newQuery));
470469
return new EsQueryExec.QueryBuilderAndTags(combinedQuery, tags);
471470
}
471+
472+
/**
473+
* Get the rounding points upper limit for {@code RoundTo} pushdown from query level pragmas or cluster level flags.
474+
* If the query level pragmas is set to -1(default), the cluster level flags will be used.
475+
* If the query level pragmas is set to greater than or equals to 0, the query level pragmas will be used.
476+
*/
477+
private int roundingPointsThreshold(LocalPhysicalOptimizerContext ctx) {
478+
int queryLevelRoundingPointsThreshold = ctx.configuration().pragmas().roundToPushDownThreshold();
479+
int clusterLevelRoundingPointsThreshold = ctx.flags().roundToPushdownThreshold();
480+
int roundingPointsThreshold;
481+
if (queryLevelRoundingPointsThreshold == ROUNDTO_PUSHDOWN_THRESHOLD.getDefault(ctx.configuration().pragmas().getSettings())) {
482+
roundingPointsThreshold = clusterLevelRoundingPointsThreshold;
483+
} else {
484+
roundingPointsThreshold = queryLevelRoundingPointsThreshold;
485+
}
486+
return roundingPointsThreshold;
487+
}
472488
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlFlags.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.common.settings.ClusterSettings;
1111
import org.elasticsearch.common.settings.Setting;
12+
import org.elasticsearch.common.settings.Settings;
1213

1314
public class EsqlFlags {
1415
public static final Setting<Boolean> ESQL_STRING_LIKE_ON_INDEX = Setting.boolSetting(
@@ -18,17 +19,65 @@ public class EsqlFlags {
1819
Setting.Property.Dynamic
1920
);
2021

22+
/**
23+
* The maximum number of rounding points to push down to Lucene for the {@code roundTo} function at cluster level.
24+
* {@code ReplaceRoundToWithQueryAndTags} checks this threshold before rewriting {@code RoundTo} to range queries.
25+
*
26+
* There is also a query level ROUNDTO_PUSHDOWN_THRESHOLD defined in {@code QueryPragmas}.
27+
* The cluster level threshold defaults to 127, it is the same as the maximum number of buckets used in {@code Rounding}.
28+
* The query level threshold defaults to -1, which means this query level setting is not set and cluster level upper limit will be used.
29+
* If query level threshold is set to greater than or equals to 0, the query level threshold will be used, and it overrides the cluster
30+
* level threshold.
31+
*
32+
* If the cluster level threshold is set to -1 or 0, no {@code RoundTo} pushdown will be performed, query level threshold is not set to
33+
* -1 or 0.
34+
*/
35+
public static final Setting<Integer> ESQL_ROUNDTO_PUSHDOWN_THRESHOLD = Setting.intSetting(
36+
"esql.query.roundto_pushdown_threshold",
37+
127,
38+
-1,
39+
Setting.Property.NodeScope,
40+
Setting.Property.Dynamic
41+
);
42+
2143
private final boolean stringLikeOnIndex;
2244

45+
private final int roundToPushdownThreshold;
46+
47+
/**
48+
* Constructor for tests.
49+
*/
2350
public EsqlFlags(boolean stringLikeOnIndex) {
2451
this.stringLikeOnIndex = stringLikeOnIndex;
52+
this.roundToPushdownThreshold = ESQL_ROUNDTO_PUSHDOWN_THRESHOLD.getDefault(Settings.EMPTY);
53+
}
54+
55+
/**
56+
* Constructor for tests.
57+
*/
58+
public EsqlFlags(int roundToPushdownThreshold) {
59+
this.stringLikeOnIndex = ESQL_STRING_LIKE_ON_INDEX.getDefault(Settings.EMPTY);
60+
this.roundToPushdownThreshold = roundToPushdownThreshold;
61+
}
62+
63+
/**
64+
* Constructor for tests.
65+
*/
66+
public EsqlFlags(boolean stringLikeOnIndex, int roundToPushdownThreshold) {
67+
this.stringLikeOnIndex = stringLikeOnIndex;
68+
this.roundToPushdownThreshold = roundToPushdownThreshold;
2569
}
2670

2771
public EsqlFlags(ClusterSettings settings) {
2872
this.stringLikeOnIndex = settings.get(ESQL_STRING_LIKE_ON_INDEX);
73+
this.roundToPushdownThreshold = settings.get(ESQL_ROUNDTO_PUSHDOWN_THRESHOLD);
2974
}
3075

3176
public boolean stringLikeOnIndex() {
3277
return stringLikeOnIndex;
3378
}
79+
80+
public int roundToPushdownThreshold() {
81+
return roundToPushdownThreshold;
82+
}
3483
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ public List<Setting<?>> getSettings() {
260260
PhysicalSettings.DEFAULT_DATA_PARTITIONING,
261261
PhysicalSettings.VALUES_LOADING_JUMBO_SIZE,
262262
STORED_FIELDS_SEQUENTIAL_PROPORTION,
263-
EsqlFlags.ESQL_STRING_LIKE_ON_INDEX
263+
EsqlFlags.ESQL_STRING_LIKE_ON_INDEX,
264+
EsqlFlags.ESQL_ROUNDTO_PUSHDOWN_THRESHOLD
264265
);
265266
}
266267

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/QueryPragmas.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,20 @@ public final class QueryPragmas implements Writeable {
8080
MappedFieldType.FieldExtractPreference.NONE
8181
);
8282

83+
/**
84+
* The maximum number of rounding points to push down to Lucene for the {@code roundTo} function at query level.
85+
* {@code ReplaceRoundToWithQueryAndTags} checks this threshold before rewriting {@code RoundTo} to range queries.
86+
*
87+
* There is also a cluster level ESQL_ROUNDTO_PUSHDOWN_THRESHOLD defined in {@code EsqlFlags}.
88+
* The query level threshold defaults to -1, which means this query level setting is not set and cluster level upper limit will be used.
89+
* The cluster level threshold defaults to 127, it is the same as the maximum number of buckets used in {@code Rounding}.
90+
* If query level threshold is set to greater than or equals to 0, the query level threshold will be used, and it overrides the cluster
91+
* level threshold.
92+
*
93+
* If the query level threshold is set to 0, no {@code RoundTo} pushdown will be performed.
94+
*/
95+
public static final Setting<Integer> ROUNDTO_PUSHDOWN_THRESHOLD = Setting.intSetting("roundto_pushdown_threshold", -1, -1);
96+
8397
public static final QueryPragmas EMPTY = new QueryPragmas(Settings.EMPTY);
8498

8599
private final Settings settings;
@@ -197,6 +211,10 @@ public MappedFieldType.FieldExtractPreference fieldExtractPreference() {
197211
return FIELD_EXTRACT_PREFERENCE.get(settings);
198212
}
199213

214+
public int roundToPushDownThreshold() {
215+
return ROUNDTO_PUSHDOWN_THRESHOLD.get(settings);
216+
}
217+
200218
public boolean isEmpty() {
201219
return settings.isEmpty();
202220
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,15 @@ public PhysicalPlan plan(String query, SearchStats stats, Analyzer analyzer) {
5555
return physical;
5656
}
5757

58+
public PhysicalPlan plan(String query, SearchStats stats, EsqlFlags esqlFlags) {
59+
return optimizedPlan(physicalPlan(query, analyzer), stats, esqlFlags);
60+
}
61+
5862
private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) {
63+
return optimizedPlan(plan, searchStats, new EsqlFlags(true));
64+
}
65+
66+
private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats, EsqlFlags esqlFlags) {
5967
// System.out.println("* Physical Before\n" + plan);
6068
var physicalPlan = EstimatesRowSize.estimateRowSize(0, physicalPlanOptimizer.optimize(plan));
6169
// System.out.println("* Physical After\n" + physicalPlan);
@@ -67,7 +75,7 @@ private PhysicalPlan optimizedPlan(PhysicalPlan plan, SearchStats searchStats) {
6775
new LocalLogicalOptimizerContext(config, FoldContext.small(), searchStats)
6876
);
6977
var physicalTestOptimizer = new TestLocalPhysicalPlanOptimizer(
70-
new LocalPhysicalOptimizerContext(new EsqlFlags(true), config, FoldContext.small(), searchStats),
78+
new LocalPhysicalOptimizerContext(esqlFlags, config, FoldContext.small(), searchStats),
7179
true
7280
);
7381
var l = PlannerUtils.localPlan(physicalPlan, logicalTestOptimizer, physicalTestOptimizer);

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/ReplaceRoundToWithQueryAndTagsTests.java

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
99

1010
import org.elasticsearch.common.logging.LoggerMessageFormat;
11+
import org.elasticsearch.common.settings.Settings;
1112
import org.elasticsearch.index.query.BoolQueryBuilder;
1213
import org.elasticsearch.index.query.MatchQueryBuilder;
1314
import org.elasticsearch.index.query.QueryBuilder;
@@ -27,6 +28,7 @@
2728
import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc;
2829
import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo;
2930
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizerTests;
31+
import org.elasticsearch.xpack.esql.optimizer.TestPlannerOptimizer;
3032
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
3133
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
3234
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
@@ -38,12 +40,15 @@
3840
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
3941
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
4042
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
43+
import org.elasticsearch.xpack.esql.plugin.EsqlFlags;
44+
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
4145
import org.elasticsearch.xpack.esql.session.Configuration;
4246
import org.elasticsearch.xpack.esql.stats.SearchStats;
4347

4448
import java.util.ArrayList;
4549
import java.util.HashMap;
4650
import java.util.List;
51+
import java.util.Locale;
4752
import java.util.Map;
4853
import java.util.stream.Collectors;
4954

@@ -55,6 +60,7 @@
5560
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
5661
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
5762
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
63+
import static org.elasticsearch.xpack.esql.EsqlTestUtils.configuration;
5864
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_NANOS_FORMATTER;
5965
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.DEFAULT_DATE_TIME_FORMATTER;
6066
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.dateNanosToLong;
@@ -391,9 +397,11 @@ public void testDateTruncBucketNotTransformToQueryAndTagsWithFork() {
391397
}
392398
}
393399

394-
// If the number of rounding points is 127 or less, the query is rewritten to use QueryAndTags.
395-
// If the number of rounding points is 128 or more, the query is not rewritten.
396-
public void testRoundToTransformToQueryAndTagsUpperLimit() {
400+
/**
401+
* If the number of rounding points is 127 or less, the query is rewritten to QueryAndTags.
402+
* If the number of rounding points is 128 or more, the query is not rewritten.
403+
*/
404+
public void testRoundToTransformToQueryAndTagsWithDefaultUpperLimit() {
397405
for (int numOfPoints : List.of(127, 128)) {
398406
StringBuilder points = new StringBuilder();
399407
for (int i = 0; i < numOfPoints; i++) {
@@ -445,6 +453,83 @@ public void testRoundToTransformToQueryAndTagsUpperLimit() {
445453
}
446454
}
447455

456+
/**
457+
* Query level threshold(if greater than -1) set in QueryPragmas overrides the cluster level threshold set in EsqlFlags.
458+
*/
459+
public void testRoundToTransformToQueryAndTagsWithCustomizedUpperLimit() {
460+
for (int clusterLevelThreshold : List.of(-1, 0, 60, 126, 128, 256)) {
461+
for (int queryLevelThreshold : List.of(-1, 0, 60, 126, 128, 256)) {
462+
StringBuilder points = new StringBuilder(); // there are 127 rounding points
463+
for (int i = 0; i < 127; i++) {
464+
if (i > 0) {
465+
points.append(", ");
466+
}
467+
points.append(i);
468+
}
469+
String query = LoggerMessageFormat.format(null, """
470+
from test
471+
| stats count(*) by x = round_to(integer, {})
472+
""", points.toString());
473+
474+
TestPlannerOptimizer plannerOptimizerWithPragmas = new TestPlannerOptimizer(
475+
configuration(
476+
new QueryPragmas(
477+
Settings.builder()
478+
.put(QueryPragmas.ROUNDTO_PUSHDOWN_THRESHOLD.getKey().toLowerCase(Locale.ROOT), queryLevelThreshold)
479+
.build()
480+
),
481+
query
482+
),
483+
makeAnalyzer("mapping-all-types.json")
484+
);
485+
EsqlFlags esqlFlags = new EsqlFlags(clusterLevelThreshold);
486+
assertEquals(clusterLevelThreshold, esqlFlags.roundToPushdownThreshold());
487+
assertTrue(esqlFlags.stringLikeOnIndex());
488+
PhysicalPlan plan = plannerOptimizerWithPragmas.plan(query, searchStats, esqlFlags);
489+
boolean pushdown = false;
490+
if (queryLevelThreshold > -1) {
491+
pushdown = queryLevelThreshold >= 127;
492+
} else {
493+
pushdown = clusterLevelThreshold >= 127;
494+
}
495+
496+
LimitExec limit = as(plan, LimitExec.class);
497+
AggregateExec agg = as(limit.child(), AggregateExec.class);
498+
assertThat(agg.getMode(), is(FINAL));
499+
List<? extends Expression> groupings = agg.groupings();
500+
NamedExpression grouping = as(groupings.get(0), NamedExpression.class);
501+
assertEquals("x", grouping.name());
502+
assertEquals(DataType.INTEGER, grouping.dataType());
503+
assertEquals(List.of("count(*)", "x"), Expressions.names(agg.aggregates()));
504+
ExchangeExec exchange = as(agg.child(), ExchangeExec.class);
505+
assertThat(exchange.inBetweenAggs(), is(true));
506+
agg = as(exchange.child(), AggregateExec.class);
507+
EvalExec evalExec = as(agg.child(), EvalExec.class);
508+
List<Alias> aliases = evalExec.fields();
509+
assertEquals(1, aliases.size());
510+
if (pushdown) {
511+
FieldAttribute roundToTag = as(aliases.get(0).child(), FieldAttribute.class);
512+
assertTrue(roundToTag.name().startsWith("$$integer$round_to$"));
513+
EsQueryExec esQueryExec = as(evalExec.child(), EsQueryExec.class);
514+
List<EsQueryExec.QueryBuilderAndTags> queryBuilderAndTags = esQueryExec.queryBuilderAndTags();
515+
assertEquals(128, queryBuilderAndTags.size()); // 127 + nullBucket
516+
assertThrows(UnsupportedOperationException.class, esQueryExec::query);
517+
} else { // query rewrite does not happen
518+
RoundTo roundTo = as(aliases.get(0).child(), RoundTo.class);
519+
assertEquals(127, roundTo.points().size());
520+
FieldExtractExec fieldExtractExec = as(evalExec.child(), FieldExtractExec.class);
521+
EsQueryExec esQueryExec = as(fieldExtractExec.child(), EsQueryExec.class);
522+
List<EsQueryExec.QueryBuilderAndTags> queryBuilderAndTags = esQueryExec.queryBuilderAndTags();
523+
assertEquals(1, queryBuilderAndTags.size());
524+
EsQueryExec.QueryBuilderAndTags queryBuilder = queryBuilderAndTags.get(0);
525+
assertNull(queryBuilder.query());
526+
assertTrue(queryBuilder.tags().isEmpty());
527+
assertNull(esQueryExec.query());
528+
}
529+
}
530+
}
531+
}
532+
448533
private static void verifyQueryAndTags(List<EsQueryExec.QueryBuilderAndTags> expected, List<EsQueryExec.QueryBuilderAndTags> actual) {
449534
assertEquals(expected.size(), actual.size());
450535
for (int i = 0; i < expected.size(); i++) {

0 commit comments

Comments
 (0)