@@ -11,8 +11,6 @@ use clap::Parser;
1111use indicatif:: ProgressBar ;
1212use itertools:: Itertools ;
1313use regex:: Regex ;
14- use tokio:: runtime:: Runtime ;
15- use vortex:: array:: Array ;
1614use vortex:: array:: IntoArray ;
1715use vortex:: array:: arrays:: ChunkedArray ;
1816use vortex:: array:: arrays:: ChunkedVTable ;
@@ -21,9 +19,14 @@ use vortex::utils::aliases::hash_map::HashMap;
2119use vortex_bench:: Engine ;
2220use vortex_bench:: Format ;
2321use vortex_bench:: Target ;
24- use vortex_bench:: compress:: bench as compress;
25- use vortex_bench:: compress:: bench:: CompressMeasurements ;
26- use vortex_bench:: compress:: bench:: CompressOp ;
22+ use vortex_bench:: compress:: CompressMeasurements ;
23+ use vortex_bench:: compress:: CompressOp ;
24+ use vortex_bench:: compress:: Compressor ;
25+ use vortex_bench:: compress:: ParquetCompressor ;
26+ use vortex_bench:: compress:: VortexCompressor ;
27+ use vortex_bench:: compress:: benchmark_compress;
28+ use vortex_bench:: compress:: benchmark_decompress;
29+ use vortex_bench:: compress:: calculate_ratios;
2730use vortex_bench:: datasets:: Dataset ;
2831use vortex_bench:: datasets:: struct_list_of_ints:: StructListOfInts ;
2932use vortex_bench:: datasets:: taxi_data:: TaxiData ;
@@ -33,8 +36,6 @@ use vortex_bench::display::DisplayFormat;
3336use vortex_bench:: display:: print_measurements_json;
3437use vortex_bench:: display:: render_table;
3538use vortex_bench:: downloadable_dataset:: DownloadableDataset ;
36- use vortex_bench:: measurements:: CompressionTimingMeasurement ;
37- use vortex_bench:: measurements:: CustomUnitMeasurement ;
3839use vortex_bench:: public_bi:: PBI_DATASETS ;
3940use vortex_bench:: public_bi:: PBIDataset :: Arade ;
4041use vortex_bench:: public_bi:: PBIDataset :: Bimbo ;
@@ -43,7 +44,6 @@ use vortex_bench::public_bi::PBIDataset::Euro2016;
4344use vortex_bench:: public_bi:: PBIDataset :: Food ;
4445use vortex_bench:: public_bi:: PBIDataset :: HashTags ;
4546use vortex_bench:: setup_logging_and_tracing;
46- use vortex_bench:: utils:: new_tokio_runtime;
4747
4848#[ derive( Parser , Debug ) ]
4949#[ command( version, about, long_about = None ) ]
@@ -58,8 +58,6 @@ struct Args {
5858 #[ arg( short, long, default_value_t = 5 ) ]
5959 iterations : usize ,
6060 #[ arg( short, long) ]
61- threads : Option < usize > ,
62- #[ arg( short, long) ]
6361 verbose : bool ,
6462 #[ arg(
6563 long,
@@ -77,26 +75,33 @@ struct Args {
7775 tracing : bool ,
7876}
7977
80- fn main ( ) -> anyhow:: Result < ( ) > {
78+ #[ tokio:: main]
79+ async fn main ( ) -> anyhow:: Result < ( ) > {
8180 let args = Args :: parse ( ) ;
8281
8382 setup_logging_and_tracing ( args. verbose , args. tracing ) ?;
8483
85- let runtime = new_tokio_runtime ( args. threads ) ?;
86-
87- compress (
88- runtime,
84+ run_compress (
8985 args. iterations ,
9086 args. datasets . map ( |d| Regex :: new ( & d) ) . transpose ( ) ?,
9187 args. formats ,
9288 args. ops ,
9389 args. display_format ,
9490 & args. output_path ,
9591 )
92+ . await
93+ }
94+
95+ /// Get a compressor for the given format.
96+ fn get_compressor ( format : Format ) -> Box < dyn Compressor > {
97+ match format {
98+ Format :: OnDiskVortex => Box :: new ( VortexCompressor ) ,
99+ Format :: Parquet => Box :: new ( ParquetCompressor :: new ( ) ) ,
100+ _ => unimplemented ! ( "Compress bench not implemented for {format}" ) ,
101+ }
96102}
97103
98- fn compress (
99- runtime : Runtime ,
104+ async fn run_compress (
100105 iterations : usize ,
101106 datasets_filter : Option < Regex > ,
102107 formats : Vec < Format > ,
@@ -151,21 +156,15 @@ fn compress(
151156
152157 let progress = ProgressBar :: new ( ( datasets. len ( ) * formats. len ( ) * ops. len ( ) ) as u64 ) ;
153158
154- let measurements = datasets
155- . into_iter ( )
156- . map ( |dataset_handle| {
157- benchmark_compress (
158- & runtime,
159- & progress,
160- & formats,
161- & ops,
162- iterations,
163- dataset_handle,
164- )
165- } )
166- . try_collect :: < _ , Vec < _ > , _ > ( ) ?
167- . into_iter ( )
168- . collect :: < CompressMeasurements > ( ) ;
159+ let mut measurements = vec ! [ ] ;
160+
161+ for dataset_handle in datasets. into_iter ( ) {
162+ let m = run_benchmark_for_dataset ( & progress, & formats, & ops, iterations, dataset_handle)
163+ . await ?;
164+ measurements. push ( m) ;
165+ }
166+
167+ let measurements = CompressMeasurements :: from_iter ( measurements) ;
169168
170169 progress. finish ( ) ;
171170
@@ -195,28 +194,7 @@ fn compress(
195194 }
196195}
197196
198- // Type aliases for compression and decompression function signatures.
199- type CompressFn = fn (
200- & Runtime ,
201- & dyn Array ,
202- usize ,
203- & str ,
204- ) -> anyhow:: Result < (
205- Duration ,
206- u64 ,
207- Vec < CustomUnitMeasurement > ,
208- CompressionTimingMeasurement ,
209- ) > ;
210-
211- type DecompressFn = fn (
212- & Runtime ,
213- & dyn Array ,
214- usize ,
215- & str ,
216- ) -> anyhow:: Result < ( Duration , CompressionTimingMeasurement ) > ;
217-
218- pub fn benchmark_compress (
219- runtime : & Runtime ,
197+ async fn run_benchmark_for_dataset (
220198 progress : & ProgressBar ,
221199 formats : & [ Format ] ,
222200 ops : & [ CompressOp ] ,
@@ -226,7 +204,7 @@ pub fn benchmark_compress(
226204 let bench_name = dataset_handle. name ( ) ;
227205 tracing:: info!( "Running {bench_name} benchmark" ) ;
228206
229- let vx_array = runtime . block_on ( async { dataset_handle. to_vortex_array ( ) . await } ) ?;
207+ let vx_array = dataset_handle. to_vortex_array ( ) . await ?;
230208 let uncompressed = ChunkedArray :: from_iter (
231209 vx_array
232210 . as_ :: < ChunkedVTable > ( )
@@ -246,36 +224,33 @@ pub fn benchmark_compress(
246224 let mut compressed_sizes: HashMap < Format , u64 > = HashMap :: new ( ) ;
247225
248226 for format in formats {
227+ let compressor = get_compressor ( * format) ;
228+
249229 for op in ops {
250230 let time = match op {
251231 CompressOp :: Compress => {
252- // Select the compression function based on format.
253- let compress_fn: CompressFn = match format {
254- Format :: OnDiskVortex => compress:: benchmark_vortex_compress,
255- Format :: Parquet => compress:: benchmark_parquet_compress,
256- _ => unimplemented ! ( "Compress bench not implemented for {format}" ) ,
257- } ;
258-
259- let ( time, size, ratios_part, timing) =
260- compress_fn ( runtime, & uncompressed, iterations, bench_name) ?;
261- compressed_sizes. insert ( * format, size) ;
262- ratios. extend ( ratios_part) ;
263- timings. push ( timing) ;
264-
265- time
232+ let result = benchmark_compress (
233+ compressor. as_ref ( ) ,
234+ & uncompressed,
235+ iterations,
236+ bench_name,
237+ )
238+ . await ?;
239+ compressed_sizes. insert ( * format, result. compressed_size ) ;
240+ ratios. extend ( result. ratios ) ;
241+ timings. push ( result. timing ) ;
242+ result. time
266243 }
267244 CompressOp :: Decompress => {
268- // Select the decompression function based on format.
269- let decompress_fn: DecompressFn = match format {
270- Format :: OnDiskVortex => compress:: benchmark_vortex_decompress,
271- Format :: Parquet => compress:: benchmark_parquet_decompress,
272- _ => unimplemented ! ( "Decompress bench not implemented for {format}" ) ,
273- } ;
274-
275- let ( time, timing) =
276- decompress_fn ( runtime, & uncompressed, iterations, bench_name) ?;
277- timings. push ( timing) ;
278- time
245+ let result = benchmark_decompress (
246+ compressor. as_ref ( ) ,
247+ & uncompressed,
248+ iterations,
249+ bench_name,
250+ )
251+ . await ?;
252+ timings. push ( result. timing ) ;
253+ result. time
279254 }
280255 } ;
281256
@@ -285,7 +260,7 @@ pub fn benchmark_compress(
285260 }
286261
287262 // Calculate cross-format ratios after all measurements.
288- compress :: calculate_ratios (
263+ calculate_ratios (
289264 & measurements_map,
290265 & compressed_sizes,
291266 bench_name,
0 commit comments