1818use std:: sync:: Arc ;
1919
2020use crate :: physical_optimizer:: test_utils:: {
21- coalesce_partitions_exec, global_limit_exec, hash_join_exec , local_limit_exec ,
22- sort_exec , sort_preserving_merge_exec, stream_exec,
21+ coalesce_partitions_exec, global_limit_exec, local_limit_exec , sort_exec ,
22+ sort_preserving_merge_exec, stream_exec,
2323} ;
2424
2525use arrow:: compute:: SortOptions ;
@@ -29,7 +29,6 @@ use datafusion_common::error::Result;
2929use datafusion_expr:: { JoinType , Operator } ;
3030use datafusion_physical_expr:: Partitioning ;
3131use datafusion_physical_expr:: expressions:: { BinaryExpr , col, lit} ;
32- use datafusion_physical_expr_common:: physical_expr:: PhysicalExprRef ;
3332use datafusion_physical_expr_common:: sort_expr:: { LexOrdering , PhysicalSortExpr } ;
3433use datafusion_physical_optimizer:: PhysicalOptimizerRule ;
3534use datafusion_physical_optimizer:: limit_pushdown:: LimitPushdown ;
@@ -162,168 +161,6 @@ fn transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_li
162161 Ok ( ( ) )
163162}
164163
165- fn join_on_columns (
166- left_col : & str ,
167- right_col : & str ,
168- ) -> Vec < ( PhysicalExprRef , PhysicalExprRef ) > {
169- vec ! [ (
170- Arc :: new( datafusion_physical_expr:: expressions:: Column :: new(
171- left_col, 0 ,
172- ) ) as _,
173- Arc :: new( datafusion_physical_expr:: expressions:: Column :: new(
174- right_col, 0 ,
175- ) ) as _,
176- ) ]
177- }
178-
179- #[ test]
180- fn absorbs_limit_into_hash_join_inner ( ) -> Result < ( ) > {
181- // HashJoinExec with Inner join should absorb limit via with_fetch
182- let schema = create_schema ( ) ;
183- let left = empty_exec ( Arc :: clone ( & schema) ) ;
184- let right = empty_exec ( Arc :: clone ( & schema) ) ;
185- let on = join_on_columns ( "c1" , "c1" ) ;
186- let hash_join = hash_join_exec ( left, right, on, None , & JoinType :: Inner ) ?;
187- let global_limit = global_limit_exec ( hash_join, 0 , Some ( 5 ) ) ;
188-
189- let initial = format_plan ( & global_limit) ;
190- insta:: assert_snapshot!(
191- initial,
192- @r"
193- GlobalLimitExec: skip=0, fetch=5
194- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
195- EmptyExec
196- EmptyExec
197- "
198- ) ;
199-
200- let after_optimize =
201- LimitPushdown :: new ( ) . optimize ( global_limit, & ConfigOptions :: new ( ) ) ?;
202- let optimized = format_plan ( & after_optimize) ;
203- // The limit should be absorbed by the hash join (not pushed to children)
204- insta:: assert_snapshot!(
205- optimized,
206- @r"
207- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=5
208- EmptyExec
209- EmptyExec
210- "
211- ) ;
212-
213- Ok ( ( ) )
214- }
215-
216- #[ test]
217- fn absorbs_limit_into_hash_join_right ( ) -> Result < ( ) > {
218- // HashJoinExec with Right join should absorb limit via with_fetch
219- let schema = create_schema ( ) ;
220- let left = empty_exec ( Arc :: clone ( & schema) ) ;
221- let right = empty_exec ( Arc :: clone ( & schema) ) ;
222- let on = join_on_columns ( "c1" , "c1" ) ;
223- let hash_join = hash_join_exec ( left, right, on, None , & JoinType :: Right ) ?;
224- let global_limit = global_limit_exec ( hash_join, 0 , Some ( 10 ) ) ;
225-
226- let initial = format_plan ( & global_limit) ;
227- insta:: assert_snapshot!(
228- initial,
229- @r"
230- GlobalLimitExec: skip=0, fetch=10
231- HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)]
232- EmptyExec
233- EmptyExec
234- "
235- ) ;
236-
237- let after_optimize =
238- LimitPushdown :: new ( ) . optimize ( global_limit, & ConfigOptions :: new ( ) ) ?;
239- let optimized = format_plan ( & after_optimize) ;
240- // The limit should be absorbed by the hash join
241- insta:: assert_snapshot!(
242- optimized,
243- @r"
244- HashJoinExec: mode=Partitioned, join_type=Right, on=[(c1@0, c1@0)], fetch=10
245- EmptyExec
246- EmptyExec
247- "
248- ) ;
249-
250- Ok ( ( ) )
251- }
252-
253- #[ test]
254- fn absorbs_limit_into_hash_join_left ( ) -> Result < ( ) > {
255- // during probing, then unmatched rows at the end, stopping when limit is reached
256- let schema = create_schema ( ) ;
257- let left = empty_exec ( Arc :: clone ( & schema) ) ;
258- let right = empty_exec ( Arc :: clone ( & schema) ) ;
259- let on = join_on_columns ( "c1" , "c1" ) ;
260- let hash_join = hash_join_exec ( left, right, on, None , & JoinType :: Left ) ?;
261- let global_limit = global_limit_exec ( hash_join, 0 , Some ( 5 ) ) ;
262-
263- let initial = format_plan ( & global_limit) ;
264- insta:: assert_snapshot!(
265- initial,
266- @r"
267- GlobalLimitExec: skip=0, fetch=5
268- HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)]
269- EmptyExec
270- EmptyExec
271- "
272- ) ;
273-
274- let after_optimize =
275- LimitPushdown :: new ( ) . optimize ( global_limit, & ConfigOptions :: new ( ) ) ?;
276- let optimized = format_plan ( & after_optimize) ;
277- // Left join now absorbs the limit
278- insta:: assert_snapshot!(
279- optimized,
280- @r"
281- HashJoinExec: mode=Partitioned, join_type=Left, on=[(c1@0, c1@0)], fetch=5
282- EmptyExec
283- EmptyExec
284- "
285- ) ;
286-
287- Ok ( ( ) )
288- }
289-
290- #[ test]
291- fn absorbs_limit_with_skip_into_hash_join ( ) -> Result < ( ) > {
292- let schema = create_schema ( ) ;
293- let left = empty_exec ( Arc :: clone ( & schema) ) ;
294- let right = empty_exec ( Arc :: clone ( & schema) ) ;
295- let on = join_on_columns ( "c1" , "c1" ) ;
296- let hash_join = hash_join_exec ( left, right, on, None , & JoinType :: Inner ) ?;
297- let global_limit = global_limit_exec ( hash_join, 3 , Some ( 5 ) ) ;
298-
299- let initial = format_plan ( & global_limit) ;
300- insta:: assert_snapshot!(
301- initial,
302- @r"
303- GlobalLimitExec: skip=3, fetch=5
304- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)]
305- EmptyExec
306- EmptyExec
307- "
308- ) ;
309-
310- let after_optimize =
311- LimitPushdown :: new ( ) . optimize ( global_limit, & ConfigOptions :: new ( ) ) ?;
312- let optimized = format_plan ( & after_optimize) ;
313- // With skip, GlobalLimit is kept but fetch (skip + limit = 8) is absorbed by the join
314- insta:: assert_snapshot!(
315- optimized,
316- @r"
317- GlobalLimitExec: skip=3, fetch=5
318- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c1@0, c1@0)], fetch=8
319- EmptyExec
320- EmptyExec
321- "
322- ) ;
323-
324- Ok ( ( ) )
325- }
326-
327164#[ test]
328165fn pushes_global_limit_exec_through_projection_exec ( ) -> Result < ( ) > {
329166 let schema = create_schema ( ) ;
0 commit comments