34 changes: 34 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
@@ -2791,6 +2791,40 @@ mod tests {
assert_batches_sorted_eq!(&expected, &actual);
}

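    // Verifies that a table using column mapping can be queried through
    // DataFusion and that `select *` returns the expected (logical) column
    // names and values.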
#[tokio::test]
async fn test_select_from_column_mapping() -> DeltaResult<()> {
        let table = crate::open_table("../test/tests/data/table_with_column_mapping").await?;
let config = DeltaScanConfigBuilder::new()
.build(table.snapshot().unwrap())
.unwrap();
let log = table.log_store();

let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
let ctx: SessionContext = DeltaSessionContext::default().into();
ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select * from test").await?;
let actual = df.collect().await?;
let expected = vec![
"+--------------------+--------------------+",
"| test.col_1[col_1a] | test.col_1[col_1b] |",
"+--------------------+--------------------+",
"| A | |",
"| B | |",
"| E | E2 |",
"| F | F2 |",
"| G | G2 |",
"+--------------------+--------------------+",
];
assert_batches_sorted_eq!(&expected, &actual);
Ok(())
}

#[tokio::test]
async fn test_multiple_predicate_pushdown() {
use crate::datafusion::prelude::SessionContext;
13 changes: 13 additions & 0 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -950,6 +950,19 @@ mod tests {
// );
}

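    // Smoke test: opening a column-mapped table and building its log data view
    // should succeed; the contents are not asserted on here.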
#[tokio::test]
async fn test_open_column_mapping_table() -> crate::DeltaResult<()> {
        let table = crate::open_table("../test/tests/data/table_with_column_mapping").await?;
        let _file_stats = table
.snapshot()
.unwrap()
.snapshot
.log_data();
Ok(())
}

#[tokio::test]
#[ignore = "re-enable once https://github.com/delta-io/delta-kernel-rs/issues/1075 is resolved."]
async fn df_stats_delta_1_2_1_struct_stats_table() {
122 changes: 118 additions & 4 deletions crates/core/src/kernel/snapshot/replay.rs
@@ -8,6 +8,7 @@ use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::Scalar;
use delta_kernel::schema::DataType;
use delta_kernel::schema::PrimitiveType;
use tracing::log::*;

use super::parse::collect_map;
use crate::kernel::arrow::extract::{self as ex};
@@ -19,6 +20,7 @@ pub(crate) fn parse_partitions(
partition_schema: &StructType,
raw_path: &str,
) -> DeltaResult<StructArray> {
trace!("parse_partitions: batch: {batch:?}\npartition_schema: {partition_schema:?}\npath: {raw_path}");
let partitions =
ex::extract_and_cast_opt::<MapArray>(batch, raw_path).ok_or(DeltaTableError::generic(
"No partitionValues column found in files batch. This is unexpected.",
@@ -29,7 +31,7 @@
.fields()
.map(|f| {
(
-                f.name().to_string(),
+                f.physical_name().to_string(),
Vec::<Scalar>::with_capacity(partitions.len()),
)
})
@@ -69,17 +71,17 @@

partition_schema.fields().for_each(|f| {
let value = data
-            .get(f.name())
+            .get(f.physical_name())
.cloned()
.unwrap_or(Scalar::Null(f.data_type().clone()));
-        values.get_mut(f.name()).unwrap().push(value);
+        values.get_mut(f.physical_name()).unwrap().push(value);
});
}

let columns = partition_schema
.fields()
.map(|f| {
-            let values = values.get(f.name()).unwrap();
+            let values = values.get(f.physical_name()).unwrap();
match f.data_type() {
DataType::Primitive(p) => {
// Safety: we created the Scalars above using the parsing function of the same PrimitiveType
@@ -201,3 +203,115 @@ pub(crate) fn parse_partitions(
num_rows,
)?)
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::MapBuilder;
use arrow::array::MapFieldNames;
use arrow::array::StringBuilder;
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::Field as ArrowField;
use arrow::datatypes::Schema as ArrowSchema;
    use arrow_schema::{Field, Fields};
use delta_kernel::schema::{MapType, MetadataValue, SchemaRef, StructField};

#[test]
fn test_physical_partition_name_mapping() -> crate::DeltaResult<()> {
let physical_partition_name = "col-173b4db9-b5ad-427f-9e75-516aae37fbbb".to_string();
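        // `scan_row_schema()` is the schema of rows produced by kernel log
        // replay; only the columns this test needs are projected out of it.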
let schema: SchemaRef = delta_kernel::scan::scan_row_schema().project(&[
"path",
"size",
"fileConstantValues",
])?;
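        // Mirrors what a column-mapped Delta schema stores in the log: the
        // logical name queried by users plus protocol metadata pointing at the
        // physical name used in data files and in partitionValues.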
let partition_schema = StructType::new(vec![StructField::nullable(
"Company Very Short",
DataType::STRING,
)
.with_metadata(vec![
(
"delta.columnMapping.id".to_string(),
MetadataValue::Number(1),
),
(
"delta.columnMapping.physicalName".to_string(),
MetadataValue::String(physical_partition_name.clone()),
),
])]);

let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
let file_constant_values: SchemaRef = Arc::new(StructType::new([StructField::nullable(
"partitionValues",
partition_values,
)]));
        // Sanity-check that file_constant_values converts cleanly to an Arrow schema:
let _: ArrowSchema = file_constant_values.as_ref().try_into_arrow()?;

// Constructing complex types in Arrow is hell.
// Absolute hell.
//
// The partition column values that should be coming off the log are:
// "col-173b4db9-b5ad-427f-9e75-516aae37fbbb":"BMS"
let keys_builder = StringBuilder::new();
let values_builder = StringBuilder::new();
let map_fields = MapFieldNames {
entry: "key_value".into(),
key: "key".into(),
value: "value".into(),
};
let mut partitions = MapBuilder::new(Some(map_fields), keys_builder, values_builder);

        // The schema refers to the partition by its logical name, but the log
        // keys partitionValues by the physical name, so append under the
        // physical name here.
partitions
.keys()
.append_value(physical_partition_name.clone());
partitions.values().append_value("BMS");
partitions.append(true).unwrap();
let partitions = partitions.finish();

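        // The Arrow map layout built here ("key_value" entries, non-nullable
        // keys, nullable values) must line up with the MapFieldNames given to
        // the builder above.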
let struct_fields = Fields::from(vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
]);
let map_field = Arc::new(ArrowField::new(
"key_value",
ArrowDataType::Struct(struct_fields),
false,
));

let parts = StructArray::from(vec![(
Arc::new(ArrowField::new(
"partitionValues",
ArrowDataType::Map(map_field, false),
true,
)),
Arc::new(partitions) as ArrayRef,
)]);

let batch = RecordBatch::try_new(
Arc::new(schema.as_ref().try_into_arrow()?),
vec![
// path
Arc::new(StringArray::from(vec!["foo.parquet".to_string()])),
// size
Arc::new(Int64Array::from(vec![1])),
// fileConstantValues
Arc::new(parts),
],
)?;

let raw_path = "fileConstantValues.partitionValues";
let partitions = parse_partitions(&batch, &partition_schema, raw_path)?;
assert_eq!(
None,
partitions.column_by_name(&physical_partition_name),
"Should not have found the physical column name"
);
assert_ne!(
None,
partitions.column_by_name("Company Very Short"),
"Should have found the renamed column"
);
Ok(())
}
}