
Commit 96c915b

AdamGS, gatesn, and robert3005 authored
feat: Use previous chunks to inform compression decisions (#2724)
Follow-up to #2723: re-encode array children according to a known well-compressed chunk from the `BtrBlockCompressor`. Current [benchmarks](#2724 (comment)) seem to show that compression throughput is up, decompression is slightly slower, and query times increase by up to 10%. Two interesting points: wide-columned arrays are almost always slower to compress, but their decompression held up better; and Arade is the one dataset whose compressed files shrank significantly, regardless of throughput. I think these trade-offs should be configurable; increasing the tolerance for compression-ratio drift (currently hard-coded at 20%) should let users tune this knob to better fit their use cases.

Other benchmarks (because there are so many commits/comments on this PR):

- [Clickbench on NVME](#2724 (comment))
- [TPCH on NVME](#2724 (comment))
- [Random Access](#2724 (comment))
- [TPC-H on S3](#2724 (comment)) (note: this run doesn't actually generate new files, so the results aren't useful)

---------

Co-authored-by: Nicholas Gates <[email protected]>
Co-authored-by: Robert Kruszewski <[email protected]>
1 parent 516f922 commit 96c915b
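
The mechanism described above can be summarized with a small sketch. This is illustrative pseudocode under assumptions, not the vortex `BtrBlockCompressor` API: all type and function names below (`Scheme`, `Compressed`, `compress_with_sampling`, `compress_with_scheme`, `compress_chunks`) are made up for the example. The idea: run the normal sampling search on the first chunk, try to re-encode each later chunk with the scheme chosen for its predecessor, and fall back to a fresh search when the observed compression ratio drifts past the 20% tolerance mentioned in the commit message.

```rust
// Hypothetical sketch (not the vortex API): reuse the previous chunk's
// encoding scheme unless the compression ratio drifts past a tolerance.

/// Stand-in for a chosen encoding tree (e.g. ALP exponents, dictionary, ...).
#[derive(Clone, Debug)]
struct Scheme {
    name: &'static str,
}

/// Result of compressing one chunk: the scheme used and the achieved ratio.
struct Compressed {
    scheme: Scheme,
    ratio: f64, // uncompressed_bytes / compressed_bytes
}

/// Pretend "full search": sample the chunk and pick the best scheme.
fn compress_with_sampling(chunk: &[f64]) -> Compressed {
    // In reality this would run the BtrBlocks-style sampling search.
    let ratio = if chunk.iter().all(|v| v.fract() == 0.0) { 8.0 } else { 3.0 };
    Compressed { scheme: Scheme { name: "sampled" }, ratio }
}

/// Pretend re-encode: apply a known scheme to a new chunk and report the ratio.
fn compress_with_scheme(chunk: &[f64], scheme: &Scheme) -> Compressed {
    let ratio = if chunk.iter().all(|v| v.fract() == 0.0) { 7.5 } else { 2.0 };
    Compressed { scheme: scheme.clone(), ratio }
}

const RATIO_DRIFT_TOLERANCE: f64 = 0.2; // the 20% knob mentioned above

fn compress_chunks(chunks: &[Vec<f64>]) -> Vec<Compressed> {
    let mut out: Vec<Compressed> = Vec::with_capacity(chunks.len());
    for chunk in chunks {
        let reused = out.last().and_then(|prev| {
            let candidate = compress_with_scheme(chunk, &prev.scheme);
            // Keep the previous scheme only if the ratio stays within the
            // tolerance of what the previous chunk achieved.
            (candidate.ratio >= prev.ratio * (1.0 - RATIO_DRIFT_TOLERANCE)).then_some(candidate)
        });
        // Otherwise fall back to the full sampling search for this chunk.
        out.push(reused.unwrap_or_else(|| compress_with_sampling(chunk)));
    }
    out
}

fn main() {
    let chunks = vec![vec![1.0, 2.0, 3.0], vec![4.0, 5.0, 6.0], vec![0.5, 0.25]];
    for (i, c) in compress_chunks(&chunks).iter().enumerate() {
        println!("chunk {i}: scheme={} ratio={:.1}", c.scheme.name, c.ratio);
    }
}
```

Making `RATIO_DRIFT_TOLERANCE` configurable is the knob the commit message suggests exposing.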

File tree

68 files changed: +997 −257 lines


bench-vortex/src/bin/compress.rs

Lines changed: 3 additions & 3 deletions
@@ -118,9 +118,9 @@ fn compress(
     let vx_array =
         runtime.block_on(async { dataset_handle.to_vortex_array().await });
     ChunkedArray::from_iter(vx_array.as_::<ChunkedArray>().chunks().iter().map(
-        |c| {
-            let mut builder = builder_with_capacity(c.dtype(), c.len());
-            c.append_to_builder(builder.as_mut()).unwrap();
+        |chunk| {
+            let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
+            chunk.append_to_builder(builder.as_mut()).unwrap();
             builder.finish()
         },
     ))

bench-vortex/src/bin/notimplemented.rs

Lines changed: 8 additions & 2 deletions
@@ -21,9 +21,10 @@ use vortex::encodings::fastlanes::{BitPackedArray, DeltaArray, FoRArray};
 use vortex::encodings::fsst::{fsst_compress, fsst_train_compressor};
 use vortex::encodings::runend::RunEndArray;
 use vortex::encodings::sparse::SparseArray;
-use vortex::encodings::zigzag::ZigZagArray;
+use vortex::encodings::zigzag::ZigZagEncoding;
 use vortex::scalar::Scalar;
 use vortex::validity::Validity;
+use vortex::vtable::EncodingVTable;
 use vortex::{Array, ArrayRef, IntoArray};

 fn fsst_array() -> ArrayRef {
@@ -140,7 +141,12 @@ fn enc_impls() -> Vec<ArrayRef> {
             .into_array(),
         varbin_array(),
         varbinview_array(),
-        ZigZagArray::encode(&buffer![-1, 1, -9, 9].into_array())
+        ZigZagEncoding
+            .encode(
+                &buffer![-1, 1, -9, 9].into_array().to_canonical().unwrap(),
+                None,
+            )
+            .unwrap()
             .unwrap()
             .into_array(),
     ]

bench-vortex/src/clickbench.rs

Lines changed: 6 additions & 1 deletion
@@ -15,6 +15,7 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use reqwest::IntoUrl;
 use reqwest::blocking::Response;
 use tokio::fs::{OpenOptions, create_dir_all};
+use tokio::io::AsyncWriteExt;
 use tracing::{info, warn};
 use url::Url;
 use vortex::TryIntoArray;
@@ -208,7 +209,11 @@ pub async fn convert_parquet_to_vortex(
         .open(&vtx_file)
         .await?;

-    VortexWriteOptions::default().write(f, array_stream).await?;
+    VortexWriteOptions::default()
+        .write(f, array_stream)
+        .await?
+        .flush()
+        .await?;

     anyhow::Ok(())
 })

bench-vortex/src/compress/bench.rs

Lines changed: 1 addition & 0 deletions
@@ -56,6 +56,7 @@ pub fn benchmark_compress<F>(
 where
     F: Fn() -> ArrayRef,
 {
+    tracing::info!("Running {bench_name} benchmark");
     let uncompressed = make_uncompressed();
     let uncompressed_size = uncompressed.nbytes();
     let compressed_size = AtomicU64::default();

bench-vortex/src/datasets/public_bi_data.rs

Lines changed: 7 additions & 2 deletions
@@ -13,6 +13,7 @@ use humansize::{DECIMAL, format_size};
 use log::{debug, info};
 use reqwest::Url;
 use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
 use vortex::aliases::hash_map::HashMap;
 use vortex::arrays::ChunkedArray;
 use vortex::error::{VortexExpect, VortexResult, vortex_err};
@@ -521,12 +522,16 @@ impl PBIDataset {
         let compressed = idempotent_async(
             &self.path_for_file_type(output_fname, FileType::Vortex),
             |output_path| async {
-                VortexWriteOptions::default()
+                let mut f = VortexWriteOptions::default()
                     .write(
                         File::create(output_path).await.unwrap(),
                         parquet_to_vortex(f).await.unwrap(),
                     )
-                    .await
+                    .await?;
+
+                f.flush().await?;
+
+                VortexResult::Ok(())
             },
         )
         .await

bench-vortex/src/datasets/taxi_data.rs

Lines changed: 3 additions & 0 deletions
@@ -2,6 +2,7 @@ use std::path::PathBuf;

 use async_trait::async_trait;
 use tokio::fs::File;
+use tokio::io::AsyncWriteExt;
 use vortex::ArrayRef;
 use vortex::error::VortexError;
 use vortex::file::{VortexOpenOptions, VortexWriteOptions};
@@ -53,6 +54,8 @@ pub async fn taxi_data_vortex() -> PathBuf {
             output_file,
             parquet_to_vortex(taxi_data_parquet()).await.unwrap(),
         )
+        .await?
+        .flush()
         .await?;
     Ok::<PathBuf, VortexError>(buf)
 })

encodings/alp/benches/alp_compress.rs

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ fn compress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64,
     bencher
         .with_inputs(|| (values.clone(), validity.clone()))
         .bench_values(|(values, validity)| {
-            alp_encode(&PrimitiveArray::new(values, validity)).unwrap()
+            alp_encode(&PrimitiveArray::new(values, validity), None).unwrap()
         })
 }

@@ -80,7 +80,7 @@ fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64
         Validity::NonNullable
     };
     let values = values.freeze();
-    let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap();
+    let array = alp_encode(&PrimitiveArray::new(values, validity), None).unwrap();
     bencher
         .with_inputs(|| array.clone())
         .bench_values(|array| array.to_canonical().unwrap());

encodings/alp/src/alp/array.rs

Lines changed: 17 additions & 40 deletions
@@ -1,27 +1,26 @@
 use std::fmt::Debug;

-use vortex_array::arrays::PrimitiveArray;
 use vortex_array::patches::Patches;
 use vortex_array::stats::{ArrayStats, StatsSetRef};
 use vortex_array::variants::PrimitiveArrayTrait;
 use vortex_array::vtable::VTableRef;
 use vortex_array::{
-    Array, ArrayCanonicalImpl, ArrayExt, ArrayImpl, ArrayRef, ArrayStatisticsImpl,
-    ArrayValidityImpl, ArrayVariantsImpl, Canonical, Encoding, SerdeMetadata,
+    Array, ArrayCanonicalImpl, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl,
+    ArrayVariantsImpl, Canonical, Encoding, SerdeMetadata,
 };
 use vortex_dtype::{DType, PType};
 use vortex_error::{VortexResult, vortex_bail};
 use vortex_mask::Mask;

 use crate::alp::serde::ALPMetadata;
-use crate::alp::{Exponents, alp_encode, decompress};
+use crate::alp::{Exponents, decompress};

 #[derive(Clone, Debug)]
 pub struct ALPArray {
-    dtype: DType,
     encoded: ArrayRef,
-    exponents: Exponents,
     patches: Option<Patches>,
+    dtype: DType,
+    exponents: Exponents,
     stats_set: ArrayStats,
 }

@@ -52,14 +51,6 @@ impl ALPArray {
         })
     }

-    pub fn encode(array: ArrayRef) -> VortexResult<ArrayRef> {
-        if let Some(parray) = array.as_opt::<PrimitiveArray>() {
-            Ok(alp_encode(parray)?.into_array())
-        } else {
-            vortex_bail!("ALP can only encode primitive arrays");
-        }
-    }
-
     pub fn encoded(&self) -> &ArrayRef {
         &self.encoded
     }
@@ -88,6 +79,18 @@ impl ArrayImpl for ALPArray {
     fn _vtable(&self) -> VTableRef {
         VTableRef::new_ref(&ALPEncoding)
     }
+
+    fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
+        let encoded = children[0].clone();
+
+        let patches = self.patches().map(|existing| {
+            let indices = children[1].clone();
+            let values = children[2].clone();
+            Patches::new(existing.array_len(), existing.offset(), indices, values)
+        });
+
+        ALPArray::try_new(encoded, self.exponents(), patches)
+    }
 }

 impl ArrayCanonicalImpl for ALPArray {
@@ -135,29 +138,3 @@ impl ArrayVariantsImpl for ALPArray {
 }

 impl PrimitiveArrayTrait for ALPArray {}
-
-#[cfg(test)]
-mod tests {
-    use vortex_array::SerdeMetadata;
-    use vortex_array::patches::PatchesMetadata;
-    use vortex_array::test_harness::check_metadata;
-    use vortex_dtype::PType;
-
-    use crate::Exponents;
-    use crate::alp::serde::ALPMetadata;
-
-    #[cfg_attr(miri, ignore)]
-    #[test]
-    fn test_alp_metadata() {
-        check_metadata(
-            "alp.metadata",
-            SerdeMetadata(ALPMetadata {
-                patches: Some(PatchesMetadata::new(usize::MAX, usize::MAX, PType::U64)),
-                exponents: Exponents {
-                    e: u8::MAX,
-                    f: u8::MAX,
-                },
-            }),
-        );
-    }
-}

encodings/alp/src/alp/compress.rs

Lines changed: 17 additions & 21 deletions
@@ -28,24 +28,20 @@ macro_rules! match_each_alp_float_ptype {
     })
 }

-pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
-    let (exponents, encoded, patches) = alp_encode_components(parray)?;
-    ALPArray::try_new(encoded, exponents, patches)
-}
-
-pub fn alp_encode_components(
-    parray: &PrimitiveArray,
-) -> VortexResult<(Exponents, ArrayRef, Option<Patches>)> {
-    match parray.ptype() {
-        PType::F32 => alp_encode_components_typed::<f32>(parray),
-        PType::F64 => alp_encode_components_typed::<f64>(parray),
+pub fn alp_encode(parray: &PrimitiveArray, exponents: Option<Exponents>) -> VortexResult<ALPArray> {
+    let (exponents, encoded, patches) = match parray.ptype() {
+        PType::F32 => alp_encode_components_typed::<f32>(parray, exponents)?,
+        PType::F64 => alp_encode_components_typed::<f64>(parray, exponents)?,
         _ => vortex_bail!("ALP can only encode f32 and f64"),
-    }
+    };
+
+    ALPArray::try_new(encoded, exponents, patches)
 }

 #[allow(clippy::cast_possible_truncation)]
 fn alp_encode_components_typed<T>(
     values: &PrimitiveArray,
+    exponents: Option<Exponents>,
 ) -> VortexResult<(Exponents, ArrayRef, Option<Patches>)>
 where
     T: ALPFloat + NativePType,
@@ -55,7 +51,7 @@ where
     let values_slice = values.as_slice::<T>();

     let (exponents, encoded, exceptional_positions, exceptional_values) =
-        T::encode(values_slice, None);
+        T::encode(values_slice, exponents);

     let encoded_array = PrimitiveArray::new(encoded, values.validity().clone()).into_array();

@@ -132,7 +128,7 @@ mod tests {
     #[test]
     fn test_compress() {
         let array = PrimitiveArray::new(buffer![1.234f32; 1025], Validity::NonNullable);
-        let encoded = alp_encode(&array).unwrap();
+        let encoded = alp_encode(&array, None).unwrap();
         assert!(encoded.patches().is_none());
         assert_eq!(
             encoded.encoded().to_primitive().unwrap().as_slice::<i32>(),
@@ -147,7 +143,7 @@
     #[test]
     fn test_nullable_compress() {
         let array = PrimitiveArray::from_option_iter([None, Some(1.234f32), None]);
-        let encoded = alp_encode(&array).unwrap();
+        let encoded = alp_encode(&array, None).unwrap();
         assert!(encoded.patches().is_none());
         assert_eq!(
             encoded.encoded().to_primitive().unwrap().as_slice::<i32>(),
@@ -165,7 +161,7 @@
     fn test_patched_compress() {
         let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
         let array = PrimitiveArray::new(values.clone(), Validity::NonNullable);
-        let encoded = alp_encode(&array).unwrap();
+        let encoded = alp_encode(&array, None).unwrap();
         assert!(encoded.patches().is_some());
         assert_eq!(
             encoded.encoded().to_primitive().unwrap().as_slice::<i64>(),
@@ -182,7 +178,7 @@
     fn test_compress_ignores_invalid_exceptional_values() {
         let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
         let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true]));
-        let encoded = alp_encode(&array).unwrap();
+        let encoded = alp_encode(&array, None).unwrap();
         assert!(encoded.patches().is_none());
         assert_eq!(
             encoded.encoded().to_primitive().unwrap().as_slice::<i64>(),
@@ -216,7 +212,7 @@
             Some(4.0),
             None,
         ]);
-        let encoded = alp_encode(&array).unwrap();
+        let encoded = alp_encode(&array, None).unwrap();
         assert!(encoded.patches().is_some());

         assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
@@ -236,7 +232,7 @@
     #[test]
     fn roundtrips_close_fractional() {
         let original = PrimitiveArray::from_iter([195.26274f32, 195.27837, -48.815685]);
-        let alp_arr = alp_encode(&original).unwrap();
+        let alp_arr = alp_encode(&original, None).unwrap();
        let decompressed = alp_arr.to_primitive().unwrap();
         assert_eq!(original.as_slice::<f32>(), decompressed.as_slice::<f32>());
     }
@@ -247,7 +243,7 @@
             Buffer::from_iter([195.26274f64, f64::consts::PI, -48.815685]),
             Validity::AllInvalid,
         );
-        let alp_arr = alp_encode(&original).unwrap();
+        let alp_arr = alp_encode(&original, None).unwrap();
         let decompressed = alp_arr.to_primitive().unwrap();
         assert_eq!(
             // The second and third values become exceptions and are replaced
@@ -275,7 +271,7 @@
             buffer![0.0f32, -0.0, f32::NAN, f32::NEG_INFINITY, f32::INFINITY],
             Validity::NonNullable,
         );
-        let encoded = alp_encode(&original).unwrap();
+        let encoded = alp_encode(&original, None).unwrap();
         let decoded = encoded.to_primitive().unwrap();
         for idx in 0..original.len() {
             let decoded_val = decoded.as_slice::<f32>()[idx];
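
Taken together with the `alp_encode` signature change above, a compressor that has already encoded one chunk can pass that chunk's exponents back in so the next chunk is encoded the same way. A minimal sketch, assuming the items used by the tests above (`alp_encode`, `PrimitiveArray`, `Validity`, the `buffer!` macro, and `VortexResult`); the surrounding function name and the import paths are illustrative, not part of this diff:

```rust
// Illustrative only: reuse the exponents chosen for chunk 0 when encoding chunk 1.
// Imports are assumed to match the test module above; paths are not shown in this diff.
fn reuse_exponents_across_chunks() -> VortexResult<()> {
    let chunk0 = PrimitiveArray::new(buffer![1.234f32; 1024], Validity::NonNullable);
    let chunk1 = PrimitiveArray::new(buffer![2.345f32; 1024], Validity::NonNullable);

    // First chunk: pass None so ALP picks exponents itself.
    let encoded0 = alp_encode(&chunk0, None)?;

    // Later chunks: reuse the known-good exponents instead of re-deciding them.
    let encoded1 = alp_encode(&chunk1, Some(encoded0.exponents()))?;
    assert_eq!(encoded1.exponents(), encoded0.exponents());

    Ok(())
}
```

Values that do not fit the reused exponents still land in patches, as in the `test_patched_compress` case above.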
