Commit 1e61d51

Report host stats on TPCH benchmarks (#131)
* Add support for in-memory TPCH tests
* Add --threads and --workers options in tpch benchmarks
* Remove useless stuff
* Automatically resolve tpch paths
* Draft: spawn workers correctly in different tokio runtimes
* Register tables only in one place
* Spawn benchmark workers as a separate command
* Rollback unnecessary changes in localhost.rs
* Add run script
* Report host stats on benchmarks
* Default partitions to threads
* Pass through arguments
* Also log tasks per query
* Add comment
1 parent c12c271 commit 1e61d51

File tree: 4 files changed, +72 −56 lines

benchmarks/src/tpch/run.rs
benchmarks/src/util/mod.rs
benchmarks/src/util/options.rs
benchmarks/src/util/run.rs

benchmarks/src/tpch/run.rs

Lines changed: 32 additions & 9 deletions
@@ -20,13 +20,14 @@ use super::{
     TPCH_QUERY_START_ID, TPCH_TABLES,
 };
 use crate::util::{
-    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
+    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
     WarmingUpMarker,
 };
 use async_trait::async_trait;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty::{self, pretty_format_batches};
 use datafusion::common::instant::Instant;
+use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::utils::get_available_parallelism;
 use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
 use datafusion::datasource::file_format::csv::CsvFormat;
@@ -46,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
 };
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
-    DistributedSessionBuilderContext,
+    DistributedSessionBuilderContext, StageExec,
 };
 use log::info;
 use std::fs;
@@ -186,7 +187,10 @@ impl RunOpt {
 
         self.output_path
             .get_or_insert(self.get_path()?.join("results.json"));
-        let mut benchmark_run = BenchmarkRun::new();
+        let mut benchmark_run = BenchmarkRun::new(
+            self.workers.len(),
+            self.threads.unwrap_or(get_available_parallelism()),
+        );
 
         // Warmup the cache for the in-memory mode.
         if self.mem_table {
@@ -209,7 +213,7 @@ impl RunOpt {
             match query_run {
                 Ok(query_results) => {
                     for iter in query_results {
-                        benchmark_run.write_iter(iter.elapsed, iter.row_count);
+                        benchmark_run.write_iter(iter);
                     }
                 }
                 Err(e) => {
@@ -228,13 +232,14 @@ impl RunOpt {
         &self,
         query_id: usize,
         ctx: &SessionContext,
-    ) -> Result<Vec<QueryResult>> {
+    ) -> Result<Vec<QueryIter>> {
         let mut millis = vec![];
         // run benchmark
         let mut query_results = vec![];
 
         let sql = &get_query_sql(query_id)?;
 
+        let mut n_tasks = 0;
         for i in 0..self.iterations() {
             let start = Instant::now();
             let mut result = vec![];
@@ -245,7 +250,7 @@ impl RunOpt {
 
             for (i, query) in sql.iter().enumerate() {
                 if i == result_stmt {
-                    result = self.execute_query(ctx, query).await?;
+                    (result, n_tasks) = self.execute_query(ctx, query).await?;
                 } else {
                     self.execute_query(ctx, query).await?;
                 }
@@ -260,11 +265,18 @@ impl RunOpt {
                 "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
             );
 
-            query_results.push(QueryResult { elapsed, row_count });
+            query_results.push(QueryIter {
+                elapsed,
+                row_count,
+                n_tasks,
+            });
         }
 
         let avg = millis.iter().sum::<f64>() / millis.len() as f64;
         println!("Query {query_id} avg time: {avg:.2} ms");
+        if n_tasks > 0 {
+            println!("Query {query_id} number of tasks: {n_tasks}");
+        }
 
         Ok(query_results)
     }
@@ -276,7 +288,11 @@ impl RunOpt {
         Ok(())
     }
 
-    async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
+    async fn execute_query(
+        &self,
+        ctx: &SessionContext,
+        sql: &str,
+    ) -> Result<(Vec<RecordBatch>, usize)> {
         let debug = self.common.debug;
         let plan = ctx.sql(sql).await?;
         let (state, plan) = plan.into_parts();
@@ -296,6 +312,13 @@ impl RunOpt {
                 displayable(physical_plan.as_ref()).indent(true)
             );
         }
+        let mut n_tasks = 0;
+        physical_plan.clone().transform_down(|node| {
+            if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
+                n_tasks += node.tasks.len()
+            }
+            Ok(Transformed::no(node))
+        })?;
         let result = collect(physical_plan.clone(), state.task_ctx()).await?;
         if debug {
             println!(
@@ -308,7 +331,7 @@ impl RunOpt {
                 pretty::print_batches(&result)?;
             }
         }
-        Ok(result)
+        Ok((result, n_tasks))
     }
 
     fn get_path(&self) -> Result<PathBuf> {
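
The task count above is gathered by walking the physical plan with DataFusion's `TreeNode` API and downcasting each node to `StageExec` (the commit sums `node.tasks.len()` per stage). Below is a minimal sketch of that walk-and-downcast pattern, generalized to count nodes of an arbitrary type; the helper name `count_nodes_of` is illustrative and not part of this commit.

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;

/// Count how many nodes of type `T` appear in a physical plan.
/// `transform_down` visits every node top-down; returning `Transformed::no`
/// leaves the plan unchanged, so this is effectively a read-only traversal.
/// (Illustrative helper, not code from the commit.)
fn count_nodes_of<T: 'static>(plan: Arc<dyn ExecutionPlan>) -> Result<usize> {
    let mut count = 0;
    plan.transform_down(|node| {
        if node.as_any().downcast_ref::<T>().is_some() {
            count += 1;
        }
        Ok(Transformed::no(node))
    })?;
    Ok(count)
}

The commit keeps the `transform_down` call inline in `execute_query` and accumulates `tasks.len()` instead of a node count, but the traversal mechanics are the same.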

benchmarks/src/util/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -22,4 +22,4 @@ mod run;
 
 pub use memory::{InMemoryCacheExecCodec, InMemoryDataSourceRule, WarmingUpMarker};
 pub use options::CommonOpt;
-pub use run::{BenchmarkRun, QueryResult};
+pub use run::{BenchmarkRun, QueryIter};

benchmarks/src/util/options.rs

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ pub struct CommonOpt {
     pub iterations: usize,
 
     /// Number of partitions to process in parallel. Defaults to number of available cores.
+    /// Should typically be less or equal than --threads.
     #[structopt(short = "n", long = "partitions")]
     pub partitions: Option<usize>,
 
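
The new doc line ties `--partitions` to the new `--threads` option, and the commit message mentions "Default partitions to threads". A minimal sketch of that kind of fallback; the actual resolution code is not part of this diff, so the helper below is purely illustrative.

use datafusion::common::utils::get_available_parallelism;

/// Resolve the effective partition count: an explicit --partitions wins,
/// otherwise fall back to --threads, otherwise to the number of cores.
/// (Illustrative helper, not code from the commit.)
fn effective_partitions(partitions: Option<usize>, threads: Option<usize>) -> usize {
    partitions
        .or(threads)
        .unwrap_or_else(get_available_parallelism)
}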

benchmarks/src/util/run.rs

Lines changed: 38 additions & 46 deletions
@@ -17,13 +17,9 @@
 
 use chrono::{DateTime, Utc};
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::error::DataFusionError;
 use datafusion::{error::Result, DATAFUSION_VERSION};
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
-use serde_json::Value;
-use std::error::Error;
 use std::{
-    collections::HashMap,
     path::Path,
     time::{Duration, SystemTime},
 };
@@ -69,6 +65,10 @@ pub struct RunContext {
     pub datafusion_version: String,
     /// Number of CPU cores
     pub num_cpus: usize,
+    /// Number of workers involved in a distributed query
+    pub workers: usize,
+    /// Number of physical threads used per worker
+    pub threads: usize,
     /// Start time
     #[serde(
         serialize_with = "serialize_start_time",
@@ -79,18 +79,14 @@ pub struct RunContext {
     pub arguments: Vec<String>,
 }
 
-impl Default for RunContext {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl RunContext {
-    pub fn new() -> Self {
+    pub fn new(workers: usize, threads: usize) -> Self {
         Self {
             benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
             datafusion_version: DATAFUSION_VERSION.to_owned(),
             num_cpus: get_available_parallelism(),
+            workers,
+            threads,
             start_time: SystemTime::now(),
             arguments: std::env::args().skip(1).collect::<Vec<String>>(),
         }
@@ -99,13 +95,14 @@ impl RunContext {
 
 /// A single iteration of a benchmark query
 #[derive(Debug, Serialize, Deserialize)]
-struct QueryIter {
+pub struct QueryIter {
     #[serde(
         serialize_with = "serialize_elapsed",
         deserialize_with = "deserialize_elapsed"
     )]
-    elapsed: Duration,
-    row_count: usize,
+    pub elapsed: Duration,
+    pub row_count: usize,
+    pub n_tasks: usize,
 }
 /// A single benchmark case
 #[derive(Debug, Serialize, Deserialize)]
@@ -119,29 +116,20 @@ pub struct BenchQuery {
     start_time: SystemTime,
     success: bool,
 }
-/// Internal representation of a single benchmark query iteration result.
-pub struct QueryResult {
-    pub elapsed: Duration,
-    pub row_count: usize,
-}
+
 /// collects benchmark run data and then serializes it at the end
+#[derive(Debug, Serialize, Deserialize)]
 pub struct BenchmarkRun {
     context: RunContext,
     queries: Vec<BenchQuery>,
     current_case: Option<usize>,
 }
 
-impl Default for BenchmarkRun {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
 impl BenchmarkRun {
     // create new
-    pub fn new() -> Self {
+    pub fn new(workers: usize, threads: usize) -> Self {
         Self {
-            context: RunContext::new(),
+            context: RunContext::new(workers, threads),
             queries: vec![],
             current_case: None,
         }
@@ -161,11 +149,9 @@ impl BenchmarkRun {
         }
     }
     /// Write a new iteration to the current case
-    pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
+    pub fn write_iter(&mut self, query_iter: QueryIter) {
         if let Some(idx) = self.current_case {
-            self.queries[idx]
-                .iterations
-                .push(QueryIter { elapsed, row_count })
+            self.queries[idx].iterations.push(query_iter)
         } else {
             panic!("no cases existed yet");
         }
@@ -195,10 +181,7 @@ impl BenchmarkRun {
 
     /// Stringify data into formatted json
    pub fn to_json(&self) -> String {
-        let mut output = HashMap::<&str, Value>::new();
-        output.insert("context", serde_json::to_value(&self.context).unwrap());
-        output.insert("queries", serde_json::to_value(&self.queries).unwrap());
-        serde_json::to_string_pretty(&output).unwrap()
+        serde_json::to_string_pretty(&self).unwrap()
     }
 
     /// Write data as json into output path if it exists.
@@ -217,15 +200,14 @@ impl BenchmarkRun {
             return Ok(());
         };
 
-        let mut prev_output: HashMap<&str, Value> =
-            serde_json::from_slice(&prev).map_err(external)?;
-
-        let prev_queries: Vec<BenchQuery> =
-            serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
+        let Ok(prev_output) = serde_json::from_slice::<Self>(&prev) else {
+            return Ok(());
+        };
 
         let mut header_printed = false;
         for query in self.queries.iter() {
-            let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
+            let Some(prev_query) = prev_output.queries.iter().find(|v| v.query == query.query)
+            else {
                 continue;
             };
             if prev_query.iterations.is_empty() {
@@ -248,10 +230,24 @@ impl BenchmarkRun {
             if !header_printed {
                 header_printed = true;
                 let datetime: DateTime<Utc> = prev_query.start_time.into();
-                println!(
+                let header = format!(
                     "==== Comparison with the previous benchmark from {} ====",
                     datetime.format("%Y-%m-%d %H:%M:%S UTC")
                 );
+                println!("{header}");
+                // Print machine information
+                println!("os: {}", std::env::consts::OS);
+                println!("arch: {}", std::env::consts::ARCH);
+                println!("cpu cores: {}", get_available_parallelism());
+                println!(
+                    "threads: {} -> {}",
+                    prev_output.context.threads, self.context.threads
+                );
+                println!(
+                    "workers: {} -> {}",
+                    prev_output.context.workers, self.context.workers
+                );
+                println!("{}", "=".repeat(header.len()))
             }
             println!(
                 "{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
@@ -272,7 +268,3 @@ impl BenchQuery {
             / self.iterations.len() as u128
     }
 }
-
-fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
-    DataFusionError::External(Box::new(err))
-}
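
With `BenchmarkRun`, `RunContext`, and `QueryIter` all deriving `Serialize`/`Deserialize`, a run now serializes as the struct itself, and previous results files parse straight back into the same type. A minimal usage sketch, assuming the crate is importable as `benchmarks` and that `BenchmarkRun` still exposes a case-starting method named `start_new_case` (as in DataFusion's upstream benchmark utilities; it is not shown in this diff):

use std::time::Duration;

use benchmarks::util::{BenchmarkRun, QueryIter};

fn main() {
    // 2 workers with 8 threads each (hypothetical values).
    let mut run = BenchmarkRun::new(2, 8);

    // Assumed case-starting method; this diff only shows `write_iter`.
    run.start_new_case("Query 1");

    // One iteration: elapsed time, rows returned, and the new task count.
    run.write_iter(QueryIter {
        elapsed: Duration::from_millis(1234),
        row_count: 4,
        n_tasks: 6,
    });

    // `to_json` now serializes the struct directly, so the output contains a
    // `context` object (with `workers` and `threads`) next to the `queries` array.
    println!("{}", run.to_json());
}

The same derives are what let the comparison path replace the old `HashMap<&str, Value>` parsing with a single `serde_json::from_slice::<Self>(&prev)`.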
