@@ -9,6 +9,7 @@ use log::trace;
99use url:: Url ;
1010use vortex:: error:: VortexExpect ;
1111use vortex_duckdb:: duckdb:: { Config , Connection , Database } ;
12+ use vortex_duckdb:: register_extension_options;
1213
1314use crate :: statpopgen:: StatPopGenBenchmark ;
1415use crate :: { BenchmarkDataset , Format , IdempotentPath } ;
@@ -33,10 +34,16 @@ pub struct DuckDBCtx {
3334 pub db : Database ,
3435 pub connection : Connection ,
3536 pub db_path : Option < PathBuf > ,
37+ pub threads : Option < usize > ,
3638}
3739
3840impl DuckDBCtx {
39- pub fn new ( dataset : BenchmarkDataset , format : Format , delete_database : bool ) -> Result < Self > {
41+ pub fn new (
42+ dataset : BenchmarkDataset ,
43+ format : Format ,
44+ delete_database : bool ,
45+ threads : Option < usize > ,
46+ ) -> Result < Self > {
4047 let dir = match dataset {
4148 BenchmarkDataset :: ClickBench { flavor, .. } => {
4249 format ! ( "clickbench_{}/{}" , flavor, format. name( ) ) . to_data_path ( )
@@ -60,22 +67,30 @@ impl DuckDBCtx {
6067 std:: fs:: remove_file ( & db_path) ?;
6168 }
6269
63- let ( db, connection) = Self :: open_and_setup_database ( Some ( db_path. clone ( ) ) ) ?;
70+ let ( db, connection) = Self :: open_and_setup_database ( Some ( db_path. clone ( ) ) , threads ) ?;
6471
6572 Ok ( Self {
6673 db,
6774 connection,
6875 db_path : Some ( db_path) ,
76+ threads,
6977 } )
7078 }
7179
72- pub fn open_and_setup_database ( path : Option < PathBuf > ) -> Result < ( Database , Connection ) > {
80+ pub fn open_and_setup_database (
81+ path : Option < PathBuf > ,
82+ threads : Option < usize > ,
83+ ) -> Result < ( Database , Connection ) > {
7384 let config = Config :: new ( ) . vortex_expect ( "failed to create duckdb config" ) ;
7485
86+ // Register Vortex extension options before creating connection
87+ register_extension_options ( & config) ;
88+
7589 let db = match path {
7690 Some ( path) => Database :: open_with_config ( path, config) ,
7791 None => Database :: open_in_memory_with_config ( config) ,
7892 } ?;
93+
7994 let connection = db. connect ( ) ?;
8095 vortex_duckdb:: register_table_functions ( & connection) ?;
8196
@@ -90,6 +105,11 @@ impl DuckDBCtx {
90105 // parquet_metadata_cache" when running DuckDB in debug mode.
91106 connection. query ( "SET parquet_metadata_cache = true" ) ?;
92107
108+ // Set vortex_max_threads if specified
109+ if let Some ( thread_count) = threads {
110+ connection. query ( & format ! ( "SET vortex_max_threads = {}" , thread_count) ) ?;
111+ }
112+
93113 Ok ( ( db, connection) )
94114 }
95115
@@ -105,7 +125,8 @@ impl DuckDBCtx {
105125 drop ( connection) ;
106126 drop ( db) ;
107127
108- let ( mut db, mut connection) = Self :: open_and_setup_database ( self . db_path . clone ( ) ) ?;
128+ let ( mut db, mut connection) =
129+ Self :: open_and_setup_database ( self . db_path . clone ( ) , self . threads ) ?;
109130
110131 std:: mem:: swap ( & mut self . connection , & mut connection) ;
111132 std:: mem:: swap ( & mut self . db , & mut db) ;
@@ -114,13 +135,12 @@ impl DuckDBCtx {
114135 }
115136
116137 pub fn new_in_memory ( ) -> Result < Self > {
117- let db = Database :: open_in_memory ( ) ?;
118- let connection = db. connect ( ) ?;
119- vortex_duckdb:: register_table_functions ( & connection) ?;
138+ let ( db, connection) = Self :: open_and_setup_database ( None , None ) ?;
120139 Ok ( Self {
121140 db,
122141 connection,
123142 db_path : None ,
143+ threads : None ,
124144 } )
125145 }
126146
0 commit comments