Skip to content

Commit 1bc5009

Browse files
Optimization: merge UDDSketch using iterators
The previous implementation would create a UDDSketch (with a backing HashMap) for every possible merge, and then call `compact_buckets` on that in order to ensure the number of compactions between the target and the source were equal. Profiling this, we found out that in a `rollup` call of a lot of data, the `compact_buckets` was pretty much the main contributor to all the CPU time. However, if we merge a different sketch into this sketch, we don't need to actually compact_buckets all the time, we can directly consume the keys and counts, and apply some compact_key calls to it. This prevents a lot of heap allocations, as compact_buckets does a fully copy of the backing `HashMap`, and then rebuilding it. For a particular workload, this reduced the execution time from 30 to 12 seconds.
1 parent d9f7143 commit 1bc5009

File tree

2 files changed

+85
-34
lines changed

2 files changed

+85
-34
lines changed

crates/udd-sketch/src/lib.rs

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ impl std::cmp::PartialOrd for SketchHashKey {
4646
}
4747
}
4848

49+
/// `UDDSketchMetadata` was created to avoid passing along many parameters
50+
/// to function calls.
51+
pub struct UDDSketchMetadata {
52+
pub max_buckets: u32,
53+
pub current_error: f64,
54+
pub compactions: u32,
55+
pub values: u64,
56+
pub sum: f64,
57+
}
4958
impl SketchHashKey {
5059
/// This is the key corresponding to the current key after the SketchHashMap it refers to has gone through one compaction.
5160
/// Note that odd buckets get combined with the bucket after them (i.e. old buckets -3 and -2 become new bucket -1, {-1, 0} -> 0, {1, 2} -> 1)
@@ -211,22 +220,18 @@ impl UDDSketch {
211220

212221
// This constructor is used to recreate a UddSketch from it's component data
213222
pub fn new_from_data(
214-
max_buckets: u64,
215-
current_error: f64,
216-
compactions: u64,
217-
values: u64,
218-
sum: f64,
223+
metadata: &UDDSketchMetadata,
219224
keys: impl Iterator<Item = SketchHashKey>,
220225
counts: impl Iterator<Item = u64>,
221226
) -> Self {
222227
let mut sketch = UDDSketch {
223228
buckets: SketchHashMap::new(),
224-
alpha: current_error,
225-
gamma: gamma(current_error),
226-
compactions: compactions as u32,
227-
max_buckets,
228-
num_values: values,
229-
values_sum: sum,
229+
alpha: metadata.current_error,
230+
gamma: gamma(metadata.current_error),
231+
compactions: metadata.compactions,
232+
max_buckets: metadata.max_buckets as u64,
233+
num_values: metadata.values,
234+
values_sum: metadata.sum,
230235
};
231236
// TODO
232237
let keys: Vec<_> = keys.collect();
@@ -285,6 +290,51 @@ impl UDDSketch {
285290
self.values_sum += value;
286291
}
287292

293+
/// `merge_items` will merge these values into the current sketch
294+
/// it requires less memory than `merge_sketch`, as that needs a fully serialized
295+
/// `UDDSketch`, whereas this function relies on iterators to do its job.
296+
pub fn merge_items(
297+
&mut self,
298+
other: &UDDSketchMetadata,
299+
mut keys: impl Iterator<Item = SketchHashKey>,
300+
mut counts: impl Iterator<Item = u64>,
301+
) {
302+
let other_gamma = gamma(other.current_error);
303+
// Require matching initial parameters
304+
debug_assert!(
305+
(self
306+
.gamma
307+
.powf(1.0 / f64::powi(2.0, self.compactions as i32))
308+
- other_gamma.powf(1.0 / f64::powi(2.0, other.compactions as i32)))
309+
.abs()
310+
< 1e-9 // f64::EPSILON too small, see issue #396
311+
);
312+
debug_assert_eq!(self.max_buckets, other.max_buckets as u64);
313+
314+
if other.values == 0 {
315+
return;
316+
}
317+
318+
while self.compactions < other.compactions {
319+
self.compact_buckets();
320+
}
321+
322+
let extra_compactions = self.compactions - other.compactions;
323+
while let (Some(mut key), Some(count)) = (keys.next(), counts.next()) {
324+
for _ in 0..extra_compactions {
325+
key = key.compact_key();
326+
}
327+
self.buckets.entry(key).count += count;
328+
}
329+
330+
while self.buckets.len() > self.max_buckets as usize {
331+
self.compact_buckets();
332+
}
333+
334+
self.num_values += other.values;
335+
self.values_sum += other.sum;
336+
}
337+
288338
pub fn merge_sketch(&mut self, other: &UDDSketch) {
289339
// Require matching initial parameters
290340
assert!(

extension/src/uddsketch.rs

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use pgrx::*;
22

33
use encodings::{delta, prefix_varint};
44

5-
use uddsketch::{SketchHashKey, UDDSketch as UddSketchInternal};
5+
use uddsketch::{SketchHashKey, UDDSketch as UddSketchInternal, UDDSketchMetadata};
66

77
use crate::{
88
accessors::{
@@ -148,11 +148,13 @@ impl From<&UddSketchInternal> for SerializedUddSketch {
148148
impl From<SerializedUddSketch> for UddSketchInternal {
149149
fn from(sketch: SerializedUddSketch) -> Self {
150150
UddSketchInternal::new_from_data(
151-
sketch.max_buckets as u64,
152-
sketch.alpha,
153-
sketch.compactions as u64,
154-
sketch.count,
155-
sketch.sum,
151+
&UDDSketchMetadata {
152+
max_buckets: sketch.max_buckets,
153+
current_error: sketch.alpha,
154+
compactions: sketch.compactions,
155+
values: sketch.count,
156+
sum: sketch.sum,
157+
},
156158
sketch.keys(),
157159
sketch.counts(),
158160
)
@@ -304,16 +306,18 @@ impl<'input> UddSketch<'input> {
304306
)
305307
}
306308

309+
fn metadata(&self) -> UDDSketchMetadata {
310+
UDDSketchMetadata {
311+
max_buckets: self.max_buckets,
312+
current_error: self.alpha,
313+
compactions: self.compactions as u32,
314+
values: self.count,
315+
sum: self.sum,
316+
}
317+
}
318+
307319
fn to_uddsketch(&self) -> UddSketchInternal {
308-
UddSketchInternal::new_from_data(
309-
self.max_buckets as u64,
310-
self.alpha,
311-
self.compactions,
312-
self.count,
313-
self.sum,
314-
self.keys(),
315-
self.counts(),
316-
)
320+
UddSketchInternal::new_from_data(&self.metadata(), self.keys(), self.counts())
317321
}
318322

319323
fn from_internal(state: &UddSketchInternal) -> Self {
@@ -520,15 +524,12 @@ pub fn uddsketch_compound_trans_inner(
520524
) -> Option<Inner<UddSketchInternal>> {
521525
unsafe {
522526
in_aggregate_context(fcinfo, || {
523-
let value = match value {
524-
None => return state,
525-
Some(value) => value.to_uddsketch(),
526-
};
527-
let mut state = match state {
528-
None => return Some(value.into()),
529-
Some(state) => state,
527+
let Some(value) = value else { return state };
528+
let Some(mut state) = state else {
529+
return Some(value.to_uddsketch().into());
530530
};
531-
state.merge_sketch(&value);
531+
532+
state.merge_items(&value.metadata(), value.keys(), value.counts());
532533
state.into()
533534
})
534535
}

0 commit comments

Comments
 (0)