@@ -324,6 +324,20 @@ impl ArrayReader for CachedArrayReader {
324324 let mask = selection_buffer. slice ( selection_start, selection_length) ;
325325
326326 if mask. count_set_bits ( ) == 0 {
327+ // Even when all records are filtered out, check if the batch has levels
328+ // so we can return Some([]) instead of None to indicate this reader provides levels.
329+ // Check local cache first, then shared cache (since skip_records doesn't populate local cache)
330+ let cached_batch = self . local_cache . get ( & batch_id) . cloned ( ) . or_else ( || {
331+ self . shared_cache . lock ( ) . unwrap ( ) . get ( self . column_idx , batch_id)
332+ } ) ;
333+ if let Some ( batch) = cached_batch {
334+ if batch. def_levels . is_some ( ) {
335+ has_def_levels = true ;
336+ }
337+ if batch. rep_levels . is_some ( ) {
338+ has_rep_levels = true ;
339+ }
340+ }
327341 continue ;
328342 }
329343
@@ -1089,4 +1103,52 @@ mod tests {
10891103 assert_eq ! ( consumer. get_def_levels( ) . unwrap( ) , & [ 1 , 0 , 1 , 1 , 0 ] ) ;
10901104 assert_eq ! ( consumer. get_rep_levels( ) . unwrap( ) , & [ 0 , 1 , 0 , 1 , 1 ] ) ;
10911105 }
1106+
1107+ #[ test]
1108+ fn test_level_propagation_empty_after_skip ( ) {
1109+ let metrics = ArrowReaderMetrics :: disabled ( ) ;
1110+ let cache = Arc :: new ( Mutex :: new ( RowGroupCache :: new ( 4 , usize:: MAX ) ) ) ;
1111+
1112+ // Producer populates cache with levels
1113+ let data = vec ! [ 1 , 2 , 3 , 4 ] ;
1114+ let def_levels = vec ! [ 1 , 0 , 1 , 1 ] ;
1115+ let rep_levels = vec ! [ 0 , 1 , 1 , 0 ] ;
1116+ let mock_reader =
1117+ MockArrayReaderWithLevels :: new ( data, def_levels. clone ( ) , rep_levels. clone ( ) ) ;
1118+ let mut producer = CachedArrayReader :: new (
1119+ Box :: new ( mock_reader) ,
1120+ cache. clone ( ) ,
1121+ 0 ,
1122+ CacheRole :: Producer ,
1123+ metrics. clone ( ) ,
1124+ ) ;
1125+
1126+ producer. read_records ( 4 ) . unwrap ( ) ;
1127+ producer. consume_batch ( ) . unwrap ( ) ;
1128+
1129+ // Consumer skips all rows, resulting in an empty output batch
1130+ let mock_reader2 = MockArrayReaderWithLevels :: new (
1131+ vec ! [ 10 , 20 , 30 , 40 ] ,
1132+ vec ! [ 0 , 0 , 0 , 0 ] ,
1133+ vec ! [ 0 , 0 , 0 , 0 ] ,
1134+ ) ;
1135+ let mut consumer = CachedArrayReader :: new (
1136+ Box :: new ( mock_reader2) ,
1137+ cache,
1138+ 0 ,
1139+ CacheRole :: Consumer ,
1140+ metrics,
1141+ ) ;
1142+
1143+ let skipped = consumer. skip_records ( 4 ) . unwrap ( ) ;
1144+ assert_eq ! ( skipped, 4 ) ;
1145+
1146+ let array = consumer. consume_batch ( ) . unwrap ( ) ;
1147+ assert_eq ! ( array. len( ) , 0 ) ;
1148+
1149+ let def_levels = consumer. get_def_levels ( ) . map ( |l| l. to_vec ( ) ) ;
1150+ assert_eq ! ( def_levels, Some ( vec![ ] ) ) ;
1151+ let rep_levels = consumer. get_rep_levels ( ) . map ( |l| l. to_vec ( ) ) ;
1152+ assert_eq ! ( rep_levels, Some ( vec![ ] ) ) ;
1153+ }
10921154}
0 commit comments