@@ -16,6 +16,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
1616
1717use std:: cell:: RefCell ;
1818use std:: net:: SocketAddr ;
19+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
1920
2021use tokio:: net:: { TcpListener , UnixListener } ;
2122
@@ -80,6 +81,7 @@ struct ServerConfig {
8081 recv_timeout : u64 ,
8182 channel_capacity : usize ,
8283 send_timeout : u64 ,
84+ max_connection_age : Option < u64 > ,
8385}
8486
8587impl ServerConfig {
@@ -91,6 +93,7 @@ impl ServerConfig {
9193 recv_timeout : 30000 , // Default 30 second timeout
9294 channel_capacity : 5000 , // Default capacity for worker channel
9395 send_timeout : 1000 , // Default 1 second timeout for send backpressure
96+ max_connection_age : None , // No limit by default
9497 }
9598 }
9699}
@@ -109,6 +112,7 @@ struct Server {
109112 work_tx : RefCell < Option < Arc < crossbeam_channel:: Sender < RequestWithCompletion > > > > ,
110113 runtime : RefCell < Option < Arc < tokio:: runtime:: Runtime > > > ,
111114 shutdown : RefCell < Option < broadcast:: Sender < ( ) > > > ,
115+ total_connections : Arc < AtomicU64 > ,
112116}
113117
114118impl Server {
@@ -121,9 +125,14 @@ impl Server {
121125 work_tx : RefCell :: new ( None ) ,
122126 runtime : RefCell :: new ( None ) ,
123127 shutdown : RefCell :: new ( None ) ,
128+ total_connections : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
124129 }
125130 }
126131
132+ pub fn total_connections ( & self ) -> u64 {
133+ self . total_connections . load ( Ordering :: Relaxed )
134+ }
135+
127136 pub fn configure ( & self , config : magnus:: RHash ) -> Result < ( ) , MagnusError > {
128137 let mut server_config = self . config . borrow_mut ( ) ;
129138 if let Some ( bind_address) = config. get ( magnus:: Symbol :: new ( "bind_address" ) ) {
@@ -150,6 +159,10 @@ impl Server {
150159 server_config. send_timeout = u64:: try_convert ( send_timeout) ?;
151160 }
152161
162+ if let Some ( max_connection_age) = config. get ( magnus:: Symbol :: new ( "max_connection_age" ) ) {
163+ server_config. max_connection_age = Some ( u64:: try_convert ( max_connection_age) ?) ;
164+ }
165+
153166 // Initialize logging if not already initialized
154167 LOGGER_INIT . call_once ( || {
155168 let mut builder = env_logger:: Builder :: from_env ( env_logger:: Env :: default ( ) ) ;
@@ -263,7 +276,9 @@ impl Server {
263276 * self . work_tx . borrow_mut ( ) = Some ( work_tx. clone ( ) ) ;
264277
265278 let ( shutdown_tx, shutdown_rx) = broadcast:: channel ( 1 ) ;
266- * self . shutdown . borrow_mut ( ) = Some ( shutdown_tx) ;
279+ * self . shutdown . borrow_mut ( ) = Some ( shutdown_tx. clone ( ) ) ;
280+
281+ let total_connections = self . total_connections . clone ( ) ;
267282
268283 let mut rt_builder = tokio:: runtime:: Builder :: new_multi_thread ( ) ;
269284
@@ -346,17 +361,19 @@ impl Server {
346361 } ;
347362
348363 // Now that we have successfully bound, spawn the server task
364+ let max_connection_age = config. max_connection_age ;
349365 let server_task = tokio:: spawn ( async move {
350366 let graceful_shutdown = GracefulShutdown :: new ( ) ;
351367 let mut shutdown_rx = shutdown_rx;
352368
353369 loop {
354370 tokio:: select! {
355- Ok ( ( stream, _) ) = listener. accept( ) => {
371+ Ok ( ( stream, _) ) = listener. accept( ) => {
372+ total_connections. fetch_add( 1 , Ordering :: Relaxed ) ;
356373 info!( "New connection established" ) ;
357-
374+
358375 let io = TokioIo :: new( stream) ;
359-
376+
360377 debug!( "Setting up connection" ) ;
361378
362379 let builder = builder. clone( ) ;
@@ -365,13 +382,50 @@ impl Server {
365382 debug!( "Service handling request" ) ;
366383 handle_request( req, work_tx. clone( ) , config. recv_timeout, config. send_timeout)
367384 } ) ) ;
368- let fut = graceful_shutdown. watch( conn. into_owned( ) ) ;
369- tokio:: task:: spawn( async move {
370- if let Err ( err) = fut. await {
371- warn!( "Error serving connection: {:?}" , err) ;
372- }
373- } ) ;
374- } ,
385+ // If max_connection_age is set, handle the connection with a timeout
386+ // but still integrate with server-wide graceful shutdown via broadcast channel
387+ if let Some ( max_age_ms) = max_connection_age {
388+ let conn = conn. into_owned( ) ;
389+ let mut conn_shutdown_rx = shutdown_tx. subscribe( ) ;
390+ tokio:: task:: spawn( async move {
391+ tokio:: pin!( conn) ;
392+ let sleep = tokio:: time:: sleep( std:: time:: Duration :: from_millis( max_age_ms) ) ;
393+ tokio:: pin!( sleep) ;
394+ let mut graceful_shutdown_started = false ;
395+
396+ loop {
397+ tokio:: select! {
398+ result = conn. as_mut( ) => {
399+ if let Err ( err) = result {
400+ warn!( "Error serving connection: {:?}" , err) ;
401+ }
402+ break ;
403+ }
404+ _ = & mut sleep, if !graceful_shutdown_started => {
405+ debug!( "Connection reached max age ({}ms), sending GOAWAY" , max_age_ms) ;
406+ conn. as_mut( ) . graceful_shutdown( ) ;
407+ graceful_shutdown_started = true ;
408+ // Continue the loop to let the connection drain
409+ }
410+ _ = conn_shutdown_rx. recv( ) , if !graceful_shutdown_started => {
411+ debug!( "Server shutdown requested, sending GOAWAY to connection" ) ;
412+ conn. as_mut( ) . graceful_shutdown( ) ;
413+ graceful_shutdown_started = true ;
414+ // Continue the loop to let the connection drain
415+ }
416+ }
417+ }
418+ } ) ;
419+ } else {
420+ // No max age, use the graceful shutdown watcher
421+ let fut = graceful_shutdown. watch( conn. into_owned( ) ) ;
422+ tokio:: task:: spawn( async move {
423+ if let Err ( err) = fut. await {
424+ warn!( "Error serving connection: {:?}" , err) ;
425+ }
426+ } ) ;
427+ }
428+ } ,
375429 _ = shutdown_rx. recv( ) => {
376430 debug!( "Graceful shutdown requested; shutting down" ) ;
377431 break ;
@@ -589,6 +643,7 @@ fn init(ruby: &Ruby) -> Result<(), MagnusError> {
589643 server_class. define_method ( "start" , method ! ( Server :: start, 0 ) ) ?;
590644 server_class. define_method ( "stop" , method ! ( Server :: stop, 0 ) ) ?;
591645 server_class. define_method ( "run_worker" , method ! ( Server :: run_worker, 0 ) ) ?;
646+ server_class. define_method ( "total_connections" , method ! ( Server :: total_connections, 0 ) ) ?;
592647
593648 let response_class = module. define_class ( "Response" , ruby. class_object ( ) ) ?;
594649 response_class. define_singleton_method ( "new" , function ! ( Response :: new, 3 ) ) ?;
0 commit comments