diff --git a/benchmarks/README.md b/benchmarks/README.md
index 19da292..e411e90 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -1,6 +1,6 @@
 # Distributed DataFusion Benchmarks
 
-### Generating tpch data
+### Generating TPCH data
 
 Generate TPCH data into the `data/` dir
 
@@ -8,10 +8,76 @@ Generate TPCH data into the `data/` dir
 ./gen-tpch.sh
 ```
 
-### Running tpch benchmarks
+### Running TPCH benchmarks in single-node mode
 
-After generating the data with the command above:
+After generating the data with the command above, the benchmarks can be run with:
 
 ```shell
-cargo run -p datafusion-distributed-benchmarks --release -- tpch --path benchmarks/data/tpch_sf1
+cargo run -p datafusion-distributed-benchmarks --release -- tpch
+```
+
+For preloading the TPCH data in memory, the `-m` flag can be passed:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m
+```
+
+For running the benchmarks with just a specific number of physical threads:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 3
+```
+
+### Running TPCH benchmarks in distributed mode
+
+Running the benchmarks in distributed mode implies:
+
+- running 1 or more workers in separate terminals
+- running the benchmarks in an additional terminal
+
+The workers can be spawned by passing the `--spawn <port>` flag. For example, to spawn 3 workers:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8000
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8001
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --spawn 8002
+```
+
+With the three workers running in separate terminals, the TPCH benchmarks can be run in distributed mode with:
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch --workers 8000,8001,8002
+```
+
+A good way of measuring the impact of distribution is to limit the physical threads each worker can use. For example,
+it's expected that running 8 workers with 2 physical threads each (8 * 2 = 16 total) is faster than running in
+single-node mode with just 2 threads (1 * 2 = 2 total).
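+
+For example, spawn 8 workers with 2 threads each, and then run the benchmarks against all of them: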
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8000 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8001 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8002 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8003 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8004 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8005 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8006 &
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --spawn 8007 &
+```
+
+```shell
+cargo run -p datafusion-distributed-benchmarks --release -- tpch -m --threads 2 --workers 8000,8001,8002,8003,8004,8005,8006,8007
+```
+
+The `run.sh` script already does this for you in a more ergonomic way:
+
+```shell
+WORKERS=8 ./run.sh --threads 2 -m
 ```
\ No newline at end of file
diff --git a/benchmarks/run.sh b/benchmarks/run.sh
new file mode 100755
index 0000000..7cb0151
--- /dev/null
+++ b/benchmarks/run.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+set -e
+
+WORKERS=${WORKERS:-8}
+
+# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+
+if [ "$WORKERS" == "0" ]; then
+  cargo run -p datafusion-distributed-benchmarks --release -- tpch "$@"
+  exit
+fi
+
+cleanup() {
+  echo "Cleaning up processes..."
+  for i in $(seq 1 $((WORKERS))); do
+    kill "%$i"
+  done
+}
+
+wait_for_port() {
+  local port=$1
+  local timeout=300 # ticks of 0.1s, i.e. 30 seconds
+  local elapsed=0
+  while ! nc -z localhost "$port" 2>/dev/null; do
+    if [ "$elapsed" -ge "$timeout" ]; then
+      echo "Timeout waiting for port $port"
+      return 1
+    fi
+    sleep 0.1
+    elapsed=$((elapsed + 1))
+  done
+  echo "Port $port is ready"
+}
+
+cargo build -p datafusion-distributed-benchmarks --release
+
+trap cleanup EXIT INT TERM
+for i in $(seq 0 $((WORKERS-1))); do
+  "$SCRIPT_DIR"/../target/release/dfbench tpch --spawn $((8000+i)) "$@" &
+done
+
+echo "Waiting for worker ports to be ready..."
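+# Block until every worker port accepts TCP connections, so the benchmark run below never races ahead of a worker that is still starting up.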
+for i in $(seq 0 $((WORKERS-1))); do
+  wait_for_port $((8000+i))
+done
+
+"$SCRIPT_DIR"/../target/release/dfbench tpch --workers $(seq -s, 8000 $((8000+WORKERS-1))) "$@"
diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs
index be63e84..d626f79 100644
--- a/benchmarks/src/bin/dfbench.rs
+++ b/benchmarks/src/bin/dfbench.rs
@@ -30,12 +30,14 @@ enum Options {
 }
 
 // Main benchmark runner entrypoint
-#[tokio::main]
-pub async fn main() -> Result<()> {
+pub fn main() -> Result<()> {
     env_logger::init();
 
     match Options::from_args() {
-        Options::Tpch(opt) => Box::pin(opt.run()).await,
-        Options::TpchConvert(opt) => opt.run().await,
+        Options::Tpch(opt) => opt.run(),
+        Options::TpchConvert(opt) => {
+            let rt = tokio::runtime::Runtime::new()?;
+            rt.block_on(async { opt.run().await })
+        }
     }
 }
diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs
index 310cddf..854ca0d 100644
--- a/benchmarks/src/tpch/run.rs
+++ b/benchmarks/src/tpch/run.rs
@@ -19,15 +19,16 @@ use super::{
     get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID,
     TPCH_QUERY_START_ID, TPCH_TABLES,
 };
+use crate::util::{
+    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
+    WarmingUpMarker,
+};
 use async_trait::async_trait;
-use std::path::PathBuf;
-use std::sync::Arc;
-
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::arrow::util::pretty::{self, pretty_format_batches};
 use datafusion::common::instant::Instant;
 use datafusion::common::utils::get_available_parallelism;
-use datafusion::common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
+use datafusion::common::{exec_err, DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
 use datafusion::datasource::file_format::csv::CsvFormat;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
@@ -40,21 +41,19 @@ use datafusion::execution::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 use datafusion::physical_plan::{collect, displayable};
 use datafusion::prelude::*;
-
-use crate::util::{
-    BenchmarkRun, CommonOpt, InMemoryCacheExecCodec, InMemoryDataSourceRule, QueryResult,
-    WarmingUpMarker,
+use datafusion_distributed::test_utils::localhost::{
+    spawn_flight_service, LocalHostChannelResolver,
 };
-use datafusion_distributed::test_utils::localhost::start_localhost_context;
 use datafusion_distributed::{
     DistributedExt, DistributedPhysicalOptimizerRule, DistributedSessionBuilder,
     DistributedSessionBuilderContext,
 };
 use log::info;
+use std::fs;
+use std::path::PathBuf;
+use std::sync::Arc;
 use structopt::StructOpt;
-
-// hack to avoid `default_value is meaningless for bool` errors
-type BoolDefaultTrue = bool;
+use tokio::net::TcpListener;
 
 /// Run the tpch benchmark.
 ///
@@ -77,8 +76,8 @@ pub struct RunOpt {
     common: CommonOpt,
 
     /// Path to data files
-    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
-    path: PathBuf,
+    #[structopt(parse(from_os_str), short = "p", long = "path")]
+    path: Option<PathBuf>,
 
     /// File format: `csv` or `parquet`
     #[structopt(short = "f", long = "format", default_value = "parquet")]
@@ -96,23 +95,26 @@ pub struct RunOpt {
     #[structopt(short = "S", long = "disable-statistics")]
     disable_statistics: bool,
 
-    /// If true then hash join used, if false then sort merge join
-    /// True by default.
-    #[structopt(short = "j", long = "prefer_hash_join", default_value = "true")]
-    prefer_hash_join: BoolDefaultTrue,
-
     /// Mark the first column of each table as sorted in ascending order.
     /// The tables should have been created with the `--sort` option for this to have any effect.
     #[structopt(short = "t", long = "sorted")]
     sorted: bool,
 
-    /// Run in distributed mode.
-    #[structopt(short = "D", long = "distributed")]
-    distributed: bool,
-
     /// Number of partitions per task.
     #[structopt(long = "ppt")]
    partitions_per_task: Option<usize>,
+
+    /// Spawns a worker on the specified port.
+    #[structopt(long)]
+    spawn: Option<u16>,
+
+    /// The ports of all the workers involved in the query.
+    #[structopt(long, use_delimiter = true)]
+    workers: Vec<u16>,
+
+    /// Number of physical threads per worker.
+    #[structopt(long)]
+    threads: Option<usize>,
 }
 
 #[async_trait]
@@ -128,6 +130,7 @@ impl DistributedSessionBuilder for RunOpt {
             .config()?
             .with_collect_statistics(!self.disable_statistics)
             .with_distributed_user_codec(InMemoryCacheExecCodec)
+            .with_distributed_channel_resolver(LocalHostChannelResolver::new(self.workers.clone()))
             .with_distributed_option_extension_from_headers::<WarmingUpMarker>(&ctx.headers)?
             .with_target_partitions(self.partitions());
 
@@ -136,7 +139,7 @@
         if self.mem_table {
             builder = builder.with_physical_optimizer_rule(Arc::new(InMemoryDataSourceRule));
         }
-        if self.distributed {
+        if !self.workers.is_empty() {
             let mut rule = DistributedPhysicalOptimizerRule::new();
             if let Some(partitions_per_task) = self.partitions_per_task {
                 rule = rule.with_maximum_partitions_per_task(partitions_per_task)
@@ -144,20 +147,37 @@
             }
             builder = builder.with_physical_optimizer_rule(Arc::new(rule));
         }
 
-        let state = builder
+        Ok(builder
             .with_config(config)
             .with_runtime_env(rt_builder.build_arc()?)
-            .build();
-
-        let ctx = SessionContext::from(state);
-        self.register_tables(&ctx).await?;
-        Ok(ctx.state())
+            .build())
     }
 }
 
 impl RunOpt {
-    pub async fn run(mut self) -> Result<()> {
-        let (ctx, _guard) = start_localhost_context(1, self.clone()).await;
+    pub fn run(self) -> Result<()> {
+        let rt = tokio::runtime::Builder::new_multi_thread()
+            .worker_threads(self.threads.unwrap_or(get_available_parallelism()))
+            .enable_all()
+            .build()?;
+
+        if let Some(port) = self.spawn {
+            rt.block_on(async move {
+                let listener = TcpListener::bind(format!("127.0.0.1:{port}")).await?;
+                println!("Listening on {}...", listener.local_addr().unwrap());
+                spawn_flight_service(self, listener).await
+            })?;
+        } else {
+            rt.block_on(self.run_local())?;
+        }
+        Ok(())
+    }
+
+    async fn run_local(mut self) -> Result<()> {
+        let state = self.build_session_state(Default::default()).await?;
+        let ctx = SessionContext::new_with_state(state);
+        self.register_tables(&ctx).await?;
+        println!("Running benchmarks with the following options: {self:?}");
 
         let query_range = match self.query {
             Some(query_id) => query_id..=query_id,
@@ -165,9 +185,25 @@
         };
 
         self.output_path
-            .get_or_insert_with(|| self.path.join("results.json"));
+            .get_or_insert(self.get_path()?.join("results.json"));
 
         let mut benchmark_run = BenchmarkRun::new();
+
+        // Warmup the cache for the in-memory mode.
+        if self.mem_table {
+            for query_id in query_range.clone() {
+                // put the WarmingUpMarker in the context, otherwise, queries will fail as the
+                // InMemoryCacheExec node will think they should already be warmed up.
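+                // Note that only this cloned warmup context carries the marker; the timed runs
+                // below reuse the original context and are served from the populated cache.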
+                let ctx = ctx
+                    .clone()
+                    .with_distributed_option_extension(WarmingUpMarker::warming_up())?;
+                for query in get_query_sql(query_id)? {
+                    self.execute_query(&ctx, &query).await?;
+                }
+                println!("Query {query_id} data loaded in memory");
+            }
+        }
+
         for query_id in query_range {
             benchmark_run.start_new_case(&format!("Query {query_id}"));
             let query_run = self.benchmark_query(query_id, &ctx).await;
@@ -179,7 +214,7 @@
                 }
                 Err(e) => {
                     benchmark_run.mark_failed();
-                    eprintln!("Query {query_id} failed: {e}");
+                    eprintln!("Query {query_id} failed: {e:?}");
                 }
             }
         }
@@ -200,19 +235,6 @@
         let sql = &get_query_sql(query_id)?;
 
-        // Warmup the cache for the in-memory mode.
-        if self.mem_table {
-            // put the WarmingUpMarker in the context, otherwise, queries will fail as the
-            // InMemoryCacheExec node will think they should already be warmed up.
-            let ctx = ctx
-                .clone()
-                .with_distributed_option_extension(WarmingUpMarker::warming_up())?;
-            for query in sql.iter() {
-                self.execute_query(&ctx, query).await?;
-            }
-            println!("Query {query_id} data loaded in memory");
-        }
-
         for i in 0..self.iterations() {
             let start = Instant::now();
             let mut result = vec![];
@@ -289,8 +311,25 @@
         Ok(result)
     }
 
+    fn get_path(&self) -> Result<PathBuf> {
+        if let Some(path) = &self.path {
+            return Ok(path.clone());
+        }
+        let crate_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        let data_path = crate_path.join("data");
+        let entries = fs::read_dir(&data_path)?.collect::<Result<Vec<_>, _>>()?;
+        if entries.is_empty() {
+            exec_err!("No TPCH dataset present in '{data_path:?}'. Generate one with ./benchmarks/gen-tpch.sh")
+        } else if entries.len() == 1 {
+            Ok(entries[0].path())
+        } else {
+            exec_err!("Multiple TPCH datasets present in '{data_path:?}'. One must be selected with --path")
+        }
+    }
+
     async fn get_table(&self, ctx: &SessionContext, table: &str) -> Result<Arc<dyn TableProvider>> {
-        let path = self.path.to_str().unwrap();
+        let path = self.get_path()?;
+        let path = path.to_str().unwrap();
         let table_format = self.file_format.as_str();
         let target_partitions = self.partitions();
 
@@ -358,177 +397,12 @@
     }
 
     fn partitions(&self) -> usize {
-        self.common
-            .partitions
-            .unwrap_or_else(get_available_parallelism)
-    }
-}
-
-#[cfg(test)]
-// Only run with "ci" mode when we have the data
-#[cfg(feature = "ci")]
-mod tests {
-    use std::path::Path;
-
-    use super::*;
-
-    use datafusion::common::exec_err;
-    use datafusion::error::Result;
-    use datafusion_proto::bytes::{
-        logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
-        physical_plan_to_bytes,
-    };
-
-    fn get_tpch_data_path() -> Result<String> {
-        let path = std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
-        if !Path::new(&path).exists() {
-            return exec_err!(
-                "Benchmark data not found (set TPCH_DATA env var to override): {}",
-                path
-            );
-        }
-        Ok(path)
-    }
-
-    async fn round_trip_logical_plan(query: usize) -> Result<()> {
-        let ctx = SessionContext::default();
-        let path = get_tpch_data_path()?;
-        let common = CommonOpt {
-            iterations: 1,
-            partitions: Some(2),
-            batch_size: Some(8192),
-            mem_pool_type: "fair".to_string(),
-            memory_limit: None,
-            sort_spill_reservation_bytes: None,
-            debug: false,
-        };
-        let opt = RunOpt {
-            query: Some(query),
-            common,
-            path: PathBuf::from(path.to_string()),
-            file_format: "tbl".to_string(),
-            mem_table: false,
-            output_path: None,
-            disable_statistics: false,
-            prefer_hash_join: true,
-            sorted: false,
-            partitions_per_task: None,
-        };
-        opt.register_tables(&ctx).await?;
-        let queries = get_query_sql(query)?;
-        for query in queries {
-            let plan = ctx.sql(&query).await?;
-            let plan = plan.into_optimized_plan()?;
-            let bytes = logical_plan_to_bytes(&plan)?;
-            let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
-            let plan_formatted = format!("{}", plan.display_indent());
-            let plan2_formatted = format!("{}", plan2.display_indent());
-            assert_eq!(plan_formatted, plan2_formatted);
+        if let Some(partitions) = self.common.partitions {
+            return partitions;
         }
-        Ok(())
-    }
-
-    async fn round_trip_physical_plan(query: usize) -> Result<()> {
-        let ctx = SessionContext::default();
-        let path = get_tpch_data_path()?;
-        let common = CommonOpt {
-            iterations: 1,
-            partitions: Some(2),
-            batch_size: Some(8192),
-            mem_pool_type: "fair".to_string(),
-            memory_limit: None,
-            sort_spill_reservation_bytes: None,
-            debug: false,
-        };
-        let opt = RunOpt {
-            query: Some(query),
-            common,
-            path: PathBuf::from(path.to_string()),
-            file_format: "tbl".to_string(),
-            mem_table: false,
-            output_path: None,
-            disable_statistics: false,
-            prefer_hash_join: true,
-            sorted: false,
-            partitions_per_task: None,
-        };
-        opt.register_tables(&ctx).await?;
-        let queries = get_query_sql(query)?;
-        for query in queries {
-            let plan = ctx.sql(&query).await?;
-            let plan = plan.create_physical_plan().await?;
-            let bytes = physical_plan_to_bytes(plan.clone())?;
-            let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;
-            let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
-            let plan2_formatted = format!("{}", displayable(plan2.as_ref()).indent(false));
-            assert_eq!(plan_formatted, plan2_formatted);
+        if let Some(threads) = self.threads {
+            return threads;
         }
-        Ok(())
+        get_available_parallelism()
     }
-
-    macro_rules! test_round_trip_logical {
-        ($tn:ident, $query:expr) => {
-            #[tokio::test]
-            async fn $tn() -> Result<()> {
-                round_trip_logical_plan($query).await
-            }
-        };
-    }
-
-    macro_rules! test_round_trip_physical {
-        ($tn:ident, $query:expr) => {
-            #[tokio::test]
-            async fn $tn() -> Result<()> {
-                round_trip_physical_plan($query).await
-            }
-        };
-    }
-
-    // logical plan tests
-    test_round_trip_logical!(round_trip_logical_plan_q1, 1);
-    test_round_trip_logical!(round_trip_logical_plan_q2, 2);
-    test_round_trip_logical!(round_trip_logical_plan_q3, 3);
-    test_round_trip_logical!(round_trip_logical_plan_q4, 4);
-    test_round_trip_logical!(round_trip_logical_plan_q5, 5);
-    test_round_trip_logical!(round_trip_logical_plan_q6, 6);
-    test_round_trip_logical!(round_trip_logical_plan_q7, 7);
-    test_round_trip_logical!(round_trip_logical_plan_q8, 8);
-    test_round_trip_logical!(round_trip_logical_plan_q9, 9);
-    test_round_trip_logical!(round_trip_logical_plan_q10, 10);
-    test_round_trip_logical!(round_trip_logical_plan_q11, 11);
-    test_round_trip_logical!(round_trip_logical_plan_q12, 12);
-    test_round_trip_logical!(round_trip_logical_plan_q13, 13);
-    test_round_trip_logical!(round_trip_logical_plan_q14, 14);
-    test_round_trip_logical!(round_trip_logical_plan_q15, 15);
-    test_round_trip_logical!(round_trip_logical_plan_q16, 16);
-    test_round_trip_logical!(round_trip_logical_plan_q17, 17);
-    test_round_trip_logical!(round_trip_logical_plan_q18, 18);
-    test_round_trip_logical!(round_trip_logical_plan_q19, 19);
-    test_round_trip_logical!(round_trip_logical_plan_q20, 20);
-    test_round_trip_logical!(round_trip_logical_plan_q21, 21);
-    test_round_trip_logical!(round_trip_logical_plan_q22, 22);
-
-    // physical plan tests
-    test_round_trip_physical!(round_trip_physical_plan_q1, 1);
-    test_round_trip_physical!(round_trip_physical_plan_q2, 2);
-    test_round_trip_physical!(round_trip_physical_plan_q3, 3);
-    test_round_trip_physical!(round_trip_physical_plan_q4, 4);
-    test_round_trip_physical!(round_trip_physical_plan_q5, 5);
-    test_round_trip_physical!(round_trip_physical_plan_q6, 6);
-    test_round_trip_physical!(round_trip_physical_plan_q7, 7);
-    test_round_trip_physical!(round_trip_physical_plan_q8, 8);
-    test_round_trip_physical!(round_trip_physical_plan_q9, 9);
-    test_round_trip_physical!(round_trip_physical_plan_q10, 10);
-    test_round_trip_physical!(round_trip_physical_plan_q11, 11);
-    test_round_trip_physical!(round_trip_physical_plan_q12, 12);
-    test_round_trip_physical!(round_trip_physical_plan_q13, 13);
-    test_round_trip_physical!(round_trip_physical_plan_q14, 14);
-    test_round_trip_physical!(round_trip_physical_plan_q15, 15);
-    test_round_trip_physical!(round_trip_physical_plan_q16, 16);
-    test_round_trip_physical!(round_trip_physical_plan_q17, 17);
-    test_round_trip_physical!(round_trip_physical_plan_q18, 18);
-    test_round_trip_physical!(round_trip_physical_plan_q19, 19);
-    test_round_trip_physical!(round_trip_physical_plan_q20, 20);
-    test_round_trip_physical!(round_trip_physical_plan_q21, 21);
-    test_round_trip_physical!(round_trip_physical_plan_q22, 22);
 }
diff --git a/benchmarks/src/util/memory.rs b/benchmarks/src/util/memory.rs
index f268dfb..8d7edf8 100644
--- a/benchmarks/src/util/memory.rs
+++ b/benchmarks/src/util/memory.rs
@@ -24,23 +24,13 @@ static CACHE: LazyLock<DashMap<String, Vec<RecordBatch>>> = LazyLock::new(DashMap::default);
 
 /// Caches all the record batches in a global [CACHE] on the first run, and serves
 /// them from the cache in any subsequent run.
-/// The order of events looks like this:
-/// 1. A first query is run.
-/// 2. Data is not cached, so it is gathered from the underlying node.
-/// 3. The cache is populated with the recently gathered data.
-/// 4. A second query is run.
-/// 5. The cache is hit, and data is returned from there.
-///
-/// The cache key includes the result of "explaining" the underlying node, so different
-/// nodes applying different filters under the same parquet files will be cached
-/// independently.
 #[derive(Debug, Clone)]
 pub struct InMemoryCacheExec {
     inner: Arc<dyn ExecutionPlan>,
 }
 
 extensions_options! {
     /// Marker used by the [InMemoryCacheExec] that determines whether it's fine
     /// to load data from disk because we are warming up, or not.
     ///
     /// If this marker is not present during InMemoryCacheExec::execute(), and