Skip to content

Commit 3a695ed

Browse files
committed
DataFusion 52 migration
1 parent 20d5b74 commit 3a695ed

File tree

7 files changed

+361
-50
lines changed

7 files changed

+361
-50
lines changed

native/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

native/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ parquet = { version = "57.0.0", default-features = false, features = ["experimen
4141
datafusion = { version = "52.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
4242
datafusion-datasource = { version = "52.0.0" }
4343
datafusion-spark = { version = "52.0.0" }
44+
datafusion-physical-expr-adapter = { version = "52.0.0" }
4445
datafusion-comet-spark-expr = { path = "spark-expr" }
4546
datafusion-comet-proto = { path = "proto" }
4647
chrono = { version = "0.4", default-features = false, features = ["clock"] }

native/core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ tempfile = "3.24.0"
6060
itertools = "0.14.0"
6161
paste = "1.0.14"
6262
datafusion = { workspace = true, features = ["parquet_encryption", "sql"] }
63+
datafusion-physical-expr-adapter = { workspace = true }
6364
datafusion-datasource = { workspace = true }
6465
datafusion-spark = { workspace = true }
6566
once_cell = "1.18.0"

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ use iceberg::io::FileIO;
4242

4343
use crate::execution::operators::ExecutionError;
4444
use crate::parquet::parquet_support::SparkParquetOptions;
45+
use crate::parquet::schema_adapter::adapt_batch_with_expressions;
4546
use datafusion_comet_spark_expr::EvalMode;
4647
use datafusion_datasource::file_stream::FileStreamMetrics;
47-
use crate::parquet::schema_adapter::adapt_batch_with_expressions;
4848

4949
/// Iceberg table scan operator that uses iceberg-rust to read Iceberg tables.
5050
///

native/core/src/execution/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,8 +1049,8 @@ impl PhysicalPlanner {
10491049
.as_any()
10501050
.downcast_ref::<DataFusionLiteral>()
10511051
.ok_or_else(|| {
1052-
GeneralError("Expected literal of default value.".to_string())
1053-
})?;
1052+
GeneralError("Expected literal of default value.".to_string())
1053+
})?;
10541054
Ok(df_literal.value().clone())
10551055
})
10561056
.collect();

native/core/src/parquet/parquet_exec.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::collections::HashMap;
1918
use crate::execution::operators::ExecutionError;
2019
use crate::parquet::encryption_support::{CometEncryptionConfig, ENCRYPTION_FACTORY_ID};
2120
use crate::parquet::parquet_support::SparkParquetOptions;
21+
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
2222
use arrow::datatypes::{Field, SchemaRef};
2323
use datafusion::config::TableParquetOptions;
2424
use datafusion::datasource::listing::PartitionedFile;
@@ -29,13 +29,13 @@ use datafusion::datasource::source::DataSourceExec;
2929
use datafusion::execution::object_store::ObjectStoreUrl;
3030
use datafusion::physical_expr::expressions::BinaryExpr;
3131
use datafusion::physical_expr::PhysicalExpr;
32+
use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
3233
use datafusion::prelude::SessionContext;
34+
use datafusion::scalar::ScalarValue;
3335
use datafusion_comet_spark_expr::EvalMode;
3436
use datafusion_datasource::TableSchema;
37+
use std::collections::HashMap;
3538
use std::sync::Arc;
36-
use datafusion::physical_expr_adapter::PhysicalExprAdapterFactory;
37-
use datafusion::scalar::ScalarValue;
38-
use crate::parquet::schema_adapter::SparkPhysicalExprAdapterFactory;
3939

4040
/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
4141
/// `native_datafusion` scan or the `native_iceberg_compat` scan.
@@ -138,11 +138,12 @@ pub(crate) fn init_datasource_exec(
138138
FileScanConfigBuilder::new(object_store_url, file_source).with_file_groups(file_groups);
139139

140140
if let Some(projection_vector) = projection_vector {
141-
file_scan_config_builder =
142-
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
141+
file_scan_config_builder = file_scan_config_builder
142+
.with_projection_indices(Some(projection_vector))?
143+
.with_expr_adapter(Some(expr_adapter_factory));
143144
}
144145

145-
let file_scan_config = file_scan_config_builder.with_expr_adapter(Some(expr_adapter_factory)).build();
146+
let file_scan_config = file_scan_config_builder.build();
146147

147148
Ok(Arc::new(DataSourceExec::new(Arc::new(file_scan_config))))
148149
}

0 commit comments

Comments
 (0)