Skip to content

Commit 2940ac9

Browse files
authored
chore: minor benchmark improvements (#1310)
1 parent 38e9860 commit 2940ac9

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

bench-vortex/benches/compress_noci.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
mod tokio_runtime;
22

3+
use core::str::FromStr;
4+
use core::sync::atomic::{AtomicBool, Ordering};
35
use std::io::Cursor;
46
use std::path::Path;
57
use std::sync::Arc;
@@ -16,11 +18,13 @@ use bench_vortex::tpch::dbgen::{DBGen, DBGenOptions};
1618
use bench_vortex::{fetch_taxi_data, tpch};
1719
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
1820
use futures::TryStreamExt;
21+
use log::LevelFilter;
1922
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
2023
use parquet::arrow::ArrowWriter;
2124
use parquet::basic::{Compression, ZstdLevel};
2225
use parquet::file::properties::WriterProperties;
2326
use regex::Regex;
27+
use simplelog::*;
2428
use tokio::runtime::Runtime;
2529
use vortex::array::{ChunkedArray, StructArray};
2630
use vortex::buffer::Buffer;
@@ -41,6 +45,8 @@ struct GenericBenchmarkResults<'a> {
4145
range: f64,
4246
}
4347

48+
/// Flag recording that the terminal logger setup has already been attempted, so that the
/// initialization guarded by `LOG_INITIALIZED.swap(true, ..)` runs at most once per process.
static LOG_INITIALIZED: AtomicBool = AtomicBool::new(false);
49+
4450
fn ensure_dir_exists(dir: &str) -> std::io::Result<()> {
4551
let path = Path::new(dir);
4652
if !path.exists() {
@@ -164,6 +170,20 @@ fn benchmark_compress<F, U>(
164170
F: Fn() -> U,
165171
U: AsRef<Array>,
166172
{
173+
// Initialize the terminal logger exactly once; the level is parsed from RUST_LOG and falls back to Off.
174+
if !LOG_INITIALIZED.swap(true, Ordering::SeqCst) {
175+
TermLogger::init(
176+
env::var("RUST_LOG")
177+
.ok()
178+
.and_then(|s| LevelFilter::from_str(&s).ok())
179+
.unwrap_or(LevelFilter::Off),
180+
Config::default(),
181+
TerminalMode::Mixed,
182+
ColorChoice::Auto,
183+
)
184+
.unwrap();
185+
}
186+
167187
ensure_dir_exists("benchmarked-files").unwrap();
168188
let runtime = &TOKIO_RUNTIME;
169189
let uncompressed = make_uncompressed();

bench-vortex/benches/compressor_throughput.rs

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
use criterion::{black_box, criterion_group, criterion_main, BatchSize, Criterion, Throughput};
22
use itertools::Itertools as _;
33
use mimalloc::MiMalloc;
4-
use rand::{Rng, SeedableRng as _};
4+
use rand::distributions::Alphanumeric;
5+
use rand::seq::SliceRandom as _;
6+
use rand::{thread_rng, Rng, SeedableRng as _};
57
use vortex::aliases::hash_set::HashSet;
6-
use vortex::array::PrimitiveArray;
8+
use vortex::array::{PrimitiveArray, VarBinViewArray};
79
use vortex::compute::unary::try_cast;
10+
use vortex::dict::{dict_encode_varbinview, DictArray};
811
use vortex::dtype::PType;
12+
use vortex::fsst::{fsst_compress, fsst_train_compressor};
913
use vortex::sampling_compressor::compressors::alp::ALPCompressor;
1014
use vortex::sampling_compressor::compressors::alp_rd::ALPRDCompressor;
1115
use vortex::sampling_compressor::compressors::bitpacked::{
@@ -92,5 +96,51 @@ fn primitive(c: &mut Criterion) {
9296
}
9397
}
9498

95-
criterion_group!(benches, primitive);
99+
fn strings(c: &mut Criterion) {
100+
let mut group = c.benchmark_group("string-decompression");
101+
let num_values = u16::MAX as u64;
102+
group.throughput(Throughput::Bytes(num_values * 8));
103+
104+
let varbinview_arr = VarBinViewArray::from_iter_str(gen_varbin_words(1_000_000, 0.00005));
105+
let (codes, values) = dict_encode_varbinview(&varbinview_arr);
106+
group.throughput(Throughput::Bytes(
107+
varbinview_arr.clone().into_array().nbytes() as u64,
108+
));
109+
group.bench_function("dict_decode_varbinview", |b| {
110+
b.iter_batched(
111+
|| DictArray::try_new(codes.clone().into_array(), values.clone().into_array()).unwrap(),
112+
|dict_arr| black_box(dict_arr.into_canonical().unwrap()),
113+
BatchSize::SmallInput,
114+
);
115+
});
116+
117+
let fsst_compressor = fsst_train_compressor(&varbinview_arr.clone().into_array()).unwrap();
118+
let fsst_array = fsst_compress(&varbinview_arr.clone().into_array(), &fsst_compressor).unwrap();
119+
group.bench_function("fsst_decompress_varbinview", |b| {
120+
b.iter_batched(
121+
|| fsst_array.clone(),
122+
|fsst_arr| black_box(fsst_arr.into_canonical().unwrap()),
123+
BatchSize::SmallInput,
124+
);
125+
});
126+
}
127+
128+
fn gen_varbin_words(len: usize, uniqueness: f64) -> Vec<String> {
129+
let mut rng = thread_rng();
130+
let uniq_cnt = (len as f64 * uniqueness) as usize;
131+
let dict: Vec<String> = (0..uniq_cnt)
132+
.map(|_| {
133+
(&mut rng)
134+
.sample_iter(&Alphanumeric)
135+
.take(8)
136+
.map(char::from)
137+
.collect()
138+
})
139+
.collect();
140+
(0..len)
141+
.map(|_| dict.choose(&mut rng).unwrap().clone())
142+
.collect()
143+
}
144+
145+
criterion_group!(benches, primitive, strings);
96146
criterion_main!(benches);

0 commit comments

Comments
 (0)