@@ -37,9 +37,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3737use datafusion_physical_expr_common:: metrics:: {
3838 ExecutionPlanMetricsSet , MetricBuilder , MetricsSet ,
3939} ;
40- use datafusion_physical_expr_common:: physical_expr:: {
41- PhysicalExpr , is_dynamic_physical_expr,
42- } ;
40+ use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
4341use datafusion_physical_expr_common:: sort_expr:: PhysicalSortExpr ;
4442use futures:: { Stream , StreamExt , TryStreamExt } ;
4543use pin_project_lite:: pin_project;
@@ -50,7 +48,7 @@ use std::sync::Arc;
5048use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
5149use std:: task:: { Context , Poll } ;
5250use tokio:: sync:: mpsc:: UnboundedReceiver ;
53- use tokio:: sync:: { Notify , OwnedSemaphorePermit , Semaphore } ;
51+ use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
5452
5553/// Decouples production and consumption of record batches with an internal queue per partition,
5654/// eagerly filling up the capacity of the queues even before any message is requested.
@@ -91,7 +89,6 @@ pub struct BufferExec {
9189 input : Arc < dyn ExecutionPlan > ,
9290 properties : PlanProperties ,
9391 capacity : usize ,
94- wait_first_poll : bool ,
9592 metrics : ExecutionPlanMetricsSet ,
9693}
9794
@@ -107,7 +104,6 @@ impl BufferExec {
107104 input,
108105 properties,
109106 capacity,
110- wait_first_poll : false ,
111107 metrics : ExecutionPlanMetricsSet :: new ( ) ,
112108 }
113109 }
@@ -207,12 +203,8 @@ impl ExecutionPlan for BufferExec {
207203 }
208204 } ) ;
209205 // Buffer the input.
210- let out_stream = MemoryBufferedStream :: new (
211- in_stream,
212- self . capacity ,
213- mem_reservation,
214- self . wait_first_poll ,
215- ) ;
206+ let out_stream =
207+ MemoryBufferedStream :: new ( in_stream, self . capacity , mem_reservation) ;
216208 // Update in the metrics that when an element gets out, some memory gets freed.
217209 let out_stream = out_stream. inspect_ok ( move |v| {
218210 curr_mem_out. fetch_sub ( v. get_array_memory_size ( ) , Ordering :: Relaxed ) ;
@@ -268,22 +260,7 @@ impl ExecutionPlan for BufferExec {
268260 child_pushdown_result : ChildPushdownResult ,
269261 _config : & ConfigOptions ,
270262 ) -> Result < FilterPushdownPropagation < Arc < dyn ExecutionPlan > > > {
271- // If there is a dynamic filter being pushed down through this node, we don't want to buffer,
272- // we prefer to give a chance to the dynamic filter to be populated with something rather
273- // than eagerly polling data with an empty dynamic filter.
274- let has_dynamic_filter = child_pushdown_result
275- . parent_filters
276- . iter ( )
277- . any ( |v| is_dynamic_physical_expr ( & v. filter ) ) ;
278- if has_dynamic_filter {
279- let mut new_self = self . clone ( ) ;
280- new_self. wait_first_poll = true ;
281- let mut result = FilterPushdownPropagation :: if_all ( child_pushdown_result) ;
282- result. updated_node = Some ( Arc :: new ( new_self) as _ ) ;
283- Ok ( result)
284- } else {
285- Ok ( FilterPushdownPropagation :: if_all ( child_pushdown_result) )
286- }
263+ Ok ( FilterPushdownPropagation :: if_all ( child_pushdown_result) )
287264 }
288265
289266 fn try_pushdown_sort (
@@ -319,7 +296,6 @@ pub struct MemoryBufferedStream<T: SizedMessage> {
319296 task: SpawnedTask <( ) >,
320297 batch_rx: UnboundedReceiver <Result <( T , OwnedSemaphorePermit ) >>,
321298 memory_reservation: Arc <MemoryReservation >,
322- first_poll_notify: Option <Arc <Notify >>,
323299} }
324300
325301impl < T : Send + SizedMessage + ' static > MemoryBufferedStream < T > {
@@ -330,23 +306,13 @@ impl<T: Send + SizedMessage + 'static> MemoryBufferedStream<T> {
330306 mut input : impl Stream < Item = Result < T > > + Unpin + Send + ' static ,
331307 capacity : usize ,
332308 memory_reservation : MemoryReservation ,
333- wait_first_pool : bool ,
334309 ) -> Self {
335310 let semaphore = Arc :: new ( Semaphore :: new ( capacity) ) ;
336311 let ( batch_tx, batch_rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
337312
338- let mut first_poll_notify = None ;
339- if wait_first_pool {
340- first_poll_notify = Some ( Arc :: new ( Notify :: new ( ) ) ) ;
341- }
342- let mut first_poll_notify_clone = first_poll_notify. clone ( ) ;
343-
344313 let memory_reservation = Arc :: new ( memory_reservation) ;
345314 let memory_reservation_clone = Arc :: clone ( & memory_reservation) ;
346315 let task = SpawnedTask :: spawn ( async move {
347- if let Some ( first_poll_notify) = first_poll_notify_clone. take ( ) {
348- first_poll_notify. notified_owned ( ) . await ;
349- }
350316 while let Some ( item_or_err) = input. next ( ) . await {
351317 let item = match item_or_err {
352318 Ok ( batch) => batch,
@@ -383,7 +349,6 @@ impl<T: Send + SizedMessage + 'static> MemoryBufferedStream<T> {
383349 task,
384350 batch_rx,
385351 memory_reservation : memory_reservation_clone,
386- first_poll_notify,
387352 }
388353 }
389354
@@ -398,9 +363,6 @@ impl<T: SizedMessage> Stream for MemoryBufferedStream<T> {
398363
399364 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
400365 let self_project = self . project ( ) ;
401- if let Some ( first_poll_notify) = self_project. first_poll_notify . take ( ) {
402- first_poll_notify. notify_one ( ) ;
403- }
404366 match self_project. batch_rx . poll_recv ( cx) {
405367 Poll :: Ready ( Some ( Ok ( ( item, _semaphore_permit) ) ) ) => {
406368 self_project. memory_reservation . shrink ( item. size ( ) ) ;
@@ -440,29 +402,18 @@ mod tests {
440402 let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
441403 let ( _, res) = memory_pool_and_reservation ( ) ;
442404
443- let buffered = MemoryBufferedStream :: new ( input, 4 , res, false ) ;
405+ let buffered = MemoryBufferedStream :: new ( input, 4 , res) ;
444406 wait_for_buffering ( ) . await ;
445407 assert_eq ! ( buffered. messages_queued( ) , 2 ) ;
446408 Ok ( ( ) )
447409 }
448410
449- #[ tokio:: test]
450- async fn respects_wait_for_first_poll ( ) -> Result < ( ) , Box < dyn Error > > {
451- let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
452- let ( _, res) = memory_pool_and_reservation ( ) ;
453-
454- let buffered = MemoryBufferedStream :: new ( input, 4 , res, true ) ;
455- wait_for_buffering ( ) . await ;
456- assert_eq ! ( buffered. messages_queued( ) , 0 ) ;
457- Ok ( ( ) )
458- }
459-
460411 #[ tokio:: test]
461412 async fn yields_all_messages ( ) -> Result < ( ) , Box < dyn Error > > {
462413 let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
463414 let ( _, res) = memory_pool_and_reservation ( ) ;
464415
465- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, false ) ;
416+ let mut buffered = MemoryBufferedStream :: new ( input, 10 , res) ;
466417 wait_for_buffering ( ) . await ;
467418 assert_eq ! ( buffered. messages_queued( ) , 4 ) ;
468419
@@ -474,29 +425,12 @@ mod tests {
474425 Ok ( ( ) )
475426 }
476427
477- #[ tokio:: test]
478- async fn yields_all_messages_after_first_poll ( ) -> Result < ( ) , Box < dyn Error > > {
479- let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
480- let ( _, res) = memory_pool_and_reservation ( ) ;
481-
482- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, true ) ;
483- wait_for_buffering ( ) . await ;
484- assert_eq ! ( buffered. messages_queued( ) , 0 ) ;
485-
486- pull_ok_msg ( & mut buffered) . await ?;
487- pull_ok_msg ( & mut buffered) . await ?;
488- pull_ok_msg ( & mut buffered) . await ?;
489- pull_ok_msg ( & mut buffered) . await ?;
490- finished ( & mut buffered) . await ?;
491- Ok ( ( ) )
492- }
493-
494428 #[ tokio:: test]
495429 async fn yields_first_msg_even_if_big ( ) -> Result < ( ) , Box < dyn Error > > {
496430 let input = futures:: stream:: iter ( [ 25 , 1 , 2 , 3 ] ) . map ( Ok ) ;
497431 let ( _, res) = memory_pool_and_reservation ( ) ;
498432
499- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, false ) ;
433+ let mut buffered = MemoryBufferedStream :: new ( input, 10 , res) ;
500434 wait_for_buffering ( ) . await ;
501435 assert_eq ! ( buffered. messages_queued( ) , 1 ) ;
502436 pull_ok_msg ( & mut buffered) . await ?;
@@ -508,7 +442,7 @@ mod tests {
508442 let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
509443 let ( _, res) = bounded_memory_pool_and_reservation ( 7 ) ;
510444
511- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, false ) ;
445+ let mut buffered = MemoryBufferedStream :: new ( input, 10 , res) ;
512446 wait_for_buffering ( ) . await ;
513447
514448 pull_ok_msg ( & mut buffered) . await ?;
@@ -525,7 +459,7 @@ mod tests {
525459 let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
526460 let ( _, res) = bounded_memory_pool_and_reservation ( 7 ) ;
527461
528- let mut buffered = MemoryBufferedStream :: new ( input, 3 , res, false ) ;
462+ let mut buffered = MemoryBufferedStream :: new ( input, 3 , res) ;
529463 wait_for_buffering ( ) . await ;
530464 pull_ok_msg ( & mut buffered) . await ?;
531465
@@ -548,7 +482,7 @@ mod tests {
548482 let input = futures:: stream:: iter ( [ 3 , 3 , 3 , 3 ] ) . map ( Ok ) ;
549483 let ( _, res) = memory_pool_and_reservation ( ) ;
550484
551- let mut buffered = MemoryBufferedStream :: new ( input, 2 , res, false ) ;
485+ let mut buffered = MemoryBufferedStream :: new ( input, 2 , res) ;
552486 wait_for_buffering ( ) . await ;
553487 assert_eq ! ( buffered. messages_queued( ) , 1 ) ;
554488 pull_ok_msg ( & mut buffered) . await ?;
@@ -580,7 +514,7 @@ mod tests {
580514 } ) ;
581515 let ( _, res) = memory_pool_and_reservation ( ) ;
582516
583- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, false ) ;
517+ let mut buffered = MemoryBufferedStream :: new ( input, 10 , res) ;
584518 wait_for_buffering ( ) . await ;
585519
586520 pull_ok_msg ( & mut buffered) . await ?;
@@ -595,7 +529,7 @@ mod tests {
595529 let input = futures:: stream:: iter ( [ 1 , 2 , 3 , 4 ] ) . map ( Ok ) ;
596530 let ( pool, res) = memory_pool_and_reservation ( ) ;
597531
598- let mut buffered = MemoryBufferedStream :: new ( input, 10 , res, false ) ;
532+ let mut buffered = MemoryBufferedStream :: new ( input, 10 , res) ;
599533 wait_for_buffering ( ) . await ;
600534 assert_eq ! ( buffered. messages_queued( ) , 4 ) ;
601535 assert_eq ! ( pool. reserved( ) , 10 ) ;
0 commit comments