Skip to content

Commit ebf1912

Browse files
committed
feat: debounce notifications
1 parent f4ddb2b commit ebf1912

File tree

5 files changed

+180
-0
lines changed

5 files changed

+180
-0
lines changed

src/debouncer.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
//! # Debouncer for notifications.
2+
//!
3+
//! Sometimes the client application may be reinstalled
4+
//! while keeping the notification token.
5+
//! In this case the same token is stored twice
6+
//! for the same mailbox on a chatmail relay
7+
//! and is notified twice for the same message.
8+
//! Since it is not possible for the chatmail relay
9+
//! to deduplicate the tokens in this case
10+
//! as only the notification gateway
11+
//! can decrypt them, notification gateway needs
12+
//! to debounce notifications to the same token.
13+
14+
use std::cmp::Reverse;
15+
use std::collections::{BinaryHeap, HashSet};
16+
use std::sync::RwLock;
17+
use std::time::{Duration, Instant};
18+
19+
#[derive(Default)]
20+
pub(crate) struct Debouncer {
21+
state: RwLock<DebouncerState>,
22+
}
23+
24+
#[derive(Default)]
25+
struct DebouncerState {
26+
/// Set of recently notified tokens.
27+
///
28+
/// The tokens are stored in plaintext,
29+
/// not hashed or encrypted.
30+
/// No token is stored for a long time anyway.
31+
tokens: HashSet<String>,
32+
33+
/// Binary heap storing tokens
34+
/// sorted by the timestamp of the recent notifications.
35+
///
36+
/// `Reverse` is used to turn max-heap into min-heap.
37+
heap: BinaryHeap<Reverse<(Instant, String)>>,
38+
}
39+
40+
impl DebouncerState {
41+
/// Removes old entries for tokens that can be notified again.
42+
fn cleanup(&mut self, now: Instant) {
43+
loop {
44+
let Some(Reverse((timestamp, token))) = self.heap.pop() else {
45+
debug_assert!(self.tokens.is_empty());
46+
break;
47+
};
48+
49+
if now.duration_since(timestamp) < Duration::from_secs(1) {
50+
self.heap.push(Reverse((timestamp, token)));
51+
break;
52+
}
53+
54+
self.tokens.remove(&token);
55+
}
56+
}
57+
58+
#[cfg(test)]
59+
fn is_debounced(&mut self, now: Instant, token: &String) -> bool {
60+
self.cleanup(now);
61+
self.tokens.contains(token)
62+
}
63+
64+
fn notify(&mut self, now: Instant, token: String) -> bool {
65+
self.cleanup(now);
66+
let inserted = self.tokens.insert(token.clone());
67+
if inserted {
68+
self.heap.push(Reverse((now, token)));
69+
}
70+
!inserted
71+
}
72+
73+
fn count(&self) -> usize {
74+
let res = self.tokens.len();
75+
debug_assert_eq!(res, self.heap.len());
76+
res
77+
}
78+
}
79+
80+
impl Debouncer {
81+
/// Returns true if the token was notified recently
82+
/// and should not be notified again.
83+
#[cfg(test)]
84+
pub(crate) fn is_debounced(&self, now: Instant, token: &String) -> bool {
85+
let mut state = self.state.write().unwrap();
86+
state.is_debounced(now, token)
87+
}
88+
89+
/// Returns true if notification should be sent,
90+
/// false if the token is currently debounced.
91+
pub(crate) fn notify(&self, now: Instant, token: String) -> bool {
92+
self.state.write().unwrap().notify(now, token)
93+
}
94+
95+
/// Returns number of currently debounced notification tokens.
96+
///
97+
/// This is used for metrics to display the size of the set.
98+
///
99+
/// This function does not remove expired tokens.
100+
pub(crate) fn count(&self) -> usize {
101+
self.state.read().unwrap().count()
102+
}
103+
}
104+
105+
#[cfg(test)]
106+
mod tests {
107+
use super::*;
108+
109+
#[test]
110+
fn test_debouncer() {
111+
let mut now = Instant::now();
112+
113+
let debouncer = Debouncer::default();
114+
115+
let token1 = "foobar".to_string();
116+
let token2 = "barbaz".to_string();
117+
118+
assert!(!debouncer.is_debounced(now, &token1));
119+
assert!(!debouncer.is_debounced(now, &token2));
120+
assert_eq!(debouncer.count(), 0);
121+
122+
debouncer.notify(now, token1.clone());
123+
124+
assert!(debouncer.is_debounced(now, &token1));
125+
assert!(!debouncer.is_debounced(now, &token2));
126+
assert_eq!(debouncer.count(), 1);
127+
128+
now += Duration::from_secs(5);
129+
130+
assert!(!debouncer.is_debounced(now, &token1));
131+
assert!(!debouncer.is_debounced(now, &token2));
132+
assert_eq!(debouncer.count(), 0);
133+
}
134+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod debouncer;
12
pub mod metrics;
23
pub mod notifier;
34
mod openpgp;

src/metrics.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ pub struct Metrics {
3030
/// Number of successfully sent visible UBports notifications.
3131
pub ubports_notifications_total: Counter,
3232

33+
/// Number of debounced notifications.
34+
pub debounced_notifications_total: Counter,
35+
36+
/// Number of tokens notified recently.
37+
pub debounced_set_size: Gauge<i64, AtomicI64>,
38+
3339
/// Number of successfully sent heartbeat notifications.
3440
pub heartbeat_notifications_total: Counter,
3541

@@ -68,6 +74,20 @@ impl Metrics {
6874
ubports_notifications_total.clone(),
6975
);
7076

77+
let debounced_notifications_total = Counter::default();
78+
registry.register(
79+
"debounced_notifications",
80+
"Number of debounced notifications",
81+
debounced_notifications_total.clone(),
82+
);
83+
84+
let debounced_set_size = Gauge::<i64, AtomicI64>::default();
85+
registry.register(
86+
"debounced_set_size",
87+
"Number of tokens notified recently.",
88+
debounced_set_size.clone(),
89+
);
90+
7191
let heartbeat_notifications_total = Counter::default();
7292
registry.register(
7393
"heartbeat_notifications",
@@ -101,6 +121,8 @@ impl Metrics {
101121
direct_notifications_total,
102122
fcm_notifications_total,
103123
ubports_notifications_total,
124+
debounced_notifications_total,
125+
debounced_set_size,
104126
heartbeat_notifications_total,
105127
heartbeat_registrations_total,
106128
heartbeat_tokens,

src/server.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use chrono::{Local, TimeDelta};
1010
use log::*;
1111
use serde::Deserialize;
1212
use std::str::FromStr;
13+
use std::time::Instant;
1314

1415
use crate::metrics::Metrics;
1516
use crate::state::State;
@@ -293,6 +294,20 @@ async fn notify_device(
293294
}
294295

295296
info!("Got direct notification for {device_token}.");
297+
let now = Instant::now();
298+
if !state.debouncer().notify(now, device_token.clone()) {
299+
// Token is debounced.
300+
let metrics = state.metrics();
301+
metrics.debounced_notifications_total.inc();
302+
metrics
303+
.debounced_set_size
304+
.set(state.debouncer().count() as i64);
305+
return Ok(StatusCode::OK);
306+
}
307+
state
308+
.metrics()
309+
.debounced_set_size
310+
.set(state.debouncer().count() as i64);
296311
let device_token: NotificationToken = device_token.as_str().parse()?;
297312

298313
let status_code = match device_token {

src/state.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66
use a2::{Client, Endpoint};
77
use anyhow::{Context as _, Result};
88

9+
use crate::debouncer::Debouncer;
910
use crate::metrics::Metrics;
1011
use crate::openpgp::PgpDecryptor;
1112
use crate::schedule::Schedule;
@@ -36,6 +37,8 @@ pub struct InnerState {
3637
/// Decryptor for incoming tokens
3738
/// storing the secret keyring inside.
3839
openpgp_decryptor: PgpDecryptor,
40+
41+
debouncer: Debouncer,
3942
}
4043

4144
impl State {
@@ -88,6 +91,7 @@ impl State {
8891
interval,
8992
fcm_authenticator,
9093
openpgp_decryptor,
94+
debouncer: Default::default(),
9195
}),
9296
})
9397
}
@@ -134,4 +138,8 @@ impl State {
134138
pub fn openpgp_decryptor(&self) -> &PgpDecryptor {
135139
&self.inner.openpgp_decryptor
136140
}
141+
142+
pub(crate) fn debouncer(&self) -> &Debouncer {
143+
&self.inner.debouncer
144+
}
137145
}

0 commit comments

Comments
 (0)