diff --git a/Cargo.lock b/Cargo.lock index e58a6fd3f..6490f2104 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -772,12 +772,11 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "crossbeam-utils", "hashbrown 0.14.5", "lock_api", "once_cell", @@ -1165,6 +1164,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -1242,6 +1247,26 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if", + "dashmap", + "futures", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "h2" version = "0.4.10" @@ -1888,9 +1913,9 @@ dependencies = [ [[package]] name = "lru" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" +checksum = "bfe949189f46fabb938b3a9a0be30fdd93fd8a09260da863399a8cf3db756ec8" dependencies = [ "hashbrown 0.15.3", ] @@ -2039,6 +2064,12 @@ dependencies = [ "memoffset", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -2049,6 +2080,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2361,16 +2398,16 @@ dependencies = [ "chrono", "clap", "csv-core", - "dashmap", "fnv", "futures", + "governor", "hickory-resolver", "http-body-util", "hyper", "hyper-util", "indexmap", "lazy_static", - "lru 0.16.0", + "lru 0.16.1", "md5", "once_cell", "parking_lot", @@ -2711,6 +2748,21 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "quanta" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3ab5a9d756f0d97bdc89019bd2e4ea098cf9cde50ee7564dde6b81ccc8f06c7" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.40" @@ -2875,6 +2927,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "raw-cpuid" +version = "11.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +dependencies = [ + "bitflags 2.9.1", +] + [[package]] name = "redox_syscall" version = "0.5.12" @@ -3556,6 +3617,15 @@ dependencies = [ "lock_api", ] +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/example.pgdog.toml b/example.pgdog.toml index eb38c9720..2204f3079 100644 --- a/example.pgdog.toml +++ b/example.pgdog.toml @@ -245,6 +245,14 @@ mirror_queue = 128 # - trust auth_type = "scram" +# Authentication rate limit (attempts per minute per IP address). +# +# Prevents brute-force authentication attacks by limiting the number +# of authentication attempts from a single IP address. +# +# Default: 10 +auth_rate_limit = 10 + # Disable cross-shard queries. # # Default: false diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index e16090d8b..9df55da31 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -48,6 +48,7 @@ regex = "1" uuid = { version = "1", features = ["v4", "serde"] } url = "2" ratatui = { version = "0.30.0-alpha.1", optional = true } +governor = "0.6" rmp-serde = "1" rust_decimal = { version = "1.36", features = ["db-postgres"] } chrono = "0.4" @@ -60,7 +61,6 @@ indexmap = "2.9" lru = "0.16" hickory-resolver = "0.25.2" lazy_static = "1" -dashmap = "6" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/pgdog/src/auth/mod.rs b/pgdog/src/auth/mod.rs index 98d6160a8..19526b006 100644 --- a/pgdog/src/auth/mod.rs +++ b/pgdog/src/auth/mod.rs @@ -2,6 +2,7 @@ pub mod error; pub mod md5; +pub mod rate_limit; pub mod scram; pub use error::Error; diff --git a/pgdog/src/auth/rate_limit.rs b/pgdog/src/auth/rate_limit.rs new file mode 100644 index 000000000..554795795 --- /dev/null +++ b/pgdog/src/auth/rate_limit.rs @@ -0,0 +1,232 @@ +//! Rate limiting for authentication attempts. + +use governor::{ + clock::DefaultClock, + state::keyed::DefaultKeyedStateStore, + Quota, RateLimiter, +}; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use std::net::{IpAddr, Ipv6Addr}; +use std::num::NonZeroU32; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use crate::config::config; + +/// Normalize an IP address for rate limiting purposes. +/// +/// IPv4 addresses are used as-is. +/// IPv6 addresses are masked to their /64 prefix to prevent attackers +/// from bypassing rate limits by rotating through addresses in their +/// allocated block (most ISPs and cloud providers allocate /64 or /48). +fn normalize_ip(ip: IpAddr) -> IpAddr { + match ip { + IpAddr::V4(v4) => IpAddr::V4(v4), + IpAddr::V6(v6) => { + let segments = v6.segments(); + // Keep first 4 segments (64 bits), zero out the rest + let masked = Ipv6Addr::new(segments[0], segments[1], segments[2], segments[3], 0, 0, 0, 0); + IpAddr::V6(masked) + } + } +} + +type KeyedStore = DefaultKeyedStateStore; +type IpRateLimiter = RateLimiter; + +fn create_limiter(limit: u32) -> IpRateLimiter { + // Config validation ensures limit is always >= 1 + let limit = NonZeroU32::new(limit).expect("auth_rate_limit validated to be non-zero"); + let quota = Quota::per_minute(limit).allow_burst(limit); + RateLimiter::keyed(quota) +} + +/// Global rate limiter for authentication attempts per IP address. +static AUTH_RATE_LIMITER: Lazy>> = Lazy::new(|| { + let limit = config().config.general.auth_rate_limit; + Arc::new(RwLock::new(create_limiter(limit))) +}); + +/// Track last reset time to prevent unbounded memory growth. +/// +/// Governor's DefaultKeyedStateStore (DashMap) grows indefinitely. +/// Reset limiter every hour to clear accumulated state. +static LAST_RESET: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(Instant::now()))); + +/// Check if an IP address can attempt authentication. +/// Returns true if the request is allowed, false if rate limited. +/// +/// IPv6 addresses are normalized to /64 prefix before rate limiting +/// to prevent attackers from bypassing limits by rotating through +/// addresses in their allocated block. +/// +/// Resets state every hour to prevent unbounded memory growth. +pub fn check(ip: IpAddr) -> bool { + // Reset every hour to prevent unbounded growth of keyed state + if LAST_RESET.read().elapsed() > Duration::from_secs(3600) { + reload(config().config.general.auth_rate_limit); + *LAST_RESET.write() = Instant::now(); + } + + check_with_limiter(&AUTH_RATE_LIMITER.read(), ip) +} + +/// Internal function to check rate limit with a specific limiter. +/// Allows testing without depending on global config. +fn check_with_limiter(limiter: &IpRateLimiter, ip: IpAddr) -> bool { + let normalized_ip = normalize_ip(ip); + limiter.check_key(&normalized_ip).is_ok() +} + +/// Reload the rate limiter with a new limit. +/// +/// Called when configuration is reloaded via SIGHUP or RELOAD command. +pub fn reload(new_limit: u32) { + *AUTH_RATE_LIMITER.write() = create_limiter(new_limit); +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Create a test rate limiter with a specific limit. + /// Tests should use this to avoid coupling to config defaults. + fn test_limiter(limit: u32) -> IpRateLimiter { + create_limiter(limit) + } + + #[test] + fn test_rate_limiter_allows_initial_requests() { + let limiter = test_limiter(10); + let ip = "127.0.0.1".parse().unwrap(); + + // First 10 requests should succeed + for _ in 0..10 { + assert!(check_with_limiter(&limiter, ip)); + } + + // 11th request should be rate limited + assert!(!check_with_limiter(&limiter, ip)); + } + + #[test] + fn test_rate_limiter_per_ip() { + let limiter = test_limiter(10); + let ip1: IpAddr = "127.0.0.10".parse().unwrap(); + let ip2: IpAddr = "127.0.0.20".parse().unwrap(); + + // Exhaust ip1 + for _ in 0..10 { + assert!(check_with_limiter(&limiter, ip1)); + } + assert!(!check_with_limiter(&limiter, ip1)); + + // ip2 should still work + for _ in 0..10 { + assert!(check_with_limiter(&limiter, ip2)); + } + assert!(!check_with_limiter(&limiter, ip2)); + } + + #[test] + fn test_ipv6_normalization() { + // Test that IPv6 addresses in same /64 are normalized to same value + let ip1: IpAddr = "2001:db8:abcd:1234:5678:90ab:cdef:1111".parse().unwrap(); + let ip2: IpAddr = "2001:db8:abcd:1234:9999:aaaa:bbbb:cccc".parse().unwrap(); + let ip3: IpAddr = "2001:db8:abcd:5678:1234:5678:90ab:cdef".parse().unwrap(); + + let normalized1 = normalize_ip(ip1); + let normalized2 = normalize_ip(ip2); + let normalized3 = normalize_ip(ip3); + + // Same /64 prefix should normalize to same address + assert_eq!(normalized1, normalized2); + assert_eq!(normalized1.to_string(), "2001:db8:abcd:1234::"); + + // Different /64 prefix should normalize differently + assert_ne!(normalized1, normalized3); + assert_eq!(normalized3.to_string(), "2001:db8:abcd:5678::"); + } + + #[test] + fn test_ipv4_normalization() { + // IPv4 addresses should pass through unchanged + let ip: IpAddr = "192.168.1.100".parse().unwrap(); + let normalized = normalize_ip(ip); + assert_eq!(ip, normalized); + } + + #[test] + fn test_ipv6_rate_limiting_blocks_same_subnet() { + let limiter = test_limiter(10); + + // Two addresses in same /64 block + let ip1: IpAddr = "2001:db8:cafe:1234:5678:90ab:cdef:1111".parse().unwrap(); + let ip2: IpAddr = "2001:db8:cafe:1234:9999:aaaa:bbbb:cccc".parse().unwrap(); + + // Exhaust rate limit with first address + for _ in 0..10 { + assert!(check_with_limiter(&limiter, ip1)); + } + assert!(!check_with_limiter(&limiter, ip1)); + + // Second address in same /64 should also be blocked + assert!(!check_with_limiter(&limiter, ip2)); + + // Different /64 should still work + let ip3: IpAddr = "2001:db8:cafe:5678:1234:5678:90ab:cdef".parse().unwrap(); + assert!(check_with_limiter(&limiter, ip3)); + } + + #[test] + fn test_reload() { + // Use unique IP to avoid interference from other tests + let ip: IpAddr = "10.0.0.1".parse().unwrap(); + + // Exhaust default limit of 10 on global limiter + for _ in 0..10 { + assert!(check(ip)); + } + assert!(!check(ip)); + + // Reload with higher limit (resets state) + reload(20); + + // Should now allow more requests (state was reset) + for _ in 0..20 { + assert!(check(ip)); + } + assert!(!check(ip)); + } + + #[test] + fn test_different_limits() { + // Test with limit of 3 + let limiter = test_limiter(3); + let ip: IpAddr = "192.168.100.1".parse().unwrap(); + + for _ in 0..3 { + assert!(check_with_limiter(&limiter, ip)); + } + assert!(!check_with_limiter(&limiter, ip)); + } + + #[test] + fn test_limit_of_one() { + // Edge case: minimum limit of 1 + let limiter = test_limiter(1); + let ip: IpAddr = "192.168.100.2".parse().unwrap(); + + assert!(check_with_limiter(&limiter, ip)); + assert!(!check_with_limiter(&limiter, ip)); + } + + // Note: Time-based recovery test is not included because: + // 1. Governor crate uses DefaultClock (wall-clock time), not Tokio's time + // 2. Tokio's pause()/advance() don't affect governor's clock + // 3. Would need custom clock implementation or 60s wait + // + // The rate limiting behavior is well-tested by the governor crate itself. + // Our tests verify correct integration with the library. +} \ No newline at end of file diff --git a/pgdog/src/auth/scram/server.rs b/pgdog/src/auth/scram/server.rs index 3cb6a3a36..f6179b8da 100644 --- a/pgdog/src/auth/scram/server.rs +++ b/pgdog/src/auth/scram/server.rs @@ -50,10 +50,20 @@ use base64::prelude::*; impl AuthenticationProvider for UserPassword { fn get_password_for(&self, _user: &str) -> Option { - // TODO: This is slow. We should move it to its own thread pool. let iterations = 4096; let salt = rand::thread_rng().gen::<[u8; 16]>().to_vec(); - let hash = hash_password(&self.password, NonZeroU32::new(iterations).unwrap(), &salt); + + // Move expensive PBKDF2 computation to blocking thread pool + // to avoid blocking the async runtime. + // Note: Using block_in_place() because AuthenticationProvider trait is synchronous. + let hash = tokio::task::block_in_place(|| { + hash_password( + &self.password, + NonZeroU32::new(iterations).expect("PBKDF2 iterations must be non-zero"), + &salt, + ) + }); + Some(PasswordInfo::new(hash.to_vec(), iterations as u16, salt)) } } diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 8d7f2480b..d5f5dd441 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -98,6 +98,9 @@ pub fn reload() -> Result<(), Error> { // Resize query cache Cache::resize(new_config.config.general.query_cache_limit); + // Reload rate limiter with new limit + crate::auth::rate_limit::reload(new_config.config.general.auth_rate_limit); + Ok(()) } diff --git a/pgdog/src/config/general.rs b/pgdog/src/config/general.rs index 08ed6cbba..fd81c1c84 100644 --- a/pgdog/src/config/general.rs +++ b/pgdog/src/config/general.rs @@ -150,6 +150,9 @@ pub struct General { /// Two-phase commit automatic transactions. #[serde(default)] pub two_phase_commit_auto: Option, + /// Authentication rate limit (attempts per minute per IP). + #[serde(default = "General::auth_rate_limit")] + pub auth_rate_limit: u32, } impl Default for General { @@ -204,6 +207,7 @@ impl Default for General { two_phase_commit: bool::default(), two_phase_commit_auto: None, server_lifetime: Self::server_lifetime(), + auth_rate_limit: Self::auth_rate_limit(), } } } @@ -467,6 +471,12 @@ impl General { ) } + pub fn auth_rate_limit() -> u32 { + let value = Self::env_or_default("PGDOG_AUTH_RATE_LIMIT", 10); + // Ensure value is at least 1 to prevent disabling rate limiting + value.max(1) + } + fn default_passthrough_auth() -> PassthoughAuth { if let Ok(auth) = env::var("PGDOG_PASSTHROUGH_AUTH") { // TODO: figure out why toml::from_str doesn't work. @@ -842,4 +852,19 @@ mod tests { env::remove_var("PGDOG_AUTH_TYPE"); env::remove_var("PGDOG_DRY_RUN"); } + + #[test] + fn test_auth_rate_limit_validation() { + // Test normal value + env::set_var("PGDOG_AUTH_RATE_LIMIT", "20"); + assert_eq!(General::auth_rate_limit(), 20); + + // Test zero value gets clamped to 1 + env::set_var("PGDOG_AUTH_RATE_LIMIT", "0"); + assert_eq!(General::auth_rate_limit(), 1); + + // Test default + env::remove_var("PGDOG_AUTH_RATE_LIMIT"); + assert_eq!(General::auth_rate_limit(), 10); + } } diff --git a/pgdog/src/frontend/client/mod.rs b/pgdog/src/frontend/client/mod.rs index 928abbbbf..d44243c81 100644 --- a/pgdog/src/frontend/client/mod.rs +++ b/pgdog/src/frontend/client/mod.rs @@ -10,7 +10,7 @@ use tokio::{select, spawn}; use tracing::{debug, enabled, error, info, trace, Level as LogLevel}; use super::{ClientRequest, Comms, Error, PreparedStatements}; -use crate::auth::{md5, scram::Server}; +use crate::auth::{md5, rate_limit, scram::Server}; use crate::backend::maintenance_mode; use crate::backend::{ databases, @@ -150,6 +150,20 @@ impl Client { } }; + if let Some(addr) = *stream.peer_addr() { + if !rate_limit::check(addr.ip()) { + error!( + "Authentication rate limit exceeded for IP: {}, user: \"{}\", database: \"{}\"", + addr.ip(), + user, + database + ); + // Send generic auth error to prevent information leakage to attacker + stream.fatal(ErrorResponse::auth(user, database)).await?; + return Ok(()); + } + } + let password = if admin { admin_password } else {