Skip to content

Commit bae85ca

Browse files
authored
Add delta report for benchmarks (#91)
* Add delta report for benchmarks * rename compare_with_previous to maybe_compare_with_previous
1 parent a2a8163 commit bae85ca

File tree

5 files changed

+126
-25
lines changed

5 files changed

+126
-25
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -16,6 +16,7 @@ serde_json = "1.0.141"
1616
env_logger = "0.11.8"
1717
async-trait = "0.1.88"
1818
datafusion-proto = { version = "49.0.0", optional = true }
19+
chrono = "0.4.41"
1920

2021
[[bin]]
2122
name = "dfbench"

benchmarks/README.md

Lines changed: 2 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -13,12 +13,5 @@ Generate TPCH data into the `data/` dir
1313
After generating the data with the command above:
1414

1515
```shell
16-
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1
17-
```
18-
19-
In order to validate the correctness of the results against single node execution, add
20-
`--validate`
21-
22-
```shell
23-
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1 --validate
24-
```
16+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
17+
```

benchmarks/src/tpch/run.rs

Lines changed: 19 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -36,7 +36,7 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::{MemTable, TableProvider};
3838
use datafusion::error::{DataFusionError, Result};
39-
use datafusion::execution::{SessionState, SessionStateBuilder};
39+
use datafusion::execution::SessionStateBuilder;
4040
use datafusion::physical_plan::display::DisplayableExecutionPlan;
4141
use datafusion::physical_plan::{collect, displayable};
4242
use datafusion::prelude::*;
@@ -75,7 +75,7 @@ pub struct RunOpt {
7575
path: PathBuf,
7676

7777
/// File format: `csv` or `parquet`
78-
#[structopt(short = "f", long = "format", default_value = "csv")]
78+
#[structopt(short = "f", long = "format", default_value = "parquet")]
7979
file_format: String,
8080

8181
/// Load the data into a MemTable before executing the query
@@ -100,7 +100,11 @@ pub struct RunOpt {
100100
#[structopt(short = "t", long = "sorted")]
101101
sorted: bool,
102102

103-
/// The maximum number of partitions per task.
103+
/// Run in distributed mode.
104+
#[structopt(short = "D", long = "distributed")]
105+
distributed: bool,
106+
107+
/// Number of partitions per task.
104108
#[structopt(long = "ppt")]
105109
partitions_per_task: Option<usize>,
106110
}
@@ -109,7 +113,7 @@ pub struct RunOpt {
109113
impl SessionBuilder for RunOpt {
110114
fn session_state_builder(
111115
&self,
112-
builder: SessionStateBuilder,
116+
mut builder: SessionStateBuilder,
113117
) -> Result<SessionStateBuilder, DataFusionError> {
114118
let mut config = self
115119
.common
@@ -133,13 +137,16 @@ impl SessionBuilder for RunOpt {
133137
// end critical options section
134138
let rt_builder = self.common.runtime_env_builder()?;
135139

136-
let mut rule = DistributedPhysicalOptimizerRule::new();
137-
if let Some(ppt) = self.partitions_per_task {
138-
rule = rule.with_maximum_partitions_per_task(ppt);
140+
if self.distributed {
141+
let mut rule = DistributedPhysicalOptimizerRule::new();
142+
if let Some(partitions_per_task) = self.partitions_per_task {
143+
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
144+
}
145+
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
139146
}
147+
140148
Ok(builder
141149
.with_config(config)
142-
.with_physical_optimizer_rule(Arc::new(rule))
143150
.with_runtime_env(rt_builder.build_arc()?))
144151
}
145152

@@ -153,14 +160,16 @@ impl SessionBuilder for RunOpt {
153160
}
154161

155162
impl RunOpt {
156-
pub async fn run(self) -> Result<()> {
163+
pub async fn run(mut self) -> Result<()> {
157164
let (ctx, _guard) = start_localhost_context(1, self.clone()).await;
158165
println!("Running benchmarks with the following options: {self:?}");
159166
let query_range = match self.query {
160167
Some(query_id) => query_id..=query_id,
161168
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
162169
};
163170

171+
self.output_path
172+
.get_or_insert_with(|| self.path.join("results.json"));
164173
let mut benchmark_run = BenchmarkRun::new();
165174

166175
for query_id in query_range {
@@ -178,6 +187,7 @@ impl RunOpt {
178187
}
179188
}
180189
}
190+
benchmark_run.maybe_compare_with_previous(self.output_path.as_ref())?;
181191
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
182192
benchmark_run.maybe_print_failures();
183193
Ok(())

benchmarks/src/util/run.rs

Lines changed: 101 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -15,10 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use chrono::{DateTime, Utc};
1819
use datafusion::common::utils::get_available_parallelism;
20+
use datafusion::error::DataFusionError;
1921
use datafusion::{error::Result, DATAFUSION_VERSION};
20-
use serde::{Serialize, Serializer};
22+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
2123
use serde_json::Value;
24+
use std::error::Error;
2225
use std::{
2326
collections::HashMap,
2427
path::Path,
@@ -36,14 +39,29 @@ where
3639
.as_secs(),
3740
)
3841
}
42+
fn deserialize_start_time<'de, D>(des: D) -> Result<SystemTime, D::Error>
43+
where
44+
D: Deserializer<'de>,
45+
{
46+
let secs = u64::deserialize(des)?;
47+
Ok(SystemTime::UNIX_EPOCH + Duration::from_secs(secs))
48+
}
49+
3950
fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
4051
where
4152
S: Serializer,
4253
{
4354
let ms = elapsed.as_secs_f64() * 1000.0;
4455
ser.serialize_f64(ms)
4556
}
46-
#[derive(Debug, Serialize)]
57+
fn deserialize_elapsed<'de, D>(des: D) -> Result<Duration, D::Error>
58+
where
59+
D: Deserializer<'de>,
60+
{
61+
let ms = f64::deserialize(des)?;
62+
Ok(Duration::from_secs_f64(ms / 1000.0))
63+
}
64+
#[derive(Debug, Serialize, Deserialize)]
4765
pub struct RunContext {
4866
/// Benchmark crate version
4967
pub benchmark_version: String,
@@ -52,7 +70,10 @@ pub struct RunContext {
5270
/// Number of CPU cores
5371
pub num_cpus: usize,
5472
/// Start time
55-
#[serde(serialize_with = "serialize_start_time")]
73+
#[serde(
74+
serialize_with = "serialize_start_time",
75+
deserialize_with = "deserialize_start_time"
76+
)]
5677
pub start_time: SystemTime,
5778
/// CLI arguments
5879
pub arguments: Vec<String>,
@@ -77,18 +98,24 @@ impl RunContext {
7798
}
7899

79100
/// A single iteration of a benchmark query
80-
#[derive(Debug, Serialize)]
101+
#[derive(Debug, Serialize, Deserialize)]
81102
struct QueryIter {
82-
#[serde(serialize_with = "serialize_elapsed")]
103+
#[serde(
104+
serialize_with = "serialize_elapsed",
105+
deserialize_with = "deserialize_elapsed"
106+
)]
83107
elapsed: Duration,
84108
row_count: usize,
85109
}
86110
/// A single benchmark case
87-
#[derive(Debug, Serialize)]
111+
#[derive(Debug, Serialize, Deserialize)]
88112
pub struct BenchQuery {
89113
query: String,
90114
iterations: Vec<QueryIter>,
91-
#[serde(serialize_with = "serialize_start_time")]
115+
#[serde(
116+
serialize_with = "serialize_start_time",
117+
deserialize_with = "deserialize_start_time"
118+
)]
92119
start_time: SystemTime,
93120
success: bool,
94121
}
@@ -181,4 +208,71 @@ impl BenchmarkRun {
181208
};
182209
Ok(())
183210
}
211+
212+
pub fn maybe_compare_with_previous(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
213+
let Some(path) = maybe_path else {
214+
return Ok(());
215+
};
216+
let Ok(prev) = std::fs::read(path) else {
217+
return Ok(());
218+
};
219+
220+
let mut prev_output: HashMap<&str, Value> =
221+
serde_json::from_slice(&prev).map_err(external)?;
222+
223+
let prev_queries: Vec<BenchQuery> =
224+
serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
225+
226+
let mut header_printed = false;
227+
for query in self.queries.iter() {
228+
let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
229+
continue;
230+
};
231+
if prev_query.iterations.is_empty() {
232+
continue;
233+
}
234+
if query.iterations.is_empty() {
235+
println!("{}: Failed ❌", query.query);
236+
continue;
237+
}
238+
239+
let avg_prev = prev_query.avg();
240+
let avg = query.avg();
241+
let (f, tag, emoji) = if avg < avg_prev {
242+
let f = avg_prev as f64 / avg as f64;
243+
(f, "faster", if f > 1.2 { "✅" } else { "✔" })
244+
} else {
245+
let f = avg as f64 / avg_prev as f64;
246+
(f, "slower", if f > 1.2 { "❌" } else { "✖" })
247+
};
248+
if !header_printed {
249+
header_printed = true;
250+
let datetime: DateTime<Utc> = prev_query.start_time.into();
251+
println!(
252+
"==== Comparison with the previous benchmark from {} ====",
253+
datetime.format("%Y-%m-%d %H:%M:%S UTC")
254+
);
255+
}
256+
println!(
257+
"{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
258+
query.query
259+
);
260+
}
261+
262+
Ok(())
263+
}
264+
}
265+
266+
impl BenchQuery {
267+
fn avg(&self) -> u128 {
268+
self.iterations
269+
.iter()
270+
.map(|v| v.elapsed.as_millis())
271+
.sum::<u128>()
272+
/ self.iterations.len() as u128
273+
}
274+
}
275+
276+
fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
277+
DataFusionError::External(Box::new(err))
184278
}

0 commit comments

Comments (0)