Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ serde_json = "1.0.141"
env_logger = "0.11.8"
async-trait = "0.1.88"
datafusion-proto = { version = "49.0.0", optional = true }
chrono = "0.4.41"

[[bin]]
name = "dfbench"
Expand Down
11 changes: 2 additions & 9 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,5 @@ Generate TPCH data into the `data/` dir
After generating the data with the command above:

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1
```

In order to validate the correctness of the results against single node execution, add
`--validate`

```shell
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path data/tpch_sf1 --validate
```
cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
```
28 changes: 19 additions & 9 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use datafusion::datasource::listing::{
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SessionState, SessionStateBuilder};
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct RunOpt {
path: PathBuf,

/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "csv")]
#[structopt(short = "f", long = "format", default_value = "parquet")]
file_format: String,

/// Load the data into a MemTable before executing the query
Expand All @@ -100,7 +100,11 @@ pub struct RunOpt {
#[structopt(short = "t", long = "sorted")]
sorted: bool,

/// The maximum number of partitions per task.
/// Run in distributed mode.
#[structopt(short = "D", long = "distributed")]
distributed: bool,

/// Number of partitions per task.
#[structopt(long = "ppt")]
partitions_per_task: Option<usize>,
}
Expand All @@ -109,7 +113,7 @@ pub struct RunOpt {
impl SessionBuilder for RunOpt {
fn session_state_builder(
&self,
builder: SessionStateBuilder,
mut builder: SessionStateBuilder,
) -> Result<SessionStateBuilder, DataFusionError> {
let mut config = self
.common
Expand All @@ -133,13 +137,16 @@ impl SessionBuilder for RunOpt {
// end critical options section
let rt_builder = self.common.runtime_env_builder()?;

let mut rule = DistributedPhysicalOptimizerRule::new();
if let Some(ppt) = self.partitions_per_task {
rule = rule.with_maximum_partitions_per_task(ppt);
if self.distributed {
let mut rule = DistributedPhysicalOptimizerRule::new();
if let Some(partitions_per_task) = self.partitions_per_task {
rule = rule.with_maximum_partitions_per_task(partitions_per_task)
}
builder = builder.with_physical_optimizer_rule(Arc::new(rule));
}

Ok(builder
.with_config(config)
.with_physical_optimizer_rule(Arc::new(rule))
.with_runtime_env(rt_builder.build_arc()?))
}

Expand All @@ -153,14 +160,16 @@ impl SessionBuilder for RunOpt {
}

impl RunOpt {
pub async fn run(self) -> Result<()> {
pub async fn run(mut self) -> Result<()> {
let (ctx, _guard) = start_localhost_context(1, self.clone()).await;
println!("Running benchmarks with the following options: {self:?}");
let query_range = match self.query {
Some(query_id) => query_id..=query_id,
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
};

self.output_path
.get_or_insert_with(|| self.path.join("results.json"));
let mut benchmark_run = BenchmarkRun::new();

for query_id in query_range {
Expand All @@ -178,6 +187,7 @@ impl RunOpt {
}
}
}
benchmark_run.maybe_compare_with_previous(self.output_path.as_ref())?;
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
benchmark_run.maybe_print_failures();
Ok(())
Expand Down
108 changes: 101 additions & 7 deletions benchmarks/src/util/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use chrono::{DateTime, Utc};
use datafusion::common::utils::get_available_parallelism;
use datafusion::error::DataFusionError;
use datafusion::{error::Result, DATAFUSION_VERSION};
use serde::{Serialize, Serializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::Value;
use std::error::Error;
use std::{
collections::HashMap,
path::Path,
Expand All @@ -36,14 +39,29 @@ where
.as_secs(),
)
}
/// Deserialize a `SystemTime` that was persisted as whole seconds since the
/// Unix epoch (the inverse of `serialize_start_time`).
fn deserialize_start_time<'de, D>(des: D) -> Result<SystemTime, D::Error>
where
    D: Deserializer<'de>,
{
    u64::deserialize(des).map(|seconds| SystemTime::UNIX_EPOCH + Duration::from_secs(seconds))
}

/// Serialize a `Duration` as fractional milliseconds (f64) for the JSON output.
fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    ser.serialize_f64(elapsed.as_secs_f64() * 1000.0)
}
#[derive(Debug, Serialize)]
fn deserialize_elapsed<'de, D>(des: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let ms = f64::deserialize(des)?;
Ok(Duration::from_secs_f64(ms / 1000.0))
}
#[derive(Debug, Serialize, Deserialize)]
pub struct RunContext {
/// Benchmark crate version
pub benchmark_version: String,
Expand All @@ -52,7 +70,10 @@ pub struct RunContext {
/// Number of CPU cores
pub num_cpus: usize,
/// Start time
#[serde(serialize_with = "serialize_start_time")]
#[serde(
serialize_with = "serialize_start_time",
deserialize_with = "deserialize_start_time"
)]
pub start_time: SystemTime,
/// CLI arguments
pub arguments: Vec<String>,
Expand All @@ -77,18 +98,24 @@ impl RunContext {
}

/// A single iteration of a benchmark query
#[derive(Debug, Serialize, Deserialize)]
struct QueryIter {
    /// Wall-clock time of this iteration, serialized as fractional milliseconds.
    #[serde(
        serialize_with = "serialize_elapsed",
        deserialize_with = "deserialize_elapsed"
    )]
    elapsed: Duration,
    /// Number of rows the query produced in this iteration.
    row_count: usize,
}
/// A single benchmark case
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BenchQuery {
query: String,
iterations: Vec<QueryIter>,
#[serde(serialize_with = "serialize_start_time")]
#[serde(
serialize_with = "serialize_start_time",
deserialize_with = "deserialize_start_time"
)]
start_time: SystemTime,
success: bool,
}
Expand Down Expand Up @@ -181,4 +208,71 @@ impl BenchmarkRun {
};
Ok(())
}

