@@ -11,7 +11,7 @@ use std::sync::Arc;
1111
1212use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
1313use arrow_schema:: SchemaRef ;
14- use datafusion:: physical_plan:: projection:: new_projections_for_columns;
14+ use datafusion:: physical_plan:: projection:: { new_projections_for_columns, ProjectionExpr } ;
1515use datafusion:: {
1616 common:: tree_node:: { Transformed , TreeNode , TreeNodeRecursion } ,
1717 config:: ConfigOptions ,
@@ -21,15 +21,21 @@ use datafusion::{
2121 } ,
2222 error:: DataFusionError ,
2323 logical_expr:: Operator ,
24- physical_expr:: { PhysicalExpr , expressions:: { BinaryExpr , Column } } ,
24+ parquet:: arrow:: arrow_reader:: ParquetRecordBatchReaderBuilder ,
25+ physical_expr:: {
26+ expressions:: { BinaryExpr , Column } ,
27+ PhysicalExpr ,
28+ } ,
2529 physical_optimizer:: PhysicalOptimizerRule ,
26- physical_plan:: { ExecutionPlan , filter:: FilterExec , projection:: { ProjectionExec , ProjectionExpr } } ,
30+ physical_plan:: { filter:: FilterExec , projection:: ProjectionExec , ExecutionPlan } ,
2731} ;
2832
2933#[ derive( Debug ) ]
30- pub struct ProjectRowIdOptimizer ;
34+ pub struct AbsoluteRowIdOptimizer ;
35+ pub const ROW_ID_FIELD_NAME : & ' static str = "___row_id" ;
36+ pub const ROW_BASE_FIELD_NAME : & ' static str = "row_base" ;
3137
32- impl ProjectRowIdOptimizer {
38+ impl AbsoluteRowIdOptimizer {
3339 /// Helper to build new schema and projection info with added `row_base` column.
3440 fn build_updated_file_source_schema (
3541 & self ,
@@ -53,27 +59,19 @@ impl ProjectRowIdOptimizer {
5359 // }
5460 // }
5561
56- if !projections. contains ( & file_source_schema. index_of ( "___row_id" ) . unwrap ( ) ) {
57- new_projections. push ( file_source_schema. index_of ( "___row_id" ) . unwrap ( ) ) ;
62+ if !projections. contains ( & file_source_schema. index_of ( ROW_ID_FIELD_NAME ) . unwrap ( ) ) {
63+ new_projections. push ( file_source_schema. index_of ( ROW_ID_FIELD_NAME ) . unwrap ( ) ) ;
5864
59- // let field = file_source_schema.field_with_name(&*"___row_id" ).expect("Field ___row_id not found in file_source_schema");
60- // fields.push(Arc::new(Field::new("___row_id" , field.data_type().clone(), field.is_nullable())));
65+ // let field = file_source_schema.field_with_name(&*ROW_ID_FIELD_NAME ).expect("Field ___row_id not found in file_source_schema");
66+ // fields.push(Arc::new(Field::new(ROW_ID_FIELD_NAME , field.data_type().clone(), field.is_nullable())));
6167 }
6268 new_projections. push ( file_source_schema. fields . len ( ) ) ;
63- // fields.push(Arc::new(Field::new("row_base", file_source_schema.field_with_name("___row_id" ).unwrap().data_type().clone(), true)));
69+ // fields.push(Arc::new(Field::new("row_base", file_source_schema.field_with_name(ROW_ID_FIELD_NAME ).unwrap().data_type().clone(), true)));
6470
6571 // Add row_base field to schema
6672
6773 let mut new_fields = file_source_schema. fields ( ) . clone ( ) . to_vec ( ) ;
68- new_fields. push ( Arc :: new ( Field :: new (
69- "row_base" ,
70- file_source_schema
71- . field_with_name ( "___row_id" )
72- . unwrap ( )
73- . data_type ( )
74- . clone ( ) ,
75- true ,
76- ) ) ) ;
74+ new_fields. push ( Arc :: new ( Field :: new ( ROW_BASE_FIELD_NAME , file_source_schema. field_with_name ( ROW_ID_FIELD_NAME ) . unwrap ( ) . data_type ( ) . clone ( ) , true ) ) ) ;
7775
7876 let new_schema = Arc :: new ( Schema {
7977 metadata : file_source_schema. metadata ( ) . clone ( ) ,
@@ -84,31 +82,23 @@ impl ProjectRowIdOptimizer {
8482 }
8583
8684 /// Creates a projection expression that adds `row_base` to `___row_id`.
87- fn build_projection_exprs (
88- & self ,
89- new_schema : & SchemaRef ,
90- ) -> Result < Vec < ( Arc < dyn PhysicalExpr > , String ) > , DataFusionError > {
91- let row_id_idx = new_schema
92- . index_of ( "___row_id" )
93- . expect ( "Field ___row_id missing" ) ;
94- let row_base_idx = new_schema
95- . index_of ( "row_base" )
96- . expect ( "Field row_base missing" ) ;
97-
85+ fn build_projection_exprs ( & self , new_schema : & SchemaRef ) -> Result < Vec < ( Arc < dyn PhysicalExpr > , String ) > , DataFusionError > {
86+ let row_id_idx = new_schema. index_of ( ROW_ID_FIELD_NAME ) . expect ( "Field ___row_id missing" ) ;
87+ let row_base_idx = new_schema. index_of ( ROW_BASE_FIELD_NAME ) . expect ( "Field row_base missing" ) ;
9888 let sum_expr: Arc < dyn PhysicalExpr > = Arc :: new ( BinaryExpr :: new (
99- Arc :: new ( Column :: new ( "___row_id" , row_id_idx) ) ,
89+ Arc :: new ( Column :: new ( ROW_ID_FIELD_NAME , row_id_idx) ) ,
10090 Operator :: Plus ,
101- Arc :: new ( Column :: new ( "row_base" , row_base_idx) ) ,
91+ Arc :: new ( Column :: new ( ROW_BASE_FIELD_NAME , row_base_idx) ) ,
10292 ) ) ;
10393
10494 let mut projection_exprs: Vec < ( Arc < dyn PhysicalExpr > , String ) > = Vec :: new ( ) ;
10595
10696 let mut has_row_id = false ;
10797 for field_name in new_schema. fields ( ) . to_vec ( ) {
108- if field_name. name ( ) == "___row_id" {
98+ if field_name. name ( ) == ROW_ID_FIELD_NAME {
10999 projection_exprs. push ( ( sum_expr. clone ( ) , field_name. name ( ) . clone ( ) ) ) ;
110100 has_row_id = true ;
111- } else if ( field_name. name ( ) != "row_base" ) {
101+ } else if ( field_name. name ( ) != ROW_BASE_FIELD_NAME ) {
112102 // Match the column by name from new_schema
113103 let idx = new_schema
114104 . index_of ( & * field_name. name ( ) . clone ( ) )
@@ -120,7 +110,7 @@ impl ProjectRowIdOptimizer {
120110 }
121111 }
122112 if !has_row_id {
123- projection_exprs. push ( ( sum_expr. clone ( ) , "___row_id" . parse ( ) . unwrap ( ) ) ) ;
113+ projection_exprs. push ( ( sum_expr. clone ( ) , ROW_ID_FIELD_NAME . parse ( ) . unwrap ( ) ) ) ;
124114 }
125115 Ok ( projection_exprs)
126116 }
@@ -147,7 +137,7 @@ impl ProjectRowIdOptimizer {
147137 }
148138}
149139
150- impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
140+ impl PhysicalOptimizerRule for AbsoluteRowIdOptimizer {
151141 fn optimize (
152142 & self ,
153143 plan : Arc < dyn ExecutionPlan > ,
@@ -162,34 +152,16 @@ impl PhysicalOptimizerRule for ProjectRowIdOptimizer {
162152 . downcast_ref :: < FileScanConfig > ( )
163153 . expect ( "DataSource not found" ) ;
164154 let schema = datasource. file_schema . clone ( ) ;
165- schema
166- . field_with_name ( "___row_id" )
167- . expect ( "Field ___row_id missing" ) ;
168- let projection = self
169- . create_datasource_projection ( datasource, datasource_exec. schema ( ) )
170- . expect ( "Failed to create ProjectionExec from datasource" ) ;
171- return Ok ( Transformed :: new (
172- Arc :: new ( projection) ,
173- true ,
174- TreeNodeRecursion :: Continue ,
175- ) ) ;
155+ schema. field_with_name ( ROW_ID_FIELD_NAME ) . expect ( "Field ___row_id missing" ) ;
156+ let projection = self . create_datasource_projection ( datasource, datasource_exec. schema ( ) ) . expect ( "Failed to create ProjectionExec from datasource" ) ;
157+ return Ok ( Transformed :: new ( Arc :: new ( projection) , true , TreeNodeRecursion :: Continue ) ) ;
158+
176159 } else if let Some ( projection_exec) = node. as_any ( ) . downcast_ref :: < ProjectionExec > ( ) {
177- if !projection_exec
178- . schema ( )
179- . field_with_name ( "___row_id" )
180- . is_ok ( )
181- {
160+ if !projection_exec. schema ( ) . field_with_name ( ROW_ID_FIELD_NAME ) . is_ok ( ) {
161+
182162 let mut projection_exprs = projection_exec. expr ( ) . to_vec ( ) ;
183- if ( projection_exec
184- . input ( )
185- . schema ( )
186- . index_of ( "___row_id" )
187- . is_ok ( ) )
188- {
189- if projection_exec. input ( ) . schema ( ) . index_of ( "___row_id" ) . is_ok ( ) {
190- let row_id_col: Arc < dyn PhysicalExpr > = Arc :: new ( Column :: new ( "___row_id" , projection_exec. input ( ) . schema ( ) . index_of ( "___row_id" ) . unwrap ( ) ) ) ;
191- projection_exprs. push ( ProjectionExpr :: new ( row_id_col, "___row_id" . to_string ( ) ) ) ;
192- }
163+ if ( projection_exec. input ( ) . schema ( ) . index_of ( ROW_ID_FIELD_NAME ) . is_ok ( ) ) {
164+ projection_exprs. push ( ProjectionExpr :: new ( Arc :: new ( Column :: new ( ROW_ID_FIELD_NAME , projection_exec. input ( ) . schema ( ) . index_of ( ROW_ID_FIELD_NAME ) . unwrap ( ) ) ) , ROW_ID_FIELD_NAME . to_string ( ) ) ) ;
193165 }
194166
195167 let projection =
0 commit comments