Skip to content

Commit 924f840

Browse files
committed
Fix tdigest
1 parent 8142308 commit 924f840

File tree

1 file changed

+44
-2
lines changed
  • datafusion/functions-aggregate-common/src

1 file changed

+44
-2
lines changed

datafusion/functions-aggregate-common/src/tdigest.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,13 @@ impl TDigest {
436436

437437
let rank = q * self.count;
438438

439+
// Matches airlift's boundary: rank >= totalWeight - 1 → return max.
440+
// Without this, p90 on sparse data interpolates into a value that was
441+
// never observed (e.g. returns 550 instead of 1000 for a 10-value set).
442+
if rank >= self.count - 1.0 {
443+
return self.max();
444+
}
445+
439446
let mut pos: usize;
440447
let mut t;
441448
if q > 0.5 {
@@ -491,8 +498,28 @@ impl TDigest {
491498
}
492499
}
493500

494-
let value = self.centroids[pos].mean()
495-
+ ((rank - t) / self.centroids[pos].weight() - 0.5) * delta;
501+
// Matches airlift's single-sample guard: when the rank falls between two
502+
// weight-1 centroids, treat them as point masses and snap to `pos` (the
503+
// nearer one) rather than interpolating an invented value between them.
504+
let offset = (rank - t) / self.centroids[pos].weight() - 0.5;
505+
if offset < 0.0 {
506+
// rank is in the left half of pos — the straddling pair is (pos-1, pos)
507+
let left = pos.saturating_sub(1);
508+
if self.centroids[left].weight() == 1.0 && self.centroids[pos].weight() == 1.0
509+
{
510+
return self.centroids[pos].mean();
511+
}
512+
} else {
513+
// rank is in the right half of pos — the straddling pair is (pos, pos+1)
514+
let right = (pos + 1).min(self.centroids.len() - 1);
515+
if self.centroids[pos].weight() == 1.0
516+
&& self.centroids[right].weight() == 1.0
517+
{
518+
return self.centroids[pos].mean();
519+
}
520+
}
521+
522+
let value = self.centroids[pos].mean() + offset * delta;
496523

497524
// In `merge_digests()`: `min` is initialized to Inf, `max` is initialized to -Inf
498525
// and gets updated according to different `TDigest`s
@@ -735,6 +762,21 @@ mod tests {
735762
assert_state_roundtrip!(t);
736763
}
737764

765+
// Verify that sparse datasets produce the same percentile results as Trino.
766+
// Previously DataFusion would interpolate between single-sample centroids,
767+
// returning invented values such as 550 for p90 (not in the dataset).
768+
// Trino returns: p50=10, p90=1000, p99=1000.
769+
#[test]
770+
fn test_sparse_dataset_matches_trino() {
771+
let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0, 1000.0];
772+
let t = TDigest::new(100);
773+
let t = t.merge_unsorted_f64(values);
774+
775+
assert_eq!(t.estimate_quantile(0.5), 10.0);
776+
assert_eq!(t.estimate_quantile(0.9), 1000.0);
777+
assert_eq!(t.estimate_quantile(0.99), 1000.0);
778+
}
779+
738780
#[test]
739781
fn test_size() {
740782
let t = TDigest::new(10);

0 commit comments

Comments
 (0)