Skip to content

Commit f420bca

Browse files
authored
feat: btrblocks compressor v0 (#2247)
This PR is meant to replicate the compressor logic of BtrBlocks. The BtrBlocks compressor has separate schemes for each DType. We are currently starting with the integer codecs, since they have the most complex search tree with the current sampling compressor.

### Overview

- We have a new `Scheme` trait, which is the interface for implementing compressors. Schemes are specialized based on the input array + statistics. For example, `IntegerScheme` is a Scheme that operates over `PrimitiveArray`s that have been summarized with an `IntegerStats`, and `FloatScheme` is one that specializes over `PrimitiveArray` with `FloatStats`.
- Every Scheme can be provided a stats instance and use it to estimate the compression ratio that recursively compressing would produce. For example, for integers there is a `DictScheme` which, based on the distinct values and run count, will try to estimate what Dict -> RLE + BP codes would produce.
- Schemes can also be applied directly via their `compress` method, which will eagerly perform recursive compression.

On our PBI compression benchmarks, `Compress::choose_scheme` does seem to take only ~1% of walltime, which matches the BtrBlocks paper result. The goal is to follow up and wire this compressor in as the block-level compressor we apply when writing chunked layouts.
1 parent 33e9d1f commit f420bca

File tree

23 files changed

+3162
-4
lines changed

23 files changed

+3162
-4
lines changed

Cargo.lock

Lines changed: 53 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"pyvortex",
77
"vortex",
88
"vortex-array",
9+
"vortex-btrblocks",
910
"vortex-buffer",
1011
"vortex-datafusion",
1112
"vortex-datetime-dtype",
@@ -165,6 +166,7 @@ witchcraft-metrics = "1.0.1"
165166
vortex = { version = "0.24.0", path = "./vortex" }
166167
vortex-alp = { version = "0.24.0", path = "./encodings/alp" }
167168
vortex-array = { version = "0.24.0", path = "./vortex-array" }
169+
vortex-btrblocks = { version = "0.24.0", path = "./vortex-btrblocks" }
168170
vortex-buffer = { version = "0.24.0", path = "./vortex-buffer" }
169171
vortex-bytebool = { version = "0.24.0", path = "./encodings/bytebool" }
170172
vortex-datafusion = { version = "0.24.0", path = "./vortex-datafusion" }

bench-vortex/benches/compress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ fn benchmark_compress<F, U>(
175175
group.throughput(Throughput::Bytes(uncompressed_size as u64));
176176
measurement_time.map(|t| group.measurement_time(t));
177177
group.bench_function(bench_name, |b| {
178-
b.iter_with_large_drop(|| {
178+
b.iter(|| {
179179
compressed_size =
180180
vortex_compressed_written_size(runtime, uncompressed.as_ref()).unwrap();
181181
});

encodings/fastlanes/src/bitpacking/compress.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,10 @@ pub fn bitpack_encode(array: PrimitiveArray, bit_width: u8) -> VortexResult<BitP
4343

4444
if bit_width >= array.ptype().bit_width() as u8 {
4545
// Nothing we can do
46-
vortex_bail!("Cannot pack -- specified bit width is greater than or equal to raw bit width")
46+
vortex_bail!(
47+
"Cannot pack -- specified bit width {bit_width} >= {}",
48+
array.ptype().bit_width()
49+
)
4750
}
4851

4952
// SAFETY: we check that array only contains non-negative values.

vortex-array/src/array/datetime/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,11 @@ impl TemporalArray {
180180
pub fn ext_dtype(&self) -> Arc<ExtDType> {
181181
self.ext.ext_dtype().clone()
182182
}
183+
184+
/// Retrieve the DType of the array. This will be a `DType::Extension` variant.
185+
pub fn dtype(&self) -> &DType {
186+
self.ext.dtype()
187+
}
183188
}
184189

185190
impl From<TemporalArray> for Array {

vortex-btrblocks/Cargo.toml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
[package]
2+
name = "vortex-btrblocks"
3+
description = "BtrBlocks style compressor"
4+
version.workspace = true
5+
homepage.workspace = true
6+
repository.workspace = true
7+
authors.workspace = true
8+
license.workspace = true
9+
keywords.workspace = true
10+
include.workspace = true
11+
edition.workspace = true
12+
rust-version.workspace = true
13+
readme.workspace = true
14+
categories.workspace = true
15+
16+
[dependencies]
17+
arrow-buffer = { workspace = true }
18+
itertools = { workspace = true }
19+
log = { workspace = true }
20+
num-traits = { workspace = true }
21+
rand = { workspace = true }
22+
rustc-hash = { workspace = true }
23+
vortex-array = { workspace = true }
24+
vortex-alp = { workspace = true }
25+
vortex-buffer = { workspace = true }
26+
vortex-datetime-dtype = { workspace = true }
27+
vortex-datetime-parts = { workspace = true }
28+
vortex-dict = { workspace = true }
29+
vortex-dtype = { workspace = true }
30+
vortex-error = { workspace = true }
31+
vortex-fastlanes = { workspace = true }
32+
vortex-fsst = { workspace = true }
33+
vortex-mask = { workspace = true }
34+
vortex-scalar = { workspace = true }
35+
vortex-sparse = { workspace = true }
36+
vortex-runend = { workspace = true }
37+
vortex-zigzag = { workspace = true }
38+
39+
[dev-dependencies]
40+
divan = { workspace = true }
41+
env_logger = "0.11"
42+
vortex-sampling-compressor = { workspace = true }
43+
44+
[lints]
45+
workspace = true
46+
47+
[[bench]]
48+
name = "compress"
49+
harness = false
50+
test = false
51+
52+
[[bench]]
53+
name = "dict_encode"
54+
harness = false
55+
test = false
56+
57+
[[bench]]
58+
name = "stats_calc"
59+
harness = false
60+
test = false
61+
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#![allow(clippy::unwrap_used)]
2+
3+
use divan::counter::{BytesCount, ItemsCount};
4+
use divan::Bencher;
5+
use rand::prelude::StdRng;
6+
use rand::{RngCore, SeedableRng};
7+
use vortex_array::aliases::hash_set::HashSet;
8+
use vortex_array::{Array, IntoArray, IntoArrayVariant};
9+
use vortex_btrblocks::integer::IntCompressor;
10+
use vortex_btrblocks::Compressor;
11+
use vortex_buffer::buffer_mut;
12+
use vortex_sampling_compressor::SamplingCompressor;
13+
14+
/// Builds the synthetic benchmark input: a 1,000,000-element `i32` buffer
/// initialised with `-1`, in which 223 distinct pseudo-randomly chosen
/// positions are overwritten with small non-negative values.
///
/// The RNG is seeded with a fixed value (`1`), so the sequence of draws does
/// not depend on external state.
fn make_clickbench_window_name() -> Array {
15+
// Synthetic data meant to mirror the WindowName column from ClickBench.
16+
let mut values = buffer_mut![-1i32; 1_000_000];
17+
let mut visited = HashSet::new();
18+
let mut rng = StdRng::seed_from_u64(1u64);
19+
while visited.len() < 223 {
20+
let random = (rng.next_u32() as usize) % 1_000_000;
21+
if visited.contains(&random) {
22+
continue;
23+
}
24+
visited.insert(random);
25+
// Overwrite this slot with one of 100 possible values: a multiple of 5 in 0..=495.
26+
values[random] = 5 * (rng.next_u64() % 100) as i32;
27+
}
28+
29+
// Freeze the buffer and hand it back as an array; compression happens in the benchmarks.
30+
values.freeze().into_array()
31+
}
32+
33+
/// Benchmarks `IntCompressor::compress` on the array produced by
/// `make_clickbench_window_name`, converted via `into_primitive`.
///
/// Registers an item counter (array length) and a byte counter (array length
/// in `i32`-sized elements) for each input.
// NOTE(review): the meaning of the `false`, `3` and `&[]` arguments is not
// visible here -- confirm against the `IntCompressor::compress` signature.
#[divan::bench]
34+
fn btrblocks(bencher: Bencher) {
35+
bencher
36+
.with_inputs(|| make_clickbench_window_name().into_primitive().unwrap())
37+
.input_counter(|array| ItemsCount::new(array.len()))
38+
.input_counter(|array| BytesCount::of_many::<i32>(array.len()))
39+
.bench_local_values(|array| IntCompressor::compress(&array, false, 3, &[]).unwrap());
40+
}
41+
42+
/// Benchmarks `SamplingCompressor::compress` on the same synthetic array used
/// by the `btrblocks` benchmark, with the same item and byte counters.
///
/// A single default-constructed compressor is created once and shared by all
/// iterations.
// NOTE(review): the second argument to `compress` is `None`; what it selects
// is not visible here -- confirm against `SamplingCompressor::compress`.
#[divan::bench]
43+
fn sampling_compressor(bencher: Bencher) {
44+
let compressor = SamplingCompressor::default();
45+
bencher
46+
.with_inputs(make_clickbench_window_name)
47+
.input_counter(|array| ItemsCount::new(array.len()))
48+
.input_counter(|array| BytesCount::of_many::<i32>(array.len()))
49+
.bench_local_values(|array| compressor.compress(&array, None).unwrap());
50+
}
51+
52+
/// Benchmark entry point: delegates to `divan::main`.
fn main() {
53+
divan::main()
54+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#![allow(clippy::unwrap_used)]
2+
3+
use divan::Bencher;
4+
use vortex_array::array::{BoolArray, PrimitiveArray};
5+
use vortex_array::validity::Validity;
6+
use vortex_array::IntoArray;
7+
use vortex_btrblocks::integer::dictionary::dictionary_encode;
8+
use vortex_btrblocks::integer::IntegerStats;
9+
use vortex_btrblocks::CompressorStats;
10+
use vortex_buffer::BufferMut;
11+
use vortex_dict::builders::dict_encode;
12+
13+
/// Builds the dictionary-encoding benchmark input: 64,000 `i32` values that
/// cycle through `0..50`, paired with a boolean validity array of the same
/// length that repeats six `true` entries followed by one `false`.
// NOTE(review): presumably `false` marks a null slot (every 7th element) --
// confirm against `Validity::Array` semantics.
fn make_array() -> PrimitiveArray {
14+
let values: BufferMut<i32> = (0..50).cycle().take(64_000).collect();
15+
16+
let nulls = BoolArray::from_iter(
17+
[true, true, true, true, true, true, false]
18+
.into_iter()
19+
.cycle()
20+
.take(64_000),
21+
)
22+
.into_array();
23+
24+
PrimitiveArray::new(values, Validity::Array(nulls))
25+
}
26+
27+
/// Benchmarks `vortex_dict::builders::dict_encode` on the array from
/// `make_array`, after converting it with `into_array`.
#[divan::bench]
28+
fn encode_generic(bencher: Bencher) {
29+
bencher
30+
.with_inputs(|| make_array().into_array())
31+
.bench_local_values(|array| dict_encode(&array).unwrap());
32+
}
33+
34+
/// Benchmarks `vortex_btrblocks::integer::dictionary::dictionary_encode`.
///
/// The `IntegerStats` for the `make_array` input are generated inside
/// `with_inputs`, so the benchmarked closure only receives ready-made stats
/// and calls `dictionary_encode` on them.
#[divan::bench]
35+
fn encode_specialized(bencher: Bencher) {
36+
bencher
37+
.with_inputs(|| IntegerStats::generate(&make_array()))
38+
.bench_local_values(|stats| dictionary_encode(&stats).unwrap());
39+
}
40+
41+
/// Benchmark entry point: delegates to `divan::main`.
fn main() {
42+
divan::main()
43+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#![allow(clippy::cast_possible_truncation, clippy::use_debug)]
2+
3+
use vortex_buffer::{Buffer, BufferMut};
4+
5+
/// Generates 64,000 `u32` values arranged in runs of repeated values.
///
/// Whenever the current run is exhausted, a new value is drawn as
/// `random % distinct` (so it lies in `0..distinct`) and a new run length is
/// drawn as `max(random % max_run, 1)`. The run length is therefore at least 1
/// and never reaches `max_run` itself. The final run is cut off when the
/// 64,000-element limit is hit.
///
/// Values come from `rand::random`; no seed is set in this function.
///
/// # Panics
///
/// Panics if `distinct` or `max_run` is `0`, because both are used as the
/// right-hand side of an integer remainder.
fn generate_dataset(max_run: u32, distinct: u32) -> Buffer<u32> {
6+
let mut output = BufferMut::with_capacity(64_000);
7+
let mut run = 0;
8+
let mut value = 0;
9+
for _ in 0..64_000 {
10+
// `run` counts the elements still to emit for the current value; 0 means start a new run.
if run == 0 {
11+
value = rand::random::<u32>() % distinct;
12+
run = std::cmp::max(rand::random::<u32>() % max_run, 1);
13+
}
14+
output.push(value);
15+
run -= 1;
16+
}
17+
18+
output.freeze()
19+
}
20+
21+
/// Selects which synthetic input the stats benchmarks run against.
///
/// The `stats` module below maps each variant to a generator call.
#[derive(Debug, Copy, Clone)]
22+
enum Distribution {
23+
// Values cycling through `0..1024` (see `generate_low_cardinality`).
LowCardinality,
24+
// Run-structured data from `generate_runs(4)`.
ShortRuns,
25+
// Run-structured data from `generate_runs(64)`.
LongRuns,
26+
}
27+
28+
/// Benchmarks for `IntegerStats::generate_opts` over three input distributions.
///
/// The group-level counters match the inputs built below: 64,000 items, and
/// 256,000 bytes (64,000 four-byte `u32` values).
#[divan::bench_group(items_count = 64_000u32, bytes_count = 256_000u32)]
29+
mod stats {
30+
use divan::Bencher;
31+
use vortex_array::array::PrimitiveArray;
32+
use vortex_array::validity::Validity;
33+
use vortex_btrblocks::integer::IntegerStats;
34+
use vortex_btrblocks::{CompressorStats, GenerateStatsOptions};
35+
use vortex_buffer::Buffer;
36+
37+
use crate::{generate_dataset, Distribution};
38+
39+
/// Non-nullable array of 64,000 `u32` values cycling through `0..1024`.
fn generate_low_cardinality() -> PrimitiveArray {
40+
let values: Buffer<u32> = (0..1024).cycle().take(64_000).collect();
41+
PrimitiveArray::new(values, Validity::NonNullable)
42+
}
43+
44+
/// Non-nullable array built from `generate_dataset(max_run, 1024)`, i.e.
/// run-structured data whose values are drawn from `0..1024`.
fn generate_runs(max_run: u32) -> PrimitiveArray {
45+
let values = generate_dataset(max_run, 1024);
46+
PrimitiveArray::new(values, Validity::NonNullable)
47+
}
48+
49+
/// Benchmarks stats generation with `GenerateStatsOptions::default()`.
///
/// The input array is built once per benchmark argument; each iteration
/// receives a clone of it from `with_inputs`.
// NOTE(review): presumably the default options enable distinct-value
// counting (hence "dict_on") -- confirm against `GenerateStatsOptions`.
#[divan::bench(args = [Distribution::LowCardinality, Distribution::ShortRuns, Distribution::LongRuns])]
50+
fn stats_dict_on(bencher: Bencher, distribution: Distribution) {
51+
let values = match distribution {
52+
Distribution::LowCardinality => generate_low_cardinality(),
53+
Distribution::ShortRuns => generate_runs(4),
54+
Distribution::LongRuns => generate_runs(64),
55+
};
56+
57+
bencher.with_inputs(|| values.clone()).bench_refs(|values| {
58+
IntegerStats::generate_opts(values, GenerateStatsOptions::default());
59+
});
60+
}
61+
62+
/// Benchmarks stats generation with `count_distinct_values` explicitly set
/// to `false`, on the same three distributions as `stats_dict_on`.
#[divan::bench(args = [Distribution::LowCardinality, Distribution::ShortRuns, Distribution::LongRuns])]
63+
fn stats_dict_off(bencher: Bencher, distribution: Distribution) {
64+
let values = match distribution {
65+
Distribution::LowCardinality => generate_low_cardinality(),
66+
Distribution::ShortRuns => generate_runs(4),
67+
Distribution::LongRuns => generate_runs(64),
68+
};
69+
70+
bencher.with_inputs(|| values.clone()).bench_refs(|values| {
71+
IntegerStats::generate_opts(
72+
values,
73+
GenerateStatsOptions {
74+
count_distinct_values: false,
75+
},
76+
);
77+
});
78+
}
79+
}
80+
/// Benchmark entry point: delegates to `divan::main`.
fn main() {
81+
divan::main();
82+
}

0 commit comments

Comments
 (0)