Skip to content

Commit ef405d8

Browse files
authored
refactor: use BatchAdapterFactory for scan adaptation (#4195)
# Description Follow-up to the DF 52 upgrade. Replaces custom scan batch casting logic with DataFusion's `BatchAdapterFactory` via `datafusion-physical-expr-adapter`. Adds hardening tests for schema evolution and DV scan behavior. Keeps DF default behavior with no custom compatibility mode. # Related Issue(s) <!--- For example: - closes #106 ---> # Documentation <!--- Share links to useful documentation ---> --------- Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
1 parent f157cdc commit ef405d8

File tree

5 files changed

+235
-31
lines changed

5 files changed

+235
-31
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ parquet = { version = "57" }
4141
# datafusion 52.1
4242
datafusion = { version = "52.1.0" }
4343
datafusion-datasource = { version = "52.1.0" }
44+
datafusion-physical-expr-adapter = { version = "52.1.0" }
4445
datafusion-ffi = { version = "52.1.0" }
4546
datafusion-proto = { version = "52.1.0" }
4647

crates/core/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ object_store = { workspace = true }
3737
# datafusion
3838
datafusion = { workspace = true, optional = true }
3939
datafusion-datasource = { workspace = true, optional = true }
40+
datafusion-physical-expr-adapter = { workspace = true, optional = true }
4041
datafusion-proto = { workspace = true, optional = true }
4142

4243
# serde
@@ -108,6 +109,7 @@ default = ["rustls"]
108109
datafusion = [
109110
"dep:datafusion",
110111
"datafusion-datasource",
112+
"datafusion-physical-expr-adapter",
111113
"datafusion-proto",
112114
]
113115
datafusion-ext = ["datafusion"]

crates/core/src/delta_datafusion/table_provider/next/scan/exec_meta.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ use arrow_array::{
1818
};
1919
use arrow_schema::{DataType, FieldRef, Fields, Schema};
2020
use dashmap::DashMap;
21-
use datafusion::common::HashMap;
2221
use datafusion::common::config::ConfigOptions;
2322
use datafusion::common::error::{DataFusionError, Result};
23+
use datafusion::common::{HashMap, internal_datafusion_err};
2424
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
2525
use datafusion::physical_expr::EquivalenceProperties;
2626
use datafusion::physical_plan::execution_plan::{
@@ -283,12 +283,16 @@ impl DeltaScanMetaStream {
283283
)?;
284284

285285
let batch = if let Some(selection) = self.selection_vectors.get(&file_id) {
286-
let missing = batch.num_rows() - selection.len();
287-
let filter = if missing > 0 {
288-
BooleanArray::from_iter(selection.iter().chain(std::iter::repeat_n(&true, missing)))
289-
} else {
290-
BooleanArray::from_iter(selection.iter())
291-
};
286+
if selection.len() != batch.num_rows() {
287+
return Err(internal_datafusion_err!(
288+
"Selection vector length ({}) does not match row count ({}) for file '{}'. \
289+
This indicates a bug in deletion vector processing.",
290+
selection.len(),
291+
batch.num_rows(),
292+
file_id
293+
));
294+
}
295+
let filter = BooleanArray::from_iter(selection.iter());
292296
filter_record_batch(&batch, &filter)?
293297
} else {
294298
batch

crates/core/src/delta_datafusion/table_provider/next/scan/mod.rs

Lines changed: 111 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@
1616
1717
use std::{collections::VecDeque, pin::Pin, sync::Arc};
1818

19-
use arrow::array::AsArray;
20-
use arrow_array::{ArrayRef, RecordBatch, StructArray};
19+
use arrow_array::{ArrayRef, RecordBatch};
2120
use arrow_cast::{CastOptions, cast_with_options};
22-
use arrow_schema::{DataType, FieldRef, Schema, SchemaBuilder, SchemaRef};
21+
use arrow_schema::{FieldRef, Schema, SchemaBuilder, SchemaRef};
2322
use chrono::{TimeZone as _, Utc};
2423
use dashmap::DashMap;
2524
use datafusion::{
@@ -43,6 +42,7 @@ use datafusion_datasource::{
4342
PartitionedFile, TableSchema, compute_all_files_statistics, file_groups::FileGroup,
4443
file_scan_config::FileScanConfigBuilder, source::DataSourceExec,
4544
};
45+
use datafusion_physical_expr_adapter::BatchAdapterFactory;
4646
use delta_kernel::{
4747
Engine, Expression, expressions::StructData, scan::ScanMetadata, table_features::TableFeature,
4848
};
@@ -505,36 +505,25 @@ fn finalize_transformed_batch(
505505
}
506506

507507
fn cast_record_batch(batch: RecordBatch, target_schema: &SchemaRef) -> Result<RecordBatch> {
508-
if batch.num_columns() == 0 {
509-
if !target_schema.fields().is_empty() {
510-
return plan_err!(
511-
"Cannot cast empty RecordBatch to non-empty schema: {:?}",
512-
target_schema
513-
);
514-
}
508+
if batch.schema_ref().eq(target_schema) {
515509
return Ok(batch);
516510
}
517511

518-
let options = CastOptions {
519-
safe: true,
520-
..Default::default()
521-
};
522-
Ok(cast_with_options(
523-
&StructArray::from(batch),
524-
&DataType::Struct(target_schema.fields().clone()),
525-
&options,
526-
)?
527-
.as_struct()
528-
.into())
512+
let adapter_factory = BatchAdapterFactory::new(Arc::clone(target_schema));
513+
let adapter = adapter_factory.make_adapter(batch.schema())?;
514+
adapter.adapt_batch(&batch)
529515
}
530516

531517
#[cfg(test)]
532518
mod tests {
519+
use arrow_array::Array;
533520
use arrow_array::{
534-
BinaryArray, BinaryViewArray, Int32Array, RecordBatch, StringArray, StructArray,
521+
BinaryArray, BinaryViewArray, Int32Array, Int64Array, RecordBatch, RecordBatchOptions,
522+
StringArray, StructArray,
535523
};
536-
use arrow_schema::{DataType, Field, Fields, Schema};
524+
use arrow_schema::{ArrowError, DataType, Field, Fields, Schema};
537525
use datafusion::{
526+
error::DataFusionError,
538527
physical_plan::collect,
539528
prelude::{col, lit},
540529
};
@@ -562,6 +551,105 @@ mod tests {
562551
assert_eq!(groups[1].len(), 1);
563552
}
564553

554+
#[test]
555+
fn test_cast_record_batch_empty_input_synthesizes_nullable_columns() {
556+
let source_schema = Arc::new(Schema::new(Fields::empty()));
557+
let source = RecordBatch::try_new_with_options(
558+
source_schema,
559+
vec![],
560+
&RecordBatchOptions::new().with_row_count(Some(2)),
561+
)
562+
.unwrap();
563+
564+
let target_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)]));
565+
let adapted = cast_record_batch(source, &target_schema).unwrap();
566+
567+
assert_eq!(adapted.schema().as_ref(), target_schema.as_ref());
568+
assert_eq!(adapted.num_rows(), 2);
569+
let id = adapted
570+
.column(0)
571+
.as_any()
572+
.downcast_ref::<Int32Array>()
573+
.unwrap();
574+
assert_eq!(id.null_count(), 2);
575+
}
576+
577+
#[test]
578+
fn test_cast_record_batch_empty_input_missing_non_nullable_column_errors() {
579+
let source_schema = Arc::new(Schema::new(Fields::empty()));
580+
let source = RecordBatch::try_new_with_options(
581+
source_schema,
582+
vec![],
583+
&RecordBatchOptions::new().with_row_count(Some(1)),
584+
)
585+
.unwrap();
586+
587+
let target_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
588+
let err = cast_record_batch(source, &target_schema)
589+
.expect_err("missing non-nullable columns should error");
590+
match err {
591+
DataFusionError::Execution(msg) => {
592+
assert!(
593+
msg.contains("Non-nullable column 'id'"),
594+
"expected non-nullable missing-column error, got: {msg}"
595+
);
596+
assert!(
597+
msg.contains("missing from the physical schema"),
598+
"expected missing physical schema detail, got: {msg}"
599+
);
600+
}
601+
other => {
602+
panic!("expected execution error for missing non-nullable column, got: {other}")
603+
}
604+
}
605+
}
606+
607+
#[test]
608+
fn test_cast_record_batch_invalid_scalar_cast_errors() {
609+
let source_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, true)]));
610+
let source = RecordBatch::try_new(
611+
source_schema,
612+
vec![Arc::new(StringArray::from(vec![Some("not-an-int")]))],
613+
)
614+
.unwrap();
615+
616+
let target_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)]));
617+
let err = cast_record_batch(source, &target_schema)
618+
.expect_err("invalid value cast should fail under DataFusion default cast semantics");
619+
match err {
620+
DataFusionError::ArrowError(inner, _) => {
621+
assert!(
622+
matches!(inner.as_ref(), ArrowError::CastError(_)),
623+
"expected arrow cast error, got: {inner}"
624+
);
625+
}
626+
other => panic!("expected arrow cast error for invalid scalar cast, got: {other}"),
627+
}
628+
}
629+
630+
#[test]
631+
fn test_cast_record_batch_overflow_cast_errors() {
632+
let source_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
633+
let source = RecordBatch::try_new(
634+
source_schema,
635+
vec![Arc::new(Int64Array::from(vec![i64::from(i32::MAX) + 1]))],
636+
)
637+
.unwrap();
638+
639+
let target_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)]));
640+
let err = cast_record_batch(source, &target_schema)
641+
.expect_err("overflow cast should fail under DataFusion default cast semantics");
642+
match err {
643+
DataFusionError::ArrowError(inner, _) => {
644+
assert!(
645+
matches!(inner.as_ref(), ArrowError::CastError(_)),
646+
"expected arrow cast error, got: {inner}"
647+
);
648+
}
649+
other => panic!("expected arrow cast error for overflow cast, got: {other}"),
650+
}
651+
}
652+
565653
#[tokio::test]
566654
async fn test_parquet_plan() -> TestResult {
567655
let store = Arc::new(InMemory::new());

crates/core/tests/datafusion_table_provider.rs

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ use std::sync::Arc;
33

44
use arrow_array::RecordBatch;
55
use datafusion::assert_batches_sorted_eq;
6+
use datafusion::catalog::TableProvider;
67
use datafusion::physical_plan::{ExecutionPlan, collect_partitioned};
7-
use datafusion::prelude::{SessionContext, col, lit};
8+
use datafusion::prelude::{SessionConfig, SessionContext, col, lit};
9+
use deltalake_core::delta_datafusion::DeltaScanConfig;
810
use deltalake_core::delta_datafusion::DeltaScanNext;
911
use deltalake_core::delta_datafusion::create_session;
1012
use deltalake_core::delta_datafusion::engine::DataFusionEngine;
@@ -31,6 +33,23 @@ async fn scan_dat(case: &str) -> TestResult<(Snapshot, SessionContext)> {
3133
Ok((snapshot, session))
3234
}
3335

36+
async fn scan_dat_with_session(case: &str, session: &SessionContext) -> TestResult<Snapshot> {
37+
let root_dir = format!(
38+
"{}/../../dat/v0.0.3/reader_tests/generated/{}/",
39+
env!["CARGO_MANIFEST_DIR"],
40+
case
41+
);
42+
let root_dir = std::fs::canonicalize(root_dir)?;
43+
let case = read_dat_case(root_dir)?;
44+
45+
let engine = DataFusionEngine::new_from_session(&session.state());
46+
let snapshot =
47+
Snapshot::try_new_with_engine(engine.clone(), case.table_root()?, Default::default(), None)
48+
.await?;
49+
50+
Ok(snapshot)
51+
}
52+
3453
async fn collect_plan(
3554
plan: Arc<dyn ExecutionPlan>,
3655
session: &SessionContext,
@@ -113,6 +132,72 @@ async fn test_all_primitive_types() -> TestResult<()> {
113132
Ok(())
114133
}
115134

135+
#[tokio::test]
136+
async fn test_view_types_filter_exec_compatibility() -> TestResult<()> {
137+
use arrow_schema::DataType;
138+
139+
let config =
140+
SessionConfig::new().set_bool("datafusion.execution.parquet.schema_force_view_types", true);
141+
let session = SessionContext::new_with_config(config);
142+
let snapshot = scan_dat_with_session("all_primitive_types", &session).await?;
143+
let provider: Arc<dyn TableProvider> = Arc::new(DeltaScanNext::new(
144+
snapshot,
145+
DeltaScanConfig::new_from_session(&session.state()),
146+
)?);
147+
148+
let plan = provider.scan(&session.state(), None, &[], None).await?;
149+
let has_view_types = plan
150+
.schema()
151+
.fields()
152+
.iter()
153+
.any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView));
154+
assert!(
155+
has_view_types,
156+
"view types should be present when configured"
157+
);
158+
159+
let filter = col("utf8").eq(lit("1"));
160+
let batches = session
161+
.read_table(provider.clone())?
162+
.filter(filter)?
163+
.select(vec![col("utf8")])?
164+
.collect()
165+
.await?;
166+
let expected = vec!["+------+", "| utf8 |", "+------+", "| 1 |", "+------+"];
167+
assert_batches_sorted_eq!(&expected, &batches);
168+
169+
Ok(())
170+
}
171+
172+
#[tokio::test]
173+
async fn test_view_types_disabled() -> TestResult<()> {
174+
use arrow_schema::DataType;
175+
176+
let config = SessionConfig::new().set_bool(
177+
"datafusion.execution.parquet.schema_force_view_types",
178+
false,
179+
);
180+
let session = SessionContext::new_with_config(config);
181+
let snapshot = scan_dat_with_session("all_primitive_types", &session).await?;
182+
let provider: Arc<dyn TableProvider> = Arc::new(DeltaScanNext::new(
183+
snapshot,
184+
DeltaScanConfig::new_from_session(&session.state()),
185+
)?);
186+
187+
let plan = provider.scan(&session.state(), None, &[], None).await?;
188+
let has_view_types = plan
189+
.schema()
190+
.fields()
191+
.iter()
192+
.any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView));
193+
assert!(
194+
!has_view_types,
195+
"view types should be disabled when configured"
196+
);
197+
198+
Ok(())
199+
}
200+
116201
#[tokio::test]
117202
async fn test_multi_partitioned() -> TestResult<()> {
118203
let (snapshot, session) = scan_dat("multi_partitioned").await?;
@@ -234,3 +319,27 @@ async fn test_deletion_vectors() -> TestResult<()> {
234319

235320
Ok(())
236321
}
322+
323+
#[tokio::test]
324+
async fn test_deletion_vectors_multi_batch() -> TestResult<()> {
325+
let config = SessionConfig::new().with_batch_size(1);
326+
let session = SessionContext::new_with_config(config);
327+
let snapshot = scan_dat_with_session("deletion_vectors", &session).await?;
328+
let provider: Arc<dyn TableProvider> = Arc::new(DeltaScanNext::new(
329+
snapshot,
330+
DeltaScanConfig::new_from_session(&session.state()),
331+
)?);
332+
333+
let plan = provider.scan(&session.state(), None, &[], None).await?;
334+
let batches: Vec<_> = collect_plan(plan, &session).await?;
335+
let expected = vec![
336+
"+--------+-----+------------+",
337+
"| letter | int | date |",
338+
"+--------+-----+------------+",
339+
"| b | 228 | 1978-12-01 |",
340+
"+--------+-----+------------+",
341+
];
342+
assert_batches_sorted_eq!(&expected, &batches);
343+
344+
Ok(())
345+
}

0 commit comments

Comments (0)