@@ -532,6 +532,240 @@ fn test_filter_with_projection() {
532532 ) ;
533533}
534534
535+ #[ test]
536+ fn test_filter_collapse_outer_fetch_preserved ( ) {
537+ // When the outer filter has fetch and inner does not, the merged filter should preserve fetch
538+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
539+ let predicate1 = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
540+ let filter1 = Arc :: new ( FilterExec :: try_new ( predicate1, scan) . unwrap ( ) ) ;
541+ let predicate2 = col_lit_predicate ( "b" , "bar" , & schema ( ) ) ;
542+ let plan = Arc :: new (
543+ FilterExecBuilder :: new ( predicate2, filter1)
544+ . with_fetch ( Some ( 10 ) )
545+ . build ( )
546+ . unwrap ( ) ,
547+ ) ;
548+
549+ insta:: assert_snapshot!(
550+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
551+ @r"
552+ OptimizationTest:
553+ input:
554+ - FilterExec: b@1 = bar, fetch=10
555+ - FilterExec: a@0 = foo
556+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
557+ output:
558+ Ok:
559+ - FilterExec: b@1 = bar AND a@0 = foo, fetch=10
560+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
561+ "
562+ ) ;
563+ }
564+
565+ #[ test]
566+ fn test_filter_collapse_inner_fetch_preserved ( ) {
567+ // When the inner filter has fetch and outer does not, the merged filter should preserve fetch
568+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
569+ let predicate1 = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
570+ let filter1 = Arc :: new (
571+ FilterExecBuilder :: new ( predicate1, scan)
572+ . with_fetch ( Some ( 5 ) )
573+ . build ( )
574+ . unwrap ( ) ,
575+ ) ;
576+ let predicate2 = col_lit_predicate ( "b" , "bar" , & schema ( ) ) ;
577+ let plan = Arc :: new ( FilterExec :: try_new ( predicate2, filter1) . unwrap ( ) ) ;
578+
579+ insta:: assert_snapshot!(
580+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
581+ @r"
582+ OptimizationTest:
583+ input:
584+ - FilterExec: b@1 = bar
585+ - FilterExec: a@0 = foo, fetch=5
586+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
587+ output:
588+ Ok:
589+ - FilterExec: b@1 = bar AND a@0 = foo, fetch=5
590+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
591+ "
592+ ) ;
593+ }
594+
595+ #[ test]
596+ fn test_filter_collapse_both_fetch_uses_minimum ( ) {
597+ // When both filters have fetch, the merged filter should use the smaller (tighter) fetch.
598+ // Inner fetch=5 is tighter than outer fetch=10, so the result should be fetch=5.
599+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
600+ let predicate1 = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
601+ let filter1 = Arc :: new (
602+ FilterExecBuilder :: new ( predicate1, scan)
603+ . with_fetch ( Some ( 5 ) )
604+ . build ( )
605+ . unwrap ( ) ,
606+ ) ;
607+ let predicate2 = col_lit_predicate ( "b" , "bar" , & schema ( ) ) ;
608+ let plan = Arc :: new (
609+ FilterExecBuilder :: new ( predicate2, filter1)
610+ . with_fetch ( Some ( 10 ) )
611+ . build ( )
612+ . unwrap ( ) ,
613+ ) ;
614+
615+ insta:: assert_snapshot!(
616+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
617+ @r"
618+ OptimizationTest:
619+ input:
620+ - FilterExec: b@1 = bar, fetch=10
621+ - FilterExec: a@0 = foo, fetch=5
622+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
623+ output:
624+ Ok:
625+ - FilterExec: b@1 = bar AND a@0 = foo, fetch=5
626+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
627+ "
628+ ) ;
629+ }
630+
631+ #[ test]
632+ fn test_filter_with_fetch_fully_pushed_to_scan ( ) {
633+ // When a FilterExec has a fetch limit and all predicates are pushed down
634+ // to a supportive DataSourceExec, the FilterExec is removed and the fetch
635+ // must be propagated to the DataSourceExec.
636+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
637+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
638+ let plan = Arc :: new (
639+ FilterExecBuilder :: new ( predicate, scan)
640+ . with_fetch ( Some ( 10 ) )
641+ . build ( )
642+ . unwrap ( ) ,
643+ ) ;
644+
645+ insta:: assert_snapshot!(
646+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
647+ @r"
648+ OptimizationTest:
649+ input:
650+ - FilterExec: a@0 = foo, fetch=10
651+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
652+ output:
653+ Ok:
654+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], limit=10, file_type=test, pushdown_supported=true, predicate=a@0 = foo
655+ "
656+ ) ;
657+ }
658+
659+ #[ test]
660+ fn test_filter_with_fetch_and_projection_fully_pushed_to_scan ( ) {
661+ // When a FilterExec has both fetch and projection, and all predicates are
662+ // pushed down, the filter is replaced by a ProjectionExec and the fetch
663+ // must still be propagated to the DataSourceExec.
664+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
665+ let projection = vec ! [ 1 , 0 ] ;
666+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
667+ let plan = Arc :: new (
668+ FilterExecBuilder :: new ( predicate, scan)
669+ . with_fetch ( Some ( 5 ) )
670+ . apply_projection ( Some ( projection) )
671+ . unwrap ( )
672+ . build ( )
673+ . unwrap ( ) ,
674+ ) ;
675+
676+ insta:: assert_snapshot!(
677+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
678+ @r"
679+ OptimizationTest:
680+ input:
681+ - FilterExec: a@0 = foo, projection=[b@1, a@0], fetch=5
682+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
683+ output:
684+ Ok:
685+ - ProjectionExec: expr=[b@1 as b, a@0 as a]
686+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], limit=5, file_type=test, pushdown_supported=true, predicate=a@0 = foo
687+ "
688+ ) ;
689+ }
690+
691+ #[ test]
692+ fn test_filter_with_fetch_partially_pushed_to_scan ( ) {
693+ // When a FilterExec has fetch and only some predicates are pushed down,
694+ // the FilterExec remains with the unpushed predicate and keeps its fetch.
695+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
696+ let pushed_predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
697+ let volatile_predicate = {
698+ let cfg = Arc :: new ( ConfigOptions :: default ( ) ) ;
699+ Arc :: new ( BinaryExpr :: new (
700+ Arc :: new ( Column :: new_with_schema ( "a" , & schema ( ) ) . unwrap ( ) ) ,
701+ Operator :: Eq ,
702+ Arc :: new (
703+ ScalarFunctionExpr :: try_new (
704+ Arc :: new ( ScalarUDF :: from ( RandomFunc :: new ( ) ) ) ,
705+ vec ! [ ] ,
706+ & schema ( ) ,
707+ cfg,
708+ )
709+ . unwrap ( ) ,
710+ ) ,
711+ ) ) as Arc < dyn PhysicalExpr >
712+ } ;
713+ // Combine: a = 'foo' AND a = random()
714+ let combined = Arc :: new ( BinaryExpr :: new (
715+ pushed_predicate,
716+ Operator :: And ,
717+ volatile_predicate,
718+ ) ) as Arc < dyn PhysicalExpr > ;
719+ let plan = Arc :: new (
720+ FilterExecBuilder :: new ( combined, scan)
721+ . with_fetch ( Some ( 7 ) )
722+ . build ( )
723+ . unwrap ( ) ,
724+ ) ;
725+
726+ insta:: assert_snapshot!(
727+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
728+ @r"
729+ OptimizationTest:
730+ input:
731+ - FilterExec: a@0 = foo AND a@0 = random(), fetch=7
732+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
733+ output:
734+ Ok:
735+ - FilterExec: a@0 = random(), fetch=7
736+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
737+ "
738+ ) ;
739+ }
740+
741+ #[ test]
742+ fn test_filter_with_fetch_not_pushed_to_unsupportive_scan ( ) {
743+ // When the DataSourceExec does not support pushdown, the FilterExec
744+ // remains unchanged with its fetch intact.
745+ let scan = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
746+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
747+ let plan = Arc :: new (
748+ FilterExecBuilder :: new ( predicate, scan)
749+ . with_fetch ( Some ( 3 ) )
750+ . build ( )
751+ . unwrap ( ) ,
752+ ) ;
753+
754+ insta:: assert_snapshot!(
755+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
756+ @r"
757+ OptimizationTest:
758+ input:
759+ - FilterExec: a@0 = foo, fetch=3
760+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
761+ output:
762+ Ok:
763+ - FilterExec: a@0 = foo, fetch=3
764+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
765+ "
766+ ) ;
767+ }
768+
535769#[ test]
536770fn test_push_down_through_transparent_nodes ( ) {
537771 // expect the predicate to be pushed down into the DataSource
@@ -2087,6 +2321,116 @@ fn test_filter_pushdown_through_union_does_not_support() {
20872321 ) ;
20882322}
20892323
2324+ #[ test]
2325+ fn test_filter_with_fetch_fully_pushed_through_union ( ) {
2326+ // When a FilterExec with fetch wraps a UnionExec and all predicates are
2327+ // pushed down, UnionExec does not support with_fetch, so a LocalLimitExec
2328+ // should be inserted to preserve the fetch limit.
2329+ let scan1 = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
2330+ let scan2 = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
2331+ let union = UnionExec :: try_new ( vec ! [ scan1, scan2] ) . unwrap ( ) ;
2332+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
2333+ let plan = Arc :: new (
2334+ FilterExecBuilder :: new ( predicate, union)
2335+ . with_fetch ( Some ( 10 ) )
2336+ . build ( )
2337+ . unwrap ( ) ,
2338+ ) ;
2339+
2340+ insta:: assert_snapshot!(
2341+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
2342+ @"
2343+ OptimizationTest:
2344+ input:
2345+ - FilterExec: a@0 = foo, fetch=10
2346+ - UnionExec
2347+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2348+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2349+ output:
2350+ Ok:
2351+ - LocalLimitExec: fetch=10
2352+ - UnionExec
2353+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2354+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2355+ "
2356+ ) ;
2357+ }
2358+
2359+ #[ test]
2360+ fn test_filter_with_fetch_and_projection_fully_pushed_through_union ( ) {
2361+ // When a FilterExec with both fetch and projection wraps a UnionExec and
2362+ // all predicates are pushed down, we should get a ProjectionExec on top of
2363+ // a LocalLimitExec wrapping the UnionExec.
2364+ let scan1 = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
2365+ let scan2 = TestScanBuilder :: new ( schema ( ) ) . with_support ( true ) . build ( ) ;
2366+ let union = UnionExec :: try_new ( vec ! [ scan1, scan2] ) . unwrap ( ) ;
2367+ let projection = vec ! [ 1 , 0 ] ;
2368+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
2369+ let plan = Arc :: new (
2370+ FilterExecBuilder :: new ( predicate, union)
2371+ . with_fetch ( Some ( 5 ) )
2372+ . apply_projection ( Some ( projection) )
2373+ . unwrap ( )
2374+ . build ( )
2375+ . unwrap ( ) ,
2376+ ) ;
2377+
2378+ insta:: assert_snapshot!(
2379+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
2380+ @"
2381+ OptimizationTest:
2382+ input:
2383+ - FilterExec: a@0 = foo, projection=[b@1, a@0], fetch=5
2384+ - UnionExec
2385+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2386+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
2387+ output:
2388+ Ok:
2389+ - ProjectionExec: expr=[b@1 as b, a@0 as a]
2390+ - LocalLimitExec: fetch=5
2391+ - UnionExec
2392+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2393+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
2394+ "
2395+ ) ;
2396+ }
2397+
2398+ #[ test]
2399+ fn test_filter_with_fetch_not_fully_pushed_through_union ( ) {
2400+ // When a FilterExec with fetch wraps a UnionExec but children don't support
2401+ // pushdown, the FilterExec remains with its fetch — no LocalLimitExec needed.
2402+ let scan1 = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
2403+ let scan2 = TestScanBuilder :: new ( schema ( ) ) . with_support ( false ) . build ( ) ;
2404+ let union = UnionExec :: try_new ( vec ! [ scan1, scan2] ) . unwrap ( ) ;
2405+ let predicate = col_lit_predicate ( "a" , "foo" , & schema ( ) ) ;
2406+ let plan = Arc :: new (
2407+ FilterExecBuilder :: new ( predicate, union)
2408+ . with_fetch ( Some ( 8 ) )
2409+ . build ( )
2410+ . unwrap ( ) ,
2411+ ) ;
2412+
2413+ insta:: assert_snapshot!(
2414+ OptimizationTest :: new( plan, FilterPushdown :: new( ) , true ) ,
2415+ @"
2416+ OptimizationTest:
2417+ input:
2418+ - FilterExec: a@0 = foo, fetch=8
2419+ - UnionExec
2420+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2421+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2422+ output:
2423+ Ok:
2424+ - LocalLimitExec: fetch=8
2425+ - UnionExec
2426+ - FilterExec: a@0 = foo
2427+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2428+ - FilterExec: a@0 = foo
2429+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=false
2430+ "
2431+ ) ;
2432+ }
2433+
20902434/// Schema:
20912435/// a: String
20922436/// b: String
0 commit comments