11use std:: cell:: OnceCell ;
22use std:: path:: { Path , PathBuf } ;
3+ use std:: sync:: Arc ;
34use std:: time:: { Duration , Instant } ;
45
56use bench_vortex:: clickbench:: { Flavor , clickbench_queries} ;
@@ -13,13 +14,14 @@ use bench_vortex::{
1314 BenchmarkDataset , Engine , Format , IdempotentPath , Target , ddb, default_env_filter, df,
1415} ;
1516use clap:: { Parser , value_parser} ;
16- use datafusion:: physical_plan:: execution_plan;
1717use datafusion:: prelude;
18+ use datafusion_physical_plan:: ExecutionPlan ;
1819use indicatif:: ProgressBar ;
1920use itertools:: Itertools ;
2021use log:: warn;
2122use prelude:: SessionContext ;
2223use tempfile:: { TempDir , tempdir} ;
24+ use tokio:: runtime:: Runtime ;
2325use tracing:: { debug, info_span} ;
2426use tracing_futures:: Instrument ;
2527use url:: Url ;
@@ -77,7 +79,7 @@ struct Args {
7779}
7880
7981struct DataFusionCtx {
80- execution_plans : Vec < ( usize , std :: sync :: Arc < dyn execution_plan :: ExecutionPlan > ) > ,
82+ execution_plans : Vec < ( usize , Arc < dyn ExecutionPlan > ) > ,
8183 metrics : Vec < (
8284 usize ,
8385 Format ,
@@ -207,11 +209,11 @@ fn main() -> anyhow::Result<()> {
207209
208210 let mut query_measurements = Vec :: new ( ) ;
209211
210- let resolved_path = if args. targets . iter ( ) . any ( |t| t . engine ( ) == Engine :: DuckDB ) {
211- Some ( ddb :: build_and_get_executable_path ( & args . duckdb_path ) )
212- } else {
213- None
214- } ;
212+ let resolved_path = args
213+ . targets
214+ . iter ( )
215+ . any ( |t| t . engine ( ) == Engine :: DuckDB )
216+ . then ( || ddb :: build_and_get_executable_path ( & args . duckdb_path ) ) ;
215217
216218 for target in args. targets . iter ( ) {
217219 let engine = target. engine ( ) ;
@@ -224,14 +226,13 @@ fn main() -> anyhow::Result<()> {
224226 args. disable_datafusion_cache ,
225227 ) ;
226228 // Register object store to the session.
227- df:: make_object_store ( & session_ctx, & base_url)
228- . expect ( "Failed to make object store" ) ;
229+ df:: make_object_store ( & session_ctx, & base_url) ?;
229230
230231 EngineCtx :: new_with_datafusion ( session_ctx, args. emit_plan )
231232 }
232- Engine :: DuckDB => {
233- EngineCtx :: new_with_duckdb ( resolved_path. as_ref ( ) . expect ( "path resolved above" ) )
234- }
233+ Engine :: DuckDB => EngineCtx :: new_with_duckdb (
234+ resolved_path. as_ref ( ) . vortex_expect ( "path resolved above" ) ,
235+ ) ,
235236 _ => unreachable ! ( "engine not supported" ) ,
236237 } ;
237238
@@ -243,8 +244,7 @@ fn main() -> anyhow::Result<()> {
243244 args. single_file ,
244245 & engine_ctx,
245246 & tokio_runtime,
246- )
247- . expect ( "Failed to initialize data source" ) ;
247+ ) ?;
248248
249249 let bench_measurements = execute_queries (
250250 & queries,
@@ -272,20 +272,19 @@ fn main() -> anyhow::Result<()> {
272272 query_measurements. extend ( bench_measurements) ;
273273 }
274274
275- print_results ( & args. display_format , query_measurements, & args. targets ) ;
276-
277- Ok ( ( ) )
275+ print_results ( & args. display_format , query_measurements, & args. targets )
278276}
279277
280278fn validate_args ( engines : & [ Engine ] , args : & Args ) {
281- if args. duckdb_path . is_some ( ) && !engines. contains ( & Engine :: DuckDB ) {
282- panic ! ( "--duckdb-path is only valid when DuckDB engine is used" ) ;
283- }
279+ assert ! (
280+ args. duckdb_path. is_none( ) || engines. contains( & Engine :: DuckDB ) ,
281+ "--duckdb-path is only valid when DuckDB engine is used"
282+ ) ;
284283
285284 if ( args. emit_plan || args. export_spans || args. show_metrics || args. threads . is_some ( ) )
286285 && !engines. contains ( & Engine :: DataFusion )
287286 {
288- panic ! (
287+ vortex_panic ! (
289288 "--emit-plan, --export-spans, --show_metrics, --threads are only valid if DataFusion is used"
290289 ) ;
291290 }
@@ -319,11 +318,11 @@ fn print_results(
319318 display_format : & DisplayFormat ,
320319 query_measurements : Vec < QueryMeasurement > ,
321320 targets : & [ Target ] ,
322- ) {
321+ ) -> anyhow :: Result < ( ) > {
323322 match display_format {
324- DisplayFormat :: Table => render_table ( query_measurements, RatioMode :: Time , targets) . unwrap ( ) ,
323+ DisplayFormat :: Table => render_table ( query_measurements, RatioMode :: Time , targets) ,
325324
326- DisplayFormat :: GhJson => print_measurements_json ( query_measurements) . unwrap ( ) ,
325+ DisplayFormat :: GhJson => print_measurements_json ( query_measurements) ,
327326 }
328327}
329328
@@ -371,7 +370,7 @@ fn init_data_source(
371370 base_url : & Url ,
372371 single_file : bool ,
373372 engine_ctx : & EngineCtx ,
374- tokio_runtime : & tokio :: runtime :: Runtime ,
373+ tokio_runtime : & Runtime ,
375374) -> anyhow:: Result < ( ) > {
376375 let dataset = BenchmarkDataset :: ClickBench { single_file } ;
377376
@@ -431,7 +430,7 @@ fn init_data_source(
431430fn execute_queries (
432431 queries : & [ ( usize , String ) ] ,
433432 iterations : usize ,
434- tokio_runtime : & tokio :: runtime :: Runtime ,
433+ tokio_runtime : & Runtime ,
435434 file_format : Format ,
436435 progress_bar : & ProgressBar ,
437436 engine_ctx : & mut EngineCtx ,
@@ -506,13 +505,14 @@ fn execute_queries(
506505///
507506/// - The duration of the fastest execution
508507/// - The execution plan used for the query
508+ #[ allow( clippy:: unwrap_used) ]
509509fn benchmark_datafusion_query (
510510 query_idx : usize ,
511511 query_string : & str ,
512512 iterations : usize ,
513513 context : & SessionContext ,
514- tokio_runtime : & tokio :: runtime :: Runtime ,
515- ) -> ( Duration , std :: sync :: Arc < dyn execution_plan :: ExecutionPlan > ) {
514+ tokio_runtime : & Runtime ,
515+ ) -> ( Duration , Arc < dyn ExecutionPlan > ) {
516516 let execution_plan = OnceCell :: new ( ) ;
517517
518518 let fastest_run =
@@ -521,12 +521,12 @@ fn benchmark_datafusion_query(
521521 let ( duration, plan) =
522522 execute_datafusion_query ( query_idx, query_string, iteration, context. clone ( ) )
523523 . await
524- . unwrap_or_else ( |err| panic ! ( "query: {query_idx} failed with: {err}" ) ) ;
524+ . unwrap_or_else ( |err| {
525+ vortex_panic ! ( "query: {query_idx} failed with: {err}" )
526+ } ) ;
525527
526528 if execution_plan. get ( ) . is_none ( ) {
527- execution_plan
528- . set ( plan)
529- . expect ( "assign the execution plan only once" ) ;
529+ execution_plan. set ( plan) . unwrap ( ) ;
530530 }
531531
532532 fastest. min ( duration)
@@ -537,7 +537,7 @@ fn benchmark_datafusion_query(
537537 fastest_run,
538538 execution_plan
539539 . into_inner ( )
540- . expect ( "Execution plan must be set" ) ,
540+ . vortex_expect ( "Execution plan must be set" ) ,
541541 )
542542}
543543
@@ -546,15 +546,15 @@ async fn execute_datafusion_query(
546546 query_string : & str ,
547547 iteration : usize ,
548548 session_context : SessionContext ,
549- ) -> anyhow:: Result < ( Duration , std :: sync :: Arc < dyn execution_plan :: ExecutionPlan > ) > {
549+ ) -> anyhow:: Result < ( Duration , Arc < dyn ExecutionPlan > ) > {
550550 let query_string = query_string. to_owned ( ) ;
551551
552552 let ( duration, execution_plan) = tokio:: task:: spawn ( async move {
553553 let time_instant = Instant :: now ( ) ;
554554 let ( _, execution_plan) = df:: execute_query ( & session_context, & query_string)
555555 . instrument ( info_span ! ( "execute_query" , query_idx, iteration) )
556556 . await
557- . unwrap_or_else ( |e| panic ! ( "executing query {query_idx}: {e}" ) ) ;
557+ . unwrap_or_else ( |e| vortex_panic ! ( "executing query {query_idx}: {e}" ) ) ;
558558
559559 ( time_instant. elapsed ( ) , execution_plan)
560560 } )
@@ -568,19 +568,16 @@ async fn execute_datafusion_query(
568568/// # Returns
569569///
570570/// The duration of the fastest execution
571- #[ allow( clippy:: let_and_return) ]
572571fn benchmark_duckdb_query (
573572 query_idx : usize ,
574573 query_string : & str ,
575574 iterations : usize ,
576575 duckdb_executor : & DuckDBExecutor ,
577576) -> Duration {
578- let fastest_run = ( 0 ..iterations) . fold ( Duration :: from_millis ( u64:: MAX ) , |fastest, _| {
577+ ( 0 ..iterations) . fold ( Duration :: from_millis ( u64:: MAX ) , |fastest, _| {
579578 let duration = ddb:: execute_clickbench_query ( query_string, duckdb_executor)
580- . unwrap_or_else ( |err| panic ! ( "query: {query_idx} failed with: {err}" ) ) ;
579+ . unwrap_or_else ( |err| vortex_panic ! ( "query: {query_idx} failed with: {err}" ) ) ;
581580
582581 fastest. min ( duration)
583- } ) ;
584-
585- fastest_run
582+ } )
586583}
0 commit comments