Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cargo/config.toml
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,18 @@ This changelog should be updated as part of a PR if the work is worth noting (mo

**Full Changelog**: [TODO]

## [1.19.0](https://github.com/timescale/timescaledb-toolkit/releases/tag/1.19.0) (2024-11-14)

#### New experimental features

#### Bug fixes

#### Other notable changes

#### Shout-outs

**Full Changelog**: [TODO]

## [1.18.0](https://github.com/timescale/timescaledb-toolkit/releases/tag/1.18.0) (2023-11-28)

#### New experimental features
Expand Down
173 changes: 154 additions & 19 deletions crates/udd-sketch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
//! Based on the paper: https://arxiv.org/abs/2004.08604

use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;

use crate::SketchHashKey::{Invalid, Zero};
#[cfg(test)]
use ordered_float::OrderedFloat;
#[cfg(test)]
use std::collections::HashSet;

#[cfg(test)]
extern crate quickcheck;
#[cfg(test)]
Expand All @@ -26,8 +29,8 @@ pub enum SketchHashKey {
}

// Invalid is treated as greater than valid values (making it a nice boundary value for list end)
impl std::cmp::PartialOrd for SketchHashKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
impl Ord for SketchHashKey {
fn cmp(&self, other: &Self) -> Ordering {
use self::SketchHashKey::*;
use std::cmp::Ordering::*;
match (self, other) {
Expand All @@ -37,14 +40,19 @@ impl std::cmp::PartialOrd for SketchHashKey {
(Zero, Zero) => Equal,
(Positive(a), Positive(b)) => a.cmp(b),
(Negative(a), Negative(b)) => a.cmp(b).reverse(),
(_, Positive(_)) => Less,
(Positive(_), _) => Greater,
(_, Negative(_)) => Greater,
(Negative(_), _) => Less,
(Negative(_) | Zero, Positive(_)) => Less,
(Positive(_), Negative(_) | Zero) => Greater,
(Zero, Negative(_)) => Greater,
(Negative(_), Zero) => Less,
}
.into()
}
}
impl PartialOrd for SketchHashKey {
    /// Delegates to [`Ord::cmp`]: the key ordering is total, so a
    /// comparison result always exists and this never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = self.cmp(other);
        Some(ordering)
    }
}

impl SketchHashKey {
/// This is the key corresponding to the current key after the SketchHashMap it refers to has gone through one compaction.
Expand All @@ -57,7 +65,7 @@ impl SketchHashKey {
Positive(i64::MAX) => *self,
Negative(x) => Negative(if x > 0 { x + 1 } else { x } / 2),
Positive(x) => Positive(if x > 0 { x + 1 } else { x } / 2),
x => x, // Zero and Invalid don't compact
Invalid | Zero => *self, // Zero and Invalid don't compact
}
}
}
Expand Down Expand Up @@ -153,6 +161,65 @@ impl SketchHashMap {

// Combine adjacent buckets
fn compact(&mut self) {
    // One-shot compaction: hand a throwaway scratch buffer to the
    // reusable implementation.
    let mut scratch = Vec::new();
    self.compact_with_swap(&mut scratch)
}

/// `compact_with_swap` will reuse the provided swap Vec. When
/// this function is called in a loop, this reuse of the Vec ensures
/// we don't malloc/free multiple times, but reuse that piece of memory.
///
/// `swap` must be empty on entry; it is left empty on exit so it can
/// be handed straight back in on the next call.
fn compact_with_swap(&mut self, swap: &mut Vec<(SketchHashKey, u64)>) {
    debug_assert!(swap.is_empty());
    swap.reserve(self.map.len());

    // Move every bucket out of the map, already translated to the key
    // it will occupy after one compaction step.
    for (k, v) in self.map.drain() {
        swap.push((k.compact_key(), v.count));
    }

    // We need to sort the Vec as we want to recreate the linked list style
    // in the Hash Map.
    // We use the `unstable` variant, as it does not allocate, and its
    // properties are fine for our use-case (duplicate keys are merged
    // below, so their relative order is irrelevant), and it should
    // perform better than the stable variant.
    // > This sort is unstable (i.e., may reorder equal elements),
    // > in-place (i.e., does not allocate), and O(n * log(n)) worst-case.
    swap.sort_unstable_by_key(|k| k.0);

    let mut swap_iter = swap.drain(..);

    // Empty map: nothing to rebuild (dropping the drain leaves `swap` empty).
    let Some(mut current) = swap_iter.next() else {
        return;
    };

    for next in swap_iter {
        // This combines those buckets that compact into the same one.
        // For example, Positive(9) and Positive(8) both
        // compact into Positive(4).
        if current.0 == next.0 {
            current.1 += next.1;
        } else {
            // Keys are sorted, so `next.0` is exactly the successor link
            // the in-map linked list needs.
            self.map.insert(
                current.0,
                SketchHashEntry {
                    count: current.1,
                    next: next.0,
                },
            );
            current = next;
        }
    }

    // And the final one: Invalid marks the end of the linked list.
    self.map.insert(
        current.0,
        SketchHashEntry {
            count: current.1,
            next: Invalid,
        },
    );
    self.head = self.head.compact_key();
}
// Combine adjacent buckets (former implementation)
fn compact_old(&mut self) {
let mut target = self.head;
// TODO can we do without this additional map?
let old_map = std::mem::take(&mut self.map);
Expand Down Expand Up @@ -260,8 +327,8 @@ impl UDDSketch {
key(value, self.gamma)
}

pub fn compact_buckets(&mut self) {
self.buckets.compact();
pub fn compact_buckets(&mut self, swap: &mut Vec<(SketchHashKey, u64)>) {
self.buckets.compact_with_swap(swap);

self.compactions += 1;
self.gamma *= self.gamma; // See https://arxiv.org/pdf/2004.08604.pdf Equation 3
Expand All @@ -278,7 +345,7 @@ impl UDDSketch {
self.buckets.increment(self.key(value));

while self.buckets.len() > self.max_buckets as usize {
self.compact_buckets();
self.compact_buckets(&mut Vec::new());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could reuse a vec here as well

}

self.num_values += 1;
Expand Down Expand Up @@ -307,22 +374,36 @@ impl UDDSketch {
return;
}

let mut other = other.clone();

while self.compactions > other.compactions {
other.compact_buckets();
}
while other.compactions > self.compactions {
self.compact_buckets();
}
// We can reuse this Heap allocated Vec every time
// we compact.
let mut swap = Vec::new();

let mut tmp: UDDSketch;
// We only need to fully clone the other sketch
// if we need to compact it. Not doing it
// is useful, as it doesn't require us to
// allocate any more memory.
// We optimize here, as this code is called frequently
let other = if self.compactions > other.compactions {
tmp = other.clone();
while self.compactions > tmp.compactions {
tmp.compact_buckets(&mut swap);
}
&tmp
} else {
while other.compactions > self.compactions {
self.compact_buckets(&mut swap);
}
other
};

for entry in other.buckets.iter() {
let (key, value) = entry;
self.buckets.entry(key).count += value;
}

while self.buckets.len() > self.max_buckets as usize {
self.compact_buckets();
self.compact_buckets(&mut swap);
}

self.num_values += other.num_values;
Expand Down Expand Up @@ -473,6 +554,8 @@ pub fn gamma(alpha: f64) -> f64 {

#[cfg(test)]
mod tests {
use std::{thread, time::Duration};

use rand::{Rng, SeedableRng};

use super::*;
Expand Down Expand Up @@ -513,6 +596,58 @@ mod tests {
assert_eq!(sketch.max_error(), a2);
}

// A temporary test to ensure we do the right thing.
#[ignore]
#[test]
fn merge_buckets_compact_implementations() {
    // Values land in buckets #1, #3, #3, #2 and #8 respectively.
    let mut reference = UDDSketch::new(20, 0.1);
    for value in [1.1, 1.5, 1.6, 1.3, 4.2] {
        reference.add_value(value);
    }

    let mut candidate = reference.clone();

    // Old and new compaction must agree on head and bucket contents.
    reference.buckets.compact_old();
    candidate.buckets.compact_with_swap(&mut Vec::new());

    assert_eq!(reference.buckets.head, candidate.buckets.head);

    let mut expected: Vec<_> = reference.buckets.map.into_iter().collect();
    let mut actual: Vec<_> = candidate.buckets.map.into_iter().collect();

    expected.sort_unstable_by_key(|entry| entry.0);
    actual.sort_unstable_by_key(|entry| entry.0);
    assert_eq!(expected, actual);
}

#[test]
fn merge_sketches_malloc() {
    // Seed sketch with values in buckets #1, #3, #3, #2 and #8.
    let mut target = UDDSketch::new(20, 0.1);
    for value in [1.1, 1.5, 1.6, 1.3, 4.2] {
        target.add_value(value);
    }

    // Two sketches fed geometric sequences so they span many buckets.
    let mut grow_fast = UDDSketch::new(20, 0.1);
    let mut grow_faster = UDDSketch::new(20, 0.1);
    let (mut a, mut b) = (1.0, 0.008);
    for _ in 0..200 {
        a *= 1.33;
        b *= 2.987;
        grow_fast.add_value(a);
        grow_faster.add_value(b);
    }

    // Repeated merges exercise the allocation-reuse path in merge_sketch.
    for _ in 0..10_000 {
        target.merge_sketch(&grow_fast);
        target.merge_sketch(&grow_faster);
    }
}

#[test]
fn merge_sketches() {
let a1 = 0.1; // alpha for up to 20 buckets
Expand Down
4 changes: 2 additions & 2 deletions docs/tdigest.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ TimescaleDB Toolkit provides an implementation of the [t-digest data structure](

## Details <a id="tdigest-details"></a>

Timescale's t-digest is implemented as an aggregate function in PostgreSQL. They do not support moving-aggregate mode, and are not ordered-set aggregates. Presently they are restricted to float values, but the goal is to make them polymorphic. They are partializable and are good candidates for [continuous aggregation](https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates).
Timescale's t-digest is implemented as an aggregate function in PostgreSQL. They do not support moving-aggregate mode, and are not ordered-set aggregates. Presently they are restricted to float values, but the goal is to make them polymorphic. They are partializable and are good candidates for [continuous aggregation](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/).

One additional thing to note about TDigests is that they are somewhat dependent on the order of inputs. The percentile approximations should be nearly equal for the same underlying data, especially at the extremes of the quantile range where the TDigest is inherently more accurate, they are unlikely to be identical if built in a different order. While this should have little effect on the accuracy of the estimates, it is worth noting that repeating the creation of the TDigest might have subtle differences if the call is being parallelized by Postgres. Similarly, building a TDigest by combining several subdigests using the [summary aggregate](#tdigest-summary) is likely to produce a subtly different result than combining all of the underlying data using a single [point aggregate](#tdigest).

Expand Down Expand Up @@ -94,7 +94,7 @@ FROM high_temp;
```

## Example Using TimeScale Continuous Aggregates (tdigest-cagg-example)
Timescale [continuous aggregates](https://docs.timescale.com/latest/using-timescaledb/continuous-aggregates)
Timescale [continuous aggregates](https://docs.timescale.com/use-timescale/latest/continuous-aggregates/)
provide an easy way to keep a tdigest up to date as more data is added to a table. The following example
shows how this might look in practice. The first step is to create a Timescale hypertable to store our data.

Expand Down
4 changes: 2 additions & 2 deletions extension/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "timescaledb_toolkit"
version = "1.19.0-dev"
version = "1.20.0-dev"
edition = "2021"

[[bin]]
Expand All @@ -11,7 +11,7 @@ path = "./src/bin/pgrx_embed.rs"
crate-type = ["cdylib", "lib"]

[features]
default = ["pg16"]
default = ["pg17"]
pg12 = ["pgrx/pg12", "pgrx-tests/pg12"]
pg13 = ["pgrx/pg13", "pgrx-tests/pg13"]
pg14 = ["pgrx/pg14", "pgrx-tests/pg14"]
Expand Down
2 changes: 1 addition & 1 deletion extension/timescaledb_toolkit.control
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ superuser = false
module_pathname = '$libdir/timescaledb_toolkit' # only for testing, will be removed for real installs
# comma-separated list of previous versions this version can be upgraded from
# directly. This is used to generate upgrade scripts.
# upgradeable_from = '1.6.0, 1.7.0, 1.8.0, 1.10.0-dev, 1.10.1, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.13.1, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0'
# upgradeable_from = '1.6.0, 1.7.0, 1.8.0, 1.10.0-dev, 1.10.1, 1.11.0, 1.12.0, 1.12.1, 1.13.0, 1.13.1, 1.14.0, 1.15.0, 1.16.0, 1.17.0, 1.18.0, 1.19.0'
Loading