Skip to content

Commit 80a8102

Browse files
authored
ESQL: fix COUNT filter pushdown (#117503) (#117654)
* ESQL: fix COUNT filter pushdown (#117503) If a `COUNT` agg has a filter applied, this filter must also be pushed down to source. This currently does not happen, but the issue is masked by two factors: * a logical optimisation, `ExtractAggregateCommonFilter`, that extracts the filter out of the STATS entirely (and then pushes it to source from a `WHERE`); * the physical plan optimisation implementing the push down, `PushStatsToSource`, currently only applies if there's just one agg function to push down. However, this fix needs to be applied since: * the defect is still present in versions prior to the introduction of `ExtractAggregateCommonFilter`; * the defect might resurface when the restriction in `PushStatsToSource` is lifted. Fixes #115522. (cherry picked from commit 560e0c5) * 8.17 adaptation
1 parent 7ed32c2 commit 80a8102

File tree

5 files changed

+140
-4
lines changed

5 files changed

+140
-4
lines changed

docs/changelog/117503.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 117503
2+
summary: Fix COUNT filter pushdown
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 115522

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2678,6 +2678,57 @@ c2:l |c2_f:l |m2:i |m2_f:i |c:l
26782678
1 |1 |5 |5 |21
26792679
;
26802680

2681+
simpleCountOnFieldWithFilteringAndNoGrouping
2682+
required_capability: per_agg_filtering
2683+
from employees
2684+
| stats c1 = count(emp_no) where emp_no < 10042
2685+
;
2686+
2687+
c1:long
2688+
41
2689+
;
2690+
2691+
simpleCountOnFieldWithFilteringOnDifferentFieldAndNoGrouping
2692+
required_capability: per_agg_filtering
2693+
from employees
2694+
| stats c1 = count(hire_date) where emp_no < 10042
2695+
;
2696+
2697+
c1:long
2698+
41
2699+
;
2700+
2701+
simpleCountOnStarWithFilteringAndNoGrouping
2702+
required_capability: per_agg_filtering
2703+
from employees
2704+
| stats c1 = count(*) where emp_no < 10042
2705+
;
2706+
2707+
c1:long
2708+
41
2709+
;
2710+
2711+
simpleCountWithFilteringAndNoGroupingOnFieldWithNulls
2712+
required_capability: per_agg_filtering
2713+
from employees
2714+
| stats c1 = count(birth_date) where emp_no <= 10050
2715+
;
2716+
2717+
c1:long
2718+
40
2719+
;
2720+
2721+
2722+
simpleCountWithFilteringAndNoGroupingOnFieldWithMultivalues
2723+
required_capability: per_agg_filtering
2724+
from employees
2725+
| stats c1 = count(job_positions) where emp_no <= 10003
2726+
;
2727+
2728+
c1:long
2729+
3
2730+
;
2731+
26812732
commonFilterExtractionWithAliasing
26822733
required_capability: per_agg_filtering
26832734
from employees

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.core.expression.Expression;
1717
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1818
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
19+
import org.elasticsearch.xpack.esql.core.util.Queries;
1920
import org.elasticsearch.xpack.esql.core.util.StringUtils;
2021
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2122
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -25,12 +26,15 @@
2526
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
2627
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2728
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
29+
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
2830

2931
import java.util.ArrayList;
3032
import java.util.List;
3133

34+
import static java.util.Arrays.asList;
3235
import static java.util.Collections.emptyList;
3336
import static java.util.Collections.singletonList;
37+
import static org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource.canPushToSource;
3438
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType.COUNT;
3539

3640
/**
@@ -98,6 +102,13 @@ private Tuple<List<Attribute>, List<EsStatsQueryExec.Stat>> pushableStats(
98102
}
99103
}
100104
if (fieldName != null) {
105+
if (count.hasFilter()) {
106+
if (canPushToSource(count.filter()) == false) {
107+
return null; // can't push down
108+
}
109+
var countFilter = PlannerUtils.TRANSLATOR_HANDLER.asQuery(count.filter());
110+
query = Queries.combine(Queries.Clause.MUST, asList(countFilter.asBuilder(), query));
111+
}
101112
return new EsStatsQueryExec.Stat(fieldName, COUNT, query);
102113
}
103114
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
4141
import org.elasticsearch.xpack.esql.index.EsIndex;
4242
import org.elasticsearch.xpack.esql.index.IndexResolution;
43+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter;
4344
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
45+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
4446
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4547
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
4648
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
@@ -57,16 +59,19 @@
5759
import org.elasticsearch.xpack.esql.planner.FilterTests;
5860
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5961
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
62+
import org.elasticsearch.xpack.esql.rule.Rule;
6063
import org.elasticsearch.xpack.esql.session.Configuration;
6164
import org.elasticsearch.xpack.esql.stats.Metrics;
6265
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
6366
import org.elasticsearch.xpack.esql.stats.SearchStats;
6467
import org.junit.Before;
6568

6669
import java.io.IOException;
70+
import java.util.ArrayList;
6771
import java.util.List;
6872
import java.util.Locale;
6973
import java.util.Map;
74+
import java.util.function.Function;
7075

7176
import static java.util.Arrays.asList;
7277
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
@@ -373,6 +378,67 @@ public void testMultiCountAllWithFilter() {
373378
assertThat(plan.anyMatch(EsQueryExec.class::isInstance), is(true));
374379
}
375380

381+
@SuppressWarnings("unchecked")
382+
public void testSingleCountWithStatsFilter() {
383+
// an optimizer that filters out the ExtractAggregateCommonFilter rule
384+
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config)) {
385+
@Override
386+
protected List<Batch<LogicalPlan>> batches() {
387+
var oldBatches = super.batches();
388+
List<Batch<LogicalPlan>> newBatches = new ArrayList<>(oldBatches.size());
389+
for (var batch : oldBatches) {
390+
List<Rule<?, LogicalPlan>> rules = new ArrayList<>(List.of(batch.rules()));
391+
rules.removeIf(r -> r instanceof ExtractAggregateCommonFilter);
392+
newBatches.add(batch.with(rules.toArray(Rule[]::new)));
393+
}
394+
return newBatches;
395+
}
396+
};
397+
var analyzer = makeAnalyzer("mapping-default.json", new EnrichResolution());
398+
var plannerOptimizer = new TestPlannerOptimizer(config, analyzer, logicalOptimizer);
399+
var plan = plannerOptimizer.plan("""
400+
from test
401+
| stats c = count(hire_date) where emp_no < 10042
402+
""", IS_SV_STATS);
403+
404+
var limit = as(plan, LimitExec.class);
405+
var agg = as(limit.child(), AggregateExec.class);
406+
assertThat(agg.getMode(), is(FINAL));
407+
var exchange = as(agg.child(), ExchangeExec.class);
408+
var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class);
409+
410+
Function<String, String> compact = s -> s.replaceAll("\\s+", "");
411+
assertThat(compact.apply(esStatsQuery.query().toString()), is(compact.apply("""
412+
{
413+
"bool": {
414+
"must": [
415+
{
416+
"exists": {
417+
"field": "hire_date",
418+
"boost": 1.0
419+
}
420+
},
421+
{
422+
"esql_single_value": {
423+
"field": "emp_no",
424+
"next": {
425+
"range": {
426+
"emp_no": {
427+
"lt": 10042,
428+
"boost": 1.0
429+
}
430+
}
431+
},
432+
"source": "emp_no < 10042@2:36"
433+
}
434+
}
435+
],
436+
"boost": 1.0
437+
}
438+
}
439+
""")));
440+
}
441+
376442
/**
377443
* Expecting
378444
* LimitExec[1000[INTEGER]]

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1111
import org.elasticsearch.xpack.esql.analysis.Analyzer;
12-
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
1312
import org.elasticsearch.xpack.esql.parser.EsqlParser;
1413
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
1514
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -23,19 +22,22 @@ public class TestPlannerOptimizer {
2322
private final Analyzer analyzer;
2423
private final LogicalPlanOptimizer logicalOptimizer;
2524
private final PhysicalPlanOptimizer physicalPlanOptimizer;
26-
private final EsqlFunctionRegistry functionRegistry;
2725
private final Mapper mapper;
2826
private final Configuration config;
2927

3028
public TestPlannerOptimizer(Configuration config, Analyzer analyzer) {
29+
this(config, analyzer, new LogicalPlanOptimizer(new LogicalOptimizerContext(config)));
30+
}
31+
32+
public TestPlannerOptimizer(Configuration config, Analyzer analyzer, LogicalPlanOptimizer logicalOptimizer) {
3133
this.analyzer = analyzer;
3234
this.config = config;
35+
this.logicalOptimizer = logicalOptimizer;
3336

3437
parser = new EsqlParser();
35-
logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config));
3638
physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
37-
functionRegistry = new EsqlFunctionRegistry();
3839
mapper = new Mapper();
40+
3941
}
4042

4143
public PhysicalPlan plan(String query) {

0 commit comments

Comments
 (0)