Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 72 additions & 2 deletions crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! predicates, while execution plans handle the actual data reading and transformation.

use std::{collections::VecDeque, pin::Pin, sync::Arc};

use std::collections::HashSet;
use arrow::array::AsArray;
use arrow_array::{ArrayRef, RecordBatch, StructArray};
use arrow_cast::{CastOptions, cast_with_options};
Expand Down Expand Up @@ -49,6 +49,7 @@ use datafusion_datasource::file::FileSource;
use delta_kernel::{
Engine, Expression, expressions::StructData, scan::ScanMetadata, table_features::TableFeature,
};
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use futures::{Stream, TryStreamExt as _, future::ready};
use itertools::Itertools as _;
use object_store::{ObjectMeta, path::Path};
Expand Down Expand Up @@ -238,14 +239,25 @@ async fn get_data_scan_plan(
// let pq_plan = if false {
let pq_plan = if let Some(result_projection_deep) = scan_plan.result_projection_deep.clone()
&& has_deep_projection(&result_projection_deep) {

// Build the true physical schema from the kernel table config (no virtual overlay fields).
// scan_plan.parquet_read_schema may include virtual sub-fields injected via a custom
// DeltaScanConfig::with_schema, so we derive physical field names from table_config instead.
let table_config_schema: SchemaRef = Arc::new(table_config.schema().as_ref().try_into_arrow()?);
let physical_projection_deep = filter_projection_deep_for_physical_schema(
&result_projection_deep,
&scan_plan.result_schema,
&table_config_schema,
);

get_read_plan_deep(
session,
files_by_store,
&scan_plan.parquet_read_schema,
limit,
&file_id_field,
predicate,
result_projection_deep.clone()
physical_projection_deep,
)
.await?
} else {
Expand Down Expand Up @@ -501,6 +513,64 @@ async fn get_read_plan_deep(
})
}

/// Translates a logical `projection_deep` map — produced by DataFusion against
/// the overlay/virtual result schema — into one that is safe to hand to the
/// Parquet reader, which only knows about the physical table schema.
///
/// # Arguments
/// - `projection_deep`: requested sub-field names, keyed by column index into
///   `result_schema`.
/// - `result_schema`: the schema the keys of `projection_deep` refer to.
/// - `table_config_schema`: the schema used to decide which columns and struct
///   sub-fields are physical.
///
/// # Returns
/// A new map whose keys are indices into `table_config_schema` (not into
/// `result_schema`). For each input entry:
/// - If the key is out of range for `result_schema`, or the column name is absent
///   from `table_config_schema` (entirely virtual), the entry is dropped.
/// - If the column is a struct and the request list is non-empty, sub-field names
///   that are not direct children of the physical struct are filtered out. If that
///   leaves nothing, the first physical sub-field is injected as a row-count anchor;
///   a struct with no physical children is dropped.
/// - Empty request lists and non-struct columns are carried through unchanged.
fn filter_projection_deep_for_physical_schema(
    projection_deep: &std::collections::HashMap<usize, Vec<String>>,
    result_schema: &SchemaRef,
    table_config_schema: &SchemaRef,
) -> std::collections::HashMap<usize, Vec<String>> {
    use std::collections::{HashMap, HashSet};

    let mut physical = HashMap::with_capacity(projection_deep.len());

    for (logical_idx, requested) in projection_deep {
        // Resolve logical → physical index by column name.
        let Some(field) = result_schema.fields().get(*logical_idx) else {
            continue;
        };
        let Ok(physical_idx) = table_config_schema.index_of(field.name()) else {
            continue;
        };

        let filtered = match table_config_schema.field(physical_idx).data_type() {
            // An empty request list is deliberately NOT rewritten here.
            // NOTE(review): an empty list appears to mean "read the whole column";
            // replacing it with a single anchor field would silently narrow the
            // read to one sub-field — confirm against the deep-projection contract.
            DataType::Struct(physical_fields) if !requested.is_empty() => {
                let physical_names: HashSet<&str> =
                    physical_fields.iter().map(|f| f.name().as_str()).collect();

                // NOTE(review): names are matched verbatim against direct children
                // only; if entries can be nested paths they will be treated as
                // virtual — confirm the entry format.
                let kept: Vec<String> = requested
                    .iter()
                    .filter(|name| physical_names.contains(name.as_str()))
                    .cloned()
                    .collect();

                if kept.is_empty() {
                    // Every requested sub-field is virtual — anchor on the first
                    // physical one so the entry does not become an empty list.
                    let Some(anchor) = physical_fields.first() else {
                        continue;
                    };
                    vec![anchor.name().clone()]
                } else {
                    kept
                }
            }
            _ => requested.clone(),
        };

        physical.insert(physical_idx, filtered);
    }

    physical
}

