Skip to content

Commit ef0696c

Browse files
Compact using stack
1 parent fe4554c commit ef0696c

File tree

1 file changed

+99
-30
lines changed

1 file changed

+99
-30
lines changed

crates/udd-sketch/src/lib.rs

Lines changed: 99 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
55
use std::collections::hash_map::Entry;
66
use std::collections::HashMap;
77

8-
use crate::SketchHashKey::Zero;
8+
use crate::SketchHashKey::{Invalid, Zero};
99
#[cfg(test)]
1010
use ordered_float::OrderedFloat;
1111
#[cfg(test)]
@@ -223,39 +223,108 @@ impl SketchHashMap {
223223
self.map.len()
224224
}
225225

226-
// Combine adjacent buckets
227-
fn compact(&mut self) {
228-
let mut target = self.head;
229-
// TODO can we do without this additional map?
230-
let old_map = std::mem::take(&mut self.map);
231-
232-
self.head = self.head.compact_key();
233-
234-
while target != SketchHashKey::Invalid {
235-
let old_entry = &old_map[&target];
236-
let new_key = target.compact_key();
237-
// it doesn't matter where buckets are absolutely, their relative
238-
// positions will remain unchanged unless two buckets are compacted
239-
// together
240-
let new_next = if old_entry.next.compact_key() == new_key {
241-
// the old `next` bucket is going to be compacted into the same
242-
// one as `target`
243-
old_map[&old_entry.next].next.compact_key()
226+
/// Combine adjacent buckets using the stack.
///
/// Drains the map into a fixed-size local array of `N` `(compacted key, count)` pairs and
/// rebuilds the map from the first `len` of them via `populate_map_using_iter`.
///
/// The caller must pick `N` so that the map holds at most `N` entries: this is only checked
/// with a `debug_assert!`, and slicing `entries[0..len]` panics if `len > N`.
227+
fn compact_using_stack<const N: usize>(&mut self) {
228+
let len = self.map.len();
229+
debug_assert!(len <= N);
230+
let mut entries = [(SketchHashKey::Invalid, 0); N];
231+
let mut drain = self.map.drain();
232+
233+
for e in entries.iter_mut() {
234+
if let Some((key, entry)) = drain.next() {
235+
*e = (key.compact_key(), entry.count);
244236
} else {
245-
old_entry.next.compact_key()
246-
};
247-
self.map
248-
.entry(new_key)
249-
.or_insert(SketchHashEntry {
250-
count: 0,
251-
next: new_next,
252-
})
253-
.count += old_entry.count;
254-
target = old_map[&target].next;
237+
break;
238+
}
255239
}
240+
drop(drain);
241+
242+
self.populate_map_using_iter(&mut entries[0..len])
256243
}
257-
}
258244

245+
/// This function will populate the backing map using the provided slice.
246+
/// It will sort and aggregate, so the caller does not need to take care
247+
/// of that.
248+
/// However, this should really only be called to populate the empty map.
249+
fn populate_map_using_iter(&mut self, entries: &mut [(SketchHashKey, u64)]) {
250+
assert!(
251+
self.map.is_empty(),
252+
"SketchHashMap should be empty when populating using a slice"
253+
);
254+
if entries.is_empty() {
255+
return;
256+
}
257+
258+
// To build up the linked list, we can do so by calling `entry_upsert` for every call
259+
// to the `HashMap`. `entry_upsert` however needs to walk the map though to figure
260+
// out where to place a key, therefore, we switch to:
261+
// - sort
262+
// - aggregate
263+
// - insert
264+
// That's what we do here
265+
266+
// - sort
267+
entries.sort_unstable_by_key(|e| e.0);
268+
269+
// - aggregate
270+
let mut old_index = 0;
271+
let mut current = entries[0];
272+
for idx in 1..entries.len() {
273+
let next = entries[idx];
274+
if next.0 == current.0 {
275+
current.1 += next.1;
276+
} else {
277+
entries[old_index] = current;
278+
current = next;
279+
old_index += 1;
280+
}
281+
}
282+
283+
// Final one
284+
entries[old_index] = current;
285+
286+
// We should only return the slice containing the aggregated values
287+
let iter = entries.into_iter().take(old_index + 1).peekable();
288+
289+
let mut iter = iter.peekable();
290+
self.head = iter.peek().map(|p| p.0).unwrap_or(Invalid);
291+
292+
// - insert
293+
while let Some((key, count)) = iter.next() {
294+
self.map.insert(
295+
*key,
296+
SketchHashEntry {
297+
count: *count,
298+
next: iter.peek().map(|p| p.0).unwrap_or(Invalid),
299+
},
300+
);
301+
}
302+
}
303+
304+
#[inline]
305+
fn compact(&mut self) {
306+
match self.len() {
307+
0 => return,
308+
// PERCENTILE_AGG_DEFAULT_SIZE defaults to 200, so
309+
// this entry covers that case.
310+
1..=200 => self.compact_using_stack::<200>(),
311+
201..=1000 => self.compact_using_stack::<1000>(),
312+
1001..=5000 => self.compact_using_stack::<5000>(),
313+
_ => self.compact_using_heap(),
314+
}
315+
}
316+
317+
// Combine adjacent buckets
318+
fn compact_using_heap(&mut self) {
319+
let mut entries = Vec::with_capacity(self.map.len());
320+
321+
// By draining the `HashMap`, we can reuse the same piece of memory after we're done.
322+
// We're only using the `Vec` for a very short-lived period of time.
323+
entries.extend(self.map.drain().map(|e| (e.0.compact_key(), e.1.count)));
324+
325+
self.populate_map_using_iter(&mut entries)
326+
}
327+
}
259328
#[derive(Clone, Debug, PartialEq)]
260329
pub struct UDDSketch {
261330
buckets: SketchHashMap,

0 commit comments

Comments
 (0)