@@ -127,8 +127,8 @@ pub use builder::NodeBuilder as Builder;
127
127
use chain:: ChainSource ;
128
128
use config:: {
129
129
default_user_config, may_announce_channel, ChannelConfig , Config ,
130
- LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS , NODE_ANN_BCAST_INTERVAL , PEER_RECONNECTION_INTERVAL ,
131
- RGS_SYNC_INTERVAL ,
130
+ BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS , LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS ,
131
+ NODE_ANN_BCAST_INTERVAL , PEER_RECONNECTION_INTERVAL , RGS_SYNC_INTERVAL ,
132
132
} ;
133
133
use connection:: ConnectionManager ;
134
134
use event:: { EventHandler , EventQueue } ;
@@ -179,6 +179,8 @@ pub struct Node {
179
179
runtime : Arc < RwLock < Option < Arc < tokio:: runtime:: Runtime > > > > ,
180
180
stop_sender : tokio:: sync:: watch:: Sender < ( ) > ,
181
181
background_processor_task : Mutex < Option < tokio:: task:: JoinHandle < ( ) > > > ,
182
+ background_tasks : Mutex < Option < tokio:: task:: JoinSet < ( ) > > > ,
183
+ cancellable_background_tasks : Mutex < Option < tokio:: task:: JoinSet < ( ) > > > ,
182
184
config : Arc < Config > ,
183
185
wallet : Arc < Wallet > ,
184
186
chain_source : Arc < ChainSource > ,
@@ -232,6 +234,10 @@ impl Node {
232
234
return Err ( Error :: AlreadyRunning ) ;
233
235
}
234
236
237
+ let mut background_tasks = tokio:: task:: JoinSet :: new ( ) ;
238
+ let mut cancellable_background_tasks = tokio:: task:: JoinSet :: new ( ) ;
239
+ let runtime_handle = runtime. handle ( ) ;
240
+
235
241
log_info ! (
236
242
self . logger,
237
243
"Starting up LDK Node with node ID {} on network: {}" ,
@@ -258,19 +264,27 @@ impl Node {
258
264
let sync_cman = Arc :: clone ( & self . channel_manager ) ;
259
265
let sync_cmon = Arc :: clone ( & self . chain_monitor ) ;
260
266
let sync_sweeper = Arc :: clone ( & self . output_sweeper ) ;
261
- runtime. spawn ( async move {
262
- chain_source
263
- . continuously_sync_wallets ( stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
264
- . await ;
265
- } ) ;
267
+ background_tasks. spawn_on (
268
+ async move {
269
+ chain_source
270
+ . continuously_sync_wallets (
271
+ stop_sync_receiver,
272
+ sync_cman,
273
+ sync_cmon,
274
+ sync_sweeper,
275
+ )
276
+ . await ;
277
+ } ,
278
+ runtime_handle,
279
+ ) ;
266
280
267
281
if self . gossip_source . is_rgs ( ) {
268
282
let gossip_source = Arc :: clone ( & self . gossip_source ) ;
269
283
let gossip_sync_store = Arc :: clone ( & self . kv_store ) ;
270
284
let gossip_sync_logger = Arc :: clone ( & self . logger ) ;
271
285
let gossip_node_metrics = Arc :: clone ( & self . node_metrics ) ;
272
286
let mut stop_gossip_sync = self . stop_sender . subscribe ( ) ;
273
- runtime . spawn ( async move {
287
+ cancellable_background_tasks . spawn_on ( async move {
274
288
let mut interval = tokio:: time:: interval ( RGS_SYNC_INTERVAL ) ;
275
289
loop {
276
290
tokio:: select! {
@@ -311,7 +325,7 @@ impl Node {
311
325
}
312
326
}
313
327
}
314
- } ) ;
328
+ } , runtime_handle ) ;
315
329
}
316
330
317
331
if let Some ( listening_addresses) = & self . config . listening_addresses {
@@ -337,7 +351,7 @@ impl Node {
337
351
bind_addrs. extend ( resolved_address) ;
338
352
}
339
353
340
- runtime . spawn ( async move {
354
+ cancellable_background_tasks . spawn_on ( async move {
341
355
{
342
356
let listener =
343
357
tokio:: net:: TcpListener :: bind ( & * bind_addrs) . await
@@ -356,7 +370,7 @@ impl Node {
356
370
_ = stop_listen. changed( ) => {
357
371
log_debug!(
358
372
listening_logger,
359
- "Stopping listening to inbound connections." ,
373
+ "Stopping listening to inbound connections."
360
374
) ;
361
375
break ;
362
376
}
@@ -375,7 +389,7 @@ impl Node {
375
389
}
376
390
377
391
listening_indicator. store ( false , Ordering :: Release ) ;
378
- } ) ;
392
+ } , runtime_handle ) ;
379
393
}
380
394
381
395
// Regularly reconnect to persisted peers.
@@ -384,15 +398,15 @@ impl Node {
384
398
let connect_logger = Arc :: clone ( & self . logger ) ;
385
399
let connect_peer_store = Arc :: clone ( & self . peer_store ) ;
386
400
let mut stop_connect = self . stop_sender . subscribe ( ) ;
387
- runtime . spawn ( async move {
401
+ cancellable_background_tasks . spawn_on ( async move {
388
402
let mut interval = tokio:: time:: interval ( PEER_RECONNECTION_INTERVAL ) ;
389
403
interval. set_missed_tick_behavior ( tokio:: time:: MissedTickBehavior :: Skip ) ;
390
404
loop {
391
405
tokio:: select! {
392
406
_ = stop_connect. changed( ) => {
393
407
log_debug!(
394
408
connect_logger,
395
- "Stopping reconnecting known peers." ,
409
+ "Stopping reconnecting known peers."
396
410
) ;
397
411
return ;
398
412
}
@@ -412,7 +426,7 @@ impl Node {
412
426
}
413
427
}
414
428
}
415
- } ) ;
429
+ } , runtime_handle ) ;
416
430
417
431
// Regularly broadcast node announcements.
418
432
let bcast_cm = Arc :: clone ( & self . channel_manager ) ;
@@ -424,7 +438,7 @@ impl Node {
424
438
let mut stop_bcast = self . stop_sender . subscribe ( ) ;
425
439
let node_alias = self . config . node_alias . clone ( ) ;
426
440
if may_announce_channel ( & self . config ) . is_ok ( ) {
427
- runtime . spawn ( async move {
441
+ cancellable_background_tasks . spawn_on ( async move {
428
442
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
429
443
#[ cfg( not( test) ) ]
430
444
let mut interval = tokio:: time:: interval ( Duration :: from_secs ( 30 ) ) ;
@@ -495,14 +509,15 @@ impl Node {
495
509
}
496
510
}
497
511
}
498
- } ) ;
512
+ } , runtime_handle ) ;
499
513
}
500
514
501
515
let stop_tx_bcast = self . stop_sender . subscribe ( ) ;
502
516
let chain_source = Arc :: clone ( & self . chain_source ) ;
503
- runtime. spawn ( async move {
504
- chain_source. continuously_process_broadcast_queue ( stop_tx_bcast) . await
505
- } ) ;
517
+ cancellable_background_tasks. spawn_on (
518
+ async move { chain_source. continuously_process_broadcast_queue ( stop_tx_bcast) . await } ,
519
+ runtime_handle,
520
+ ) ;
506
521
507
522
let bump_tx_event_handler = Arc :: new ( BumpTransactionEventHandler :: new (
508
523
Arc :: clone ( & self . tx_broadcaster ) ,
@@ -587,24 +602,33 @@ impl Node {
587
602
let mut stop_liquidity_handler = self . stop_sender . subscribe ( ) ;
588
603
let liquidity_handler = Arc :: clone ( & liquidity_source) ;
589
604
let liquidity_logger = Arc :: clone ( & self . logger ) ;
590
- runtime. spawn ( async move {
591
- loop {
592
- tokio:: select! {
593
- _ = stop_liquidity_handler. changed( ) => {
594
- log_debug!(
595
- liquidity_logger,
596
- "Stopping processing liquidity events." ,
597
- ) ;
598
- return ;
605
+ background_tasks. spawn_on (
606
+ async move {
607
+ loop {
608
+ tokio:: select! {
609
+ _ = stop_liquidity_handler. changed( ) => {
610
+ log_debug!(
611
+ liquidity_logger,
612
+ "Stopping processing liquidity events." ,
613
+ ) ;
614
+ return ;
615
+ }
616
+ _ = liquidity_handler. handle_next_event( ) => { }
599
617
}
600
- _ = liquidity_handler. handle_next_event( ) => { }
601
618
}
602
- }
603
- } ) ;
619
+ } ,
620
+ runtime_handle,
621
+ ) ;
604
622
}
605
623
606
624
* runtime_lock = Some ( runtime) ;
607
625
626
+ debug_assert ! ( self . background_tasks. lock( ) . unwrap( ) . is_none( ) ) ;
627
+ * self . background_tasks . lock ( ) . unwrap ( ) = Some ( background_tasks) ;
628
+
629
+ debug_assert ! ( self . cancellable_background_tasks. lock( ) . unwrap( ) . is_none( ) ) ;
630
+ * self . cancellable_background_tasks . lock ( ) . unwrap ( ) = Some ( cancellable_background_tasks) ;
631
+
608
632
log_info ! ( self . logger, "Startup complete." ) ;
609
633
Ok ( ( ) )
610
634
}
@@ -643,6 +667,53 @@ impl Node {
643
667
self . chain_source . stop ( ) ;
644
668
log_debug ! ( self . logger, "Stopped chain sources." ) ;
645
669
670
+ // Cancel cancellable background tasks
671
+ if let Some ( mut tasks) = self . cancellable_background_tasks . lock ( ) . unwrap ( ) . take ( ) {
672
+ tasks. abort_all ( ) ;
673
+ } else {
674
+ debug_assert ! ( false , "Expected some cancellable background tasks" ) ;
675
+ } ;
676
+
677
+ // Wait until non-cancellable background tasks (mod LDK's background processor) are done.
678
+ let runtime_2 = Arc :: clone ( & runtime) ;
679
+ if let Some ( mut tasks) = self . background_tasks . lock ( ) . unwrap ( ) . take ( ) {
680
+ tokio:: task:: block_in_place ( move || {
681
+ runtime_2. block_on ( async {
682
+ loop {
683
+ let timeout_fut = tokio:: time:: timeout (
684
+ Duration :: from_secs ( BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS ) ,
685
+ tasks. join_next_with_id ( ) ,
686
+ ) ;
687
+ match timeout_fut. await {
688
+ Ok ( Some ( Ok ( ( id, _) ) ) ) => {
689
+ log_trace ! ( self . logger, "Stopped background task with id {}" , id) ;
690
+ } ,
691
+ Ok ( Some ( Err ( e) ) ) => {
692
+ tasks. abort_all ( ) ;
693
+ log_trace ! ( self . logger, "Stopping background task failed: {}" , e) ;
694
+ break ;
695
+ } ,
696
+ Ok ( None ) => {
697
+ log_debug ! ( self . logger, "Stopped all background tasks" ) ;
698
+ break ;
699
+ } ,
700
+ Err ( e) => {
701
+ tasks. abort_all ( ) ;
702
+ log_error ! (
703
+ self . logger,
704
+ "Stopping background task timed out: {}" ,
705
+ e
706
+ ) ;
707
+ break ;
708
+ } ,
709
+ }
710
+ }
711
+ } )
712
+ } ) ;
713
+ } else {
714
+ debug_assert ! ( false , "Expected some background tasks" ) ;
715
+ } ;
716
+
646
717
// Wait until background processing stopped, at least until a timeout is reached.
647
718
if let Some ( background_processor_task) =
648
719
self . background_processor_task . lock ( ) . unwrap ( ) . take ( )
@@ -676,7 +747,9 @@ impl Node {
676
747
log_error ! ( self . logger, "Stopping event handling timed out: {}" , e) ;
677
748
} ,
678
749
}
679
- }
750
+ } else {
751
+ debug_assert ! ( false , "Expected a background processing task" ) ;
752
+ } ;
680
753
681
754
#[ cfg( tokio_unstable) ]
682
755
{
0 commit comments