Skip to content

Commit 14820af

Browse files
committed
Merge branch 'develop' into ngates/stat-expression
2 parents 41244e3 + 8f7b707 commit 14820af

File tree

17 files changed

+271
-209
lines changed

17 files changed

+271
-209
lines changed

.github/release-drafter.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ categories:
1515
collapse-after: 8
1616
labels:
1717
- "fix"
18+
- title: "📖 Documentation"
19+
labels:
20+
- "documentation"
1821
- title: "🧰 Maintenance"
1922
labels:
2023
- "chore"

.github/workflows/labels.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
run: |
2929
REQUIRED_LABELS=(
3030
"chore"
31+
"documentation"
3132
"feature"
3233
"fix"
3334
"performance"

encodings/zstd/src/array.rs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,10 +454,32 @@ impl ZstdArray {
454454

455455
let decompressed = decompressed.freeze();
456456
// Last, we slice the exact values requested out of the decompressed data.
457-
let slice_validity = self
457+
let mut slice_validity = self
458458
.unsliced_validity
459459
.slice(self.slice_start..self.slice_stop);
460460

461+
// NOTE: this block handles setting the output type when the validity and DType disagree.
462+
//
463+
// ZSTD is a compact block compressor, meaning that null values are not stored inline in
464+
// the data frames. A ZSTD Array that was initialized must always hold onto its full
465+
// validity bitmap, even if sliced to only include non-null values.
466+
//
467+
// We ensure that the validity of the decompressed array ALWAYS matches the validity
468+
// implied by the DType.
469+
if !self.dtype().is_nullable() && slice_validity != Validity::NonNullable {
470+
assert!(
471+
slice_validity.all_valid(slice_n_rows),
472+
"ZSTD array expects to be non-nullable but there are nulls after decompression"
473+
);
474+
475+
slice_validity = Validity::NonNullable;
476+
} else if self.dtype.is_nullable() && slice_validity == Validity::NonNullable {
477+
slice_validity = Validity::AllValid;
478+
}
479+
//
480+
// END OF IMPORTANT BLOCK
481+
//
482+
461483
match &self.dtype {
462484
DType::Primitive(..) => {
463485
let slice_values_buffer = decompressed.slice(
@@ -531,6 +553,21 @@ impl ZstdArray {
531553
}
532554

533555
pub(crate) fn _slice(&self, start: usize, stop: usize) -> ZstdArray {
556+
let new_start = self.slice_start + start;
557+
let new_stop = self.slice_start + stop;
558+
559+
assert!(
560+
new_start <= self.slice_stop,
561+
"new slice start {new_start} exceeds end {}",
562+
self.slice_stop
563+
);
564+
565+
assert!(
566+
new_stop <= self.slice_stop,
567+
"new slice stop {new_stop} exceeds end {}",
568+
self.slice_stop
569+
);
570+
534571
ZstdArray {
535572
slice_start: self.slice_start + start,
536573
slice_stop: self.slice_start + stop,

encodings/zstd/src/compute/cast.rs

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,46 +2,67 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_array::compute::{CastKernel, CastKernelAdapter};
5-
use vortex_array::{ArrayRef, IntoArray, register_kernel};
6-
use vortex_dtype::DType;
5+
use vortex_array::{ArrayRef, register_kernel};
6+
use vortex_dtype::{DType, Nullability};
77
use vortex_error::VortexResult;
88

99
use crate::{ZstdArray, ZstdVTable};
1010

1111
impl CastKernel for ZstdVTable {
1212
fn cast(&self, array: &ZstdArray, dtype: &DType) -> VortexResult<Option<ArrayRef>> {
13-
if !dtype.is_nullable() || !array.all_valid() {
14-
// We cannot cast to non-nullable since the validity containing nulls is used to decode
15-
// the ZSTD array, this would require rewriting tables.
13+
if !dtype.eq_ignore_nullability(array.dtype()) {
14+
// Type changes can't be handled in ZSTD, need to decode and tweak.
15+
// TODO(aduffy): handle trivial conversions like Binary -> UTF8, integer widening, etc.
1616
return Ok(None);
1717
}
18-
// ZstdArray is a general-purpose compression encoding using Zstandard compression.
19-
// It can handle nullability changes without decompression by updating the validity
20-
// bitmap, but type changes require decompression since the compressed data is
21-
// type-specific and Zstd operates on raw bytes.
22-
if array.dtype().eq_ignore_nullability(dtype) {
23-
// Create a new validity with the target nullability
24-
let new_validity = array
25-
.unsliced_validity
26-
.clone()
27-
.cast_nullability(dtype.nullability(), array.len())?;
28-
29-
return Ok(Some(
18+
19+
let src_nullability = array.dtype().nullability();
20+
let target_nullability = dtype.nullability();
21+
22+
match (src_nullability, target_nullability) {
23+
// Same type case. This should be handled in the layer above but for
24+
// completeness of the match arms we also handle it here.
25+
(Nullability::Nullable, Nullability::Nullable)
26+
| (Nullability::NonNullable, Nullability::NonNullable) => Ok(Some(array.to_array())),
27+
(Nullability::NonNullable, Nullability::Nullable) => Ok(Some(
28+
// nonnull => null, trivial cast by altering the validity
3029
ZstdArray::new(
3130
array.dictionary.clone(),
3231
array.frames.clone(),
3332
dtype.clone(),
3433
array.metadata.clone(),
3534
array.unsliced_n_rows(),
36-
new_validity,
35+
array.unsliced_validity.clone(),
3736
)
38-
._slice(array.slice_start(), array.slice_stop())
39-
.into_array(),
40-
));
37+
.slice(array.slice_start()..array.slice_stop()),
38+
)),
39+
(Nullability::Nullable, Nullability::NonNullable) => {
40+
// null => non-null works if there are no nulls in the sliced range
41+
let sliced_len = array.slice_stop() - array.slice_start();
42+
let has_nulls = !array
43+
.unsliced_validity
44+
.slice(array.slice_start()..array.slice_stop())
45+
.all_valid(sliced_len);
46+
47+
// We don't attempt to handle casting when there are nulls.
48+
if has_nulls {
49+
return Ok(None);
50+
}
51+
52+
// If there are no nulls, the cast is trivial
53+
Ok(Some(
54+
ZstdArray::new(
55+
array.dictionary.clone(),
56+
array.frames.clone(),
57+
dtype.clone(),
58+
array.metadata.clone(),
59+
array.unsliced_n_rows(),
60+
array.unsliced_validity.clone(),
61+
)
62+
.slice(array.slice_start()..array.slice_stop()),
63+
))
64+
}
4165
}
42-
43-
// For other casts (e.g., type changes), decode to canonical and let the underlying array handle it
44-
Ok(None)
4566
}
4667
}
4768

vortex-buffer/src/bit/buf.rs

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::bit::{
88
BitChunks, BitIndexIterator, BitIterator, BitSliceIterator, UnalignedBitChunk,
99
get_bit_unchecked,
1010
};
11-
use crate::{Alignment, BitBufferMut, Buffer, BufferMut, ByteBuffer, buffer};
11+
use crate::{Alignment, BitBufferMut, Buffer, ByteBuffer, buffer};
1212

1313
/// An immutable bitset stored as a packed byte buffer.
1414
#[derive(Debug, Clone, Eq)]
@@ -114,37 +114,9 @@ impl BitBuffer {
114114
}
115115
}
116116

117-
/// Invokes `f` with indexes `0..len` collecting the boolean results into a new `BitBuffer`
118-
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
119-
let mut buffer = BufferMut::with_capacity(len.div_ceil(64) * 8);
120-
121-
let chunks = len / 64;
122-
let remainder = len % 64;
123-
for chunk in 0..chunks {
124-
let mut packed = 0;
125-
for bit_idx in 0..64 {
126-
let i = bit_idx + chunk * 64;
127-
packed |= (f(i) as u64) << bit_idx;
128-
}
129-
130-
// SAFETY: Already allocated sufficient capacity
131-
unsafe { buffer.push_unchecked(packed) }
132-
}
133-
134-
if remainder != 0 {
135-
let mut packed = 0;
136-
for bit_idx in 0..remainder {
137-
let i = bit_idx + chunks * 64;
138-
packed |= (f(i) as u64) << bit_idx;
139-
}
140-
141-
// SAFETY: Already allocated sufficient capacity
142-
unsafe { buffer.push_unchecked(packed) }
143-
}
144-
145-
buffer.truncate(len.div_ceil(8));
146-
147-
Self::new(buffer.freeze().into_byte_buffer(), len)
117+
/// Invokes `f` with indexes `0..len` collecting the boolean results into a new [`BitBuffer`].
118+
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, f: F) -> Self {
119+
BitBufferMut::collect_bool(len, f).freeze()
148120
}
149121

150122
/// Get the logical length of this `BoolBuffer`.

vortex-buffer/src/bit/buf_mut.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,43 @@ impl BitBufferMut {
117117
}
118118
}
119119

120+
/// Invokes `f` with indexes `0..len` collecting the boolean results into a new `BitBufferMut`
121+
pub fn collect_bool<F: FnMut(usize) -> bool>(len: usize, mut f: F) -> Self {
122+
let mut buffer = BufferMut::with_capacity(len.div_ceil(64) * 8);
123+
124+
let chunks = len / 64;
125+
let remainder = len % 64;
126+
for chunk in 0..chunks {
127+
let mut packed = 0;
128+
for bit_idx in 0..64 {
129+
let i = bit_idx + chunk * 64;
130+
packed |= (f(i) as u64) << bit_idx;
131+
}
132+
133+
// SAFETY: Already allocated sufficient capacity
134+
unsafe { buffer.push_unchecked(packed) }
135+
}
136+
137+
if remainder != 0 {
138+
let mut packed = 0;
139+
for bit_idx in 0..remainder {
140+
let i = bit_idx + chunks * 64;
141+
packed |= (f(i) as u64) << bit_idx;
142+
}
143+
144+
// SAFETY: Already allocated sufficient capacity
145+
unsafe { buffer.push_unchecked(packed) }
146+
}
147+
148+
buffer.truncate(len.div_ceil(8));
149+
150+
Self {
151+
buffer: buffer.into_byte_buffer(),
152+
offset: 0,
153+
len,
154+
}
155+
}
156+
120157
/// Return the underlying byte buffer.
121158
pub fn inner(&self) -> &ByteBufferMut {
122159
&self.buffer

vortex-buffer/src/buffer_mut.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ impl<T> BufferMut<T> {
110110
buffer
111111
}
112112

113+
/// Return the [`ByteBufferMut`] for this [`BufferMut<T>`].
114+
pub fn into_byte_buffer(self) -> ByteBufferMut {
115+
ByteBufferMut {
116+
bytes: self.bytes,
117+
length: self.length * size_of::<T>(),
118+
alignment: self.alignment,
119+
_marker: Default::default(),
120+
}
121+
}
122+
113123
/// Get the alignment of the buffer.
114124
#[inline(always)]
115125
pub fn alignment(&self) -> Alignment {

vortex-compute/src/filter/bitbuffer.rs

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,10 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use vortex_buffer::{BitBuffer, BitBufferMut, get_bit};
5-
use vortex_mask::{Mask, MaskIter};
5+
use vortex_mask::Mask;
66

77
use crate::filter::{Filter, MaskIndices};
88

9-
/// If the filter density is above 80%, we use slices to filter the array instead of indices.
10-
// TODO(ngates): we need more experimentation to determine the best threshold here.
11-
const FILTER_SLICES_DENSITY_THRESHOLD: f64 = 0.8;
12-
139
impl Filter<Mask> for &BitBuffer {
1410
type Output = BitBuffer;
1511

@@ -23,12 +19,29 @@ impl Filter<Mask> for &BitBuffer {
2319
match selection_mask {
2420
Mask::AllTrue(_) => self.clone(),
2521
Mask::AllFalse(_) => BitBuffer::empty(),
26-
Mask::Values(v) => match v.threshold_iter(FILTER_SLICES_DENSITY_THRESHOLD) {
27-
MaskIter::Indices(indices) => filter_indices(self, indices),
28-
MaskIter::Slices(slices) => {
29-
filter_slices(self, selection_mask.true_count(), slices)
30-
}
31-
},
22+
Mask::Values(v) => {
23+
filter_indices(self.inner().as_ref(), self.offset(), v.indices()).freeze()
24+
}
25+
}
26+
}
27+
}
28+
29+
impl Filter<Mask> for &mut BitBufferMut {
30+
type Output = ();
31+
32+
fn filter(self, selection_mask: &Mask) {
33+
assert_eq!(
34+
selection_mask.len(),
35+
self.len(),
36+
"Selection mask length must equal the mask length"
37+
);
38+
39+
match selection_mask {
40+
Mask::AllTrue(_) => {}
41+
Mask::AllFalse(_) => self.clear(),
42+
Mask::Values(v) => {
43+
*self = filter_indices(self.inner().as_slice(), self.offset(), v.indices())
44+
}
3245
}
3346
}
3447
}
@@ -37,25 +50,24 @@ impl Filter<MaskIndices<'_>> for &BitBuffer {
3750
type Output = BitBuffer;
3851

3952
fn filter(self, indices: &MaskIndices) -> BitBuffer {
40-
filter_indices(self, indices)
53+
filter_indices(self.inner().as_ref(), self.offset(), indices).freeze()
4154
}
4255
}
4356

44-
fn filter_indices(bools: &BitBuffer, indices: &[usize]) -> BitBuffer {
45-
let buffer = bools.inner().as_ref();
46-
BitBuffer::collect_bool(indices.len(), |idx| {
47-
let idx = *unsafe { indices.get_unchecked(idx) };
48-
get_bit(buffer, bools.offset() + idx)
49-
})
50-
}
57+
impl Filter<MaskIndices<'_>> for &mut BitBufferMut {
58+
type Output = ();
5159

52-
fn filter_slices(buffer: &BitBuffer, output_len: usize, slices: &[(usize, usize)]) -> BitBuffer {
53-
let mut builder = BitBufferMut::with_capacity(output_len);
54-
for (start, end) in slices {
55-
// TODO(ngates): we probably want a borrowed slice for things like this.
56-
builder.append_buffer(&buffer.slice(*start..*end));
60+
fn filter(self, indices: &MaskIndices) {
61+
*self = filter_indices(self.inner().as_ref(), self.offset(), indices)
5762
}
58-
builder.freeze()
63+
}
64+
65+
fn filter_indices(bools: &[u8], bit_offset: usize, indices: &[usize]) -> BitBufferMut {
66+
// FIXME(ngates): this is slower than it could be!
67+
BitBufferMut::collect_bool(indices.len(), |idx| {
68+
let idx = *unsafe { indices.get_unchecked(idx) };
69+
get_bit(bools, bit_offset + idx)
70+
})
5971
}
6072

6173
#[cfg(test)]
@@ -64,20 +76,10 @@ mod test {
6476

6577
use super::*;
6678

67-
#[test]
68-
fn filter_bool_by_slice_test() {
69-
let bits = bitbuffer![1 1 0];
70-
71-
let filtered = filter_slices(&bits, 2, &[(0, 1), (2, 3)]);
72-
assert_eq!(2, filtered.len());
73-
74-
assert_eq!(filtered, bitbuffer![1 0])
75-
}
76-
7779
#[test]
7880
fn filter_bool_by_index_test() {
7981
let buf = bitbuffer![1 1 0];
80-
let filtered = filter_indices(&buf, &[0, 2]);
82+
let filtered = filter_indices(buf.inner().as_ref(), 0, &[0, 2]).freeze();
8183
assert_eq!(2, filtered.len());
8284
assert_eq!(filtered, bitbuffer![1 0])
8385
}

vortex-compute/src/filter/slice.rs

Whitespace-only changes.

0 commit comments

Comments
 (0)