// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Benchmark driver that handles CLI logic and orchestrates benchmark execution.
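//!
//! The driver is generic over a [`Benchmark`] implementation: for each
//! configured [`Target`] it generates the dataset, registers tables, executes
//! the (optionally filtered) query set, and finally prints the collected
//! measurements.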

use std::path::PathBuf;

use anyhow::Result;
use indicatif::ProgressBar;
use itertools::Itertools;
use log::warn;
use vortex_datafusion::metrics::VortexMetricsFinder;

use crate::benchmark_trait::Benchmark;
use crate::display::DisplayFormat;
use crate::engines::{EngineCtx, benchmark_datafusion_query, benchmark_duckdb_query};
use crate::measurements::QueryMeasurement;
use crate::metrics::{MetricsSetExt, export_plan_spans};
use crate::query_bench::{filter_queries, print_results, setup_logging_and_tracing};
use crate::utils::{new_tokio_runtime, url_scheme_to_storage};
use crate::{Engine, Format, Target, df, vortex_panic};

/// Configuration for the benchmark driver.
pub struct DriverConfig {
    /// Engine/format combinations to benchmark.
    pub targets: Vec<Target>,
    /// Number of timed runs per query.
    pub iterations: usize,
    /// Worker thread count for the Tokio runtime (DataFusion targets only).
    pub threads: Option<usize>,
    /// Enable verbose logging.
    pub verbose: bool,
    /// Output format for results.
    pub display_format: DisplayFormat,
    /// Disable the DataFusion cache.
    pub disable_datafusion_cache: bool,
    /// Run only these query indices (all queries if `None`).
    pub queries: Option<Vec<usize>>,
    /// Query indices to skip.
    pub exclude_queries: Option<Vec<usize>>,
    /// Optional path to write results to.
    pub output_path: Option<PathBuf>,
    /// Write DataFusion execution plans to disk.
    pub emit_plan: bool,
    /// Export execution plan spans for tracing.
    pub export_spans: bool,
    /// Print per-scan Vortex metrics after each target.
    pub show_metrics: bool,
    /// Hide the progress bar.
    pub hide_progress_bar: bool,
}

/// Run a benchmark using the provided implementation and configuration.
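///
/// A minimal usage sketch: `MyBenchmark` is a hypothetical [`Benchmark`]
/// implementation, and the `Default` impls on it and on `DisplayFormat` are
/// assumed for illustration.
///
/// ```ignore
/// let config = DriverConfig {
///     targets: vec![Target::new(Engine::DataFusion, Format::OnDiskVortex)],
///     iterations: 3,
///     threads: None,
///     verbose: false,
///     display_format: DisplayFormat::default(),
///     disable_datafusion_cache: false,
///     queries: None,         // run all queries...
///     exclude_queries: None, // ...excluding none
///     output_path: None,
///     emit_plan: false,
///     export_spans: false,
///     show_metrics: false,
///     hide_progress_bar: true,
/// };
/// run_benchmark(MyBenchmark::default(), config)?;
/// ```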
pub fn run_benchmark<B: Benchmark>(benchmark: B, config: DriverConfig) -> Result<()> {
    let _trace_guard = setup_logging_and_tracing(
        config.verbose,
        &format!("{}.trace.json", benchmark.dataset_name()),
    )?;

    // Validate arguments
    validate_args(&config)?;

    // Generate data for each target (idempotent)
    for target in &config.targets {
        benchmark.generate_data(target)?;
    }

    let filtered_queries = filter_queries(
        benchmark.queries()?,
        config.queries.as_ref(),
        config.exclude_queries.as_ref(),
    );

    let progress_bar = if config.hide_progress_bar {
        ProgressBar::hidden()
    } else {
        ProgressBar::new((filtered_queries.len() * config.targets.len()) as u64)
    };

    let mut query_measurements = Vec::new();

    for target in config.targets.iter() {
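        // Each target gets a fresh Tokio runtime and engine context.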
        let tokio_runtime = new_tokio_runtime(config.threads);

        let mut engine_ctx = benchmark.setup_engine_context(
            target,
            config.disable_datafusion_cache,
            config.emit_plan,
        )?;

        tokio_runtime.block_on(benchmark.register_tables(&engine_ctx, target.format()))?;

        let bench_measurements = execute_queries(
            &filtered_queries,
            config.iterations,
            &tokio_runtime,
            target.format(),
            &progress_bar,
            &mut engine_ctx,
            &benchmark,
        )?;

        tokio_runtime.block_on(export_metrics_if_requested(
            &engine_ctx,
            config.export_spans,
        ))?;

        if config.show_metrics {
            print_metrics(&engine_ctx);
        }

        query_measurements.extend(bench_measurements);
    }

    print_results(
        &config.display_format,
        query_measurements,
        &config.targets,
        &config.output_path,
    )
}

fn validate_args(config: &DriverConfig) -> Result<()> {
    let engines = config
        .targets
        .iter()
        .map(|t| t.engine())
        .unique()
        .collect_vec();

    if (config.emit_plan || config.export_spans || config.show_metrics || config.threads.is_some())
        && !engines.contains(&Engine::DataFusion)
    {
        vortex_panic!(
            "--emit-plan, --export-spans, --show-metrics, and --threads are only valid when a DataFusion target is used"
        );
    }
    Ok(())
}

fn execute_queries<B: Benchmark>(
    queries: &[(usize, String)],
    iterations: usize,
    runtime: &tokio::runtime::Runtime,
    format: Format,
    progress_bar: &ProgressBar,
    engine_ctx: &mut EngineCtx,
    benchmark: &B,
) -> Result<Vec<QueryMeasurement>> {
    let mut query_measurements = Vec::new();
    let expected_row_counts = benchmark.expected_row_counts();

    for &(query_idx, ref query_string) in queries.iter() {
        match engine_ctx {
            EngineCtx::DataFusion(ctx) => {
                let (runs, (row_count, execution_plan)) = runtime.block_on(async {
                    benchmark_datafusion_query(iterations, || async {
                        let (batches, plan) = df::execute_query(&ctx.session, query_string)
                            .await
                            .unwrap_or_else(|err| {
                                vortex_panic!("query {query_idx} failed with: {err}")
                            });
                        let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
                        (row_count, plan)
                    })
                    .await
                });

                // Validate row count if expected counts are provided
                if let Some(expected_counts) = expected_row_counts {
                    if query_idx < expected_counts.len() {
                        assert_eq!(
                            row_count, expected_counts[query_idx],
                            "Row count mismatch for query {query_idx} - datafusion:{format}",
                        );
                    }
                }

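                // Retain the plan so spans can be exported after the run completes.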
                ctx.execution_plans
                    .push((query_idx, execution_plan.clone()));

                if ctx.emit_plan {
                    df::write_execution_plan(
                        query_idx,
                        format,
                        benchmark.dataset_name(),
                        execution_plan.as_ref(),
                    );
                }

                ctx.metrics.push((
                    query_idx,
                    format,
                    VortexMetricsFinder::find_all(execution_plan.as_ref()),
                ));

                query_measurements.push(QueryMeasurement {
                    query_idx,
                    target: Target::new(Engine::DataFusion, format),
                    benchmark_dataset: benchmark.dataset(),
                    storage: url_scheme_to_storage(benchmark.data_url())?,
                    runs,
                });
            }
            EngineCtx::DuckDB(ctx) => {
                let (runs, row_count) =
                    benchmark_duckdb_query(query_idx, query_string, iterations, ctx);

                // Validate row count if expected counts are provided
                if let Some(expected_counts) = expected_row_counts {
                    if query_idx < expected_counts.len() {
                        assert_eq!(
                            row_count, expected_counts[query_idx],
                            "Row count mismatch for query {query_idx} - duckdb:{format}",
                        );
                    }
                }

                query_measurements.push(QueryMeasurement {
                    query_idx,
                    target: Target::new(Engine::DuckDB, format),
                    benchmark_dataset: benchmark.dataset(),
                    storage: url_scheme_to_storage(benchmark.data_url())?,
                    runs,
                });
            }
        }

        progress_bar.inc(1);
    }

    Ok(query_measurements)
}

async fn export_metrics_if_requested(engine_ctx: &EngineCtx, export_spans: bool) -> Result<()> {
    if let EngineCtx::DataFusion(ctx) = engine_ctx {
        if export_spans {
            if let Err(err) = export_plan_spans(Format::OnDiskVortex, &ctx.execution_plans).await {
                warn!("failed to export spans: {err}");
            }
        }
    }
    Ok(())
}

fn print_metrics(engine_ctx: &EngineCtx) {
    if let EngineCtx::DataFusion(ctx) = engine_ctx {
        for (query_idx, file_format, metric_sets) in &ctx.metrics {
            eprintln!("metrics for query={query_idx}, {file_format}:");
            for (scan_idx, metrics_set) in metric_sets.iter().enumerate() {
                eprintln!("scan[{scan_idx}]:");
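                // Normalize the metric set (timestamps removed, aggregated,
                // sorted) before printing.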
                for metric in metrics_set
                    .clone()
                    .timestamps_removed()
                    .aggregate()
                    .sorted_for_display()
                    .iter()
                {
                    eprintln!("{metric}");
                }
            }
        }
    }
}