diff --git a/Cargo.lock b/Cargo.lock
index 4b03f80..bc630ab 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -685,7 +685,9 @@ checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
+ "js-sys",
  "num-traits",
+ "wasm-bindgen",
  "windows-link",
 ]
@@ -1143,6 +1145,7 @@ name = "datafusion-distributed-benchmarks"
 version = "0.1.0"
 dependencies = [
  "async-trait",
+ "chrono",
  "datafusion",
  "datafusion-distributed",
  "datafusion-proto",
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index cd65950..41babb4 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -16,6 +16,7 @@ serde_json = "1.0.141"
 env_logger = "0.11.8"
 async-trait = "0.1.88"
 datafusion-proto = { version = "49.0.0", optional = true }
+chrono = "0.4.41"
 
 [[bin]]
 name = "dfbench"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 75c1b8b..19da292 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -13,12 +13,5 @@ Generate TPCH data into the `data/` dir
 
 After generating the data with the command above:
 ```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1
-```
-
-In order to validate the correctness of the results against single node execution, add
-`--validate`
-
-```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1 --validate
-```
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
+```
\ No newline at end of file
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 882603a..507e408 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -36,7 +36,7 @@ use datafusion::datasource::listing::{
 };
 use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::{SessionState, SessionStateBuilder};
+use datafusion::execution::SessionStateBuilder;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
@@ -75,7 +75,7 @@ pub struct RunOpt {
     path: PathBuf,
 
     /// File format: `csv` or `parquet`
-    #[structopt(short = "f", long = "format", default_value = "csv")]
+    #[structopt(short = "f", long = "format", default_value = "parquet")]
     file_format: String,
 
     /// Load the data into a MemTable before executing the query
@@ -100,7 +100,11 @@ pub struct RunOpt {
     #[structopt(short = "t", long = "sorted")]
     sorted: bool,
 
-    /// The maximum number of partitions per task.
+    /// Run in distributed mode.
+    #[structopt(short = "D", long = "distributed")]
+    distributed: bool,
+
+    /// Number of partitions per task.
#[structopt(long = "ppt")] partitions_per_task: Option, } @@ -109,7 +113,7 @@ pub struct RunOpt { impl SessionBuilder for RunOpt { fn session_state_builder( &self, - builder: SessionStateBuilder, + mut builder: SessionStateBuilder, ) -> Result { let mut config = self .common @@ -133,13 +137,16 @@ impl SessionBuilder for RunOpt { // end critical options section let rt_builder = self.common.runtime_env_builder()?; - let mut rule = DistributedPhysicalOptimizerRule::new(); - if let Some(ppt) = self.partitions_per_task { - rule = rule.with_maximum_partitions_per_task(ppt); + if self.distributed { + let mut rule = DistributedPhysicalOptimizerRule::new(); + if let Some(partitions_per_task) = self.partitions_per_task { + rule = rule.with_maximum_partitions_per_task(partitions_per_task) + } + builder = builder.with_physical_optimizer_rule(Arc::new(rule)); } + Ok(builder .with_config(config) - .with_physical_optimizer_rule(Arc::new(rule)) .with_runtime_env(rt_builder.build_arc()?)) } @@ -153,7 +160,7 @@ impl SessionBuilder for RunOpt { } impl RunOpt { - pub async fn run(self) -> Result<()> { + pub async fn run(mut self) -> Result<()> { let (ctx, _guard) = start_localhost_context(1, self.clone()).await; println!("Running benchmarks with the following options: {self:?}"); let query_range = match self.query { @@ -161,6 +168,8 @@ impl RunOpt { None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID, }; + self.output_path + .get_or_insert_with(|| self.path.join("results.json")); let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { @@ -178,6 +187,7 @@ impl RunOpt { } } } + benchmark_run.maybe_compare_with_previous(self.output_path.as_ref())?; benchmark_run.maybe_write_json(self.output_path.as_ref())?; benchmark_run.maybe_print_failures(); Ok(()) diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs index fdbe82a..e7ed065 100644 --- a/benchmarks/src/util/run.rs +++ b/benchmarks/src/util/run.rs @@ -15,10 +15,13 @@ // specific language governing permissions and limitations // under the License. 
+use chrono::{DateTime, Utc};
 use datafusion::common::utils::get_available_parallelism;
+use datafusion::error::DataFusionError;
 use datafusion::{error::Result, DATAFUSION_VERSION};
-use serde::{Serialize, Serializer};
+use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use serde_json::Value;
+use std::error::Error;
 use std::{
     collections::HashMap,
     path::Path,
@@ -36,6 +39,14 @@ where
             .as_secs(),
     )
 }
+fn deserialize_start_time<'de, D>(des: D) -> Result<SystemTime, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let secs = u64::deserialize(des)?;
+    Ok(SystemTime::UNIX_EPOCH + Duration::from_secs(secs))
+}
+
 fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
 where
     S: Serializer,
 {
     let ms = elapsed.as_secs_f64() * 1000.0;
     ser.serialize_f64(ms)
 }
-#[derive(Debug, Serialize)]
+fn deserialize_elapsed<'de, D>(des: D) -> Result<Duration, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let ms = f64::deserialize(des)?;
+    Ok(Duration::from_secs_f64(ms / 1000.0))
+}
+#[derive(Debug, Serialize, Deserialize)]
 pub struct RunContext {
     /// Benchmark crate version
     pub benchmark_version: String,
@@ -52,7 +70,10 @@ pub struct RunContext {
     /// Number of CPU cores
     pub num_cpus: usize,
     /// Start time
-    #[serde(serialize_with = "serialize_start_time")]
+    #[serde(
+        serialize_with = "serialize_start_time",
+        deserialize_with = "deserialize_start_time"
+    )]
     pub start_time: SystemTime,
     /// CLI arguments
     pub arguments: Vec<String>,
@@ -77,18 +98,24 @@ impl RunContext {
 }
 
 /// A single iteration of a benchmark query
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Deserialize)]
 struct QueryIter {
-    #[serde(serialize_with = "serialize_elapsed")]
+    #[serde(
+        serialize_with = "serialize_elapsed",
+        deserialize_with = "deserialize_elapsed"
+    )]
     elapsed: Duration,
     row_count: usize,
 }
 
 /// A single benchmark case
-#[derive(Debug, Serialize)]
+#[derive(Debug, Serialize, Deserialize)]
 pub struct BenchQuery {
     query: String,
     iterations: Vec<QueryIter>,
-    #[serde(serialize_with = "serialize_start_time")]
+    #[serde(
+        serialize_with = "serialize_start_time",
+        deserialize_with = "deserialize_start_time"
+    )]
     start_time: SystemTime,
     success: bool,
 }
@@ -181,4 +208,71 @@ impl BenchmarkRun {
         };
         Ok(())
     }
+
+    pub fn maybe_compare_with_previous(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
+        let Some(path) = maybe_path else {
+            return Ok(());
+        };
+        let Ok(prev) = std::fs::read(path) else {
+            return Ok(());
+        };
+
+        let mut prev_output: HashMap<&str, Value> =
+            serde_json::from_slice(&prev).map_err(external)?;
+
+        let prev_queries: Vec<BenchQuery> =
+            serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
+
+        let mut header_printed = false;
+        for query in self.queries.iter() {
+            let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
+                continue;
+            };
+            if prev_query.iterations.is_empty() {
+                continue;
+            }
+            if query.iterations.is_empty() {
+                println!("{}: Failed ❌", query.query);
+                continue;
+            }
+
+            let avg_prev = prev_query.avg();
+            let avg = query.avg();
+            let (f, tag, emoji) = if avg < avg_prev {
+                let f = avg_prev as f64 / avg as f64;
+                (f, "faster", if f > 1.2 { "✅" } else { "✔" })
+            } else {
+                let f = avg as f64 / avg_prev as f64;
+                (f, "slower", if f > 1.2 { "❌" } else { "✖" })
+            };
+            if !header_printed {
+                header_printed = true;
+                let datetime: DateTime<Utc> = prev_query.start_time.into();
+                println!(
+                    "==== Comparison with the previous benchmark from {} ====",
+                    datetime.format("%Y-%m-%d %H:%M:%S UTC")
+                );
+            }
+            println!(
+                "{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
+                query.query
+            );
+        }
+
+        Ok(())
+    }
+}
+
+impl BenchQuery {
+    fn avg(&self) -> u128 {
+        self.iterations
+            .iter()
+            .map(|v| v.elapsed.as_millis())
+            .sum::<u128>()
+            / self.iterations.len() as u128
+    }
+}
+
+fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
+    DataFusionError::External(Box::new(err))
 }
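
Note: a rough usage sketch of the new flags and the comparison flow added in this patch (the scale-factor path and the `--ppt` value below are illustrative, not part of the diff):

```shell
# First run: executes the TPCH queries in distributed mode and, if no output
# path was provided, writes results.json into the data directory (default
# added by this patch).
cargo run -p datafusion-distributed-benchmarks --release -- tpch \
    --path benchmarks/data/tpch_sf1 --distributed --ppt 4

# Second run: maybe_compare_with_previous reads the previous results.json,
# prints a per-query prev/new comparison, then the new results are written
# to the same file.
cargo run -p datafusion-distributed-benchmarks --release -- tpch \
    --path benchmarks/data/tpch_sf1 --distributed --ppt 4
```

For the comparison output, a previous average of 1300 ms against a new average of 1000 ms would print `diff=1.30 faster ✅`; ratios at or below 1.2 fall back to the plain ✔/✖ markers.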