Skip to content

Commit 11d0c33

Browse files
committed
Add pushdown helper methods and benchmark updates
Introduce a helper that creates pushdown-enabled SessionContext instances, and add an assertion that parquet scans in the nested-filter benchmarks include pushed-down row filters. Update the array-pushdown and selectivity benchmarks to use the configured contexts, validate their physical plans for row filters, and still collect results so execution time is measured end to end.
1 parent 7a4742f commit 11d0c33

File tree

1 file changed

+65
-15
lines changed

1 file changed

+65
-15
lines changed

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ use arrow::buffer::{OffsetBuffer, ScalarBuffer};
3636
use arrow::datatypes::{DataType, Field, Schema};
3737
use arrow::record_batch::RecordBatch;
3838
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
39+
use datafusion::config::{ConfigOptions, SessionConfig};
40+
use datafusion::datasource::{file_scan_config::FileScanConfig, source::DataSourceExec};
3941
use datafusion::execution::context::SessionContext;
42+
use datafusion::physical_plan::ExecutionPlan;
4043
use datafusion::prelude::*;
4144
use parquet::arrow::ArrowWriter;
4245
use parquet::file::properties::WriterProperties;
@@ -89,12 +92,12 @@ fn generate_sorted_list_data(
8992
]));
9093

9194
let file = File::create(&file_path)?;
92-
95+
9396
// Configure writer with explicit row group size
9497
let props = WriterProperties::builder()
9598
.set_max_row_group_size(config.rows_per_group)
9699
.build();
97-
100+
98101
let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))
99102
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
100103

@@ -163,6 +166,39 @@ fn generate_sorted_list_data(
163166
Ok(file_path)
164167
}
165168

169+
fn assert_scan_has_row_filter(plan: &Arc<dyn ExecutionPlan>) {
170+
let mut stack = vec![Arc::clone(plan)];
171+
172+
while let Some(plan) = stack.pop() {
173+
if let Some(source_exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
174+
if let Some(file_scan_config) = source_exec
175+
.data_source()
176+
.as_any()
177+
.downcast_ref::<FileScanConfig>()
178+
{
179+
assert!(
180+
file_scan_config.file_source().filter().is_some(),
181+
"Expected DataSourceExec to include a pushed-down row filter"
182+
);
183+
return;
184+
}
185+
}
186+
187+
stack.extend(plan.children().into_iter().cloned());
188+
}
189+
190+
panic!("Expected physical plan to contain a DataSourceExec");
191+
}
192+
193+
fn create_pushdown_context() -> SessionContext {
194+
let mut config_options = ConfigOptions::new();
195+
config_options.execution.parquet.pushdown_filters = true;
196+
config_options.execution.parquet.reorder_filters = true;
197+
198+
let session_config = SessionConfig::new().with_options(config_options);
199+
SessionContext::new_with_config(session_config)
200+
}
201+
166202
/// Benchmark for array_has filter with pushdown enabled
167203
///
168204
/// This measures the performance of filtering using array_has when pushdown
@@ -191,8 +227,8 @@ fn benchmark_array_has_with_pushdown(c: &mut Criterion) {
191227
)),
192228
|b| {
193229
b.to_async(&rt).iter(|| async {
194-
let ctx = SessionContext::new();
195-
230+
let ctx = create_pushdown_context();
231+
196232
// Register the parquet file
197233
ctx.register_parquet(
198234
"test_table",
@@ -204,12 +240,19 @@ fn benchmark_array_has_with_pushdown(c: &mut Criterion) {
204240

205241
// Execute query with array_has filter
206242
// This should demonstrate pushdown benefits for selective filters
207-
let sql = "SELECT * FROM test_table WHERE array_has(list_col, 'aa0_value_a')";
243+
let sql =
244+
"SELECT * FROM test_table WHERE array_has(list_col, 'aa0_value_a')";
208245
let df = ctx.sql(sql).await.expect("Failed to create dataframe");
209-
246+
247+
let plan = df
248+
.create_physical_plan()
249+
.await
250+
.expect("Failed to create physical plan");
251+
assert_scan_has_row_filter(&plan);
252+
210253
// Collect results to ensure full execution
211254
let results = df.collect().await.expect("Failed to collect results");
212-
255+
213256
black_box(results)
214257
});
215258
},
@@ -227,13 +270,13 @@ fn benchmark_selectivity_comparison(c: &mut Criterion) {
227270
let mut group = c.benchmark_group("parquet_selectivity_impact");
228271

229272
let temp_dir = TempDir::new().expect("Failed to create temp directory");
230-
273+
231274
// Pre-generate all test data
232275
let test_cases = vec![
233-
(0.1, "aa0_value_a"), // 10% - matches first row group
234-
(0.3, "ac0_value_a"), // 30% - matches first 3 row groups
235-
(0.5, "ae0_value_a"), // 50% - matches first 5 row groups
236-
(0.9, "ai0_value_a"), // 90% - matches first 9 row groups
276+
(0.1, "aa0_value_a"), // 10% - matches first row group
277+
(0.3, "ac0_value_a"), // 30% - matches first 3 row groups
278+
(0.5, "ae0_value_a"), // 50% - matches first 5 row groups
279+
(0.9, "ai0_value_a"), // 90% - matches first 9 row groups
237280
];
238281

239282
for (selectivity, _target_value) in test_cases {
@@ -253,8 +296,8 @@ fn benchmark_selectivity_comparison(c: &mut Criterion) {
253296
)),
254297
|b| {
255298
b.to_async(&rt).iter(|| async {
256-
let ctx = SessionContext::new();
257-
299+
let ctx = create_pushdown_context();
300+
258301
ctx.register_parquet(
259302
"test_table",
260303
file_path.to_str().unwrap(),
@@ -266,8 +309,15 @@ fn benchmark_selectivity_comparison(c: &mut Criterion) {
266309
// Use a filter that matches the selectivity level
267310
let sql = "SELECT COUNT(*) FROM test_table WHERE array_has(list_col, 'aa0_value_a')";
268311
let df = ctx.sql(sql).await.expect("Failed to create dataframe");
312+
313+
let plan = df
314+
.create_physical_plan()
315+
.await
316+
.expect("Failed to create physical plan");
317+
assert_scan_has_row_filter(&plan);
318+
269319
let results = df.collect().await.expect("Failed to collect");
270-
320+
271321
black_box(results)
272322
});
273323
},

0 commit comments

Comments
 (0)