diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 120000 index 00000000..30fa1cea --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1 @@ +config \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index c63c715c..d2ee0703 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1495,9 +1495,9 @@ dependencies = [ [[package]] name = "pgrx" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6468e2a7c4085707209cf7f7e14a7b73c8b7f41fc716283024a09180b620a45" +checksum = "227bf7e162ce710994306a97bc56bb3fe305f21120ab6692e2151c48416f5c0d" dependencies = [ "atomic-traits", "bitflags 2.6.0", @@ -1519,9 +1519,9 @@ dependencies = [ [[package]] name = "pgrx-bindgen" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04d822012e4919882cb9b1851ae9e4b4b03703ddf2ef4af3522025c8944e9c7d" +checksum = "81cbcd956c2da35baaf0a116e6f6a49a6c2fbc8f6b332f66d6fd060bfd00615f" dependencies = [ "bindgen", "cc", @@ -1537,9 +1537,9 @@ dependencies = [ [[package]] name = "pgrx-macros" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ffdb879a3880d3034661b4d19f802029abfcde6b8232299f4d4d2c202680af" +checksum = "e2f4291450d65e4deb770ce57ea93e22353d97950566222429cd166ebdf6f938" dependencies = [ "pgrx-sql-entity-graph", "proc-macro2", @@ -1549,9 +1549,9 @@ dependencies = [ [[package]] name = "pgrx-pg-config" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "945d664e710e4dcccd459628c6e58b3ccbd0495d2ed751b1d6f73996ceca01a2" +checksum = "86a64a4c6e4e43e73cf8d3379d9533df98ded45c920e1ba8131c979633d74132" dependencies = [ "cargo_toml", "eyre", @@ -1567,9 +1567,9 @@ 
dependencies = [ [[package]] name = "pgrx-pg-sys" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "489c96adc8af0b917165f47a11e25f77753e227144aa3790be7e241afab9886c" +checksum = "63a5dc64f2a8226434118aa2c4700450fa42b04f29488ad98268848b21c1a4ec" dependencies = [ "cee-scape", "libc", @@ -1582,9 +1582,9 @@ dependencies = [ [[package]] name = "pgrx-sql-entity-graph" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de77a74f3dc80283a97322ab4d65f9f81691eb54d0a5b7c37b93686d351c6096" +checksum = "d81cc2e851c7e36b2f47c03e22d64d56c1d0e762fbde0039ba2cd490cfef3615" dependencies = [ "convert_case", "eyre", @@ -1598,9 +1598,9 @@ dependencies = [ [[package]] name = "pgrx-tests" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73cbf89286dda64fb8ee2126794d140de2294d0b8e04604b5ecddd6ee291fbae" +checksum = "0c2dd5d674cb7d92024709543da06d26723a2f7450c02083116b232587160929" dependencies = [ "clap-cargo", "eyre", @@ -2427,7 +2427,7 @@ dependencies = [ [[package]] name = "timescaledb_toolkit" -version = "1.19.0-dev" +version = "1.20.0-dev" dependencies = [ "aggregate_builder", "approx 0.4.0", diff --git a/Changelog.md b/Changelog.md index 41e153d5..7bfa04a7 100644 --- a/Changelog.md +++ b/Changelog.md @@ -16,6 +16,18 @@ This changelog should be updated as part of a PR if the work is worth noting (mo **Full Changelog**: [TODO] +## [1.19.0](https://github.com/timescale/timescaledb-toolkit/releases/tag/1.19.0) (2024-11-14) + +#### New experimental features + +#### Bug fixes + +#### Other notable changes + +#### Shout-outs + +**Full Changelog**: [TODO] + ## [1.18.0](https://github.com/timescale/timescaledb-toolkit/releases/tag/1.18.0) (2023-11-28) #### New experimental features diff --git a/crates/udd-sketch/src/compactor.rs b/crates/udd-sketch/src/compactor.rs new file mode 100644 index 
00000000..86df64ae --- /dev/null +++ b/crates/udd-sketch/src/compactor.rs @@ -0,0 +1,45 @@ +use crate::swap::SwapBucket; +use crate::SketchHashKey::Invalid; +use crate::{SketchHashEntry, SketchHashKey, SketchHashMap}; + +#[inline] +pub fn compact_from_iter( + swap_iter: &mut impl Iterator, + map: &mut SketchHashMap, +) { + let Some(mut current) = swap_iter.next() else { + return; + }; + + for next in swap_iter { + if next.key == Invalid { + break; + } + + // This combines those buckets that compact into the same one + // For example, Positive(9) and Positive(8) both + // compact into Positive(4) + if current.key == next.key { + current.count += next.count; + } else { + map.map.insert( + current.key, + SketchHashEntry { + count: current.count, + next: next.key, + }, + ); + current = next; + } + } + + // And the final one ... + map.map.insert( + current.key, + SketchHashEntry { + count: current.count, + next: Invalid, + }, + ); + map.head = map.head.compact_key(); +} diff --git a/crates/udd-sketch/src/compactor_attempt.rs b/crates/udd-sketch/src/compactor_attempt.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/udd-sketch/src/lib.rs b/crates/udd-sketch/src/lib.rs index 315feef8..a63c1b46 100644 --- a/crates/udd-sketch/src/lib.rs +++ b/crates/udd-sketch/src/lib.rs @@ -2,18 +2,27 @@ //! Based on the paper: https://arxiv.org/abs/2004.08604 use serde::{Deserialize, Serialize}; +use std::cmp::Ordering; use std::collections::HashMap; +use crate::compactor::compact_from_iter; +use crate::swap::{Swap, SwapBucket}; +use crate::SketchHashKey::{Invalid, Zero}; #[cfg(test)] use ordered_float::OrderedFloat; #[cfg(test)] use std::collections::HashSet; + #[cfg(test)] extern crate quickcheck; #[cfg(test)] #[macro_use(quickcheck)] extern crate quickcheck_macros; +mod compactor; +mod sketchhashvec; +mod swap; + // This is used to index the buckets of the UddSketch. 
In particular, because UddSketch stores values // based on a logarithmic scale, we need to track negative values separately from positive values, and // zero also needs special casing. @@ -26,8 +35,8 @@ pub enum SketchHashKey { } // Invalid is treated as greater than valid values (making it a nice boundary value for list end) -impl std::cmp::PartialOrd for SketchHashKey { - fn partial_cmp(&self, other: &Self) -> Option { +impl Ord for SketchHashKey { + fn cmp(&self, other: &Self) -> Ordering { use self::SketchHashKey::*; use std::cmp::Ordering::*; match (self, other) { @@ -37,14 +46,19 @@ impl std::cmp::PartialOrd for SketchHashKey { (Zero, Zero) => Equal, (Positive(a), Positive(b)) => a.cmp(b), (Negative(a), Negative(b)) => a.cmp(b).reverse(), - (_, Positive(_)) => Less, - (Positive(_), _) => Greater, - (_, Negative(_)) => Greater, - (Negative(_), _) => Less, + (Negative(_) | Zero, Positive(_)) => Less, + (Positive(_), Negative(_) | Zero) => Greater, + (Zero, Negative(_)) => Greater, + (Negative(_), Zero) => Less, } .into() } } +impl PartialOrd for SketchHashKey { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} impl SketchHashKey { /// This is the key corresponding to the current key after the SketchHashMap it refers to has gone through one compaction. @@ -57,7 +71,23 @@ impl SketchHashKey { Positive(i64::MAX) => *self, Negative(x) => Negative(if x > 0 { x + 1 } else { x } / 2), Positive(x) => Positive(if x > 0 { x + 1 } else { x } / 2), - x => x, // Zero and Invalid don't compact + Invalid | Zero => *self, // Zero and Invalid don't compact + } + } + + /// Compact the key multiple times. 
Useful for repeated compaction calls, + /// which may occur when merging two sketches + fn compact_key_multiple(&self, times: u32) -> SketchHashKey { + use SketchHashKey::*; + + match *self { + Negative(i64::MAX | 0) => *self, // Infinite buckets remain infinite + Positive(i64::MAX | 0) => *self, + Negative(x) if x > 0 => Negative(x.saturating_add(i64::from(times))), + Positive(x) if x > 0 => Positive(x.saturating_add(i64::from(times))), + Positive(x) => Positive(x / 2_i64.saturating_pow(times)), + Negative(x) => Negative(x / 2_i64.saturating_pow(times)), + Invalid | Zero => *self, // Zero and Invalid don't compact } } } @@ -113,6 +143,13 @@ impl SketchHashMap { } } + fn with_capacity(capacity: usize) -> SketchHashMap { + SketchHashMap { + map: HashMap::with_capacity(capacity), + head: SketchHashKey::Invalid, + } + } + // Increment the count at a key, creating the entry if needed. fn increment(&mut self, key: SketchHashKey) { self.entry(key).count += 1; @@ -153,6 +190,27 @@ impl SketchHashMap { // Combine adjacent buckets fn compact(&mut self) { + self.compact_with_swap(&mut Swap::default()) + } + + /// `compact_with_swap` will reuse the provided `Swap`. When + /// this function is called in a loop, this reuse of the `Swap` ensures + /// we don't malloc/free multiple times, but reuse that piece of memory + fn compact_with_swap(&mut self, swap: &mut Swap) { + debug_assert!(swap.buckets.is_empty()); + swap.buckets.reserve(self.map.len()); + + for (k, v) in self.map.drain() { + swap.buckets.push(SwapBucket { + key: k.compact_key(), + count: v.count, + }); + } + + swap.populate(self) + } + // Combine adjacent buckets (former implementation) + fn compact_old(&mut self) { let mut target = self.head; // TODO can we do without this additional map? 
let old_map = std::mem::take(&mut self.map); @@ -187,6 +245,8 @@ impl SketchHashMap { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct UDDSketch { buckets: SketchHashMap, + #[serde(skip)] + swap: Swap, alpha: f64, gamma: f64, compactions: u32, // should always be smaller than 64 @@ -206,49 +266,49 @@ impl UDDSketch { max_buckets, num_values: 0, values_sum: 0.0, + swap: Swap::default(), } } - // This constructor is used to recreate a UddSketch from it's component data + /// This constructor is used to recreate a UddSketch from its component data + /// it assumes the provided keys/counts are ordered by the keys. pub fn new_from_data( max_buckets: u64, current_error: f64, compactions: u64, + num_buckets: u32, values: u64, sum: f64, keys: impl Iterator, counts: impl Iterator, ) -> Self { let mut sketch = UDDSketch { - buckets: SketchHashMap::new(), + buckets: SketchHashMap::with_capacity(num_buckets as usize), alpha: current_error, gamma: gamma(current_error), compactions: compactions as u32, max_buckets, num_values: values, values_sum: sum, + swap: Swap::default(), }; - // TODO - let keys: Vec<_> = keys.collect(); - let counts: Vec<_> = counts.collect(); - assert_eq!(keys.len(), counts.len()); - // assert!(keys.is_sorted()); - for i in 0..keys.len() { + + let mut iter = keys.into_iter().zip(counts.into_iter()).peekable(); + sketch.buckets.head = iter.peek().map(|p| p.0).unwrap_or(Invalid); + + while let Some((key, count)) = iter.next() { sketch .buckets .map - .entry(keys[i]) + .entry(key) + .and_modify(|e| e.count += count) .or_insert(SketchHashEntry { - count: 0, - next: if i == keys.len() - 1 { - SketchHashKey::Invalid - } else { - keys[i + 1] - }, - }) - .count = counts[i]; + count, + next: iter.peek().map(|p| p.0).unwrap_or(SketchHashKey::Invalid), + }); } - sketch.buckets.head = keys[0]; + + //assert_eq!(num_buckets as usize, sketch.buckets.map.len()); sketch } @@ -261,7 +321,8 @@ impl UDDSketch { } pub fn compact_buckets(&mut self) { - 
self.buckets.compact(); + let swap = &mut self.swap; + self.buckets.compact_with_swap(swap); self.compactions += 1; self.gamma *= self.gamma; // See https://arxiv.org/pdf/2004.08604.pdf Equation 3 @@ -285,6 +346,59 @@ impl UDDSketch { self.values_sum += value; } + + /// `merge_items` will merge these values into the current sketch + /// it requires less memory than `merge_sketch`, as that needs a fully serialized + /// `UDDSketch`, whereas this function relies on iterators to do its job. + pub fn merge_items( + &mut self, + mut keys: impl Iterator, + mut counts: impl Iterator, + // We take a lot of values as input, mainly to ensure we can run the + // asserts that are also present in other parts of the codebase. + other_alpha: f64, + other_compactions: u64, + other_values: u64, + other_sum: f64, + other_max_buckets: u32, + ) { + let other_gamma = gamma(other_alpha); + // Require matching initial parameters + debug_assert!( + (self + .gamma + .powf(1.0 / f64::powi(2.0, self.compactions as i32)) + - other_gamma + .powf(1.0 / f64::powi(2.0, other_compactions as i32))) + .abs() + < 1e-9 // f64::EPSILON too small, see issue #396 + ); + debug_assert_eq!(self.max_buckets as u32, other_max_buckets); + + + if other_values == 0 { + return; + } + + while self.compactions < other_compactions as u32 { + self.compact_buckets(); + } + + let extra_compactions = self.compactions - other_compactions as u32; + while let (Some(mut key), Some(count)) = (keys.next(), counts.next()) { + for _ in 0..extra_compactions { + key = key.compact_key(); + } + self.buckets.entry(key).count += count as u64; + } + + while self.buckets.len() > self.max_buckets as usize { + self.compact_buckets(); + } + + self.num_values += other_values; + self.values_sum += other_sum; + } pub fn merge_sketch(&mut self, other: &UDDSketch) { // Require matching initial parameters assert!( @@ -307,14 +421,24 @@ impl UDDSketch { return; } - let mut other = other.clone(); - - while self.compactions > other.compactions { 
- other.compact_buckets(); - } - while other.compactions > self.compactions { - self.compact_buckets(); - } + let mut tmp: UDDSketch; + // We only need to fully clone the other sketch + // if we need to compact it. Not doing it + // is useful, as it doesn't require us to + // allocate any more memory. + // We optimize here, as this code is called frequently + let other = if self.compactions > other.compactions { + tmp = other.clone(); + while self.compactions > tmp.compactions { + tmp.compact_buckets(); + } + &tmp + } else { + while other.compactions > self.compactions { + self.compact_buckets(); + } + other + }; for entry in other.buckets.iter() { let (key, value) = entry; @@ -473,6 +597,8 @@ pub fn gamma(alpha: f64) -> f64 { #[cfg(test)] mod tests { + use std::{thread, time::Duration}; + use rand::{Rng, SeedableRng}; use super::*; @@ -513,6 +639,58 @@ mod tests { assert_eq!(sketch.max_error(), a2); } + // A temporary test to ensure we do the right thing. + #[ignore] + #[test] + fn merge_buckets_compact_implementations() { + let mut sketch1 = UDDSketch::new(20, 0.1); + sketch1.add_value(1.1); // Bucket #1 + sketch1.add_value(1.5); // Bucket #3 + sketch1.add_value(1.6); // Bucket #3 + sketch1.add_value(1.3); // Bucket #2 + sketch1.add_value(4.2); // Bucket #8 + + let mut sketch2 = sketch1.clone(); + + sketch1.buckets.compact_old(); + sketch2.buckets.compact_with_swap(&mut Swap::default()); + + assert_eq!(sketch1.buckets.head, sketch2.buckets.head); + + let mut b1 = sketch1.buckets.map.into_iter().collect::<Vec<_>>(); + let mut b2 = sketch2.buckets.map.into_iter().collect::<Vec<_>>(); + + b1.sort_unstable_by_key(|k| k.0); + b2.sort_unstable_by_key(|k| k.0); + assert_eq!(b1, b2); + } + + #[test] + fn merge_sketches_malloc() { + let mut sketch1 = UDDSketch::new(20, 0.1); + sketch1.add_value(1.1); // Bucket #1 + sketch1.add_value(1.5); // Bucket #3 + sketch1.add_value(1.6); // Bucket #3 + sketch1.add_value(1.3); // Bucket #2 + sketch1.add_value(4.2); // Bucket #8 + + let mut sketch2 = 
UDDSketch::new(20, 0.1); + let mut sketch3 = UDDSketch::new(20, 0.1); + let mut a = 1.0; + let mut b = 0.008; + for _ in 0..200 { + a *= 1.33; + b *= 2.987; + sketch2.add_value(a); + sketch3.add_value(b); + } + + for _ in 0..10_000 { + sketch1.merge_sketch(&sketch2); + sketch1.merge_sketch(&sketch3); + } + } + #[test] fn merge_sketches() { let a1 = 0.1; // alpha for up to 20 buckets diff --git a/crates/udd-sketch/src/sketchhashvec.rs b/crates/udd-sketch/src/sketchhashvec.rs new file mode 100644 index 00000000..caccd39b --- /dev/null +++ b/crates/udd-sketch/src/sketchhashvec.rs @@ -0,0 +1,136 @@ +use crate::SketchHashKey::Invalid; +use crate::{SketchHashEntry, SketchHashKey, SketchHashMap}; +use std::collections::VecDeque; + +pub enum SketchHashContainer { + Map(SketchHashMap), + Vec(SketchHashVec), +} + +impl SketchHashMap { + pub fn compact_into(&mut self, vec: &mut SketchHashVec, compactions: u32) { + assert!(vec.inner.is_empty()); + vec.inner.reserve(self.map.len()); + + for (k, v) in self.map.drain() { + let k = k.compact_key_multiple(compactions); + vec.inner.push(SketchBucket::new(k, v.count)); + } + } +} + +#[derive(Copy, Clone)] +struct SketchBucket { + bucket: SketchHashKey, + count: u64, +} + +impl SketchBucket { + const fn new(bucket: SketchHashKey, count: u64) -> Self { + Self { bucket, count } + } +} + +/// A `SketchHashVec` contains the buckets of a `UDDSketch`. +/// It may contain duplicate buckets, for example, after compacting or merging. +/// Therefore, it is not usable by itself by an end-user, but it is an intermediate representation +/// to speed up compactions and merges. +pub struct SketchHashVec { + inner: Vec, +} + +impl SketchHashVec { + /// Compact the values in this `SketchHashVec`. + pub fn compact(&mut self, compactions: u32) { + for entry in self.inner.iter_mut() { + entry.bucket = entry.bucket.compact_key_multiple(compactions); + } + } + + /// `reduce` does an in-place reduction in number of elements + /// in this Vec. 
It combines adjacent values into 1 aggregated value. + /// It does not allocate. + pub fn reduce(&mut self) { + if self.inner.len() < 2 { + return; + } + + self.inner.sort_unstable_by_key(|k| k.bucket); + + let mut new_idx = 0; + let mut current = self.inner[0]; + + // We're both taking values from the same Vec and populating + // the Vec. That could be frowned upon, however, we ensure that + // - new_idx < old_idx all the time + // - number of new elements <= number of old elements + for old_idx in 1..self.inner.len() { + debug_assert!(new_idx < old_idx); + + let next = self.inner[old_idx]; + if next.bucket != current.bucket { + self.inner[new_idx] = current; + current = next; + new_idx += 1; + } else { + current.count += next.count; + } + } + + // Final one + self.inner[new_idx] = current; + self.inner.truncate(new_idx + 1); + } + + /// Populate the linked-list style of the Map using this Vec. + /// The caller should ensure that the provided `map` is actually empty. + pub fn drain_into(&mut self, map: &mut SketchHashMap) { + assert!(map.map.is_empty()); + + // We need to sort as we want to recreate the linked list style + // in the Hash Map + // We use the `unstable` variant, as it does not allocate, and its + // properties are fine for our use-case, and should perform + // better than the non-stable variant. + // > This sort is unstable (i.e., may reorder equal elements), + // > in-place (i.e., does not allocate), and O(n * log(n)) worst-case. 
+ self.inner.sort_unstable_by_key(|k| k.bucket); + + let mut swap_iter = self.inner.drain(..); + let Some(mut current) = swap_iter.next() else { + map.head = Invalid; + return; + }; + + for next in swap_iter { + if next.bucket == Invalid { + break; + } + + // This combines those buckets that compact into the same one + // For example, Positive(9) and Positive(8) both + // compact into Positive(4) + if current.bucket == next.bucket { + current.count += next.count; + } else { + map.map.insert( + current.bucket, + SketchHashEntry { + count: current.count, + next: next.bucket, + }, + ); + current = next; + } + } + + // And the final one ... + map.map.insert( + current.bucket, + SketchHashEntry { + count: current.count, + next: Invalid, + }, + ); + } +} diff --git a/crates/udd-sketch/src/swap.rs b/crates/udd-sketch/src/swap.rs new file mode 100644 index 00000000..7fd21305 --- /dev/null +++ b/crates/udd-sketch/src/swap.rs @@ -0,0 +1,197 @@ +use crate::swap::Kind::Aggregated; +use crate::SketchHashKey::Invalid; +use crate::{SketchHashEntry, SketchHashKey, SketchHashMap}; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct SwapBucket { + pub key: SketchHashKey, + pub count: u64, +} + +#[derive(Debug, Clone, PartialEq)] +enum Kind { + Ordered, + Unordered, + Aggregated, +} + +/// `Swap` is used for merging sketches. By making it part of the `UDDSketch`, we prevent +/// repeated allocations if it is needed. +/// If it is never used (no aggregations are done) it doesn't heap allocate. +#[derive(Debug, Clone, PartialEq)] +pub struct Swap { + kind: Kind, + pub buckets: Vec, +} + +impl Default for Swap { + fn default() -> Self { + Self { + kind: Aggregated, + buckets: Vec::new(), + } + } +} + + +impl Swap { + /// Populates the `Swap` from the given `SketchHashMap`. 
Will empty the `SketchHashMap` + pub fn populate_from_map(&mut self, map: &mut SketchHashMap, additional_compactions: u32, capacity: usize) { + assert!(self.buckets.is_empty()); + self.buckets.reserve(map.len()); + + for (mut key, entry) in map.map.drain() { + for _ in 0..additional_compactions { + key = key.compact_key(); + } + self.buckets.push(SwapBucket { + key, + count: entry.count, + }); + } + } + + /// Ensure the number of distinct buckets in this swap is below this value + /// Returns the number of compactions executed in order to do this. + /// The caller is expected to update certain + /// values in response to the + /// number of compactions. + #[must_use] + pub fn reduce_buckets(&mut self, max_buckets: usize) -> u32 { + let mut compactions = 0; + while self.buckets.len() >= max_buckets { + self.aggregate(); + if self.buckets.len() <= max_buckets { + break; + } + self.compact(1); + compactions += 1; + } + + compactions + } + + /// Append the keys and counts to the Swap + /// Each key first has `compactions_to_apply_to_keys` compactions + /// applied, so the appended buckets match this sketch's bucket scale + pub fn append( + &mut self, + keys: impl Iterator, + counts: impl Iterator, + compactions_to_apply_to_keys: u32, + ) { + let mut iter = keys.zip(counts); + + let Some(mut current) = iter.next() else { + return; + }; + + // As we're adding items to our Vec, our ordering is chaos. 
+ self.kind = Kind::Unordered; + + for _ in 0..compactions_to_apply_to_keys { + current.0 = current.0.compact_key(); + } + + while let Some(mut next) = iter.next() { + for _ in 0..compactions_to_apply_to_keys { + next.0 = next.0.compact_key(); + } + + if current.0 != next.0 { + self.buckets.push(SwapBucket { + key: current.0, + count: current.1, + }); + current = next; + } else { + current.1 += next.1; + } + } + + // Final one + self.buckets.push(SwapBucket { + key: current.0, + count: current.1, + }); + } + + /// Populate the `SketchHashMap` with the values from the Swap + /// Drains the `Swap` + pub fn populate(&mut self, map: &mut SketchHashMap) { + debug_assert!(map.map.is_empty()); + + self.aggregate(); + + let mut iter = self.buckets.drain(..).peekable(); + + map.head = iter.peek().map(|i| i.key).unwrap_or(Invalid); + + while let Some(i) = iter.next() { + map.map.insert( + i.key, + SketchHashEntry { + count: i.count, + next: iter.peek().map(|i| i.key).unwrap_or(Invalid), + }, + ); + } + } + + /// Sort the values + fn sort(&mut self) { + if matches!(self.kind, Kind::Unordered) { + self.buckets.sort_unstable_by(|a, b| a.key.cmp(&b.key)); + self.kind = Kind::Ordered; + } + } + + /// Run a number of compactions of the `sketch` + pub fn compact(&mut self, compactions: u32) { + for bucket in self.buckets.iter_mut() { + for _ in 0..compactions { + bucket.key = bucket.key.compact_key(); + } + } + + self.kind = Kind::Unordered; + } + + /// Aggregate the values in the `Vec` so that all elements are unique + /// and are in ascending order + pub fn aggregate(&mut self) { + match &self.kind { + Kind::Ordered => (), + Kind::Unordered => self.sort(), + Kind::Aggregated => return, + }; + debug_assert!(matches!(self.kind, Kind::Ordered)); + + let Some(current) = self.buckets.first() else { + return; + }; + let mut current = *current; + + let mut new_idx = 0; + + // We're both taking values from the same Vec and populating + // the Vec. 
That could be frowned upon, however, we ensure that + // - new_idx < old_idx all the time + // - number of new elements <= number of old elements + for old_idx in 1..self.buckets.len() { + let next = self.buckets[old_idx]; + if next.key != current.key { + self.buckets[new_idx] = current; + current = next; + new_idx += 1; + } else { + current.count += next.count; + } + } + + // Final one + self.buckets[new_idx] = current; + self.buckets.truncate(new_idx + 1); + self.kind = Kind::Aggregated; + } +} diff --git a/docs/tdigest.md b/docs/tdigest.md index 7583c759..432965bb 100644 --- a/docs/tdigest.md +++ b/docs/tdigest.md @@ -12,7 +12,7 @@ TimescaleDB Toolkit provides an implementation of the [t-digest data structure]( ## Details -Timescale's t-digest is implemented as an aggregate function in PostgreSQL. They do not support moving-aggregate mode, and are not ordered-set aggregates. Presently they are restricted to float values, but the goal is to make them polymorphic. They are partializable and are good candidates for [continuous aggregation](https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates). +Timescale's t-digest is implemented as an aggregate function in PostgreSQL. They do not support moving-aggregate mode, and are not ordered-set aggregates. Presently they are restricted to float values, but the goal is to make them polymorphic. They are partializable and are good candidates for [continuous aggregation](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/). One additional thing to note about TDigests is that they are somewhat dependent on the order of inputs. The percentile approximations should be nearly equal for the same underlying data, especially at the extremes of the quantile range where the TDigest is inherently more accurate, they are unlikely to be identical if built in a different order. 
While this should have little effect on the accuracy of the estimates, it is worth noting that repeating the creation of the TDigest might have subtle differences if the call is being parallelized by Postgres. Similarly, building a TDigest by combining several subdigests using the [summary aggregate](#tdigest-summary) is likely to produce a subtley different result than combining all of the underlying data using a single [point aggregate](#tdigest). @@ -94,7 +94,7 @@ FROM high_temp; ``` ## Example Using TimeScale Continuous Aggregates (tdigest-cagg-example) -Timescale [continuous aggregates](https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates) +Timescale [continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/) provide an easy way to keep a tdigest up to date as more data is added to a table. The following example shows how this might look in practice. The first step is to create a Timescale hypertable to store our data. diff --git a/extension/Cargo.toml b/extension/Cargo.toml index e414ad32..2edcf052 100644 --- a/extension/Cargo.toml +++ b/extension/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "timescaledb_toolkit" -version = "1.19.0-dev" +version = "1.20.0-dev" edition = "2021" [[bin]] @@ -11,7 +11,7 @@ path = "./src/bin/pgrx_embed.rs" crate-type = ["cdylib", "lib"] [features] -default = ["pg16"] +default = ["pg17"] pg12 = ["pgrx/pg12", "pgrx-tests/pg12"] pg13 = ["pgrx/pg13", "pgrx-tests/pg13"] pg14 = ["pgrx/pg14", "pgrx-tests/pg14"] @@ -23,9 +23,9 @@ pg_test = ["approx"] [dependencies] # Keep synchronized with `cargo install --version N.N.N cargo-pgrx` in Readme.md and docker/ci/Dockerfile # Also `pgrx-tests` down below in `dev-dependencies`. 
-pgrx = "=0.12.8" -pgrx-macros = "=0.12.8" -pgrx-sql-entity-graph = "=0.12.8" +pgrx = "=0.12.9" +pgrx-macros = "=0.12.9" +pgrx-sql-entity-graph = "=0.12.9" encodings = {path="../crates/encodings"} flat_serialize = {path="../crates/flat_serialize/flat_serialize"} flat_serialize_macro = {path="../crates/flat_serialize/flat_serialize_macro"} @@ -62,5 +62,5 @@ spfunc = "0.1.0" statrs = "0.15.0" [dev-dependencies] -pgrx-tests = "=0.12.8" +pgrx-tests = "=0.12.9" approx = "0.4.0" diff --git a/extension/src/uddsketch.rs b/extension/src/uddsketch.rs index bcf18bbe..9297d39d 100644 --- a/extension/src/uddsketch.rs +++ b/extension/src/uddsketch.rs @@ -2,7 +2,7 @@ use pgrx::*; use encodings::{delta, prefix_varint}; -use uddsketch::{SketchHashKey, UDDSketch as UddSketchInternal}; +use uddsketch::{gamma, SketchHashKey, UDDSketch as UddSketchInternal}; use crate::{ accessors::{ @@ -92,11 +92,10 @@ pub fn uddsketch_combine_inner( in_aggregate_context(fcinfo, || match (state1, state2) { (None, None) => None, (None, Some(state2)) => Some(state2.clone().into()), - (Some(state1), None) => Some(state1.clone().into()), - (Some(state1), Some(state2)) => { - let mut sketch = state1.clone(); - sketch.merge_sketch(&state2); - Some(sketch.into()) + (Some(state1), None) => Some(state1.into()), + (Some(mut state1), Some(state2)) => { + state1.merge_sketch(&state2); + Some(state1.into()) } }) } @@ -151,6 +150,7 @@ impl From for UddSketchInternal { sketch.max_buckets as u64, sketch.alpha, sketch.compactions as u64, + sketch.num_buckets, sketch.count, sketch.sum, sketch.keys(), @@ -309,6 +309,7 @@ impl<'input> UddSketch<'input> { self.max_buckets as u64, self.alpha, self.compactions, + self.num_buckets, self.count, self.sum, self.keys(), @@ -520,15 +521,22 @@ pub fn uddsketch_compound_trans_inner( ) -> Option> { unsafe { in_aggregate_context(fcinfo, || { - let value = match value { - None => return state, - Some(value) => value.to_uddsketch(), + let Some(value) = value else { return state }; + 
let Some(mut state) = state else { + return Some(value.to_uddsketch().into()); }; - let mut state = match state { - None => return Some(value.into()), - Some(state) => state, - }; - state.merge_sketch(&value); + + let keys = value.keys(); + let counts = value.counts(); + state.merge_items( + keys, + counts, + value.alpha, + value.compactions, + value.count, + value.sum, + value.max_buckets, + ); state.into() }) } diff --git a/extension/timescaledb_toolkit.control b/extension/timescaledb_toolkit.control index 959f97f3..0d99c0e1 100644 --- a/extension/timescaledb_toolkit.control +++ b/extension/timescaledb_toolkit.control @@ -5,4 +5,4 @@ superuser = false module_pathname = '$libdir/timescaledb_toolkit' # only for testing, will be removed for real installs # comma-separated list of previous versions this version can be upgraded from # directly. This is used to generate upgrade scripts. -# upgradeable_from = '1.6.0, 1.7.0, 1.8.0, 1.10.0-dev, 1.10.1, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.13.1, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0' +# upgradeable_from = '1.6.0, 1.7.0, 1.8.0, 1.10.0-dev, 1.10.1, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.13.1, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 1.19.0'