Skip to content

Commit 8f721a6

Browse files
authored
keep fetch when merge FilterExec in FilterPushdown (apache#21070)
## Which issue does this PR close? - Closes apache#21069 ## Rationale for this change - See apache#21069. ## What changes are included in this PR? When `FilterPushdown` merges/eliminates a `FilterExec`, it now correctly propagates the `fetch` limit that was set on that `FilterExec`. Previously, if a `FilterExec` had a fetch (e.g., LIMIT 10) and all of its predicates were fully pushed down into an underlying `FilterExec`, the outer `FilterExec` node was dropped — and its fetch was silently lost, potentially returning more rows than requested. The fix: - If the `FilterExec` being eliminated has a fetch, propagate it to the child node via `with_fetch()` - If both the outer and inner nodes have a fetch, use min(outer, inner) to preserve the tighter constraint ## Are these changes tested? Yes, test cases were added. We attempted to reproduce the issue via SQL but have not yet been able to. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 78d5ac6 commit 8f721a6

File tree

2 files changed

+361
-1
lines changed

2 files changed

+361
-1
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown.rs

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,240 @@ fn test_filter_with_projection() {
532532
);
533533
}
534534

535+
#[test]
536+
fn test_filter_collapse_outer_fetch_preserved() {
537+
// When the outer filter has fetch and inner does not, the merged filter should preserve fetch
538+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
539+
let predicate1 = col_lit_predicate("a", "foo", &schema());
540+
let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap());
541+
let predicate2 = col_lit_predicate("b", "bar", &schema());
542+
let plan = Arc::new(
543+
FilterExecBuilder::new(predicate2, filter1)
544+
.with_fetch(Some(10))
545+
.build()
546+
.unwrap(),
547+
);
548+
549+
insta::assert_snapshot!(
550+
OptimizationTest::new(plan, FilterPushdown::new(), true),
551+
@r"
552+
OptimizationTest:
553+
input:
554+
- FilterExec: b@1 = bar, fetch=10
555+
- FilterExec: a@0 = foo
556+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
557+
output:
558+
Ok:
559+
- FilterExec: b@1 = bar AND a@0 = foo, fetch=10
560+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
561+
"
562+
);
563+
}
564+
565+
#[test]
566+
fn test_filter_collapse_inner_fetch_preserved() {
567+
// When the inner filter has fetch and outer does not, the merged filter should preserve fetch
568+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
569+
let predicate1 = col_lit_predicate("a", "foo", &schema());
570+
let filter1 = Arc::new(
571+
FilterExecBuilder::new(predicate1, scan)
572+
.with_fetch(Some(5))
573+
.build()
574+
.unwrap(),
575+
);
576+
let predicate2 = col_lit_predicate("b", "bar", &schema());
577+
let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
578+
579+
insta::assert_snapshot!(
580+
OptimizationTest::new(plan, FilterPushdown::new(), true),
581+
@r"
582+
OptimizationTest:
583+
input:
584+
- FilterExec: b@1 = bar
585+
- FilterExec: a@0 = foo, fetch=5
586+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
587+
output:
588+
Ok:
589+
- FilterExec: b@1 = bar AND a@0 = foo, fetch=5
590+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
591+
"
592+
);
593+
}
594+
595+
#[test]
596+
fn test_filter_collapse_both_fetch_uses_minimum() {
597+
// When both filters have fetch, the merged filter should use the smaller (tighter) fetch.
598+
// Inner fetch=5 is tighter than outer fetch=10, so the result should be fetch=5.
599+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
600+
let predicate1 = col_lit_predicate("a", "foo", &schema());
601+
let filter1 = Arc::new(
602+
FilterExecBuilder::new(predicate1, scan)
603+
.with_fetch(Some(5))
604+
.build()
605+
.unwrap(),
606+
);
607+
let predicate2 = col_lit_predicate("b", "bar", &schema());
608+
let plan = Arc::new(
609+
FilterExecBuilder::new(predicate2, filter1)
610+
.with_fetch(Some(10))
611+
.build()
612+
.unwrap(),
613+
);
614+
615+
insta::assert_snapshot!(
616+
OptimizationTest::new(plan, FilterPushdown::new(), true),
617+
@r"
618+
OptimizationTest:
619+
input:
620+
- FilterExec: b@1 = bar, fetch=10
621+
- FilterExec: a@0 = foo, fetch=5
622+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
623+
output:
624+
Ok:
625+
- FilterExec: b@1 = bar AND a@0 = foo, fetch=5
626+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
627+
"
628+
);
629+
}
630+
631+
#[test]
632+
fn test_filter_with_fetch_fully_pushed_to_scan() {
633+
// When a FilterExec has a fetch limit and all predicates are pushed down
634+
// to a supportive DataSourceExec, the FilterExec is removed and the fetch
635+
// must be propagated to the DataSourceExec.
636+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
637+
let predicate = col_lit_predicate("a", "foo", &schema());
638+
let plan = Arc::new(
639+
FilterExecBuilder::new(predicate, scan)
640+
.with_fetch(Some(10))
641+
.build()
642+
.unwrap(),
643+
);
644+
645+
insta::assert_snapshot!(
646+
OptimizationTest::new(plan, FilterPushdown::new(), true),
647+
@r"
648+
OptimizationTest:
649+
input:
650+
- FilterExec: a@0 = foo, fetch=10
651+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
652+
output:
653+
Ok:
654+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], limit=10, file_type=test, pushdown_supported=true, predicate=a@0 = foo
655+
"
656+
);
657+
}
658+
659+
#[test]
660+
fn test_filter_with_fetch_and_projection_fully_pushed_to_scan() {
661+
// When a FilterExec has both fetch and projection, and all predicates are
662+
// pushed down, the filter is replaced by a ProjectionExec and the fetch
663+
// must still be propagated to the DataSourceExec.
664+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
665+
let projection = vec![1, 0];
666+
let predicate = col_lit_predicate("a", "foo", &schema());
667+
let plan = Arc::new(
668+
FilterExecBuilder::new(predicate, scan)
669+
.with_fetch(Some(5))
670+
.apply_projection(Some(projection))
671+
.unwrap()
672+
.build()
673+
.unwrap(),
674+
);
675+
676+
insta::assert_snapshot!(
677+
OptimizationTest::new(plan, FilterPushdown::new(), true),
678+
@r"
679+
OptimizationTest:
680+
input:
681+
- FilterExec: a@0 = foo, projection=[b@1, a@0], fetch=5
682+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
683+
output:
684+
Ok:
685+
- ProjectionExec: expr=[b@1 as b, a@0 as a]
686+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], limit=5, file_type=test, pushdown_supported=true, predicate=a@0 = foo
687+
"
688+
);
689+
}
690+
691+
#[test]
692+
fn test_filter_with_fetch_partially_pushed_to_scan() {
693+
// When a FilterExec has fetch and only some predicates are pushed down,
694+
// the FilterExec remains with the unpushed predicate and keeps its fetch.
695+
let scan = TestScanBuilder::new(schema()).with_support(true).build();
696+
let pushed_predicate = col_lit_predicate("a", "foo", &schema());
697+
let volatile_predicate = {
698+
let cfg = Arc::new(ConfigOptions::default());
699+
Arc::new(BinaryExpr::new(
700+
Arc::new(Column::new_with_schema("a", &schema()).unwrap()),
701+
Operator::Eq,
702+
Arc::new(
703+
ScalarFunctionExpr::try_new(
704+
Arc::new(ScalarUDF::from(RandomFunc::new())),
705+
vec![],
706+
&schema(),
707+
cfg,
708+
)
709+
.unwrap(),
710+
),
711+
)) as Arc<dyn PhysicalExpr>
712+
};
713+
// Combine: a = 'foo' AND a = random()
714+
let combined = Arc::new(BinaryExpr::new(
715+
pushed_predicate,
716+
Operator::And,
717+
volatile_predicate,
718+
)) as Arc<dyn PhysicalExpr>;
719+
let plan = Arc::new(
720+
FilterExecBuilder::new(combined, scan)
721+
.with_fetch(Some(7))
722+
.build()
723+
.unwrap(),
724+
);
725+
726+
insta::assert_snapshot!(
727+
OptimizationTest::new(plan, FilterPushdown::new(), true),
728+
@r"
729+
OptimizationTest:
730+
input:
731+
- FilterExec: a@0 = foo AND a@0 = random(), fetch=7
732+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
733+
output:
734+
Ok:
735+
- FilterExec: a@0 = random(), fetch=7
736+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
737+
"
738+
);
739+
}
740+
741+
#[test]
742+
fn test_filter_with_fetch_not_pushed_to_unsupportive_scan() {
743+
// When the DataSourceExec does not support pushdown, the FilterExec
744+
// remains unchanged with its fetch intact.
745+
let scan = TestScanBuilder::new(schema()).with_support(false).build();
746+
let predicate = col_lit_predicate("a", "foo", &schema());
747+
let plan = Arc::new(
748+
FilterExecBuilder::new(predicate, scan)
749+
.with_fetch(Some(3))
750+
.build()
751+
.unwrap(),
752+
);
753+
754+
insta::assert_snapshot!(
755+
OptimizationTest::new(plan, FilterPushdown::new(), true),
756+
@r"
757+
OptimizationTest:
758+
input:
759+
- FilterExec: a@0 = foo, fetch=3
760+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
761+
output:
762+
Ok:
763+
- FilterExec: a@0 = foo, fetch=3
764+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
765+
"
766+
);
767+
}
768+
535769
#[test]
536770
fn test_push_down_through_transparent_nodes() {
537771
// expect the predicate to be pushed down into the DataSource
@@ -2087,6 +2321,116 @@ fn test_filter_pushdown_through_union_does_not_support() {
20872321
);
20882322
}
20892323

