1515// specific language governing permissions and limitations
1616// under the License.
1717
18+ extern crate arrow;
1819#[ macro_use]
1920extern crate criterion;
20- extern crate arrow;
2121extern crate datafusion;
2222
2323mod data_utils;
24+
2425use crate :: criterion:: Criterion ;
2526use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
2627use datafusion:: datasource:: MemTable ;
2728use datafusion:: execution:: context:: SessionContext ;
29+ use itertools:: Itertools ;
30+ use std:: fs:: File ;
31+ use std:: io:: { BufRead , BufReader } ;
32+ use std:: path:: PathBuf ;
2833use std:: sync:: Arc ;
2934use test_utils:: tpcds:: tpcds_schemas;
3035use test_utils:: tpch:: tpch_schemas;
3136use test_utils:: TableDef ;
3237use tokio:: runtime:: Runtime ;
3338
/// First candidate location of the `benchmarks/` directory, as a relative path.
/// This is the location probed first; `BENCHMARKS_PATH_2` is used when it does not exist.
// NOTE(review): presumably this matches running the benchmark from the crate
// directory rather than the repository root — confirm.
39+ const BENCHMARKS_PATH_1 : & str = "../../benchmarks/" ;
/// Fallback location of the `benchmarks/` directory, used when
/// `BENCHMARKS_PATH_1` does not exist.
40+ const BENCHMARKS_PATH_2 : & str = "./benchmarks/" ;
/// Location of the partitioned ClickBench `hits` data; it is appended to one of
/// the `BENCHMARKS_PATH_*` prefixes to form the full data path.
41+ const CLICKBENCH_DATA_PATH : & str = "data/hits_partitioned/" ;
42+
3443/// Create a logical plan from the specified sql
3544fn logical_plan ( ctx : & SessionContext , sql : & str ) {
3645 let rt = Runtime :: new ( ) . unwrap ( ) ;
@@ -91,7 +100,37 @@ fn register_defs(ctx: SessionContext, defs: Vec<TableDef>) -> SessionContext {
91100 ctx
92101}
93102
103+ fn register_clickbench_hits_table ( ) -> SessionContext {
104+ let ctx = SessionContext :: new ( ) ;
105+ let rt = Runtime :: new ( ) . unwrap ( ) ;
106+
107+ // use an external table for clickbench benchmarks
108+ let path =
109+ if PathBuf :: from ( format ! ( "{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}" ) ) . exists ( ) {
110+ format ! ( "{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}" )
111+ } else {
112+ format ! ( "{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}" )
113+ } ;
114+
115+ let sql = format ! ( "CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'" ) ;
116+
117+ rt. block_on ( ctx. sql ( & sql) ) . unwrap ( ) ;
118+
119+ let count =
120+ rt. block_on ( async { ctx. table ( "hits" ) . await . unwrap ( ) . count ( ) . await . unwrap ( ) } ) ;
121+ assert ! ( count > 0 ) ;
122+ ctx
123+ }
124+
94125fn criterion_benchmark ( c : & mut Criterion ) {
126+ // verify that we can load the clickbench data prior to running the benchmark
127+ if !PathBuf :: from ( format ! ( "{BENCHMARKS_PATH_1}{CLICKBENCH_DATA_PATH}" ) ) . exists ( )
128+ && !PathBuf :: from ( format ! ( "{BENCHMARKS_PATH_2}{CLICKBENCH_DATA_PATH}" ) ) . exists ( )
129+ {
130+ panic ! ( "benchmarks/data/hits_partitioned/ could not be loaded. Please run \
131+ 'benchmarks/bench.sh data clickbench_partitioned' prior to running this benchmark")
132+ }
133+
95134 let ctx = create_context ( ) ;
96135
97136 // Test simplest
@@ -235,9 +274,15 @@ fn criterion_benchmark(c: &mut Criterion) {
235274 "q16" , "q17" , "q18" , "q19" , "q20" , "q21" , "q22" ,
236275 ] ;
237276
277+ let benchmarks_path = if PathBuf :: from ( BENCHMARKS_PATH_1 ) . exists ( ) {
278+ BENCHMARKS_PATH_1
279+ } else {
280+ BENCHMARKS_PATH_2
281+ } ;
282+
238283 for q in tpch_queries {
239284 let sql =
240- std:: fs:: read_to_string ( format ! ( "../../benchmarks/ queries/{q}.sql" ) ) . unwrap ( ) ;
285+ std:: fs:: read_to_string ( format ! ( "{benchmarks_path} queries/{q}.sql" ) ) . unwrap ( ) ;
241286 c. bench_function ( & format ! ( "physical_plan_tpch_{}" , q) , |b| {
242287 b. iter ( || physical_plan ( & tpch_ctx, & sql) )
243288 } ) ;
@@ -246,7 +291,7 @@ fn criterion_benchmark(c: &mut Criterion) {
246291 let all_tpch_sql_queries = tpch_queries
247292 . iter ( )
248293 . map ( |q| {
249- std:: fs:: read_to_string ( format ! ( "../../benchmarks/ queries/{q}.sql" ) ) . unwrap ( )
294+ std:: fs:: read_to_string ( format ! ( "{benchmarks_path} queries/{q}.sql" ) ) . unwrap ( )
250295 } )
251296 . collect :: < Vec < _ > > ( ) ;
252297
@@ -258,20 +303,25 @@ fn criterion_benchmark(c: &mut Criterion) {
258303 } )
259304 } ) ;
260305
261- c. bench_function ( "logical_plan_tpch_all" , |b| {
262- b. iter ( || {
263- for sql in & all_tpch_sql_queries {
264- logical_plan ( & tpch_ctx, sql)
265- }
266- } )
267- } ) ;
306+ // c.bench_function("logical_plan_tpch_all", |b| {
307+ // b.iter(|| {
308+ // for sql in &all_tpch_sql_queries {
309+ // logical_plan(&tpch_ctx, sql)
310+ // }
311+ // })
312+ // });
268313
269314 // --- TPC-DS ---
270315
271316 let tpcds_ctx = register_defs ( SessionContext :: new ( ) , tpcds_schemas ( ) ) ;
317+ let tests_path = if PathBuf :: from ( "./tests/" ) . exists ( ) {
318+ "./tests/"
319+ } else {
320+ "datafusion/core/tests/"
321+ } ;
272322
273323 let raw_tpcds_sql_queries = ( 1 ..100 )
274- . map ( |q| std:: fs:: read_to_string ( format ! ( "./tests/ tpc-ds/{q}.sql" ) ) . unwrap ( ) )
324+ . map ( |q| std:: fs:: read_to_string ( format ! ( "{tests_path} tpc-ds/{q}.sql" ) ) . unwrap ( ) )
275325 . collect :: < Vec < _ > > ( ) ;
276326
277327 // some queries have multiple statements
@@ -288,10 +338,53 @@ fn criterion_benchmark(c: &mut Criterion) {
288338 } )
289339 } ) ;
290340
291- c. bench_function ( "logical_plan_tpcds_all" , |b| {
341+ // c.bench_function("logical_plan_tpcds_all", |b| {
342+ // b.iter(|| {
343+ // for sql in &all_tpcds_sql_queries {
344+ // logical_plan(&tpcds_ctx, sql)
345+ // }
346+ // })
347+ // });
348+
349+ // -- clickbench --
350+
351+ let queries_file =
352+ File :: open ( format ! ( "{benchmarks_path}queries/clickbench/queries.sql" ) ) . unwrap ( ) ;
353+ let extended_file =
354+ File :: open ( format ! ( "{benchmarks_path}queries/clickbench/extended.sql" ) ) . unwrap ( ) ;
355+
356+ let clickbench_queries: Vec < String > = BufReader :: new ( queries_file)
357+ . lines ( )
358+ . chain ( BufReader :: new ( extended_file) . lines ( ) )
359+ . map ( |l| l. expect ( "Could not parse line" ) )
360+ . collect_vec ( ) ;
361+
362+ let clickbench_ctx = register_clickbench_hits_table ( ) ;
363+
364+ // for (i, sql) in clickbench_queries.iter().enumerate() {
365+ // c.bench_function(&format!("logical_plan_clickbench_q{}", i + 1), |b| {
366+ // b.iter(|| logical_plan(&clickbench_ctx, sql))
367+ // });
368+ // }
369+
370+ for ( i, sql) in clickbench_queries. iter ( ) . enumerate ( ) {
371+ c. bench_function ( & format ! ( "physical_plan_clickbench_q{}" , i + 1 ) , |b| {
372+ b. iter ( || physical_plan ( & clickbench_ctx, sql) )
373+ } ) ;
374+ }
375+
376+ // c.bench_function("logical_plan_clickbench_all", |b| {
377+ // b.iter(|| {
378+ // for sql in &clickbench_queries {
379+ // logical_plan(&clickbench_ctx, sql)
380+ // }
381+ // })
382+ // });
383+
384+ c. bench_function ( "physical_plan_clickbench_all" , |b| {
292385 b. iter ( || {
293- for sql in & all_tpcds_sql_queries {
294- logical_plan ( & tpcds_ctx , sql)
386+ for sql in & clickbench_queries {
387+ physical_plan ( & clickbench_ctx , sql)
295388 }
296389 } )
297390 } ) ;
0 commit comments