Skip to content

Commit effa5d0

Browse files
authored
Feature/max connection limit (#154)
* Add configurable maximum connection limit with graceful rejection - Add max_connections field to ServerOptions (default: 0 = no limit) - Use tokio::sync::Semaphore to limit concurrent connections when max_connections > 0 - Gracefully reject excess connections with warning log instead of crashing - Connection permits are automatically released when connections close - Configure via ServerOptions::new().with_max_connections(500) - Added tests to verify default behavior and configuration options - Prevents resource exhaustion under high connection load * fmt
1 parent 80ff65f commit effa5d0

File tree

1 file changed

+49
-2
lines changed

1 file changed

+49
-2
lines changed

datafusion-postgres/src/lib.rs

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use pgwire::tokio::process_socket;
1616
use rustls_pemfile::{certs, pkcs8_private_keys};
1717
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
1818
use tokio::net::TcpListener;
19+
use tokio::sync::Semaphore;
1920
use tokio_rustls::rustls::{self, ServerConfig};
2021
use tokio_rustls::TlsAcceptor;
2122

@@ -34,6 +35,7 @@ pub struct ServerOptions {
3435
port: u16,
3536
tls_cert_path: Option<String>,
3637
tls_key_path: Option<String>,
38+
max_connections: usize,
3739
}
3840

3941
impl ServerOptions {
@@ -49,6 +51,7 @@ impl Default for ServerOptions {
4951
port: 5432,
5052
tls_cert_path: None,
5153
tls_key_path: None,
54+
max_connections: 0, // 0 = no limit
5255
}
5356
}
5457
}
@@ -126,17 +129,40 @@ pub async fn serve_with_handlers(
126129
info!("Listening on {server_addr} (unencrypted)");
127130
}
128131

132+
// Connection limiter (if configured)
133+
let max_conn_count = opts.max_connections;
134+
let connection_limiter = if max_conn_count > 0 {
135+
Some(Arc::new(Semaphore::new(max_conn_count)))
136+
} else {
137+
None
138+
};
139+
129140
// Accept incoming connections
130141
loop {
131142
match listener.accept().await {
132-
Ok((socket, _addr)) => {
143+
Ok((socket, addr)) => {
133144
let factory_ref = handlers.clone();
134145
let tls_acceptor_ref = tls_acceptor.clone();
146+
let limiter_ref = connection_limiter.clone();
135147

136148
tokio::spawn(async move {
149+
// Check connection limit if configured
150+
let _permit = if let Some(ref semaphore) = limiter_ref {
151+
match semaphore.try_acquire() {
152+
Ok(permit) => Some(permit),
153+
Err(_) => {
154+
warn!("Connection rejected from {addr}: max connections ({max_conn_count}) reached");
155+
return;
156+
}
157+
}
158+
} else {
159+
None
160+
};
161+
137162
if let Err(e) = process_socket(socket, tls_acceptor_ref, factory_ref).await {
138-
warn!("Error processing socket: {e}");
163+
warn!("Error processing socket from {addr}: {e}");
139164
}
165+
// Permit is automatically released when _permit is dropped
140166
});
141167
}
142168
Err(e) => {
@@ -145,3 +171,24 @@ pub async fn serve_with_handlers(
145171
}
146172
}
147173
}
174+
175+
#[cfg(test)]
176+
mod tests {
177+
use super::*;
178+
179+
#[test]
180+
fn test_server_options_default_max_connections() {
181+
let opts = ServerOptions::default();
182+
assert_eq!(opts.max_connections, 0); // No limit by default
183+
}
184+
185+
#[test]
186+
fn test_server_options_max_connections_configuration() {
187+
let opts = ServerOptions::new().with_max_connections(500);
188+
assert_eq!(opts.max_connections, 500);
189+
190+
// Test that 0 means no limit
191+
let opts_no_limit = ServerOptions::new().with_max_connections(0);
192+
assert_eq!(opts_no_limit.max_connections, 0);
193+
}
194+
}

0 commit comments

Comments
 (0)