2324+
#[test]
2325+
fn test_filter_with_fetch_fully_pushed_through_union() {
2326+
// When a FilterExec with fetch wraps a UnionExec and all predicates are
2327+
// pushed down, UnionExec does not support with_fetch, so a LocalLimitExec
2328+
// should be inserted to preserve the fetch limit.
2329+
let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
2330+
let scan2 = TestScanBuilder::new(schema()).with_support(true).build();
2331+
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
2332+
let predicate = col_lit_predicate("a", "foo", &schema());
2333+
let plan = Arc::new(
2334+
FilterExecBuilder::new(predicate, union)
2335+
.with_fetch(Some(10))
2336+
.build()
2337+
.unwrap(),
2338+
);
2339+
2340+
insta::assert_snapshot!(
2341+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2342+
@"
2343+
OptimizationTest:
2344+
input:
2345+
- FilterExec: a@0 = foo, fetch=10
2346+
- UnionExec
2347+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2348+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2349+
output:
2350+
Ok:
2351+
- LocalLimitExec: fetch=10
2352+
- UnionExec
2353+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2354+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2355+
"
2356+
);
2357+
}
2358+
2359+
#[test]
2360+
fn test_filter_with_fetch_and_projection_fully_pushed_through_union() {
2361+
// When a FilterExec with both fetch and projection wraps a UnionExec and
2362+
// all predicates are pushed down, we should get a ProjectionExec on top of
2363+
// a LocalLimitExec wrapping the UnionExec.
2364+
let scan1 = TestScanBuilder::new(schema()).with_support(true).build();
2365+
let scan2 = TestScanBuilder::new(schema()).with_support(true).build();
2366+
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
2367+
let projection = vec![1, 0];
2368+
let predicate = col_lit_predicate("a", "foo", &schema());
2369+
let plan = Arc::new(
2370+
FilterExecBuilder::new(predicate, union)
2371+
.with_fetch(Some(5))
2372+
.apply_projection(Some(projection))
2373+
.unwrap()
2374+
.build()
2375+
.unwrap(),
2376+
);
2377+
2378+
insta::assert_snapshot!(
2379+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2380+
@"
2381+
OptimizationTest:
2382+
input:
2383+
- FilterExec: a@0 = foo, projection=[b@1, a@0], fetch=5
2384+
- UnionExec
2385+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2386+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2387+
output:
2388+
Ok:
2389+
- ProjectionExec: expr=[b@1 as b, a@0 as a]
2390+
- LocalLimitExec: fetch=5
2391+
- UnionExec
2392+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2393+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2394+
"
2395+
);
2396+
}
2397+
2398+
#[test]
2399+
fn test_filter_with_fetch_not_fully_pushed_through_union() {
2400+
// When a FilterExec with fetch wraps a UnionExec but children don't support
2401+
// pushdown, the FilterExec remains with its fetch — no LocalLimitExec needed.
2402+
let scan1 = TestScanBuilder::new(schema()).with_support(false).build();
2403+
let scan2 = TestScanBuilder::new(schema()).with_support(false).build();
2404+
let union = UnionExec::try_new(vec![scan1, scan2]).unwrap();
2405+
let predicate = col_lit_predicate("a", "foo", &schema());
2406+
let plan = Arc::new(
2407+
FilterExecBuilder::new(predicate, union)
2408+
.with_fetch(Some(8))
2409+
.build()
2410+
.unwrap(),
2411+
);
2412+
2413+
insta::assert_snapshot!(
2414+
OptimizationTest::new(plan, FilterPushdown::new(), true),
2415+
@"
2416+
OptimizationTest:
2417+
input:
2418+
- FilterExec: a@0 = foo, fetch=8
2419+
- UnionExec
2420+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2421+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2422+
output:
2423+
Ok:
2424+
- LocalLimitExec: fetch=8
2425+
- UnionExec
2426+
- FilterExec: a@0 = foo
2427+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2428+
- FilterExec: a@0 = foo
2429+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2430+
"
2431+
);
2432+
}
2433+
20902434
/// Schema:
20912435
/// a: String
20922436
/// b: String

