Skip to content

Commit 06f0956

Browse files
authored
FSST Filter Kernel (#5755)
Signed-off-by: Nicholas Gates <[email protected]>
1 parent 4c65aa5 commit 06f0956

File tree

7 files changed

+241
-7
lines changed

7 files changed

+241
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

encodings/fsst/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ workspace = true
1818

1919
[dependencies]
2020
fsst-rs = { workspace = true }
21+
num-traits = { workspace = true }
2122
prost = { workspace = true }
2223
rand = { workspace = true, optional = true }
2324
vortex-array = { workspace = true }

encodings/fsst/src/array.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use vortex_array::ArrayHash;
1818
use vortex_array::ArrayRef;
1919
use vortex_array::Canonical;
2020
use vortex_array::DeserializeMetadata;
21+
use vortex_array::ExecutionCtx;
2122
use vortex_array::Precision;
2223
use vortex_array::ProstMetadata;
2324
use vortex_array::SerializeMetadata;
@@ -46,9 +47,11 @@ use vortex_error::VortexResult;
4647
use vortex_error::vortex_bail;
4748
use vortex_error::vortex_ensure;
4849
use vortex_error::vortex_err;
50+
use vortex_vector::Vector;
4951

5052
use crate::fsst_compress;
5153
use crate::fsst_train_compressor;
54+
use crate::kernel::PARENT_KERNELS;
5255

5356
vtable!(FSST);
5457

@@ -178,6 +181,15 @@ impl VTable for FSSTVTable {
178181

179182
Ok(())
180183
}
184+
185+
/// Delegates parent-kernel execution for FSST arrays to `PARENT_KERNELS`
/// (defined in `kernel.rs`, currently containing only the fused filter kernel).
///
/// A `None` result means no registered kernel produced a vector for this
/// parent, and the caller falls back to its default execution path.
fn execute_parent(
    array: &Self::Array,
    parent: &ArrayRef,
    child_idx: usize,
    ctx: &mut ExecutionCtx,
) -> VortexResult<Option<Vector>> {
    PARENT_KERNELS.execute(array, parent, child_idx, ctx)
}
181193
}
182194

183195
#[derive(Clone)]

encodings/fsst/src/kernel.rs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
use std::sync::Arc;
5+
6+
use fsst::Decompressor;
7+
use num_traits::AsPrimitive;
8+
use vortex_array::Array;
9+
use vortex_array::ExecutionCtx;
10+
use vortex_array::VectorExecutor;
11+
use vortex_array::arrays::FilterArray;
12+
use vortex_array::arrays::FilterVTable;
13+
use vortex_array::builtins::ArrayBuiltins;
14+
use vortex_array::kernel::ExecuteParentKernel;
15+
use vortex_array::kernel::ParentKernelSet;
16+
use vortex_array::mask::MaskExecutor;
17+
use vortex_array::matchers::Exact;
18+
use vortex_array::validity::Validity;
19+
use vortex_array::vtable::ValidityHelper;
20+
use vortex_buffer::Buffer;
21+
use vortex_buffer::BufferMut;
22+
use vortex_buffer::ByteBuffer;
23+
use vortex_buffer::ByteBufferMut;
24+
use vortex_dtype::DType;
25+
use vortex_dtype::IntegerPType;
26+
use vortex_dtype::Nullability;
27+
use vortex_dtype::PType;
28+
use vortex_dtype::PTypeDowncastExt;
29+
use vortex_dtype::match_each_integer_ptype;
30+
use vortex_error::VortexResult;
31+
use vortex_mask::Mask;
32+
use vortex_mask::MaskValues;
33+
use vortex_vector::Vector;
34+
use vortex_vector::binaryview::BinaryVector;
35+
use vortex_vector::binaryview::BinaryView;
36+
use vortex_vector::binaryview::StringVector;
37+
38+
use crate::FSSTArray;
39+
use crate::FSSTVTable;
40+
41+
/// Parent kernels registered for the FSST encoding; dispatched from
/// `FSSTVTable::execute_parent` in `array.rs`. Currently only the fused
/// filter kernel is registered.
pub(super) const PARENT_KERNELS: ParentKernelSet<FSSTVTable> =
    ParentKernelSet::new(&[ParentKernelSet::lift(&FSSTFilterKernel)]);

/// Kernel that executes a parent `FilterArray` directly against an
/// `FSSTArray`, decompressing only the rows selected by the filter mask.
#[derive(Debug)]
struct FSSTFilterKernel;
46+
47+
impl ExecuteParentKernel<FSSTVTable> for FSSTFilterKernel {
    // This kernel fires only when the immediate parent is exactly a FilterArray.
    type Parent = Exact<FilterVTable>;

    fn parent(&self) -> Self::Parent {
        Exact::from(&FilterVTable)
    }

