Skip to content

Commit 329c1dc

Browse files
committed
FSST Filter Kernel
Signed-off-by: Nicholas Gates <[email protected]>
1 parent f5ee1a0 commit 329c1dc

File tree

5 files changed

+217
-0
lines changed

5 files changed

+217
-0
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/fsst/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ workspace = true
1818

1919
[dependencies]
2020
fsst-rs = { workspace = true }
21+
num-traits = { workspace = true }
2122
prost = { workspace = true }
2223
rand = { workspace = true, optional = true }
2324
vortex-array = { workspace = true }
2425
vortex-buffer = { workspace = true }
26+
vortex-compute = { workspace = true }
2527
vortex-dtype = { workspace = true }
2628
vortex-error = { workspace = true }
2729
vortex-mask = { workspace = true }

encodings/fsst/src/array.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use vortex_array::ArrayHash;
1818
use vortex_array::ArrayRef;
1919
use vortex_array::Canonical;
2020
use vortex_array::DeserializeMetadata;
21+
use vortex_array::ExecutionCtx;
2122
use vortex_array::Precision;
2223
use vortex_array::ProstMetadata;
2324
use vortex_array::SerializeMetadata;
@@ -46,9 +47,11 @@ use vortex_error::VortexResult;
4647
use vortex_error::vortex_bail;
4748
use vortex_error::vortex_ensure;
4849
use vortex_error::vortex_err;
50+
use vortex_vector::Vector;
4951

5052
use crate::fsst_compress;
5153
use crate::fsst_train_compressor;
54+
use crate::kernel::PARENT_KERNELS;
5255

5356
vtable!(FSST);
5457

@@ -178,6 +181,15 @@ impl VTable for FSSTVTable {
178181

179182
Ok(())
180183
}
184+
185+
/// Offers `parent` to the FSST parent kernels for execution directly on this array.
///
/// All arguments are forwarded unchanged to `PARENT_KERNELS` (imported from `crate::kernel`)
/// and its result is returned as-is.
// NOTE(review): presumably `Ok(None)` means no registered kernel handled `parent` — confirm
// against `ParentKernelSet::execute`.
fn execute_parent(
186+
array: &Self::Array,
187+
parent: &ArrayRef,
188+
child_idx: usize,
189+
ctx: &mut ExecutionCtx,
190+
) -> VortexResult<Option<Vector>> {
191+
PARENT_KERNELS.execute(array, parent, child_idx, ctx)
192+
}
181193
}
182194

183195
#[derive(Clone)]

