25 changes: 12 additions & 13 deletions plugins/engine-datafusion/Cargo.toml
@@ -6,24 +6,23 @@ members = [

[workspace.dependencies]
# DataFusion dependencies
-datafusion = "50.0.0"
-datafusion-expr = "50.0.0"
-datafusion-datasource = "50.0.0"
-arrow-json = "56.2"
-arrow = { version = "56.2", features = ["ffi", "ipc_compression"] }
-#arrow = "55.2.0"
-arrow-array = "56.2.0"
-arrow-schema = "56.2.0"
-arrow-buffer = "56.2.0"
+datafusion = "51.0.0"
+datafusion-expr = "51.0.0"
+datafusion-datasource = "51.0.0"
+arrow-json = "57.1.0"
+arrow = { version = "57.1.0", features = ["ffi", "ipc_compression"] }
+arrow-array = "57.1.0"
+arrow-schema = "57.1.0"
+arrow-buffer = "57.1.0"
downcast-rs = "1.2"


# JNI dependencies
jni = "0.21"

# Substrait support
-datafusion-substrait = "50.0.0"
-prost = "0.13"
+datafusion-substrait = "51.0.0"
+prost = "0.14"


# Async runtime
@@ -43,10 +42,10 @@ thiserror = "1.0"
# Logging
log = "0.4"
# Parquet support
-parquet = "54.0.0"
+parquet = "57.1.0"

# Object store for file access
-object_store = "=0.12.3"
+object_store = "=0.12.4"
url = "2.0"

# Substrait support
2 changes: 1 addition & 1 deletion plugins/engine-datafusion/jni/src/listing_table.rs
@@ -1314,7 +1314,7 @@ impl TableProvider for ListingTable
.with_file_groups(partitioned_file_lists)
.with_constraints(self.constraints.clone())
.with_statistics(statistics)
-.with_projection(projection.cloned())
+.with_projection_indices(projection.cloned())
.with_limit(limit)
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
2 changes: 1 addition & 1 deletion plugins/engine-datafusion/jni/src/query_executor.rs
@@ -259,7 +259,7 @@ pub async fn execute_fetch_phase(
parquet_schema.clone(),
file_source,
)
-.with_projection(Option::from(projection_index.clone()))
+.with_projection_indices(Some(projection_index.clone()))
.with_file_group(file_group)
.build();

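
Note on the two hunks above: both call sites pick up the same DataFusion 51 API change, where the projection setter on FileScanConfigBuilder was renamed from with_projection to with_projection_indices; the query-executor site also swaps Option::from(..) for the plainer Some(..). A minimal sketch of the new spelling, assuming an existing FileScanConfig named datasource and a Vec<usize> of column positions named projection_index (names borrowed from the diff; the import path is an assumption and may differ from the crate's existing use statements):

use datafusion::datasource::physical_plan::FileScanConfigBuilder; // assumed re-export path

// DataFusion 50: .with_projection(Option::from(projection_index.clone()))
// DataFusion 51: same data, but the setter is named after the column indices it takes.
let rebuilt = FileScanConfigBuilder::from(datasource.clone())
    .with_projection_indices(Some(projection_index.clone()))
    .build();
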
22 changes: 10 additions & 12 deletions plugins/engine-datafusion/jni/src/row_id_optimizer.rs
@@ -6,12 +6,10 @@
* compatible open source license.
*/

-use std::fs;
use std::sync::Arc;

-use arrow::datatypes::{DataType, Field, Fields, Schema};
+use arrow::datatypes::{Field, Fields, Schema};
use arrow_schema::SchemaRef;
-use datafusion::physical_plan::projection::new_projections_for_columns;
use datafusion::{
common::tree_node::{Transformed, TreeNode, TreeNodeRecursion},
config::ConfigOptions,
@@ -21,10 +19,11 @@ use datafusion::{
},
error::DataFusionError,
logical_expr::Operator,
-physical_expr::{PhysicalExpr, expressions::{BinaryExpr, Column}},
+physical_expr::{expressions::{BinaryExpr, Column}, PhysicalExpr},
physical_optimizer::PhysicalOptimizerRule,
-physical_plan::{ExecutionPlan, filter::FilterExec, projection::{ProjectionExec, ProjectionExpr}},
+physical_plan::{projection::{ProjectionExec, ProjectionExpr}, ExecutionPlan},
};
+use datafusion_datasource::TableSchema;

#[derive(Debug)]
pub struct ProjectRowIdOptimizer;
@@ -37,8 +36,7 @@ impl ProjectRowIdOptimizer {
datasource_exec_schema: SchemaRef,
) -> (SchemaRef, Vec<usize>) {
// Clone projection and add new field index
-let mut projections = datasource.projection.clone().unwrap_or_default();
-let file_source_schema = datasource.file_schema.clone();
+let file_source_schema = datasource.file_schema();

let mut new_projections = vec![];

@@ -52,8 +50,8 @@
// fields.push(Arc::new(Field::new(field.name(), field.data_type().clone(), field.is_nullable())));
// }
// }

-if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) {
+if !new_projections.contains(&file_source_schema.index_of("___row_id").unwrap()) {
+// if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) {
new_projections.push(file_source_schema.index_of("___row_id").unwrap());

// let field = file_source_schema.field_with_name(&*"___row_id").expect("Field ___row_id not found in file_source_schema");
@@ -133,8 +131,8 @@ impl ProjectRowIdOptimizer {
let (new_schema, new_projections) =
self.build_updated_file_source_schema(datasource, data_source_exec_schema.clone());
let file_scan_config = FileScanConfigBuilder::from(datasource.clone())
-.with_source(datasource.file_source.with_schema(new_schema.clone()))
-.with_projection(Some(new_projections))
+.with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
+.with_projection_indices(Some(new_projections))
.build();

let new_datasource = DataSourceExec::from_data_source(file_scan_config);
@@ -161,7 +159,7 @@ impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
.as_any()
.downcast_ref::<FileScanConfig>()
.expect("DataSource not found");
-let schema = datasource.file_schema.clone();
+let schema = datasource.file_schema();
schema
.field_with_name("___row_id")
.expect("Field ___row_id missing");
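
Beyond the import reshuffling, the row_id_optimizer hunks track three DataFusion 51 moves visible above: the file schema is now read through the file_schema() accessor instead of a public field, FileSource::with_schema expects the new TableSchema wrapper rather than a bare SchemaRef, and with_projection is again replaced by with_projection_indices. A condensed sketch using only the calls that appear in the diff, with the file's existing imports assumed (datasource is the FileScanConfig extracted by the rule; new_schema and a mutable new_projections: Vec<usize> stand in for the outputs of build_updated_file_source_schema):

use datafusion_datasource::TableSchema; // import added by this change

// Locate ___row_id through the accessor (previously datasource.file_schema.clone()).
let file_schema = datasource.file_schema();
let row_id_idx = file_schema.index_of("___row_id").expect("Field ___row_id missing");
if !new_projections.contains(&row_id_idx) {
    new_projections.push(row_id_idx); // mirrors the check in build_updated_file_source_schema
}

// Rebuild the scan config: wrap the updated schema in TableSchema, use the renamed
// projection setter, then wrap the config in a DataSourceExec as before.
let rebuilt = FileScanConfigBuilder::from(datasource.clone())
    .with_source(datasource.file_source.with_schema(TableSchema::from_file_schema(new_schema.clone())))
    .with_projection_indices(Some(new_projections))
    .build();
let new_datasource = DataSourceExec::from_data_source(rebuilt);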