diff --git a/Cargo.lock b/Cargo.lock
index 422fa77d226..d1a820ad944 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5252,9 +5252,11 @@ dependencies = [
 name = "vortex-alp"
 version = "0.23.0"
 dependencies = [
+ "arrow-array",
  "divan",
  "itertools 0.14.0",
  "num-traits",
+ "rand",
  "rstest",
  "serde",
  "vortex-array",
diff --git a/encodings/alp/Cargo.toml b/encodings/alp/Cargo.toml
index f7462689693..b6831fb1644 100644
--- a/encodings/alp/Cargo.toml
+++ b/encodings/alp/Cargo.toml
@@ -17,6 +17,7 @@ readme = { workspace = true }
 workspace = true
 
 [dependencies]
+arrow-array = { workspace = true }
 itertools = { workspace = true }
 num-traits = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
@@ -30,6 +31,7 @@ vortex-scalar = { workspace = true }
 
 [dev-dependencies]
 divan = { workspace = true }
+rand = { workspace = true }
 rstest = { workspace = true }
 vortex-array = { workspace = true, features = ["test-harness"] }
 
diff --git a/encodings/alp/benches/alp_compress.rs b/encodings/alp/benches/alp_compress.rs
index d51d34038de..561665a6a9a 100644
--- a/encodings/alp/benches/alp_compress.rs
+++ b/encodings/alp/benches/alp_compress.rs
@@ -1,27 +1,60 @@
 #![allow(clippy::unwrap_used)]
 
 use divan::Bencher;
-use vortex_alp::{ALPFloat, ALPRDFloat, Exponents, RDEncoder};
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng as _};
+use vortex_alp::{alp_encode, ALPFloat, ALPRDFloat, RDEncoder};
 use vortex_array::array::PrimitiveArray;
 use vortex_array::validity::Validity;
 use vortex_array::IntoCanonical;
-use vortex_buffer::{buffer, Buffer};
+use vortex_buffer::buffer;
+use vortex_dtype::NativePType;
 
 fn main() {
     divan::main();
 }
 
-#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
-fn compress_alp<T: ALPFloat>(n: usize) -> (Exponents, Buffer<T::ALPInt>, Buffer<u64>, Buffer<T>) {
-    let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
-    T::encode(values.as_slice(), None)
+#[divan::bench(types = [f32, f64], args = [
+    (100_000, 0.25),
+    (10_000_000, 0.25),
+    (100_000, 0.95),
+    (10_000_000, 0.95),
+    (100_000, 1.0),
+    (10_000_000, 1.0),
+])]
+fn compress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
+    let (n, fraction_valid) = args;
+    let mut rng = StdRng::seed_from_u64(0);
+    let values = buffer![T::from(1.234).unwrap(); n];
+    let validity = if fraction_valid < 1.0 {
+        Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
+    } else {
+        Validity::NonNullable
+    };
+    bencher.bench_local(move || {
+        alp_encode(&PrimitiveArray::new(values.clone(), validity.clone())).unwrap()
+    })
 }
 
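Both ALP benchmarks (compress above, decompress below) now take a (length, fraction_valid) pair and build validity masks from a seeded StdRng, so nullable and non-nullable paths are measured reproducibly. A minimal sketch of the same seeding pattern outside of divan, assuming only the rand crate; validity_mask is a hypothetical helper, not part of the patch:

    use rand::rngs::StdRng;
    use rand::{Rng, SeedableRng};

    /// Build a reproducible validity mask: `true` marks a valid slot.
    fn validity_mask(len: usize, fraction_valid: f64) -> Vec<bool> {
        // A fixed seed keeps benchmark inputs identical across runs.
        let mut rng = StdRng::seed_from_u64(0);
        (0..len).map(|_| rng.gen_bool(fraction_valid)).collect()
    }
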
-#[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
-fn decompress_alp<T: ALPFloat>(bencher: Bencher, n: usize) {
-    let values: Vec<T> = vec![T::from(1.234).unwrap(); n];
-    let (exponents, encoded, ..) = T::encode(values.as_slice(), None);
-    bencher.bench_local(move || T::decode(&encoded, exponents));
+#[divan::bench(types = [f32, f64], args = [
+    (100_000, 0.25),
+    (10_000_000, 0.25),
+    (100_000, 0.95),
+    (10_000_000, 0.95),
+    (100_000, 1.0),
+    (10_000_000, 1.0),
+])]
+fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64)) {
+    let (n, fraction_valid) = args;
+    let mut rng = StdRng::seed_from_u64(0);
+    let values = buffer![T::from(1.234).unwrap(); n];
+    let validity = if fraction_valid < 1.0 {
+        Validity::from_iter((0..values.len()).map(|_| rng.gen_bool(fraction_valid)))
+    } else {
+        Validity::NonNullable
+    };
+    let array = alp_encode(&PrimitiveArray::new(values, validity)).unwrap();
+    bencher.bench_local(move || array.clone().into_canonical().unwrap());
 }
 
 #[divan::bench(types = [f32, f64], args = [100_000, 10_000_000])]
diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs
index fd670ef0da7..968e736f197 100644
--- a/encodings/alp/src/alp/array.rs
+++ b/encodings/alp/src/alp/array.rs
@@ -46,6 +46,14 @@ impl ALPArray {
         let mut children = Vec::with_capacity(2);
         children.push(encoded);
         if let Some(patches) = &patches {
+            if patches.dtype() != &dtype {
+                vortex_bail!(MismatchedTypes: dtype, patches.dtype());
+            }
+
+            if patches.values().logical_validity()?.false_count() != 0 {
+                vortex_bail!("ALPArray: patches must not contain invalid entries");
+            }
+
             children.push(patches.indices().clone());
             children.push(patches.values().clone());
         }
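ALPArray::try_new above now enforces two invariants on incoming patches: their dtype must match the array's, and every patch value must be valid. A toy sketch of the same checks with hypothetical stand-in types (the real code uses Vortex's Patches and DType):

    // Hypothetical stand-ins; only the shape of the invariant checks is real.
    struct ToyPatches {
        dtype: String,
        invalid_value_count: usize,
    }

    fn validate_patches(array_dtype: &str, patches: &ToyPatches) -> Result<(), String> {
        if patches.dtype != array_dtype {
            return Err(format!("mismatched types: {array_dtype} vs {}", patches.dtype));
        }
        if patches.invalid_value_count != 0 {
            return Err("ALPArray: patches must not contain invalid entries".into());
        }
        Ok(())
    }
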
diff --git a/encodings/alp/src/alp/compress.rs b/encodings/alp/src/alp/compress.rs
index 3a40cd57079..4026b6c2409 100644
--- a/encodings/alp/src/alp/compress.rs
+++ b/encodings/alp/src/alp/compress.rs
@@ -1,9 +1,11 @@
 use vortex_array::array::PrimitiveArray;
 use vortex_array::patches::Patches;
+use vortex_array::validity::Validity;
 use vortex_array::variants::PrimitiveArrayTrait;
 use vortex_array::{Array, IntoArray, IntoArrayVariant};
+use vortex_buffer::Buffer;
 use vortex_dtype::{NativePType, PType};
-use vortex_error::{vortex_bail, VortexResult, VortexUnwrap};
+use vortex_error::{vortex_bail, VortexResult};
 use vortex_scalar::ScalarType;
 
 use crate::alp::{ALPArray, ALPFloat};
@@ -24,39 +26,73 @@ macro_rules! match_each_alp_float_ptype {
     })
 }
 
-pub fn alp_encode_components<T>(
+pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
+    let (exponents, encoded, patches) = alp_encode_components(parray)?;
+    ALPArray::try_new(encoded, exponents, patches)
+}
+
+pub fn alp_encode_components(
+    parray: &PrimitiveArray,
+) -> VortexResult<(Exponents, Array, Option<Patches>)> {
+    match parray.ptype() {
+        PType::F32 => alp_encode_components_typed::<f32>(parray),
+        PType::F64 => alp_encode_components_typed::<f64>(parray),
+        _ => vortex_bail!("ALP can only encode f32 and f64"),
+    }
+}
+
+#[allow(clippy::cast_possible_truncation)]
+fn alp_encode_components_typed<T>(
     values: &PrimitiveArray,
-    exponents: Option<Exponents>,
-) -> (Exponents, Array, Option<Patches>)
+) -> VortexResult<(Exponents, Array, Option<Patches>)>
 where
     T: ALPFloat + NativePType,
     T::ALPInt: NativePType,
     T: ScalarType,
 {
-    let (exponents, encoded, exc_pos, exc) = T::encode(values.as_slice::<T>(), exponents);
-    let len = encoded.len();
-    (
-        exponents,
-        PrimitiveArray::new(encoded, values.validity()).into_array(),
-        (!exc.is_empty()).then(|| {
-            let position_arr = exc_pos.into_array();
-            let patch_validity = values.validity().take(&position_arr).vortex_unwrap();
-            Patches::new(
-                len,
-                position_arr,
-                PrimitiveArray::new(exc, patch_validity).into_array(),
-            )
-        }),
-    )
-}
+    let values_slice = values.as_slice::<T>();
 
-pub fn alp_encode(parray: &PrimitiveArray) -> VortexResult<ALPArray> {
-    let (exponents, encoded, patches) = match parray.ptype() {
-        PType::F32 => alp_encode_components::<f32>(parray, None),
-        PType::F64 => alp_encode_components::<f64>(parray, None),
-        _ => vortex_bail!("ALP can only encode f32 and f64"),
+    let exponents = T::find_best_exponents(values_slice);
+    let (encoded, exceptional_positions) = T::encode_chunkwise(values_slice, exponents);
+
+    let encoded_array = PrimitiveArray::new(encoded, values.validity()).into_array();
+
+    let validity = values.logical_validity()?;
+    let n_valid = validity.true_count();
+    // exceptional_positions may contain exceptions at invalid positions (which contain garbage
+    // data). We remove invalid exceptional positions in order to keep the Patches small.
+    let valid_exceptional_positions = if n_valid == 0 {
+        Buffer::empty()
+    } else if n_valid == values.len() {
+        exceptional_positions
+    } else {
+        exceptional_positions
+            .into_iter()
+            .filter(|index| validity.value(*index as usize))
+            .collect()
     };
-    ALPArray::try_new(encoded, exponents, patches)
+
+    let patches = if valid_exceptional_positions.is_empty() {
+        None
+    } else {
+        let patches_validity = if values.dtype().is_nullable() {
+            Validity::AllValid
+        } else {
+            Validity::NonNullable
+        };
+        let exceptional_values: Buffer<T> = valid_exceptional_positions
+            .iter()
+            .map(|index| values_slice[*index as usize])
+            .collect();
+        let exceptional_values =
+            PrimitiveArray::new(exceptional_values, patches_validity).into_array();
+        Some(Patches::new(
+            values_slice.len(),
+            valid_exceptional_positions.into_array(),
+            exceptional_values,
+        ))
+    };
+    Ok((exponents, encoded_array, patches))
 }
 
 pub fn decompress(array: ALPArray) -> VortexResult<PrimitiveArray> {
@@ -85,6 +121,7 @@ mod tests {
     use vortex_array::compute::scalar_at;
     use vortex_array::validity::Validity;
     use vortex_buffer::{buffer, Buffer};
+    use vortex_scalar::Scalar;
 
     use super::*;
 
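The heart of the change is above: encode now reports exception positions only, and positions that land on invalid (null) slots, which hold garbage bytes, are dropped before any Patches are built. A self-contained sketch of that filtering step, using plain Vecs in place of Vortex buffers and validity:

    /// Keep only exception positions that fall on valid slots.
    /// `validity[i]` is true when slot `i` holds a real value.
    fn retain_valid_exceptions(positions: Vec<u64>, validity: &[bool]) -> Vec<u64> {
        let n_valid = validity.iter().filter(|v| **v).count();
        if n_valid == 0 {
            Vec::new() // every slot is null: no patch can survive
        } else if n_valid == validity.len() {
            positions // fully valid: nothing to filter
        } else {
            positions.into_iter().filter(|&i| validity[i as usize]).collect()
        }
    }
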
@@ -128,7 +165,7 @@ mod tests {
     }
 
     #[test]
-    #[allow(clippy::approx_constant)] // ALP doesn't like E
+    #[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm.
     fn test_patched_compress() {
         let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
         let array = PrimitiveArray::new(values.clone(), Validity::NonNullable);
@@ -140,7 +177,7 @@ mod tests {
                 .into_primitive()
                 .unwrap()
                 .as_slice::<i64>(),
-            vec![1234i64, 2718, 1234, 4000] // fill forward
+            vec![1234i64, 2718, 1234, 4000]
         );
         assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
 
@@ -148,6 +185,39 @@ mod tests {
         assert_eq!(values.as_slice(), decoded.as_slice::<f64>());
     }
 
+    #[test]
+    #[allow(clippy::approx_constant)] // Clippy objects to 2.718, an approximation of e, the base of the natural logarithm.
+    fn test_compress_ignores_invalid_exceptional_values() {
+        let values = buffer![1.234f64, 2.718, f64::consts::PI, 4.0];
+        let array = PrimitiveArray::new(values, Validity::from_iter([true, true, false, true]));
+        let encoded = alp_encode(&array).unwrap();
+        assert!(encoded.patches().is_none());
+        assert_eq!(
+            encoded
+                .encoded()
+                .into_primitive()
+                .unwrap()
+                .as_slice::<i64>(),
+            vec![1234i64, 2718, 1234, 4000]
+        );
+        assert_eq!(encoded.exponents(), Exponents { e: 16, f: 13 });
+
+        let decoded = decompress(encoded).unwrap();
+        assert_eq!(
+            scalar_at(&decoded, 0).unwrap(),
+            scalar_at(&array, 0).unwrap()
+        );
+        assert_eq!(
+            scalar_at(&decoded, 1).unwrap(),
+            scalar_at(&array, 1).unwrap()
+        );
+        assert!(!decoded.is_valid(2).unwrap());
+        assert_eq!(
+            scalar_at(&decoded, 3).unwrap(),
+            scalar_at(&array, 3).unwrap()
+        );
+    }
+
     #[test]
     #[allow(clippy::approx_constant)] // ALP doesn't like E
     fn test_nullable_patched_scalar_at() {
@@ -168,6 +238,7 @@ mod tests {
             assert!(s.is_valid());
         }
 
+        assert!(!encoded.is_valid(4).unwrap());
         let s = scalar_at(encoded.as_ref(), 4).unwrap();
         assert!(s.is_null());
 
@@ -190,7 +261,23 @@ mod tests {
         );
         let alp_arr = alp_encode(&original).unwrap();
         let decompressed = alp_arr.into_primitive().unwrap();
-        assert_eq!(original.as_slice::<f32>(), decompressed.as_slice::<f32>());
+        assert_eq!(
+            // The second and third values become exceptions and are replaced
+            [195.26274, 195.26274, 195.26274],
+            decompressed.as_slice::<f32>()
+        );
         assert_eq!(original.validity(), decompressed.validity());
+        assert_eq!(
+            scalar_at(&original, 0).unwrap(),
+            Scalar::null_typed::<f32>()
+        );
+        assert_eq!(
+            scalar_at(&original, 1).unwrap(),
+            Scalar::null_typed::<f32>()
+        );
+        assert_eq!(
+            scalar_at(&original, 2).unwrap(),
+            Scalar::null_typed::<f32>()
+        );
     }
 }
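The scalar_at change below makes validity win over patches: a null slot returns a null scalar before patches are even consulted, and a patch that does fire is cast to the array's dtype. A control-flow sketch with toy stand-ins (the real code returns Vortex Scalars and errors):

    use std::collections::HashMap;

    struct ToyAlp {
        valid: Vec<bool>,
        patches: HashMap<usize, f64>,
        decoded: Vec<f64>,
    }

    impl ToyAlp {
        fn scalar_at(&self, index: usize) -> Option<f64> {
            if !self.valid[index] {
                return None; // stands in for Ok(Scalar::null(dtype))
            }
            if let Some(patch) = self.patches.get(&index) {
                return Some(*patch); // the real code also casts to the array dtype
            }
            Some(self.decoded[index])
        }
    }
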
diff --git a/encodings/alp/src/alp/compute/mod.rs b/encodings/alp/src/alp/compute/mod.rs
index bc857290d48..dc189c111a9 100644
--- a/encodings/alp/src/alp/compute/mod.rs
+++ b/encodings/alp/src/alp/compute/mod.rs
@@ -36,9 +36,13 @@ impl ComputeVTable for ALPEncoding {
 
 impl ScalarAtFn<ALPArray> for ALPEncoding {
     fn scalar_at(&self, array: &ALPArray, index: usize) -> VortexResult<Scalar> {
+        if !array.encoded().is_valid(index)? {
+            return Ok(Scalar::null(array.dtype().clone()));
+        }
+
         if let Some(patches) = array.patches() {
             if let Some(patch) = patches.get_patched(index)? {
-                return Ok(patch);
+                return patch.cast(array.dtype());
             }
         }
 
diff --git a/encodings/alp/src/alp/mod.rs b/encodings/alp/src/alp/mod.rs
index 1c205567bb5..1fe4d72fc77 100644
--- a/encodings/alp/src/alp/mod.rs
+++ b/encodings/alp/src/alp/mod.rs
@@ -35,7 +35,7 @@ mod private {
 }
 
 pub trait ALPFloat: private::Sealed + Float + Display + 'static {
-    type ALPInt: PrimInt + Display + ToPrimitive + Copy;
+    type ALPInt: PrimInt + Display + ToPrimitive + Copy + Default;
 
     const FRACTIONAL_BITS: u8;
     const MAX_EXPONENT: u8;
@@ -63,18 +63,17 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
             values
                 .iter()
                 .step_by(values.len() / SAMPLE_SIZE)
+                .take(SAMPLE_SIZE)
                 .cloned()
                 .collect_vec()
         });
 
         for e in (0..Self::MAX_EXPONENT).rev() {
             for f in 0..e {
-                let (_, encoded, _, exc_patches) = Self::encode(
-                    sample.as_deref().unwrap_or(values),
-                    Some(Exponents { e, f }),
-                );
+                let (encoded, exceptional_positions) =
+                    Self::encode(sample.as_deref().unwrap_or(values), Exponents { e, f });
 
-                let size = Self::estimate_encoded_size(&encoded, &exc_patches);
+                let size = Self::estimate_encoded_size(&encoded, exceptional_positions.len());
                 if size < best_nbytes {
                     best_nbytes = size;
                     best_exp = Exponents { e, f };
@@ -88,7 +87,7 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
     }
 
     #[inline]
-    fn estimate_encoded_size(encoded: &[Self::ALPInt], patches: &[Self]) -> usize {
+    fn estimate_encoded_size(encoded: &[Self::ALPInt], n_exceptions: usize) -> usize {
         let bits_per_encoded = encoded
             .iter()
             .minmax()
@@ -107,42 +106,85 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
         let encoded_bytes = (encoded.len() * bits_per_encoded + 7) / 8;
         // each patch is a value + a position
         // in practice, patch positions are in [0, u16::MAX] because of how we chunk
-        let patch_bytes = patches.len() * (size_of::<Self>() + size_of::<u16>());
+        let patch_bytes = n_exceptions * (size_of::<Self>() + size_of::<u16>());
 
         encoded_bytes + patch_bytes
     }
 
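encode_chunkwise, introduced below, batches work in roughly 32 KiB chunks so that encode's two passes over a chunk stay in L1 cache. A standalone sketch of the chunking arithmetic; encode_chunk is a toy stand-in for ALPFloat::encode on one chunk, and this sketch additionally rebases chunk-relative exception positions to absolute indices:

    fn encode_chunkwise_sketch(values: &[f64]) -> (Vec<i64>, Vec<u64>) {
        // 32 KiB of f64s is 4096 elements, mirroring ENCODE_CHUNK_SIZE below.
        const CHUNK: usize = (32 << 10) / std::mem::size_of::<f64>();
        let mut encoded = Vec::with_capacity(values.len());
        let mut exceptions = Vec::new();
        for (chunk_index, chunk) in values.chunks(CHUNK).enumerate() {
            let base = (chunk_index * CHUNK) as u64;
            let (enc, exc) = encode_chunk(chunk);
            encoded.extend(enc);
            // exception positions come back chunk-relative; rebase them
            exceptions.extend(exc.into_iter().map(|i| base + i));
        }
        (encoded, exceptions)
    }

    /// Toy stand-in for ALPFloat::encode on one chunk: scale by 1000, round,
    /// and flag values that do not round-trip exactly.
    fn encode_chunk(chunk: &[f64]) -> (Vec<i64>, Vec<u64>) {
        let mut enc = Vec::with_capacity(chunk.len());
        let mut exc = Vec::new();
        for (i, &v) in chunk.iter().enumerate() {
            let e = (v * 1000.0).round() as i64;
            if (e as f64) / 1000.0 != v {
                exc.push(i as u64);
            }
            enc.push(e);
        }
        (enc, exc)
    }
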
-    fn encode(
+    /// A quantity of [Self] expected to fit into L1 cache.
+    const ENCODE_CHUNK_SIZE: usize = (32 << 10) / size_of::<Self>();
+
+    /// ALP encode chunk-by-chunk.
+    ///
+    /// Unlike [Self::encode], this operation processes no more than [Self::ENCODE_CHUNK_SIZE]
+    /// elements at once which can make better use of the L1 cache because [Self::encode] makes two
+    /// passes over `values`: first to encode and second to extract the exceptional values.
+    fn encode_chunkwise(
         values: &[Self],
-        exponents: Option<Exponents>,
-    ) -> (Exponents, Buffer<Self::ALPInt>, Buffer<u64>, Buffer<Self>) {
-        let exp = exponents.unwrap_or_else(|| Self::find_best_exponents(values));
-
-        let mut encoded_output = BufferMut::<Self::ALPInt>::with_capacity(values.len());
-        let mut patch_indices = BufferMut::<u64>::with_capacity(values.len());
-        let mut patch_values = BufferMut::<Self>::with_capacity(values.len());
-        let mut fill_value: Option<Self::ALPInt> = None;
-
-        // this is intentionally branchless
-        // we batch this into 32KB of values at a time to make it more L1 cache friendly
-        let encode_chunk_size: usize = (32 << 10) / size_of::<Self>();
-        for chunk in values.chunks(encode_chunk_size) {
-            encode_chunk_unchecked(
-                chunk,
-                exp,
-                &mut encoded_output,
-                &mut patch_indices,
-                &mut patch_values,
-                &mut fill_value,
-            );
+        exponents: Exponents,
+    ) -> (Buffer<Self::ALPInt>, Buffer<u64>) {
+        let mut encoded = BufferMut::<Self::ALPInt>::with_capacity(values.len());
+        let mut patch_indices = BufferMut::<u64>::empty();
+        for chunk in values.chunks(Self::ENCODE_CHUNK_SIZE) {
+            let (encoded_chunk, patches_indices_chunk) = Self::encode(chunk, exponents);
+            encoded.extend(encoded_chunk);
+            patch_indices.extend(patches_indices_chunk);
+        }
+        (encoded.freeze(), patch_indices.freeze())
+    }
+
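encode, below, detects exceptions by round-tripping each value: encode it, decode it, and compare with the original. A worked example using the exponents from test_patched_compress; that Exponents { e: 16, f: 13 } amounts to a 10^(e-f) = 1000x multiplier is an assumption inferred from the encoded values in that test:

    fn main() {
        let mult = 1e3_f64;
        for v in [1.234_f64, 2.718, std::f64::consts::PI, 4.0] {
            let encoded = (v * mult).round() as i64;
            let round_trips = (encoded as f64) / mult == v;
            println!("{v}: encoded={encoded}, needs_patch={}", !round_trips);
        }
        // Prints 1234, 2718, 3142, 4000; only PI fails to round-trip, so its
        // slot is later overwritten with the fill value 1234, matching the test.
    }
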
+    /// ALP encode the given values using the given exponents.
+    ///
+    /// The index of each value for which encode-decode is not the identity function is returned.
+    ///
+    /// See also: [Self::encode_chunkwise].
+    #[allow(clippy::cast_possible_truncation)] // The patch_indices are known to be valid indices into encoded.
+    fn encode(values: &[Self], exponents: Exponents) -> (Vec<Self::ALPInt>, Vec<u64>) {
+        let (mut encoded, needs_patch): (Vec<Self::ALPInt>, Vec<bool>) = values
+            .iter()
+            .map(|value| {
+                let encoded = unsafe { Self::encode_single_unchecked(*value, exponents) };
+                let maybe_decoded = Self::decode_single(encoded, exponents);
+                let needs_patch = maybe_decoded != *value;
+                (encoded, needs_patch)
+            })
+            .unzip();
+
+        // Patched values either have tiny differences (e.g. 1.01, 1.02, 1.00000001) or big
+        // differences (e.g. 1.01, 1.02, 1000.0). In the latter case, this large value
+        // prevents bitpacking (or forces patches into bitpacking).
+        //
+        // Zero allows bitpacking but prevents frame-of-reference encoding, so we choose the first
+        // successfully encoded value.
+        let patch_indices: Vec<u64> = needs_patch
+            .into_iter()
+            .enumerate()
+            .filter(|(_, needs_patch)| *needs_patch)
+            .map(|(index, _)| index as u64)
+            .collect();
+
+        if !patch_indices.is_empty() {
+            let fill_value =
+                Self::first_non_patched_encoded_value(&encoded, &patch_indices).unwrap_or_default();
+            for index in patch_indices.iter() {
+                let index = *index as usize;
+                encoded[index] = fill_value;
+            }
         }
-        (
-            exp,
-            encoded_output.freeze(),
-            patch_indices.freeze(),
-            patch_values.freeze(),
-        )
+        (encoded, patch_indices)
+    }
+
+    fn first_non_patched_encoded_value(
+        encoded: &[Self::ALPInt],
+        patch_indices: &[u64],
+    ) -> Option<Self::ALPInt> {
+        for index in 0..encoded.len() {
+            if index >= patch_indices.len() || patch_indices[index] != index as u64 {
+                return Some(encoded[index]);
+            }
+        }
+        None
     }
 
     #[inline]
@@ -195,79 +237,6 @@ pub trait ALPFloat: private::Sealed + Float + Display + 'static {
     }
 }
 
-#[allow(clippy::cast_possible_truncation)]
-fn encode_chunk_unchecked<T: ALPFloat>(
-    chunk: &[T],
-    exp: Exponents,
-    encoded_output: &mut BufferMut<T::ALPInt>,
-    patch_indices: &mut BufferMut<u64>,
-    patch_values: &mut BufferMut<T>,
-    fill_value: &mut Option<T::ALPInt>,
-) {
-    let num_prev_encoded = encoded_output.len();
-    let num_prev_patches = patch_indices.len();
-    assert_eq!(patch_indices.len(), patch_values.len());
-    let has_filled = fill_value.is_some();
-
-    // encode the chunk, counting the number of patches
-    let mut chunk_patch_count = 0;
-    encoded_output.extend(chunk.iter().map(|v| {
-        let encoded = unsafe { T::encode_single_unchecked(*v, exp) };
-        let decoded = T::decode_single(encoded, exp);
-        let neq = (decoded != *v) as usize;
-        chunk_patch_count += neq;
-        encoded
-    }));
-    let chunk_patch_count = chunk_patch_count; // immutable hereafter
-    assert_eq!(encoded_output.len(), num_prev_encoded + chunk.len());
-
-    if chunk_patch_count > 0 {
-        // we need to gather the patches for this chunk
-        // preallocate space for the patches (plus one because our loop may attempt to write one past the end)
-        patch_indices.reserve(chunk_patch_count + 1);
-        patch_values.reserve(chunk_patch_count + 1);
-
-        // record the patches in this chunk
-        let patch_indices_mut = patch_indices.spare_capacity_mut();
-        let patch_values_mut = patch_values.spare_capacity_mut();
-        let mut chunk_patch_index = 0;
-        for i in num_prev_encoded..encoded_output.len() {
-            let decoded = T::decode_single(encoded_output[i], exp);
-            // write() is only safe to call more than once because the values are primitive (i.e., Drop is a no-op)
-            patch_indices_mut[chunk_patch_index].write(i as u64);
-            patch_values_mut[chunk_patch_index].write(chunk[i - num_prev_encoded]);
-            chunk_patch_index += (decoded != chunk[i - num_prev_encoded]) as usize;
-        }
-        assert_eq!(chunk_patch_index, chunk_patch_count);
-        unsafe {
-            patch_indices.set_len(num_prev_patches + chunk_patch_count);
-            patch_values.set_len(num_prev_patches + chunk_patch_count);
-        }
-    }
-
-    // find the first successfully encoded value (i.e., not patched)
-    // this is our fill value for missing values
-    if fill_value.is_none() && (num_prev_encoded + chunk_patch_count < encoded_output.len()) {
-        assert_eq!(num_prev_encoded, num_prev_patches);
-        for i in num_prev_encoded..encoded_output.len() {
-            if i >= patch_indices.len() || patch_indices[i] != i as u64 {
-                *fill_value = Some(encoded_output[i]);
-                break;
-            }
-        }
-    }
-
-    // replace the patched values in the encoded array with the fill value
-    // for better downstream compression
-    if let Some(fill_value) = fill_value {
-        // handle the edge case where the first N >= 1 chunks are all patches
-        let start_patch = if !has_filled { 0 } else { num_prev_patches };
-        for patch_idx in &patch_indices[start_patch..] {
-            encoded_output[*patch_idx as usize] = *fill_value;
-        }
-    }
-}
-
 impl ALPFloat for f32 {
     type ALPInt = i32;
     const FRACTIONAL_BITS: u8 = 23;
diff --git a/vortex-sampling-compressor/src/compressors/alp.rs b/vortex-sampling-compressor/src/compressors/alp.rs
index 4e82c88ed1b..9fb53c17dae 100644
--- a/vortex-sampling-compressor/src/compressors/alp.rs
+++ b/vortex-sampling-compressor/src/compressors/alp.rs
@@ -1,6 +1,4 @@
-use vortex_alp::{
-    alp_encode_components, match_each_alp_float_ptype, ALPArray, ALPEncoding, ALPRDEncoding,
-};
+use vortex_alp::{alp_encode_components, ALPArray, ALPEncoding, ALPRDEncoding};
 use vortex_array::aliases::hash_set::HashSet;
 use vortex_array::array::PrimitiveArray;
 use vortex_array::variants::PrimitiveArrayTrait;
@@ -43,12 +41,8 @@ impl EncodingCompressor for ALPCompressor {
         like: Option<CompressedArray<'a>>,
         ctx: SamplingCompressor<'a>,
     ) -> VortexResult<Option<CompressedArray<'a>>> {
-        let parray = array.clone().into_primitive()?;
-
-        let (exponents, encoded, patches) = match_each_alp_float_ptype!(
-            parray.ptype(), |$T| {
-                alp_encode_components::<$T>(&parray, None)
-            });
+        let (exponents, encoded, patches) =
+            alp_encode_components(&array.clone().into_primitive()?)?;
 
         let compressed_encoded = ctx
             .named("packed")
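With this change, callers no longer dispatch on ptype themselves: alp_encode_components handles the f32/f64 dispatch internally and returns a VortexResult. A usage sketch of the new entry point, mirroring test_compress_ignores_invalid_exceptional_values above and assuming the crate APIs exactly as they appear in this diff:

    use vortex_alp::alp_encode;
    use vortex_array::array::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    fn main() -> VortexResult<()> {
        // The null slot's garbage value no longer produces a patch: its
        // position is filtered out and the slot is filled like any exception.
        let array = PrimitiveArray::new(
            buffer![1.234f64, 2.718, std::f64::consts::PI, 4.0],
            Validity::from_iter([true, true, false, true]),
        );
        let alp = alp_encode(&array)?;
        assert!(alp.patches().is_none());
        Ok(())
    }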