@@ -21,7 +21,6 @@ use tokio::{
21
21
net:: UnixListener ,
22
22
select, spawn,
23
23
sync:: mpsc:: { channel, Receiver , Sender } ,
24
- sync:: watch,
25
24
time:: timeout,
26
25
} ;
27
26
#[ cfg( target_os = "linux" ) ]
@@ -33,16 +32,20 @@ use crate::common::{self, Domain};
33
32
use crate :: context;
34
33
use crate :: error:: { get_status, Error , Result } ;
35
34
use crate :: proto:: { Code , MessageHeader , Status , MESSAGE_TYPE_REQUEST } ;
35
+ use crate :: r#async:: shutdown;
36
36
use crate :: r#async:: utils;
37
37
use crate :: r#async:: { MethodHandler , TtrpcContext } ;
38
38
39
+ const DEFAULT_CONN_SHUTDOWN_TIMEOUT : Duration = Duration :: from_millis ( 5000 ) ;
40
+ const DEFAULT_SERVER_SHUTDOWN_TIMEOUT : Duration = Duration :: from_millis ( 10000 ) ;
41
+
39
42
/// A ttrpc Server (async).
40
43
pub struct Server {
41
44
listeners : Vec < RawFd > ,
42
45
methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
43
46
domain : Option < Domain > ,
44
- disconnect_tx : Option < watch :: Sender < i32 > > ,
45
- all_conn_done_rx : Option < Receiver < i32 > > ,
47
+
48
+ shutdown : shutdown :: Notifier ,
46
49
stop_listen_tx : Option < Sender < Sender < RawFd > > > ,
47
50
}
48
51
@@ -52,8 +55,7 @@ impl Default for Server {
52
55
listeners : Vec :: with_capacity ( 1 ) ,
53
56
methods : Arc :: new ( HashMap :: new ( ) ) ,
54
57
domain : None ,
55
- disconnect_tx : None ,
56
- all_conn_done_rx : None ,
58
+ shutdown : shutdown:: with_timeout ( DEFAULT_SERVER_SHUTDOWN_TIMEOUT ) . 0 ,
57
59
stop_listen_tx : None ,
58
60
}
59
61
}
@@ -151,11 +153,7 @@ impl Server {
151
153
{
152
154
let methods = self . methods . clone ( ) ;
153
155
154
- let ( disconnect_tx, close_conn_rx) = watch:: channel ( 0 ) ;
155
- self . disconnect_tx = Some ( disconnect_tx) ;
156
-
157
- let ( conn_done_tx, all_conn_done_rx) = channel :: < i32 > ( 1 ) ;
158
- self . all_conn_done_rx = Some ( all_conn_done_rx) ;
156
+ let shutdown_waiter = self . shutdown . subscribe ( ) ;
159
157
160
158
let ( stop_listen_tx, mut stop_listen_rx) = channel ( 1 ) ;
161
159
self . stop_listen_tx = Some ( stop_listen_tx) ;
@@ -174,8 +172,7 @@ impl Server {
174
172
fd,
175
173
stream,
176
174
methods. clone( ) ,
177
- close_conn_rx. clone( ) ,
178
- conn_done_tx. clone( )
175
+ shutdown_waiter. clone( ) ,
179
176
) . await ;
180
177
}
181
178
Err ( e) => {
@@ -201,7 +198,6 @@ impl Server {
201
198
}
202
199
}
203
200
}
204
- drop ( conn_done_tx) ;
205
201
} ) ;
206
202
Ok ( ( ) )
207
203
}
@@ -214,13 +210,16 @@ impl Server {
214
210
}
215
211
216
212
pub async fn disconnect ( & mut self ) {
217
- if let Some ( tx) = self . disconnect_tx . take ( ) {
218
- tx. send ( 1 ) . ok ( ) ;
219
- }
213
+ self . shutdown . shutdown ( ) ;
220
214
221
- if let Some ( mut rx) = self . all_conn_done_rx . take ( ) {
222
- rx. recv ( ) . await ;
223
- }
215
+ self . shutdown
216
+ . wait_all_exit ( )
217
+ . await
218
+ . map_err ( |e| {
219
+ trace ! ( "wait connection exit error: {}" , e) ;
220
+ } )
221
+ . ok ( ) ;
222
+ trace ! ( "wait connection exit." ) ;
224
223
}
225
224
226
225
pub async fn stop_listen ( & mut self ) {
@@ -239,17 +238,17 @@ async fn spawn_connection_handler<S>(
239
238
fd : RawFd ,
240
239
stream : S ,
241
240
methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
242
- mut close_conn_rx : watch:: Receiver < i32 > ,
243
- conn_done_tx : Sender < i32 > ,
241
+ shutdown_waiter : shutdown:: Waiter ,
244
242
) where
245
243
S : AsyncRead + AsyncWrite + AsRawFd + Send + ' static ,
246
244
{
247
- let ( req_done_tx, mut all_req_done_rx) = channel :: < i32 > ( 1 ) ;
248
-
249
245
spawn ( async move {
250
246
let ( mut reader, mut writer) = split ( stream) ;
251
247
let ( tx, mut rx) : ( Sender < Vec < u8 > > , Receiver < Vec < u8 > > ) = channel ( 100 ) ;
252
- let ( client_disconnected_tx, client_disconnected_rx) = watch:: channel ( false ) ;
248
+
249
+ let server_shutdown = shutdown_waiter. clone ( ) ;
250
+ let ( disconnect_notifier, disconnect_waiter) =
251
+ shutdown:: with_timeout ( DEFAULT_CONN_SHUTDOWN_TIMEOUT ) ;
253
252
254
253
spawn ( async move {
255
254
while let Some ( buf) = rx. recv ( ) . await {
@@ -262,8 +261,7 @@ async fn spawn_connection_handler<S>(
262
261
loop {
263
262
let tx = tx. clone ( ) ;
264
263
let methods = methods. clone ( ) ;
265
- let req_done_tx2 = req_done_tx. clone ( ) ;
266
- let mut client_disconnected_rx2 = client_disconnected_rx. clone ( ) ;
264
+ let handler_shutdown_waiter = disconnect_waiter. clone ( ) ;
267
265
268
266
select ! {
269
267
resp = receive( & mut reader) => {
@@ -272,33 +270,32 @@ async fn spawn_connection_handler<S>(
272
270
spawn( async move {
273
271
select! {
274
272
_ = handle_request( tx, fd, methods, message) => { }
275
- _ = client_disconnected_rx2 . changed ( ) => { }
273
+ _ = handler_shutdown_waiter . wait_shutdown ( ) => { }
276
274
}
277
-
278
- drop( req_done_tx2) ;
279
275
} ) ;
280
276
}
281
277
Err ( e) => {
282
- let _ = client_disconnected_tx . send ( true ) ;
278
+ disconnect_notifier . shutdown ( ) ;
283
279
trace!( "error {:?}" , e) ;
284
280
break ;
285
281
}
286
282
}
287
283
}
288
- v = close_conn_rx. changed( ) => {
289
- // 0 is the init value of this watch, not a valid signal
290
- // is_err means the tx was dropped.
291
- if v. is_err( ) || * close_conn_rx. borrow( ) != 0 {
292
- info!( "Stop accepting new connections." ) ;
293
- break ;
294
- }
284
+ _ = server_shutdown. wait_shutdown( ) => {
285
+ trace!( "Receive shutdown." ) ;
286
+ break ;
295
287
}
296
288
}
297
289
}
298
-
299
- drop ( req_done_tx) ;
300
- all_req_done_rx. recv ( ) . await ;
301
- drop ( conn_done_tx) ;
290
+ // TODO: Don't disconnect_notifier.shutdown();
291
+ // Wait pedding request/stream to exit.
292
+ disconnect_notifier
293
+ . wait_all_exit ( )
294
+ . await
295
+ . map_err ( |e| {
296
+ trace ! ( "wait handler exit error: {}" , e) ;
297
+ } )
298
+ . ok ( ) ;
302
299
} ) ;
303
300
}
304
301
0 commit comments