Skip to content

Commit c924f80

Browse files
authored
Compress offsets for FSST codes (#2901)
1 parent 9348644 commit c924f80

File tree

6 files changed

+49
-29
lines changed

6 files changed

+49
-29
lines changed

encodings/fsst/src/array.rs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
use fsst::{Decompressor, Symbol};
2-
use vortex_array::arrays::VarBinEncoding;
2+
use vortex_array::arrays::VarBinArray;
33
use vortex_array::stats::{ArrayStats, StatsSetRef};
44
use vortex_array::variants::{BinaryArrayTrait, Utf8ArrayTrait};
5-
use vortex_array::vtable::{EncodingVTable, VTableRef};
5+
use vortex_array::vtable::VTableRef;
66
use vortex_array::{
77
Array, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl, ArrayVariantsImpl,
88
Encoding, SerdeMetadata,
99
};
1010
use vortex_buffer::Buffer;
1111
use vortex_dtype::DType;
12-
use vortex_error::{VortexResult, vortex_bail};
12+
use vortex_error::{VortexResult, vortex_bail, vortex_err};
1313
use vortex_mask::Mask;
1414

1515
use crate::serde::FSSTMetadata;
@@ -19,7 +19,7 @@ pub struct FSSTArray {
1919
dtype: DType,
2020
symbols: Buffer<Symbol>,
2121
symbol_lengths: Buffer<u8>,
22-
codes: ArrayRef,
22+
codes: VarBinArray,
2323
/// Lengths of the original values before compression, can be compressed.
2424
uncompressed_lengths: ArrayRef,
2525
stats_set: ArrayStats,
@@ -44,7 +44,7 @@ impl FSSTArray {
4444
dtype: DType,
4545
symbols: Buffer<Symbol>,
4646
symbol_lengths: Buffer<u8>,
47-
codes: ArrayRef,
47+
codes: VarBinArray,
4848
uncompressed_lengths: ArrayRef,
4949
) -> VortexResult<Self> {
5050
// Check: symbols must not have length > MAX_CODE
@@ -63,13 +63,6 @@ impl FSSTArray {
6363
vortex_bail!(InvalidArgument: "uncompressed_lengths must have integer type and cannot be nullable, found {}", uncompressed_lengths.dtype());
6464
}
6565

66-
if codes.encoding() != VarBinEncoding.id() {
67-
vortex_bail!(
68-
InvalidArgument: "codes must have varbin encoding, was {}",
69-
codes.encoding()
70-
);
71-
}
72-
7366
// Check: strings must be a Binary array.
7467
if !matches!(codes.dtype(), DType::Binary(_)) {
7568
vortex_bail!(InvalidArgument: "codes array must be DType::Binary type");
@@ -96,7 +89,7 @@ impl FSSTArray {
9689
}
9790

9891
/// Access the codes array
99-
pub fn codes(&self) -> &ArrayRef {
92+
pub fn codes(&self) -> &VarBinArray {
10093
&self.codes
10194
}
10295

@@ -142,7 +135,11 @@ impl ArrayImpl for FSSTArray {
142135
}
143136

144137
fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
145-
let codes = children[0].clone();
138+
let codes = children[0]
139+
.as_any()
140+
.downcast_ref::<VarBinArray>()
141+
.ok_or_else(|| vortex_err!("FSSTArray codes must be a VarBinArray"))?
142+
.clone();
146143
let uncompressed_lengths = children[1].clone();
147144

148145
Self::try_new(

encodings/fsst/src/canonical.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use fsst::Decompressor;
2-
use vortex_array::arrays::{BinaryView, VarBinArray, VarBinViewArray};
2+
use vortex_array::arrays::{BinaryView, VarBinViewArray};
33
use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
44
use vortex_array::validity::Validity;
55
use vortex_array::variants::PrimitiveArrayTrait;
6-
use vortex_array::{Array, ArrayCanonicalImpl, ArrayExt, Canonical, IntoArray, ToCanonical};
6+
use vortex_array::{Array, ArrayCanonicalImpl, Canonical, IntoArray, ToCanonical};
77
use vortex_buffer::{BufferMut, ByteBuffer, ByteBufferMut};
88
use vortex_dtype::match_each_integer_ptype;
99
use vortex_error::VortexResult;
@@ -47,7 +47,7 @@ fn fsst_into_varbin_view(
4747
// To speed up canonicalization, we can decompress the entire string-heap in a single
4848
// call. We then turn our uncompressed_lengths into an offsets buffer
4949
// necessary for a VarBinViewArray and construct the canonical array.
50-
let bytes = fsst_array.codes().as_::<VarBinArray>().sliced_bytes();
50+
let bytes = fsst_array.codes().sliced_bytes();
5151

5252
let uncompressed_lens_array = fsst_array.uncompressed_lengths().to_primitive()?;
5353

encodings/fsst/src/compress.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,7 @@ where
110110
}
111111
}
112112

113-
let codes = builder
114-
.finish(DType::Binary(dtype.nullability()))
115-
.into_array();
113+
let codes = builder.finish(DType::Binary(dtype.nullability()));
116114
let symbols: Buffer<Symbol> = Buffer::copy_from(compressor.symbol_table());
117115
let symbol_lengths: Buffer<u8> = Buffer::<u8>::copy_from(compressor.symbol_lengths());
118116

encodings/fsst/src/compute/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
mod compare;
22

3-
use vortex_array::arrays::varbin_scalar;
3+
use vortex_array::arrays::{VarBinArray, varbin_scalar};
44
use vortex_array::builders::ArrayBuilder;
55
use vortex_array::compute::{
66
CompareFn, FilterKernel, FilterKernelAdapter, KernelRef, ScalarAtFn, SliceFn, TakeFn,
77
fill_null, filter, scalar_at, slice, take,
88
};
99
use vortex_array::vtable::ComputeVTable;
10-
use vortex_array::{Array, ArrayComputeImpl, ArrayRef};
10+
use vortex_array::{Array, ArrayComputeImpl, ArrayExt, ArrayRef};
1111
use vortex_buffer::ByteBuffer;
1212
use vortex_error::{VortexResult, vortex_err};
1313
use vortex_mask::Mask;
@@ -44,7 +44,9 @@ impl SliceFn<&FSSTArray> for FSSTEncoding {
4444
array.dtype().clone(),
4545
array.symbols().clone(),
4646
array.symbol_lengths().clone(),
47-
slice(array.codes(), start, stop)?,
47+
slice(array.codes(), start, stop)?
48+
.as_::<VarBinArray>()
49+
.clone(),
4850
slice(array.uncompressed_lengths(), start, stop)?,
4951
)?
5052
.into_array())
@@ -58,7 +60,7 @@ impl TakeFn<&FSSTArray> for FSSTEncoding {
5860
array.dtype().clone(),
5961
array.symbols().clone(),
6062
array.symbol_lengths().clone(),
61-
take(array.codes(), indices)?,
63+
take(array.codes(), indices)?.as_::<VarBinArray>().clone(),
6264
fill_null(
6365
&take(array.uncompressed_lengths(), indices)?,
6466
Scalar::new(
@@ -101,7 +103,7 @@ impl FilterKernel for FSSTEncoding {
101103
array.dtype().clone(),
102104
array.symbols().clone(),
103105
array.symbol_lengths().clone(),
104-
filter(array.codes(), mask)?,
106+
filter(array.codes(), mask)?.as_::<VarBinArray>().clone(),
105107
filter(array.uncompressed_lengths(), mask)?,
106108
)?
107109
.into_array())

encodings/fsst/src/serde.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use fsst::{Compressor, Symbol};
22
use serde::{Deserialize, Serialize};
3+
use vortex_array::arrays::VarBinArray;
34
use vortex_array::serde::ArrayParts;
45
use vortex_array::vtable::EncodingVTable;
56
use vortex_array::{
@@ -42,7 +43,15 @@ impl EncodingVTable for FSSTEncoding {
4243
}
4344
let codes = parts
4445
.child(0)
45-
.decode(ctx, DType::Binary(dtype.nullability()), len)?;
46+
.decode(ctx, DType::Binary(dtype.nullability()), len)?
47+
.as_opt::<VarBinArray>()
48+
.ok_or_else(|| {
49+
vortex_err!(
50+
"Expected VarBinArray for codes, got {:?}",
51+
ctx.lookup_encoding(parts.child(0).encoding_id())
52+
)
53+
})?
54+
.clone();
4655
let uncompressed_lengths = parts.child(1).decode(
4756
ctx,
4857
DType::Primitive(

vortex-btrblocks/src/string.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use vortex_array::aliases::hash_set::HashSet;
2-
use vortex_array::arrays::VarBinViewArray;
2+
use vortex_array::arrays::{VarBinArray, VarBinViewArray};
33
use vortex_array::{Array, ArrayRef, ToCanonical};
44
use vortex_dict::DictArray;
55
use vortex_dict::builders::dict_encode;
@@ -235,15 +235,29 @@ impl Scheme for FSSTScheme {
235235
let compressed_original_lengths = IntCompressor::compress(
236236
&fsst.uncompressed_lengths().to_primitive()?,
237237
is_sample,
238-
allowed_cascading - 1,
238+
allowed_cascading,
239+
&[],
240+
)?;
241+
242+
// We compress the var bin offsets of the FSST codes array.
243+
let compressed_codes_offsets = IntCompressor::compress(
244+
&fsst.codes().offsets().to_primitive()?,
245+
is_sample,
246+
allowed_cascading,
239247
&[],
240248
)?;
249+
let compressed_codes = VarBinArray::try_new(
250+
compressed_codes_offsets,
251+
fsst.codes().bytes().clone(),
252+
fsst.codes().dtype().clone(),
253+
fsst.codes().validity().clone(),
254+
)?;
241255

242256
let fsst = FSSTArray::try_new(
243257
fsst.dtype().clone(),
244258
fsst.symbols().clone(),
245259
fsst.symbol_lengths().clone(),
246-
fsst.codes().clone(),
260+
compressed_codes,
247261
compressed_original_lengths,
248262
)?;
249263

0 commit comments

Comments
 (0)