Skip to content

Commit 398095f

Browse files
committed
Refactor parquet data generator and benchmarks
Accept explicit target values in the parquet list data generator. Compute the number of matching row groups from the configured selectivity, and generate predictable matching/non-matching value ranges for pushdown-focused datasets. Add per-case target values for the selectivity benchmarks, document the expected match counts, and validate the actual row matches with COUNT(*) checks. Implement a single-threaded Tokio runtime helper for synchronous benchmarking, and enable DataFusion's default features so async SQL registration and execution work in the benchmarks.
1 parent 11d0c33 commit 398095f

File tree

2 files changed

+96
-31
lines changed

2 files changed

+96
-31
lines changed

datafusion/datasource-parquet/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ tokio = { workspace = true }
5757
[dev-dependencies]
5858
chrono = { workspace = true }
5959
criterion = { workspace = true }
60-
datafusion = { workspace = true }
60+
datafusion = { workspace = true, default-features = true }
6161
datafusion-functions-nested = { workspace = true }
6262
tempfile = { workspace = true }
6363

datafusion/datasource-parquet/benches/parquet_nested_filter_pushdown.rs

Lines changed: 95 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
//! statistics, significantly reducing the rows that need to be decoded and
3232
//! filtered.
3333
34-
use arrow::array::{ArrayRef, ListArray, StringArray};
34+
use arrow::array::{ArrayRef, ListArray, StringArray, UInt64Array};
3535
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
3636
use arrow::datatypes::{DataType, Field, Schema};
3737
use arrow::record_batch::RecordBatch;
@@ -48,7 +48,7 @@ use std::hint::black_box;
4848
use std::path::PathBuf;
4949
use std::sync::Arc;
5050
use tempfile::TempDir;
51-
use tokio::runtime::Runtime;
51+
use tokio::runtime::{Builder, Runtime};
5252

