diff --git a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs index 03145cd65..092c61955 100644 --- a/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs +++ b/crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs @@ -15,7 +15,7 @@ //! predicates, while execution plans handle the actual data reading and transformation. use std::{collections::VecDeque, pin::Pin, sync::Arc}; - +use std::collections::HashSet; use arrow::array::AsArray; use arrow_array::{ArrayRef, RecordBatch, StructArray}; use arrow_cast::{CastOptions, cast_with_options}; @@ -49,6 +49,7 @@ use datafusion_datasource::file::FileSource; use delta_kernel::{ Engine, Expression, expressions::StructData, scan::ScanMetadata, table_features::TableFeature, }; +use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; use futures::{Stream, TryStreamExt as _, future::ready}; use itertools::Itertools as _; use object_store::{ObjectMeta, path::Path}; @@ -238,6 +239,17 @@ async fn get_data_scan_plan( // let pq_plan = if false { let pq_plan = if let Some(result_projection_deep) = scan_plan.result_projection_deep.clone() && has_deep_projection(&result_projection_deep) { + + // Build the true physical schema from the kernel table config (no virtual overlay fields). + // scan_plan.parquet_read_schema may include virtual sub-fields injected via a custom + // DeltaScanConfig::with_schema, so we derive physical field names from table_config instead. + let table_config_schema: SchemaRef = Arc::new(table_config.schema().as_ref().try_into_arrow()?); + let physical_projection_deep = filter_projection_deep_for_physical_schema( + &result_projection_deep, + &scan_plan.result_schema, + &table_config_schema, + ); + get_read_plan_deep( session, files_by_store, @@ -245,7 +257,7 @@ async fn get_data_scan_plan( limit, &file_id_field, predicate, - result_projection_deep.clone() + physical_projection_deep, ) .await? } else { @@ -501,6 +513,64 @@ async fn get_read_plan_deep( }) } +/// Translates a logical `projection_deep` map — produced by DataFusion against +/// the overlay/virtual schema — into one that is safe to hand to the Parquet reader, +/// which only knows about the delta table schema. +/// +/// For each entry: +/// - If the top-level field is absent from `delta_table_schema` (entirely virtual), +/// the entry is dropped. it will be computed from other physical columns. +/// - If the field is a Struct, virtual sub-field names are filtered out. If all +/// requested sub-fields are virtual, the first physical sub-field is injected as a +/// row-count anchor so the reader produces the correct number of rows for null +/// materialization of virtual sub-fields. +/// - Non-struct physical fields are carried through unchanged. +/// +/// Keys in the returned map are indices into `delta_table_schema` (not the result schema). +fn filter_projection_deep_for_physical_schema( + projection_deep: &std::collections::HashMap>, + result_schema: &SchemaRef, + table_config_schema: &SchemaRef, +) -> std::collections::HashMap> { + use std::collections::{HashMap, HashSet}; + + let mut physical = HashMap::with_capacity(projection_deep.len()); + + for (logical_idx, requested) in projection_deep { + // Resolve logical → physical index + let Some(field) = result_schema.fields().get(*logical_idx) else { continue }; + let Ok(physical_idx) = table_config_schema.index_of(field.name()) else { continue }; + + let filtered = match table_config_schema.field(physical_idx).data_type() { + DataType::Struct(physical_fields) => { + let physical_names: HashSet<&str> = + physical_fields.iter().map(|f| f.name().as_str()).collect(); + + let kept: Vec<_> = requested + .iter() + .filter(|n| physical_names.contains(n.as_str())) + .cloned() + .collect(); + + if kept.is_empty() { + // All sub-fields are virtual — anchor on the first physical one + match physical_fields.first() { + Some(anchor) => vec![anchor.name().clone()], + None => continue, + } + } else { + kept + } + } + _ => requested.clone(), + }; + + physical.insert(physical_idx, filtered); + } + + physical +} + // Small helper to reuse some code between exec and exec_meta fn finalize_transformed_batch( batch: RecordBatch, diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index d9dff72c9..3e4cd3298 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -2196,6 +2196,7 @@ mod deep { use std::sync::Arc; use arrow_cast::display::FormatOptions; use arrow_cast::pretty; + use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::datasource::physical_plan::ParquetSource; use datafusion::optimizer::optimize_projections_deep::DeepColumnIndexMap; @@ -2208,7 +2209,8 @@ mod deep { use datafusion_proto::protobuf::PhysicalPlanNode; use prost::Message; use tracing::info; - use deltalake_core::delta_datafusion::{DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanExec}; + use deltalake_core::DeltaTableBuilder; + use deltalake_core::delta_datafusion::{DataFusionMixins, DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanConfig, DeltaScanExec, DeltaScanNext}; use deltalake_core::delta_datafusion::table_provider_old::DeltaTableOldProvider; use deltalake_core::delta_datafusion::udtf::register_delta_table_udtf; @@ -2485,5 +2487,180 @@ mod deep { } } + fn add_virtual_field_to_schema( + base: &Schema, + field_name: &str, + field_to_add: Field, + ) -> SchemaRef { + let new_fields: Vec> = base.fields().iter().map(|f| { + if f.name() == field_name { + if let DataType::Struct(inner) = f.data_type() { + let mut new_inner: Vec> = inner.iter().cloned().collect(); + new_inner.push(Arc::new(field_to_add.clone())); + return Arc::new(Field::new( + f.name(), + DataType::Struct(Fields::from(new_inner)), + f.is_nullable(), + )); + } + } + f.clone() + }).collect(); + Arc::new(Schema::new(new_fields)) + } + + /// Bug regression: when a custom logical schema adds a virtual sub-field alongside + /// physical sub-fields in a struct, deep_projection must not pass the virtual + /// field name to the Parquet reader (which falls back to reading the full struct). + #[tokio::test] + async fn test_deep_projection_with_virtual_sibling_field() -> datafusion::common::Result<()> { + unsafe { std::env::set_var("DELTA_USE_EXPR_ADAPTER", "1"); } + + let config = SessionConfig::new() + .set_bool("datafusion.sql_parser.enable_ident_normalization", false); + let ctx = SessionContext::new_with_config(config); + + let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR")); + let url = url::Url::from_directory_path( + std::path::Path::new(&delta_path).canonicalize()?, + ) + .unwrap(); + + let table = DeltaTableBuilder::from_url(url)? + .load() + .await?; + + let base_schema = table.snapshot()?.snapshot().read_schema(); + let custom_schema = add_virtual_field_to_schema( + &base_schema, + "_acp_system_metadata", + Field::new("virtual_field", DataType::Utf8, true), + ); + + let scan_config = DeltaScanConfig::new().with_schema(custom_schema); + let scan = DeltaScanNext::new( + table.snapshot()?.snapshot().clone(), + scan_config, + ) + .expect("failed to build DeltaScanNext"); + ctx.register_table("t", Arc::new(scan))?; + + // Select one physical sub-field and one virtual sub-field from the same struct. + let query = r#" + SELECT + t._acp_system_metadata.acp_sourceBatchId AS batch_id, + t._acp_system_metadata.virtual_field AS vfield + FROM t + "#; + + let plan = ctx.state().create_logical_plan(query).await?; + let optimized = ctx.state().optimize(&plan)?; + let physical_plan = ctx.state().query_planner() + .create_physical_plan(&optimized, &ctx.state()) + .await?; + + // Correct data: physical field has a value, virtual field is null. + let batches = collect(physical_plan.clone(), ctx.state().task_ctx()).await?; + let results = pretty::pretty_format_batches_with_options( + &batches, + &FormatOptions::default(), + )? + .to_string(); + println!("{}", results); + assert!(results.contains("b1"), "acp_sourceBatchId should be 'b1'"); + assert!(results.contains('|'), "expected tabular output"); + + // The physical Parquet plan must NOT include "virtual_field" in projection_deep. + let proj = extract_projection_deep_from_plan(physical_plan); + assert!(!proj.is_empty(), "expected at least one deep projection in plan"); + let deep_map = proj[0].as_ref().expect("deep projection must be set"); + assert!( + deep_map.values().all(|fields| !fields.contains(&"virtual_field".to_string())), + "virtual_field must be stripped from physical projection_deep, got: {:?}", + deep_map, + ); + assert!( + deep_map.values().any(|fields| fields.contains(&"acp_sourceBatchId".to_string())), + "acp_sourceBatchId must remain in physical projection_deep, got: {:?}", + deep_map, + ); + + Ok(()) + } + + /// Bug regression: when ALL requested sub-fields of a struct are virtual, + /// the filter must inject the first physical sub-field as a row-count anchor + /// so the reader produces the right number of null rows. + #[tokio::test] + async fn test_deep_projection_only_virtual_sub_fields() -> datafusion::common::Result<()> { + unsafe { std::env::set_var("DELTA_USE_EXPR_ADAPTER", "1"); } + + let config = SessionConfig::new() + .set_bool("datafusion.sql_parser.enable_ident_normalization", false); + let ctx = SessionContext::new_with_config(config); + + let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR")); + let url = url::Url::from_directory_path( + std::path::Path::new(&delta_path).canonicalize()?, + ) + .unwrap(); + + let table = DeltaTableBuilder::from_url(url)? + .load() + .await?; + + let base_schema = table.snapshot()?.snapshot().read_schema(); + let custom_schema = add_virtual_field_to_schema( + &base_schema, + "_acp_system_metadata", + Field::new("virtual_field", DataType::Utf8, true), + ); + + let scan_config = DeltaScanConfig::new().with_schema(custom_schema); + let scan = DeltaScanNext::new( + table.snapshot()?.snapshot().clone(), + scan_config, + ) + .expect("failed to build DeltaScanNext"); + ctx.register_table("t", Arc::new(scan))?; + + // Select ONLY a virtual sub-field — no physical sub-field requested. + let query = r#" + SELECT t._acp_system_metadata.virtual_field AS vfield + FROM t + "#; + + let plan = ctx.state().create_logical_plan(query).await?; + let optimized = ctx.state().optimize(&plan)?; + let physical_plan = ctx.state().query_planner() + .create_physical_plan(&optimized, &ctx.state()) + .await?; + + // virtual_field must be all-null with the correct row count (1 row in this table). + let batches = collect(physical_plan.clone(), ctx.state().task_ctx()).await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 1, "expected 1 row matching physical table row count"); + let col = batches[0].column_by_name("vfield").expect("vfield column must exist"); + assert_eq!(col.null_count(), col.len(), "virtual_field column must be all-null"); + + // The physical plan must have a non-empty projection_deep for the struct + // (the row-count anchor), so we did not fall back to reading the full struct. + let proj = extract_projection_deep_from_plan(physical_plan); + assert!(!proj.is_empty(), "expected deep projection to be present"); + let deep_map = proj[0].as_ref().expect("deep projection must be set"); + assert!( + deep_map.values().any(|fields| !fields.is_empty()), + "projection_deep must have at least one physical anchor field, got: {:?}", + deep_map, + ); + assert!( + deep_map.values().all(|fields| !fields.contains(&"virtual_field".to_string())), + "virtual_field must not appear in physical projection_deep, got: {:?}", + deep_map, + ); + + Ok(()) + } + }