2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions encodings/alp/Cargo.toml
@@ -17,6 +17,7 @@ readme = { workspace = true }
workspace = true

[dependencies]
arrow-array = { workspace = true }
itertools = { workspace = true }
num-traits = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -29,6 +30,7 @@ vortex-scalar = { workspace = true }

[dev-dependencies]
divan = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
vortex-array = { workspace = true, features = ["test-harness"] }

55 changes: 44 additions & 11 deletions encodings/alp/benches/alp_compress.rs
@@ -1,27 +1,60 @@
#![allow(clippy::unwrap_used)]

use divan::Bencher;
use vortex_alp::{ALPFloat, ALPRDFloat, Exponents, RDEncoder};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng as _};
use vortex_alp::{alp_encode, ALPFloat, ALPRDFloat, RDEncoder};
use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::IntoCanonical;
use vortex_buffer::{buffer, Buffer};
use vortex_buffer::buffer;
use vortex_dtype::NativePType;

fn main() {
divan::main();
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
fn compress_alp<T: ALPFloat>(n: usize) -> (Exponents, Buffer<T::ALPInt>, Buffer<u64>, Buffer<T>) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
T::encode(values.as_slice(), None)
#[divan::bench(types = [f32, f64], args = [
(100_000, 1.0),
(10_000_000, 1.0),
(100_000, 0.25),
(10_000_000, 0.25),
(100_000, 0.95),
(10_000_000, 0.95),
])]
fn compress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
let (n, fraction_valid) = args;
let mut rng = StdRng::seed_from_u64(0);
let values = buffer![T::from(1.234).unwrap(); n];
let validity = if fraction_valid < 1.0 {
Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
} else {
Validity::NonNullable
};
bencher.bench_local(move || {
alp_encode(&PrimitiveArray::new(values.clone(), validity.clone())).unwrap()
})
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
fn decompress_alp<T: ALPFloat>(bencher: Bencher, n: usize) {
let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
let (exponents, encoded, ..) = T::encode(values.as_slice(), None);
bencher.bench_local(move || T::decode(&encoded, exponents));
#[divan::bench(types = [f32, f64], args = [
(100_000, 1.0),
(10_000_000, 1.0),
(100_000, 0.25),
(10_000_000, 0.25),
(100_000, 0.95),
(10_000_000, 0.95),
])]
fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
let (n, fraction_valid) = args;
let mut rng = StdRng::seed_from_u64(0);
let values = buffer![T::from(1.234).unwrap(); n];
let validity = if fraction_valid < 1.0 {
Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
} else {
Validity::NonNullable
};
let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap();
bencher.bench_local(move || array.clone().into_canonical().unwrap());
}

#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
8 changes: 8 additions & 0 deletions encodings/alp/src/alp/array.rs
@@ -48,6 +48,14 @@ impl ALPArray {
let mut children = Vec::with_capacity(2);
children.push(encoded);
if let Some(patches) = &patches {
if patches.dtype() != &dtype {
vortex_bail!(MismatchedTypes: dtype, patches.dtype());
}

if patches.values().logical_validity().null_count()? != 0 {
vortex_bail!("ALPArray: patches must not contain invalid entries");
}

children.push(patches.indices().clone());
children.push(patches.values().clone());
}
121 changes: 93 additions & 28 deletions encodings/alp/src/alp/compress.rs
@@ -1,9 +1,11 @@
use vortex_array::array::PrimitiveArray;
use vortex_array::patches::Patches;
use vortex_array::validity::{ArrayValidity as _, LogicalValidity, Validity};
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_buffer::Buffer;
use vortex_dtype::{NativePType, PType};
use vortex_error::{vortex_bail, VortexResult, VortexUnwrap};
use vortex_error::{vortex_bail, VortexResult};
use vortex_scalar::ScalarType;

use crate::alp::{ALPArray, ALPFloat};
@@ -24,39 +26,69 @@ macro_rules! match_each_alp_float_ptype {
})
}

pub fn alp_encode_components<T>(
pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = alp_encode_components(parray)?;
ALPArray::try_new(encoded, exponents, patches)
}

pub fn alp_encode_components(
parray: &PrimitiveArray,
) -> VortexResult<(Exponents, ArrayData, Option<Patches>)> {
match parray.ptype() {
PType::F32 => alp_encode_components_typed::<f32>(parray),
PType::F64 => alp_encode_components_typed::<f64>(parray),
_ => vortex_bail!("ALP can only encode f32 and f64"),
}
}

#[allow(clippy::cast_possible_truncation)]
fn alp_encode_components_typed<T>(
values: &PrimitiveArray,
exponents: Option<Exponents>,
) -> (Exponents, ArrayData, Option<Patches>)
) -> VortexResult<(Exponents, ArrayData, Option<Patches>)>
where
T: ALPFloat + NativePType,
T::ALPInt: NativePType,
T: ScalarType,
{
let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::<T>(), exponents);
let len = encoded.len();
(
exponents,
PrimitiveArray::new(encoded, values.validity()).into_array(),
(!exc.is_empty()).then(|| {
let position_arr = exc_pos.into_array();
let patch_validity = values.validity().take(&position_arr).vortex_unwrap();
Patches::new(
len,
position_arr,
PrimitiveArray::new(exc, patch_validity).into_array(),
)
}),
)
}
let values_slice = values.as_slice::<T>();

pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
let (exponents, encoded, patches) = match parray.ptype() {
PType::F32 => alp_encode_components::<f32>(parray, None),
PType::F64 => alp_encode_components::<f64>(parray, None),
_ => vortex_bail!("ALP can only encode f32 and f64"),
let exponents = T::find_best_exponents(values_slice);
let (encoded, exceptional_positions) = T::chunked_encode(values.as_slice::<T>(), exponents);

let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array();
let exceptional_positions = match values.logical_validity() {
LogicalValidity::AllValid(_) => exceptional_positions,
LogicalValidity::AllInvalid(_) => Buffer::empty(),
LogicalValidity::Array(is_valid) => {
let is_valid_buf = is_valid.into_bool()?.boolean_buffer();
exceptional_positions
.into_iter()
// index is a valid usize because it is an index into values.as_slice::<T>()
.filter(|index| is_valid_buf.value(*index as usize))
.collect()
}
};
ALPArray::try_new(encoded, exponents, patches)
let patches = if exceptional_positions.is_empty() {
None
} else {
let patches_validity = if values.dtype().is_nullable() {
Validity::AllValid
} else {
Validity::NonNullable
};
let exceptional_values: Buffer<T> = exceptional_positions
.iter()
.map(|index| values_slice[*index as usize])
.collect();
let exceptional_values =
PrimitiveArray::new(exceptional_values, patches_validity).into_array();
Some(Patches::new(
values_slice.len(),
exceptional_positions.into_array(),
exceptional_values,
))
};
Ok((exponents, encoded_array, patches))
}

pub fn decompress(array: ALPArray) -> VortexResult<PrimitiveArray> {
@@ -83,7 +115,7 @@ mod tests {
use core::f64;

use vortex_array::compute::scalar_at;
use vortex_array::validity::Validity;
use vortex_array::validity::{ArrayValidity as _, Validity};
use vortex_buffer::{buffer, Buffer};

use super::*;
@@ -148,6 +180,39 @@
assert_eq!(values.as_slice(), decoded.as_slice::<f64>());
}

#[test]
#[allow(clippy::approx_constant)] // ALP doesn't like E
fn test_compress_ignores_invalid_exceptional_values() {
let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true]));
let encoded = alp_encode(&array).unwrap();
assert!(encoded.patches().is_none());
assert_eq!(
encoded
.encoded()
.into_primitive()
.unwrap()
.as_slice::<i64>(),
vec![1234i64, 2718, 1234, 4000] // fill forward
);
assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });

let decoded = decompress(encoded).unwrap();
assert_eq!(
scalar_at(&decoded, 0).unwrap(),
scalar_at(&array, 0).unwrap()
);
assert_eq!(
scalar_at(&decoded, 1).unwrap(),
scalar_at(&array, 1).unwrap()
);
assert!(!decoded.is_valid(2));
assert_eq!(
scalar_at(&decoded, 3).unwrap(),
scalar_at(&array, 3).unwrap()
);
}

#[test]
#[allow(clippy::approx_constant)] // ALP doesn't like E
fn test_nullable_patched_scalar_at() {
@@ -168,6 +233,7 @@
assert!(s.is_valid());
}

assert!(!encoded.is_valid(4));
let s = scalar_at(encoded.as_ref(), 4).unwrap();
assert!(s.is_null());

@@ -190,7 +256,6 @@
);
let alp_arr = alp_encode(&original).unwrap();
let decompressed = alp_arr.into_primitive().unwrap();
assert_eq!(original.as_slice::<f64>(), decompressed.as_slice::<f64>());
assert_eq!(original.validity(), decompressed.validity());
}
}
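
A minimal encode-side sketch of the behaviour these tests pin down — not part of the PR, reusing only the vortex-alp calls shown above; the test name and concrete values are illustrative. When the input is nullable and an ALP-exceptional value sits at a *valid* slot, patches are still emitted, and their values are built with `Validity::AllValid`, so the new `ALPArray` invariant (no invalid patch entries) holds.

```rust
use std::f64::consts::PI;

use vortex_alp::alp_encode;
use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_buffer::buffer;

#[test]
#[allow(clippy::approx_constant)] // ALP doesn't like E
fn nullable_input_with_valid_exceptional_value_still_gets_patches() {
    // PI does not survive ALP's scaled-integer round trip, so it becomes a patch.
    let values = buffer![1.234f64, 2.718, PI, 4.0];
    // Index 2 (the exceptional value) is valid; only index 3 is null.
    let array = PrimitiveArray::new(values, Validity::from_iter([true, true, true, false]));
    let encoded = alp_encode(&array).unwrap();

    // The exceptional position is valid, so patches are materialised, and
    // their values carry Validity::AllValid rather than inheriting the
    // input array's validity.
    assert!(encoded.patches().is_some());
}
```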
7 changes: 6 additions & 1 deletion encodings/alp/src/alp/compute/mod.rs
@@ -2,6 +2,7 @@ use vortex_array::compute::{
filter, scalar_at, slice, take, ComputeVTable, FilterFn, FilterMask, ScalarAtFn, SliceFn,
TakeFn,
};
use vortex_array::validity::ArrayValidity as _;
use vortex_array::variants::PrimitiveArrayTrait;
use vortex_array::{ArrayDType, ArrayData, IntoArrayData};
use vortex_error::VortexResult;
@@ -29,9 +30,13 @@ impl ComputeVTable for ALPEncoding {

impl ScalarAtFn<ALPArray> for ALPEncoding {
fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult<Scalar> {
if !array.encoded().is_valid(index) {
return Ok(Scalar::null(array.dtype().clone()));
}

if let Some(patches) = array.patches() {
if let Some(patch) = patches.get_patched(index)? {
return Ok(patch);
return patch.cast(array.dtype());
}
}
