
Commit 79b0763

Fix: Correctly handle inconsistent file and table schema order in vortex-datafusion (#5436)
I think the unit test and PR title are self-explanatory.

Signed-off-by: Frederic Branczyk <[email protected]>
1 parent 2013eff commit 79b0763
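
For orientation, here is a minimal standalone sketch of the mismatch this commit addresses, written against the arrow crate directly rather than vortex-datafusion (the column names and values are illustrative, not taken from any real file): a file stores its columns as (c, b, a) while the table schema declares (a, b, c), so a reader has to select the file's columns by name and emit them in table order.

// Standalone illustration (assumes the arrow crate; not vortex-datafusion code).
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // Physical file order: c, b, a.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("c", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("a", DataType::Int32, true),
    ]));
    let batch = RecordBatch::try_new(
        file_schema.clone(),
        vec![
            Arc::new(Int32Array::from(vec![300, 301, 302])) as ArrayRef,
            Arc::new(Int32Array::from(vec![200, 201, 202])) as ArrayRef,
            Arc::new(Int32Array::from(vec![100, 101, 102])) as ArrayRef,
        ],
    )?;

    // Table (logical) order: a, b, c. Select the file's columns by name so the
    // output matches the table order rather than the file order.
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int32, true),
        Field::new("c", DataType::Int32, true),
    ]));
    let columns: Vec<ArrayRef> = table_schema
        .fields()
        .iter()
        .map(|f| batch.column(file_schema.index_of(f.name()).unwrap()).clone())
        .collect();
    let reordered = RecordBatch::try_new(table_schema, columns)?;

    assert_eq!(reordered.schema().field(0).name(), "a");
    Ok(())
}

The commit below performs that reordering through Vortex's own projection expression and keeps DataFusion's schema mapper for type casting only, as the diff and its comments explain.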

File tree

1 file changed: +112 -4 lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 112 additions & 4 deletions
@@ -227,8 +227,10 @@ impl FileOpener for VortexOpener {
         // for schema evolution and divergence between the table's schema and individual files.
         filter = filter
             .map(|filter| {
-                let logical_file_schema =
-                    compute_logical_file_schema(&physical_file_schema, &logical_schema);
+                let logical_file_schema = compute_logical_file_schema(
+                    &physical_file_schema.clone(),
+                    &logical_schema,
+                );

                 let expr = expr_adapter_factory
                     .create(logical_file_schema, physical_file_schema.clone())
@@ -244,9 +246,14 @@ impl FileOpener for VortexOpener {
             predicate_file_schema = physical_file_schema.clone();
         }

-        let (schema_mapping, adapted_projections) =
+        // Create the initial mapping from physical file schema to projected schema.
+        // This gives us the field reordering and tells us which logical schema fields
+        // to select.
+        let (_schema_mapping, adapted_projections) =
             schema_adapter.map_schema(&physical_file_schema)?;

+        // Build the Vortex projection expression using the adapted projections.
+        // This will reorder the fields to match the target order.
         let fields = adapted_projections
             .iter()
             .map(|idx| {
@@ -256,6 +263,39 @@ impl FileOpener for VortexOpener {
             .collect::<Vec<_>>();
         let projection_expr = select(fields, root());

+        // After Vortex applies the projection, the batch will have fields in the target
+        // order (matching adapted_projections), but with the physical file types.
+        // We need a second schema mapping for type casting only, not reordering.
+        // Build a schema that represents what Vortex will return: fields in target order
+        // with physical types.
+        let projected_physical_fields: Vec<Field> = adapted_projections
+            .iter()
+            .map(|&idx| {
+                let logical_field = logical_schema.field(idx);
+                let field_name = logical_field.name();
+
+                // Find this field in the physical schema to get its physical type
+                physical_file_schema
+                    .field_with_name(field_name)
+                    .map(|phys_field| {
+                        Field::new(
+                            field_name,
+                            merge_field_types(phys_field, logical_field),
+                            phys_field.is_nullable(),
+                        )
+                    })
+                    .unwrap_or_else(|_| (*logical_field).clone())
+            })
+            .collect();
+
+        let projected_physical_schema =
+            Arc::new(arrow_schema::Schema::new(projected_physical_fields));
+
+        // Create a second mapping from the projected physical schema (what Vortex returns)
+        // to the final projected schema. This mapping will handle type casting without reordering.
+        let (batch_schema_mapping, _) =
+            schema_adapter.map_schema(&projected_physical_schema)?;
+
         // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
         let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
             Entry::Occupied(mut occupied_entry) => {
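
The comments in the hunk above describe a two-step approach: the Vortex projection puts the batch into the target column order, and the second schema mapping (batch_schema_mapping) then only casts physical types to the table's logical types. Below is a minimal standalone sketch of that cast-only step, again written against the arrow crate (the helper cast_to_logical and the Int32-to-Int64 example are hypothetical, not part of this commit):

// Standalone sketch (assumes the arrow crate; cast_to_logical is a hypothetical
// helper, not the vortex-datafusion implementation). The batch is assumed to be
// in the target column order already, so only per-column casting remains.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn cast_to_logical(batch: &RecordBatch, logical: &SchemaRef) -> Result<RecordBatch, ArrowError> {
    let columns = batch
        .columns()
        .iter()
        .zip(logical.fields().iter())
        .map(|(col, field)| {
            if col.data_type() == field.data_type() {
                // Physical and logical types already match: pass the column through.
                Ok(col.clone())
            } else {
                // Cast the physical column to the table's logical type.
                cast(col, field.data_type())
            }
        })
        .collect::<Result<Vec<ArrayRef>, ArrowError>>()?;
    RecordBatch::try_new(logical.clone(), columns)
}

fn main() -> Result<(), ArrowError> {
    // Physical file type Int32, logical table type Int64 for the same column "a".
    let physical = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let logical = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let batch = RecordBatch::try_new(
        physical,
        vec![Arc::new(Int32Array::from(vec![100, 101, 102])) as ArrayRef],
    )?;

    let casted = cast_to_logical(&batch, &logical)?;
    assert_eq!(casted.schema().field(0).data_type(), &DataType::Int64);
    Ok(())
}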
@@ -350,7 +390,7 @@ impl FileOpener for VortexOpener {
                 ))))
             })
             .try_flatten()
-            .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
+            .map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
             .boxed();

         Ok(stream)
@@ -658,6 +698,74 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    // This test verifies that files with different column order than the
+    // table schema can be opened without errors. The fix ensures that the
+    // schema mapper is only used for type casting, not for reordering,
+    // since the vortex projection already handles reordering.
+    async fn test_schema_different_column_order() -> anyhow::Result<()> {
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        // File has columns in order: c, b, a
+        let batch = record_batch!(
+            ("c", Int32, vec![Some(300), Some(301), Some(302)]),
+            ("b", Int32, vec![Some(200), Some(201), Some(202)]),
+            ("a", Int32, vec![Some(100), Some(101), Some(102)])
+        )
+        .unwrap();
+        let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?;
+        let file = PartitionedFile::new(file_path.to_string(), data_size);
+
+        // Table schema has columns in different order: a, b, c
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, true),
+            Field::new("c", DataType::Int32, true),
+        ]));
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some([0, 1, 2].into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        // The opener should successfully open the file and reorder columns
+        let stream = opener.open(make_meta(file_path, data_size), file)?.await?;
+
+        let format_opts = FormatOptions::new().with_types_info(true);
+        let data = stream.try_collect::<Vec<_>>().await?;
+
+        // Verify the output has columns in table schema order (a, b, c)
+        // not file order (c, b, a)
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+-------+-------+
+        | a     | b     | c     |
+        | Int32 | Int32 | Int32 |
+        +-------+-------+-------+
+        | 100   | 200   | 300   |
+        | 101   | 201   | 301   |
+        | 102   | 202   | 302   |
+        +-------+-------+-------+
+        ");
+
+        Ok(())
+    }
+
     #[tokio::test]
     // This test verifies that expression rewriting doesn't fail when there is
     // a nested schema mismatch between the physical file schema and logical
