diff --git a/Cargo.lock b/Cargo.lock index 1c712a02..84e8d020 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1137,7 +1137,10 @@ dependencies = [ name = "datafusion-distributed-benchmarks" version = "0.1.0" dependencies = [ + "async-trait", "datafusion", + "datafusion-distributed", + "datafusion-proto", "env_logger", "log", "parquet", diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 9e7d2a9e..cd659503 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -6,6 +6,7 @@ default-run = "dfbench" [dependencies] datafusion = { workspace = true } +datafusion-distributed = { path = "..", features = ["integration"] } tokio = { version = "1.46.1", features = ["full"] } parquet = { version = "55.2.0" } structopt = { version = "0.3.26" } @@ -13,7 +14,14 @@ log = "0.4.27" serde = "1.0.219" serde_json = "1.0.141" env_logger = "0.11.8" +async-trait = "0.1.88" +datafusion-proto = { version = "49.0.0", optional = true } [[bin]] name = "dfbench" -path = "src/bin/dfbench.rs" \ No newline at end of file +path = "src/bin/dfbench.rs" + +[features] +ci = [ + "datafusion-proto" +] \ No newline at end of file diff --git a/benchmarks/src/tpch/run.rs b/benchmarks/src/tpch/run.rs index 9a6392c6..b0ac2560 100644 --- a/benchmarks/src/tpch/run.rs +++ b/benchmarks/src/tpch/run.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::path::PathBuf; -use std::sync::Arc; - use super::{ get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID, TPCH_QUERY_START_ID, TPCH_TABLES, }; +use async_trait::async_trait; +use std::path::PathBuf; +use std::sync::Arc; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty::{self, pretty_format_batches}; @@ -35,12 +35,15 @@ use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, }; use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::error::Result; +use datafusion::error::{DataFusionError, Result}; +use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{collect, displayable}; use datafusion::prelude::*; use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult}; +use datafusion_distributed::test_utils::localhost::start_localhost_context; +use datafusion_distributed::{DistributedPhysicalOptimizerRule, SessionBuilder}; use log::info; use structopt::StructOpt; @@ -96,10 +99,48 @@ pub struct RunOpt { /// The tables should have been created with the `--sort` option for this to have any effect. #[structopt(short = "t", long = "sorted")] sorted: bool, + + /// Mark the first column of each table as sorted in ascending order. + /// The tables should have been created with the `--sort` option for this to have any effect. + #[structopt(long = "ppt")] + partitions_per_task: Option, +} + +#[async_trait] +impl SessionBuilder for RunOpt { + fn session_state_builder( + &self, + builder: SessionStateBuilder, + ) -> Result { + let mut config = self + .common + .config()? + .with_collect_statistics(!self.disable_statistics); + config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; + let rt_builder = self.common.runtime_env_builder()?; + + let mut rule = DistributedPhysicalOptimizerRule::new(); + if let Some(ppt) = self.partitions_per_task { + rule = rule.with_maximum_partitions_per_task(ppt); + } + Ok(builder + .with_config(config) + .with_physical_optimizer_rule(Arc::new(rule)) + .with_runtime_env(rt_builder.build_arc()?)) + } + + async fn session_context( + &self, + ctx: SessionContext, + ) -> std::result::Result { + self.register_tables(&ctx).await?; + Ok(ctx) + } } impl RunOpt { pub async fn run(self) -> Result<()> { + let (ctx, _guard) = start_localhost_context([50051], self.clone()).await; println!("Running benchmarks with the following options: {self:?}"); let query_range = match self.query { Some(query_id) => query_id..=query_id, @@ -107,15 +148,6 @@ impl RunOpt { }; let mut benchmark_run = BenchmarkRun::new(); - let mut config = self - .common - .config()? - .with_collect_statistics(!self.disable_statistics); - config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join; - let rt_builder = self.common.runtime_env_builder()?; - let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); - // register tables - self.register_tables(&ctx).await?; for query_id in query_range { benchmark_run.start_new_case(&format!("Query {query_id}")); @@ -368,6 +400,7 @@ mod tests { disable_statistics: false, prefer_hash_join: true, sorted: false, + partitions_per_task: None, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?; @@ -405,6 +438,7 @@ mod tests { disable_statistics: false, prefer_hash_join: true, sorted: false, + partitions_per_task: None, }; opt.register_tables(&ctx).await?; let queries = get_query_sql(query)?;