Skip to content

Commit 9b76ef5

Browse files
authored
chore: make ArrayAccessor infallible (#5303)
1 parent 86c065d commit 9b76ef5

File tree

41 files changed

+265
-368
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+265
-368
lines changed

encodings/fsst/benches/fsst_compress.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ const BENCH_ARGS: &[(usize, usize, u8)] = &[
3838
#[divan::bench(args = BENCH_ARGS)]
3939
fn compress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
4040
let array = generate_test_data(string_count, avg_len, unique_chars);
41-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
42-
bencher.bench(|| fsst_compress(array.as_ref(), &compressor).unwrap())
41+
let compressor = fsst_train_compressor(&array);
42+
bencher.bench(|| fsst_compress(&array, &compressor))
4343
}
4444

4545
#[divan::bench(args = BENCH_ARGS)]
4646
fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
4747
let array = generate_test_data(string_count, avg_len, unique_chars);
48-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
49-
let encoded = fsst_compress(array.as_ref(), &compressor).unwrap();
48+
let compressor = fsst_train_compressor(&array);
49+
let encoded = fsst_compress(array, &compressor);
5050

5151
bencher
5252
.with_inputs(|| encoded.clone())
@@ -56,14 +56,14 @@ fn decompress_fsst(bencher: Bencher, (string_count, avg_len, unique_chars): (usi
5656
#[divan::bench(args = BENCH_ARGS)]
5757
fn train_compressor(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
5858
let array = generate_test_data(string_count, avg_len, unique_chars);
59-
bencher.bench(|| fsst_train_compressor(array.as_ref()).unwrap())
59+
bencher.bench(|| fsst_train_compressor(&array))
6060
}
6161

6262
#[divan::bench(args = BENCH_ARGS)]
6363
fn pushdown_compare(bencher: Bencher, (string_count, avg_len, unique_chars): (usize, usize, u8)) {
6464
let array = generate_test_data(string_count, avg_len, unique_chars);
65-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
66-
let fsst_array = fsst_compress(array.as_ref(), &compressor).unwrap();
65+
let compressor = fsst_train_compressor(&array);
66+
let fsst_array = fsst_compress(&array, &compressor);
6767
let constant = ConstantArray::new(Scalar::from(&b"const"[..]), array.len());
6868

6969
bencher
@@ -79,8 +79,8 @@ fn canonicalize_compare(
7979
(string_count, avg_len, unique_chars): (usize, usize, u8),
8080
) {
8181
let array = generate_test_data(string_count, avg_len, unique_chars);
82-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
83-
let fsst_array = fsst_compress(array.as_ref(), &compressor).unwrap();
82+
let compressor = fsst_train_compressor(&array);
83+
let fsst_array = fsst_compress(&array, &compressor);
8484
let constant = ConstantArray::new(Scalar::from(&b"const"[..]), array.len());
8585

8686
bencher
@@ -168,11 +168,9 @@ fn generate_chunked_test_data(
168168
) -> ChunkedArray {
169169
(0..chunk_size)
170170
.map(|_| {
171-
let array = generate_test_data(string_count, avg_len, unique_chars).into_array();
172-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
173-
fsst_compress(array.as_ref(), &compressor)
174-
.unwrap()
175-
.into_array()
171+
let array = generate_test_data(string_count, avg_len, unique_chars);
172+
let compressor = fsst_train_compressor(&array);
173+
fsst_compress(array, &compressor).into_array()
176174
})
177175
.collect::<ChunkedArray>()
178176
}

encodings/fsst/src/canonical.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ mod tests {
105105

106106
use crate::{fsst_compress, fsst_train_compressor};
107107

108-
fn make_data() -> (ArrayRef, Vec<Option<Vec<u8>>>) {
108+
fn make_data() -> (VarBinArray, Vec<Option<Vec<u8>>>) {
109109
const STRING_COUNT: usize = 1000;
110110
let mut rng = StdRng::seed_from_u64(0);
111111
let mut strings = Vec::with_capacity(STRING_COUNT);
@@ -133,8 +133,7 @@ mod tests {
133133
.into_iter()
134134
.map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
135135
DType::Binary(Nullability::Nullable),
136-
)
137-
.into_array(),
136+
),
138137
strings,
139138
)
140139
}
@@ -144,11 +143,8 @@ mod tests {
144143
let (arr_vec, data_vec): (Vec<ArrayRef>, Vec<Vec<Option<Vec<u8>>>>) = (0..10)
145144
.map(|_| {
146145
let (array, data) = make_data();
147-
let compressor = fsst_train_compressor(&array).unwrap();
148-
(
149-
fsst_compress(&array, &compressor).unwrap().into_array(),
150-
data,
151-
)
146+
let compressor = fsst_train_compressor(&array);
147+
(fsst_compress(&array, &compressor).into_array(), data)
152148
})
153149
.unzip();
154150

@@ -168,17 +164,15 @@ mod tests {
168164

169165
{
170166
let arr = builder.finish_into_canonical().into_varbinview();
171-
let res1 = arr
172-
.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
173-
.unwrap();
167+
let res1 =
168+
arr.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
174169
assert_eq!(data, res1);
175170
};
176171

177172
{
178173
let arr2 = chunked_arr.to_varbinview();
179-
let res2 = arr2
180-
.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>())
181-
.unwrap();
174+
let res2 =
175+
arr2.with_iterator(|iter| iter.map(|b| b.map(|v| v.to_vec())).collect::<Vec<_>>());
182176
assert_eq!(data, res2)
183177
};
184178
}

encodings/fsst/src/compress.rs

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,64 +6,30 @@
66
use fsst::{Compressor, Symbol};
77
use vortex_array::accessor::ArrayAccessor;
88
use vortex_array::arrays::builder::VarBinBuilder;
9-
use vortex_array::arrays::{VarBinVTable, VarBinViewVTable};
109
use vortex_array::{Array, IntoArray};
1110
use vortex_buffer::{Buffer, BufferMut};
1211
use vortex_dtype::DType;
13-
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail};
12+
use vortex_error::{VortexExpect, VortexUnwrap};
1413

1514
use crate::FSSTArray;
1615

17-
/// Compress an array using FSST.
18-
///
19-
/// # Panics
20-
///
21-
/// If the `strings` array is not encoded as either [`vortex_array::arrays::VarBinArray`] or
22-
/// [`vortex_array::arrays::VarBinViewArray`].
23-
pub fn fsst_compress(strings: &dyn Array, compressor: &Compressor) -> VortexResult<FSSTArray> {
24-
let len = strings.len();
25-
let dtype = strings.dtype().clone();
26-
27-
// Compress VarBinArray
28-
if let Some(varbin) = strings.as_opt::<VarBinVTable>() {
29-
return varbin
30-
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
31-
.map_err(|err| err.with_context("Failed to compress VarBinArray with FSST"));
32-
}
33-
34-
// Compress VarBinViewArray
35-
if let Some(varbin_view) = strings.as_opt::<VarBinViewVTable>() {
36-
return varbin_view
37-
.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
38-
.map_err(|err| err.with_context("Failed to compress VarBinViewArray with FSST"));
39-
}
40-
41-
vortex_bail!(
42-
"cannot fsst_compress array with unsupported encoding {:?}",
43-
strings.encoding_id()
44-
)
16+
/// Compress a string array using FSST.
17+
pub fn fsst_compress<A: ArrayAccessor<[u8]> + AsRef<dyn Array>>(
18+
strings: A,
19+
compressor: &Compressor,
20+
) -> FSSTArray {
21+
let len = strings.as_ref().len();
22+
let dtype = strings.as_ref().dtype().clone();
23+
strings.with_iterator(|iter| fsst_compress_iter(iter, len, dtype, compressor))
4524
}
4625

4726
/// Train a compressor from an array.
4827
///
4928
/// # Panics
5029
///
5130
/// If the provided array is not FSST compressible.
52-
pub fn fsst_train_compressor(array: &dyn Array) -> VortexResult<Compressor> {
53-
if let Some(varbin) = array.as_opt::<VarBinVTable>() {
54-
varbin
55-
.with_iterator(|iter| fsst_train_compressor_iter(iter))
56-
.map_err(|err| err.with_context("Failed to train FSST Compressor from VarBinArray"))
57-
} else if let Some(varbin_view) = array.as_opt::<VarBinViewVTable>() {
58-
varbin_view
59-
.with_iterator(|iter| fsst_train_compressor_iter(iter))
60-
.map_err(|err| err.with_context("Failed to train FSST Compressor from VarBinViewArray"))
61-
} else {
62-
vortex_bail!(
63-
"cannot fsst_compress array with unsupported encoding {:?}",
64-
array.encoding_id()
65-
)
66-
}
31+
pub fn fsst_train_compressor<A: ArrayAccessor<[u8]>>(array: &A) -> Compressor {
32+
array.with_iterator(|iter| fsst_train_compressor_iter(iter))
6733
}
6834

6935
/// Train a [compressor][Compressor] from an iterator of bytestrings.

encodings/fsst/src/compute/cast.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ mod tests {
5555
DType::Utf8(Nullability::NonNullable),
5656
);
5757

58-
let compressor = fsst_train_compressor(strings.as_ref()).unwrap();
59-
let fsst = fsst_compress(strings.as_ref(), &compressor).unwrap();
58+
let compressor = fsst_train_compressor(&strings);
59+
let fsst = fsst_compress(strings, &compressor);
6060

6161
// Cast to nullable
6262
let casted = cast(fsst.as_ref(), &DType::Utf8(Nullability::Nullable)).unwrap();
@@ -77,8 +77,8 @@ mod tests {
7777
DType::Utf8(Nullability::NonNullable)
7878
))]
7979
fn test_cast_fsst_conformance(#[case] array: VarBinArray) {
80-
let compressor = fsst_train_compressor(array.as_ref()).unwrap();
81-
let fsst = fsst_compress(array.as_ref(), &compressor).unwrap();
80+
let compressor = fsst_train_compressor(&array);
81+
let fsst = fsst_compress(&array, &compressor);
8282
test_cast_conformance(fsst.as_ref());
8383
}
8484
}

encodings/fsst/src/compute/compare.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ mod tests {
131131
],
132132
DType::Utf8(Nullability::Nullable),
133133
);
134-
let compressor = fsst_train_compressor(lhs.as_ref()).unwrap();
135-
let lhs = fsst_compress(lhs.as_ref(), &compressor).unwrap();
134+
let compressor = fsst_train_compressor(&lhs);
135+
let lhs = fsst_compress(lhs, &compressor);
136136

137137
let rhs = ConstantArray::new("world", lhs.len());
138138

encodings/fsst/src/compute/filter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ mod test {
4646
builder.append_value(b"world");
4747
let varbin = builder.finish(DType::Utf8(Nullability::NonNullable));
4848

49-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
50-
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
49+
let compressor = fsst_train_compressor(&varbin);
50+
let array = fsst_compress(&varbin, &compressor);
5151
test_filter_conformance(array.as_ref());
5252

5353
// Test with longer strings that benefit from compression
@@ -59,8 +59,8 @@ mod test {
5959
builder.append_value(b"the lazy dog sleeps");
6060
let varbin = builder.finish(DType::Utf8(Nullability::NonNullable));
6161

62-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
63-
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
62+
let compressor = fsst_train_compressor(&varbin);
63+
let array = fsst_compress(&varbin, &compressor);
6464
test_filter_conformance(array.as_ref());
6565

6666
// Test with nullable strings
@@ -72,8 +72,8 @@ mod test {
7272
builder.append_null();
7373
let varbin = builder.finish(DType::Utf8(Nullability::Nullable));
7474

75-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
76-
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
75+
let compressor = fsst_train_compressor(&varbin);
76+
let array = fsst_compress(&varbin, &compressor);
7777
test_filter_conformance(array.as_ref());
7878
}
7979
}

encodings/fsst/src/compute/mod.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ mod tests {
5454
#[test]
5555
fn test_take_null() {
5656
let arr = VarBinArray::from_iter([Some("h")], DType::Utf8(Nullability::NonNullable));
57-
let compr = fsst_train_compressor(arr.as_ref()).unwrap();
58-
let fsst = fsst_compress(arr.as_ref(), &compr).unwrap();
57+
let compr = fsst_train_compressor(&arr);
58+
let fsst = fsst_compress(&arr, &compr);
5959

6060
let idx1: PrimitiveArray = (0..1).collect();
6161

@@ -86,8 +86,8 @@ mod tests {
8686
DType::Utf8(Nullability::NonNullable),
8787
))]
8888
fn test_take_fsst_conformance(#[case] varbin: VarBinArray) {
89-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
90-
let array = fsst_compress(varbin.as_ref(), &compressor).unwrap();
89+
let compressor = fsst_train_compressor(&varbin);
90+
let array = fsst_compress(&varbin, &compressor);
9191
test_take_conformance(array.as_ref());
9292
}
9393

@@ -98,43 +98,43 @@ mod tests {
9898
["hello world", "testing fsst", "compression test", "data array", "vortex encoding"].map(Some),
9999
DType::Utf8(Nullability::NonNullable),
100100
);
101-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
102-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
101+
let compressor = fsst_train_compressor(&varbin);
102+
fsst_compress(&varbin, &compressor)
103103
})]
104104
// Nullable strings
105105
#[case::fsst_nullable({
106106
let varbin = VarBinArray::from_iter(
107107
[Some("hello"), None, Some("world"), Some("test"), None],
108108
DType::Utf8(Nullability::Nullable),
109109
);
110-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
111-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
110+
let compressor = fsst_train_compressor(&varbin);
111+
fsst_compress(varbin, &compressor)
112112
})]
113113
// Repetitive patterns (good for FSST compression)
114114
#[case::fsst_repetitive({
115115
let varbin = VarBinArray::from_iter(
116116
["http://example.com", "http://test.com", "http://vortex.dev", "http://data.org"].map(Some),
117117
DType::Utf8(Nullability::NonNullable),
118118
);
119-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
120-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
119+
let compressor = fsst_train_compressor(&varbin);
120+
fsst_compress(&varbin, &compressor)
121121
})]
122122
// Edge cases
123123
#[case::fsst_single({
124124
let varbin = VarBinArray::from_iter(
125125
["single element"].map(Some),
126126
DType::Utf8(Nullability::NonNullable),
127127
);
128-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
129-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
128+
let compressor = fsst_train_compressor(&varbin);
129+
fsst_compress(&varbin, &compressor)
130130
})]
131131
#[case::fsst_empty_strings({
132132
let varbin = VarBinArray::from_iter(
133133
["", "test", "", "hello", ""].map(Some),
134134
DType::Utf8(Nullability::NonNullable),
135135
);
136-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
137-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
136+
let compressor = fsst_train_compressor(&varbin);
137+
fsst_compress(varbin, &compressor)
138138
})]
139139
// Large arrays
140140
#[case::fsst_large({
@@ -153,8 +153,8 @@ mod tests {
153153
}))
154154
.collect();
155155
let varbin = VarBinArray::from_iter(data, DType::Utf8(Nullability::NonNullable));
156-
let compressor = fsst_train_compressor(varbin.as_ref()).unwrap();
157-
fsst_compress(varbin.as_ref(), &compressor).unwrap()
156+
let compressor = fsst_train_compressor(&varbin);
157+
fsst_compress(varbin, &compressor)
158158
})]
159159

