Skip to content

Commit 08880e0

Browse files
committed
Support NaN
1 parent c102ecc commit 08880e0

File tree

2 files changed

+33
-13
lines changed

2 files changed

+33
-13
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,11 @@ The algorithm’s correctness is validated under the [C++11 memory model](https:
8181

8282
### NaN Support
8383

84-
The current implementation panics on `NaN` inputs to `Histogram::observe`, but it can be trivially supported by adding a `NaN` bucket after the `+Inf` bucket, ignored in collection results.
84+
`NaN` values are supported via a dedicated `NaN` bucket placed after `+Inf`, removed from the collection result.
8585

8686
## Discussion
8787

88-
The Go implementation could be improved by using the `NaN` bucket trick presented above, and compute `_count` as the sum of non-`NaN` buckets. This eliminates the `_count` atomic, reducing observation to three atomic RMW — matching this algorithm.
88+
The Go implementation could be improved by computing `_count` as the sum of all buckets (and using the `NaN` bucket trick presented above). This eliminates the `_count` atomic, reducing observation to three atomic RMW — matching this algorithm.
8989

9090
However, it lacks cache locality, which [benchmarks](benches/README.md) show has a significant impact under contention.
9191

src/lib.rs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,9 @@ impl<T: HistogramValue> Histogram<T> {
7474
}
7575

7676
pub fn observe(&self, value: T) {
77-
let bucket_index = self.0.buckets.iter().position(|b| value <= *b).unwrap();
77+
let bucket_index = self.0.buckets.iter().position(|b| value <= *b);
7878
let hot_shard = self.0.hot_shard.load(Ordering::Relaxed) as usize;
79-
self.0.shards[hot_shard].observe(value, bucket_index, &self.0.waker);
79+
self.0.shards[hot_shard].observe(value, bucket_index, self.0.buckets.len(), &self.0.waker);
8080
}
8181

8282
pub fn collect(&self) -> (u64, T, Vec<(T, u64)>) {
@@ -134,7 +134,8 @@ struct Shard<T> {
134134
impl<T: HistogramValue> Shard<T> {
135135
fn new(bucket_count: usize) -> Self {
136136
Self {
137-
counters: (0..(bucket_count + 2).div_ceil(COUNTERS_PER_CACHE_LINE))
137+
// `+ 3` for _count, _sum and `NaN` bucket
138+
counters: (0..(bucket_count + 3).div_ceil(COUNTERS_PER_CACHE_LINE))
138139
.map(|_| CachePadded::new(array::from_fn(|_| AtomicU64::new(0))))
139140
.collect(),
140141
_phantom: PhantomData,
@@ -150,8 +151,8 @@ impl<T: HistogramValue> Shard<T> {
150151
}
151152

152153
fn bucket(&self, bucket_index: usize) -> &AtomicU64 {
153-
let true_index = bucket_index + 2;
154-
&self.counters[true_index / COUNTERS_PER_CACHE_LINE][true_index % COUNTERS_PER_CACHE_LINE]
154+
let idx = bucket_index + 2;
155+
&self.counters[idx / COUNTERS_PER_CACHE_LINE][idx % COUNTERS_PER_CACHE_LINE]
155156
}
156157

157158
fn buckets(&self, bucket_count: usize) -> impl IntoIterator<Item = &AtomicU64> {
@@ -162,8 +163,15 @@ impl<T: HistogramValue> Shard<T> {
162163
.take(bucket_count)
163164
}
164165

165-
fn observe(&self, value: T, bucket_index: usize, waker: &AtomicWaker) {
166-
self.bucket(bucket_index).fetch_add(1, Ordering::Relaxed);
166+
fn observe(
167+
&self,
168+
value: T,
169+
bucket_index: Option<usize>,
170+
bucket_count: usize,
171+
waker: &AtomicWaker,
172+
) {
173+
self.bucket(bucket_index.unwrap_or(bucket_count))
174+
.fetch_add(1, Ordering::Relaxed);
167175
T::atomic_add(self.sum(), value, Ordering::Release);
168176
let count = self.count().fetch_add(1, Ordering::Release);
169177
if count & WAITING_FLAG != 0 {
@@ -183,6 +191,7 @@ impl<T: HistogramValue> Shard<T> {
183191
*count = counter.load(Ordering::Relaxed);
184192
expected_count += *count;
185193
}
194+
expected_count += self.bucket(bucket_count).load(Ordering::Relaxed);
186195
(sum, expected_count)
187196
}
188197

@@ -293,16 +302,27 @@ mod tests {
293302
#[cfg(not(loom))]
294303
#[test]
295304
fn observe_inf() {
296-
let histogram = Histogram::new([0.1, 1.0]);
305+
let histogram = Histogram::new([1.0]);
297306
histogram.observe(f64::INFINITY);
298-
histogram.observe(f64::NEG_INFINITY);
307+
assert_eq!(
308+
histogram.collect(),
309+
(1, f64::INFINITY, vec![(1.0, 0), (f64::INFINITY, 1)])
310+
);
299311
}
300312

301313
#[cfg(not(loom))]
302314
#[test]
303-
#[should_panic]
304315
fn observe_nan() {
305-
let histogram = Histogram::new([0.1, 1.0]);
316+
let histogram = Histogram::new([1.0]);
306317
histogram.observe(f64::NAN);
318+
let (count, sum, buckets) = histogram.collect();
319+
assert_eq!(count, 1);
320+
assert!(sum.is_nan());
321+
assert_eq!(buckets, vec![(1.0, 0), (f64::INFINITY, 0)]);
307322
}
308323
}
324+
325+
#[inline(never)]
326+
pub fn plop(h: Histogram, v: f64) {
327+
h.observe(v);
328+
}

0 commit comments

Comments
 (0)