@@ -248,3 +248,148 @@ where
/// Linearly interpolates between `a` and `b`.
///
/// `f` is the weight given to `b`; `a` receives the complementary weight
/// `1.0 - f`. With finite inputs, `f == 0.0` yields `a` and `f == 1.0`
/// yields `b`.
fn lerp(a: f64, b: f64, f: f64) -> f64 {
    // Keep the two weighted terms separate (rather than `a + (b - a) * f`)
    // so the rounding behaviour stays exactly as callers expect.
    let share_of_a = a * (1.0 - f);
    let share_of_b = b * f;
    share_of_a + share_of_b
}
251+
252+ #[ cfg( test) ]
253+ mod tests {
254+ use std:: sync:: { Arc , Mutex } ;
255+
256+ use crossbeam:: thread;
257+ use ordered_float:: OrderedFloat ;
258+ use rand:: distributions:: { Distribution , Uniform , WeightedIndex } ;
259+ use rand:: rngs:: StdRng ;
260+ use rand:: SeedableRng ;
261+
262+ use super :: { IntoFloat , TDigest } ;
263+
264+ impl IntoFloat for OrderedFloat < f64 > {
265+ fn to_float ( & self ) -> f64 {
266+ self . 0
267+ }
268+ }
269+
/// Returns whether `obtained` lies strictly inside the open interval
/// `(expected - error, expected + error)`.
///
/// Both comparisons are strict, so a value sitting exactly on either bound is
/// rejected, and an `error` of zero never matches. Any NaN operand makes the
/// result `false`.
fn is_close(obtained: f64, expected: f64, error: f64) -> bool {
    let lower_bound = expected - error;
    let upper_bound = expected + error;
    lower_bound < obtained && obtained < upper_bound
}
274+
275+ // Checks whether the tdigest follows a uniform distribution.
276+ fn check_tdigest_uniform (
277+ tdigest : & TDigest < OrderedFloat < f64 > > ,
278+ buckets : i32 ,
279+ max : f64 ,
280+ min : f64 ,
281+ error : f64 ,
282+ ) {
283+ for k in 0 ..buckets {
284+ let expected_cdf = ( k as f64 ) / ( buckets as f64 ) ;
285+ let expected_quantile = ( max - min) * expected_cdf + min;
286+
287+ let obtained_cdf = tdigest. cdf ( & OrderedFloat ( expected_quantile) ) ;
288+ let obtained_quantile = tdigest. quantile ( expected_cdf) ;
289+
290+ assert ! ( is_close( obtained_cdf, expected_cdf, error) ) ;
291+ assert ! ( is_close(
292+ obtained_quantile,
293+ expected_quantile,
294+ ( max - min) * error,
295+ ) ) ;
296+ }
297+ }
298+
/// Merges 64 batches of 1024 seeded uniform samples into a single digest, one
/// batch after another, and checks the result against the uniform
/// distribution the samples were drawn from.
#[test]
fn uniform_merge_sequential() {
    let buckets = 200;
    // 3% absolute error on each quantile; error gets worse near the median.
    let error = 0.03;
    let (min, max) = (-1000.0, 1000.0);

    let batch_numbers = 64;
    let batch_size = 1024;

    let mut tdigest = TDigest::new(buckets as f64);
    let uniform_distr = Uniform::new(min, max);
    // Fixed seed keeps the generated data reproducible from run to run.
    let mut rng = StdRng::seed_from_u64(0);

    for _ in 0..batch_numbers {
        let batch: Vec<OrderedFloat<f64>> = (0..batch_size)
            .map(|_| {
                let sample: f64 = uniform_distr.sample(&mut rng);
                OrderedFloat(sample)
            })
            .collect();
        tdigest.merge_values(&batch);
    }

    check_tdigest_uniform(&tdigest, buckets, max, min, error);
}
323+
/// Builds 64 local digests on separate scoped threads, each from 65536
/// uniform samples, merges every local digest into one shared digest under a
/// mutex, and checks the merged result against the uniform distribution.
#[test]
fn uniform_merge_parallel() {
    let buckets = 200;
    // 3% absolute error on each quantile; note error is worse near the median.
    let error = 0.03;

    let (min, max) = (-1000.0, 1000.0);

    let batch_size = 65536;
    // Typed as `u64` because each batch index doubles as that worker's RNG seed.
    let batch_numbers: u64 = 64;

    let result_tdigest = Arc::new(Mutex::new(TDigest::new(buckets as f64)));
    thread::scope(|s| {
        for seed in 0..batch_numbers {
            // Borrow the shared digest explicitly so the `move` closure below
            // captures a reference (plus its own copy of `seed`) rather than
            // trying to take ownership of the digest itself.
            let result_tdigest = &result_tdigest;
            s.spawn(move |_| {
                let mut local_tdigest = TDigest::new(buckets as f64);

                let mut random_numbers = Vec::with_capacity(batch_size);
                let uniform_distr = Uniform::new(min, max);
                // Seed each worker differently. With one shared seed every
                // thread would generate the very same samples, and the merge
                // would only ever be exercised on 64 copies of one dataset.
                let mut rng = StdRng::seed_from_u64(seed);

                for _ in 0..batch_size {
                    let num: f64 = uniform_distr.sample(&mut rng);
                    random_numbers.push(OrderedFloat(num));
                }
                local_tdigest.merge_values(&random_numbers);

                // Lock only for the final merge so sample generation and the
                // local digest build run without contention.
                let mut result = result_tdigest.lock().unwrap();
                result.merge(&local_tdigest);
            });
        }
    })
    .unwrap();

    let tdigest = result_tdigest.lock().unwrap();
    check_tdigest_uniform(&tdigest, buckets, max, min, error);
}
360+
/// Feeds the digest samples drawn from six discrete values with fixed
/// weights, then checks that the digest's CDF at each value is close to the
/// cumulative weight fraction up to and including that value.
#[test]
fn weighted_merge() {
    let buckets = 200;
    // 5% absolute error on each quantile; note error is worse near the median.
    let error = 0.05;

    let mut tdigest = TDigest::new(buckets as f64);

    let choices = [9.0, 900.0, 990.0, 9990.0, 190000.0, 990000.0];
    let weights = [1, 2, 1, 3, 4, 5]; // Total of 16.
    let total_weight: i32 = weights.iter().sum();

    let weighted_distr = WeightedIndex::new(weights).unwrap();
    // Fixed seed keeps the generated data reproducible from run to run.
    let mut rng = StdRng::seed_from_u64(0);

    let batch_size = 128;
    let batch_numbers = 16;

    for _ in 0..batch_numbers {
        let mut random_numbers = Vec::with_capacity(batch_size);
        for _ in 0..batch_size {
            let num: f64 = choices[weighted_distr.sample(&mut rng)];
            random_numbers.push(OrderedFloat(num));
        }
        tdigest.merge_values(&random_numbers);
    }

    // `choices` is listed in ascending order, so the running weight sum
    // divided by the total is the expected CDF value at each choice.
    let mut curr_weight = 0;
    for (c, w) in choices.iter().zip(weights) {
        curr_weight += w;
        let expected_cdf = (curr_weight as f64) / (total_weight as f64);
        let obtained_cdf = tdigest.cdf(&OrderedFloat(*c));
        assert!(
            is_close(obtained_cdf, expected_cdf, error),
            "cdf({}) = {}, expected {} +/- {}",
            c,
            obtained_cdf,
            expected_cdf,
            error
        );
    }
}
395+ }
0 commit comments