160160
fn test_fsst_consistency(#[case] array: FSSTArray) {

encodings/fsst/src/serde.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ impl EncodeVTable<FSSTVTable> for FSSTVTable {
8686

8787
let compressor = match like {
8888
Some(like) => Compressor::rebuild_from(like.symbols(), like.symbol_lengths()),
89-
None => fsst_train_compressor(array.as_ref())?,
89+
None => fsst_train_compressor(&array),
9090
};
9191

92-
Ok(Some(fsst_compress(array.as_ref(), &compressor)?))
92+
Ok(Some(fsst_compress(array, &compressor)))
9393
}
9494
}
9595

encodings/fsst/src/test_utils.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,9 @@ pub fn gen_fsst_test_data(len: usize, avg_str_len: usize, unique_chars: u8) -> A
3333
.map(|opt_s| opt_s.map(Vec::into_boxed_slice)),
3434
DType::Binary(Nullability::NonNullable),
3535
);
36-
let compressor = fsst_train_compressor(varbin.as_ref()).vortex_unwrap();
36+
let compressor = fsst_train_compressor(&varbin);
3737

38-
fsst_compress(varbin.as_ref(), &compressor)
39-
.vortex_unwrap()
40-
.into_array()
38+
fsst_compress(varbin, &compressor).into_array()
4139
}
4240

4341
pub fn gen_dict_fsst_test_data<T: NativePType>(

encodings/fsst/src/tests.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@ pub(crate) fn build_fsst_array() -> ArrayRef {
2626
input_array.append_value(b"Nothing in present history can contradict them");
2727
let input_array = input_array.finish(DType::Utf8(Nullability::NonNullable));
2828

29-
let compressor = fsst_train_compressor(input_array.as_ref()).unwrap();
30-
fsst_compress(input_array.as_ref(), &compressor)
31-
.unwrap()
32-
.into_array()
29+
let compressor = fsst_train_compressor(&input_array);
30+
fsst_compress(input_array, &compressor).into_array()
3331
}
3432

3533
#[test]

0 commit comments

Comments
 (0)