@@ -227,8 +227,10 @@ impl FileOpener for VortexOpener {
227227 // for schema evolution and divergence between the table's schema and individual files.
228228 filter = filter
229229 . map ( |filter| {
230- let logical_file_schema =
231- compute_logical_file_schema ( & physical_file_schema, & logical_schema) ;
230+ let logical_file_schema = compute_logical_file_schema (
231+ & physical_file_schema. clone ( ) ,
232+ & logical_schema,
233+ ) ;
232234
233235 let expr = expr_adapter_factory
234236 . create ( logical_file_schema, physical_file_schema. clone ( ) )
@@ -244,9 +246,14 @@ impl FileOpener for VortexOpener {
244246 predicate_file_schema = physical_file_schema. clone ( ) ;
245247 }
246248
247- let ( schema_mapping, adapted_projections) =
249+ // Create the initial mapping from physical file schema to projected schema.
250+ // This gives us the field reordering and tells us which logical schema fields
251+ // to select.
252+ let ( _schema_mapping, adapted_projections) =
248253 schema_adapter. map_schema ( & physical_file_schema) ?;
249254
255+ // Build the Vortex projection expression using the adapted projections.
256+ // This will reorder the fields to match the target order.
250257 let fields = adapted_projections
251258 . iter ( )
252259 . map ( |idx| {
@@ -256,6 +263,39 @@ impl FileOpener for VortexOpener {
256263 . collect :: < Vec < _ > > ( ) ;
257264 let projection_expr = select ( fields, root ( ) ) ;
258265
266+ // After Vortex applies the projection, the batch will have fields in the target
267+ // order (matching adapted_projections), but with the physical file types.
268+ // We need a second schema mapping for type casting only, not reordering.
269+ // Build a schema that represents what Vortex will return: fields in target order
270+ // with physical types.
271+ let projected_physical_fields: Vec < Field > = adapted_projections
272+ . iter ( )
273+ . map ( |& idx| {
274+ let logical_field = logical_schema. field ( idx) ;
275+ let field_name = logical_field. name ( ) ;
276+
277+ // Find this field in the physical schema to get its physical type
278+ physical_file_schema
279+ . field_with_name ( field_name)
280+ . map ( |phys_field| {
281+ Field :: new (
282+ field_name,
283+ merge_field_types ( phys_field, logical_field) ,
284+ phys_field. is_nullable ( ) ,
285+ )
286+ } )
287+ . unwrap_or_else ( |_| ( * logical_field) . clone ( ) )
288+ } )
289+ . collect ( ) ;
290+
291+ let projected_physical_schema =
292+ Arc :: new ( arrow_schema:: Schema :: new ( projected_physical_fields) ) ;
293+
294+ // Create a second mapping from the projected physical schema (what Vortex returns)
295+ // to the final projected schema. This mapping will handle type casting without reordering.
296+ let ( batch_schema_mapping, _) =
297+ schema_adapter. map_schema ( & projected_physical_schema) ?;
298+
259299 // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
260300 let layout_reader = match layout_reader. entry ( file_meta. object_meta . location . clone ( ) ) {
261301 Entry :: Occupied ( mut occupied_entry) => {
@@ -350,7 +390,7 @@ impl FileOpener for VortexOpener {
350390 ) ) ) )
351391 } )
352392 . try_flatten ( )
353- . map ( move |batch| batch. and_then ( |b| schema_mapping . map_batch ( b) ) )
393+ . map ( move |batch| batch. and_then ( |b| batch_schema_mapping . map_batch ( b) ) )
354394 . boxed ( ) ;
355395
356396 Ok ( stream)
0 commit comments