// Small helper to reuse some code between exec and exec_meta
fn finalize_transformed_batch(
batch: RecordBatch,
Expand Down
179 changes: 178 additions & 1 deletion crates/core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2196,6 +2196,7 @@ mod deep {
use std::sync::Arc;
use arrow_cast::display::FormatOptions;
use arrow_cast::pretty;
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::optimizer::optimize_projections_deep::DeepColumnIndexMap;
Expand All @@ -2208,7 +2209,8 @@ mod deep {
use datafusion_proto::protobuf::PhysicalPlanNode;
use prost::Message;
use tracing::info;
use deltalake_core::delta_datafusion::{DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanExec};
use deltalake_core::DeltaTableBuilder;
use deltalake_core::delta_datafusion::{DataFusionMixins, DeltaNextPhysicalCodec, DeltaPhysicalCodec, DeltaScanConfig, DeltaScanExec, DeltaScanNext};
use deltalake_core::delta_datafusion::table_provider_old::DeltaTableOldProvider;
use deltalake_core::delta_datafusion::udtf::register_delta_table_udtf;

Expand Down Expand Up @@ -2485,5 +2487,180 @@ mod deep {
}
}

/// Returns a copy of `base` in which the top-level struct column named
/// `field_name` has `field_to_add` appended as its last child.
///
/// The rewritten column keeps its name, nullability and field metadata; only its
/// data type changes. Schema-level metadata is carried over as well, so the result
/// differs from `base` solely by the added sub-field. Every other column is passed
/// through untouched. If no column is named `field_name`, or that column is not a
/// struct, the returned schema has the same fields as `base`.
fn add_virtual_field_to_schema(
    base: &Schema,
    field_name: &str,
    field_to_add: Field,
) -> SchemaRef {
    let new_fields: Vec<Arc<Field>> = base
        .fields()
        .iter()
        .map(|f| match f.data_type() {
            DataType::Struct(inner) if f.name() == field_name => {
                let children: Vec<Arc<Field>> = inner
                    .iter()
                    .cloned()
                    .chain(std::iter::once(Arc::new(field_to_add.clone())))
                    .collect();
                // Clone the original field and swap only its data type so that
                // field metadata is not lost, which `Field::new` would do.
                Arc::new(
                    f.as_ref()
                        .clone()
                        .with_data_type(DataType::Struct(Fields::from(children))),
                )
            }
            _ => Arc::clone(f),
        })
        .collect();

    Arc::new(Schema::new_with_metadata(
        new_fields,
        base.metadata().clone(),
    ))
}

/// Bug regression: when a custom logical schema adds a virtual sub-field alongside
/// physical sub-fields in a struct, deep projection must not pass the virtual
/// field name to the Parquet reader (which falls back to reading the full struct).
///
/// The test checks both the data (physical value present, virtual column all-null)
/// and the shape of the `projection_deep` extracted from the physical plan.
#[tokio::test]
async fn test_deep_projection_with_virtual_sibling_field() -> datafusion::common::Result<()> {
    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread reads or
    // writes the process environment concurrently. Tests may run on parallel
    // threads, so this relies on every test writing the same value — confirm.
    unsafe {
        std::env::set_var("DELTA_USE_EXPR_ADAPTER", "1");
    }

    let config = SessionConfig::new()
        .set_bool("datafusion.sql_parser.enable_ident_normalization", false);
    let ctx = SessionContext::new_with_config(config);

    let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR"));
    let url = url::Url::from_directory_path(
        std::path::Path::new(&delta_path).canonicalize()?,
    )
    .unwrap();

    let table = DeltaTableBuilder::from_url(url)?
        .load()
        .await?;

    // Overlay schema: the table's read schema plus one virtual Utf8 sub-field.
    let base_schema = table.snapshot()?.snapshot().read_schema();
    let custom_schema = add_virtual_field_to_schema(
        &base_schema,
        "_acp_system_metadata",
        Field::new("virtual_field", DataType::Utf8, true),
    );

    let scan_config = DeltaScanConfig::new().with_schema(custom_schema);
    let scan = DeltaScanNext::new(
        table.snapshot()?.snapshot().clone(),
        scan_config,
    )
    .expect("failed to build DeltaScanNext");
    ctx.register_table("t", Arc::new(scan))?;

    // Select one physical sub-field and one virtual sub-field from the same struct.
    let query = r#"
        SELECT
            t._acp_system_metadata.acp_sourceBatchId AS batch_id,
            t._acp_system_metadata.virtual_field AS vfield
        FROM t
    "#;

    let plan = ctx.state().create_logical_plan(query).await?;
    let optimized = ctx.state().optimize(&plan)?;
    let physical_plan = ctx.state().query_planner()
        .create_physical_plan(&optimized, &ctx.state())
        .await?;

    // Correct data: physical field has a value, virtual field is null.
    let batches = collect(physical_plan.clone(), ctx.state().task_ctx()).await?;
    let results = pretty::pretty_format_batches_with_options(
        &batches,
        &FormatOptions::default(),
    )?
    .to_string();
    println!("{}", results);
    assert!(results.contains("b1"), "acp_sourceBatchId should be 'b1'");
    assert!(results.contains('|'), "expected tabular output");

    // Actually verify the "virtual field is null" half of the claim, across every
    // batch so that an empty leading batch cannot make the check vacuous.
    let mut checked_rows = 0;
    for batch in &batches {
        let vfield = batch
            .column_by_name("vfield")
            .expect("vfield column must exist");
        assert_eq!(
            vfield.null_count(),
            vfield.len(),
            "virtual_field column must be all-null"
        );
        checked_rows += vfield.len();
    }
    assert!(checked_rows > 0, "expected at least one row to check for nulls");

    // The physical Parquet plan must NOT include "virtual_field" in projection_deep.
    let proj = extract_projection_deep_from_plan(physical_plan);
    assert!(!proj.is_empty(), "expected at least one deep projection in plan");
    let deep_map = proj[0].as_ref().expect("deep projection must be set");
    assert!(
        deep_map.values().all(|fields| !fields.contains(&"virtual_field".to_string())),
        "virtual_field must be stripped from physical projection_deep, got: {:?}",
        deep_map,
    );
    assert!(
        deep_map.values().any(|fields| fields.contains(&"acp_sourceBatchId".to_string())),
        "acp_sourceBatchId must remain in physical projection_deep, got: {:?}",
        deep_map,
    );

    Ok(())
}

/// Bug regression: when ALL requested sub-fields of a struct are virtual,
/// the filter must inject the first physical sub-field as a row-count anchor
/// so the reader produces the right number of null rows.
///
/// The test checks the row count, that the virtual column is all-null in every
/// returned batch, and that the extracted `projection_deep` holds a non-empty,
/// virtual-free entry.
#[tokio::test]
async fn test_deep_projection_only_virtual_sub_fields() -> datafusion::common::Result<()> {
    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread reads or
    // writes the process environment concurrently. Tests may run on parallel
    // threads, so this relies on every test writing the same value — confirm.
    unsafe {
        std::env::set_var("DELTA_USE_EXPR_ADAPTER", "1");
    }

    let config = SessionConfig::new()
        .set_bool("datafusion.sql_parser.enable_ident_normalization", false);
    let ctx = SessionContext::new_with_config(config);

    let delta_path = format!("{}/tests/data/deep", env!("CARGO_MANIFEST_DIR"));
    let url = url::Url::from_directory_path(
        std::path::Path::new(&delta_path).canonicalize()?,
    )
    .unwrap();

    let table = DeltaTableBuilder::from_url(url)?
        .load()
        .await?;

    // Overlay schema: the table's read schema plus one virtual Utf8 sub-field.
    let base_schema = table.snapshot()?.snapshot().read_schema();
    let custom_schema = add_virtual_field_to_schema(
        &base_schema,
        "_acp_system_metadata",
        Field::new("virtual_field", DataType::Utf8, true),
    );

    let scan_config = DeltaScanConfig::new().with_schema(custom_schema);
    let scan = DeltaScanNext::new(
        table.snapshot()?.snapshot().clone(),
        scan_config,
    )
    .expect("failed to build DeltaScanNext");
    ctx.register_table("t", Arc::new(scan))?;

    // Select ONLY a virtual sub-field — no physical sub-field requested.
    let query = r#"
        SELECT t._acp_system_metadata.virtual_field AS vfield
        FROM t
    "#;

    let plan = ctx.state().create_logical_plan(query).await?;
    let optimized = ctx.state().optimize(&plan)?;
    let physical_plan = ctx.state().query_planner()
        .create_physical_plan(&optimized, &ctx.state())
        .await?;

    // virtual_field must be all-null with the correct row count (1 row in this table).
    let batches = collect(physical_plan.clone(), ctx.state().task_ctx()).await?;
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(total_rows, 1, "expected 1 row matching physical table row count");

    // Check every batch rather than only the first: an empty leading batch would
    // otherwise make the null check pass without inspecting the actual row.
    for batch in &batches {
        let col = batch
            .column_by_name("vfield")
            .expect("vfield column must exist");
        assert_eq!(col.null_count(), col.len(), "virtual_field column must be all-null");
    }

    // The physical plan must have a non-empty projection_deep for the struct
    // (the row-count anchor), so we did not fall back to reading the full struct.
    let proj = extract_projection_deep_from_plan(physical_plan);
    assert!(!proj.is_empty(), "expected deep projection to be present");
    let deep_map = proj[0].as_ref().expect("deep projection must be set");
    assert!(
        deep_map.values().any(|fields| !fields.is_empty()),
        "projection_deep must have at least one physical anchor field, got: {:?}",
        deep_map,
    );
    assert!(
        deep_map.values().all(|fields| !fields.contains(&"virtual_field".to_string())),
        "virtual_field must not appear in physical projection_deep, got: {:?}",
        deep_map,
    );

    Ok(())
}

}

Loading