/// Compare this run against a previously written results file, printing a
/// per-query faster/slower summary to stdout.
///
/// Silently does nothing when no path was provided, when no previous file
/// exists at the path, or when the previous file has no "queries" section.
/// Returns an error only when an existing file cannot be parsed.
pub fn maybe_compare_with_previous(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
    let Some(path) = maybe_path else {
        return Ok(());
    };
    // A missing or unreadable previous file just means there is no baseline
    // to compare against; that is not an error.
    let Ok(prev) = std::fs::read(path) else {
        return Ok(());
    };

    let mut prev_output: HashMap<&str, Value> =
        serde_json::from_slice(&prev).map_err(external)?;

    // Older or hand-edited files may lack the "queries" key; skip the
    // comparison instead of panicking on `unwrap`.
    let Some(queries) = prev_output.remove("queries") else {
        return Ok(());
    };
    let prev_queries: Vec<BenchQuery> = serde_json::from_value(queries).map_err(external)?;

    let mut header_printed = false;
    for query in self.queries.iter() {
        // Only compare queries present in both runs.
        let Some(prev_query) = prev_queries.iter().find(|v| v.query == query.query) else {
            continue;
        };
        // Skip baselines with no timings (avg() would divide by zero).
        if prev_query.iterations.is_empty() {
            continue;
        }
        if query.iterations.is_empty() {
            println!("{}: Failed ❌", query.query);
            continue;
        }

        let avg_prev = prev_query.avg();
        let avg = query.avg();
        // Express the change as a ratio >= 1.0 plus a direction tag; >20%
        // difference gets the loud emoji.
        let (f, tag, emoji) = if avg < avg_prev {
            let f = avg_prev as f64 / avg as f64;
            (f, "faster", if f > 1.2 { "✅" } else { "✔" })
        } else {
            let f = avg as f64 / avg_prev as f64;
            (f, "slower", if f > 1.2 { "❌" } else { "✖" })
        };
        // Print the header lazily so nothing appears when no queries overlap.
        if !header_printed {
            header_printed = true;
            let datetime: DateTime<Utc> = prev_query.start_time.into();
            println!(
                "==== Comparison with the previous benchmark from {} ====",
                datetime.format("%Y-%m-%d %H:%M:%S UTC")
            );
        }
        println!(
            "{:>8}: prev={avg_prev:>4} ms, new={avg:>4} ms, diff={f:.2} {tag} {emoji}",
            query.query
        );
    }

    Ok(())
}
}

impl BenchQuery {
    /// Mean elapsed time across all iterations, in whole milliseconds.
    ///
    /// Returns 0 for a query with no iterations instead of panicking on a
    /// division by zero (current callers also guard the empty case, so their
    /// observable behavior is unchanged).
    fn avg(&self) -> u128 {
        let total: u128 = self.iterations.iter().map(|v| v.elapsed.as_millis()).sum();
        total.checked_div(self.iterations.len() as u128).unwrap_or(0)
    }
}

/// Wrap an arbitrary error into a `DataFusionError::External`.
fn external(err: impl Error + Send + Sync + 'static) -> DataFusionError {
    let boxed: Box<dyn Error + Send + Sync + 'static> = Box::new(err);
    DataFusionError::External(boxed)
}