diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 854ca0d..a035d0d 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -20,13 +20,14 @@ use super::{
     TPCH_QUERY_START_ID, TPCH_TABLES,
 };
 use crate::util::{
-    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
+    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
     WarmingUpMarker,
 };
 use async_trait::async_trait;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty::{self, pretty_format_batches};
 use datafusion::common::instant::Instant;
+use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::utils::get_available_parallelism;
 use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
 use datafusion::datasource::file_format::csv::CsvFormat;
@@ -46,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
-    DistributedSessionBuilderContext,
+    DistributedSessionBuilderContext, StageExec,
 };
 use log::info;
 use std::fs;
@@ -186,7 +187,10 @@ impl RunOpt {
         self.output_path
             .get_or_insert(self.get_path()?.join("results.json"));
 
-        let mut benchmark_run = BenchmarkRun::new();
+        let mut benchmark_run = BenchmarkRun::new(
+            self.workers.len(),
+            self.threads.unwrap_or(get_available_parallelism()),
+        );
 
         // Warmup the cache for the in-memory mode.
         if self.mem_table {
@@ -209,7 +213,7 @@ impl RunOpt {
             match query_run {
                 Ok(query_results) => {
                     for iter in query_results {
-                        benchmark_run.write_iter(iter.elapsed, iter.row_count);
+                        benchmark_run.write_iter(iter);
                     }
                 }
                 Err(e) => {
@@ -228,13 +232,14 @@ impl RunOpt {
         &self,
         query_id: usize,
         ctx: &SessionContext,
-    ) -> Result<Vec<QueryResult>> {
+    ) -> Result<Vec<QueryIter>> {
         let mut millis = vec![];
         // run benchmark
         let mut query_results = vec![];
 
         let sql = &get_query_sql(query_id)?;
 
+        let mut n_tasks = 0;
         for i in 0..self.iterations() {
             let start = Instant::now();
             let mut result = vec![];
@@ -245,7 +250,7 @@ impl RunOpt {
 
             for (i, query) in sql.iter().enumerate() {
                 if i == result_stmt {
-                    result = self.execute_query(ctx, query).await?;
+                    (result, n_tasks) = self.execute_query(ctx, query).await?;
                 } else {
                     self.execute_query(ctx, query).await?;
                 }
@@ -260,11 +265,18 @@ impl RunOpt {
             println!(
                 "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
            );
-            query_results.push(QueryResult { elapsed, row_count });
+            query_results.push(QueryIter {
+                elapsed,
+                row_count,
+                n_tasks,
+            });
         }
 
         let avg = millis.iter().sum::<f64>() / millis.len() as f64;
         println!("Query {query_id} avg time: {avg:.2} ms");
+        if n_tasks > 0 {
+            println!("Query {query_id} number of tasks: {n_tasks}");
+        }
 
         Ok(query_results)
     }
@@ -276,7 +288,11 @@ impl RunOpt {
         Ok(())
     }
 
-    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
+    async fn execute_query(
+        &self,
+        ctx: &SessionContext,
+        sql: &str,
+    ) -> Result<(Vec<RecordBatch>, usize)> {
         let debug = self.common.debug;
         let plan = ctx.sql(sql).await?;
         let (state, plan) = plan.into_parts();
@@ -296,6 +312,13 @@ impl RunOpt {
                 displayable(physical_plan.as_ref()).indent(true)
             );
         }
+        let mut n_tasks = 0;
+        physical_plan.clone().transform_down(|node| {
+            if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
+                n_tasks += node.tasks.len()
+            }
+            Ok(Transformed::no(node))
+        })?;
         let result = collect(physical_plan.clone(), state.task_ctx()).await?;
         if debug {
             println!(
@@ -308,7 +331,7 @@ impl RunOpt {
                 pretty::print_batches(&result)?;
             }
         }
-        Ok(result)
+        Ok((result, n_tasks))
     }
 
     fn get_path(&self) -> Result<PathBuf> {
diff --git a/benchmarks/src/util/mod.rs b/benchmarks/src/util/mod.rs
index f8041cf..40d8d95 100644
--- a/benchmarks/src/util/mod.rs
+++ b/benchmarks/src/util/mod.rs
@@ -22,4 +22,4 @@ mod run;
 
 pub use memory::{InMemoryCacheExecCodec, InMemoryDataSourceRule, WarmingUpMarker};
 pub use options::CommonOpt;
-pub use run::{BenchmarkRun, QueryResult};
+pub use run::{BenchmarkRun, QueryIter};
diff --git a/benchmarks/src/util/options.rs b/benchmarks/src/util/options.rs
index f36c175..f08ee7a 100644
--- a/benchmarks/src/util/options.rs
+++ b/benchmarks/src/util/options.rs
@@ -37,6 +37,7 @@ pub struct CommonOpt {
     pub iterations: usize,
 
     /// Number of partitions to process in parallel. Defaults to number of available cores.
+    /// Should typically be less than or equal to --threads.
     #[structopt(short = "n", long = "partitions")]
     pub partitions: Option<usize>,
 
diff --git a/benchmarks/src/util/run.rs b/benchmarks/src/util/run.rs
index e7ed065..0ec6a6e 100644
--- a/benchmarks/src/util/run.rs
+++ b/benchmarks/src/util/run.rs
@@ -17,13 +17,9 @@
 
 use chrono::{DateTime, Utc};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::error::DataFusionError;
 use datafusion::{error::Result, DATAFUSION_VERSION};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use serde_json::Value;
-use std::error::Error;
 use std::{
-    collections::HashMap,
     path::Path,
     time::{Duration, SystemTime},
 };
@@ -69,6 +65,10 @@ pub struct RunContext {
     pub datafusion_version: String,
     /// Number of CPU cores
     pub num_cpus: usize,
+    /// Number of workers involved in a distributed query
+    pub workers: usize,
+    /// Number of physical threads used per worker
+    pub threads: usize,
     /// Start time
     #[serde(
         serialize_with = "serialize_start_time",
@@ -79,18 +79,14 @@ pub struct RunContext {
     pub arguments: Vec<String>,
 }
 
-impl Default for RunContext {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl RunContext {
-    pub fn new() -> Self {
+    pub fn new(workers: usize, threads: usize) -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
             num_cpus: get_available_parallelism(),
+            workers,
+            threads,
             start_time: SystemTime::now(),
             arguments: std::env::args().skip(1).collect::<Vec<String>>(),
         }
@@ -99,13 +95,14 @@ impl RunContext {
 
 /// A single iteration of a benchmark query
 #[derive(Debug, Serialize, Deserialize)]
-struct QueryIter {
+pub struct QueryIter {
     #[serde(
         serialize_with = "serialize_elapsed",
         deserialize_with = "deserialize_elapsed"
     )]
-    elapsed: Duration,
-    row_count: usize,
+    pub elapsed: Duration,
+    pub row_count: usize,
+    pub n_tasks: usize,
 }
 /// A single benchmark case
 #[derive(Debug, Serialize, Deserialize)]
@@ -119,29 +116,20 @@ pub struct BenchQuery {
     start_time: SystemTime,
     success: bool,
 }
-/// Internal representation of a single benchmark query iteration result.
-pub struct QueryResult {
-    pub elapsed: Duration,
-    pub row_count: usize,
-}
+
 /// collects benchmark run data and then serializes it at the end
+#[derive(Debug, Serialize, Deserialize)]
 pub struct BenchmarkRun {
     context: RunContext,
     queries: Vec<BenchQuery>,
     current_case: Option<usize>,
 }
 
-impl Default for BenchmarkRun {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl BenchmarkRun {
     // create new
-    pub fn new() -> Self {
+    pub fn new(workers: usize, threads: usize) -> Self {
         Self {
-            context: RunContext::new(),
+            context: RunContext::new(workers, threads),
             queries: vec![],
             current_case: None,
         }
@@ -161,11 +149,9 @@ impl BenchmarkRun {
         self.current_case = Some(self.queries.len() - 1);
     }
     /// Write a new iteration to the current case
-    pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
+    pub fn write_iter(&mut self, query_iter: QueryIter) {
         if let Some(idx) = self.current_case {
-            self.queries[idx]
-                .iterations
-                .push(QueryIter { elapsed, row_count })
+            self.queries[idx].iterations.push(query_iter)
         } else {
             panic!("no cases existed yet");
         }
@@ -195,10 +181,7 @@ impl BenchmarkRun {
 
     /// Stringify data into formatted json
     pub fn to_json(&self) -> String {
-        let mut output = HashMap::<&str, Value>::new();
-        output.insert("context", serde_json::to_value(&self.context).unwrap());
-        output.insert("queries", serde_json::to_value(&self.queries).unwrap());
-        serde_json::to_string_pretty(&output).unwrap()
+        serde_json::to_string_pretty(&self).unwrap()
     }
 
     /// Write data as json into output path if it exists.
@@ -217,15 +200,14 @@ impl BenchmarkRun {
             return Ok(());
         };
 
-        let mut prev_output: HashMap<&str, Value> =
-            serde_json::from_slice(&prev).map_err(external)?;
-
-        let prev_queries: Vec<BenchQuery> =
-            serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
+        let Ok(prev_output) = serde_json::from_slice::<BenchmarkRun>(&prev) else {
+            return Ok(());
+        };
 
         let mut header_printed = false;
         for query in self.queries.iter() {
-            let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
+            let Some(prev_query) = prev_output.queries.iter().find(|v| v.query == query.query)
+            else {
                 continue;
             };
             if prev_query.iterations.is_empty() {
@@ -248,10 +230,24 @@ impl BenchmarkRun {
             if !header_printed {
                 header_printed = true;
                 let datetime: DateTime<Utc> = prev_query.start_time.into();
-                println!(
+                let header = format!(
                     "==== Comparison with the previous benchmark from {} ====",
                     datetime.format("%Y-%m-%d %H:%M:%S UTC")
                 );
+                println!("{header}");
+                // Print machine information
+                println!("os: {}", std::env::consts::OS);
+                println!("arch: {}", std::env::consts::ARCH);
+                println!("cpu cores: {}", get_available_parallelism());
+                println!(
+                    "threads: {} -> {}",
+                    prev_output.context.threads, self.context.threads
+                );
+                println!(
+                    "workers: {} -> {}",
+                    prev_output.context.workers, self.context.workers
+                );
+                println!("{}", "=".repeat(header.len()))
             }
             println!(
                 "{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
@@ -272,7 +268,3 @@ impl BenchQuery {
             / self.iterations.len() as u128
     }
 }
-
-fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
-    DataFusionError::External(Box::new(err))
-}
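
Illustrative usage sketch (not part of the patch): the snippet below shows how a caller inside the benchmarks crate could drive the reworked API from this diff — BenchmarkRun::new(workers, threads), write_iter(QueryIter { .. }), and to_json(). The worker/thread counts, the query label, and the per-iteration numbers are made-up placeholders, and start_new_case is assumed to be the existing upstream helper that this patch leaves untouched.

use std::time::Duration;

use crate::util::{BenchmarkRun, QueryIter};

fn record_example_run() -> String {
    // Assumed topology for illustration only: 3 workers, 8 threads per worker.
    let mut run = BenchmarkRun::new(3, 8);
    // Open a case before writing iterations; write_iter panics otherwise.
    run.start_new_case("Query 1");
    // One iteration: elapsed time, returned rows, and the distributed task count
    // gathered by walking the physical plan for StageExec nodes (see tpch/run.rs above).
    run.write_iter(QueryIter {
        elapsed: Duration::from_millis(1234),
        row_count: 4,
        n_tasks: 12,
    });
    // BenchmarkRun now derives Serialize, so the whole struct (context + queries)
    // is serialized directly instead of going through an intermediate HashMap.
    run.to_json()
}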