    /// Executes `filter(fsst_array)` as a single fused operation: only the
    /// codes selected by the parent's mask are decompressed, and the result is
    /// assembled directly into a binary/string vector.
    fn execute_parent(
        &self,
        array: &FSSTArray,
        parent: &FilterArray,
        _child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Vector>> {
        // Only specialize for a mixed mask. For all-true/all-false masks we
        // return Ok(None) so the trivial cases take the default path.
        let mask_values = match parent.filter_mask() {
            Mask::AllTrue(_) | Mask::AllFalse(_) => return Ok(None),
            Mask::Values(v) => v,
        };

        // We filter the uncompressed lengths so they line up one-to-one with
        // the selected rows.
        let uncompressed_lens = array
            .uncompressed_lengths()
            .filter(parent.filter_mask().clone())?
            .execute(ctx)?
            .into_primitive();

        // Extract the filtered validity as a Mask whose length is the number
        // of selected rows (the mask's true count).
        let validity = match array.codes().validity().filter(parent.filter_mask())? {
            Validity::NonNullable | Validity::AllValid => {
                Mask::new_true(parent.filter_mask().true_count())
            }
            Validity::AllInvalid => Mask::new_false(parent.filter_mask().true_count()),
            Validity::Array(a) => a.execute_mask(ctx.session())?,
        };

        // First we unpack the codes VarBinArray to get access to the raw data.
        let codes_data = array.codes().bytes();
        // Normalize the offsets to non-nullable u32 so fsst_decode can index
        // with a single offset type.
        let codes_offsets = array
            .codes()
            .offsets()
            .cast(DType::Primitive(PType::U32, Nullability::NonNullable))?
            .execute(ctx)?
            .into_primitive()
            .downcast::<u32>()
            .into_nonnull_buffer();

        let decompressor = array.decompressor();

        // Dispatch on the integer type of the lengths column so fsst_decode is
        // monomorphized per length type.
        let (views, buffer) = match_each_integer_ptype!(uncompressed_lens.ptype(), |S| {
            fsst_decode::<S>(
                decompressor,
                codes_data,
                &codes_offsets,
                mask_values,
                &validity,
                &uncompressed_lens.downcast::<S>().into_nonnull_buffer(),
            )
        });

        // Wrap the views + data buffer into a vector of the array's logical
        // type. SAFETY (for both arms): the views produced by fsst_decode
        // reference only buffer index 0, and there is exactly one view per
        // selected row, matching `validity`'s length.
        let vector = match array.dtype() {
            DType::Binary(_) => unsafe {
                BinaryVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
            }
            .into(),
            DType::Utf8(_) => unsafe {
                StringVector::new_unchecked(views, Arc::new(vec![buffer].into()), validity)
            }
            .into(),
            _ => unreachable!("Not a supported FSST DType"),
        };

        Ok(Some(vector))
    }
}
121+
122+
/// Decompresses only the rows selected by `filter_mask`, returning the
/// `BinaryView`s plus the backing byte buffer for the decoded values.
///
/// Inputs and alignment:
/// * `codes_data` / `codes_offsets` describe the *unfiltered* compressed rows:
///   row `i`'s bytes are `codes_data[codes_offsets[i]..codes_offsets[i + 1]]`.
/// * `filtered_validity` and `filtered_uncompressed_lengths` are aligned with
///   the *filtered* rows — one entry per selected index of `filter_mask`.
fn fsst_decode<S: IntegerPType + AsPrimitive<usize> + AsPrimitive<u32>>(
    decompressor: Decompressor,
    codes_data: &[u8],
    codes_offsets: &[u32],
    filter_mask: &MaskValues,
    filtered_validity: &Mask,
    filtered_uncompressed_lengths: &[S],
) -> (Buffer<BinaryView>, ByteBuffer) {
    // Total output size is the sum of the (already filtered) uncompressed lengths.
    let total_uncompressed_size: usize = filtered_uncompressed_lengths
        .iter()
        .map(|x| <S as AsPrimitive<usize>>::as_(*x))
        .sum();

    // We allocate an extra 7 bytes per the FSST decompressor's requirement for padding.
    let mut uncompressed = ByteBufferMut::with_capacity(total_uncompressed_size + 7);
    // `spare_capacity` acts as a write cursor over the uninitialized tail of
    // the buffer; each branch below advances it past the bytes it "consumed".
    let mut spare_capacity = uncompressed.spare_capacity_mut();

    match filtered_validity {
        Mask::AllTrue(_) => {
            // Every selected row is valid: decompress each row in order,
            // advancing the cursor by the number of bytes actually decoded.
            for &idx in filter_mask.indices() {
                let start = codes_offsets[idx] as usize;
                let end = codes_offsets[idx + 1] as usize;
                let compressed_slice = &codes_data[start..end];

                let uncompressed_len =
                    decompressor.decompress_into(compressed_slice, spare_capacity);
                spare_capacity = &mut spare_capacity[uncompressed_len..];
            }
        }
        Mask::AllFalse(_) => {
            // Nothing to decompress
            unsafe { uncompressed.set_len(0) };
            return (Buffer::empty(), uncompressed.freeze());
        }
        Mask::Values(values) => {
            // Mixed validity: walk the selected indices zipped with their
            // validity bits (both sequences are aligned with the filtered rows).
            for (filtered_idx, (idx, is_valid)) in filter_mask
                .indices()
                .iter()
                .copied()
                .zip(values.bit_buffer().iter())
                .enumerate()
            {
                if is_valid {
                    let start = codes_offsets[idx] as usize;
                    let end = codes_offsets[idx + 1] as usize;
                    let compressed_slice = &codes_data[start..end];

                    let uncompressed_len =
                        decompressor.decompress_into(compressed_slice, spare_capacity);
                    spare_capacity = &mut spare_capacity[uncompressed_len..];
                } else {
                    // We advance the output buffer to make it faster to assemble views below.
                    // NOTE(review): this leaves the skipped byte range uninitialized;
                    // presumably acceptable because the corresponding view is
                    // masked invalid — confirm set_len below is sound for
                    // ByteBufferMut over those gaps.
                    spare_capacity =
                        &mut spare_capacity[filtered_uncompressed_lengths[filtered_idx].as_()..];
                }
            }
        }
    }

    // SAFETY: the cursor advanced exactly `total_uncompressed_size` bytes from
    // the start of the spare capacity, which was reserved above.
    unsafe { uncompressed.set_len(total_uncompressed_size) };
    let uncompressed = uncompressed.freeze();
    let uncompressed_slice = uncompressed.as_ref();

    // Loop over the uncompressed lengths to construct the BinaryViews. Each
    // view points at buffer index 0 at the running byte offset.
    let mut views = BufferMut::<BinaryView>::with_capacity(filtered_uncompressed_lengths.len());
    let mut offset = 0u32;
    for len in filtered_uncompressed_lengths {
        let view = BinaryView::make_view(
            &uncompressed_slice[offset as usize..][..len.as_()],
            0u32,
            offset,
        );
        offset += <S as AsPrimitive<u32>>::as_(*len);
        // SAFETY: capacity for exactly one view per length was reserved above.
        unsafe { views.push_unchecked(view) };
    }
    // SAFETY: exactly `filtered_uncompressed_lengths.len()` views were pushed.
    unsafe { views.set_len(filtered_uncompressed_lengths.len()) };

    (views.freeze(), uncompressed)
}

encodings/fsst/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ mod array;
1515
mod canonical;
1616
mod compress;
1717
mod compute;
18+
mod kernel;
1819
mod ops;
1920
#[cfg(feature = "test-harness")]
2021
pub mod test_utils;

vortex-array/src/arrays/scalar_fn/rules.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,9 @@ impl ArrayReduceRule<ScalarFnVTable> for ScalarFnConstantRule {
7171
let result = match result {
7272
Datum::Scalar(s) => s,
7373
Datum::Vector(v) => {
74-
tracing::warn!(
74+
tracing::info!(
7575
"Scalar function {} returned vector from execution over all scalar inputs",
76-
array.scalar_fn
76+
array.scalar_fn,
7777
);
7878
v.scalar_at(0)
7979
}

vortex-layout/src/layouts/flat/reader.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,16 @@ use crate::segments::SegmentSource;
3737
// actual expression? Perhaps all expressions are given a selection mask to decide for themselves?
3838
const EXPR_EVAL_THRESHOLD: f64 = 0.2;
3939

40+
/// Below this mask density we will propagate filters one by one. In other words, we filter an
41+
/// array using a mask prior to running a filter expression, and then have to perform a more
42+
/// expensive rank intersection on the result. This threshold exists because filtering has a
43+
/// non-trivial cost, and often that cost outweighs evaluating the filter expression over a few
44+
/// more rows that are already known to be false.
45+
///
46+
/// TODO(ngates): this threshold should really be estimated based on the cost of the filter +
47+
/// the cost of the expression itself.
48+
const FILTER_OF_FILTER_THRESHOLD: f64 = 0.8;
49+
4050
pub struct FlatReader {
4151
layout: FlatLayout,
4252
name: Arc<str>,
@@ -147,11 +157,20 @@ impl LayoutReader for FlatReader {
147157
}
148158

149159
let array_mask = if *USE_VORTEX_OPERATORS {
150-
// Apply the expression to the array.
151-
let array = array.apply(&expr)?;
152-
// Evaluate the array into a mask.
153-
let array_mask = array.execute_mask(&session)?;
154-
mask.bitand(&array_mask)
160+
if mask.density() < FILTER_OF_FILTER_THRESHOLD {
161+
// Run only over the pre-filtered rows.
162+
let array = array.filter(mask.clone())?;
163+
let array = array.apply(&expr)?;
164+
let array_mask = array.execute_mask(&session)?;
165+
166+
mask.intersect_by_rank(&array_mask)
167+
} else {
168+
// Run over the full array, with a simpler bitand at the end.
169+
let array = array.apply(&expr)?;
170+
let array_mask = array.execute_mask(&session)?;
171+
172+
mask.bitand(&array_mask)
173+
}
155174
} else {
156175
// TODO(ngates): the mask may actually be dense within a range, as is often the case when
157176
// we have approximate mask results from a zone map. In which case we could look at

0 commit comments

Comments
 (0)