Skip to content

Commit e5d1b0a

Browse files
committed
DataFusion 52 migration
1 parent 38b4ce7 commit e5d1b0a

File tree

5 files changed

+221
-120
lines changed

5 files changed

+221
-120
lines changed

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use iceberg::io::FileIO;
4242

4343
use crate::execution::operators::ExecutionError;
4444
use crate::parquet::parquet_support::SparkParquetOptions;
45-
use crate::parquet::schema_adapter::SparkSchemaMapperFactory;
4645
use datafusion_comet_spark_expr::EvalMode;
4746
use datafusion_datasource::file_stream::FileStreamMetrics;
47+
use crate::parquet::schema_adapter::adapt_batch_with_expressions;
4848

4949
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
5050
///
@@ -298,19 +298,14 @@ impl IcebergFileStream {
298298
.map_err(|e| DataFusionError::Execution(format!("Iceberg scan error: {}", e)))
299299
.and_then(move |batch| {
300300
let spark_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
301-
let mapper_factory = SparkSchemaMapperFactory::new(spark_options, None);
302-
let file_schema = batch.schema();
303-
304-
let result = mapper_factory
305-
.create_mapper(Arc::clone(&target_schema), file_schema.as_ref())
306-
.and_then(|mapper| {
307-
mapper.map_batch(batch).map_err(|e| {
308-
DataFusionError::Execution(format!("Batch mapping failed: {}", e))
309-
})
310-
})
311-
.map_err(|e| {
312-
DataFusionError::Execution(format!("Schema mapping failed: {}", e))
313-
});
301+
let result =
302+
adapt_batch_with_expressions(batch, &target_schema, &spark_options)
303+
.map_err(|e| {
304+
DataFusionError::Execution(format!(
305+
"Batch adaptation failed: {}",
306+
e
307+
))
308+
});
314309
futures::future::ready(result)
315310
});
316311

native/core/src/execution/planner.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,6 +1034,42 @@ impl PhysicalPlanner {
10341034
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
10351035
.collect();
10361036

1037+
let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
1038+
.default_values
1039+
.is_empty()
1040+
{
1041+
// We have default values. Extract the two lists (same length) of values and
1042+
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
1043+
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
1044+
.default_values
1045+
.iter()
1046+
.map(|expr| {
1047+
let literal = self.create_expr(expr, Arc::clone(&required_schema))?;
1048+
let df_literal = literal
1049+
.as_any()
1050+
.downcast_ref::<DataFusionLiteral>()
1051+
.ok_or_else(|| {
1052+
GeneralError("Expected literal of default value.".to_string())
1053+
})?;
1054+
Ok(df_literal.value().clone())
1055+
})
1056+
.collect();
1057+
let default_values = default_values?;
1058+
let default_values_indexes: Vec<usize> = scan
1059+
.default_values_indexes
1060+
.iter()
1061+
.map(|offset| *offset as usize)
1062+
.collect();
1063+
Some(
1064+
default_values_indexes
1065+
.into_iter()
1066+
.zip(default_values)
1067+
.collect(),
1068+
)
1069+
} else {
1070+
None
1071+
};
1072+
10371073
// Get one file from this partition (we know it's not empty due to early return above)
10381074
let one_file = partition_files
10391075
.partitioned_file
@@ -1066,6 +1102,7 @@ impl PhysicalPlanner {
10661102
file_groups,
10671103
Some(projection_vector),
10681104
Some(data_filters?),
1105+
default_values,
10691106
scan.session_timezone.as_str(),
10701107
scan.case_sensitive,
10711108
self.session_ctx(),

native/core/src/parquet/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
769769
file_groups,
770770
None,
771771
data_filters,
772+
None,
772773
session_timezone.as_str(),
773774
case_sensitive != JNI_FALSE,
774775
session_ctx,

native/core/src/parquet/parquet_exec.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::collections::HashMap;
1819
use crate::execution::operators::ExecutionError;
1920
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
2021
use crate::parquet::parquet_support::SparkParquetOptions;
@@ -32,6 +33,9 @@ use datafusion::prelude::SessionContext;
3233
use datafusion_comet_spark_expr::EvalMode;
3334
use datafusion_datasource::TableSchema;
3435
use std::sync::Arc;
36+
use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
37+
use datafusion::scalar::ScalarValue;
38+
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
3539

3640
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
3741
/// `native_datafusion` scan or the `native_iceberg_compat` scan.
@@ -61,12 +65,13 @@ pub(crate) fn init_datasource_exec(
6165
file_groups: Vec<Vec<PartitionedFile>>,
6266
projection_vector: Option<Vec<usize>>,
6367
data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
68+
default_values: Option<HashMap<usize, ScalarValue>>,
6469
session_timezone: &str,
6570
case_sensitive: bool,
6671
session_ctx: &Arc<SessionContext>,
6772
encryption_enabled: bool,
6873
) -> Result<Arc<DataSourceExec>, ExecutionError> {
69-
let (table_parquet_options, _) = get_options(
74+
let (table_parquet_options, spark_parquet_options) = get_options(
7075
session_timezone,
7176
case_sensitive,
7277
&object_store_url,
@@ -118,7 +123,11 @@ pub(crate) fn init_datasource_exec(
118123
);
119124
}
120125

121-
let file_source = Arc::new(parquet_source) as Arc<dyn FileSource>;
126+
let expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory> = Arc::new(
127+
SparkPhysicalExprAdapterFactory::new(spark_parquet_options, default_values),
128+
);
129+
130+
let file_source: Arc<dyn FileSource> = Arc::new(parquet_source);
122131

123132
let file_groups = file_groups
124133
.iter()
@@ -133,7 +142,7 @@ pub(crate) fn init_datasource_exec(
133142
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
134143
}
135144

136-
let file_scan_config = file_scan_config_builder.build();
145+
let file_scan_config = file_scan_config_builder.with_expr_adapter(Some(expr_adapter_factory)).build();
137146

138147
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
139148
}

0 commit comments

Comments
 (0)