encodings/fsst/src/kernel.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use fsst::Decompressor;
7+
use num_traits::AsPrimitive;
8+
use vortex_array::Array;
9+
use vortex_array::ExecutionCtx;
10+
use vortex_array::VectorExecutor;
11+
use vortex_array::arrays::FilterArray;
12+
use vortex_array::arrays::FilterVTable;
13+
use vortex_array::builtins::ArrayBuiltins;
14+
use vortex_array::kernel::ExecuteParentKernel;
15+
use vortex_array::kernel::ParentKernelSet;
16+
use vortex_array::matchers::Exact;
17+
use vortex_buffer::Buffer;
18+
use vortex_buffer::BufferMut;
19+
use vortex_buffer::ByteBuffer;
20+
use vortex_buffer::ByteBufferMut;
21+
use vortex_compute::filter::Filter;
22+
use vortex_dtype::DType;
23+
use vortex_dtype::IntegerPType;
24+
use vortex_dtype::Nullability;
25+
use vortex_dtype::PType;
26+
use vortex_dtype::PTypeDowncastExt;
27+
use vortex_dtype::match_each_integer_ptype;
28+
use vortex_error::VortexResult;
29+
use vortex_mask::Mask;
30+
use vortex_mask::MaskValues;
31+
use vortex_vector::Vector;
32+
use vortex_vector::binaryview::BinaryVector;
33+
use vortex_vector::binaryview::BinaryView;
34+
use vortex_vector::binaryview::StringVector;
35+
36+
use crate::FSSTArray;
37+
use crate::FSSTVTable;
38+
39+
/// The set of parent kernels registered for FSST arrays.
///
/// It currently holds a single entry, the lifted `FSSTFilterKernel`.
pub(super) const PARENT_KERNELS: ParentKernelSet<FSSTVTable> =
40+
ParentKernelSet::new(&[ParentKernelSet::lift(&FSSTFilterKernel)]);
41+
42+
/// Stateless kernel type whose `ExecuteParentKernel<FSSTVTable>` implementation executes a
/// filter parent over an FSST child.
#[derive(Debug)]
43+
struct FSSTFilterKernel;
44+
45+
impl ExecuteParentKernel<FSSTVTable> for FSSTFilterKernel {
46+
type Parent = Exact<FilterVTable>;
47+
48+
fn parent(&self) -> Self::Parent {
49+
Exact::from(&FilterVTable)
50+
}
51+
52+
fn execute_parent(
53+
&self,
54+
array: &FSSTArray,
55+
parent: &FilterArray,
56+
_child_idx: usize,
57+
ctx: &mut ExecutionCtx,
58+
) -> VortexResult<Option<Vector>> {
59+
let mask_values = match parent.filter_mask() {
60+
Mask::AllTrue(_) | Mask::AllFalse(_) => return Ok(None),
61+
Mask::Values(v) => v,
62+
};
63+
64+
// We filter the uncompressed lengths
65+
let uncompressed_lens = array
66+
.uncompressed_lengths()
67+
.filter(parent.filter_mask().clone())?
68+
.execute(ctx)?
69+
.into_primitive();
70+
71+
// Extract the filtered validity
72+
let validity = array.codes().validity_mask().filter(parent.filter_mask());
73+
74+
// First we unpack the codes VarBinArray to get access to the raw data.
75+
let codes_data = array.codes().bytes();
76+
let codes_offsets = array
77+
.codes()
78+
.offsets()
79+
.cast(DType::Primitive(PType::U32, Nullability::NonNullable))?
80+
.execute(ctx)?
81+
.into_primitive()
82+
.downcast::<u32>()
83+
.into_nonnull_buffer();
84+
85+
let decompressor = array.decompressor();
86+
87+
let (views, buffer) = match_each_integer_ptype!(uncompressed_lens.ptype(), |S| {
88+
fsst_decode::<S>(
89+
decompressor,
90+
codes_data,
91+
&codes_offsets,
92+
mask_values,
93+
&validity,
94+
&uncompressed_lens.downcast::<S>().into_nonnull_buffer(),
95+
)
96+
});
97+
98+
let vector = match array.dtype() {
99+
DType::Binary(_) => unsafe {
100+
BinaryVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
101+
}
102+
.into(),
103+
DType::Utf8(_) => unsafe {
104+
StringVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
105+
}
106+
.into(),
107+
_ => unreachable!("Not a supported FSST DType"),
108+
};
109+
110+
Ok(Some(vector))
111+
}
112+
}
113+
114+
fn fsst_decode<S: IntegerPType + AsPrimitive<usize> + AsPrimitive<u32>>(
115+
decompressor: Decompressor,
116+
codes_data: &[u8],
117+
codes_offsets: &[u32],
118+
filter_mask: &MaskValues,
119+
filtered_validity: &Mask,
120+
filtered_uncompressed_lengths: &[S],
121+
) -> (Buffer<BinaryView>, ByteBuffer) {
122+
let total_uncompressed_size: usize = filtered_uncompressed_lengths
123+
.iter()
124+
.map(|x| <S as AsPrimitive<usize>>::as_(*x))
125+
.sum();
126+
127+
// We allocate an extra 7 bytes per the FSST decompressor's requirement for padding.
128+
let mut uncompressed = ByteBufferMut::with_capacity(total_uncompressed_size + 7);
129+
let mut spare_capacity = uncompressed.spare_capacity_mut();
130+
131+
match filtered_validity {
132+
Mask::AllTrue(_) => {
133+
for &idx in filter_mask.indices() {
134+
let start = codes_offsets[idx] as usize;
135+
let end = codes_offsets[idx + 1] as usize;
136+
let compressed_slice = &codes_data[start..end];
137+
138+
let uncompressed_len =
139+
decompressor.decompress_into(compressed_slice, spare_capacity);
140+
spare_capacity = &mut spare_capacity[uncompressed_len..];
141+
}
142+
}
143+
Mask::AllFalse(_) => {
144+
// Nothing to decompress
145+
unsafe { uncompressed.set_len(0) };
146+
return (Buffer::empty(), uncompressed.freeze());
147+
}
148+
Mask::Values(values) => {
149+
for (idx, is_valid) in filter_mask
150+
.indices()
151+
.iter()
152+
.copied()
153+
.zip(values.bit_buffer().iter())
154+
{
155+
if is_valid {
156+
let start = codes_offsets[idx] as usize;
157+
let end = codes_offsets[idx + 1] as usize;
158+
let compressed_slice = &codes_data[start..end];
159+
160+
let uncompressed_len =
161+
decompressor.decompress_into(compressed_slice, spare_capacity);
162+
spare_capacity = &mut spare_capacity[uncompressed_len..];
163+
} else {
164+
// We advance the output buffer to make it faster to assemble views below.
165+
spare_capacity =
166+
&mut spare_capacity[filtered_uncompressed_lengths[idx].as_()..];
167+
}
168+
}
169+
}
170+
}
171+
172+
for &idx in filter_mask.indices() {
173+
let start = codes_offsets[idx] as usize;
174+
let end = codes_offsets[idx + 1] as usize;
175+
let compressed_slice = &codes_data[start..end];
176+
177+
let uncompressed_len = decompressor.decompress_into(compressed_slice, spare_capacity);
178+
spare_capacity = &mut spare_capacity[uncompressed_len..];
179+
}
180+
181+
unsafe { uncompressed.set_len(total_uncompressed_size) };
182+
let uncompressed = uncompressed.freeze();
183+
let uncompressed_slice = uncompressed.as_ref();
184+
185+
// Loop over the uncompressed lengths to construct the BinaryViews.
186+
let mut views = BufferMut::<BinaryView>::with_capacity(filtered_uncompressed_lengths.len());
187+
let mut offset = 0u32;
188+
for len in filtered_uncompressed_lengths {
189+
let view = BinaryView::make_view(
190+
&uncompressed_slice[offset as usize..][..len.as_()],
191+
0u32,
192+
offset,
193+
);
194+
offset += <S as AsPrimitive<u32>>::as_(*len);
195+
unsafe { views.push_unchecked(view) };
196+
}
197+
unsafe { views.set_len(filtered_uncompressed_lengths.len()) };
198+
199+
(views.freeze(), uncompressed)
200+
}

encodings/fsst/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod array;
1515
mod canonical;
1616
mod compress;
1717
mod compute;
18+
mod kernel;
1819
mod ops;
1920
#[cfg(feature = "test-harness")]
2021
pub mod test_utils;

0 commit comments

Comments
 (0)