Skip to content

Commit bf6901d

Browse files
committed
Add delta report for benchmarks
1 parent 92c5a0e commit bf6901d

File tree

5 files changed

+126
-19
lines changed

5 files changed

+126
-19
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ serde_json = "1.0.141"
1616
env_logger = "0.11.8"
1717
async-trait = "0.1.88"
1818
datafusion-proto = { version = "49.0.0", optional = true }
19+
chrono = "0.4.41"
1920

2021
[[bin]]
2122
name = "dfbench"

benchmarks/README.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -13,5 +13,5 @@ Generate TPCH data into the `data/` dir
1313
After generating the data with the command above:
1414

1515
```shell
16-
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1
16+
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
1717
```

benchmarks/src/tpch/run.rs

Lines changed: 20 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use datafusion::datasource::listing::{
3636
};
3737
use datafusion::datasource::{MemTable, TableProvider};
3838
use datafusion::error::{DataFusionError, Result};
39-
use datafusion::execution::{SessionState, SessionStateBuilder};
39+
use datafusion::execution::SessionStateBuilder;
4040
use datafusion::physical_plan::display::DisplayableExecutionPlan;
4141
use datafusion::physical_plan::{collect, displayable};
4242
use datafusion::prelude::*;
@@ -75,7 +75,7 @@ pub struct RunOpt {
7575
path: PathBuf,
7676

7777
/// File format: `csv` or `parquet`
78-
#[structopt(short = "f", long = "format", default_value = "csv")]
78+
#[structopt(short = "f", long = "format", default_value = "parquet")]
7979
file_format: String,
8080

8181
/// Load the data into a MemTable before executing the query
@@ -100,8 +100,11 @@ pub struct RunOpt {
100100
#[structopt(short = "t", long = "sorted")]
101101
sorted: bool,
102102

103-
/// Mark the first column of each table as sorted in ascending order.
104-
/// The tables should have been created with the `--sort` option for this to have any effect.
103+
/// Run in distributed mode.
104+
#[structopt(short = "D", long = "distributed")]
105+
distributed: bool,
106+
107+
/// Number of partitions per task.
105108
#[structopt(long = "ppt")]
106109
partitions_per_task: Option<usize>,
107110
}
@@ -110,7 +113,7 @@ pub struct RunOpt {
110113
impl SessionBuilder for RunOpt {
111114
fn session_state_builder(
112115
&self,
113-
builder: SessionStateBuilder,
116+
mut builder: SessionStateBuilder,
114117
) -> Result<SessionStateBuilder, DataFusionError> {
115118
let mut config = self
116119
.common
@@ -119,13 +122,16 @@ impl SessionBuilder for RunOpt {
119122
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
120123
let rt_builder = self.common.runtime_env_builder()?;
121124

122-
let mut rule = DistributedPhysicalOptimizerRule::new();
123-
if let Some(ppt) = self.partitions_per_task {
124-
rule = rule.with_maximum_partitions_per_task(ppt);
125+
if self.distributed {
126+
let mut rule = DistributedPhysicalOptimizerRule::new();
127+
if let Some(partitions_per_task) = self.partitions_per_task {
128+
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
129+
}
130+
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
125131
}
132+
126133
Ok(builder
127134
.with_config(config)
128-
.with_physical_optimizer_rule(Arc::new(rule))
129135
.with_runtime_env(rt_builder.build_arc()?))
130136
}
131137

@@ -139,14 +145,16 @@ impl SessionBuilder for RunOpt {
139145
}
140146

141147
impl RunOpt {
142-
pub async fn run(self) -> Result<()> {
143-
let (ctx, _guard) = start_localhost_context([50051], self.clone()).await;
148+
pub async fn run(mut self) -> Result<()> {
149+
let (ctx, _guard) = start_localhost_context([40051], self.clone()).await;
144150
println!("Running benchmarks with the following options: {self:?}");
145151
let query_range = match self.query {
146152
Some(query_id) => query_id..=query_id,
147153
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
148154
};
149155

156+
self.output_path
157+
.get_or_insert_with(|| self.path.join("results.json"));
150158
let mut benchmark_run = BenchmarkRun::new();
151159

152160
for query_id in query_range {
@@ -164,6 +172,7 @@ impl RunOpt {
164172
}
165173
}
166174
}
175+
benchmark_run.compare_with_previous(self.output_path.as_ref())?;
167176
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
168177
benchmark_run.maybe_print_failures();
169178
Ok(())

benchmarks/src/util/run.rs

Lines changed: 101 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -15,10 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use chrono::{DateTime, Utc};
1819
use datafusion::common::utils::get_available_parallelism;
20+
use datafusion::error::DataFusionError;
1921
use datafusion::{error::Result, DATAFUSION_VERSION};
20-
use serde::{Serialize, Serializer};
22+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
2123
use serde_json::Value;
24+
use std::error::Error;
2225
use std::{
2326
collections::HashMap,
2427
path::Path,
@@ -36,14 +39,29 @@ where
3639
.as_secs(),
3740
)
3841
}
42+
fn deserialize_start_time<'de, D>(des: D) -> Result<SystemTime, D::Error>
43+
where
44+
D: Deserializer<'de>,
45+
{
46+
let secs = u64::deserialize(des)?;
47+
Ok(SystemTime::UNIX_EPOCH + Duration::from_secs(secs))
48+
}
49+
3950
fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
4051
where
4152
S: Serializer,
4253
{
4354
let ms = elapsed.as_secs_f64() * 1000.0;
4455
ser.serialize_f64(ms)
4556
}
46-
#[derive(Debug, Serialize)]
57+
fn deserialize_elapsed<'de, D>(des: D) -> Result<Duration, D::Error>
58+
where
59+
D: Deserializer<'de>,
60+
{
61+
let ms = f64::deserialize(des)?;
62+
Ok(Duration::from_secs_f64(ms / 1000.0))
63+
}
64+
#[derive(Debug, Serialize, Deserialize)]
4765
pub struct RunContext {
4866
/// Benchmark crate version
4967
pub benchmark_version: String,
@@ -52,7 +70,10 @@ pub struct RunContext {
5270
/// Number of CPU cores
5371
pub num_cpus: usize,
5472
/// Start time
55-
#[serde(serialize_with = "serialize_start_time")]
73+
#[serde(
74+
serialize_with = "serialize_start_time",
75+
deserialize_with = "deserialize_start_time"
76+
)]
5677
pub start_time: SystemTime,
5778
/// CLI arguments
5879
pub arguments: Vec<String>,
@@ -77,18 +98,24 @@ impl RunContext {
7798
}
7899

79100
/// A single iteration of a benchmark query
80-
#[derive(Debug, Serialize)]
101+
#[derive(Debug, Serialize, Deserialize)]
81102
struct QueryIter {
82-
#[serde(serialize_with = "serialize_elapsed")]
103+
#[serde(
104+
serialize_with = "serialize_elapsed",
105+
deserialize_with = "deserialize_elapsed"
106+
)]
83107
elapsed: Duration,
84108
row_count: usize,
85109
}
86110
/// A single benchmark case
87-
#[derive(Debug, Serialize)]
111+
#[derive(Debug, Serialize, Deserialize)]
88112
pub struct BenchQuery {
89113
query: String,
90114
iterations: Vec<QueryIter>,
91-
#[serde(serialize_with = "serialize_start_time")]
115+
#[serde(
116+
serialize_with = "serialize_start_time",
117+
deserialize_with = "deserialize_start_time"
118+
)]
92119
start_time: SystemTime,
93120
success: bool,
94121
}
@@ -181,4 +208,71 @@ impl BenchmarkRun {
181208
};
182209
Ok(())
183210
}
211+
212+
pub fn compare_with_previous(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
213+
let Some(path) = maybe_path else {
214+
return Ok(());
215+
};
216+
let Ok(prev) = std::fs::read(path) else {
217+
return Ok(());
218+
};
219+
220+
let mut prev_output: HashMap<&str, Value> =
221+
serde_json::from_slice(&prev).map_err(external)?;
222+
223+
let prev_queries: Vec<BenchQuery> =
224+
serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
225+
226+
let mut header_printed = false;
227+
for query in self.queries.iter() {
228+
let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
229+
continue;
230+
};
231+
if prev_query.iterations.is_empty() {
232+
continue;
233+
}
234+
if query.iterations.is_empty() {
235+
println!("{}: Failed ❌", query.query);
236+
continue;
237+
}
238+
239+
let avg_prev = prev_query.avg();
240+
let avg = query.avg();
241+
let (f, tag, emoji) = if avg < avg_prev {
242+
let f = avg_prev as f64 / avg as f64;
243+
(f, "faster", if f > 1.2 { "✅" } else { "✔" })
244+
} else {
245+
let f = avg as f64 / avg_prev as f64;
246+
(f, "slower", if f > 1.2 { "❌" } else { "✖" })
247+
};
248+
if !header_printed {
249+
header_printed = true;
250+
let datetime: DateTime<Utc> = prev_query.start_time.into();
251+
println!(
252+
"==== Comparison with the previous benchmark from {} ====",
253+
datetime.format("%Y-%m-%d %H:%M:%S UTC")
254+
);
255+
}
256+
println!(
257+
"{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
258+
query.query
259+
);
260+
}
261+
262+
Ok(())
263+
}
264+
}
265+
266+
impl BenchQuery {
267+
fn avg(&self) -> u128 {
268+
self.iterations
269+
.iter()
270+
.map(|v| v.elapsed.as_millis())
271+
.sum::<u128>()
272+
/ self.iterations.len() as u128
273+
}
274+
}
275+
276+
fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
277+
DataFusionError::External(Box::new(err))
184278
}

0 commit comments

Comments (0)