@@ -51,7 +51,7 @@ pub struct VortexGlobalData {
5151 file_paths : SegQueue < PathBuf > ,
5252 _is_first_file_processed : std:: sync:: atomic:: AtomicBool ,
5353 filter_expr : ExprRef ,
54- project_expr : ExprRef ,
54+ projection_expr : ExprRef ,
5555}
5656
5757pub struct VortexLocalData {
@@ -86,6 +86,56 @@ fn extract_schema_from_vortex_file(
8686 Ok ( ( column_names, column_types) )
8787}
8888
89+ /// Creates a projection expression based on the table initialization input.
90+ fn create_projection_expr ( init : & TableInitInput < VortexTableFunction > ) -> ExprRef {
91+ let projection_ids = init. projection_ids ( ) . unwrap_or ( & [ ] ) ;
92+ let column_ids = init. column_ids ( ) ;
93+
94+ let projected_ids = projection_ids. iter ( ) . map ( |p| column_ids[ p. as_usize ( ) ] ) ;
95+ select (
96+ projected_ids
97+ . map ( |idx| {
98+ init. bind_data ( )
99+ . column_names
100+ . get ( idx. as_usize ( ) )
101+ . vortex_expect ( "prune idx in column names" )
102+ } )
103+ . map ( |s| Arc :: from ( s. clone ( ) ) )
104+ . collect :: < FieldNames > ( ) ,
105+ root ( ) ,
106+ )
107+ }
108+
109+ /// Creates a table filter expression from the table filter set.
110+ fn create_table_filter_expr (
111+ init : & TableInitInput < VortexTableFunction > ,
112+ ) -> VortexResult < Option < ExprRef > > {
113+ init. table_filter_set ( )
114+ . and_then ( |filter| {
115+ filter
116+ . into_iter ( )
117+ . map ( |( idx, ex) | {
118+ let name = init
119+ . bind_data ( )
120+ . column_names
121+ . get ( idx. as_usize ( ) )
122+ . vortex_expect ( "exists" ) ;
123+ try_from_table_filter ( & ex, name)
124+ } )
125+ . reduce ( |l, r| Ok ( and ( l?, r?) ) )
126+ } )
127+ . transpose ( )
128+ }
129+
130+ /// Creates a SegQueue populated with file paths from bind data.
131+ fn create_file_paths_queue ( bind_data : & VortexBindData ) -> SegQueue < PathBuf > {
132+ let file_paths = SegQueue :: new ( ) ;
133+ for path in bind_data. file_paths . iter ( ) {
134+ file_paths. push ( path. clone ( ) ) ;
135+ }
136+ file_paths
137+ }
138+
89139impl TableFunction for VortexTableFunction {
90140 type BindData = VortexBindData ;
91141 type GlobalState = VortexGlobalData ;
@@ -156,7 +206,7 @@ impl TableFunction for VortexTableFunction {
156206
157207 let array_iter = file
158208 . scan ( ) ?
159- . with_projection ( global_state. project_expr . clone ( ) )
209+ . with_projection ( global_state. projection_expr . clone ( ) )
160210 . with_filter ( global_state. filter_expr . clone ( ) )
161211 . into_array_iter ( )
162212 . map_err ( |e| vortex_err ! ( "Failed to create array iterator: {}" , e) ) ?;
@@ -184,62 +234,24 @@ impl TableFunction for VortexTableFunction {
184234 Ok ( ( ) )
185235 }
186236
187- fn init_global ( init : & TableInitInput < Self > ) -> VortexResult < Self :: GlobalState > {
188- let bind_data = init. bind_data ( ) ;
189- let file_paths = SegQueue :: new ( ) ;
190-
191- // Skip the first file path, as the file is opened during bind.
192- for path in bind_data. file_paths . iter ( ) {
193- file_paths. push ( path. clone ( ) ) ;
194- }
195-
196- let complex_filter = and_collect ( init. bind_data ( ) . filter_exprs . clone ( ) ) ;
197-
198- let projection_ids = init. projection_ids ( ) . unwrap_or ( & [ ] ) ;
199- let column_ids = init. column_ids ( ) ;
200-
201- let projected_ids = projection_ids. iter ( ) . map ( |p| column_ids[ p. as_usize ( ) ] ) ;
202- let project_expr = select (
203- projected_ids
204- . map ( |idx| {
205- init. bind_data ( )
206- . column_names
207- . get ( idx. as_usize ( ) )
208- . vortex_expect ( "prune idx in column names" )
209- } )
210- . map ( |s| Arc :: from ( s. clone ( ) ) )
211- . collect :: < FieldNames > ( ) ,
212- root ( ) ,
213- ) ;
214-
215- let filter = init
216- . table_filter_set ( )
217- . and_then ( |filter| {
218- filter
219- . into_iter ( )
220- . map ( |( idx, ex) | {
221- let name = init
222- . bind_data ( )
223- . column_names
224- . get ( idx. as_usize ( ) )
225- . vortex_expect ( "exists" ) ;
226- try_from_table_filter ( & ex, name)
227- } )
228- . reduce ( |l, r| Ok ( and ( l?, r?) ) )
229- } )
230- . transpose ( ) ?;
237+ fn init_global ( init_input : & TableInitInput < Self > ) -> VortexResult < Self :: GlobalState > {
238+ let bind_data = init_input. bind_data ( ) ;
239+ let file_paths = create_file_paths_queue ( bind_data) ;
240+ let projection_expr = create_projection_expr ( init_input) ;
241+ let filter_expr = create_table_filter_expr ( init_input) ?;
231242
232- let filter_expr = complex_filter
243+ let complex_filter_expr = and_collect ( init_input. bind_data ( ) . filter_exprs . clone ( ) ) ;
244+ let filter_expr = complex_filter_expr
233245 . into_iter ( )
234- . chain ( filter )
246+ . chain ( filter_expr )
235247 . reduce ( and)
236248 . unwrap_or_else ( || lit ( true ) ) ;
237249
238250 Ok ( VortexGlobalData {
239251 file_paths,
240252 _is_first_file_processed : std:: sync:: atomic:: AtomicBool :: new ( false ) ,
241253 filter_expr,
242- project_expr ,
254+ projection_expr ,
243255 } )
244256 }
245257
0 commit comments