5353
/// Configuration for the benchmark dataset
5454
#[derive(Clone)]
@@ -78,7 +78,8 @@ impl BenchmarkConfig {
7878
fn generate_sorted_list_data(
7979
config: &BenchmarkConfig,
8080
temp_dir: &TempDir,
81-
) -> std::io::Result<PathBuf> {
81+
target_value: &str,
82+
) -> std::io::Result<(PathBuf, usize)> {
8283
let file_path = temp_dir.path().join("data.parquet");
8384

8485
// Define the schema with a List<String> column and an id column
@@ -102,10 +103,13 @@ fn generate_sorted_list_data(
102103
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
103104

104105
let num_groups = config.num_row_groups();
106+
let matching_groups =
107+
((num_groups as f64 * config.selectivity).ceil() as usize).clamp(1, num_groups);
105108
let mut row_id = 0i64;
106109

107110
// Generate row groups with sorted list values
108111
for group_idx in 0..num_groups {
112+
let should_match = group_idx < matching_groups;
109113
let mut batch_ids = Vec::new();
110114
let mut all_values = Vec::new();
111115
let mut offsets = vec![0i32];
@@ -115,19 +119,22 @@ fn generate_sorted_list_data(
115119
batch_ids.push(row_id);
116120
row_id += 1;
117121

118-
// Create lexicographically sorted values
119-
// Each row group has values in a contiguous range
120-
let base_char = (97 + (group_idx % 26)) as u8; // 'a' + group offset
121-
let char1 = base_char as char;
122-
let char2 = (97 + ((group_idx / 26) % 26)) as u8 as char;
123-
let char3 = (48 + (local_idx % 10)) as u8 as char; // '0' to '9'
124-
125-
let prefix = format!("{}{}{}", char1, char2, char3);
126-
127-
// Add 3 values per row
128-
all_values.push(format!("{}_value_a", prefix));
129-
all_values.push(format!("{}_value_b", prefix));
130-
all_values.push(format!("{}_value_c", prefix));
122+
// Create lexicographically sorted values. Matching row groups contain the
123+
// `target_value`, while non-matching groups use a higher prefix so the
124+
// min/max range excludes the target.
125+
let prefix = format!("g{:02}{}", group_idx, local_idx);
126+
if should_match {
127+
all_values.push(format!("{}_before", prefix));
128+
all_values.push(target_value.to_string());
129+
all_values.push(format!("{}_after", prefix));
130+
} else {
131+
// Keep all values lexicographically greater than `target_value` to
132+
// allow pushdown to skip these row groups when filtering by the
133+
// target.
134+
all_values.push(format!("zz{}_value_a", prefix));
135+
all_values.push(format!("zz{}_value_b", prefix));
136+
all_values.push(format!("zz{}_value_c", prefix));
137+
}
131138

132139
offsets.push((offsets.last().unwrap() + 3) as i32);
133140
}
@@ -163,7 +170,7 @@ fn generate_sorted_list_data(
163170
.finish()
164171
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
165172

166-
Ok(file_path)
173+
Ok((file_path, matching_groups))
167174
}
168175

169176
fn assert_scan_has_row_filter(plan: &Arc<dyn ExecutionPlan>) {
@@ -205,7 +212,7 @@ fn create_pushdown_context() -> SessionContext {
205212
/// is active. With selective filters, this should skip ~90% of row groups,
206213
/// resulting in minimal row decoding.
207214
fn benchmark_array_has_with_pushdown(c: &mut Criterion) {
208-
let rt = Runtime::new().expect("Failed to create runtime");
215+
let rt = build_runtime();
209216
let mut group = c.benchmark_group("parquet_array_has_pushdown");
210217

211218
// Test configuration: 100K rows, 10 row groups, selective filter (10% match)
@@ -216,7 +223,7 @@ fn benchmark_array_has_with_pushdown(c: &mut Criterion) {
216223
};
217224

218225
let temp_dir = TempDir::new().expect("Failed to create temp directory");
219-
let file_path = generate_sorted_list_data(&config, &temp_dir)
226+
let (file_path, _) = generate_sorted_list_data(&config, &temp_dir, "aa0_value_a")
220227
.expect("Failed to generate test data");
221228

222229
group.bench_function(
@@ -266,28 +273,40 @@ fn benchmark_array_has_with_pushdown(c: &mut Criterion) {
266273
/// Demonstrates how different selectivity levels (percentage of matching
267274
/// row groups) affect performance with pushdown enabled.
268275
fn benchmark_selectivity_comparison(c: &mut Criterion) {
269-
let rt = Runtime::new().expect("Failed to create runtime");
276+
let rt = build_runtime();
270277
let mut group = c.benchmark_group("parquet_selectivity_impact");
271278

272279
let temp_dir = TempDir::new().expect("Failed to create temp directory");
273280

274-
// Pre-generate all test data
281+
// Pre-generate all test data. Each selectivity level targets a fraction of the
282+
// ten row groups (rounded up), and the target value is injected into every row
283+
// within those matching groups:
284+
// - 10% => 1 matching row group => 10,000 matching rows
285+
// - 30% => 3 matching row groups => 30,000 matching rows
286+
// - 50% => 5 matching row groups => 50,000 matching rows
287+
// - 90% => 9 matching row groups => 90,000 matching rows
275288
let test_cases = vec![
276-
(0.1, "aa0_value_a"), // 10% - matches first row group
277-
(0.3, "ac0_value_a"), // 30% - matches first 3 row groups
278-
(0.5, "ae0_value_a"), // 50% - matches first 5 row groups
279-
(0.9, "ai0_value_a"), // 90% - matches first 9 row groups
289+
(0.1, "aa0_value_a"),
290+
(0.3, "ac0_value_a"),
291+
(0.5, "ae0_value_a"),
292+
(0.9, "ai0_value_a"),
280293
];
281294

282-
for (selectivity, _target_value) in test_cases {
295+
for (selectivity, target_value) in test_cases {
283296
let config = BenchmarkConfig {
284297
total_rows: 100_000,
285298
rows_per_group: 10_000,
286299
selectivity,
287300
};
288301

289-
let file_path = generate_sorted_list_data(&config, &temp_dir)
290-
.expect("Failed to generate test data");
302+
let (file_path, matching_groups) =
303+
generate_sorted_list_data(&config, &temp_dir, target_value)
304+
.expect("Failed to generate test data");
305+
306+
// Validate that the generated data matches the expected selectivity so each
307+
// benchmark run measures a different pushdown rate.
308+
let expected_match_rows = matching_groups * config.rows_per_group;
309+
validate_match_rate(&rt, file_path.clone(), target_value, expected_match_rows);
291310

292311
group.bench_function(
293312
BenchmarkId::from_parameter(format!(
@@ -307,8 +326,11 @@ fn benchmark_selectivity_comparison(c: &mut Criterion) {
307326
.expect("Failed to register parquet");
308327

309328
// Use a filter that matches the selectivity level
310-
let sql = "SELECT COUNT(*) FROM test_table WHERE array_has(list_col, 'aa0_value_a')";
311-
let df = ctx.sql(sql).await.expect("Failed to create dataframe");
329+
let sql = format!(
330+
"SELECT COUNT(*) FROM test_table WHERE array_has(list_col, '{}')",
331+
target_value
332+
);
333+
let df = ctx.sql(&sql).await.expect("Failed to create dataframe");
312334

313335
let plan = df
314336
.create_physical_plan()
@@ -333,3 +355,46 @@ criterion_group!(
333355
benchmark_selectivity_comparison
334356
);
335357
criterion_main!(benches);
358+
359+
fn validate_match_rate(
360+
rt: &Runtime,
361+
file_path: PathBuf,
362+
target_value: &str,
363+
expected_match_rows: usize,
364+
) {
365+
let actual_match_rows = rt.block_on(async {
366+
let ctx = SessionContext::new();
367+
ctx.register_parquet(
368+
"test_table",
369+
file_path.to_str().unwrap(),
370+
ParquetReadOptions::default(),
371+
)
372+
.await
373+
.expect("Failed to register parquet");
374+
375+
let sql = format!(
376+
"SELECT COUNT(*) FROM test_table WHERE array_has(list_col, '{}')",
377+
target_value
378+
);
379+
let df = ctx.sql(&sql).await.expect("Failed to create dataframe");
380+
let results = df.collect().await.expect("Failed to collect");
381+
let count_array = results[0]
382+
.column(0)
383+
.as_any()
384+
.downcast_ref::<UInt64Array>()
385+
.expect("COUNT(*) should be UInt64");
386+
count_array.value(0) as usize
387+
});
388+
389+
assert_eq!(
390+
actual_match_rows, expected_match_rows,
391+
"Generated data did not match expected selectivity"
392+
);
393+
}
394+
395+
fn build_runtime() -> Runtime {
396+
Builder::new_current_thread()
397+
.enable_all()
398+
.build()
399+
.expect("Failed to create runtime")
400+
}

0 commit comments

Comments
 (0)