Skip to content

Commit ec7e88a

Browse files
authored
Add benchmark for heavy operation for datafusion-materialized-views (#101)
1 parent e620594 commit ec7e88a

File tree

2 files changed

+188
-0
lines changed

2 files changed

+188
-0
lines changed

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@ ordered-float = "5.0.0"
5050

5151
[dev-dependencies]
5252
anyhow = "1.0.95"
53+
criterion = "0.4"
5354
env_logger = "0.11.6"
5455
tempfile = "3.14.0"
5556
tokio = "1.42.0"
5657
url = "2.5.4"
58+
59+
[[bench]]
60+
name = "materialized_views_benchmark"
61+
harness = false
62+
path = "benches/materialized_views_benchmark.rs"
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
19+
use std::sync::Arc;
20+
use std::time::Duration;
21+
22+
use datafusion::datasource::provider_as_source;
23+
use datafusion::datasource::TableProvider;
24+
use datafusion::prelude::SessionContext;
25+
use datafusion_common::Result as DfResult;
26+
use datafusion_expr::LogicalPlan;
27+
use datafusion_materialized_views::rewrite::normal_form::SpjNormalForm;
28+
use datafusion_sql::TableReference;
29+
use tokio::runtime::Builder;
30+
31+
// Utility: generate CREATE TABLE SQL with n columns named c0..c{n-1}
32+
fn make_create_table_sql(table_name: &str, ncols: usize) -> String {
33+
let cols = (0..ncols)
34+
.map(|i| format!("c{} INT", i))
35+
.collect::<Vec<_>>()
36+
.join(", ");
37+
format!(
38+
"CREATE TABLE {table} ({cols})",
39+
table = table_name,
40+
cols = cols
41+
)
42+
}
43+
44+
// Utility: generate a base SELECT that projects all columns and has a couple filters
45+
fn make_base_sql(table_name: &str, ncols: usize) -> String {
46+
let cols = (0..ncols)
47+
.map(|i| format!("c{}", i))
48+
.collect::<Vec<_>>()
49+
.join(", ");
50+
let mut where_clauses = vec![];
51+
if ncols > 0 {
52+
where_clauses.push("c0 >= 0".to_string());
53+
}
54+
if ncols > 1 {
55+
where_clauses.push("c0 + c1 >= 0".to_string());
56+
}
57+
let where_part = if where_clauses.is_empty() {
58+
"".to_string()
59+
} else {
60+
format!(" WHERE {}", where_clauses.join(" AND "))
61+
};
62+
format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part)
63+
}
64+
65+
// Utility: generate a query that is stricter and selects subset (so rewrite_from has a chance)
66+
fn make_query_sql(table_name: &str, ncols: usize) -> String {
67+
let take = std::cmp::max(1, ncols / 2);
68+
let cols = (0..take)
69+
.map(|i| format!("c{}", i))
70+
.collect::<Vec<_>>()
71+
.join(", ");
72+
73+
let mut where_clauses = vec![];
74+
if ncols > 0 {
75+
where_clauses.push("c0 >= 10".to_string());
76+
}
77+
if ncols > 1 {
78+
where_clauses.push("c0 * c1 > 100".to_string());
79+
}
80+
if ncols > 10 {
81+
where_clauses.push(format!("c{} >= 0", ncols - 1));
82+
}
83+
84+
let where_part = if where_clauses.is_empty() {
85+
"".to_string()
86+
} else {
87+
format!(" WHERE {}", where_clauses.join(" AND "))
88+
};
89+
90+
format!("SELECT {cols} FROM {table}{where}", cols = cols, table = table_name, where = where_part)
91+
}
92+
93+
// Build fixture: create SessionContext, the table, then return LogicalPlans for base & query and table provider
94+
fn build_fixture_for_cols(
95+
rt: &tokio::runtime::Runtime,
96+
ncols: usize,
97+
) -> DfResult<(LogicalPlan, LogicalPlan, Arc<dyn TableProvider>)> {
98+
rt.block_on(async move {
99+
let ctx = SessionContext::new();
100+
101+
// create table
102+
let table_name = "t";
103+
let create_sql = make_create_table_sql(table_name, ncols);
104+
ctx.sql(&create_sql).await?.collect().await?; // create table in catalog
105+
106+
// base and query plans (optimize to normalize)
107+
let base_sql = make_base_sql(table_name, ncols);
108+
let query_sql = make_query_sql(table_name, ncols);
109+
110+
let base_df = ctx.sql(&base_sql).await?;
111+
let base_plan = base_df.into_optimized_plan()?;
112+
113+
let query_df = ctx.sql(&query_sql).await?;
114+
let query_plan = query_df.into_optimized_plan()?;
115+
116+
// get table provider (Arc<dyn TableProvider>)
117+
let table_ref = TableReference::bare(table_name);
118+
let provider: Arc<dyn TableProvider> = ctx.table_provider(table_ref.clone()).await?;
119+
120+
Ok((base_plan, query_plan, provider))
121+
})
122+
}
123+
124+
// Criterion benchmark
125+
fn criterion_benchmark(c: &mut Criterion) {
126+
// columns to test
127+
let col_cases = vec![1usize, 10, 20, 40, 80, 160, 320];
128+
129+
// build a tokio runtime that's broadly compatible
130+
let rt = Builder::new_current_thread()
131+
.enable_all()
132+
.build()
133+
.expect("tokio runtime");
134+
135+
let mut group = c.benchmark_group("view_matcher_spj");
136+
group.warm_up_time(Duration::from_secs(1));
137+
group.measurement_time(Duration::from_secs(5));
138+
group.sample_size(30);
139+
140+
for &ncols in &col_cases {
141+
// Build fixture
142+
let (base_plan, query_plan, provider) =
143+
build_fixture_for_cols(&rt, ncols).expect("fixture");
144+
145+
// Measure SpjNormalForm::new for base_plan and query_plan separately
146+
let id_base = BenchmarkId::new("spj_normal_form_new", format!("cols={}", ncols));
147+
group.throughput(Throughput::Elements(1));
148+
group.bench_with_input(id_base, &base_plan, |b, plan| {
149+
b.iter(|| {
150+
let _nf = SpjNormalForm::new(plan).unwrap();
151+
});
152+
});
153+
154+
let id_query_nf = BenchmarkId::new("spj_normal_form_new_query", format!("cols={}", ncols));
155+
group.bench_with_input(id_query_nf, &query_plan, |b, plan| {
156+
b.iter(|| {
157+
let _nf = SpjNormalForm::new(plan).unwrap();
158+
});
159+
});
160+
161+
// Precompute normal forms once (to measure rewrite_from cost only)
162+
let base_nf = SpjNormalForm::new(&base_plan).expect("base_nf");
163+
let query_nf = SpjNormalForm::new(&query_plan).expect("query_nf");
164+
165+
// qualifier for rewrite_from and a source created from the provider
166+
let qualifier = TableReference::bare("mv");
167+
let source = provider_as_source(Arc::clone(&provider));
168+
169+
// Benchmark rewrite_from (this is the heavy check)
170+
let id_rewrite = BenchmarkId::new("rewrite_from", format!("cols={}", ncols));
171+
group.bench_with_input(id_rewrite, &ncols, |b, &_n| {
172+
b.iter(|| {
173+
let _res = query_nf.rewrite_from(&base_nf, qualifier.clone(), Arc::clone(&source));
174+
});
175+
});
176+
}
177+
178+
group.finish();
179+
}
180+
181+
criterion_group!(benches, criterion_benchmark);
182+
criterion_main!(benches);

0 commit comments

Comments
 (0)