Skip to content

Commit 16d179b

Browse files
authored
Fix: Schema projection order mismatch from table/file schema (#5541)
Looks like I only solved a subset of the issue with #5436. The test I added failed before the fix. --------- Signed-off-by: Frederic Branczyk <[email protected]>
1 parent fe34efa commit 16d179b

File tree

1 file changed

+93
-50
lines changed

1 file changed

+93
-50
lines changed

vortex-datafusion/src/persistent/opener.rs

Lines changed: 93 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,11 @@ impl FileOpener for VortexOpener {
230230
DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
231231
})?);
232232

233+
// Compute the logical file schema by merging physical file types with logical table types.
234+
// This schema has the same field names as logical_schema, but with physical types from the file.
235+
let logical_file_schema =
236+
compute_logical_file_schema(&physical_file_schema, &logical_schema);
237+
233238
if let Some(expr_adapter_factory) = expr_adapter_factory {
234239
let partition_values = partition_fields
235240
.iter()
@@ -241,13 +246,8 @@ impl FileOpener for VortexOpener {
241246
// for schema evolution and divergence between the table's schema and individual files.
242247
filter = filter
243248
.map(|filter| {
244-
let logical_file_schema = compute_logical_file_schema(
245-
&physical_file_schema.clone(),
246-
&logical_schema,
247-
);
248-
249249
let expr = expr_adapter_factory
250-
.create(logical_file_schema, physical_file_schema.clone())
250+
.create(logical_file_schema.clone(), physical_file_schema.clone())
251251
.with_partition_values(partition_values)
252252
.rewrite(filter)?;
253253

@@ -257,59 +257,25 @@ impl FileOpener for VortexOpener {
257257
})
258258
.transpose()?;
259259

260-
predicate_file_schema = physical_file_schema.clone();
260+
predicate_file_schema = physical_file_schema;
261261
}
262262

263-
// Create the initial mapping from physical file schema to projected schema.
264-
// This gives us the field reordering and tells us which logical schema fields
265-
// to select.
266-
let (_schema_mapping, adapted_projections) =
267-
schema_adapter.map_schema(&physical_file_schema)?;
263+
// Use the pre-created schema adapter to map logical_file_schema to projected_schema.
264+
// Since logical_file_schema has the same field names as logical_schema (which the adapter
265+
// was created with), this works correctly and gives us the projection indices.
266+
let (schema_mapping, adapted_projections) =
267+
schema_adapter.map_schema(&logical_file_schema)?;
268268

269-
// Build the Vortex projection expression using the adapted projections.
270-
// This will reorder the fields to match the target order.
269+
// Build the Vortex projection expression using field names from logical_file_schema
271270
let fields = adapted_projections
272271
.iter()
273-
.map(|idx| {
274-
let field = logical_schema.field(*idx);
272+
.map(|&idx| {
273+
let field = logical_file_schema.field(idx);
275274
FieldName::from(field.name().as_str())
276275
})
277276
.collect::<Vec<_>>();
278277
let projection_expr = select(fields, root());
279278

280-
// After Vortex applies the projection, the batch will have fields in the target
281-
// order (matching adapted_projections), but with the physical file types.
282-
// We need a second schema mapping for type casting only, not reordering.
283-
// Build a schema that represents what Vortex will return: fields in target order
284-
// with physical types.
285-
let projected_physical_fields: Vec<Field> = adapted_projections
286-
.iter()
287-
.map(|&idx| {
288-
let logical_field = logical_schema.field(idx);
289-
let field_name = logical_field.name();
290-
291-
// Find this field in the physical schema to get its physical type
292-
physical_file_schema
293-
.field_with_name(field_name)
294-
.map(|phys_field| {
295-
Field::new(
296-
field_name,
297-
merge_field_types(phys_field, logical_field),
298-
phys_field.is_nullable(),
299-
)
300-
})
301-
.unwrap_or_else(|_| (*logical_field).clone())
302-
})
303-
.collect();
304-
305-
let projected_physical_schema =
306-
Arc::new(arrow_schema::Schema::new(projected_physical_fields));
307-
308-
// Create a second mapping from the projected physical schema (what Vortex returns)
309-
// to the final projected schema. This mapping will handle type casting without reordering.
310-
let (batch_schema_mapping, _) =
311-
schema_adapter.map_schema(&projected_physical_schema)?;
312-
313279
// We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
314280
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
315281
Entry::Occupied(mut occupied_entry) => {
@@ -404,7 +370,7 @@ impl FileOpener for VortexOpener {
404370
))))
405371
})
406372
.try_flatten()
407-
.map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
373+
.map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
408374
.boxed();
409375

410376
Ok(stream)
@@ -870,4 +836,81 @@ mod tests {
870836

871837
Ok(())
872838
}
839+
840+
#[tokio::test]
841+
// Minimal reproducing test for the schema projection bug.
842+
// Before the fix, this would fail with a cast error when the file schema
843+
// and table schema have different field orders and we project a subset of columns.
844+
async fn test_projection_bug_minimal_repro() -> anyhow::Result<()> {
845+
let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
846+
let file_path = "/path/file.vortex";
847+
848+
// File has columns in order: a, b, c with simple types
849+
let batch = record_batch!(
850+
("a", Int32, vec![Some(1)]),
851+
("b", Utf8, vec![Some("test")]),
852+
("c", Int32, vec![Some(2)])
853+
)
854+
.unwrap();
855+
let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?;
856+
857+
// Table schema has columns in DIFFERENT order: c, a, b
858+
// and different types that require casting (Utf8 -> Dictionary)
859+
let table_schema = Arc::new(Schema::new(vec![
860+
Field::new("c", DataType::Int32, true),
861+
Field::new("a", DataType::Int32, true),
862+
Field::new(
863+
"b",
864+
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
865+
true,
866+
),
867+
]));
868+
869+
// Project columns [0, 2] from table schema, which should give us: c, b
870+
// Before the fix, the schema adapter would get confused about which fields
871+
// to select from the file, causing incorrect type mappings.
872+
let projection = vec![0, 2];
873+
874+
let opener = VortexOpener {
875+
session: SESSION.clone(),
876+
object_store: object_store.clone(),
877+
projection: Some(projection.into()),
878+
filter: None,
879+
file_pruning_predicate: None,
880+
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
881+
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
882+
partition_fields: vec![],
883+
file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
884+
logical_schema: table_schema.clone(),
885+
batch_size: 100,
886+
limit: None,
887+
metrics: Default::default(),
888+
layout_readers: Default::default(),
889+
has_output_ordering: false,
890+
};
891+
892+
// This should succeed and return the correctly projected and cast data
893+
let data = opener
894+
.open(
895+
make_meta(file_path, data_size),
896+
PartitionedFile::new(file_path.to_string(), data_size),
897+
)?
898+
.await?
899+
.try_collect::<Vec<_>>()
900+
.await?;
901+
902+
// Verify the columns are in the right order and have the right values
903+
use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
904+
let format_opts = FormatOptions::new().with_types_info(true);
905+
assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
906+
+-------+--------------------------+
907+
| c     | b                        |
908+
| Int32 | Dictionary(UInt32, Utf8) |
909+
+-------+--------------------------+
910+
| 2     | test                     |
911+
+-------+--------------------------+
912+
");
913+
914+
Ok(())
915+
}
873916
}

0 commit comments

Comments
 (0)