66 * compatible open source license.
77 */
88
9- use std:: fs;
109use std:: sync:: Arc ;
1110
12- use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
11+ use arrow:: datatypes:: { Field , Fields , Schema } ;
1312use arrow_schema:: SchemaRef ;
14- use datafusion:: physical_plan:: projection:: new_projections_for_columns;
1513use datafusion:: {
1614 common:: tree_node:: { Transformed , TreeNode , TreeNodeRecursion } ,
1715 config:: ConfigOptions ,
@@ -21,10 +19,11 @@ use datafusion::{
2119 } ,
2220 error:: DataFusionError ,
2321 logical_expr:: Operator ,
24- physical_expr:: { PhysicalExpr , expressions:: { BinaryExpr , Column } } ,
22+ physical_expr:: { expressions:: { BinaryExpr , Column } , PhysicalExpr } ,
2523 physical_optimizer:: PhysicalOptimizerRule ,
26- physical_plan:: { ExecutionPlan , filter :: FilterExec , projection:: { ProjectionExec , ProjectionExpr } } ,
24+ physical_plan:: { projection:: { ProjectionExec , ProjectionExpr } , ExecutionPlan } ,
2725} ;
26+ use datafusion_datasource:: TableSchema ;
2827
2928#[ derive( Debug ) ]
3029pub struct ProjectRowIdOptimizer ;
@@ -37,8 +36,7 @@ impl ProjectRowIdOptimizer {
3736 datasource_exec_schema : SchemaRef ,
3837 ) -> ( SchemaRef , Vec < usize > ) {
3938 // Clone projection and add new field index
40- let mut projections = datasource. projection . clone ( ) . unwrap_or_default ( ) ;
41- let file_source_schema = datasource. file_schema . clone ( ) ;
39+ let file_source_schema = datasource. file_schema ( ) ;
4240
4341 let mut new_projections = vec ! [ ] ;
4442
@@ -52,8 +50,8 @@ impl ProjectRowIdOptimizer {
5250 // fields.push(Arc::new(Field::new(field.name(), field.data_type().clone(), field.is_nullable())));
5351 // }
5452 // }
55-
56- if !projections. contains ( & file_source_schema. index_of ( "___row_id" ) . unwrap ( ) ) {
53+ if !new_projections . contains ( & file_source_schema . index_of ( "___row_id" ) . unwrap ( ) ) {
54+ // if !projections.contains(&file_source_schema.index_of("___row_id").unwrap()) {
5755 new_projections. push ( file_source_schema. index_of ( "___row_id" ) . unwrap ( ) ) ;
5856
5957 // let field = file_source_schema.field_with_name(&*"___row_id").expect("Field ___row_id not found in file_source_schema");
@@ -133,8 +131,8 @@ impl ProjectRowIdOptimizer {
133131 let ( new_schema, new_projections) =
134132 self . build_updated_file_source_schema ( datasource, data_source_exec_schema. clone ( ) ) ;
135133 let file_scan_config = FileScanConfigBuilder :: from ( datasource. clone ( ) )
136- . with_source ( datasource. file_source . with_schema ( new_schema. clone ( ) ) )
137- . with_projection ( Some ( new_projections) )
134+ . with_source ( datasource. file_source . with_schema ( TableSchema :: from_file_schema ( new_schema. clone ( ) ) ) )
135+ . with_projection_indices ( Some ( new_projections) )
138136 . build ( ) ;
139137
140138 let new_datasource = DataSourceExec :: from_data_source ( file_scan_config) ;
@@ -161,7 +159,7 @@ impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
161159 . as_any ( )
162160 . downcast_ref :: < FileScanConfig > ( )
163161 . expect ( "DataSource not found" ) ;
164- let schema = datasource. file_schema . clone ( ) ;
162+ let schema = datasource. file_schema ( ) ;
165163 schema
166164 . field_with_name ( "___row_id" )
167165 . expect ( "Field ___row_id missing" ) ;
0 commit comments