datafusion/physical-plan/src/filter.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use crate::filter_pushdown::{
3535
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
3636
FilterPushdownPropagation, PushedDown,
3737
};
38+
use crate::limit::LocalLimitExec;
3839
use crate::metrics::{MetricBuilder, MetricType};
3940
use crate::projection::{
4041
EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
@@ -695,7 +696,22 @@ impl ExecutionPlan for FilterExec {
695696
let filter_input = Arc::clone(self.input());
696697
let new_predicate = conjunction(unhandled_filters);
697698
let updated_node = if new_predicate.eq(&lit(true)) {
698-
// FilterExec is no longer needed, but we may need to leave a projection in place
699+
// FilterExec is no longer needed, but we may need to leave a projection in place.
700+
// If this FilterExec had a fetch limit, propagate it to the child.
701+
// When the child also has a fetch, use the minimum of both to preserve
702+
// the tighter constraint.
703+
let filter_input = if let Some(outer_fetch) = self.fetch {
704+
let effective_fetch = match filter_input.fetch() {
705+
Some(inner_fetch) => outer_fetch.min(inner_fetch),
706+
None => outer_fetch,
707+
};
708+
match filter_input.with_fetch(Some(effective_fetch)) {
709+
Some(node) => node,
710+
None => Arc::new(LocalLimitExec::new(filter_input, effective_fetch)),
711+
}
712+
} else {
713+
filter_input
714+
};
699715
match self.projection().as_ref() {
700716
Some(projection_indices) => {
701717
let filter_child_schema = filter_input.schema();

0 commit comments

Comments
 (0)