@@ -20,6 +20,7 @@ use std::sync::Arc;
2020use crate :: parquet:: Unit :: Page ;
2121use crate :: parquet:: { ContextWithParquet , Scenario } ;
2222
23+ use arrow:: array:: RecordBatch ;
2324use datafusion:: datasource:: file_format:: parquet:: ParquetFormat ;
2425use datafusion:: datasource:: file_format:: FileFormat ;
2526use datafusion:: datasource:: listing:: PartitionedFile ;
@@ -40,7 +41,11 @@ use futures::StreamExt;
4041use object_store:: path:: Path ;
4142use object_store:: ObjectMeta ;
4243
43- async fn get_parquet_exec ( state : & SessionState , filter : Expr ) -> DataSourceExec {
44+ async fn get_parquet_exec (
45+ state : & SessionState ,
46+ filter : Expr ,
47+ pushdown_filters : bool ,
48+ ) -> DataSourceExec {
4449 let object_store_url = ObjectStoreUrl :: local_filesystem ( ) ;
4550 let store = state. runtime_env ( ) . object_store ( & object_store_url) . unwrap ( ) ;
4651
@@ -78,7 +83,8 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
7883 let source = Arc :: new (
7984 ParquetSource :: default ( )
8085 . with_predicate ( predicate)
81- . with_enable_page_index ( true ) ,
86+ . with_enable_page_index ( true )
87+ . with_pushdown_filters ( pushdown_filters) ,
8288 ) ;
8389 let base_config = FileScanConfigBuilder :: new ( object_store_url, schema, source)
8490 . with_file ( partitioned_file)
@@ -87,38 +93,44 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
8793 DataSourceExec :: new ( Arc :: new ( base_config) )
8894}
8995
96+ async fn get_filter_results (
97+ state : & SessionState ,
98+ filter : Expr ,
99+ pushdown_filters : bool ,
100+ ) -> Vec < RecordBatch > {
101+ let parquet_exec = get_parquet_exec ( state, filter, pushdown_filters) . await ;
102+ let task_ctx = state. task_ctx ( ) ;
103+ let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
104+ let mut batches = Vec :: new ( ) ;
105+ while let Some ( Ok ( batch) ) = results. next ( ) . await {
106+ batches. push ( batch) ;
107+ }
108+ batches
109+ }
110+
90111#[ tokio:: test]
91112async fn page_index_filter_one_col ( ) {
92113 let session_ctx = SessionContext :: new ( ) ;
93114 let state = session_ctx. state ( ) ;
94- let task_ctx = state. task_ctx ( ) ;
95115
96116 // 1.create filter month == 1;
97117 let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) ;
98118
99- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
100-
101- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
102-
103- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
104-
119+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
105120 // `month = 1` from the page index should create below RowSelection
106121 // vec.push(RowSelector::select(312));
107122 // vec.push(RowSelector::skip(3330));
108123 // vec.push(RowSelector::select(339));
109124 // vec.push(RowSelector::skip(3319));
110125 // total 651 row
111- assert_eq ! ( batch. num_rows( ) , 651 ) ;
126+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
127+
128+ let batches = get_filter_results ( & state, filter, true ) . await ;
129+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 620 ) ;
112130
113131 // 2. create filter month == 1 or month == 2;
114132 let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) . or ( col ( "month" ) . eq ( lit ( 2_i32 ) ) ) ;
115-
116- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
117-
118- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
119-
120- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
121-
133+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
122134 // `month = 1` or `month = 2` from the page index should create below RowSelection
123135 // vec.push(RowSelector::select(312));
124136 // vec.push(RowSelector::skip(900));
@@ -128,95 +140,78 @@ async fn page_index_filter_one_col() {
128140 // vec.push(RowSelector::skip(873));
129141 // vec.push(RowSelector::select(318));
130142 // vec.push(RowSelector::skip(2128));
131- assert_eq ! ( batch. num_rows( ) , 1281 ) ;
143+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 1281 ) ;
144+
145+ let batches = get_filter_results ( & state, filter, true ) . await ;
146+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 1180 ) ;
132147
133148 // 3. create filter month == 1 and month == 12;
134149 let filter = col ( "month" )
135150 . eq ( lit ( 1_i32 ) )
136151 . and ( col ( "month" ) . eq ( lit ( 12_i32 ) ) ) ;
152+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
153+ assert ! ( batches. is_empty( ) ) ;
137154
138- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
139-
140- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
141-
142- let batch = results. next ( ) . await ;
143-
144- assert ! ( batch. is_none( ) ) ;
155+ let batches = get_filter_results ( & state, filter, true ) . await ;
156+ assert ! ( batches. is_empty( ) ) ;
145157
146158 // 4.create filter 0 < month < 2 ;
147159 let filter = col ( "month" ) . gt ( lit ( 0_i32 ) ) . and ( col ( "month" ) . lt ( lit ( 2_i32 ) ) ) ;
148-
149- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
150-
151- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
152-
153- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
154-
160+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
155161 // should same with `month = 1`
156- assert_eq ! ( batch. num_rows( ) , 651 ) ;
157-
158- let session_ctx = SessionContext :: new ( ) ;
159- let task_ctx = session_ctx. task_ctx ( ) ;
162+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
163+ let batches = get_filter_results ( & state, filter, true ) . await ;
164+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 620 ) ;
160165
161166 // 5.create filter date_string_col == "01/01/09"`;
162167 // Note this test doesn't apply type coercion so the literal must match the actual view type
163168 let filter = col ( "date_string_col" ) . eq ( lit ( ScalarValue :: new_utf8view ( "01/01/09" ) ) ) ;
164- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
165- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
166- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
169+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
170+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 14 ) ;
167171
168172 // there should only two pages match the filter
169173 // min max
170174 // page-20 0 01/01/09 01/02/09
171175 // page-21 0 01/01/09 01/01/09
172176 // each 7 rows
173- assert_eq ! ( batch. num_rows( ) , 14 ) ;
177+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 14 ) ;
178+ let batches = get_filter_results ( & state, filter, true ) . await ;
179+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 10 ) ;
174180}
175181
176182#[ tokio:: test]
177183async fn page_index_filter_multi_col ( ) {
178184 let session_ctx = SessionContext :: new ( ) ;
179185 let state = session_ctx. state ( ) ;
180- let task_ctx = session_ctx. task_ctx ( ) ;
181186
182187 // create filter month == 1 and year = 2009;
183188 let filter = col ( "month" ) . eq ( lit ( 1_i32 ) ) . and ( col ( "year" ) . eq ( lit ( 2009 ) ) ) ;
184-
185- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
186-
187- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
188-
189- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
190-
189+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
191190 // `year = 2009` from the page index should create below RowSelection
192191 // vec.push(RowSelector::select(3663));
193192 // vec.push(RowSelector::skip(3642));
194193 // combine with `month = 1` total 333 row
195- assert_eq ! ( batch. num_rows( ) , 333 ) ;
194+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 333 ) ;
195+ let batches = get_filter_results ( & state, filter, true ) . await ;
196+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 310 ) ;
196197
197198 // create filter (year = 2009 or id = 1) and month = 1;
198199 // this should only use `month = 1` to evaluate the page index.
199200 let filter = col ( "month" )
200201 . eq ( lit ( 1_i32 ) )
201202 . and ( col ( "year" ) . eq ( lit ( 2009 ) ) . or ( col ( "id" ) . eq ( lit ( 1 ) ) ) ) ;
202-
203- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
204-
205- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
206-
207- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
208- assert_eq ! ( batch. num_rows( ) , 651 ) ;
203+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
204+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 651 ) ;
205+ let batches = get_filter_results ( & state, filter, true ) . await ;
206+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 310 ) ;
209207
210208 // create filter (year = 2009 or id = 1)
211209 // this filter use two columns will not push down
212210 let filter = col ( "year" ) . eq ( lit ( 2009 ) ) . or ( col ( "id" ) . eq ( lit ( 1 ) ) ) ;
213-
214- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
215-
216- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
217-
218- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
219- assert_eq ! ( batch. num_rows( ) , 7300 ) ;
211+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
212+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 7300 ) ;
213+ let batches = get_filter_results ( & state, filter, true ) . await ;
214+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 3650 ) ;
220215
221216 // create filter (year = 2009 and id = 1) or (year = 2010)
222217 // this filter use two columns will not push down
@@ -226,13 +221,10 @@ async fn page_index_filter_multi_col() {
226221 . eq ( lit ( 2009 ) )
227222 . and ( col ( "id" ) . eq ( lit ( 1 ) ) )
228223 . or ( col ( "year" ) . eq ( lit ( 2010 ) ) ) ;
229-
230- let parquet_exec = get_parquet_exec ( & state, filter) . await ;
231-
232- let mut results = parquet_exec. execute ( 0 , task_ctx. clone ( ) ) . unwrap ( ) ;
233-
234- let batch = results. next ( ) . await . unwrap ( ) . unwrap ( ) ;
235- assert_eq ! ( batch. num_rows( ) , 7300 ) ;
224+ let batches = get_filter_results ( & state, filter. clone ( ) , false ) . await ;
225+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 7300 ) ;
226+ let batches = get_filter_results ( & state, filter, true ) . await ;
227+ assert_eq ! ( batches[ 0 ] . num_rows( ) , 3651 ) ;
236228}
237229
238230async fn test_prune (
0 commit comments