@@ -5,7 +5,7 @@ use serde::{Deserialize, Serialize};
55use std:: collections:: hash_map:: Entry ;
66use std:: collections:: HashMap ;
77
8- use crate :: SketchHashKey :: Zero ;
8+ use crate :: SketchHashKey :: { Invalid , Zero } ;
99#[ cfg( test) ]
1010use ordered_float:: OrderedFloat ;
1111#[ cfg( test) ]
@@ -223,39 +223,108 @@ impl SketchHashMap {
223223 self . map . len ( )
224224 }
225225
226- // Combine adjacent buckets
227- fn compact ( & mut self ) {
228- let mut target = self . head ;
229- // TODO can we do without this additional map?
230- let old_map = std:: mem:: take ( & mut self . map ) ;
231-
232- self . head = self . head . compact_key ( ) ;
233-
234- while target != SketchHashKey :: Invalid {
235- let old_entry = & old_map[ & target] ;
236- let new_key = target. compact_key ( ) ;
237- // it doesn't matter where buckets are absolutely, their relative
238- // positions will remain unchanged unless two buckets are compacted
239- // together
240- let new_next = if old_entry. next . compact_key ( ) == new_key {
241- // the old `next` bucket is going to be compacted into the same
242- // one as `target`
243- old_map[ & old_entry. next ] . next . compact_key ( )
226+ /// Combine adjacent buckets using the stack.
227+ fn compact_using_stack < const N : usize > ( & mut self ) {
228+ let len = self . map . len ( ) ;
229+ debug_assert ! ( len <= N ) ;
230+ let mut entries = [ ( SketchHashKey :: Invalid , 0 ) ; N ] ;
231+ let mut drain = self . map . drain ( ) ;
232+
233+ for e in entries. iter_mut ( ) {
234+ if let Some ( ( key, entry) ) = drain. next ( ) {
235+ * e = ( key. compact_key ( ) , entry. count ) ;
244236 } else {
245- old_entry. next . compact_key ( )
246- } ;
247- self . map
248- . entry ( new_key)
249- . or_insert ( SketchHashEntry {
250- count : 0 ,
251- next : new_next,
252- } )
253- . count += old_entry. count ;
254- target = old_map[ & target] . next ;
237+ break ;
238+ }
255239 }
240+ drop ( drain) ;
241+
242+ self . populate_map_using_iter ( & mut entries[ 0 ..len] )
256243 }
257- }
258244
245+ /// This function will populate the backing map using the provided slice.
246+ /// It will sort and aggregate, so the caller does not need to take care
247+ /// of that.
248+ /// However, this should really only be called to populate the empty map.
249+ fn populate_map_using_iter ( & mut self , entries : & mut [ ( SketchHashKey , u64 ) ] ) {
250+ assert ! (
251+ self . map. is_empty( ) ,
252+ "SketchHashMap should be empty when populating using a slice"
253+ ) ;
254+ if entries. is_empty ( ) {
255+ return ;
256+ }
257+
258+ // To build up the linked list, we can do so by calling `entry_upsert` for every call
259+ // to the `HashMap`. `entry_upsert` however needs to walk the map though to figure
260+ // out where to place a key, therefore, we switch to:
261+ // - sort
262+ // - aggregate
263+ // - insert
264+ // That's what we do here
265+
266+ // - sort
267+ entries. sort_unstable_by_key ( |e| e. 0 ) ;
268+
269+ // - aggregate
270+ let mut old_index = 0 ;
271+ let mut current = entries[ 0 ] ;
272+ for idx in 1 ..entries. len ( ) {
273+ let next = entries[ idx] ;
274+ if next. 0 == current. 0 {
275+ current. 1 += next. 1 ;
276+ } else {
277+ entries[ old_index] = current;
278+ current = next;
279+ old_index += 1 ;
280+ }
281+ }
282+
283+ // Final one
284+ entries[ old_index] = current;
285+
286+ // We should only return the slice containing the aggregated values
287+ let iter = entries. into_iter ( ) . take ( old_index + 1 ) . peekable ( ) ;
288+
289+ let mut iter = iter. peekable ( ) ;
290+ self . head = iter. peek ( ) . map ( |p| p. 0 ) . unwrap_or ( Invalid ) ;
291+
292+ // - insert
293+ while let Some ( ( key, count) ) = iter. next ( ) {
294+ self . map . insert (
295+ * key,
296+ SketchHashEntry {
297+ count : * count,
298+ next : iter. peek ( ) . map ( |p| p. 0 ) . unwrap_or ( Invalid ) ,
299+ } ,
300+ ) ;
301+ }
302+ }
303+
304+ #[ inline]
305+ fn compact ( & mut self ) {
306+ match self . len ( ) {
307+ 0 => return ,
308+ // PERCENTILE_AGG_DEFAULT_SIZE defaults to 200, so
309+ // this entry covers that case.
310+ 1 ..=200 => self . compact_using_stack :: < 200 > ( ) ,
311+ 201 ..=1000 => self . compact_using_stack :: < 1000 > ( ) ,
312+ 1001 ..=5000 => self . compact_using_stack :: < 5000 > ( ) ,
313+ _ => self . compact_using_heap ( ) ,
314+ }
315+ }
316+
317+ // Combine adjacent buckets
318+ fn compact_using_heap ( & mut self ) {
319+ let mut entries = Vec :: with_capacity ( self . map . len ( ) ) ;
320+
321+ // By draining the `HashMap`, we can reuse the same piece of memory after we're done.
322+ // We're only using the `Vec` for a very short-lived period of time.
323+ entries. extend ( self . map . drain ( ) . map ( |e| ( e. 0 . compact_key ( ) , e. 1 . count ) ) ) ;
324+
325+ self . populate_map_using_iter ( & mut entries)
326+ }
327+ }
259328#[ derive( Clone , Debug , PartialEq ) ]
260329pub struct UDDSketch {
261330 buckets : SketchHashMap ,
0 commit comments