@@ -107,22 +107,17 @@ impl<D: ScanDriver> ScanBuilder<D> {
107107 self . driver_options = Some ( options) ;
108108 self
109109 }
110- }
111110
112- impl < D : ScanDriver > ScanBuilder < D > {
113- /// Perform the scan operation and return a stream of arrays.
114- pub fn into_array_stream ( self ) -> VortexResult < impl ArrayStream + ' static > {
111+ pub fn build ( self ) -> VortexResult < Scan < D > > {
115112 let projection = simplify_typed ( self . projection , self . layout . dtype ( ) ) ?;
116113 let filter = self
117114 . filter
118115 . map ( |f| simplify_typed ( f, self . layout . dtype ( ) ) )
119116 . transpose ( ) ?;
120-
121117 let field_mask = field_mask ( & projection, filter. as_ref ( ) , self . layout . dtype ( ) ) ?;
122118
123- let splits = self . split_by . splits ( & self . layout , & field_mask) ?;
124119 let row_indices = self . row_indices . clone ( ) ;
125-
120+ let splits = self . split_by . splits ( & self . layout , & field_mask ) ? ;
126121 let row_masks = splits
127122 . into_iter ( )
128123 . filter_map ( move |row_range| {
@@ -171,10 +166,44 @@ impl<D: ScanDriver> ScanBuilder<D> {
171166 } )
172167 . collect_vec ( ) ;
173168
174- let scanner = Arc :: new ( Scanner :: new (
175- self . layout . dtype ( ) . clone ( ) ,
169+ Ok ( Scan {
170+ driver : self . driver ,
171+ layout : self . layout ,
172+ ctx : self . ctx ,
176173 projection,
177174 filter,
175+ row_masks,
176+ } )
177+ }
178+
179+ /// Perform the scan operation and return a stream of arrays.
///
/// Convenience wrapper: finalizes this builder with `build` and then calls
/// `into_array_stream` on the resulting scan. A failure while building is
/// returned early via `?`; the result of the delegated call is returned as-is.
180+ pub fn into_array_stream ( self ) -> VortexResult < impl ArrayStream + ' static > {
181+ self . build ( ) ?. into_array_stream ( )
182+ }
183+
/// Perform the scan operation and resolve it to a single `Array`.
///
/// Creates the array stream with `into_array_stream` (returning early via `?`
/// if that fails), then awaits the stream's own `into_array` and returns its
/// result unchanged. Consumes the builder.
184+ pub async fn into_array ( self ) -> VortexResult < Array > {
185+ self . into_array_stream ( ) ?. into_array ( ) . await
186+ }
187+ }
188+
/// A prepared scan, as returned by `ScanBuilder::build`.
///
/// Bundles the driver, layout and context taken from the builder together with
/// the simplified expressions and the row masks computed during `build`.
/// Consumed by `into_array_stream` to produce the stream of arrays.
189+ pub struct Scan < D > {
// Driver moved out of the builder; `into_array_stream` asks it for the segment reader.
190+ driver : D ,
// Layout being scanned; supplies the dtype for the scanner and creates the reader.
191+ layout : Layout ,
// Context handed to the layout when the reader is created.
192+ ctx : ContextRef ,
193+ // Guaranteed to be simplified (via `simplify_typed` in `ScanBuilder::build`)
194+ projection : ExprRef ,
195+ // Guaranteed to be simplified (via `simplify_typed` in `ScanBuilder::build`); `None` when no filter was set
196+ filter : Option < ExprRef > ,
// Row masks derived from the layout splits in `build`; `into_array_stream`
// allocates one result channel per entry.
197+ row_masks : Vec < RowMask > ,
198+ }
199+
200+ impl < D : ScanDriver > Scan < D > {
201+ /// Perform the scan operation and return a stream of arrays.
202+ pub fn into_array_stream ( self ) -> VortexResult < impl ArrayStream + ' static > {
203+ let scanner = Arc :: new ( Scanner :: new (
204+ self . layout . dtype ( ) . clone ( ) ,
205+ self . projection ,
206+ self . filter ,
178207 ) ?) ;
179208
180209 let result_dtype = scanner. result_dtype ( ) . clone ( ) ;
@@ -184,10 +213,10 @@ impl<D: ScanDriver> ScanBuilder<D> {
184213 . layout
185214 . reader ( self . driver . segment_reader ( ) , self . ctx . clone ( ) ) ?;
186215
187- let mut results = Vec :: with_capacity ( row_masks. len ( ) ) ;
188- let mut tasks = Vec :: with_capacity ( row_masks. len ( ) ) ;
216+ let mut results = Vec :: with_capacity ( self . row_masks . len ( ) ) ;
217+ let mut tasks = Vec :: with_capacity ( self . row_masks . len ( ) ) ;
189218
190- for row_mask in row_masks. into_iter ( ) {
219+ for row_mask in self . row_masks . into_iter ( ) {
191220 let ( send_result, recv_result) = oneshot:: channel :: < VortexResult < Option < Array > > > ( ) ;
192221 results. push ( recv_result) ;
193222
0 commit comments