@@ -8,10 +8,13 @@ use std::net::{IpAddr, SocketAddr};
88use std:: str:: FromStr ;
99use tokio:: net:: { TcpListener , TcpStream } ;
1010use futures:: future:: select_all;
11+ use log:: info;
12+ use tokio:: { select, signal} ;
1113
1214pub struct TcpSocket {
1315 tcp_listeners : Vec < TcpListener > ,
1416 addrs : Vec < TcpAddr > ,
17+ pub ( crate ) shutdown_tx : tokio:: sync:: broadcast:: Sender < ( ) > ,
1518 from_sys : bool ,
1619}
1720
@@ -24,9 +27,11 @@ impl TcpSocket {
2427 let tcp_addr = TcpAddr :: from_config ( config) ?;
2528 ( vec ! [ Self :: bind( & tcp_addr) ?] , vec ! [ tcp_addr] , false )
2629 } ;
30+ let ( shutdown_tx, _) = tokio:: sync:: broadcast:: channel :: < ( ) > ( 1 ) ;
2731 Ok ( TcpSocket {
2832 tcp_listeners,
2933 addrs : tcp_addr,
34+ shutdown_tx,
3035 from_sys,
3136 } )
3237 }
@@ -62,7 +67,7 @@ impl TcpSocket {
6267 Ok ( tcp_listener)
6368 }
6469
65- pub async fn accept ( & self ) -> io:: Result < ( TcpStream , SocketAddr ) > {
70+ pub async fn accept ( & self , shutdown_rx : & mut tokio :: sync :: broadcast :: Receiver < ( ) > ) -> io:: Result < Option < ( TcpStream , SocketAddr ) > > {
6671 if self . tcp_listeners . is_empty ( ) {
6772 return Err ( Error :: new (
6873 ErrorKind :: NotConnected ,
@@ -73,9 +78,41 @@ impl TcpSocket {
7378 let accept_futures = self . tcp_listeners . iter ( ) . map ( |listener| {
7479 Box :: pin ( listener. accept ( ) )
7580 } ) ;
76- let ( result, _index, _remaining) = select_all ( accept_futures) . await ;
77- result // result of the first future to complete
81+
82+ select ! {
83+ res = select_all( accept_futures) => {
84+ let ( result, _index, _remaining) = res;
85+ result. map( Some )
86+ }
87+ _ = shutdown_rx. recv( ) => Ok ( None ) ,
88+ }
7889 }
90+
91+ pub fn spawn_signal_handler ( & self ) {
92+ let shutdown_tx = self . shutdown_tx . clone ( ) ;
93+ tokio:: spawn ( async move {
94+ let ctrl_c = signal:: ctrl_c ( ) ;
95+
96+ #[ cfg( unix) ]
97+ let terminate = async {
98+ if let Ok ( mut term) = signal:: unix:: signal ( signal:: unix:: SignalKind :: terminate ( ) ) {
99+ term. recv ( ) . await ;
100+ }
101+ } ;
102+
103+ #[ cfg( not( unix) ) ]
104+ let terminate = std:: future:: pending :: < ( ) > ( ) ;
105+
106+ select ! {
107+ _ = ctrl_c => { } ,
108+ _ = terminate => { } ,
109+ }
110+
111+ info ! ( "SIGTERM / Ctrl+C received, notifying tasks ..." ) ;
112+ let _ = shutdown_tx. send ( ( ) ) ;
113+ } ) ;
114+ }
115+
79116}
80117
81118impl fmt:: Display for TcpSocket {
0 commit comments