@@ -24,7 +24,10 @@ use stun::integrity::*;
24
24
use stun:: message:: * ;
25
25
use stun:: textattrs:: * ;
26
26
use stun:: xoraddr:: * ;
27
+ use tokio:: pin;
28
+ use tokio:: select;
27
29
use tokio:: sync:: { mpsc, Mutex } ;
30
+ use tokio_util:: sync:: CancellationToken ;
28
31
use transaction:: * ;
29
32
use util:: conn:: * ;
30
33
use util:: vnet:: net:: * ;
@@ -78,6 +81,7 @@ struct ClientInternal {
78
81
binding_mgr : Arc < Mutex < BindingManager > > ,
79
82
rto_in_ms : u16 ,
80
83
read_ch_tx : Arc < Mutex < Option < mpsc:: Sender < InboundData > > > > ,
84
+ close_notify : CancellationToken ,
81
85
}
82
86
83
87
#[ async_trait]
@@ -210,6 +214,7 @@ impl ClientInternal {
210
214
} ,
211
215
integrity : MessageIntegrity :: new_short_term_integrity ( String :: new ( ) ) ,
212
216
read_ch_tx : Arc :: new ( Mutex :: new ( None ) ) ,
217
+ close_notify : CancellationToken :: new ( ) ,
213
218
} )
214
219
}
215
220
@@ -227,33 +232,51 @@ impl ClientInternal {
227
232
let tr_map = Arc :: clone ( & self . tr_map ) ;
228
233
let read_ch_tx = Arc :: clone ( & self . read_ch_tx ) ;
229
234
let binding_mgr = Arc :: clone ( & self . binding_mgr ) ;
235
+ let close_notify = self . close_notify . clone ( ) ;
230
236
231
237
tokio:: spawn ( async move {
232
238
let mut buf = vec ! [ 0u8 ; MAX_DATA_BUFFER_SIZE ] ;
239
+ let wait_cancel = close_notify. cancelled ( ) ;
240
+ pin ! ( wait_cancel) ;
241
+
233
242
loop {
234
- //TODO: gracefully exit loop
235
- let ( n , from ) = match conn . recv_from ( & mut buf ) . await {
236
- Ok ( ( n , from ) ) => ( n , from ) ,
237
- Err ( err ) => {
238
- log:: debug!( "exiting read loop: {}" , err ) ;
243
+ let ( n , from ) = select ! {
244
+ biased ;
245
+
246
+ _ = & mut wait_cancel => {
247
+ log:: debug!( "exiting read loop" ) ;
239
248
break ;
249
+ } ,
250
+ result = conn. recv_from( & mut buf) => match result {
251
+ Ok ( ( n, from) ) => ( n, from) ,
252
+ Err ( err) => {
253
+ log:: debug!( "exiting read loop: {}" , err) ;
254
+ break ;
255
+ }
240
256
}
241
257
} ;
242
-
243
258
log:: debug!( "received {} bytes of udp from {}" , n, from) ;
244
259
245
- if let Err ( err) = ClientInternal :: handle_inbound (
246
- & read_ch_tx,
247
- & buf[ ..n] ,
248
- from,
249
- & stun_serv_str,
250
- & tr_map,
251
- & binding_mgr,
252
- )
253
- . await
254
- {
255
- log:: debug!( "exiting read loop: {}" , err) ;
256
- break ;
260
+ select ! {
261
+ biased;
262
+
263
+ _ = & mut wait_cancel => {
264
+ log:: debug!( "exiting read loop" ) ;
265
+ break ;
266
+ } ,
267
+ result = ClientInternal :: handle_inbound(
268
+ & read_ch_tx,
269
+ & buf[ ..n] ,
270
+ from,
271
+ & stun_serv_str,
272
+ & tr_map,
273
+ & binding_mgr,
274
+ ) => {
275
+ if let Err ( err) = result {
276
+ log:: debug!( "exiting read loop: {}" , err) ;
277
+ break ;
278
+ }
279
+ }
257
280
}
258
281
}
259
282
} ) ;
@@ -430,6 +453,7 @@ impl ClientInternal {
430
453
431
454
/// Closes this client.
432
455
async fn close ( & mut self ) {
456
+ self . close_notify . cancel ( ) ;
433
457
{
434
458
let mut read_ch_tx = self . read_ch_tx . lock ( ) . await ;
435
459
read_ch_tx. take ( ) ;
0 commit comments