diff --git a/plugins/engine-datafusion/Cargo.toml b/plugins/engine-datafusion/Cargo.toml index c2015752579b7..2252604f5c173 100644 --- a/plugins/engine-datafusion/Cargo.toml +++ b/plugins/engine-datafusion/Cargo.toml @@ -6,15 +6,14 @@ members = [ [workspace.dependencies] # DataFusion dependencies -datafusion = "50.0.0" -datafusion-expr = "50.0.0" -datafusion-datasource = "50.0.0" -arrow-json = "56.2" -arrow = { version = "56.2", features = ["ffi", "ipc_compression"] } -#arrow = "55.2.0" -arrow-array = "56.2.0" -arrow-schema = "56.2.0" -arrow-buffer = "56.2.0" +datafusion = "51.0.0" +datafusion-expr = "51.0.0" +datafusion-datasource = "51.0.0" +arrow-json = "57.1.0" +arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] } +arrow-array = "57.1.0" +arrow-schema = "57.1.0" +arrow-buffer = "57.1.0" downcast-rs = "1.2" @@ -22,8 +21,8 @@ downcast-rs = "1.2" jni = "0.21" # Substrait support -datafusion-substrait = "50.0.0" -prost = "0.13" +datafusion-substrait = "51.0.0" +prost = "0.14" # Async runtime @@ -43,10 +42,10 @@ thiserror = "1.0" # Logging log = "0.4" # Parquet support -parquet = "54.0.0" +parquet = "57.1.0" # Object store for file access -object_store = "=0.12.3" +object_store = "=0.12.4" url = "2.0" # Substrait support diff --git a/plugins/engine-datafusion/jni/src/listing_table.rs b/plugins/engine-datafusion/jni/src/listing_table.rs index 83728c2261adb..987e804fa2787 100644 --- a/plugins/engine-datafusion/jni/src/listing_table.rs +++ b/plugins/engine-datafusion/jni/src/listing_table.rs @@ -1314,7 +1314,7 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection.cloned()) + .with_projection_indices(projection.cloned()) .with_limit(limit) .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) diff --git a/plugins/engine-datafusion/jni/src/query_executor.rs b/plugins/engine-datafusion/jni/src/query_executor.rs index 418f19045f9eb..30d8df1c67311 100644 --- a/plugins/engine-datafusion/jni/src/query_executor.rs +++ b/plugins/engine-datafusion/jni/src/query_executor.rs @@ -259,7 +259,7 @@ pub async fn execute_fetch_phase( parquet_schema.clone(), file_source, ) - .with_projection(Option::from(projection_index.clone())) + .with_projection_indices(Some(projection_index.clone())) .with_file_group(file_group) .build(); diff --git a/plugins/engine-datafusion/jni/src/row_id_optimizer.rs b/plugins/engine-datafusion/jni/src/row_id_optimizer.rs index b2bdd0216868e..61adba0a124bc 100644 --- a/plugins/engine-datafusion/jni/src/row_id_optimizer.rs +++ b/plugins/engine-datafusion/jni/src/row_id_optimizer.rs @@ -6,12 +6,10 @@ * compatible open source license. */ -use std::fs; use std::sync::Arc; -use arrow::datatypes::{DataType, Field, Fields, Schema}; +use arrow::datatypes::{Field, Fields, Schema}; use arrow_schema::SchemaRef; -use datafusion::physical_plan::projection::new_projections_for_columns; use datafusion::{ common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}, config::ConfigOptions, @@ -21,10 +19,11 @@ use datafusion::{ }, error::DataFusionError, logical_expr::Operator, - physical_expr::{PhysicalExpr, expressions::{BinaryExpr, Column}}, + physical_expr::{expressions::{BinaryExpr, Column}, PhysicalExpr}, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{ExecutionPlan, filter::FilterExec, projection::{ProjectionExec, ProjectionExpr}}, + physical_plan::{projection::{ProjectionExec, ProjectionExpr}, ExecutionPlan}, }; +use datafusion_datasource::TableSchema; #[derive(Debug)] pub struct ProjectRowIdOptimizer; @@ -37,8 +36,7 @@ impl ProjectRowIdOptimizer { datasource_exec_schema: SchemaRef, ) -> (SchemaRef, Vec) { // Clone projection and add new field index - let mut projections = datasource.projection.clone().unwrap_or_default(); - let file_source_schema = datasource.file_schema.clone(); + let file_source_schema = datasource.file_schema(); let mut new_projections = vec![]; @@ -52,8 +50,8 @@ impl ProjectRowIdOptimizer { // fields.push(Arc::new(Field::new(field.name(), field.data_type().clone(), field.is_nullable()))); // } // } - - if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) { + if !new_projections.contains(&file_source_schema.index_of("___row_id").unwrap()) { + // if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) { new_projections.push(file_source_schema.index_of("___row_id").unwrap()); // let field = file_source_schema.field_with_name(&*"___row_id").expect("Field ___row_id not found in file_source_schema"); @@ -133,8 +131,8 @@ impl ProjectRowIdOptimizer { let (new_schema, new_projections) = self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone()); let file_scan_config = FileScanConfigBuilder::from(datasource.clone()) - .with_source(datasource.file_source.with_schema(new_schema.clone())) - .with_projection(Some(new_projections)) + .with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone()))) + .with_projection_indices(Some(new_projections)) .build(); let new_datasource = DataSourceExec::from_data_source(file_scan_config); @@ -161,7 +159,7 @@ impl PhysicalOptimizerRule for ProjectRowIdOptimizer { .as_any() .downcast_ref::() .expect("DataSource not found"); - let schema = datasource.file_schema.clone(); + let schema = datasource.file_schema(); schema .field_with_name("___row_id") .expect("Field ___row_id missing");