Skip to content

Commit f042159

Browse files
elasticsearchmachineGalLalouche
authored andcommitted
ESQL: Handle filters when pushing COUNT(*) BY DATE_TRUNC to source
1 parent 3f2ea0d commit f042159

File tree

4 files changed

+314
-41
lines changed

4 files changed

+314
-41
lines changed

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,31 @@ c:l | d:datetime
850850
4 | 1994-01-01T00:00:00.000Z
851851
;
852852

853+
countStarGroupedTruncWithFilterInsideStats
854+
from employees | stats c = count(*) where hire_date > "1990-01-01" by d=date_trunc(1 year, hire_date) | sort d | limit 10;
855+
c:l | d:datetime
856+
0 | 1985-01-01T00:00:00.000Z
857+
0 | 1986-01-01T00:00:00.000Z
858+
0 | 1987-01-01T00:00:00.000Z
859+
0 | 1988-01-01T00:00:00.000Z
860+
0 | 1989-01-01T00:00:00.000Z
861+
12 | 1990-01-01T00:00:00.000Z
862+
6 | 1991-01-01T00:00:00.000Z
863+
8 | 1992-01-01T00:00:00.000Z
864+
3 | 1993-01-01T00:00:00.000Z
865+
4 | 1994-01-01T00:00:00.000Z
866+
;
867+
868+
countStarGroupedTruncWithFilterOutsideStats
869+
from employees | where hire_date > "1990-01-01" and hire_date < "1995-01-01" | stats c = count(*) by d=date_trunc(1 year, hire_date) | sort d | limit 10;
870+
c:l | d:datetime
871+
12 | 1990-01-01T00:00:00.000Z
872+
6 | 1991-01-01T00:00:00.000Z
873+
8 | 1992-01-01T00:00:00.000Z
874+
3 | 1993-01-01T00:00:00.000Z
875+
4 | 1994-01-01T00:00:00.000Z
876+
;
877+
853878
countAllAndOtherStatGrouped
854879
from employees | stats c = count(*), min = min(emp_no) by languages | sort languages;
855880

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushCountQueryAndTagsToSource.java

Lines changed: 175 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77

88
package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
99

10+
import org.elasticsearch.core.Nullable;
1011
import org.elasticsearch.index.query.BoolQueryBuilder;
12+
import org.elasticsearch.index.query.ExistsQueryBuilder;
13+
import org.elasticsearch.index.query.QueryBuilder;
14+
import org.elasticsearch.index.query.RangeQueryBuilder;
15+
import org.elasticsearch.index.query.TermQueryBuilder;
1116
import org.elasticsearch.xpack.esql.core.expression.Alias;
1217
import org.elasticsearch.xpack.esql.core.expression.Attribute;
1318
import org.elasticsearch.xpack.esql.core.expression.Literal;
@@ -24,6 +29,10 @@
2429
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2530
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
2631

