Skip to content

Commit 66f9ce1

Browse files
Tighten up constraints on UDDSketch config
Buckets should be within 1 and 44 million (practical PostgreSQL limit for column size), which is better caught by a NonZeroU32. Compactions are limited to at most 65. Using a u8 for that allows some further information in the struct without requiring more memory.
1 parent 1322336 commit 66f9ce1

File tree

2 files changed

+89
-41
lines changed

2 files changed

+89
-41
lines changed

crates/udd-sketch/src/lib.rs

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use std::collections::HashMap;
77

88
#[cfg(test)]
99
use ordered_float::OrderedFloat;
10+
use serde::{Deserialize, Serialize};
1011
#[cfg(test)]
1112
use std::collections::HashSet;
12-
use serde::{Deserialize, Serialize};
13+
use std::num::NonZeroU32;
1314

1415
#[cfg(test)]
1516
extern crate quickcheck;
@@ -57,9 +58,13 @@ impl PartialOrd for SketchHashKey {
5758
/// `UDDSketchMetadata` was created to avoid passing along many parameters
5859
/// to function calls.
5960
pub struct UDDSketchMetadata {
60-
pub max_buckets: u32,
61+
// `max_buckets` is limited to `PostgreSQL`'s limit: At most 1GB per column.
62+
// As we need 24 bytes to represent any bucket, there are at most something like
63+
// 44 million values possible. It can however never be actually zero.
64+
pub max_buckets: NonZeroU32,
6165
pub current_error: f64,
62-
pub compactions: u32,
66+
// Compactions are limited to at most 65
67+
pub compactions: u8,
6368
pub values: u64,
6469
pub sum: f64,
6570
}
@@ -259,14 +264,14 @@ pub struct UDDSketch {
259264
buckets: SketchHashMap,
260265
alpha: f64,
261266
gamma: f64,
262-
compactions: u32, // should always be smaller than 64
263-
max_buckets: u64,
267+
compactions: u8, // should always be smaller than 64
268+
max_buckets: NonZeroU32, // PostgreSQL limit is 1GB per column, which is roughly 44 million buckets
264269
num_values: u64,
265270
values_sum: f64,
266271
}
267272

268273
impl UDDSketch {
269-
pub fn new(max_buckets: u64, initial_error: f64) -> Self {
274+
pub fn new(max_buckets: NonZeroU32, initial_error: f64) -> Self {
270275
assert!((1e-12..1.0).contains(&initial_error));
271276
UDDSketch {
272277
buckets: SketchHashMap::new(),
@@ -290,7 +295,7 @@ impl UDDSketch {
290295
alpha: metadata.current_error,
291296
gamma: gamma(metadata.current_error),
292297
compactions: metadata.compactions,
293-
max_buckets: metadata.max_buckets as u64,
298+
max_buckets: metadata.max_buckets,
294299
num_values: metadata.values,
295300
values_sum: metadata.sum,
296301
};
@@ -343,7 +348,7 @@ impl UDDSketch {
343348
pub fn add_value(&mut self, value: f64) {
344349
self.buckets.increment(self.key(value));
345350

346-
while self.buckets.len() > self.max_buckets as usize {
351+
while self.buckets.len() > self.max_buckets.get() as usize {
347352
self.compact_buckets();
348353
}
349354

@@ -370,7 +375,7 @@ impl UDDSketch {
370375
.abs()
371376
< 1e-9 // f64::EPSILON too small, see issue #396
372377
);
373-
debug_assert_eq!(self.max_buckets, other.max_buckets as u64);
378+
debug_assert_eq!(self.max_buckets, other.max_buckets);
374379

375380
if other.values == 0 {
376381
return;
@@ -389,7 +394,7 @@ impl UDDSketch {
389394
self.buckets.entry_upsert(key, count);
390395
}
391396

392-
while self.buckets.len() > self.max_buckets as usize {
397+
while self.buckets.len() > self.max_buckets.get() as usize {
393398
self.compact_buckets();
394399
}
395400

@@ -433,19 +438,19 @@ impl UDDSketch {
433438
self.buckets.entry_upsert(key, value);
434439
}
435440

436-
while self.buckets.len() > self.max_buckets as usize {
441+
while self.buckets.len() > self.max_buckets.get() as usize {
437442
self.compact_buckets();
438443
}
439444

440445
self.num_values += other.num_values;
441446
self.values_sum += other.values_sum;
442447
}
443448

444-
pub fn max_allowed_buckets(&self) -> u64 {
445-
self.max_buckets
449+
pub fn max_allowed_buckets(&self) -> u32 {
450+
self.max_buckets.get()
446451
}
447452

448-
pub fn times_compacted(&self) -> u32 {
453+
pub fn times_compacted(&self) -> u8 {
449454
self.compactions
450455
}
451456

@@ -591,7 +596,7 @@ mod tests {
591596

592597
#[test]
593598
fn build_and_add_values() {
594-
let mut sketch = UDDSketch::new(20, 0.1);
599+
let mut sketch = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
595600
sketch.add_value(1.0);
596601
sketch.add_value(3.0);
597602
sketch.add_value(0.5);
@@ -603,7 +608,7 @@ mod tests {
603608

604609
#[test]
605610
fn exceed_buckets() {
606-
let mut sketch = UDDSketch::new(20, 0.1);
611+
let mut sketch = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
607612
sketch.add_value(1.1); // Bucket #1
608613
sketch.add_value(400.0); // Bucket #30
609614
let a2 = 0.2 / 1.01;
@@ -641,7 +646,7 @@ mod tests {
641646
}
642647

643648
let metadata = UDDSketchMetadata {
644-
max_buckets: other.max_buckets as u32,
649+
max_buckets: other.max_buckets,
645650
current_error: other.alpha,
646651
compactions: other.compactions,
647652
values: other.num_values,
@@ -662,7 +667,7 @@ mod tests {
662667
let a4 = 2.0 * a3 / (1.0 + f64::powi(a3, 2)); // alpha for 3 compactions
663668
let a5 = 2.0 * a4 / (1.0 + f64::powi(a4, 2)); // alpha for 4 compactions
664669

665-
let mut sketch1 = UDDSketch::new(20, 0.1);
670+
let mut sketch1 = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
666671
sketch1.add_value(1.1); // Bucket #1
667672
sketch1.add_value(1.5); // Bucket #3
668673
sketch1.add_value(1.6); // Bucket #3
@@ -672,7 +677,7 @@ mod tests {
672677
assert_eq!(sketch1.count(), 5);
673678
assert_eq!(sketch1.max_error(), a1);
674679

675-
let mut sketch2 = UDDSketch::new(20, 0.1);
680+
let mut sketch2 = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
676681
sketch2.add_value(5.1); // Bucket #9
677682
sketch2.add_value(7.5); // Bucket #11
678683
sketch2.add_value(10.6); // Bucket #12
@@ -685,7 +690,7 @@ mod tests {
685690
assert_eq!(sketch1.count(), 10);
686691
assert_eq!(sketch1.max_error(), a1);
687692

688-
let mut sketch3 = UDDSketch::new(20, 0.1);
693+
let mut sketch3 = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
689694
sketch3.add_value(0.8); // Bucket #-1
690695
sketch3.add_value(3.7); // Bucket #7
691696
sketch3.add_value(15.2); // Bucket #14
@@ -698,7 +703,7 @@ mod tests {
698703
assert_eq!(sketch1.count(), 15);
699704
assert_eq!(sketch1.max_error(), a1);
700705

701-
let mut sketch4 = UDDSketch::new(20, 0.1);
706+
let mut sketch4 = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
702707
sketch4.add_value(400.0); // Bucket #30
703708
sketch4.add_value(0.004); // Bucket #-27
704709
sketch4.add_value(0.0); // Zero Bucket
@@ -716,7 +721,7 @@ mod tests {
716721
assert_eq!(sketch1.count(), 24);
717722
assert_eq!(sketch1.max_error(), a2);
718723

719-
let mut sketch5 = UDDSketch::new(20, 0.1);
724+
let mut sketch5 = UDDSketch::new(NonZeroU32::new(20).unwrap(), 0.1);
720725
for i in 100..220 {
721726
sketch5.add_value(1.23_f64.powi(i));
722727
}
@@ -731,7 +736,7 @@ mod tests {
731736

732737
#[test]
733738
fn test_quantile_and_value_estimates() {
734-
let mut sketch = UDDSketch::new(50, 0.1);
739+
let mut sketch = UDDSketch::new(NonZeroU32::new(50).unwrap(), 0.1);
735740
for v in 1..=10000 {
736741
sketch.add_value(v as f64 / 100.0);
737742
}
@@ -771,7 +776,7 @@ mod tests {
771776

772777
#[test]
773778
fn test_extreme_quantile_at_value() {
774-
let mut sketch = UDDSketch::new(50, 0.1);
779+
let mut sketch = UDDSketch::new(NonZeroU32::new(50).unwrap(), 0.1);
775780
for v in 1..=10000 {
776781
sketch.add_value(v as f64 / 100.0);
777782
}
@@ -786,7 +791,7 @@ mod tests {
786791

787792
#[test]
788793
fn random_stress() {
789-
let mut sketch = UDDSketch::new(1000, 0.01);
794+
let mut sketch = UDDSketch::new(NonZeroU32::new(1000).unwrap(), 0.01);
790795
let seed = rand::thread_rng().gen();
791796
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
792797
let mut bounds = Vec::new();
@@ -917,7 +922,7 @@ mod tests {
917922
if master.len() < 100 {
918923
return TestResult::discard();
919924
}
920-
let mut sketch = UDDSketch::new(100, 0.000001);
925+
let mut sketch = UDDSketch::new(NonZeroU32::new(100).unwrap(), 0.000001);
921926
for value in &master {
922927
sketch.add_value(*value);
923928
}

extension/src/uddsketch.rs

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
use pgrx::*;
2+
use std::num::NonZeroU32;
23

34
use encodings::{delta, prefix_varint};
45

56
use uddsketch::{SketchHashKey, UDDSketch as UddSketchInternal, UDDSketchMetadata};
67

8+
const ERROR_UDDSKETCH_ZERO_BUCKETS: &str = "uddsketch size most be 1 or more";
9+
const ERROR_COMPACTIONS_LIMITED: &str = "compactions are limited to at most 65";
10+
711
use crate::{
812
accessors::{
913
AccessorApproxPercentile, AccessorApproxPercentileRank, AccessorError, AccessorMean,
@@ -25,12 +29,20 @@ pub fn uddsketch_trans(
2529
value: Option<f64>,
2630
fcinfo: pg_sys::FunctionCallInfo,
2731
) -> Option<Internal> {
28-
uddsketch_trans_inner(unsafe { state.to_inner() }, size, max_error, value, fcinfo).internal()
32+
uddsketch_trans_inner(
33+
unsafe { state.to_inner() },
34+
NonZeroU32::new(u32::try_from(size).expect(ERROR_UDDSKETCH_ZERO_BUCKETS))
35+
.expect(ERROR_UDDSKETCH_ZERO_BUCKETS),
36+
max_error,
37+
value,
38+
fcinfo,
39+
)
40+
.internal()
2941
}
3042

3143
pub fn uddsketch_trans_inner(
3244
state: Option<Inner<UddSketchInternal>>,
33-
size: i32,
45+
size: NonZeroU32,
3446
max_error: f64,
3547
value: Option<f64>,
3648
fcinfo: pg_sys::FunctionCallInfo,
@@ -42,7 +54,7 @@ pub fn uddsketch_trans_inner(
4254
Some(value) => value,
4355
};
4456
let mut state = match state {
45-
None => UddSketchInternal::new(size as u64, max_error).into(),
57+
None => UddSketchInternal::new(size, max_error).into(),
4658
Some(state) => state,
4759
};
4860
state.add_value(value);
@@ -51,7 +63,7 @@ pub fn uddsketch_trans_inner(
5163
}
5264
}
5365

54-
const PERCENTILE_AGG_DEFAULT_SIZE: u32 = 200;
66+
const PERCENTILE_AGG_DEFAULT_SIZE: NonZeroU32 = NonZeroU32::new(200).unwrap();
5567
const PERCENTILE_AGG_DEFAULT_ERROR: f64 = 0.001;
5668

5769
// transition function for the simpler percentile_agg aggregate, which doesn't
@@ -135,9 +147,9 @@ impl From<&UddSketchInternal> for SerializedUddSketch {
135147
let buckets = compress_buckets(sketch.bucket_iter());
136148
SerializedUddSketch {
137149
alpha: sketch.max_error(),
138-
max_buckets: sketch.max_allowed_buckets() as u32,
150+
max_buckets: sketch.max_allowed_buckets(),
139151
num_buckets: sketch.current_buckets_count() as u32,
140-
compactions: sketch.times_compacted(),
152+
compactions: sketch.times_compacted() as u32,
141153
count: sketch.count(),
142154
sum: sketch.sum(),
143155
buckets,
@@ -149,9 +161,10 @@ impl From<SerializedUddSketch> for UddSketchInternal {
149161
fn from(sketch: SerializedUddSketch) -> Self {
150162
UddSketchInternal::new_from_data(
151163
&UDDSketchMetadata {
152-
max_buckets: sketch.max_buckets,
164+
max_buckets: NonZeroU32::new(sketch.max_buckets)
165+
.expect(ERROR_UDDSKETCH_ZERO_BUCKETS),
153166
current_error: sketch.alpha,
154-
compactions: sketch.compactions,
167+
compactions: u8::try_from(sketch.compactions).expect(ERROR_COMPACTIONS_LIMITED),
155168
values: sketch.count,
156169
sum: sketch.sum,
157170
},
@@ -308,9 +321,9 @@ impl<'input> UddSketch<'input> {
308321

309322
fn metadata(&self) -> UDDSketchMetadata {
310323
UDDSketchMetadata {
311-
max_buckets: self.max_buckets,
324+
max_buckets: NonZeroU32::new(self.max_buckets).expect(ERROR_UDDSKETCH_ZERO_BUCKETS),
312325
current_error: self.alpha,
313-
compactions: self.compactions as u32,
326+
compactions: u8::try_from(self.compactions).expect(ERROR_COMPACTIONS_LIMITED),
314327
values: self.count,
315328
sum: self.sum,
316329
}
@@ -1262,11 +1275,41 @@ mod tests {
12621275
fn uddsketch_byte_io_test() {
12631276
unsafe {
12641277
use std::ptr;
1265-
let state = uddsketch_trans_inner(None, 100, 0.005, Some(14.0), ptr::null_mut());
1266-
let state = uddsketch_trans_inner(state, 100, 0.005, Some(18.0), ptr::null_mut());
1267-
let state = uddsketch_trans_inner(state, 100, 0.005, Some(22.7), ptr::null_mut());
1268-
let state = uddsketch_trans_inner(state, 100, 0.005, Some(39.42), ptr::null_mut());
1269-
let state = uddsketch_trans_inner(state, 100, 0.005, Some(-43.0), ptr::null_mut());
1278+
let state = uddsketch_trans_inner(
1279+
None,
1280+
NonZeroU32::new(100).unwrap(),
1281+
0.005,
1282+
Some(14.0),
1283+
ptr::null_mut(),
1284+
);
1285+
let state = uddsketch_trans_inner(
1286+
state,
1287+
NonZeroU32::new(100).unwrap(),
1288+
0.005,
1289+
Some(18.0),
1290+
ptr::null_mut(),
1291+
);
1292+
let state = uddsketch_trans_inner(
1293+
state,
1294+
NonZeroU32::new(100).unwrap(),
1295+
0.005,
1296+
Some(22.7),
1297+
ptr::null_mut(),
1298+
);
1299+
let state = uddsketch_trans_inner(
1300+
state,
1301+
NonZeroU32::new(100).unwrap(),
1302+
0.005,
1303+
Some(39.42),
1304+
ptr::null_mut(),
1305+
);
1306+
let state = uddsketch_trans_inner(
1307+
state,
1308+
NonZeroU32::new(100).unwrap(),
1309+
0.005,
1310+
Some(-43.0),
1311+
ptr::null_mut(),
1312+
);
12701313

12711314
let control = state.unwrap();
12721315
let buffer = uddsketch_serialize(Inner::from(control.clone()).internal().unwrap());

0 commit comments

Comments
 (0)