11use std:: path:: PathBuf ;
22use std:: sync:: Arc ;
33use std:: time:: { Duration , Instant } ;
4+ use std:: { env, fs} ;
45
56use anyhow:: anyhow;
67use bench_vortex:: ddb:: { DuckDBExecutor , register_tables} ;
@@ -25,6 +26,7 @@ use datafusion::physical_plan::metrics::{Label, MetricsSet};
2526use indicatif:: ProgressBar ;
2627use itertools:: Itertools ;
2728use log:: { info, warn} ;
29+ use similar:: { ChangeTag , TextDiff } ;
2830use tempfile:: tempdir;
2931use url:: Url ;
3032use vortex:: aliases:: hash_map:: HashMap ;
@@ -455,10 +457,78 @@ async fn bench_main(
455457 }
456458
457459 if verify_row_counts ( & row_counts, expected_row_counts, & queries, & exclude_queries) {
458- Err ( anyhow ! ( "Mismatched row counts. See logs for details." ) )
459- } else {
460- anyhow:: Ok ( ( ) )
460+ return Err ( anyhow ! ( "Mismatched row counts. See logs for details." ) ) ;
461+ }
462+
463+ if targets. iter ( ) . any ( |t| t. engine ( ) == Engine :: DuckDB ) {
464+ verify_duckdb_tpch_results ( scale_factor, duckdb_resolved_path) ?;
465+ }
466+
467+ anyhow:: Ok ( ( ) )
468+ }
469+
470+ fn verify_duckdb_tpch_results ( scale_factor : u8 , duckdb_path : PathBuf ) -> anyhow:: Result < ( ) > {
471+ let query_dir = PathBuf :: from ( "duckdb-vortex/duckdb/extension/tpch/dbgen/queries" ) ;
472+ let tmp_dir = format ! (
473+ "{}/spiral-tpch" ,
474+ // $RUNNER_TEMP is defined by GitHub Actions.
475+ env:: var( "TMPDIR" ) . unwrap_or( env:: var( "RUNNER_TEMP" ) ?)
476+ ) ;
477+ if PathBuf :: from ( & tmp_dir) . exists ( ) {
478+ fs:: remove_dir_all ( & tmp_dir) ?;
479+ }
480+ fs:: create_dir ( & tmp_dir) ?;
481+ let db_path = format ! ( "{tmp_dir}/tpch_results_sf.db" ) ;
482+
483+ let executor = DuckDBExecutor :: new ( duckdb_path, & db_path) ;
484+ ddb:: execute_tpch_query ( & [ format ! ( "CALL dbgen(sf={})" , scale_factor) ] , & executor) ?;
485+
486+ let query_files = fs:: read_dir ( query_dir) ?
487+ . filter_map ( Result :: ok)
488+ . filter ( |entry| entry. path ( ) . extension ( ) . is_some_and ( |ext| ext == "sql" ) )
489+ . collect :: < Vec < _ > > ( ) ;
490+
491+ for query_file in & query_files {
492+ let query_file_path = query_file. path ( ) ;
493+ let query_name = query_file_path
494+ . file_stem ( )
495+ . and_then ( |stem| stem. to_str ( ) )
496+ . ok_or_else ( || anyhow ! ( "Invalid query filename" ) ) ?;
497+
498+ let create_table = format ! (
499+ "CREATE OR REPLACE TABLE {query_name}_result AS {};" ,
500+ fs:: read_to_string( & query_file_path) ?
501+ ) ;
502+
503+ let csv_actual = format ! ( "{tmp_dir}/{query_name}.csv" ) ;
504+ let write_csv =
505+ format ! ( "COPY {query_name}_result TO '{csv_actual}' (HEADER, DELIMITER '|');" , ) ;
506+
507+ ddb:: execute_tpch_query ( & [ create_table, write_csv] , & executor) ?;
508+
509+ let csv_expected = format ! ( "bench-vortex/tpch_results/duckdb/{query_name}.csv" ) ;
510+ let expected = fs:: read_to_string ( csv_expected) ?;
511+ let actual = fs:: read_to_string ( csv_actual) ?;
512+
513+ if expected != actual {
514+ let diff = TextDiff :: from_lines ( & expected, & actual) ;
515+
516+ for change in diff. iter_all_changes ( ) {
517+ let sign = match change. tag ( ) {
518+ ChangeTag :: Delete => "-" ,
519+ ChangeTag :: Insert => "+" ,
520+ ChangeTag :: Equal => " " ,
521+ } ;
522+ print ! ( "{}{}" , sign, change) ;
523+ }
524+
525+ return Err ( anyhow ! ( format!(
526+ "query output does not match the reference for {query_name}"
527+ ) ) ) ;
528+ }
461529 }
530+
531+ Ok ( ( ) )
462532}
463533
464534fn validate_args ( engines : & [ Engine ] , args : & Args ) {
0 commit comments