@@ -19,6 +19,10 @@ use super::{
1919 get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID ,
2020 TPCH_QUERY_START_ID , TPCH_TABLES ,
2121} ;
22+ use crate :: util:: {
23+ BenchmarkRun , CommonOpt , InMemoryCacheExecCodec , InMemoryDataSourceRule , QueryResult ,
24+ WarmingUpMarker ,
25+ } ;
2226use async_trait:: async_trait;
2327use datafusion:: arrow:: record_batch:: RecordBatch ;
2428use datafusion:: arrow:: util:: pretty:: { self , pretty_format_batches} ;
@@ -37,29 +41,21 @@ use datafusion::execution::{SessionState, SessionStateBuilder};
3741use datafusion:: physical_plan:: display:: DisplayableExecutionPlan ;
3842use datafusion:: physical_plan:: { collect, displayable} ;
3943use datafusion:: prelude:: * ;
40- use datafusion_distributed:: MappedDistributedSessionBuilderExt ;
41- use std:: path:: PathBuf ;
42- use std:: sync:: Arc ;
43-
44- use crate :: util:: {
45- BenchmarkRun , CommonOpt , InMemoryCacheExecCodec , InMemoryDataSourceRule , QueryResult ,
46- WarmingUpMarker ,
47- } ;
4844use datafusion_distributed:: test_utils:: localhost:: {
4945 get_free_ports, spawn_flight_service, start_localhost_context, LocalHostChannelResolver ,
5046} ;
47+ use datafusion_distributed:: MappedDistributedSessionBuilderExt ;
5148use datafusion_distributed:: {
5249 DistributedExt , DistributedPhysicalOptimizerRule , DistributedSessionBuilder ,
5350 DistributedSessionBuilderContext ,
5451} ;
5552use log:: info;
53+ use std:: path:: PathBuf ;
54+ use std:: sync:: Arc ;
5655use structopt:: StructOpt ;
5756use tokio:: net:: TcpListener ;
5857use tokio:: task:: JoinHandle ;
5958
60- // hack to avoid `default_value is meaningless for bool` errors
61- type BoolDefaultTrue = bool ;
62-
6359/// Run the tpch benchmark.
6460///
6561/// This benchmark is derived from the [TPC-H][1] version
@@ -100,11 +96,6 @@ pub struct RunOpt {
10096 #[ structopt( short = "S" , long = "disable-statistics" ) ]
10197 disable_statistics : bool ,
10298
103- /// If true then hash join used, if false then sort merge join
104- /// True by default.
105- #[ structopt( short = "j" , long = "prefer_hash_join" , default_value = "true" ) ]
106- prefer_hash_join : BoolDefaultTrue ,
107-
10899 /// Mark the first column of each table as sorted in ascending order.
109100 /// The tables should have been created with the `--sort` option for this to have any effect.
110101 #[ structopt( short = "t" , long = "sorted" ) ]
@@ -418,172 +409,3 @@ impl RunOpt {
418409 . unwrap_or_else ( get_available_parallelism)
419410 }
420411}
421-
422- #[ cfg( test) ]
423- // Only run with "ci" mode when we have the data
424- #[ cfg( feature = "ci" ) ]
425- mod tests {
426- use std:: path:: Path ;
427-
428- use super :: * ;
429-
430- use datafusion:: common:: exec_err;
431- use datafusion:: error:: Result ;
432- use datafusion_proto:: bytes:: {
433- logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
434- physical_plan_to_bytes,
435- } ;
436-
437- fn get_tpch_data_path ( ) -> Result < String > {
438- let path = std:: env:: var ( "TPCH_DATA" ) . unwrap_or_else ( |_| "benchmarks/data" . to_string ( ) ) ;
439- if !Path :: new ( & path) . exists ( ) {
440- return exec_err ! (
441- "Benchmark data not found (set TPCH_DATA env var to override): {}" ,
442- path
443- ) ;
444- }
445- Ok ( path)
446- }
447-
448- async fn round_trip_logical_plan ( query : usize ) -> Result < ( ) > {
449- let ctx = SessionContext :: default ( ) ;
450- let path = get_tpch_data_path ( ) ?;
451- let common = CommonOpt {
452- iterations : 1 ,
453- partitions : Some ( 2 ) ,
454- batch_size : Some ( 8192 ) ,
455- mem_pool_type : "fair" . to_string ( ) ,
456- memory_limit : None ,
457- sort_spill_reservation_bytes : None ,
458- debug : false ,
459- } ;
460- let opt = RunOpt {
461- query : Some ( query) ,
462- common,
463- path : PathBuf :: from ( path. to_string ( ) ) ,
464- file_format : "tbl" . to_string ( ) ,
465- mem_table : false ,
466- output_path : None ,
467- disable_statistics : false ,
468- prefer_hash_join : true ,
469- sorted : false ,
470- partitions_per_task : None ,
471- } ;
472- opt. register_tables ( & ctx) . await ?;
473- let queries = get_query_sql ( query) ?;
474- for query in queries {
475- let plan = ctx. sql ( & query) . await ?;
476- let plan = plan. into_optimized_plan ( ) ?;
477- let bytes = logical_plan_to_bytes ( & plan) ?;
478- let plan2 = logical_plan_from_bytes ( & bytes, & ctx) ?;
479- let plan_formatted = format ! ( "{}" , plan. display_indent( ) ) ;
480- let plan2_formatted = format ! ( "{}" , plan2. display_indent( ) ) ;
481- assert_eq ! ( plan_formatted, plan2_formatted) ;
482- }
483- Ok ( ( ) )
484- }
485-
486- async fn round_trip_physical_plan ( query : usize ) -> Result < ( ) > {
487- let ctx = SessionContext :: default ( ) ;
488- let path = get_tpch_data_path ( ) ?;
489- let common = CommonOpt {
490- iterations : 1 ,
491- partitions : Some ( 2 ) ,
492- batch_size : Some ( 8192 ) ,
493- mem_pool_type : "fair" . to_string ( ) ,
494- memory_limit : None ,
495- sort_spill_reservation_bytes : None ,
496- debug : false ,
497- } ;
498- let opt = RunOpt {
499- query : Some ( query) ,
500- common,
501- path : PathBuf :: from ( path. to_string ( ) ) ,
502- file_format : "tbl" . to_string ( ) ,
503- mem_table : false ,
504- output_path : None ,
505- disable_statistics : false ,
506- prefer_hash_join : true ,
507- sorted : false ,
508- partitions_per_task : None ,
509- } ;
510- opt. register_tables ( & ctx) . await ?;
511- let queries = get_query_sql ( query) ?;
512- for query in queries {
513- let plan = ctx. sql ( & query) . await ?;
514- let plan = plan. create_physical_plan ( ) . await ?;
515- let bytes = physical_plan_to_bytes ( plan. clone ( ) ) ?;
516- let plan2 = physical_plan_from_bytes ( & bytes, & ctx) ?;
517- let plan_formatted = format ! ( "{}" , displayable( plan. as_ref( ) ) . indent( false ) ) ;
518- let plan2_formatted = format ! ( "{}" , displayable( plan2. as_ref( ) ) . indent( false ) ) ;
519- assert_eq ! ( plan_formatted, plan2_formatted) ;
520- }
521- Ok ( ( ) )
522- }
523-
524- macro_rules! test_round_trip_logical {
525- ( $tn: ident, $query: expr) => {
526- #[ tokio:: test]
527- async fn $tn( ) -> Result <( ) > {
528- round_trip_logical_plan( $query) . await
529- }
530- } ;
531- }
532-
533- macro_rules! test_round_trip_physical {
534- ( $tn: ident, $query: expr) => {
535- #[ tokio:: test]
536- async fn $tn( ) -> Result <( ) > {
537- round_trip_physical_plan( $query) . await
538- }
539- } ;
540- }
541-
542- // logical plan tests
543- test_round_trip_logical ! ( round_trip_logical_plan_q1, 1 ) ;
544- test_round_trip_logical ! ( round_trip_logical_plan_q2, 2 ) ;
545- test_round_trip_logical ! ( round_trip_logical_plan_q3, 3 ) ;
546- test_round_trip_logical ! ( round_trip_logical_plan_q4, 4 ) ;
547- test_round_trip_logical ! ( round_trip_logical_plan_q5, 5 ) ;
548- test_round_trip_logical ! ( round_trip_logical_plan_q6, 6 ) ;
549- test_round_trip_logical ! ( round_trip_logical_plan_q7, 7 ) ;
550- test_round_trip_logical ! ( round_trip_logical_plan_q8, 8 ) ;
551- test_round_trip_logical ! ( round_trip_logical_plan_q9, 9 ) ;
552- test_round_trip_logical ! ( round_trip_logical_plan_q10, 10 ) ;
553- test_round_trip_logical ! ( round_trip_logical_plan_q11, 11 ) ;
554- test_round_trip_logical ! ( round_trip_logical_plan_q12, 12 ) ;
555- test_round_trip_logical ! ( round_trip_logical_plan_q13, 13 ) ;
556- test_round_trip_logical ! ( round_trip_logical_plan_q14, 14 ) ;
557- test_round_trip_logical ! ( round_trip_logical_plan_q15, 15 ) ;
558- test_round_trip_logical ! ( round_trip_logical_plan_q16, 16 ) ;
559- test_round_trip_logical ! ( round_trip_logical_plan_q17, 17 ) ;
560- test_round_trip_logical ! ( round_trip_logical_plan_q18, 18 ) ;
561- test_round_trip_logical ! ( round_trip_logical_plan_q19, 19 ) ;
562- test_round_trip_logical ! ( round_trip_logical_plan_q20, 20 ) ;
563- test_round_trip_logical ! ( round_trip_logical_plan_q21, 21 ) ;
564- test_round_trip_logical ! ( round_trip_logical_plan_q22, 22 ) ;
565-
566- // physical plan tests
567- test_round_trip_physical ! ( round_trip_physical_plan_q1, 1 ) ;
568- test_round_trip_physical ! ( round_trip_physical_plan_q2, 2 ) ;
569- test_round_trip_physical ! ( round_trip_physical_plan_q3, 3 ) ;
570- test_round_trip_physical ! ( round_trip_physical_plan_q4, 4 ) ;
571- test_round_trip_physical ! ( round_trip_physical_plan_q5, 5 ) ;
572- test_round_trip_physical ! ( round_trip_physical_plan_q6, 6 ) ;
573- test_round_trip_physical ! ( round_trip_physical_plan_q7, 7 ) ;
574- test_round_trip_physical ! ( round_trip_physical_plan_q8, 8 ) ;
575- test_round_trip_physical ! ( round_trip_physical_plan_q9, 9 ) ;
576- test_round_trip_physical ! ( round_trip_physical_plan_q10, 10 ) ;
577- test_round_trip_physical ! ( round_trip_physical_plan_q11, 11 ) ;
578- test_round_trip_physical ! ( round_trip_physical_plan_q12, 12 ) ;
579- test_round_trip_physical ! ( round_trip_physical_plan_q13, 13 ) ;
580- test_round_trip_physical ! ( round_trip_physical_plan_q14, 14 ) ;
581- test_round_trip_physical ! ( round_trip_physical_plan_q15, 15 ) ;
582- test_round_trip_physical ! ( round_trip_physical_plan_q16, 16 ) ;
583- test_round_trip_physical ! ( round_trip_physical_plan_q17, 17 ) ;
584- test_round_trip_physical ! ( round_trip_physical_plan_q18, 18 ) ;
585- test_round_trip_physical ! ( round_trip_physical_plan_q19, 19 ) ;
586- test_round_trip_physical ! ( round_trip_physical_plan_q20, 20 ) ;
587- test_round_trip_physical ! ( round_trip_physical_plan_q21, 21 ) ;
588- test_round_trip_physical ! ( round_trip_physical_plan_q22, 22 ) ;
589- }
0 commit comments