Skip to content

Commit 63ae2e5

Browse files
Rewrite RoundTo to QueryAndTags
1 parent 4be260a commit 63ae2e5

File tree

5 files changed

+390
-12
lines changed

5 files changed

+390
-12
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.ReplaceSourceAttributes;
2222
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialDocValuesExtraction;
2323
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SpatialShapeBoundsExtraction;
24+
import org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToWithQueryAndTags;
2425
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2526
import org.elasticsearch.xpack.esql.rule.ParameterizedRuleExecutor;
2627
import org.elasticsearch.xpack.esql.rule.Rule;
@@ -60,7 +61,7 @@ protected List<Batch<PhysicalPlan>> batches() {
6061
}
6162

6263
protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
63-
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(6);
64+
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(7);
6465
esSourceRules.add(new ReplaceSourceAttributes());
6566
if (optimizeForEsSource) {
6667
esSourceRules.add(new PushTopNToSource());
@@ -74,6 +75,20 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
7475
// execute the rules multiple times to improve the chances of things being pushed down
7576
@SuppressWarnings("unchecked")
7677
var pushdown = new Batch<PhysicalPlan>("Push to ES", esSourceRules.toArray(Rule[]::new));
78+
List<Rule<?, PhysicalPlan>> substitutionRules = new ArrayList<>(1);
79+
if (optimizeForEsSource) {
80+
substitutionRules.add(new SubstituteRoundToWithQueryAndTags());
81+
}
82+
// execute the SubstituteRoundToWithQueryAndTags rule once after all the other pushdown rules are applied, as this rule generate
83+
// multiple QueryBuilders according the number of RoundTo points, it should be applied after all the other eligible pushdowns are
84+
// done.
85+
@SuppressWarnings("unchecked")
86+
var substituteRoundToWithQueryAndTags = new Batch<PhysicalPlan>(
87+
"Substitute RoundTo with QueryAndTags",
88+
Limiter.ONCE,
89+
substitutionRules.toArray(Rule[]::new)
90+
);
91+
7792
// add the field extraction in just one pass
7893
// add it at the end after all the other rules have ran
7994
var fieldExtraction = new Batch<>(
@@ -84,6 +99,6 @@ protected static List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
8499
new SpatialShapeBoundsExtraction(),
85100
new ParallelizeTimeSeriesSource()
86101
);
87-
return List.of(pushdown, fieldExtraction);
102+
return List.of(pushdown, substituteRoundToWithQueryAndTags, fieldExtraction);
88103
}
89104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
9+
10+
import org.elasticsearch.index.query.QueryBuilder;
11+
import org.elasticsearch.xpack.esql.core.expression.Alias;
12+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
13+
import org.elasticsearch.xpack.esql.core.expression.Expression;
14+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
15+
import org.elasticsearch.xpack.esql.core.expression.Literal;
16+
import org.elasticsearch.xpack.esql.core.querydsl.query.Query;
17+
import org.elasticsearch.xpack.esql.core.tree.Source;
18+
import org.elasticsearch.xpack.esql.core.type.DataType;
19+
import org.elasticsearch.xpack.esql.core.type.EsField;
20+
import org.elasticsearch.xpack.esql.core.util.Queries;
21+
import org.elasticsearch.xpack.esql.expression.function.scalar.math.RoundTo;
22+
import org.elasticsearch.xpack.esql.expression.predicate.logical.And;
23+
import org.elasticsearch.xpack.esql.expression.predicate.nulls.IsNull;
24+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.GreaterThanOrEqual;
25+
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThan;
26+
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
27+
import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
28+
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
29+
import org.elasticsearch.xpack.esql.plan.physical.EvalExec;
30+
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
31+
32+
import java.time.ZoneId;
33+
import java.util.ArrayList;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.Map;
37+
38+
import static org.elasticsearch.xpack.esql.core.type.DataTypeConverter.safeToLong;
39+
import static org.elasticsearch.xpack.esql.planner.TranslatorHandler.TRANSLATOR_HANDLER;
40+
41+
public class SubstituteRoundToWithQueryAndTags extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
42+
EvalExec,
43+
LocalPhysicalOptimizerContext> {
44+
45+
@Override
46+
protected PhysicalPlan rule(EvalExec evalExec, LocalPhysicalOptimizerContext ctx) {
47+
PhysicalPlan plan = evalExec;
48+
// Skip TIME_SERIES indices, as it is not quite clear how to deal with TimeSeriesAggregations
49+
if (evalExec.child() instanceof EsQueryExec queryExec && queryExec.canSubstituteRoundToWithQueryBuilderAndTags()) {
50+
// LuceneTopNSourceOperator does not support QueryAndTags, if the sort is pushed down to EsQueryExec, skip this rule.
51+
List<EsQueryExec.Sort> sorts = queryExec.sorts();
52+
if (sorts != null && sorts.isEmpty() == false) {
53+
return plan;
54+
}
55+
// Look for RoundTo and plan the push down for it.
56+
// It is not clear how to push down multiple RoundTos, push down only one RoundTo for now.
57+
List<RoundTo> roundTos = evalExec.fields()
58+
.stream()
59+
.map(Alias::child)
60+
.filter(RoundTo.class::isInstance)
61+
.map(RoundTo.class::cast)
62+
.toList();
63+
if (roundTos.size() == 1) {
64+
plan = planRoundTo(roundTos.get(0), evalExec, queryExec, ctx);
65+
}
66+
}
67+
return plan;
68+
}
69+
70+
/**
71+
* TODO add an example for the RoundTo pushdown
72+
*/
73+
private static PhysicalPlan planRoundTo(RoundTo roundTo, EvalExec evalExec, EsQueryExec queryExec, LocalPhysicalOptimizerContext ctx) {
74+
// Usually EsQueryExec has only one QueryBuilder, one Lucene query, without RoundTo push down.
75+
// If the RoundTo can be pushed down, EsQueryExec will have a list of QueryBuilders with tags that will be sent to
76+
// EsPhysicalOperationProviders.sourcePhysicalOperation to create a list of LuceneSliceQueue.QueryAndTags
77+
List<EsQueryExec.QueryBuilderAndTags> queryBuilderAndTags = queryBuilderAndTags(roundTo, queryExec, ctx);
78+
if (queryBuilderAndTags == null || queryBuilderAndTags.isEmpty()) {
79+
return evalExec;
80+
}
81+
82+
FieldAttribute fieldAttribute = (FieldAttribute) roundTo.field();
83+
String tagFieldName = Attribute.rawTemporaryName(
84+
fieldAttribute.fieldName().string(),
85+
"round_to",
86+
roundTo.field().dataType().typeName()
87+
);
88+
FieldAttribute tagField = new FieldAttribute(
89+
roundTo.source(),
90+
tagFieldName,
91+
new EsField(tagFieldName, roundTo.dataType(), Map.of(), false)
92+
);
93+
// Add new tag field to attributes/output
94+
List<Attribute> newAttributes = new ArrayList<>(queryExec.attrs());
95+
newAttributes.add(tagField);
96+
97+
// create a new EsQueryExec with newAttributes/output and queryBuilderAndTags
98+
EsQueryExec queryExecWithTags = new EsQueryExec(
99+
queryExec.source(),
100+
queryExec.indexPattern(),
101+
queryExec.indexMode(),
102+
queryExec.indexNameWithModes(),
103+
newAttributes,
104+
queryExec.query(),
105+
queryExec.limit(),
106+
queryExec.sorts(),
107+
queryExec.estimatedRowSize(),
108+
queryBuilderAndTags
109+
);
110+
111+
// Replace RoundTo with new tag field in EvalExec
112+
List<Alias> updatedFields = evalExec.fields()
113+
.stream()
114+
.map(alias -> alias.child() instanceof RoundTo ? alias.replaceChild(tagField) : alias)
115+
.toList();
116+
117+
return new EvalExec(evalExec.source(), queryExecWithTags, updatedFields);
118+
}
119+
120+
private static List<EsQueryExec.QueryBuilderAndTags> queryBuilderAndTags(
121+
RoundTo roundTo,
122+
EsQueryExec queryExec,
123+
LocalPhysicalOptimizerContext ctx
124+
) {
125+
LucenePushdownPredicates pushdownPredicates = LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags());
126+
Expression field = roundTo.field();
127+
if (pushdownPredicates.isPushableFieldAttribute(field) == false) {
128+
return null;
129+
}
130+
List<Expression> roundingPoints = roundTo.points();
131+
int count = roundingPoints.size();
132+
DataType dataType = roundTo.dataType();
133+
List<? extends Number> points = resolveRoundingPoints(dataType, roundingPoints);
134+
if (points.size() != count || points.isEmpty()) {
135+
return null;
136+
}
137+
List<EsQueryExec.QueryBuilderAndTags> queries = new ArrayList<>(count);
138+
139+
Number tag = points.get(0);
140+
if (points.size() == 1) {
141+
EsQueryExec.QueryBuilderAndTags queryBuilderAndTags = tageOnlyBucket(queryExec, tag);
142+
queries.add(queryBuilderAndTags);
143+
} else {
144+
Source source = queryExec.source();
145+
Number lower = null;
146+
Number upper = null;
147+
Queries.Clause clause = queryExec.hasScoring() ? Queries.Clause.MUST : Queries.Clause.FILTER;
148+
ZoneId zoneId = ctx.configuration().zoneId();
149+
for (int i = 1; i < count; i++) {
150+
upper = points.get(i);
151+
// build predicates for RoundTo ranges
152+
queries.add(rangeBucket(source, field, dataType, lower, upper, tag, zoneId, queryExec, pushdownPredicates, clause));
153+
lower = upper;
154+
tag = upper;
155+
}
156+
// build the last/gte bucket
157+
queries.add(rangeBucket(source, field, dataType, lower, null, lower, zoneId, queryExec, pushdownPredicates, clause));
158+
// build null bucket
159+
queries.add(nullBucket(source, field, queryExec, pushdownPredicates, clause));
160+
}
161+
return queries;
162+
}
163+
164+
private static List<? extends Number> resolveRoundingPoints(DataType dataType, List<Expression> roundingPoints) {
165+
return switch (dataType) {
166+
case LONG, DATETIME, DATE_NANOS -> sortedLongRoundingPoints(roundingPoints);
167+
case INTEGER -> sortedIntRoundingPoints(roundingPoints);
168+
case DOUBLE -> sortedDoubleRoundingPoints(roundingPoints);
169+
default -> List.of();
170+
};
171+
}
172+
173+
private static List<Long> sortedLongRoundingPoints(List<Expression> roundingPoints) {
174+
List<Long> points = new ArrayList<>(roundingPoints.size());
175+
for (Expression e : roundingPoints) {
176+
if (e instanceof Literal l && l.value() instanceof Number n) {
177+
points.add(safeToLong(n));
178+
}
179+
}
180+
Collections.sort(points);
181+
return points;
182+
}
183+
184+
private static List<Integer> sortedIntRoundingPoints(List<Expression> roundingPoints) {
185+
List<Integer> points = new ArrayList<>(roundingPoints.size());
186+
for (Expression e : roundingPoints) {
187+
if (e instanceof Literal l) {
188+
points.add((Integer) l.value());
189+
}
190+
}
191+
Collections.sort(points);
192+
return points;
193+
}
194+
195+
private static List<Double> sortedDoubleRoundingPoints(List<Expression> roundingPoints) {
196+
List<Double> points = new ArrayList<>(roundingPoints.size());
197+
for (Expression e : roundingPoints) {
198+
if (e instanceof Literal l) {
199+
points.add((Double) l.value());
200+
}
201+
}
202+
Collections.sort(points);
203+
return points;
204+
}
205+
206+
private static Expression createRangeExpression(
207+
Source source,
208+
Expression field,
209+
DataType dataType,
210+
Object lower,
211+
Object upper,
212+
ZoneId zoneId
213+
) {
214+
LessThan lt = new LessThan(source, field, new Literal(source, upper, dataType), zoneId);
215+
GreaterThanOrEqual gte = new GreaterThanOrEqual(source, field, new Literal(source, lower, dataType), zoneId);
216+
And and = new And(source, lt, gte);
217+
if (lower == null) {
218+
return lt;
219+
} else if (upper == null) {
220+
return gte;
221+
} else {
222+
return and;
223+
}
224+
}
225+
226+
private static EsQueryExec.QueryBuilderAndTags tageOnlyBucket(EsQueryExec queryExec, Object tag) {
227+
return new EsQueryExec.QueryBuilderAndTags(queryExec.query(), List.of(tag));
228+
}
229+
230+
private static EsQueryExec.QueryBuilderAndTags nullBucket(
231+
Source source,
232+
Expression field,
233+
EsQueryExec queryExec,
234+
LucenePushdownPredicates pushdownPredicates,
235+
Queries.Clause clause
236+
) {
237+
IsNull isNull = new IsNull(source, field);
238+
List<Object> nullTags = new ArrayList<>(1);
239+
nullTags.add(null);
240+
return buildCombinedQueryAndTags(queryExec, pushdownPredicates, isNull, clause, nullTags);
241+
}
242+
243+
private static EsQueryExec.QueryBuilderAndTags rangeBucket(
244+
Source source,
245+
Expression field,
246+
DataType dataType,
247+
Object lower,
248+
Object upper,
249+
Object tag,
250+
ZoneId zoneId,
251+
EsQueryExec queryExec,
252+
LucenePushdownPredicates pushdownPredicates,
253+
Queries.Clause clause
254+
) {
255+
Expression range = createRangeExpression(source, field, dataType, lower, upper, zoneId);
256+
return buildCombinedQueryAndTags(queryExec, pushdownPredicates, range, clause, List.of(tag));
257+
}
258+
259+
private static EsQueryExec.QueryBuilderAndTags buildCombinedQueryAndTags(
260+
EsQueryExec queryExec,
261+
LucenePushdownPredicates pushdownPredicates,
262+
Expression expression,
263+
Queries.Clause clause,
264+
List<Object> tags
265+
) {
266+
Query queryDSL = TRANSLATOR_HANDLER.asQuery(pushdownPredicates, expression);
267+
QueryBuilder mainQuery = queryExec.query();
268+
QueryBuilder newQuery = queryDSL.toQueryBuilder();
269+
QueryBuilder combinedQuery = Queries.combine(clause, mainQuery != null ? List.of(mainQuery, newQuery) : List.of(newQuery));
270+
return new EsQueryExec.QueryBuilderAndTags(combinedQuery, tags);
271+
}
272+
}

0 commit comments

Comments
 (0)