Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use super::{
TPCH_QUERY_START_ID, TPCH_TABLES,
};
use crate::util::{
BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryIter,
WarmingUpMarker,
};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::{self, pretty_format_batches};
use datafusion::common::instant::Instant;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use datafusion::datasource::file_format::csv::CsvFormat;
Expand All @@ -46,7 +47,7 @@ use datafusion_distributed::test_utils::localhost::{
};
use datafusion_distributed::{
DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
DistributedSessionBuilderContext,
DistributedSessionBuilderContext, StageExec,
};
use log::info;
use std::fs;
Expand Down Expand Up @@ -186,7 +187,10 @@ impl RunOpt {

self.output_path
.get_or_insert(self.get_path()?.join("results.json"));
let mut benchmark_run = BenchmarkRun::new();
let mut benchmark_run = BenchmarkRun::new(
self.workers.len(),
self.threads.unwrap_or(get_available_parallelism()),
);

// Warmup the cache for the in-memory mode.
if self.mem_table {
Expand All @@ -209,7 +213,7 @@ impl RunOpt {
match query_run {
Ok(query_results) => {
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
benchmark_run.write_iter(iter);
}
}
Err(e) => {
Expand All @@ -228,13 +232,14 @@ impl RunOpt {
&self,
query_id: usize,
ctx: &SessionContext,
) -> Result<Vec<QueryResult>> {
) -> Result<Vec<QueryIter>> {
let mut millis = vec![];
// run benchmark
let mut query_results = vec![];

let sql = &get_query_sql(query_id)?;

let mut n_tasks = 0;
for i in 0..self.iterations() {
let start = Instant::now();
let mut result = vec![];
Expand All @@ -245,7 +250,7 @@ impl RunOpt {

for (i, query) in sql.iter().enumerate() {
if i == result_stmt {
result = self.execute_query(ctx, query).await?;
(result, n_tasks) = self.execute_query(ctx, query).await?;
} else {
self.execute_query(ctx, query).await?;
}
Expand All @@ -260,11 +265,18 @@ impl RunOpt {
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);

query_results.push(QueryResult { elapsed, row_count });
query_results.push(QueryIter {
elapsed,
row_count,
n_tasks,
});
}

let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");
if n_tasks > 0 {
println!("Query {query_id} number of tasks: {n_tasks}");
}

Ok(query_results)
}
Expand All @@ -276,7 +288,11 @@ impl RunOpt {
Ok(())
}

async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
async fn execute_query(
&self,
ctx: &SessionContext,
sql: &str,
) -> Result<(Vec<RecordBatch>, usize)> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
Expand All @@ -296,6 +312,13 @@ impl RunOpt {
displayable(physical_plan.as_ref()).indent(true)
);
}
let mut n_tasks = 0;
physical_plan.clone().transform_down(|node| {
if let Some(node) = node.as_any().downcast_ref::<StageExec>() {
n_tasks += node.tasks.len()
}
Ok(Transformed::no(node))
})?;
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
Expand All @@ -308,7 +331,7 @@ impl RunOpt {
pretty::print_batches(&result)?;
}
}
Ok(result)
Ok((result, n_tasks))
}

fn get_path(&self) -> Result<PathBuf> {
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ mod run;

pub use memory::{InMemoryCacheExecCodec, InMemoryDataSourceRule, WarmingUpMarker};
pub use options::CommonOpt;
pub use run::{BenchmarkRun, QueryResult};
pub use run::{BenchmarkRun, QueryIter};
1 change: 1 addition & 0 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct CommonOpt {
pub iterations: usize,

/// Number of partitions to process in parallel. Defaults to number of available cores.
/// Should typically be less or equal than --threads.
#[structopt(short = "n", long = "partitions")]
pub partitions: Option<usize>,

Expand Down
84 changes: 38 additions & 46 deletions benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@

use chrono::{DateTime, Utc};
use datafusion::common::utils::get_available_parallelism;
use datafusion::error::DataFusionError;
use datafusion::{error::Result, DATAFUSION_VERSION};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use std::error::Error;
use std::{
collections::HashMap,
path::Path,
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -69,6 +65,10 @@ pub struct RunContext {
pub datafusion_version: String,
/// Number of CPU cores
pub num_cpus: usize,
/// Number of workers involved in a distributed query
pub workers: usize,
/// Number of physical threads used per worker
pub threads: usize,
/// Start time
#[serde(
serialize_with = "serialize_start_time",
Expand All @@ -79,18 +79,14 @@ pub struct RunContext {
pub arguments: Vec<String>,
}

impl Default for RunContext {
fn default() -> Self {
Self::new()
}
}

impl RunContext {
pub fn new() -> Self {
pub fn new(workers: usize, threads: usize) -> Self {
Self {
benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
datafusion_version: DATAFUSION_VERSION.to_owned(),
num_cpus: get_available_parallelism(),
workers,
threads,
start_time: SystemTime::now(),
arguments: std::env::args().skip(1).collect::<Vec<String>>(),
}
Expand All @@ -99,13 +95,14 @@ impl RunContext {

/// A single iteration of a benchmark query
#[derive(Debug, Serialize, Deserialize)]
struct QueryIter {
pub struct QueryIter {
#[serde(
serialize_with = "serialize_elapsed",
deserialize_with = "deserialize_elapsed"
)]
elapsed: Duration,
row_count: usize,
pub elapsed: Duration,
pub row_count: usize,
pub n_tasks: usize,
}
/// A single benchmark case
#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -119,29 +116,20 @@ pub struct BenchQuery {
start_time: SystemTime,
success: bool,
}
/// Internal representation of a single benchmark query iteration result.
pub struct QueryResult {
pub elapsed: Duration,
pub row_count: usize,
}

/// collects benchmark run data and then serializes it at the end
#[derive(Debug, Serialize, Deserialize)]
pub struct BenchmarkRun {
context: RunContext,
queries: Vec<BenchQuery>,
current_case: Option<usize>,
}

impl Default for BenchmarkRun {
fn default() -> Self {
Self::new()
}
}

impl BenchmarkRun {
// create new
pub fn new() -> Self {
pub fn new(workers: usize, threads: usize) -> Self {
Self {
context: RunContext::new(),
context: RunContext::new(workers, threads),
queries: vec![],
current_case: None,
}
Expand All @@ -161,11 +149,9 @@ impl BenchmarkRun {
}
}
/// Write a new iteration to the current case
pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
pub fn write_iter(&mut self, query_iter: QueryIter) {
if let Some(idx) = self.current_case {
self.queries[idx]
.iterations
.push(QueryIter { elapsed, row_count })
self.queries[idx].iterations.push(query_iter)
} else {
panic!("no cases existed yet");
}
Expand Down Expand Up @@ -195,10 +181,7 @@ impl BenchmarkRun {

/// Stringify data into formatted json
pub fn to_json(&self) -> String {
let mut output = HashMap::<&str, Value>::new();
output.insert("context", serde_json::to_value(&self.context).unwrap());
output.insert("queries", serde_json::to_value(&self.queries).unwrap());
serde_json::to_string_pretty(&output).unwrap()
serde_json::to_string_pretty(&self).unwrap()
}

/// Write data as json into output path if it exists.
Expand All @@ -217,15 +200,14 @@ impl BenchmarkRun {
return Ok(());
};

let mut prev_output: HashMap<&str, Value> =
serde_json::from_slice(&prev).map_err(external)?;

let prev_queries: Vec<BenchQuery> =
serde_json::from_value(prev_output.remove("queries").unwrap()).map_err(external)?;
let Ok(prev_output) = serde_json::from_slice::<Self>(&prev) else {
return Ok(());
};

let mut header_printed = false;
for query in self.queries.iter() {
let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
let Some(prev_query) = prev_output.queries.iter().find(|v| v.query == query.query)
else {
continue;
};
if prev_query.iterations.is_empty() {
Expand All @@ -248,10 +230,24 @@ impl BenchmarkRun {
if !header_printed {
header_printed = true;
let datetime: DateTime<Utc> = prev_query.start_time.into();
println!(
let header = format!(
"==== Comparison with the previous benchmark from {} ====",
datetime.format("%Y-%m-%d %H:%M:%S UTC")
);
println!("{header}");
// Print machine information
println!("os: {}", std::env::consts::OS);
println!("arch: {}", std::env::consts::ARCH);
println!("cpu cores: {}", get_available_parallelism());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the number of cores per worker or total of cpu cores of all workers?
Is there any constraints between this and number of threads per worker? Should number of threads per worker < CPU cores per worker? Is there any enforcement or unexpected behavior if it is not enforced

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the number of cores per worker or total of cpu cores of all workers?

Total CPU cores available in the machine that runs the benchmarks

Is there any constraints between this and number of threads per worker?

Only the ones specified by the people running the commands, it's possible to do --threads 1000 in a 2 core CPU machine. It does not make much sense to do that though...

Should number of threads per worker < CPU cores per worker?

Assuming get_available_parallelism() returns the number of CPU cores in the machine, I think so yes. As these are just benchmarks meant to be run by us, I think it's fine to not do any enforcement in the code, hopefully people running the benchmarks know what they are doing if the do --threads 1000 in a 2 core CPU machine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree people who run the benchmark should know what we are doing. Maybe we just help remind them that. How about we add a brief message saying number of threads should be < number of cores and number of partitions should be < number of threads?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a doc comment in the CLI

println!(
"threads: {} -> {}",
prev_output.context.threads, self.context.threads
);
println!(
"workers: {} -> {}",
prev_output.context.workers, self.context.workers
);
println!("{}", "=".repeat(header.len()))
}
println!(
"{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
Expand All @@ -272,7 +268,3 @@ impl BenchQuery {
/ self.iterations.len() as u128
}
}

fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
DataFusionError::External(Box::new(err))
}