diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 92c3679b21..c5e6b96ef4 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -2791,6 +2791,42 @@ mod tests {
         assert_batches_sorted_eq!(&expected, &actual);
     }
 
+    #[tokio::test]
+    async fn test_select_from_column_mapping() -> DeltaResult<()> {
+        // With column mapping enabled, the scan should surface the logical
+        // column names rather than the physical names stored in the data files.
+        let table = crate::open_table(
+            "../test/tests/data/table_with_column_mapping"
+        ).await?;
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx
+            .sql("select * from test")
+            .await?;
+        let actual = df.collect().await?;
+        let expected = vec![
+            "+--------------------+--------------------+",
+            "| test.col_1[col_1a] | test.col_1[col_1b] |",
+            "+--------------------+--------------------+",
+            "| A                  |                    |",
+            "| B                  |                    |",
+            "| E                  | E2                 |",
+            "| F                  | F2                 |",
+            "| G                  | G2                 |",
+            "+--------------------+--------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_multiple_predicate_pushdown() {
         use crate::datafusion::prelude::SessionContext;
diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs
index e5167c5b09..956522948c 100644
--- a/crates/core/src/kernel/snapshot/log_data.rs
+++ b/crates/core/src/kernel/snapshot/log_data.rs
@@ -950,6 +950,21 @@ mod tests {
         // );
     }
 
+    #[tokio::test]
+    async fn test_open_column_mapping_table() -> crate::DeltaResult<()> {
+        // Regression test: collecting file statistics from the log must not
+        // fail for a table that has column mapping enabled.
+        let table = crate::open_table(
+            "../test/tests/data/table_with_column_mapping"
+        ).await?;
+        let _file_stats = table
+            .snapshot()
+            .unwrap()
+            .snapshot
+            .log_data();
+        Ok(())
+    }
+
     #[tokio::test]
     #[ignore = "re-enable once https://github.com/delta-io/delta-kernel-rs/issues/1075 is resolved."]
     async fn df_stats_delta_1_2_1_struct_stats_table() {
diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs
index fbb9189ad3..769244a663 100644
--- a/crates/core/src/kernel/snapshot/replay.rs
+++ b/crates/core/src/kernel/snapshot/replay.rs
@@ -8,6 +8,7 @@ use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::expressions::Scalar;
 use delta_kernel::schema::DataType;
 use delta_kernel::schema::PrimitiveType;
+use tracing::log::*;
 
 use super::parse::collect_map;
 use crate::kernel::arrow::extract::{self as ex};
@@ -19,6 +20,9 @@ pub(crate) fn parse_partitions(
     partition_schema: &StructType,
     raw_path: &str,
 ) -> DeltaResult<RecordBatch> {
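+    // The `log` macros check the level before evaluating their arguments, so
+    // this potentially large batch dump is only formatted when TRACE is enabled.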
+    trace!("parse_partitions: batch: {batch:?}\npartition_schema: {partition_schema:?}\npath: {raw_path}");
     let partitions = ex::extract_and_cast_opt::<MapArray>(batch, raw_path).ok_or(DeltaTableError::generic(
         "No partitionValues column found in files batch. This is unexpected.",
     ))?;
@@ -29,7 +33,7 @@ pub(crate) fn parse_partitions(
         .fields()
         .map(|f| {
             (
-                f.name().to_string(),
+                f.physical_name().to_string(),
                 Vec::<Scalar>::with_capacity(partitions.len()),
             )
         })
@@ -69,17 +73,17 @@ pub(crate) fn parse_partitions(
 
         partition_schema.fields().for_each(|f| {
             let value = data
-                .get(f.name())
+                .get(f.physical_name())
                 .cloned()
                 .unwrap_or(Scalar::Null(f.data_type().clone()));
-            values.get_mut(f.name()).unwrap().push(value);
+            values.get_mut(f.physical_name()).unwrap().push(value);
         });
     }
 
     let columns = partition_schema
         .fields()
         .map(|f| {
-            let values = values.get(f.name()).unwrap();
+            let values = values.get(f.physical_name()).unwrap();
             match f.data_type() {
                 DataType::Primitive(p) => {
                     // Safety: we created the Scalars above using the parsing function of the same PrimitiveType
@@ -201,3 +205,117 @@ pub(crate) fn parse_partitions(
         num_rows,
     )?)
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::MapBuilder;
+    use arrow::array::MapFieldNames;
+    use arrow::array::StringBuilder;
+    use arrow::datatypes::DataType as ArrowDataType;
+    use arrow::datatypes::Field as ArrowField;
+    use arrow::datatypes::Schema as ArrowSchema;
+    use arrow_schema::Field;
+    use delta_kernel::schema::{MapType, MetadataValue, SchemaRef, StructField};
+
+    #[test]
+    fn test_physical_partition_name_mapping() -> crate::DeltaResult<()> {
+        let physical_partition_name = "col-173b4db9-b5ad-427f-9e75-516aae37fbbb".to_string();
+        let schema: SchemaRef = delta_kernel::scan::scan_row_schema().project(&[
+            "path",
+            "size",
+            "fileConstantValues",
+        ])?;
+        let partition_schema = StructType::new(vec![StructField::nullable(
+            "Company Very Short",
+            DataType::STRING,
+        )
+        .with_metadata(vec![
+            (
+                "delta.columnMapping.id".to_string(),
+                MetadataValue::Number(1),
+            ),
+            (
+                "delta.columnMapping.physicalName".to_string(),
+                MetadataValue::String(physical_partition_name.clone()),
+            ),
+        ])]);
+
+        let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
+        let file_constant_values: SchemaRef = Arc::new(StructType::new([StructField::nullable(
+            "partitionValues",
+            partition_values,
+        )]));
+        // Inspecting the schema of file_constant_values:
+        let _: ArrowSchema = file_constant_values.as_ref().try_into_arrow()?;
+
+        // Constructing complex types in Arrow is hell.
+        // Absolute hell.
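+        // (A MapArray must be assembled by hand: separate builders for the keys
+        // and values, plus explicit field names for the entry struct.)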
+        //
+        // The partition column values that should be coming off the log are:
+        //   "col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"
+        let keys_builder = StringBuilder::new();
+        let values_builder = StringBuilder::new();
+        let map_fields = MapFieldNames {
+            entry: "key_value".into(),
+            key: "key".into(),
+            value: "value".into(),
+        };
+        let mut partitions = MapBuilder::new(Some(map_fields), keys_builder, values_builder);
+
+        // The schema names the partition logically; the map entry must use the physical name so the "rename" comes out.
+        partitions
+            .keys()
+            .append_value(physical_partition_name.clone());
+        partitions.values().append_value("BMS");
+        partitions.append(true).unwrap();
+        let partitions = partitions.finish();
+
+        let struct_fields = Fields::from(vec![
+            Field::new("key", ArrowDataType::Utf8, false),
+            Field::new("value", ArrowDataType::Utf8, true),
+        ]);
+        let map_field = Arc::new(ArrowField::new(
+            "key_value",
+            ArrowDataType::Struct(struct_fields),
+            false,
+        ));
+
+        let parts = StructArray::from(vec![(
+            Arc::new(ArrowField::new(
+                "partitionValues",
+                ArrowDataType::Map(map_field, false),
+                true,
+            )),
+            Arc::new(partitions) as ArrayRef,
+        )]);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(schema.as_ref().try_into_arrow()?),
+            vec![
+                // path
+                Arc::new(StringArray::from(vec!["foo.parquet".to_string()])),
+                // size
+                Arc::new(Int64Array::from(vec![1])),
+                // fileConstantValues
+                Arc::new(parts),
+            ],
+        )?;
+
+        let raw_path = "fileConstantValues.partitionValues";
+        let partitions = parse_partitions(&batch, &partition_schema, raw_path)?;
+        assert_eq!(
+            None,
+            partitions.column_by_name(&physical_partition_name),
+            "Should not have found the physical column name"
+        );
+        assert_ne!(
+            None,
+            partitions.column_by_name("Company Very Short"),
+            "Should have found the renamed column"
+        );
+        Ok(())
+    }
+}