Skip to content

Commit 3ad77de

Browse files
committed
vortex-datafusion: Fix schema projection order
Signed-off-by: Frederic Branczyk <[email protected]>
1 parent 90b5691 commit 3ad77de

File tree

1 file changed

+16
-50
lines changed

1 file changed

+16
-50
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ impl FileOpener for VortexOpener {
230230
DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
231231
})?);
232232

233+
// Compute the logical file schema by merging physical file types with logical table types.
234+
// This schema has the same field names as logical_schema, but with physical types from the file.
235+
let logical_file_schema =
236+
compute_logical_file_schema(&physical_file_schema, &logical_schema);
237+
233238
if let Some(expr_adapter_factory) = expr_adapter_factory {
234239
let partition_values = partition_fields
235240
.iter()
@@ -241,13 +246,8 @@ impl FileOpener for VortexOpener {
241246
// for schema evolution and divergence between the table's schema and individual files.
242247
filter = filter
243248
.map(|filter| {
244-
let logical_file_schema = compute_logical_file_schema(
245-
&physical_file_schema.clone(),
246-
&logical_schema,
247-
);
248-
249249
let expr = expr_adapter_factory
250-
.create(logical_file_schema, physical_file_schema.clone())
250+
.create(logical_file_schema.clone(), physical_file_schema.clone())
251251
.with_partition_values(partition_values)
252252
.rewrite(filter)?;
253253

@@ -257,59 +257,25 @@ impl FileOpener for VortexOpener {
257257
})
258258
.transpose()?;
259259

260-
predicate_file_schema = physical_file_schema.clone();
260+
predicate_file_schema = physical_file_schema;
261261
}
262262

263-
// Create the initial mapping from physical file schema to projected schema.
264-
// This gives us the field reordering and tells us which logical schema fields
265-
// to select.
266-
let (_schema_mapping, adapted_projections) =
267-
schema_adapter.map_schema(&physical_file_schema)?;
263+
// Use the pre-created schema adapter to map logical_file_schema to projected_schema.
264+
// Since logical_file_schema has the same field names as logical_schema (which the adapter
265+
// was created with), this works correctly and gives us the projection indices.
266+
let (schema_mapping, adapted_projections) =
267+
schema_adapter.map_schema(&logical_file_schema)?;
268268

269-
// Build the Vortex projection expression using the adapted projections.
270-
// This will reorder the fields to match the target order.
269+
// Build the Vortex projection expression using field names from logical_file_schema
271270
let fields = adapted_projections
272271
.iter()
273-
.map(|idx| {
274-
let field = logical_schema.field(*idx);
272+
.map(|&idx| {
273+
let field = logical_file_schema.field(idx);
275274
FieldName::from(field.name().as_str())
276275
})
277276
.collect::<Vec<_>>();
278277
let projection_expr = select(fields, root());
279278

280-
// After Vortex applies the projection, the batch will have fields in the target
281-
// order (matching adapted_projections), but with the physical file types.
282-
// We need a second schema mapping for type casting only, not reordering.
283-
// Build a schema that represents what Vortex will return: fields in target order
284-
// with physical types.
285-
let projected_physical_fields: Vec<Field> = adapted_projections
286-
.iter()
287-
.map(|&idx| {
288-
let logical_field = logical_schema.field(idx);
289-
let field_name = logical_field.name();
290-
291-
// Find this field in the physical schema to get its physical type
292-
physical_file_schema
293-
.field_with_name(field_name)
294-
.map(|phys_field| {
295-
Field::new(
296-
field_name,
297-
merge_field_types(phys_field, logical_field),
298-
phys_field.is_nullable(),
299-
)
300-
})
301-
.unwrap_or_else(|_| (*logical_field).clone())
302-
})
303-
.collect();
304-
305-
let projected_physical_schema =
306-
Arc::new(arrow_schema::Schema::new(projected_physical_fields));
307-
308-
// Create a second mapping from the projected physical schema (what Vortex returns)
309-
// to the final projected schema. This mapping will handle type casting without reordering.
310-
let (batch_schema_mapping, _) =
311-
schema_adapter.map_schema(&projected_physical_schema)?;
312-
313279
// We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
314280
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
315281
Entry::Occupied(mut occupied_entry) => {
@@ -404,7 +370,7 @@ impl FileOpener for VortexOpener {
404370
))))
405371
})
406372
.try_flatten()
407-
.map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
373+
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
408374
.boxed();
409375

410376
Ok(stream)

0 commit comments

Comments
 (0)