@@ -38,7 +38,7 @@ use datafusion_expr_common::operator::Operator::Gt;
3838use datafusion_physical_expr:: expressions:: { col, BinaryExpr , Column , Literal } ;
3939use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
4040use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
41- use datafusion_physical_optimizer:: insert_yield_exec :: InsertYieldExec ;
41+ use datafusion_physical_optimizer:: ensure_coop :: EnsureCooperative ;
4242use datafusion_physical_optimizer:: PhysicalOptimizerRule ;
4343use datafusion_physical_plan:: coalesce_batches:: CoalesceBatchesExec ;
4444use datafusion_physical_plan:: filter:: FilterExec ;
@@ -146,7 +146,7 @@ async fn test_infinite_agg_cancel(
146146
147147 // 3) optimize the plan with InsertYieldExec to auto-insert Yield
148148 let config = ConfigOptions :: new ( ) ;
149- let optimized = InsertYieldExec :: new ( ) . optimize ( aggr, & config) ?;
149+ let optimized = EnsureCooperative :: new ( ) . optimize ( aggr, & config) ?;
150150
151151 // 4) get the stream
152152 let mut stream = physical_plan:: execute_stream ( optimized, session_ctx. task_ctx ( ) ) ?;
@@ -199,7 +199,7 @@ async fn test_infinite_sort_cancel(
199199
200200 // 4) optimize the plan with InsertYieldExec to auto-insert Yield
201201 let config = ConfigOptions :: new ( ) ;
202- let optimized = InsertYieldExec :: new ( ) . optimize ( sort_exec, & config) ?;
202+ let optimized = EnsureCooperative :: new ( ) . optimize ( sort_exec, & config) ?;
203203
204204 // 5) get the stream
205205 let mut stream = physical_plan:: execute_stream ( optimized, session_ctx. task_ctx ( ) ) ?;
@@ -284,7 +284,7 @@ async fn test_infinite_interleave_cancel(
284284 // 6) Apply InsertYieldExec to insert YieldStreamExec under every leaf.
285285 // Each InfiniteExec → FilterExec → CoalesceBatchesExec chain will yield periodically.
286286 let config = ConfigOptions :: new ( ) ;
287- let optimized = InsertYieldExec :: new ( ) . optimize ( coalesced_top, & config) ?;
287+ let optimized = EnsureCooperative :: new ( ) . optimize ( coalesced_top, & config) ?;
288288
289289 // 7) Execute the optimized plan with a 1-second timeout.
290290 // Because the top-level FilterExec always discards rows and the inputs are infinite,
@@ -385,7 +385,7 @@ async fn test_infinite_interleave_agg_cancel(
385385 // That way, each InfiniteExec (through the FilterExec/CoalesceBatchesExec/RepartitionExec chain)
386386 // yields to the runtime periodically instead of spinning CPU.
387387 let config = ConfigOptions :: new ( ) ;
388- let optimized = InsertYieldExec :: new ( ) . optimize ( aggr, & config) ?;
388+ let optimized = EnsureCooperative :: new ( ) . optimize ( aggr, & config) ?;
389389
390390 // 6) Execute the stream. Because AggregateExec(mode=Single) only emits a final batch
391391 // after all inputs finish—and those inputs are infinite—we expect no output
@@ -472,7 +472,7 @@ async fn test_infinite_join_cancel(
472472
473473 // 3) Wrap yields under each infinite leaf
474474 let config = ConfigOptions :: new ( ) ;
475- let optimized = InsertYieldExec :: new ( ) . optimize ( join, & config) ?;
475+ let optimized = EnsureCooperative :: new ( ) . optimize ( join, & config) ?;
476476
477477 // 4) Execute + 1 sec timeout
478478 let mut stream = physical_plan:: execute_stream ( optimized, session_ctx. task_ctx ( ) ) ?;
@@ -590,7 +590,7 @@ async fn test_infinite_join_agg_cancel(
590590
591591 // 5) Wrap yields under each infinite leaf
592592 let config = ConfigOptions :: new ( ) ;
593- let optimized = InsertYieldExec :: new ( ) . optimize ( aggr, & config) ?;
593+ let optimized = EnsureCooperative :: new ( ) . optimize ( aggr, & config) ?;
594594
595595 // 6) Execute + 1 sec timeout
596596 let mut stream = physical_plan:: execute_stream ( optimized, session_ctx. task_ctx ( ) ) ?;
@@ -644,7 +644,7 @@ async fn test_filter_reject_all_batches_cancel(
644644
645645 // 3) InsertYieldExec to insert YieldExec—so that the InfiniteExec yields control between batches
646646 let config = ConfigOptions :: new ( ) ;
647- let optimized = InsertYieldExec :: new ( ) . optimize ( coalesced, & config) ?;
647+ let optimized = EnsureCooperative :: new ( ) . optimize ( coalesced, & config) ?;
648648
649649 // 4) Execute with a 1-second timeout. Because Filter discards all 8192 rows each time
650650 // without ever producing output, no batch will arrive within 1 second. And since
@@ -723,7 +723,7 @@ async fn test_infinite_hash_join_without_repartition_and_no_agg(
723723 // because there is no aggregation so no wrapper is inserted. Here we simply do
724724 // not call InsertYieldExec, ensuring the plan has neither aggregation nor repartition.
725725 let config = ConfigOptions :: new ( ) ;
726- let optimized = InsertYieldExec :: new ( ) . optimize ( join, & config) ?;
726+ let optimized = EnsureCooperative :: new ( ) . optimize ( join, & config) ?;
727727
728728 // 4) Execute with a 1 second timeout
729729 let mut stream = physical_plan:: execute_stream ( optimized, session_ctx. task_ctx ( ) ) ?;
@@ -801,7 +801,7 @@ async fn test_infinite_sort_merge_join_without_repartition_and_no_agg(
801801
802802 // 3) Do not apply InsertYieldExec (no aggregation, no repartition → no built-in yields).
803803 let config = ConfigOptions :: new ( ) ;
804- let optimized = InsertYieldExec :: new ( ) . optimize ( join, & config) ?;
804+ let optimized = EnsureCooperative :: new ( ) . optimize ( join, & config) ?;
805805
806806 // 4) Execute with a 1-second timeout. Because both sides are infinite and never match,
807807 // the SortMergeJoin will never produce output within 1s.
0 commit comments