Skip to content

Commit 560e0c5

Browse files
authored
ESQL: fix COUNT filter pushdown (#117503)
If `COUNT` agg has a filter applied, this must also be push down to source. This currently does not happen, but this issue is masked currently by two factors: * a logical optimisation, `ExtractAggregateCommonFilter` that extracts the filter out of the STATS entirely (and pushes it to source then from a `WHERE`); * the phisical plan optimisation implementing the push down, `PushStatsToSource`, currently only applies if there's just one agg function to push down. However, this fix needs to be applied since: * it's still present in versions prior to `ExtractAggregateCommonFilter` introduction; * the defect might resurface when the restriction in `PushStatsToSource` is lifted. Fixes #115522.
1 parent 2ed318f commit 560e0c5

File tree

5 files changed

+120
-4
lines changed

5 files changed

+120
-4
lines changed

docs/changelog/117503.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 117503
2+
summary: Fix COUNT filter pushdown
3+
area: ES|QL
4+
type: bug
5+
issues:
6+
- 115522

x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats.csv-spec

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2688,6 +2688,16 @@ c1:long
26882688
41
26892689
;
26902690

2691+
simpleCountOnFieldWithFilteringOnDifferentFieldAndNoGrouping
2692+
required_capability: per_agg_filtering
2693+
from employees
2694+
| stats c1 = count(hire_date) where emp_no < 10042
2695+
;
2696+
2697+
c1:long
2698+
41
2699+
;
2700+
26912701
simpleCountOnStarWithFilteringAndNoGrouping
26922702
required_capability: per_agg_filtering
26932703
from employees
@@ -2698,6 +2708,27 @@ c1:long
26982708
41
26992709
;
27002710

2711+
simpleCountWithFilteringAndNoGroupingOnFieldWithNulls
2712+
required_capability: per_agg_filtering
2713+
from employees
2714+
| stats c1 = count(birth_date) where emp_no <= 10050
2715+
;
2716+
2717+
c1:long
2718+
40
2719+
;
2720+
2721+
2722+
simpleCountWithFilteringAndNoGroupingOnFieldWithMultivalues
2723+
required_capability: per_agg_filtering
2724+
from employees
2725+
| stats c1 = count(job_positions) where emp_no <= 10003
2726+
;
2727+
2728+
c1:long
2729+
3
2730+
;
2731+
27012732
commonFilterExtractionWithAliasing
27022733
required_capability: per_agg_filtering
27032734
from employees

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushStatsToSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.core.expression.Expression;
1717
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
1818
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
19+
import org.elasticsearch.xpack.esql.core.util.Queries;
1920
import org.elasticsearch.xpack.esql.core.util.StringUtils;
2021
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
2122
import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
@@ -25,12 +26,15 @@
2526
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
2627
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2728
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
29+
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
2830

2931
import java.util.ArrayList;
3032
import java.util.List;
3133

34+
import static java.util.Arrays.asList;
3235
import static java.util.Collections.emptyList;
3336
import static java.util.Collections.singletonList;
37+
import static org.elasticsearch.xpack.esql.optimizer.rules.physical.local.PushFiltersToSource.canPushToSource;
3438
import static org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec.StatsType.COUNT;
3539

3640
/**
@@ -98,6 +102,13 @@ private Tuple<List<Attribute>, List<EsStatsQueryExec.Stat>> pushableStats(
98102
}
99103
}
100104
if (fieldName != null) {
105+
if (count.hasFilter()) {
106+
if (canPushToSource(count.filter()) == false) {
107+
return null; // can't push down
108+
}
109+
var countFilter = PlannerUtils.TRANSLATOR_HANDLER.asQuery(count.filter());
110+
query = Queries.combine(Queries.Clause.MUST, asList(countFilter.asBuilder(), query));
111+
}
101112
return new EsStatsQueryExec.Stat(fieldName, COUNT, query);
102113
}
103114
}

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@
4242
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
4343
import org.elasticsearch.xpack.esql.index.EsIndex;
4444
import org.elasticsearch.xpack.esql.index.IndexResolution;
45+
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ExtractAggregateCommonFilter;
4546
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
47+
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
4648
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
4749
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
4850
import org.elasticsearch.xpack.esql.plan.physical.EsStatsQueryExec;
@@ -59,6 +61,7 @@
5961
import org.elasticsearch.xpack.esql.planner.FilterTests;
6062
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
6163
import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
64+
import org.elasticsearch.xpack.esql.rule.Rule;
6265
import org.elasticsearch.xpack.esql.session.Configuration;
6366
import org.elasticsearch.xpack.esql.stats.Metrics;
6467
import org.elasticsearch.xpack.esql.stats.SearchContextStats;
@@ -67,9 +70,11 @@
6770
import org.junit.Before;
6871

6972
import java.io.IOException;
73+
import java.util.ArrayList;
7074
import java.util.List;
7175
import java.util.Locale;
7276
import java.util.Map;
77+
import java.util.function.Function;
7378

7479
import static java.util.Arrays.asList;
7580
import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
@@ -380,6 +385,67 @@ public void testMultiCountAllWithFilter() {
380385
assertThat(plan.anyMatch(EsQueryExec.class::isInstance), is(true));
381386
}
382387

388+
@SuppressWarnings("unchecked")
389+
public void testSingleCountWithStatsFilter() {
390+
// an optimizer that filters out the ExtractAggregateCommonFilter rule
391+
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config)) {
392+
@Override
393+
protected List<Batch<LogicalPlan>> batches() {
394+
var oldBatches = super.batches();
395+
List<Batch<LogicalPlan>> newBatches = new ArrayList<>(oldBatches.size());
396+
for (var batch : oldBatches) {
397+
List<Rule<?, LogicalPlan>> rules = new ArrayList<>(List.of(batch.rules()));
398+
rules.removeIf(r -> r instanceof ExtractAggregateCommonFilter);
399+
newBatches.add(batch.with(rules.toArray(Rule[]::new)));
400+
}
401+
return newBatches;
402+
}
403+
};
404+
var analyzer = makeAnalyzer("mapping-default.json");
405+
var plannerOptimizer = new TestPlannerOptimizer(config, analyzer, logicalOptimizer);
406+
var plan = plannerOptimizer.plan("""
407+
from test
408+
| stats c = count(hire_date) where emp_no < 10042
409+
""", IS_SV_STATS);
410+
411+
var limit = as(plan, LimitExec.class);
412+
var agg = as(limit.child(), AggregateExec.class);
413+
assertThat(agg.getMode(), is(FINAL));
414+
var exchange = as(agg.child(), ExchangeExec.class);
415+
var esStatsQuery = as(exchange.child(), EsStatsQueryExec.class);
416+
417+
Function<String, String> compact = s -> s.replaceAll("\\s+", "");
418+
assertThat(compact.apply(esStatsQuery.query().toString()), is(compact.apply("""
419+
{
420+
"bool": {
421+
"must": [
422+
{
423+
"exists": {
424+
"field": "hire_date",
425+
"boost": 1.0
426+
}
427+
},
428+
{
429+
"esql_single_value": {
430+
"field": "emp_no",
431+
"next": {
432+
"range": {
433+
"emp_no": {
434+
"lt": 10042,
435+
"boost": 1.0
436+
}
437+
}
438+
},
439+
"source": "emp_no < 10042@2:36"
440+
}
441+
}
442+
],
443+
"boost": 1.0
444+
}
445+
}
446+
""")));
447+
}
448+
383449
/**
384450
* Expecting
385451
* LimitExec[1000[INTEGER]]

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/TestPlannerOptimizer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1111
import org.elasticsearch.xpack.esql.analysis.Analyzer;
12-
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
1312
import org.elasticsearch.xpack.esql.parser.EsqlParser;
1413
import org.elasticsearch.xpack.esql.plan.physical.EstimatesRowSize;
1514
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
@@ -23,19 +22,22 @@ public class TestPlannerOptimizer {
2322
private final Analyzer analyzer;
2423
private final LogicalPlanOptimizer logicalOptimizer;
2524
private final PhysicalPlanOptimizer physicalPlanOptimizer;
26-
private final EsqlFunctionRegistry functionRegistry;
2725
private final Mapper mapper;
2826
private final Configuration config;
2927

3028
public TestPlannerOptimizer(Configuration config, Analyzer analyzer) {
29+
this(config, analyzer, new LogicalPlanOptimizer(new LogicalOptimizerContext(config)));
30+
}
31+
32+
public TestPlannerOptimizer(Configuration config, Analyzer analyzer, LogicalPlanOptimizer logicalOptimizer) {
3133
this.analyzer = analyzer;
3234
this.config = config;
35+
this.logicalOptimizer = logicalOptimizer;
3336

3437
parser = new EsqlParser();
35-
logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(config));
3638
physicalPlanOptimizer = new PhysicalPlanOptimizer(new PhysicalOptimizerContext(config));
37-
functionRegistry = new EsqlFunctionRegistry();
3839
mapper = new Mapper();
40+
3941
}
4042

4143
public PhysicalPlan plan(String query) {

0 commit comments

Comments
 (0)