Skip to content

Commit 1323473

Browse files
committed
graph: Add MovingStats to track duration statistics over a moving window
1 parent 4cf2d0d commit 1323473

File tree

2 files changed

+194
-0
lines changed

2 files changed

+194
-0
lines changed

graph/src/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ pub mod security;
1010
pub mod lfu_cache;
1111

1212
pub mod error;
13+
14+
pub mod stats;

graph/src/util/stats.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use std::collections::VecDeque;
2+
use std::time::{Duration, Instant};
3+
4+
/// One bin of durations. The bin starts at time `start`, and we've added `count`
5+
/// entries to it whose durations add up to `duration`
6+
struct Bin {
7+
start: Instant,
8+
duration: Duration,
9+
count: u32,
10+
}
11+
12+
impl Bin {
13+
fn new(start: Instant) -> Self {
14+
Self {
15+
start,
16+
duration: Duration::from_millis(0),
17+
count: 0,
18+
}
19+
}
20+
21+
/// Add a new measurement to the bin
22+
fn add(&mut self, duration: Duration) {
23+
self.count += 1;
24+
self.duration += duration;
25+
}
26+
27+
/// Remove the measurements for `other` from this bin. Only used to
28+
/// keep a running total of measurements in `MovingStats`
29+
fn remove(&mut self, other: &Bin) {
30+
self.count -= other.count;
31+
self.duration -= other.duration;
32+
}
33+
34+
/// Return `true` if the average of measurements in this bin is above
35+
/// `duration`
36+
fn average_gt(&self, duration: Duration) -> bool {
37+
// Compute self.duration / self.count > duration as
38+
// self.duration > duration * self.count. If the RHS
39+
// oveflows, we assume the average would have been smaller
40+
// than any duration
41+
duration
42+
.checked_mul(self.count)
43+
.map(|rhs| self.duration > rhs)
44+
.unwrap_or(false)
45+
}
46+
}
47+
48+
/// Collect statistics over a moving window of size `window_size`. To keep
49+
/// the amount of memory needed to store the values inside the window
50+
/// constant, values are put into bins of size `bin_size`. For example, using
51+
/// a `window_size` of 5 minutes and a bin size of one second would use
52+
/// 300 bins. Each bin has constant size
53+
pub struct MovingStats {
54+
window_size: Duration,
55+
bin_size: Duration,
56+
/// The buffer with measurements. The back has the most recent entries,
57+
/// and the front has the oldest entries
58+
bins: VecDeque<Bin>,
59+
/// Sum over the values in `elements` The `start` of this bin
60+
/// is meaningless
61+
total: Bin,
62+
}
63+
64+
impl MovingStats {
65+
pub fn new(window_size: Duration, bin_size: Duration) -> Self {
66+
let capacity = if bin_size.as_millis() > 0 {
67+
window_size.as_millis() as usize / bin_size.as_millis() as usize
68+
} else {
69+
1
70+
};
71+
MovingStats {
72+
window_size,
73+
bin_size,
74+
bins: VecDeque::with_capacity(capacity),
75+
total: Bin::new(Instant::now()),
76+
}
77+
}
78+
79+
/// Return `true` if the average of measurements in within `window_size`
80+
/// is above `duration`
81+
pub fn average_gt(&self, duration: Duration) -> bool {
82+
// Depending on how often add() is called, we should
83+
// call expire_bins first, but that would require taking a
84+
// `&mut self`
85+
self.total.average_gt(duration)
86+
}
87+
88+
/// Return the average over the current window in milliseconds
89+
pub fn average(&self) -> Option<Duration> {
90+
self.total.duration.checked_div(self.total.count)
91+
}
92+
93+
pub fn add(&mut self, duration: Duration) {
94+
self.add_at(Instant::now(), duration);
95+
}
96+
97+
/// Add an entry with the given timestamp. Note that the entry will
98+
/// still be added either to the current latest bin or a new
99+
/// latest bin. It is expected that subsequent calls to `add_at` still
100+
/// happen with monotonically increasing `now` values. If the `now`
101+
/// values do not monotonically increase, the average calculation
102+
/// becomes imprecise because values are expired later than they
103+
/// should be.
104+
pub fn add_at(&mut self, now: Instant, duration: Duration) {
105+
let need_new_bin = self
106+
.bins
107+
.back()
108+
.map(|bin| now.saturating_duration_since(bin.start) >= self.bin_size)
109+
.unwrap_or(true);
110+
if need_new_bin {
111+
self.bins.push_back(Bin::new(now));
112+
}
113+
self.expire_bins(now);
114+
// unwrap is fine because we just added a bin if there wasn't one
115+
// before
116+
let bin = self.bins.back_mut().unwrap();
117+
bin.add(duration);
118+
self.total.add(duration);
119+
}
120+
121+
fn expire_bins(&mut self, now: Instant) {
122+
while self
123+
.bins
124+
.front()
125+
.map(|existing| now.saturating_duration_since(existing.start) >= self.window_size)
126+
.unwrap_or(false)
127+
{
128+
self.bins.pop_front().map(|existing| {
129+
self.total.remove(&existing);
130+
});
131+
}
132+
}
133+
134+
pub fn duration(&self) -> Duration {
135+
self.total.duration
136+
}
137+
}
138+
139+
#[cfg(test)]
140+
mod tests {
141+
use super::*;
142+
use std::time::{Duration, Instant};
143+
144+
#[allow(dead_code)]
145+
fn dump_bin(msg: &str, bin: &Bin, start: Instant) {
146+
println!(
147+
"bin[{}]: age={}ms count={} duration={}ms",
148+
msg,
149+
bin.start.saturating_duration_since(start).as_millis(),
150+
bin.count,
151+
bin.duration.as_millis()
152+
);
153+
}
154+
155+
#[test]
156+
fn add_one_const() {
157+
let mut stats = MovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
158+
let start = Instant::now();
159+
for i in 0..10 {
160+
stats.add_at(start + Duration::from_secs(i), Duration::from_secs(1));
161+
}
162+
assert_eq!(5, stats.bins.len());
163+
for (i, bin) in stats.bins.iter().enumerate() {
164+
assert_eq!(1, bin.count);
165+
assert_eq!(Duration::from_secs(1), bin.duration);
166+
assert_eq!(Duration::from_secs(i as u64 + 5), (bin.start - start));
167+
}
168+
assert_eq!(5, stats.total.count);
169+
assert_eq!(Duration::from_secs(5), stats.total.duration);
170+
assert!(stats.average_gt(Duration::from_millis(900)));
171+
assert!(!stats.average_gt(Duration::from_secs(1)));
172+
}
173+
174+
#[test]
175+
fn add_four_linear() {
176+
let mut stats = MovingStats::new(Duration::from_secs(5), Duration::from_secs(1));
177+
let start = Instant::now();
178+
for i in 0..40 {
179+
stats.add_at(
180+
start + Duration::from_millis(250 * i),
181+
Duration::from_secs(i),
182+
);
183+
}
184+
assert_eq!(5, stats.bins.len());
185+
for (b, bin) in stats.bins.iter().enumerate() {
186+
assert_eq!(4, bin.count);
187+
assert_eq!(Duration::from_secs(86 + 16 * b as u64), bin.duration);
188+
}
189+
assert_eq!(20, stats.total.count);
190+
assert_eq!(Duration::from_secs(5 * 86 + 16 * 10), stats.total.duration);
191+
}
192+
}

0 commit comments

Comments
 (0)