Skip to content

Commit 4c19675

Browse files
committed
add bench
1 parent 38255bd commit 4c19675

File tree

3 files changed

+101
-1
lines changed

3 files changed

+101
-1
lines changed

datafusion/physical-plan/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,7 @@ name = "partial_ordering"
8686
[[bench]]
8787
harness = false
8888
name = "spill_io"
89+
90+
[[bench]]
91+
harness = false
92+
name = "sort_merge_join"
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_schema::SortOptions;
19+
use criterion::{criterion_group, criterion_main, Criterion};
20+
use datafusion_common::JoinType::Inner;
21+
use datafusion_execution::config::SessionConfig;
22+
use datafusion_execution::disk_manager::DiskManagerConfig;
23+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
24+
use datafusion_execution::TaskContext;
25+
use datafusion_physical_expr::expressions::Column;
26+
use datafusion_physical_plan::common::collect;
27+
use datafusion_physical_plan::joins::SortMergeJoinExec;
28+
use datafusion_physical_plan::test::{build_table_i32, TestMemoryExec};
29+
use datafusion_physical_plan::ExecutionPlan;
30+
use std::sync::Arc;
31+
use tokio::runtime::Runtime;
32+
33+
fn create_test_data() -> SortMergeJoinExec {
34+
let left_batch = build_table_i32(
35+
("a1", &vec![0, 1, 2, 3, 4, 5]),
36+
("b1", &vec![1, 2, 3, 4, 5, 6]),
37+
("c1", &vec![4, 5, 6, 7, 8, 9]),
38+
);
39+
let left_schema = left_batch.schema();
40+
let left =
41+
TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None).unwrap();
42+
let right_batch = build_table_i32(
43+
("a2", &vec![0, 10, 20, 30, 40]),
44+
("b2", &vec![1, 3, 4, 6, 8]),
45+
("c2", &vec![50, 60, 70, 80, 90]),
46+
);
47+
let right_schema = right_batch.schema();
48+
let right =
49+
TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None).unwrap();
50+
let on = vec![(
51+
Arc::new(Column::new_with_schema("b1", &left.schema()).unwrap()) as _,
52+
Arc::new(Column::new_with_schema("b2", &right.schema()).unwrap()) as _,
53+
)];
54+
let sort_options = vec![SortOptions::default(); on.len()];
55+
56+
SortMergeJoinExec::try_new(left, right, on, None, Inner, sort_options, false)
57+
.unwrap()
58+
}
59+
60+
// `cargo bench --bench sort_merge_join`
61+
fn bench_spill(c: &mut Criterion) {
62+
let sort_merge_join_exec = create_test_data();
63+
64+
let mut group = c.benchmark_group("sort_merge_join_spill");
65+
let rt = Runtime::new().unwrap();
66+
67+
let runtime = RuntimeEnvBuilder::new()
68+
.with_memory_limit(100, 1.0) // Set memory limit to 100 bytes
69+
.with_disk_manager(DiskManagerConfig::NewOs) // Enable DiskManager to allow spilling
70+
.build_arc()
71+
.unwrap();
72+
let session_config = SessionConfig::default();
73+
let task_ctx = Arc::new(
74+
TaskContext::default()
75+
.with_session_config(session_config.clone())
76+
.with_runtime(Arc::clone(&runtime)),
77+
);
78+
79+
group.bench_function("SortMergeJoinExec_spill", |b| {
80+
b.iter(|| {
81+
criterion::black_box(
82+
rt.block_on(async {
83+
let stream = sort_merge_join_exec.execute(0, Arc::clone(&task_ctx)).unwrap();
84+
collect(stream).await.unwrap()
85+
})
86+
)
87+
})
88+
});
89+
group.finish();
90+
91+
assert!(sort_merge_join_exec.metrics().unwrap().spill_count().unwrap() > 0);
92+
assert!(sort_merge_join_exec.metrics().unwrap().spilled_bytes().unwrap() > 0);
93+
assert!(sort_merge_join_exec.metrics().unwrap().spilled_rows().unwrap() > 0);
94+
}
95+
96+
criterion_group!(benches, bench_spill);
97+
criterion_main!(benches);

datafusion/physical-plan/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,4 @@ pub mod udaf {
9292
}
9393

9494
pub mod coalesce;
95-
#[cfg(test)]
9695
pub mod test;

0 commit comments

Comments
 (0)