4747 * └── StatsQuery [count with query + tags]
4848 * </pre>
4949 * Where the filter is needed since the original Aggregate would not produce buckets with count = 0.
50+ *
51+ * If there's more than one query on the query builder, and both filters <b>are on the same field</b>, this rule will also attempt to merge
52+ * them before pushing. If the queries cannot be merged, the rule will not be applied.
5053 */
5154public class PushCountQueryAndTagsToSource extends PhysicalOptimizerRules .OptimizerRule <AggregateExec > {
5255 @ Override
@@ -62,8 +65,8 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {
6265 && evalExec .child () instanceof EsQueryExec queryExec
6366 && queryExec .queryBuilderAndTags ().size () > 1 // Ensures there are query and tags to push down.
6467 ) {
65- var withFilter = foo (queryExec .queryBuilderAndTags ());
66- if (withFilter .isEmpty () || withFilter .stream ().anyMatch (PushCountQueryAndTagsToSource ::isInvalid ) ) {
68+ var withFilter = tryMerge (queryExec .queryBuilderAndTags ());
69+ if (withFilter .isEmpty () || withFilter .stream ().allMatch (PushCountQueryAndTagsToSource ::shouldPush ) == false ) {
6770 return aggregateExec ;
6871 }
6972 EsStatsQueryExec statsQueryExec = new EsStatsQueryExec (
@@ -82,121 +85,124 @@ protected PhysicalPlan rule(AggregateExec aggregateExec) {
8285 return aggregateExec ;
8386 }
8487
85- private static boolean isInvalid (EsQueryExec .QueryBuilderAndTags queryBuilderAndTags ) {
88+ /** We only push down single and simple queries, since otherwise we risk overloading Lucene with a complex query. */
89+ private static boolean shouldPush (EsQueryExec .QueryBuilderAndTags queryBuilderAndTags ) {
8690 return switch (queryBuilderAndTags .query ()) {
87- case SingleValueQuery .Builder unused -> false ;
88- case RangeQueryBuilder unused -> false ;
89- case TermQueryBuilder unused -> false ;
90- case ExistsQueryBuilder unused -> false ;
91- case BoolQueryBuilder bq -> bq .filter ().size () + bq .must ().size () + bq .should ().size () + bq .mustNot ().size () > 1 ;
92- default -> true ;
91+ case SingleValueQuery .Builder unused -> true ;
92+ case RangeQueryBuilder unused -> true ;
93+ case TermQueryBuilder unused -> true ;
94+ case ExistsQueryBuilder unused -> true ;
95+ case BoolQueryBuilder bq -> bq .filter ().size () + bq .must ().size () + bq .should ().size () + bq .mustNot ().size () <= 1 ;
96+ default -> false ;
9397 };
9498 }
9599
96- private List <EsQueryExec .QueryBuilderAndTags > foo (List <EsQueryExec .QueryBuilderAndTags > queryBuilderAndTags ) {
97- return queryBuilderAndTags .stream ()
98- .map (PushCountQueryAndTagsToSource ::foo )
99- .filter (e -> e != null )
100- .flatMap (e -> normalizeRange (e ).stream ())
101- .toList ();
100+ private static List <EsQueryExec .QueryBuilderAndTags > tryMerge (List <EsQueryExec .QueryBuilderAndTags > queryBuilderAndTags ) {
101+ return queryBuilderAndTags .stream ().flatMap (e -> tryMergeBoolQuery (e ).stream ()).flatMap (e -> trySimplifyRange (e ).stream ()).toList ();
102102 }
103103
104- // FIXME(gal, NOCOMMIT) document
105- private static Optional <EsQueryExec .QueryBuilderAndTags > normalizeRange (EsQueryExec .QueryBuilderAndTags qbt ) {
106- return switch (qbt .query ()) {
107- case RangeQueryBuilder rqb when rqb .from () != null && rqb .to () != null -> {
108- @ SuppressWarnings ("unchecked" )
109- Comparable <Object > from = (Comparable <Object >) rqb .from ();
110- int comparison = from .compareTo (rqb .to ());
111- if (comparison > 0 ) {
112- yield Optional .empty ();
113- } else if (comparison == 0 ) {
114- yield rqb .includeLower () && rqb .includeUpper ()
115- ? Optional .of (new EsQueryExec .QueryBuilderAndTags (new TermQueryBuilder (rqb .fieldName (), rqb .from ()), qbt .tags ()))
116- : Optional .empty ();
117- } else {
118- yield Optional .of (qbt );
119- }
104+ /**
105+ * If the query specifies an impossible range (e.g., from > to), returns {@link Optional#empty()}; if from == to and both ends are
106+ * inclusive, returns a {@link TermQueryBuilder}; otherwise returns the original range.
107+ */
108+ private static Optional <EsQueryExec .QueryBuilderAndTags > trySimplifyRange (EsQueryExec .QueryBuilderAndTags qbt ) {
109+ if (qbt .query () instanceof RangeQueryBuilder rqb && rqb .from () != null && rqb .to () != null ) {
110+ int comparison = compare (rqb .from (), rqb .to ());
111+ if (comparison > 0 ) {
112+ // from > to, can remove the query entry.
113+ return Optional .empty ();
114+ } else if (comparison == 0 ) {
115+ // from == to should be kept only if both ends are inclusive, and then we can replace the range with a term query.
116+ return rqb .includeLower () && rqb .includeUpper ()
117+ ? Optional .of (qbt .withQuery (new TermQueryBuilder (rqb .fieldName (), rqb .from ())))
118+ : Optional .empty ();
120119 }
121- default -> Optional .of (qbt );
122- };
120+ return Optional .of (qbt ); // valid range, keep as is.
121+
122+ }
123+ return Optional .of (qbt );
123124 }
124125
125- private static @ Nullable EsQueryExec .QueryBuilderAndTags foo (EsQueryExec .QueryBuilderAndTags qbt ) {
126- return switch (qbt .query ()) {
127- case BoolQueryBuilder bqb when bqb .filter ().size () == 2 -> {
128- System .out .println (bqb );
129- if (qbt .query () instanceof BoolQueryBuilder bqb1
130- && bqb1 .filter ().get (1 ) instanceof BoolQueryBuilder bqb2
131- && bqb2 .mustNot ().size () == 1
132- && bqb2 .mustNot ().get (0 ) instanceof ExistsQueryBuilder ) {
133- // FIXME(gal, NOCOMMIT) Explain why we don't need the must not null part here (since the WHERE already handles this)
134- yield null ;
135- }
136- var range1 = extractRangeQuery (bqb .filter ().get (0 ));
137- var range2 = extractRangeQuery (bqb .filter ().get (1 ));
138- if (range1 == null || range2 == null ) {
139- yield qbt ;
140- }
141- QueryBuilder merge = merge (range1 , range2 );
142- yield merge == null ? qbt : new EsQueryExec .QueryBuilderAndTags (merge , qbt .tags ());
126+ /**
127+ * Attempts to merge the filters of a {@link BoolQueryBuilder}. Returns {@link Optional#empty()} if the query can be dropped. If the
128+ * filters cannot be merged, returns the original query.
129+ */
130+ private static Optional <EsQueryExec .QueryBuilderAndTags > tryMergeBoolQuery (EsQueryExec .QueryBuilderAndTags qbt ) {
131+ if (qbt .query () instanceof BoolQueryBuilder bqb && bqb .filter ().size () == 2 ) {
132+ QueryBuilder filter1 = bqb .filter ().get (0 );
133+ var range1 = tryExtractSingleRangeQuery (filter1 );
134+ QueryBuilder filter2 = bqb .filter ().get (1 );
135+ if (range1 != null
136+ && filter2 instanceof BoolQueryBuilder internalQuery
137+ && internalQuery .mustNot ().size () == 1
138+ && internalQuery .mustNot ().get (0 ) instanceof ExistsQueryBuilder ) {
139+ // Simple "must not exist" cases can be dropped since they never match anything when combined with a range.
140+ return Optional .empty ();
143141 }
144- default -> qbt ;
145- };
142+ var range2 = tryExtractSingleRangeQuery (filter2 );
143+ return Optional .of (mergeRanges (qbt , range1 , range2 ));
144+ }
145+ return Optional .of (qbt );
146+ }
147+
148+ private static EsQueryExec .QueryBuilderAndTags mergeRanges (
149+ EsQueryExec .QueryBuilderAndTags original ,
150+ RangeQueryBuilder range1 ,
151+ RangeQueryBuilder range2
152+ ) {
153+ return range1 == null || range2 == null ? original : merge (range1 , range2 ).map (original ::withQuery ).orElse (original );
146154 }
147155
148- private static @ Nullable QueryBuilder merge (RangeQueryBuilder range1 , RangeQueryBuilder range2 ) {
156+ /** Returns {@link Optional#empty()} if the queries cannot be merged. */
157+ private static Optional <QueryBuilder > merge (RangeQueryBuilder range1 , RangeQueryBuilder range2 ) {
149158 if (range1 .fieldName ().equals (range2 .fieldName ()) == false ) {
150- return null ;
159+ return Optional . empty () ;
151160 }
152- if (Objects .equals (normalizeTimezone (range1 .timeZone ()), normalizeTimezone (range2 .timeZone ())) == false ) {
153- throw new IllegalArgumentException (
154- "Cannot merge range queries with different time zones: " + range1 .timeZone () + " and " + range2 .timeZone ()
155- );
161+ if (Objects .equals (nonDefaultTimezone (range1 .timeZone ()), nonDefaultTimezone (range2 .timeZone ())) == false ) {
162+ return Optional .empty ();
156163 }
157164
158165 RangeQueryBuilder merged = new RangeQueryBuilder (range1 .fieldName ());
159- setTighterBound (merged , range1 .from (), range2 .from (), range1 .includeLower (), range2 .includeLower (), BoundType .LOWER );
160- setTighterBound (merged , range1 .to (), range2 .to (), range1 .includeUpper (), range2 .includeUpper (), BoundType .UPPER );
166+ setTighterBound (merged , range1 .from (), range2 .from (), range1 .includeLower (), range2 .includeLower (), BoundType .FROM );
167+ setTighterBound (merged , range1 .to (), range2 .to (), range1 .includeUpper (), range2 .includeUpper (), BoundType .TO );
161168
162169 String timeZone = range1 .timeZone ();
163170 if (timeZone != null ) {
164171 merged .timeZone (timeZone );
165172 }
166173
167174 if (range1 .format () != null && range2 .format () != null && range1 .format ().equals (range2 .format ()) == false ) {
168- throw new IllegalArgumentException (
169- "Cannot merge range queries with different formats: " + range1 .format () + " and " + range2 .format ()
170- );
175+ return Optional .empty ();
171176 }
172177 var format = range1 .format () != null ? range1 .format () : range2 .format ();
173178 if (format != null ) {
174179 merged .format (format );
175180 }
176181
177182 merged .boost (Math .max (range1 .boost (), range2 .boost ()));
178- return merged ;
183+ return Optional . of ( merged ) ;
179184 }
180185
181- // FIXME(gal, NOCOMMIT) hack
182- private static Object normalizeTimezone (String s ) {
186+ private static String nonDefaultTimezone (String s ) { // maps both null and "Z" to null so the two compare as equal zones
183187 return s == null || "Z" .equals (s ) ? null : s ; // value equality: a non-interned "Z" must also map to null
184188 }
185189
186- private static @ Nullable RangeQueryBuilder extractRangeQuery (QueryBuilder queryBuilder ) {
187- return switch (queryBuilder ) {
190+ /** Returns {@code null} if no single range query could be extracted. */
191+ private static @ Nullable RangeQueryBuilder tryExtractSingleRangeQuery (QueryBuilder qb ) {
192+ return switch (qb ) {
188193 case RangeQueryBuilder rqb -> rqb ;
189- case SingleValueQuery .Builder single -> extractRangeQuery (single .next ());
190- case BoolQueryBuilder bqb when bqb .filter ().size () == 1 -> extractRangeQuery (bqb .filter ().getFirst ());
194+ case SingleValueQuery .Builder single -> tryExtractSingleRangeQuery (single .next ());
195+ case BoolQueryBuilder bqb when bqb .filter ().size () == 1 -> tryExtractSingleRangeQuery (bqb .filter ().getFirst ());
191196 default -> null ;
192197 };
193198 }
194199
195200 enum BoundType {
196- LOWER ,
197- UPPER
201+ FROM ,
202+ TO
198203 }
199204
205+ // Given two bounds, sets the tighter one on the range.
200206 private static void setTighterBound (
201207 RangeQueryBuilder range ,
202208 Object bound1 ,
@@ -216,10 +222,10 @@ private static void setTighterBound(
216222 }
217223
218224 @ SuppressWarnings ("unchecked" )
219- int compare = (( Comparable < Object >) bound1 ). compareTo ( bound2 );
225+ int compare = compare ( bound1 , bound2 );
220226 boolean useFirst = switch (boundType ) {
221- case LOWER -> compare > 0 ;
222- case UPPER -> compare < 0 ;
227+ case FROM -> compare > 0 ;
228+ case TO -> compare < 0 ;
223229 };
224230 Object value = useFirst ? bound1 : bound2 ;
225231 boolean include = useFirst ? include1 : include2 ;
@@ -230,10 +236,15 @@ private static void setTighterBound(
230236 setRange (range , value , include , boundType );
231237 }
232238
239+ @ SuppressWarnings ("unchecked" )
240+ private static int compare (Object o1 , Object o2 ) {
241+ return ((Comparable <Object >) o1 ).compareTo (o2 );
242+ }
243+
233244 private static void setRange (RangeQueryBuilder range , Object val , boolean include , BoundType boundType ) {
234245 switch (boundType ) {
235- case LOWER -> range .from (val , include );
236- case UPPER -> range .to (val , include );
246+ case FROM -> range .from (val , include );
247+ case TO -> range .to (val , include );
237248 }
238249 }
239250
0 commit comments