Skip to content

Commit e063bb3

Browse files
committed
feat: debounce notifications
1 parent f4ddb2b commit e063bb3

File tree

5 files changed

+136
-0
lines changed

5 files changed

+136
-0
lines changed

src/debouncer.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
//! # Debouncer for notifications.
2+
//!
3+
//! Sometimes the client application may be reinstalled
4+
//! while keeping the notification token.
5+
//! In this case the same token is stored twice
6+
//! for the same mailbox on a chatmail relay
7+
//! and is notified twice for the same message.
8+
//! Since it is not possible for the chatmail relay
9+
//! to deduplicate the tokens in this case
10+
//! as only the notification gateway
11+
//! can decrypt them, notification gateway needs
12+
//! to debounce notifications to the same token.
13+
14+
use std::cmp::Reverse;
15+
use std::collections::{BinaryHeap, HashSet};
16+
use std::sync::Mutex;
17+
use std::time::{Duration, Instant};
18+
19+
#[derive(Default)]
20+
pub(crate) struct Debouncer {
21+
state: Mutex<DebouncerState>,
22+
}
23+
24+
#[derive(Default)]
25+
struct DebouncerState {
26+
/// Set of recently notified tokens.
27+
///
28+
/// The tokens are stored in plaintext,
29+
/// not hashed or encrypted.
30+
/// No token is stored for a long time anyway.
31+
tokens: HashSet<String>,
32+
33+
/// Binary heap storing tokens
34+
/// sorted by the timestamp of the recent notifications.
35+
///
36+
/// `Reverse` is used to turn max-heap into min-heap.
37+
heap: BinaryHeap<Reverse<(Instant, String)>>,
38+
}
39+
40+
impl DebouncerState {
41+
/// Removes old entries for tokens that can be notified again.
42+
fn cleanup(&mut self, now: Instant) {
43+
loop {
44+
let Some(Reverse((timestamp, token))) = self.heap.pop() else {
45+
break;
46+
};
47+
48+
if now.duration_since(timestamp) < Duration::from_secs(1) {
49+
self.heap.push(Reverse((timestamp, token)));
50+
break;
51+
}
52+
53+
self.tokens.remove(&token);
54+
}
55+
}
56+
57+
/// Returns true if the token was notified recently
58+
/// and should not be notified again.
59+
fn is_debounced(&mut self, now: Instant, token: &String) -> bool {
60+
self.cleanup(now);
61+
self.tokens.contains(token)
62+
}
63+
64+
fn notify(&mut self, now: Instant, token: String) {
65+
if self.tokens.insert(token.clone()) {
66+
self.heap.push(Reverse((now, token)));
67+
}
68+
}
69+
}
70+
71+
impl Debouncer {
72+
pub(crate) fn is_debounced(&self, now: Instant, token: &String) -> bool {
73+
let mut state = self.state.lock().unwrap();
74+
state.is_debounced(now, token)
75+
}
76+
77+
pub(crate) fn notify(&self, now: Instant, token: String) {
78+
self.state.lock().unwrap().notify(now, token);
79+
}
80+
}
81+
82+
#[cfg(test)]
83+
mod tests {
84+
use super::*;
85+
86+
#[test]
87+
fn test_debouncer() {
88+
let mut now = Instant::now();
89+
90+
let debouncer = Debouncer::default();
91+
92+
let token1 = "foobar".to_string();
93+
let token2 = "barbaz".to_string();
94+
95+
assert!(!debouncer.is_debounced(now, &token1));
96+
assert!(!debouncer.is_debounced(now, &token2));
97+
98+
debouncer.notify(now, token1.clone());
99+
100+
assert!(debouncer.is_debounced(now, &token1));
101+
assert!(!debouncer.is_debounced(now, &token2));
102+
103+
now += Duration::from_secs(5);
104+
105+
assert!(!debouncer.is_debounced(now, &token1));
106+
assert!(!debouncer.is_debounced(now, &token2));
107+
}
108+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod debouncer;
12
pub mod metrics;
23
pub mod notifier;
34
mod openpgp;

src/metrics.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ pub struct Metrics {
3030
/// Number of successfully sent visible UBports notifications.
3131
pub ubports_notifications_total: Counter,
3232

33+
/// Number of debounced notifications.
34+
pub debounced_notifications_total: Counter,
35+
3336
/// Number of successfully sent heartbeat notifications.
3437
pub heartbeat_notifications_total: Counter,
3538

@@ -68,6 +71,13 @@ impl Metrics {
6871
ubports_notifications_total.clone(),
6972
);
7073

74+
let debounced_notifications_total = Counter::default();
75+
registry.register(
76+
"debounced_notifications",
77+
"Number of debounced notifications",
78+
debounced_notifications_total.clone(),
79+
);
80+
7181
let heartbeat_notifications_total = Counter::default();
7282
registry.register(
7383
"heartbeat_notifications",
@@ -101,6 +111,7 @@ impl Metrics {
101111
direct_notifications_total,
102112
fcm_notifications_total,
103113
ubports_notifications_total,
114+
debounced_notifications_total,
104115
heartbeat_notifications_total,
105116
heartbeat_registrations_total,
106117
heartbeat_tokens,

src/server.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use chrono::{Local, TimeDelta};
1010
use log::*;
1111
use serde::Deserialize;
1212
use std::str::FromStr;
13+
use std::time::Instant;
1314

1415
use crate::metrics::Metrics;
1516
use crate::state::State;
@@ -293,6 +294,13 @@ async fn notify_device(
293294
}
294295

295296
info!("Got direct notification for {device_token}.");
297+
let now = Instant::now();
298+
if state.debouncer().is_debounced(now, &device_token) {
299+
let metrics = state.metrics();
300+
metrics.debounced_notifications_total.inc();
301+
return Ok(StatusCode::OK);
302+
}
303+
state.debouncer().notify(now, device_token.clone());
296304
let device_token: NotificationToken = device_token.as_str().parse()?;
297305

298306
let status_code = match device_token {

src/state.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66
use a2::{Client, Endpoint};
77
use anyhow::{Context as _, Result};
88

9+
use crate::debouncer::Debouncer;
910
use crate::metrics::Metrics;
1011
use crate::openpgp::PgpDecryptor;
1112
use crate::schedule::Schedule;
@@ -36,6 +37,8 @@ pub struct InnerState {
3637
/// Decryptor for incoming tokens
3738
/// storing the secret keyring inside.
3839
openpgp_decryptor: PgpDecryptor,
40+
41+
debouncer: Debouncer,
3942
}
4043

4144
impl State {
@@ -88,6 +91,7 @@ impl State {
8891
interval,
8992
fcm_authenticator,
9093
openpgp_decryptor,
94+
debouncer: Default::default(),
9195
}),
9296
})
9397
}
@@ -134,4 +138,8 @@ impl State {
134138
pub fn openpgp_decryptor(&self) -> &PgpDecryptor {
135139
&self.inner.openpgp_decryptor
136140
}
141+
142+
pub(crate) fn debouncer(&self) -> &Debouncer {
143+
&self.inner.debouncer
144+
}
137145
}

0 commit comments

Comments
 (0)