Skip to content

Commit 2e3c7db

Browse files
committed
vortex-datafusion: Fix schema projection order
Signed-off-by: Frederic Branczyk <[email protected]>
1 parent 90b5691 commit 2e3c7db

File tree

1 file changed

+15
-44
lines changed

1 file changed

+15
-44
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 15 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -260,56 +260,27 @@ impl FileOpener for VortexOpener {
260260
predicate_file_schema = physical_file_schema.clone();
261261
}
262262

263-
// Create the initial mapping from physical file schema to projected schema.
264-
// This gives us the field reordering and tells us which logical schema fields
265-
// to select.
266-
let (_schema_mapping, adapted_projections) =
267-
schema_adapter.map_schema(&physical_file_schema)?;
268-
269-
// Build the Vortex projection expression using the adapted projections.
270-
// This will reorder the fields to match the target order.
263+
// Compute the logical file schema by merging physical file types with logical table types.
264+
// This schema has the same field names as logical_schema, but with physical types from the file.
265+
let logical_file_schema =
266+
compute_logical_file_schema(&physical_file_schema, &logical_schema);
267+
268+
// Use the pre-created schema adapter to map logical_file_schema to projected_schema.
269+
// Since logical_file_schema has the same field names as logical_schema (which the adapter
270+
// was created with), this works correctly and gives us the projection indices.
271+
let (schema_mapping, adapted_projections) =
272+
schema_adapter.map_schema(&logical_file_schema)?;
273+
274+
// Build the Vortex projection expression using field names from logical_file_schema
271275
let fields = adapted_projections
272276
.iter()
273-
.map(|idx| {
274-
let field = logical_schema.field(*idx);
277+
.map(|&idx| {
278+
let field = logical_file_schema.field(idx);
275279
FieldName::from(field.name().as_str())
276280
})
277281
.collect::<Vec<_>>();
278282
let projection_expr = select(fields, root());
279283

280-
// After Vortex applies the projection, the batch will have fields in the target
281-
// order (matching adapted_projections), but with the physical file types.
282-
// We need a second schema mapping for type casting only, not reordering.
283-
// Build a schema that represents what Vortex will return: fields in target order
284-
// with physical types.
285-
let projected_physical_fields: Vec<Field> = adapted_projections
286-
.iter()
287-
.map(|&idx| {
288-
let logical_field = logical_schema.field(idx);
289-
let field_name = logical_field.name();
290-
291-
// Find this field in the physical schema to get its physical type
292-
physical_file_schema
293-
.field_with_name(field_name)
294-
.map(|phys_field| {
295-
Field::new(
296-
field_name,
297-
merge_field_types(phys_field, logical_field),
298-
phys_field.is_nullable(),
299-
)
300-
})
301-
.unwrap_or_else(|_| (*logical_field).clone())
302-
})
303-
.collect();
304-
305-
let projected_physical_schema =
306-
Arc::new(arrow_schema::Schema::new(projected_physical_fields));
307-
308-
// Create a second mapping from the projected physical schema (what Vortex returns)
309-
// to the final projected schema. This mapping will handle type casting without reordering.
310-
let (batch_schema_mapping, _) =
311-
schema_adapter.map_schema(&projected_physical_schema)?;
312-
313284
// We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
314285
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
315286
Entry::Occupied(mut occupied_entry) => {
@@ -404,7 +375,7 @@ impl FileOpener for VortexOpener {
404375
))))
405376
})
406377
.try_flatten()
407-
.map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
378+
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
408379
.boxed();
409380

410381
Ok(stream)

0 commit comments

Comments
 (0)