@@ -11,19 +11,22 @@ use datafusion::datasource::listing::ListingOptions;
1111use datafusion:: datasource:: listing:: ListingTable ;
1212use datafusion:: datasource:: listing:: ListingTableConfig ;
1313use datafusion:: datasource:: listing:: ListingTableUrl ;
14+ use datafusion:: parquet:: arrow:: ParquetRecordBatchStreamBuilder ;
1415use datafusion:: prelude:: SessionContext ;
1516use datafusion_physical_plan:: ExecutionPlan ;
1617use df_bench:: format_to_df_format;
18+ use futures:: StreamExt ;
19+ use tokio:: fs:: File ;
1720use vortex_bench:: Benchmark ;
1821use vortex_bench:: BenchmarkArg ;
19- use vortex_bench:: BenchmarkOutput ;
2022use vortex_bench:: CompactionStrategy ;
2123use vortex_bench:: Engine ;
2224use vortex_bench:: Format ;
2325use vortex_bench:: Opt ;
2426use vortex_bench:: Opts ;
2527use vortex_bench:: conversions:: convert_parquet_to_vortex;
2628use vortex_bench:: create_benchmark;
29+ use vortex_bench:: create_output_writer;
2730use vortex_bench:: display:: DisplayFormat ;
2831use vortex_bench:: runner:: SqlBenchmarkRunner ;
2932use vortex_bench:: runner:: filter_queries;
@@ -160,8 +163,9 @@ async fn main() -> anyhow::Result<()> {
160163 )
161164 . await ?;
162165
163- let output = BenchmarkOutput :: with_path ( benchmark. dataset_name ( ) , args. output_path ) ;
164- runner. export_to ( & args. display_format , output. create_writer ( ) ?) ?;
166+ let benchmark_id = format ! ( "datafusion-{}" , benchmark. dataset_name( ) ) ;
167+ let writer = create_output_writer ( & args. display_format , args. output_path , & benchmark_id) ?;
168+ runner. export_to ( & args. display_format , writer) ?;
165169
166170 Ok ( ( ) )
167171}
@@ -171,26 +175,88 @@ async fn register_benchmark_tables<B: Benchmark + ?Sized>(
171175 benchmark : & B ,
172176 format : Format ,
173177) -> anyhow:: Result < ( ) > {
174- let benchmark_base = benchmark. data_url ( ) . join ( & format ! ( "{}/" , format. name( ) ) ) ?;
175- let file_format = format_to_df_format ( format) ;
178+ if matches ! ( format, Format :: Arrow ) {
179+ // For Arrow format, load Arrow IPC files into in-memory tables
180+ register_arrow_tables ( session, benchmark) . await
181+ } else {
182+ let benchmark_base = benchmark. data_url ( ) . join ( & format ! ( "{}/" , format. name( ) ) ) ?;
183+ let file_format = format_to_df_format ( format) ;
184+
185+ for table in benchmark. table_specs ( ) . iter ( ) {
186+ let pattern = benchmark. pattern ( table. name , format) ;
187+ let table_url = ListingTableUrl :: try_new ( benchmark_base. clone ( ) , pattern) ?;
188+
189+ let mut config = ListingTableConfig :: new ( table_url) . with_listing_options (
190+ ListingOptions :: new ( file_format. clone ( ) )
191+ . with_session_config_options ( session. state ( ) . config ( ) ) ,
192+ ) ;
193+
194+ config = match table. schema . as_ref ( ) {
195+ Some ( schema) => config. with_schema ( Arc :: new ( schema. clone ( ) ) ) ,
196+ None => config. infer_schema ( & session. state ( ) ) . await ?,
197+ } ;
198+
199+ let listing_table = Arc :: new ( ListingTable :: try_new ( config) ?) ;
200+
201+ session. register_table ( table. name , listing_table) ?;
202+ }
176203
177- for table in benchmark . table_specs ( ) . iter ( ) {
178- let pattern = benchmark . pattern ( table . name , format ) ;
179- let table_url = ListingTableUrl :: try_new ( benchmark_base . clone ( ) , pattern ) ? ;
204+ Ok ( ( ) )
205+ }
206+ }
180207
181- let mut config = ListingTableConfig :: new ( table_url) . with_listing_options (
182- ListingOptions :: new ( file_format. clone ( ) )
183- . with_session_config_options ( session. state ( ) . config ( ) ) ,
184- ) ;
208+ /// Load Arrow IPC files into in-memory DataFusion tables.
209+ async fn register_arrow_tables < B : Benchmark + ?Sized > (
210+ session : & SessionContext ,
211+ benchmark : & B ,
212+ ) -> anyhow:: Result < ( ) > {
213+ use datafusion:: datasource:: MemTable ;
185214
186- config = match table. schema . as_ref ( ) {
187- Some ( schema) => config. with_schema ( Arc :: new ( schema. clone ( ) ) ) ,
188- None => config. infer_schema ( & session. state ( ) ) . await ?,
189- } ;
215+ let parquet_dir = benchmark
216+ . data_url ( )
217+ . to_file_path ( )
218+ . map_err ( |_| anyhow:: anyhow!( "Arrow format requires local file path" ) ) ?
219+ . join ( Format :: Parquet . name ( ) ) ;
190220
191- let listing_table = Arc :: new ( ListingTable :: try_new ( config) ?) ;
221+ // Read all arrow files from the directory
222+ let data_files = std:: fs:: read_dir ( & parquet_dir) ?. collect :: < Result < Vec < _ > , _ > > ( ) ?;
192223
193- session. register_table ( table. name , listing_table) ?;
224+ for table in benchmark. table_specs ( ) . iter ( ) {
225+ let pattern = benchmark. pattern ( table. name , Format :: Parquet ) ;
226+
227+ // Find files matching this table's pattern
228+ let matching_files: Vec < _ > = data_files
229+ . iter ( )
230+ . filter ( |entry| {
231+ let filename = entry. file_name ( ) ;
232+ let filename_str = filename. to_str ( ) . unwrap_or ( "" ) ;
233+ match & pattern {
234+ Some ( p) => p. matches ( filename_str) ,
235+ None => filename_str == format ! ( "{}.{}" , table. name, Format :: Parquet . ext( ) ) ,
236+ }
237+ } )
238+ . collect ( ) ;
239+
240+ // Load all matching files into memory
241+ let mut all_batches = Vec :: new ( ) ;
242+ let mut schema = None ;
243+
244+ for dir_entry in matching_files {
245+ let file = File :: open ( dir_entry. path ( ) ) . await ?;
246+ let mut reader = ParquetRecordBatchStreamBuilder :: new ( file) . await ?. build ( ) ?;
247+ if schema. is_none ( ) {
248+ schema = Some ( reader. schema ( ) ) . cloned ( ) ;
249+ }
250+
251+ while let Some ( batch) = reader. next ( ) . await {
252+ all_batches. push ( batch?) ;
253+ }
254+ }
255+
256+ if let Some ( schema) = schema {
257+ let mem_table = MemTable :: try_new ( schema, vec ! [ all_batches] ) ?;
258+ session. register_table ( table. name , Arc :: new ( mem_table) ) ?;
259+ }
194260 }
195261
196262 Ok ( ( ) )
0 commit comments