Skip to content

Commit cdc6784

Browse files
committed
[X-1289] add benchmark for heavy operation for datafusion-materialized-views
1 parent a5379e2 commit cdc6784

File tree

2 files changed

+172
-0
lines changed

2 files changed

+172
-0
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,9 @@ env_logger = "0.11.6"
5353
tempfile = "3.14.0"
5454
tokio = "1.42.0"
5555
url = "2.5.4"
56+
criterion = "0.4"
57+
58+
[[bench]]
59+
name = "materialized_views_benchmark"
60+
harness = false
61+
path = "benches/materialized_views_benchmark.rs"
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// benches/materialized_views_benchmark
2+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use datafusion::datasource::provider_as_source;
7+
use datafusion::datasource::TableProvider;
8+
use datafusion::prelude::SessionContext;
9+
use datafusion_common::Result as DfResult;
10+
use datafusion_expr::LogicalPlan;
11+
use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm;
12+
use datafusion_sql::TableReference;
13+
use tokio::runtime::Builder;
14+
15+
// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1}
16+
fn make_create_table_sql(table_name: &str, ncols: usize) -> String {
17+
let cols = (0..ncols)
18+
.map(|i| format!("c{} INT", i))
19+
.collect::<Vec<_>>()
20+
.join(", ");
21+
format!(
22+
"CREATE TABLE {table} ({cols})",
23+
table = table_name,
24+
cols = cols
25+
)
26+
}
27+
28+
// Utility: generate a base SELECT that projects all columns and has a couple filters
29+
fn make_base_sql(table_name: &str, ncols: usize) -> String {
30+
let cols = (0..ncols)
31+
.map(|i| format!("c{}", i))
32+
.collect::<Vec<_>>()
33+
.join(", ");
34+
let mut where_clauses = vec![];
35+
if ncols > 0 {
36+
where_clauses.push("c0 >= 0".to_string());
37+
}
38+
if ncols > 1 {
39+
where_clauses.push("c0 + c1 >= 0".to_string());
40+
}
41+
let where_part = if where_clauses.is_empty() {
42+
"".to_string()
43+
} else {
44+
format!(" WHERE {}", where_clauses.join(" AND "))
45+
};
46+
format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part)
47+
}
48+
49+
// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance)
50+
fn make_query_sql(table_name: &str, ncols: usize) -> String {
51+
let take = std::cmp::max(1, ncols / 2);
52+
let cols = (0..take)
53+
.map(|i| format!("c{}", i))
54+
.collect::<Vec<_>>()
55+
.join(", ");
56+
57+
let mut where_clauses = vec![];
58+
if ncols > 0 {
59+
where_clauses.push("c0 >= 10".to_string());
60+
}
61+
if ncols > 1 {
62+
where_clauses.push("c0 * c1 > 100".to_string());
63+
}
64+
if ncols > 10 {
65+
where_clauses.push(format!("c{} >= 0", ncols - 1));
66+
}
67+
68+
let where_part = if where_clauses.is_empty() {
69+
"".to_string()
70+
} else {
71+
format!(" WHERE {}", where_clauses.join(" AND "))
72+
};
73+
74+
format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part)
75+
}
76+
77+
// Build fixture: create SessionContext, the table, then return LogicalPlans for base & query and table provider
78+
fn build_fixture_for_cols(
79+
rt: &tokio::runtime::Runtime,
80+
ncols: usize,
81+
) -> DfResult<(LogicalPlan, LogicalPlan, Arc<dyn TableProvider>)> {
82+
rt.block_on(async move {
83+
let ctx = SessionContext::new();
84+
85+
// create table
86+
let table_name = "t";
87+
let create_sql = make_create_table_sql(table_name, ncols);
88+
ctx.sql(&create_sql).await?.collect().await?; // create table in catalog
89+
90+
// base and query plans (optimize to normalize)
91+
let base_sql = make_base_sql(table_name, ncols);
92+
let query_sql = make_query_sql(table_name, ncols);
93+
94+
let base_df = ctx.sql(&base_sql).await?;
95+
let base_plan = base_df.into_optimized_plan()?;
96+
97+
let query_df = ctx.sql(&query_sql).await?;
98+
let query_plan = query_df.into_optimized_plan()?;
99+
100+
// get table provider (Arc<dyn TableProvider>)
101+
let table_ref = TableReference::bare(table_name);
102+
let provider: Arc<dyn TableProvider> = ctx.table_provider(table_ref.clone()).await?;
103+
104+
Ok((base_plan, query_plan, provider))
105+
})
106+
}
107+
108+
// Criterion benchmark
109+
fn criterion_benchmark(c: &mut Criterion) {
110+
// columns to test
111+
let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320];
112+
113+
// build a tokio runtime that's broadly compatible
114+
let rt = Builder::new_current_thread()
115+
.enable_all()
116+
.build()
117+
.expect("tokio runtime");
118+
119+
let mut group = c.benchmark_group("view_matcher_spj");
120+
group.warm_up_time(Duration::from_secs(1));
121+
group.measurement_time(Duration::from_secs(5));
122+
group.sample_size(30);
123+
124+
for &ncols in &col_cases {
125+
// Build fixture
126+
let (base_plan, query_plan, provider) =
127+
build_fixture_for_cols(&rt, ncols).expect("fixture");
128+
129+
// Measure SpjNormalForm::new for base_plan and query_plan separately
130+
let id_base = BenchmarkId::new("spj_normal_form_new", format!("cols={}", ncols));
131+
group.throughput(Throughput::Elements(1));
132+
group.bench_with_input(id_base, &base_plan, |b, plan| {
133+
b.iter(|| {
134+
let _nf = SpjNormalForm::new(plan).unwrap();
135+
});
136+
});
137+
138+
let id_query_nf = BenchmarkId::new("spj_normal_form_new_query", format!("cols={}", ncols));
139+
group.bench_with_input(id_query_nf, &query_plan, |b, plan| {
140+
b.iter(|| {
141+
let _nf = SpjNormalForm::new(plan).unwrap();
142+
});
143+
});
144+
145+
// Precompute normal forms once (to measure rewrite_from cost only)
146+
let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf");
147+
let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf");
148+
149+
// qualifier for rewrite_from and a source created from the provider
150+
let qualifier = TableReference::bare("mv");
151+
let source = provider_as_source(Arc::clone(&provider));
152+
153+
// Benchmark rewrite_from (this is the heavy check)
154+
let id_rewrite = BenchmarkId::new("rewrite_from", format!("cols={}", ncols));
155+
group.bench_with_input(id_rewrite, &ncols, |b, &_n| {
156+
b.iter(|| {
157+
let _res = query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source));
158+
});
159+
});
160+
}
161+
162+
group.finish();
163+
}
164+
165+
criterion_group!(benches, criterion_benchmark);
166+
criterion_main!(benches);

0 commit comments

Comments
 (0)