From 1329d26e34cbfbdf2a23dccc3031835de2195a81 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Tue, 6 Jan 2026 12:57:48 -0500 Subject: [PATCH 1/2] Migrate SchemaAdapter to PhysicalExprAdapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR prepares for DataFusion 52.0 by migrating from the deprecated SchemaAdapter approach to the new PhysicalExprAdapter approach. Changes: - Add SparkPhysicalExprAdapterFactory and SparkPhysicalExprAdapter that work at planning time (expression rewriting) instead of runtime (batch transformation) - Replace CastColumnExpr with Spark-compatible Cast expressions - Update parquet_exec.rs to use with_expr_adapter() instead of with_schema_adapter_factory() - Update Iceberg scan to use adapt_batch_with_expressions() - Mark old SparkSchemaAdapterFactory as deprecated The new approach: 1. PhysicalExprAdapterFactory.create() returns PhysicalExprAdapter 2. PhysicalExprAdapter.rewrite() transforms expressions at planning time 3. Casts are injected as expressions that execute when the plan runs 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- native/Cargo.lock | 1 + native/Cargo.toml | 1 + native/core/Cargo.toml | 1 + .../src/execution/operators/iceberg_scan.rs | 30 +- native/core/src/parquet/parquet_exec.rs | 18 +- native/core/src/parquet/schema_adapter.rs | 256 +++++++++++++++++- 6 files changed, 271 insertions(+), 36 deletions(-) diff --git a/native/Cargo.lock b/native/Cargo.lock index 9709cd6f17..7f8ee1454e 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1790,6 +1790,7 @@ dependencies = [ "datafusion-comet-spark-expr", "datafusion-datasource", "datafusion-functions-nested", + "datafusion-physical-expr-adapter", "datafusion-spark", "futures", "hdfs-sys", diff --git a/native/Cargo.toml b/native/Cargo.toml index 554534cc2b..da7ff59f3e 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -41,6 +41,7 @@ parquet = { version = "57.0.0", default-features = false, features = ["experimen datafusion = { version = "51.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] } datafusion-datasource = { version = "51.0.0" } datafusion-spark = { version = "51.0.0" } +datafusion-physical-expr-adapter = { version = "51.0.0" } datafusion-comet-spark-expr = { path = "spark-expr" } datafusion-comet-proto = { path = "proto" } chrono = { version = "0.4", default-features = false, features = ["clock"] } diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 7b32be36a2..5764e8aea2 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -62,6 +62,7 @@ paste = "1.0.14" datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true } +datafusion-physical-expr-adapter = { workspace = true } once_cell = "1.18.0" regex = { workspace = true } crc32fast = "1.3.2" diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs index ff6648850b..808efe7756 100644 --- a/native/core/src/execution/operators/iceberg_scan.rs +++ b/native/core/src/execution/operators/iceberg_scan.rs @@ -42,8 +42,7 @@ use iceberg::io::FileIO; use crate::execution::operators::ExecutionError; use crate::parquet::parquet_support::SparkParquetOptions; -use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; -use 
datafusion::datasource::schema_adapter::SchemaAdapterFactory;
+use crate::parquet::schema_adapter::adapt_batch_with_expressions;
 use datafusion_comet_spark_expr::EvalMode;
 use datafusion_datasource::file_stream::FileStreamMetrics;
@@ -294,27 +293,20 @@ impl IcebergFileStream {
         let target_schema = Arc::clone(&schema);
 
         // Schema adaptation handles differences in Arrow field names and metadata
-        // between the file schema and expected output schema
+        // between the file schema and expected output schema using expression-based
+        // adaptation (PhysicalExprAdapter approach)
         let mapped_stream = stream
             .map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)))
             .and_then(move |batch| {
                 let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
-                let adapter_factory = SparkSchemaAdapterFactory::new(spark_options, None);
-                let file_schema = batch.schema();
-                let adapter = adapter_factory
-                    .create(Arc::clone(&target_schema), Arc::clone(&file_schema));
-
-                let result = match adapter.map_schema(file_schema.as_ref()) {
-                    Ok((schema_mapper, _projection)) => {
-                        schema_mapper.map_batch(batch).map_err(|e| {
-                            DataFusionError::Execution(format!("Batch mapping failed: {}", e))
-                        })
-                    }
-                    Err(e) => Err(DataFusionError::Execution(format!(
-                        "Schema mapping failed: {}",
-                        e
-                    ))),
-                };
+                let result = adapt_batch_with_expressions(
+                    batch,
+                    &target_schema,
+                    &spark_options,
+                )
+                .map_err(|e| {
+                    DataFusionError::Execution(format!("Batch adaptation failed: {}", e))
+                });
 
                 futures::future::ready(result)
             });
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index ec18d227f5..922a23afa4 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -18,7 +18,7 @@ use crate::execution::operators::ExecutionError;
 use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
 use crate::parquet::parquet_support::SparkParquetOptions;
-use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
 use arrow::datatypes::{Field, SchemaRef};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::listing::PartitionedFile;
@@ -32,6 +32,7 @@ use datafusion::physical_expr::PhysicalExpr;
 use datafusion::prelude::SessionContext;
 use datafusion::scalar::ScalarValue;
 use datafusion_comet_spark_expr::EvalMode;
+use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use itertools::Itertools;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -104,9 +105,14 @@ pub(crate) fn init_datasource_exec(
         );
     }
 
-    let file_source = parquet_source.with_schema_adapter_factory(Arc::new(
-        SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
-    ))?;
+    // Create the expression adapter factory for Spark-compatible schema adaptation
+    let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> =
+        Arc::new(SparkPhysicalExprAdapterFactory::new(
+            spark_parquet_options,
+            default_values,
+        ));
+
+    let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);
 
     let file_groups = file_groups
         .iter()
@@ -124,6 +130,7 @@ pub(crate) fn init_datasource_exec(
             )
            .with_projection_indices(Some(projection_vector))
            .with_table_partition_cols(partition_fields)
+           .with_expr_adapter(Some(expr_adapter_factory))
            .build()
         }
         _ => get_file_config_builder(
@@ -133,8 +140,11 @@ pub(crate) fn init_datasource_exec(
            object_store_url,
            file_source,
         )
+        .with_expr_adapter(Some(expr_adapter_factory))
         .build(),
    };
+    // Note: expr_adapter_factory is moved into whichever match arm executes,
+    // so no clone is needed
 
    Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
 }
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index b321d902a9..d3064ae23a 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -16,19 +16,241 @@
 // under the License.
 
 //! Custom schema adapter that uses Spark-compatible conversions
+//!
+//! This module provides both:
+//! - The deprecated `SchemaAdapter` approach (for backwards compatibility)
+//! - The new `PhysicalExprAdapter` approach (recommended, works at planning time)
 
 use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
-use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
 use arrow::datatypes::{Schema, SchemaRef};
-use datafusion::common::ColumnStatistics;
+use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion::common::{ColumnStatistics, Result as DataFusionResult};
 use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use datafusion::physical_expr::expressions::Column;
+use datafusion::physical_expr::PhysicalExpr;
 use datafusion::physical_plan::ColumnarValue;
 use datafusion::scalar::ScalarValue;
+use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
+use datafusion_physical_expr_adapter::{
+    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
+    replace_columns_with_literals,
+};
 use std::collections::HashMap;
 use std::sync::Arc;
+
+// ============================================================================
+// New PhysicalExprAdapter Implementation (Recommended)
+// ============================================================================
+
+/// Factory for creating Spark-compatible physical expression adapters.
+///
+/// This factory creates adapters that rewrite expressions at planning time
+/// to inject Spark-compatible casts where needed.
+#[derive(Clone, Debug)]
+pub struct SparkPhysicalExprAdapterFactory {
+    /// Spark-specific parquet options for type conversions
+    parquet_options: SparkParquetOptions,
+    /// Default values for columns that may be missing from the physical schema.
+    /// The key is the column index in the logical schema.
+    default_values: Option<HashMap<usize, ScalarValue>>,
+}
+
+impl SparkPhysicalExprAdapterFactory {
+    /// Create a new factory with the given options.
+    pub fn new(
+        parquet_options: SparkParquetOptions,
+        default_values: Option<HashMap<usize, ScalarValue>>,
+    ) -> Self {
+        Self {
+            parquet_options,
+            default_values,
+        }
+    }
+}
+
+impl PhysicalExprAdapterFactory for SparkPhysicalExprAdapterFactory {
+    fn create(
+        &self,
+        logical_file_schema: SchemaRef,
+        physical_file_schema: SchemaRef,
+    ) -> Arc<dyn PhysicalExprAdapter> {
+        let default_factory = DefaultPhysicalExprAdapterFactory;
+        let default_adapter = default_factory.create(
+            Arc::clone(&logical_file_schema),
+            Arc::clone(&physical_file_schema),
+        );
+
+        Arc::new(SparkPhysicalExprAdapter {
+            logical_file_schema,
+            physical_file_schema,
+            parquet_options: self.parquet_options.clone(),
+            default_values: self.default_values.clone(),
+            default_adapter,
+        })
+    }
+}
+
+/// Spark-compatible physical expression adapter.
+///
+/// This adapter rewrites expressions at planning time to:
+/// 1. Replace references to missing columns with default values or nulls
+/// 2. Replace standard DataFusion cast expressions with Spark-compatible casts
+/// 3. Handle case-insensitive column matching
+#[derive(Debug)]
+struct SparkPhysicalExprAdapter {
+    /// The logical schema expected by the query
+    logical_file_schema: SchemaRef,
+    /// The physical schema of the actual file being read
+    physical_file_schema: SchemaRef,
+    /// Spark-specific options for type conversions
+    parquet_options: SparkParquetOptions,
+    /// Default values for missing columns (keyed by logical schema index)
+    default_values: Option<HashMap<usize, ScalarValue>>,
+    /// The default DataFusion adapter to delegate standard handling to
+    default_adapter: Arc<dyn PhysicalExprAdapter>,
+}
+
+impl PhysicalExprAdapter for SparkPhysicalExprAdapter {
+    fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        // Step 1: Handle default values for missing columns
+        let expr = self.replace_missing_with_defaults(expr)?;
+
+        // Step 2: Delegate to default adapter for standard handling
+        // This handles: missing columns → nulls, type mismatches → CastColumnExpr
+        let expr = self.default_adapter.rewrite(expr)?;
+
+        // Step 3: Replace CastColumnExpr with Spark-compatible Cast expressions
+        expr.transform(|e| self.replace_with_spark_cast(e)).data()
+    }
+}
+
+impl SparkPhysicalExprAdapter {
+    /// Replace CastColumnExpr (DataFusion's cast) with Spark's Cast expression.
+    fn replace_with_spark_cast(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> DataFusionResult<Transformed<Arc<dyn PhysicalExpr>>> {
+        // Check for CastColumnExpr and replace with spark_expr::Cast
+        // CastColumnExpr is in datafusion_physical_expr::expressions
+        if let Some(cast) = expr
+            .as_any()
+            .downcast_ref::<datafusion::physical_expr::expressions::CastColumnExpr>()
+        {
+            let child = cast.expr().clone();
+            let target_type = cast.target_field().data_type().clone();
+
+            // Create Spark-compatible cast options
+            let mut cast_options = SparkCastOptions::new(
+                self.parquet_options.eval_mode,
+                &self.parquet_options.timezone,
+                self.parquet_options.allow_incompat,
+            );
+            cast_options.allow_cast_unsigned_ints = self.parquet_options.allow_cast_unsigned_ints;
+            cast_options.is_adapting_schema = true;
+
+            let spark_cast = Arc::new(Cast::new(child, target_type, cast_options));
+
+            return Ok(Transformed::yes(spark_cast as Arc<dyn PhysicalExpr>));
+        }
+
+        Ok(Transformed::no(expr))
+    }
+
+    /// Replace references to missing columns with default values.
+    fn replace_missing_with_defaults(
+        &self,
+        expr: Arc<dyn PhysicalExpr>,
+    ) -> DataFusionResult<Arc<dyn PhysicalExpr>> {
+        let Some(defaults) = &self.default_values else {
+            return Ok(expr);
+        };
+
+        if defaults.is_empty() {
+            return Ok(expr);
+        }
+
+        // Convert index-based defaults to name-based for replace_columns_with_literals
+        let name_based: HashMap<&str, &ScalarValue> = defaults
+            .iter()
+            .filter_map(|(idx, val)| {
+                self.logical_file_schema
+                    .fields()
+                    .get(*idx)
+                    .map(|f| (f.name().as_str(), val))
+            })
+            .collect();
+
+        if name_based.is_empty() {
+            return Ok(expr);
+        }
+
+        replace_columns_with_literals(expr, &name_based)
+    }
+}
+
+/// Adapt a batch to match the target schema using expression evaluation.
+///
+/// This function is useful for cases like Iceberg scanning where batches
+/// are read directly and need to be adapted to the expected schema.
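+///
+/// # Example
+///
+/// A minimal sketch (not compiled as a doctest); `file_batch` and `table_schema` are
+/// placeholders for a batch read from a data file and the schema expected by the query:
+///
+/// ```ignore
+/// use datafusion_comet_spark_expr::EvalMode;
+///
+/// let options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
+/// let adapted = adapt_batch_with_expressions(file_batch, &table_schema, &options)?;
+/// assert_eq!(adapted.schema(), table_schema);
+/// ```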
+pub fn adapt_batch_with_expressions(
+    batch: RecordBatch,
+    target_schema: &SchemaRef,
+    parquet_options: &SparkParquetOptions,
+) -> DataFusionResult<RecordBatch> {
+    let file_schema = batch.schema();
+
+    // If schemas match, no adaptation needed
+    if file_schema.as_ref() == target_schema.as_ref() {
+        return Ok(batch);
+    }
+
+    // Create adapter
+    let factory = SparkPhysicalExprAdapterFactory::new(parquet_options.clone(), None);
+    let adapter = factory.create(Arc::clone(target_schema), Arc::clone(&file_schema));
+
+    // Create column projection expressions for target schema
+    let projection_exprs: Vec<Arc<dyn PhysicalExpr>> = target_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(i, _field)| {
+            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
+                target_schema.field(i).name(),
+                target_schema.as_ref(),
+            )?);
+            adapter.rewrite(col_expr)
+        })
+        .collect::<DataFusionResult<Vec<_>>>()?;
+
+    // Evaluate expressions against batch
+    let columns: Vec<ArrayRef> = projection_exprs
+        .iter()
+        .map(|expr| {
+            expr.evaluate(&batch)?
+                .into_array(batch.num_rows())
+                .map_err(|e| e.into())
+        })
+        .collect::<DataFusionResult<Vec<_>>>()?;
+
+    RecordBatch::try_new(Arc::clone(target_schema), columns).map_err(|e| e.into())
+}
+
+// ============================================================================
+// Legacy SchemaAdapter Implementation (Deprecated)
+// ============================================================================
+
 /// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible
 /// `cast` implementation.
+///
+/// # Deprecated
+/// This type is deprecated and will be removed in a future release.
+/// Use [`SparkPhysicalExprAdapterFactory`] instead, which works at planning time
+/// rather than runtime batch transformation.
+#[deprecated(
+    since = "0.14.0",
+    note = "Use SparkPhysicalExprAdapterFactory instead, which works at planning time"
+)]
 #[derive(Clone, Debug)]
 pub struct SparkSchemaAdapterFactory {
     /// Spark cast options
@@ -269,7 +491,7 @@ impl SchemaMapper for SchemaMapping {
 #[cfg(test)]
 mod test {
     use crate::parquet::parquet_support::SparkParquetOptions;
-    use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
+    use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
     use arrow::array::UInt32Array;
     use arrow::array::{Int32Array, StringArray};
     use arrow::datatypes::SchemaRef;
@@ -278,7 +500,6 @@ mod test {
     use datafusion::common::config::TableParquetOptions;
     use datafusion::common::DataFusionError;
     use datafusion::datasource::listing::PartitionedFile;
-    use datafusion::datasource::physical_plan::FileSource;
     use datafusion::datasource::physical_plan::{FileGroup, FileScanConfigBuilder, ParquetSource};
     use datafusion::datasource::source::DataSourceExec;
     use datafusion::execution::object_store::ObjectStoreUrl;
@@ -286,6 +507,7 @@ mod test {
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_comet_spark_expr::test_common::file_util::get_temp_filename;
     use datafusion_comet_spark_expr::EvalMode;
+    use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
     use futures::StreamExt;
     use parquet::arrow::ArrowWriter;
     use std::fs::File;
@@ -328,7 +550,7 @@ mod test {
     }
 
     /// Create a Parquet file containing a single batch and then read the batch back using
-    /// the specified required_schema. This will cause the SchemaAdapter code to be used.
+    /// the specified required_schema. This will cause the PhysicalExprAdapter code to be used.
     async fn roundtrip(
         batch: &RecordBatch,
         required_schema: SchemaRef,
@@ -345,16 +567,24 @@ mod test {
         let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
         spark_parquet_options.allow_cast_unsigned_ints = true;
 
-        let parquet_source =
-            ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory(
-                Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options, None)),
-            )?;
+        // Create expression adapter factory for Spark-compatible schema adaptation
+        let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> =
+            Arc::new(SparkPhysicalExprAdapterFactory::new(
+                spark_parquet_options,
+                None,
+            ));
+
+        let parquet_source = ParquetSource::new(TableParquetOptions::new());
 
         let files = FileGroup::new(vec![PartitionedFile::from_path(filename.to_string())?]);
 
-        let file_scan_config =
-            FileScanConfigBuilder::new(object_store_url, required_schema, parquet_source)
-                .with_file_groups(vec![files])
-                .build();
+        let file_scan_config = FileScanConfigBuilder::new(
+            object_store_url,
+            Arc::clone(&required_schema),
+            Arc::new(parquet_source),
+        )
+        .with_file_groups(vec![files])
+        .with_expr_adapter(Some(expr_adapter_factory))
+        .build();
 
         let parquet_exec = DataSourceExec::new(Arc::new(file_scan_config));
 
From 69347938e5ad47537b5604ad50ad65c8d4fd36ab Mon Sep 17 00:00:00 2001
From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com>
Date: Tue, 6 Jan 2026 13:09:36 -0500
Subject: [PATCH 2/2] fmt

---
 .../core/src/execution/operators/iceberg_scan.rs | 16 ++++++++--------
 native/core/src/parquet/parquet_exec.rs          |  8 +++-----
 native/core/src/parquet/schema_adapter.rs        | 12 +++++-------
 3 files changed, 16 insertions(+), 20 deletions(-)

diff --git a/native/core/src/execution/operators/iceberg_scan.rs b/native/core/src/execution/operators/iceberg_scan.rs
index 808efe7756..7cda721df0 100644
--- a/native/core/src/execution/operators/iceberg_scan.rs
+++ b/native/core/src/execution/operators/iceberg_scan.rs
@@ -299,14 +299,14 @@ impl IcebergFileStream {
             .map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)))
             .and_then(move |batch| {
                 let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
-                let result = adapt_batch_with_expressions(
-                    batch,
-                    &target_schema,
-                    &spark_options,
-                )
-                .map_err(|e| {
-                    DataFusionError::Execution(format!("Batch adaptation failed: {}", e))
-                });
+                let result =
+                    adapt_batch_with_expressions(batch, &target_schema, &spark_options)
+                        .map_err(|e| {
+                            DataFusionError::Execution(format!(
+                                "Batch adaptation failed: {}",
+                                e
+                            ))
+                        });
 
                 futures::future::ready(result)
             });
diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs
index 922a23afa4..83d62a08c3 100644
--- a/native/core/src/parquet/parquet_exec.rs
+++ b/native/core/src/parquet/parquet_exec.rs
@@ -106,11 +106,9 @@ pub(crate) fn init_datasource_exec(
     }
 
     // Create the expression adapter factory for Spark-compatible schema adaptation
-    let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> =
-        Arc::new(SparkPhysicalExprAdapterFactory::new(
-            spark_parquet_options,
-            default_values,
-        ));
+    let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
+        SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
+    );
 
     let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);
 
diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index d3064ae23a..e499ecf32c 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -33,8 +33,8 @@ use 
datafusion::physical_plan::ColumnarValue;
 use datafusion::scalar::ScalarValue;
 use datafusion_comet_spark_expr::{Cast, SparkCastOptions};
 use datafusion_physical_expr_adapter::{
-    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
-    replace_columns_with_literals,
+    replace_columns_with_literals, DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter,
+    PhysicalExprAdapterFactory,
 };
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -568,11 +568,9 @@ mod test {
         spark_parquet_options.allow_cast_unsigned_ints = true;
 
         // Create expression adapter factory for Spark-compatible schema adaptation
-        let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> =
-            Arc::new(SparkPhysicalExprAdapterFactory::new(
-                spark_parquet_options,
-                None,
-            ));
+        let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
+            SparkPhysicalExprAdapterFactory::new(spark_parquet_options, None),
+        );
 
         let parquet_source = ParquetSource::new(TableParquetOptions::new());