@@ -913,6 +913,10 @@ pub(crate) fn evaluate_predicate(
913913 } )
914914}
915915
916+ /// Maximum number of bytes that can be evaluated in a row filter
917+ /// before yielding back to the scheduler
918+ const DECODE_BUDGET : usize = 2 * 1024 * 1024 ;
919+
916920/// Evaluates an [`ArrowPredicate`], returning a [`RowSelection`] indicating
917921/// which rows to return.
918922///
@@ -930,10 +934,14 @@ pub(crate) async fn evaluate_predicate_coop(
930934 input_selection : Option < RowSelection > ,
931935 predicate : & mut dyn ArrowPredicate ,
932936) -> Result < RowSelection > {
937+ let mut budget = DECODE_BUDGET ;
938+
933939 let reader = ParquetRecordBatchReader :: new ( batch_size, array_reader, input_selection. clone ( ) ) ;
934940 let mut filters = vec ! [ ] ;
935941 for maybe_batch in reader {
936942 let maybe_batch = maybe_batch?;
943+ budget = budget. saturating_sub ( maybe_batch. get_array_memory_size ( ) ) ;
944+
937945 let input_rows = maybe_batch. num_rows ( ) ;
938946 let filter = predicate. evaluate ( maybe_batch) ?;
939947 // Since user supplied predicate, check error here to catch bugs quickly
@@ -948,8 +956,13 @@ pub(crate) async fn evaluate_predicate_coop(
948956 _ => filters. push ( prep_null_mask_filter ( & filter) ) ,
949957 } ;
950958
951- #[ cfg( feature = "async" ) ]
952- tokio:: task:: consume_budget ( ) . await ;
959+ if budget == 0 {
960+ // If we have consumed our decode budget, reset the budget and yield
961+ // back to the scheduler
962+ budget = DECODE_BUDGET ;
963+ #[ cfg( feature = "async" ) ]
964+ tokio:: task:: yield_now ( ) . await ;
965+ }
953966 }
954967
955968 let raw = RowSelection :: from_filters ( & filters) ;
0 commit comments