@@ -2,22 +2,22 @@ use std::collections::{BTreeSet, VecDeque};
22use std:: sync:: { Arc , RwLock } ;
33
44use bytes:: Bytes ;
5- use itertools:: Itertools ;
65use vortex_array:: ArrayData ;
7- use vortex_error:: VortexUnwrap ;
6+ use vortex_error:: { vortex_panic , VortexUnwrap } ;
87
98use crate :: read:: mask:: RowMask ;
9+ use crate :: read:: splits:: { FixedSplitIterator , MaskIterator , SplitMask } ;
1010use crate :: { BatchRead , LayoutMessageCache , LayoutReader , MessageLocator } ;
1111
12- pub fn layout_splits ( layout : & mut dyn LayoutReader , length : usize ) -> Vec < RowMask > {
12+ fn layout_splits ( layout : & mut dyn LayoutReader , length : usize ) -> impl Iterator < Item = RowMask > {
13+ let mut iter = FixedSplitIterator :: new ( length as u64 , None ) ;
1314 let mut splits = BTreeSet :: new ( ) ;
14- splits. insert ( length) ;
1515 layout. add_splits ( 0 , & mut splits) . vortex_unwrap ( ) ;
16- splits
17- . into_iter ( )
18- . tuple_windows :: < ( usize , usize ) > ( )
19- . map ( | ( begin , end ) | RowMask :: new_valid_between ( begin , end ) )
20- . collect :: < Vec < _ > > ( )
16+ iter . additional_splits ( & mut splits) . vortex_unwrap ( ) ;
17+ iter . map ( |m| m . vortex_unwrap ( ) ) . map ( |m| match m {
18+ SplitMask :: ReadMore ( _ ) => vortex_panic ! ( "Will never read more" ) ,
19+ SplitMask :: Mask ( m ) => m ,
20+ } )
2121}
2222
2323pub fn read_layout_data (
@@ -73,7 +73,6 @@ pub fn filter_read_layout(
7373 length : usize ,
7474) -> VecDeque < ArrayData > {
7575 layout_splits ( filter_layout, length)
76- . into_iter ( )
7776 . flat_map ( |s| read_filters ( filter_layout, cache. clone ( ) , buf, & s) )
7877 . flat_map ( |s| read_layout_data ( layout, cache. clone ( ) , buf, & s) )
7978 . collect ( )
@@ -86,7 +85,6 @@ pub fn read_layout(
8685 length : usize ,
8786) -> VecDeque < ArrayData > {
8887 layout_splits ( layout, length)
89- . into_iter ( )
9088 . flat_map ( |s| read_layout_data ( layout, cache. clone ( ) , buf, & s) )
9189 . collect ( )
9290}
0 commit comments