32+
import java.util.List;
33+
import java.util.Objects;
34+
import java.util.Optional;
35+
2736
/**
2837
* Pushes count aggregations on top of query and tags to source.
2938
* Will transform:
@@ -38,6 +47,9 @@
3847
* └── StatsQuery [count with query + tags]
3948
* </pre>
4049
* Where the filter is needed since the original Aggregate would not produce buckets with count = 0.
50+
*
51+
* If there's more than one query on the query builder, and both filter <b>are on the same field</b>, this rule will also attempt to merge
52+
* them before pushing. If the queries cannot be merged, the rule will not be applied.
4153
*/
4254
public class PushCountQueryAndTagsToSource extends PhysicalOptimizerRules.OptimizerRule<AggregateExec> {
4355
@Override
@@ -47,19 +59,23 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {
4759
aggregateExec.aggregates().size() == 2
4860
&& aggregateExec.aggregates().getFirst() instanceof Alias alias
4961
&& alias.child() instanceof Count count
50-
&& count.hasFilter() == false // TODO We don't support filters at the moment (but we definitely should!).
62+
&& count.hasFilter() == false // FIXME explain
5163
&& count.field() instanceof Literal // Ensures count(*) or equivalent.
5264
&& aggregateExec.child() instanceof EvalExec evalExec
5365
&& evalExec.child() instanceof EsQueryExec queryExec
5466
&& queryExec.queryBuilderAndTags().size() > 1 // Ensures there are query and tags to push down.
55-
&& queryExec.queryBuilderAndTags().stream().allMatch(PushCountQueryAndTagsToSource::isSingleFilterQuery)) {
67+
) {
68+
var withFilter = tryMerge(queryExec.queryBuilderAndTags());
69+
if (withFilter.isEmpty() || withFilter.stream().allMatch(PushCountQueryAndTagsToSource::shouldPush) == false) {
70+
return aggregateExec;
71+
}
5672
EsStatsQueryExec statsQueryExec = new EsStatsQueryExec(
5773
queryExec.source(),
5874
queryExec.indexPattern(),
5975
null, // query
6076
queryExec.limit(),
6177
aggregateExec.output(),
62-
new EsStatsQueryExec.ByStat(queryExec.queryBuilderAndTags())
78+
new EsStatsQueryExec.ByStat(withFilter)
6379
);
6480
// Wrap with FilterExec to remove empty buckets (keep buckets where count > 0). This was automatically handled by the
6581
// AggregateExec, but since we removed it, we need to do it manually.
@@ -69,13 +85,168 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {
6985
return aggregateExec;
7086
}
7187

72-
private static boolean isSingleFilterQuery(EsQueryExec.QueryBuilderAndTags queryBuilderAndTags) {
88+
/** We only push down single and simple queries, since otherwise we risk overloading Lucene with a complex query. */
89+
private static boolean shouldPush(EsQueryExec.QueryBuilderAndTags queryBuilderAndTags) {
7390
return switch (queryBuilderAndTags.query()) {
7491
case SingleValueQuery.Builder unused -> true;
92+
case RangeQueryBuilder unused -> true;
93+
case TermQueryBuilder unused -> true;
94+
case ExistsQueryBuilder unused -> true;
7595
case BoolQueryBuilder bq -> bq.filter().size() + bq.must().size() + bq.should().size() + bq.mustNot().size() <= 1;
7696
default -> false;
7797
};
7898
}
7999

100+
private static List<EsQueryExec.QueryBuilderAndTags> tryMerge(List<EsQueryExec.QueryBuilderAndTags> queryBuilderAndTags) {
101+
return queryBuilderAndTags.stream().flatMap(e -> tryMergeBoolQuery(e).stream()).flatMap(e -> trySimplifyRange(e).stream()).toList();
102+
}
103+
104+
/**
105+
* If the query specifies an impossible range (e.g., from > to), returns {@link Optional#empty()}; if from == to and both ends are
106+
* inclusive, returns a {@link TermQueryBuilder}; otherwise returns the original range.
107+
*/
108+
private static Optional<EsQueryExec.QueryBuilderAndTags> trySimplifyRange(EsQueryExec.QueryBuilderAndTags qbt) {
109+
if (qbt.query() instanceof RangeQueryBuilder rqb && rqb.from() != null && rqb.to() != null) {
110+
int comparison = compare(rqb.from(), rqb.to());
111+
if (comparison > 0) {
112+
// from > to, can remove the query entry.
113+
return Optional.empty();
114+
} else if (comparison == 0) {
115+
// from == to should be kept only if both ends are inclusive, and then we can replace the range with a term query.
116+
return rqb.includeLower() && rqb.includeUpper()
117+
? Optional.of(qbt.withQuery(new TermQueryBuilder(rqb.fieldName(), rqb.from())))
118+
: Optional.empty();
119+
}
120+
return Optional.of(qbt); // valid range, keep as is.
121+
122+
}
123+
return Optional.of(qbt);
124+
}
125+
126+
/**
127+
* Attempts to merge a {@link BoolQueryBuilder} filters. Returns {@link Optional#empty()} if the query can be dropped. If the filters
128+
* cannot be merged, returns the original query.
129+
*/
130+
private static Optional<EsQueryExec.QueryBuilderAndTags> tryMergeBoolQuery(EsQueryExec.QueryBuilderAndTags qbt) {
131+
if (qbt.query() instanceof BoolQueryBuilder bqb && bqb.filter().size() == 2) {
132+
QueryBuilder filter1 = bqb.filter().get(0);
133+
var range1 = tryExtractSingleRangeQuery(filter1);
134+
QueryBuilder filter2 = bqb.filter().get(1);
135+
if (range1 != null
136+
&& filter2 instanceof BoolQueryBuilder internalQuery
137+
&& internalQuery.mustNot().size() == 1
138+
&& internalQuery.mustNot().get(0) instanceof ExistsQueryBuilder) {
139+
// Simple "must not exist" cases can be dropped since they never match anything when combined with a range.
140+
return Optional.empty();
141+
}
142+
var range2 = tryExtractSingleRangeQuery(filter2);
143+
return Optional.of(mergeRanges(qbt, range1, range2));
144+
}
145+
return Optional.of(qbt);
146+
}
147+
148+
private static EsQueryExec.QueryBuilderAndTags mergeRanges(
149+
EsQueryExec.QueryBuilderAndTags original,
150+
RangeQueryBuilder range1,
151+
RangeQueryBuilder range2
152+
) {
153+
return range1 == null || range2 == null ? original : merge(range1, range2).map(original::withQuery).orElse(original);
154+
}
155+
156+
/** Returns {@link Optional#empty()} if the queries cannot be merged. */
157+
private static Optional<QueryBuilder> merge(RangeQueryBuilder range1, RangeQueryBuilder range2) {
158+
if (range1.fieldName().equals(range2.fieldName()) == false) {
159+
return Optional.empty();
160+
}
161+
if (Objects.equals(nonDefaultTimezone(range1.timeZone()), nonDefaultTimezone(range2.timeZone())) == false) {
162+
return Optional.empty();
163+
}
164+
165+
RangeQueryBuilder merged = new RangeQueryBuilder(range1.fieldName());
166+
setTighterBound(merged, range1.from(), range2.from(), range1.includeLower(), range2.includeLower(), BoundType.FROM);
167+
setTighterBound(merged, range1.to(), range2.to(), range1.includeUpper(), range2.includeUpper(), BoundType.TO);
168+
169+
String timeZone = range1.timeZone();
170+
if (timeZone != null) {
171+
merged.timeZone(timeZone);
172+
}
173+
174+
if (range1.format() != null && range2.format() != null && range1.format().equals(range2.format()) == false) {
175+
return Optional.empty();
176+
}
177+
var format = range1.format() != null ? range1.format() : range2.format();
178+
if (format != null) {
179+
merged.format(format);
180+
}
181+
182+
merged.boost(Math.max(range1.boost(), range2.boost()));
183+
return Optional.of(merged);
184+
}
185+
186+
private static String nonDefaultTimezone(String s) {
187+
return s == null || s == "Z" ? null : s;
188+
}
189+
190+
/** Returns {@code null} if no single range query could be extracted. */
191+
private static @Nullable RangeQueryBuilder tryExtractSingleRangeQuery(QueryBuilder qb) {
192+
return switch (qb) {
193+
case RangeQueryBuilder rqb -> rqb;
194+
case SingleValueQuery.Builder single -> tryExtractSingleRangeQuery(single.next());
195+
case BoolQueryBuilder bqb when bqb.filter().size() == 1 -> tryExtractSingleRangeQuery(bqb.filter().getFirst());
196+
default -> null;
197+
};
198+
}
199+
200+
enum BoundType {
201+
FROM,
202+
TO
203+
}
204+
205+
// Given two bounds, sets the tighter one on the range.
206+
private static void setTighterBound(
207+
RangeQueryBuilder range,
208+
Object bound1,
209+
Object bound2,
210+
boolean include1,
211+
boolean include2,
212+
BoundType boundType
213+
) {
214+
if (bound1 == null || bound2 == null) {
215+
if (bound1 != null) {
216+
setRange(range, bound1, include1, boundType);
217+
}
218+
if (bound2 != null) {
219+
setRange(range, bound2, include2, boundType);
220+
}
221+
return;
222+
}
223+
224+
@SuppressWarnings("unchecked")
225+
int compare = compare(bound1, bound2);
226+
boolean useFirst = switch (boundType) {
227+
case FROM -> compare > 0;
228+
case TO -> compare < 0;
229+
};
230+
Object value = useFirst ? bound1 : bound2;
231+
boolean include = useFirst ? include1 : include2;
232+
if (compare == 0) {
233+
include = include1 && include2;
234+
}
235+
236+
setRange(range, value, include, boundType);
237+
}
238+
239+
@SuppressWarnings("unchecked")
240+
private static int compare(Object o1, Object o2) {
241+
return ((Comparable<Object>) o1).compareTo(o2);
242+
}
243+
244+
private static void setRange(RangeQueryBuilder range, Object val, boolean include, BoundType boundType) {
245+
switch (boundType) {
246+
case FROM -> range.from(val, include);
247+
case TO -> range.to(val, include);
248+
}
249+
}
250+
80251
private static final Literal ZERO = new Literal(Source.EMPTY, 0L, DataType.LONG);
81252
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/EsQueryExec.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,11 @@ public DataType resulType() {
129129
}
130130
}
131131

132-
public record QueryBuilderAndTags(QueryBuilder query, List<Object> tags) {};
132+
public record QueryBuilderAndTags(QueryBuilder query, List<Object> tags) {
133+
public QueryBuilderAndTags withQuery(QueryBuilder query) {
134+
return new QueryBuilderAndTags(query, tags);
135+
}
136+
};
133137

134138
public EsQueryExec(
135139
Source source,

0 commit comments

Comments
 (0)