Skip to content

Commit 7701e25

Browse files
authored
Basic rate limiting for email notifications (#6)
* Basic rate limiting for email notifications * review fixes, real ip check, check sender and receiver npub
1 parent 608dae1 commit 7701e25

File tree

3 files changed

+252
-5
lines changed

3 files changed

+252
-5
lines changed

src/main.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
mod blossom;
22
mod db;
33
mod notification;
4+
mod rate_limit;
45
mod relay;
56
mod util;
67

@@ -20,18 +21,22 @@ use clap::Parser;
2021
use nostr::types::Url;
2122
use nostr_relay_builder::LocalRelay;
2223
use relay::RelayConfig;
24+
use tokio::sync::Mutex;
2325
use tower_http::{
2426
cors::{Any, CorsLayer},
2527
services::ServeDir,
2628
};
2729
use tracing::{error, info};
2830

29-
use crate::notification::{
30-
email::{
31-
mailjet::{MailjetConfig, MailjetService},
32-
EmailService,
31+
use crate::{
32+
notification::{
33+
email::{
34+
mailjet::{MailjetConfig, MailjetService},
35+
EmailService,
36+
},
37+
notification_store::NotificationStoreApi,
3338
},
34-
notification_store::NotificationStoreApi,
39+
rate_limit::RateLimiter,
3540
};
3641

3742
#[tokio::main]
@@ -120,6 +125,7 @@ struct AppState {
120125
pub file_store: Arc<dyn FileStoreApi>,
121126
pub notification_store: Arc<dyn NotificationStoreApi>,
122127
pub email_service: Arc<dyn EmailService>,
128+
pub rate_limiter: Arc<Mutex<RateLimiter>>,
123129
}
124130

125131
impl AppState {
@@ -142,6 +148,7 @@ impl AppState {
142148
file_store: store.clone(),
143149
notification_store: store,
144150
email_service: Arc::new(email_service),
151+
rate_limiter: Arc::new(Mutex::new(RateLimiter::new())),
145152
})
146153
}
147154
}

src/notification/mod.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{
1818
email::{build_email_confirmation_message, build_email_notification_message},
1919
preferences::{PreferencesContextContentFlag, PreferencesFlags},
2020
},
21+
rate_limit::RealIp,
2122
util::{self, get_logo_link},
2223
AppState,
2324
};
@@ -150,6 +151,7 @@ pub struct ChangePreferencesReq {
150151
/// Send back a random challenge to the caller, which we expect to be signed with their npub to validate
151152
/// the request actually comes from the given npub
152153
pub async fn start(
154+
RealIp(ip): RealIp,
153155
State(state): State<AppState>,
154156
Json(payload): Json<NotificationStartReq>,
155157
) -> impl IntoResponse {
@@ -162,6 +164,22 @@ pub async fn start(
162164
.into_response();
163165
}
164166

167+
let mut rate_limiter = state.rate_limiter.lock().await;
168+
let allowed = rate_limiter.check(&ip.to_string(), None, None, Some(&payload.npub));
169+
drop(rate_limiter);
170+
if !allowed {
171+
warn!(
172+
"Rate limited req from {} with npub {}",
173+
&ip.to_string(),
174+
&payload.npub
175+
);
176+
return (
177+
StatusCode::TOO_MANY_REQUESTS,
178+
Json(ErrorResp::new("Please try again later")),
179+
)
180+
.into_response();
181+
}
182+
165183
let mut random_bytes = [0u8; 32];
166184
rand::thread_rng().fill_bytes(&mut random_bytes);
167185
let challenge = hex::encode(random_bytes);
@@ -187,6 +205,7 @@ pub async fn start(
187205
/// We validate npub, email and signed challenge. If everything is OK, we send a confirmation email
188206
/// and we create a stub for email preferences with a token to change them later
189207
pub async fn register(
208+
RealIp(ip): RealIp,
190209
State(state): State<AppState>,
191210
Json(payload): Json<NotificationRegisterReq>,
192211
) -> impl IntoResponse {
@@ -214,6 +233,28 @@ pub async fn register(
214233
.into_response();
215234
}
216235

236+
let mut rate_limiter = state.rate_limiter.lock().await;
237+
let allowed = rate_limiter.check(
238+
&ip.to_string(),
239+
Some(&payload.email),
240+
None,
241+
Some(&payload.npub),
242+
);
243+
drop(rate_limiter);
244+
if !allowed {
245+
warn!(
246+
"Rate limited req from {} with npub {} and email {}",
247+
&ip.to_string(),
248+
&payload.npub,
249+
&payload.email,
250+
);
251+
return (
252+
StatusCode::TOO_MANY_REQUESTS,
253+
Json(ErrorResp::new("Please try again later")),
254+
)
255+
.into_response();
256+
}
257+
217258
let challenge = match state
218259
.notification_store
219260
.get_challenge_for_npub(&payload.npub)
@@ -350,6 +391,7 @@ pub async fn register(
350391
}
351392

352393
pub async fn send(
394+
RealIp(ip): RealIp,
353395
State(state): State<AppState>,
354396
Json(req): Json<NotificationSendReq>,
355397
) -> impl IntoResponse {
@@ -376,6 +418,27 @@ pub async fn send(
376418
}
377419
};
378420

421+
let mut rate_limiter = state.rate_limiter.lock().await;
422+
let allowed = rate_limiter.check(
423+
&ip.to_string(),
424+
None,
425+
Some(&payload.sender),
426+
Some(&payload.receiver),
427+
);
428+
drop(rate_limiter);
429+
if !allowed {
430+
warn!(
431+
"Rate limited req from {} with npub {}",
432+
&ip.to_string(),
433+
&payload.receiver
434+
);
435+
return (
436+
StatusCode::TOO_MANY_REQUESTS,
437+
Json(ErrorResp::new("Please try again later")),
438+
)
439+
.into_response();
440+
}
441+
379442
let notification_type = match PreferencesFlags::from_name(&payload.kind) {
380443
Some(nt) => nt,
381444
None => {

src/rate_limit.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use axum::{
2+
extract::{ConnectInfo, FromRequestParts},
3+
http::{request::Parts, StatusCode},
4+
};
5+
use chrono::{DateTime, Duration, Utc};
6+
use std::{
7+
collections::{HashMap, VecDeque},
8+
net::{IpAddr, SocketAddr},
9+
};
10+
11+
/// How often do we allow the same ip in the time frame
12+
const IP_LIMIT: usize = 100;
13+
const IP_WINDOW: Duration = Duration::seconds(10 * 60); // 10 minutes
14+
15+
/// How often do we allow the same email to be registered in the time frame
16+
const EMAIL_LIMIT: usize = 30;
17+
const EMAIL_WINDOW: Duration = Duration::seconds(24 * 3600); // 1 day
18+
19+
/// How often do we allow the same npub in the time frame
20+
const NPUB_LIMIT: usize = 100;
21+
const NPUB_WINDOW: Duration = Duration::seconds(10 * 60); // 10 minutes
22+
23+
const MAX_IDLE: Duration = Duration::seconds(24 * 3600); // remove after 24h idle
24+
const PRUNE_INTERVAL: Duration = Duration::seconds(10 * 60); // check every 10 minutes
25+
26+
#[derive(Debug)]
27+
struct SlidingWindow {
28+
hits: VecDeque<DateTime<Utc>>,
29+
window: Duration,
30+
limit: usize,
31+
last_seen: DateTime<Utc>,
32+
}
33+
34+
impl SlidingWindow {
35+
fn new(limit: usize, window: Duration) -> Self {
36+
Self {
37+
hits: VecDeque::with_capacity(limit),
38+
window,
39+
limit,
40+
last_seen: Utc::now(),
41+
}
42+
}
43+
44+
fn allow(&mut self, now: DateTime<Utc>) -> bool {
45+
// Remove expired hits
46+
while let Some(&ts) = self.hits.front() {
47+
if now - ts > self.window {
48+
self.hits.pop_front();
49+
} else {
50+
break;
51+
}
52+
}
53+
self.last_seen = now;
54+
55+
if self.hits.len() < self.limit {
56+
self.hits.push_back(now);
57+
true
58+
} else {
59+
false
60+
}
61+
}
62+
}
63+
64+
#[derive(Debug)]
65+
pub struct RateLimiter {
66+
by_ip: HashMap<String, SlidingWindow>,
67+
by_email: HashMap<String, SlidingWindow>,
68+
by_npub_sender: HashMap<String, SlidingWindow>,
69+
by_npub_receiver: HashMap<String, SlidingWindow>,
70+
last_prune: DateTime<Utc>,
71+
}
72+
73+
impl RateLimiter {
74+
pub fn new() -> Self {
75+
Self {
76+
by_ip: HashMap::new(),
77+
by_email: HashMap::new(),
78+
by_npub_sender: HashMap::new(),
79+
by_npub_receiver: HashMap::new(),
80+
last_prune: Utc::now(),
81+
}
82+
}
83+
84+
/// Check if the request is allowed
85+
/// There is always an IP, but not always an email, or npub - everything that's set has to be allowed
86+
/// The values are expected to be validated before getting in here
87+
pub fn check(
88+
&mut self,
89+
ip: &str,
90+
email: Option<&str>,
91+
npub_sender: Option<&str>,
92+
npub_receiver: Option<&str>,
93+
) -> bool {
94+
let now = Utc::now();
95+
self.prune_if_needed(now);
96+
97+
let ip_ok = self
98+
.by_ip
99+
.entry(ip.to_string())
100+
.or_insert_with(|| SlidingWindow::new(IP_LIMIT, IP_WINDOW))
101+
.allow(now);
102+
103+
let email_ok = if let Some(email) = email {
104+
let key = email.to_lowercase();
105+
self.by_email
106+
.entry(key)
107+
.or_insert_with(|| SlidingWindow::new(EMAIL_LIMIT, EMAIL_WINDOW))
108+
.allow(now)
109+
} else {
110+
true // no email provided -> skip check
111+
};
112+
113+
let npub_sender_ok = if let Some(npub) = npub_sender {
114+
self.by_npub_sender
115+
.entry(npub.to_string())
116+
.or_insert_with(|| SlidingWindow::new(NPUB_LIMIT, NPUB_WINDOW))
117+
.allow(now)
118+
} else {
119+
true // no sender npub provided -> skip check
120+
};
121+
122+
let npub_receiver_ok = if let Some(npub) = npub_receiver {
123+
self.by_npub_receiver
124+
.entry(npub.to_string())
125+
.or_insert_with(|| SlidingWindow::new(NPUB_LIMIT, NPUB_WINDOW))
126+
.allow(now)
127+
} else {
128+
true // no received npub provided -> skip check
129+
};
130+
131+
ip_ok && email_ok && npub_sender_ok && npub_receiver_ok
132+
}
133+
134+
/// Every PRUNE_INTERVAL, remove outdated entries
135+
fn prune_if_needed(&mut self, now: DateTime<Utc>) {
136+
if now - self.last_prune < PRUNE_INTERVAL {
137+
return;
138+
}
139+
140+
self.last_prune = now;
141+
142+
// only keep recent entries
143+
self.by_ip.retain(|_, win| now - win.last_seen <= MAX_IDLE);
144+
self.by_email
145+
.retain(|_, win| now - win.last_seen <= MAX_IDLE);
146+
self.by_npub_sender
147+
.retain(|_, win| now - win.last_seen <= MAX_IDLE);
148+
}
149+
}
150+
151+
pub struct RealIp(pub IpAddr);
152+
153+
impl<S> FromRequestParts<S> for RealIp
154+
where
155+
S: Send + Sync,
156+
{
157+
type Rejection = (StatusCode, &'static str);
158+
159+
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
160+
// Check X-FORWARDED-FOR header and take the first value for gcp as per
161+
// https://cloud.google.com/functions/docs/reference/headers#x-forwarded-for
162+
if let Some(forwarded) = parts.headers.get("x-forwarded-for")
163+
&& let Ok(s) = forwarded.to_str()
164+
&& let Some(ip_str) = s.split(',').next()
165+
&& let Ok(ip) = ip_str.trim().parse()
166+
{
167+
return Ok(RealIp(ip));
168+
}
169+
170+
// Fallback to socket addr for local dev
171+
if let Some(addr) = parts.extensions.get::<ConnectInfo<SocketAddr>>() {
172+
return Ok(RealIp(addr.ip()));
173+
}
174+
175+
Err((StatusCode::BAD_REQUEST, "No request IP"))
176+
}
177+
}

0 commit comments

Comments
 (0)