@@ -464,6 +464,8 @@ pub(super) struct PeerState<Signer: Sign> {
464
464
/// Messages to send to the peer - pushed to in the same lock that they are generated in (except
465
465
/// for broadcast messages, where ordering isn't as strict).
466
466
pub ( super ) pending_msg_events : Vec < MessageSendEvent > ,
467
+ /// Represents wether we're connected to the node or not.
468
+ connected : bool ,
467
469
}
468
470
469
471
/// Stores a PaymentSecret and any other data we may need to validate an inbound payment is
@@ -593,6 +595,8 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
593
595
// | |
594
596
// | |__`best_block`
595
597
// | |
598
+ // | |__`pending_peers_awaiting_removal`
599
+ // | |
596
600
// | |__`pending_events`
597
601
// | |
598
602
// | |__`pending_background_events`
@@ -760,6 +764,16 @@ where
760
764
761
765
/// See `ChannelManager` struct-level documentation for lock order requirements.
762
766
pending_events : Mutex < Vec < events:: Event > > ,
767
+ /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
768
+ /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
769
+ /// to that peer is later closed while still being disconnected (i.e. force closed), we
770
+ /// therefore need to remove the peer from `peer_state` separately.
771
+ /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
772
+ /// instead store such peers awaiting removal in this field, and remove them on a timer to
773
+ /// limit the negative effects on parallelism as much as possible.
774
+ ///
775
+ /// See `ChannelManager` struct-level documentation for lock order requirements.
776
+ pending_peers_awaiting_removal : Mutex < HashSet < PublicKey > > ,
763
777
/// See `ChannelManager` struct-level documentation for lock order requirements.
764
778
pending_background_events : Mutex < Vec < BackgroundEvent > > ,
765
779
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1290,10 +1304,11 @@ macro_rules! try_chan_entry {
1290
1304
}
1291
1305
1292
1306
macro_rules! remove_channel {
1293
- ( $self: expr, $entry: expr) => {
1307
+ ( $self: expr, $entry: expr, $peer_state : expr ) => {
1294
1308
{
1295
1309
let channel = $entry. remove_entry( ) . 1 ;
1296
1310
update_maps_on_chan_removal!( $self, channel) ;
1311
+ $self. add_pending_peer_to_be_removed( channel. get_counterparty_node_id( ) , $peer_state) ;
1297
1312
channel
1298
1313
}
1299
1314
}
@@ -1466,6 +1481,7 @@ where
1466
1481
per_peer_state : FairRwLock :: new ( HashMap :: new ( ) ) ,
1467
1482
1468
1483
pending_events : Mutex :: new ( Vec :: new ( ) ) ,
1484
+ pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
1469
1485
pending_background_events : Mutex :: new ( Vec :: new ( ) ) ,
1470
1486
total_consistency_lock : RwLock :: new ( ( ) ) ,
1471
1487
persistence_notifier : Notifier :: new ( ) ,
@@ -1704,7 +1720,7 @@ where
1704
1720
let ( result, is_permanent) =
1705
1721
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
1706
1722
if is_permanent {
1707
- remove_channel ! ( self , chan_entry) ;
1723
+ remove_channel ! ( self , chan_entry, peer_state ) ;
1708
1724
break result;
1709
1725
}
1710
1726
}
@@ -1715,7 +1731,7 @@ where
1715
1731
} ) ;
1716
1732
1717
1733
if chan_entry. get ( ) . is_shutdown ( ) {
1718
- let channel = remove_channel ! ( self , chan_entry) ;
1734
+ let channel = remove_channel ! ( self , chan_entry, peer_state ) ;
1719
1735
if let Ok ( channel_update) = self . get_channel_update_for_broadcast ( & channel) {
1720
1736
peer_state. pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
1721
1737
msg : channel_update
@@ -1818,7 +1834,7 @@ where
1818
1834
} else {
1819
1835
self . issue_channel_close_events ( chan. get ( ) , ClosureReason :: HolderForceClosed ) ;
1820
1836
}
1821
- remove_channel ! ( self , chan)
1837
+ remove_channel ! ( self , chan, peer_state )
1822
1838
} else {
1823
1839
return Err ( APIError :: ChannelUnavailable { err : format ! ( "Channel with id {} not found for the passed counterparty node_id {}" , log_bytes!( * channel_id) , peer_node_id) } ) ;
1824
1840
}
@@ -1857,6 +1873,13 @@ where
1857
1873
}
1858
1874
}
1859
1875
1876
+ fn add_pending_peer_to_be_removed ( & self , counterparty_node_id : PublicKey , peer_state : & mut PeerState < <SP :: Target as SignerProvider >:: Signer > ) {
1877
+ let peer_should_be_removed = !peer_state. connected && peer_state. channel_by_id . len ( ) == 0 ;
1878
+ if peer_should_be_removed {
1879
+ self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) . insert ( counterparty_node_id) ;
1880
+ }
1881
+ }
1882
+
1860
1883
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
1861
1884
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
1862
1885
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3281,6 +3304,34 @@ where
3281
3304
true
3282
3305
}
3283
3306
3307
+ /// Removes peers which have been been added to `pending_peers_awaiting_removal` which are
3308
+ /// still disconnected and we have no channels to.
3309
+ ///
3310
+ /// Must be called without the `per_peer_state` lock acquired.
3311
+ fn remove_peers_awaiting_removal ( & self ) {
3312
+ let mut pending_peers_awaiting_removal = HashSet :: new ( ) ;
3313
+ mem:: swap ( & mut * self . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) , & mut pending_peers_awaiting_removal) ;
3314
+ if pending_peers_awaiting_removal. len ( ) > 0 {
3315
+ let mut per_peer_state = self . per_peer_state . write ( ) . unwrap ( ) ;
3316
+ for counterparty_node_id in pending_peers_awaiting_removal. drain ( ) {
3317
+ match per_peer_state. entry ( counterparty_node_id) {
3318
+ hash_map:: Entry :: Occupied ( entry) => {
3319
+ // Remove the entry if the peer is still disconnected and we still
3320
+ // have no channels to the peer.
3321
+ let remove_entry = {
3322
+ let peer_state = entry. get ( ) . lock ( ) . unwrap ( ) ;
3323
+ !peer_state. connected && peer_state. channel_by_id . len ( ) == 0
3324
+ } ;
3325
+ if remove_entry {
3326
+ entry. remove_entry ( ) ;
3327
+ }
3328
+ } ,
3329
+ hash_map:: Entry :: Vacant ( _) => { /* The PeerState has already been removed */ }
3330
+ }
3331
+ }
3332
+ }
3333
+ }
3334
+
3284
3335
#[ cfg( any( test, feature = "_test_utils" ) ) ]
3285
3336
/// Process background events, for functional testing
3286
3337
pub fn test_process_background_events ( & self ) {
@@ -3359,13 +3410,14 @@ where
3359
3410
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
3360
3411
let peer_state = & mut * peer_state_lock;
3361
3412
let pending_msg_events = & mut peer_state. pending_msg_events ;
3413
+ let counterparty_node_id = * counterparty_node_id;
3362
3414
peer_state. channel_by_id . retain ( |chan_id, chan| {
3363
3415
let chan_needs_persist = self . update_channel_fee ( chan_id, chan, new_feerate) ;
3364
3416
if chan_needs_persist == NotifyOption :: DoPersist { should_persist = NotifyOption :: DoPersist ; }
3365
3417
3366
3418
if let Err ( e) = chan. timer_check_closing_negotiation_progress ( ) {
3367
3419
let ( needs_close, err) = convert_chan_err ! ( self , e, chan, chan_id) ;
3368
- handle_errors. push ( ( Err ( err) , * counterparty_node_id) ) ;
3420
+ handle_errors. push ( ( Err ( err) , counterparty_node_id) ) ;
3369
3421
if needs_close { return false ; }
3370
3422
}
3371
3423
@@ -3399,8 +3451,10 @@ where
3399
3451
3400
3452
true
3401
3453
} ) ;
3454
+ self . add_pending_peer_to_be_removed ( counterparty_node_id, peer_state) ;
3402
3455
}
3403
3456
}
3457
+ self . remove_peers_awaiting_removal ( ) ;
3404
3458
3405
3459
self . claimable_payments . lock ( ) . unwrap ( ) . claimable_htlcs . retain ( |payment_hash, ( _, htlcs) | {
3406
3460
if htlcs. is_empty ( ) {
@@ -4136,7 +4190,7 @@ where
4136
4190
}
4137
4191
} ;
4138
4192
peer_state. pending_msg_events . push ( send_msg_err_event) ;
4139
- let _ = remove_channel ! ( self , channel) ;
4193
+ let _ = remove_channel ! ( self , channel, peer_state ) ;
4140
4194
return Err ( APIError :: APIMisuseError { err : "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations." . to_owned ( ) } ) ;
4141
4195
}
4142
4196
@@ -4422,7 +4476,7 @@ where
4422
4476
let ( result, is_permanent) =
4423
4477
handle_monitor_update_res ! ( self , update_res, chan_entry. get_mut( ) , RAACommitmentOrder :: CommitmentFirst , chan_entry. key( ) , NO_UPDATE ) ;
4424
4478
if is_permanent {
4425
- remove_channel ! ( self , chan_entry) ;
4479
+ remove_channel ! ( self , chan_entry, peer_state ) ;
4426
4480
break result;
4427
4481
}
4428
4482
}
@@ -4471,7 +4525,7 @@ where
4471
4525
// also implies there are no pending HTLCs left on the channel, so we can
4472
4526
// fully delete it from tracking (the channel monitor is still around to
4473
4527
// watch for old state broadcasts)!
4474
- ( tx, Some ( remove_channel ! ( self , chan_entry) ) )
4528
+ ( tx, Some ( remove_channel ! ( self , chan_entry, peer_state ) ) )
4475
4529
} else { ( tx, None ) }
4476
4530
} ,
4477
4531
hash_map:: Entry :: Vacant ( _) => return Err ( MsgHandleErrInternal :: send_err_msg_no_close ( format ! ( "Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}" , counterparty_node_id) , msg. channel_id ) )
@@ -4974,12 +5028,11 @@ where
4974
5028
if let Some ( peer_state_mutex) = per_peer_state. get ( & counterparty_node_id) {
4975
5029
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
4976
5030
let peer_state = & mut * peer_state_lock;
4977
- let pending_msg_events = & mut peer_state. pending_msg_events ;
4978
5031
if let hash_map:: Entry :: Occupied ( chan_entry) = peer_state. channel_by_id . entry ( funding_outpoint. to_channel_id ( ) ) {
4979
- let mut chan = remove_channel ! ( self , chan_entry) ;
5032
+ let mut chan = remove_channel ! ( self , chan_entry, peer_state ) ;
4980
5033
failed_channels. push ( chan. force_shutdown ( false ) ) ;
4981
5034
if let Ok ( update) = self . get_channel_update_for_broadcast ( & chan) {
4982
- pending_msg_events. push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
5035
+ peer_state . pending_msg_events . push ( events:: MessageSendEvent :: BroadcastChannelUpdate {
4983
5036
msg : update
4984
5037
} ) ;
4985
5038
}
@@ -4989,7 +5042,7 @@ where
4989
5042
ClosureReason :: CommitmentTxConfirmed
4990
5043
} ;
4991
5044
self . issue_channel_close_events ( & chan, reason) ;
4992
- pending_msg_events. push ( events:: MessageSendEvent :: HandleError {
5045
+ peer_state . pending_msg_events . push ( events:: MessageSendEvent :: HandleError {
4993
5046
node_id : chan. get_counterparty_node_id ( ) ,
4994
5047
action : msgs:: ErrorAction :: SendErrorMessage {
4995
5048
msg : msgs:: ErrorMessage { channel_id : chan. channel_id ( ) , data : "Channel force-closed" . to_owned ( ) }
@@ -5031,7 +5084,7 @@ where
5031
5084
{
5032
5085
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5033
5086
5034
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5087
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5035
5088
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5036
5089
let peer_state = & mut * peer_state_lock;
5037
5090
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5071,6 +5124,7 @@ where
5071
5124
}
5072
5125
}
5073
5126
} ) ;
5127
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5074
5128
}
5075
5129
}
5076
5130
@@ -5095,7 +5149,7 @@ where
5095
5149
{
5096
5150
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5097
5151
5098
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5152
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5099
5153
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5100
5154
let peer_state = & mut * peer_state_lock;
5101
5155
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5133,6 +5187,7 @@ where
5133
5187
}
5134
5188
}
5135
5189
} ) ;
5190
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5136
5191
}
5137
5192
}
5138
5193
@@ -5696,7 +5751,7 @@ where
5696
5751
let mut timed_out_htlcs = Vec :: new ( ) ;
5697
5752
{
5698
5753
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5699
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5754
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
5700
5755
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
5701
5756
let peer_state = & mut * peer_state_lock;
5702
5757
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -5780,6 +5835,7 @@ where
5780
5835
}
5781
5836
true
5782
5837
} ) ;
5838
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
5783
5839
}
5784
5840
}
5785
5841
@@ -6026,6 +6082,7 @@ where
6026
6082
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
6027
6083
let peer_state = & mut * peer_state_lock;
6028
6084
let pending_msg_events = & mut peer_state. pending_msg_events ;
6085
+ peer_state. connected = false ;
6029
6086
peer_state. channel_by_id . retain ( |_, chan| {
6030
6087
chan. remove_uncommitted_htlcs_and_mark_paused ( & self . logger ) ;
6031
6088
if chan. is_shutdown ( ) {
@@ -6091,17 +6148,20 @@ where
6091
6148
channel_by_id : HashMap :: new ( ) ,
6092
6149
latest_features : init_msg. features . clone ( ) ,
6093
6150
pending_msg_events : Vec :: new ( ) ,
6151
+ connected : true ,
6094
6152
} ) ) ;
6095
6153
} ,
6096
6154
hash_map:: Entry :: Occupied ( e) => {
6097
- e. get ( ) . lock ( ) . unwrap ( ) . latest_features = init_msg. features . clone ( ) ;
6155
+ let mut peer_state = e. get ( ) . lock ( ) . unwrap ( ) ;
6156
+ peer_state. latest_features = init_msg. features . clone ( ) ;
6157
+ peer_state. connected = true ;
6098
6158
} ,
6099
6159
}
6100
6160
}
6101
6161
6102
6162
let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
6103
6163
6104
- for ( _cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6164
+ for ( cp_id , peer_state_mutex) in per_peer_state. iter ( ) {
6105
6165
let mut peer_state_lock = peer_state_mutex. lock ( ) . unwrap ( ) ;
6106
6166
let peer_state = & mut * peer_state_lock;
6107
6167
let pending_msg_events = & mut peer_state. pending_msg_events ;
@@ -6133,6 +6193,7 @@ where
6133
6193
}
6134
6194
retain
6135
6195
} ) ;
6196
+ self . add_pending_peer_to_be_removed ( * cp_id, peer_state) ;
6136
6197
}
6137
6198
//TODO: Also re-broadcast announcement_signatures
6138
6199
Ok ( ( ) )
@@ -6646,6 +6707,8 @@ where
6646
6707
6647
6708
write_ver_prefix ! ( writer, SERIALIZATION_VERSION , MIN_SERIALIZATION_VERSION ) ;
6648
6709
6710
+ self . remove_peers_awaiting_removal ( ) ;
6711
+
6649
6712
self . genesis_hash . write ( writer) ?;
6650
6713
{
6651
6714
let best_block = self . best_block . read ( ) . unwrap ( ) ;
@@ -7113,6 +7176,7 @@ where
7113
7176
channel_by_id : peer_channels. remove ( & peer_pubkey) . unwrap_or ( HashMap :: new ( ) ) ,
7114
7177
latest_features : Readable :: read ( reader) ?,
7115
7178
pending_msg_events : Vec :: new ( ) ,
7179
+ connected : false ,
7116
7180
} ;
7117
7181
per_peer_state. insert ( peer_pubkey, Mutex :: new ( peer_state) ) ;
7118
7182
}
@@ -7466,6 +7530,7 @@ where
7466
7530
per_peer_state : FairRwLock :: new ( per_peer_state) ,
7467
7531
7468
7532
pending_events : Mutex :: new ( pending_events_read) ,
7533
+ pending_peers_awaiting_removal : Mutex :: new ( HashSet :: new ( ) ) ,
7469
7534
pending_background_events : Mutex :: new ( pending_background_events_read) ,
7470
7535
total_consistency_lock : RwLock :: new ( ( ) ) ,
7471
7536
persistence_notifier : Notifier :: new ( ) ,
@@ -7933,6 +7998,44 @@ mod tests {
7933
7998
}
7934
7999
}
7935
8000
8001
+ #[ test]
8002
+ fn test_drop_disconnected_peers_when_removing_channels ( ) {
8003
+ let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
8004
+ let node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
8005
+ let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
8006
+ let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
8007
+
8008
+ let chan = create_announced_chan_between_nodes ( & nodes, 0 , 1 ) ;
8009
+
8010
+ nodes[ 0 ] . node . peer_disconnected ( & nodes[ 1 ] . node . get_our_node_id ( ) , false ) ;
8011
+ nodes[ 1 ] . node . peer_disconnected ( & nodes[ 0 ] . node . get_our_node_id ( ) , false ) ;
8012
+
8013
+ nodes[ 0 ] . node . force_close_broadcasting_latest_txn ( & chan. 2 , & nodes[ 1 ] . node . get_our_node_id ( ) ) . unwrap ( ) ;
8014
+ check_closed_broadcast ! ( nodes[ 0 ] , true ) ;
8015
+ check_added_monitors ! ( nodes[ 0 ] , 1 ) ;
8016
+ check_closed_event ! ( nodes[ 0 ] , 1 , ClosureReason :: HolderForceClosed ) ;
8017
+
8018
+ {
8019
+ // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
8020
+ // disconnected and the channel between has been force closed.
8021
+ let nodes_0_per_peer_state = nodes[ 0 ] . node . per_peer_state . read ( ) . unwrap ( ) ;
8022
+ let nodes_0_pending_peers_awaiting_removal = nodes[ 0 ] . node . pending_peers_awaiting_removal . lock ( ) . unwrap ( ) ;
8023
+ assert_eq ! ( nodes_0_pending_peers_awaiting_removal. len( ) , 1 ) ;
8024
+ assert ! ( nodes_0_pending_peers_awaiting_removal. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8025
+ // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
8026
+ assert_eq ! ( nodes_0_per_peer_state. len( ) , 1 ) ;
8027
+ assert ! ( nodes_0_per_peer_state. get( & nodes[ 1 ] . node. get_our_node_id( ) ) . is_some( ) ) ;
8028
+ }
8029
+
8030
+ nodes[ 0 ] . node . timer_tick_occurred ( ) ;
8031
+
8032
+ {
8033
+ // Assert that nodes[1] has now been removed.
8034
+ assert_eq ! ( nodes[ 0 ] . node. per_peer_state. read( ) . unwrap( ) . len( ) , 0 ) ;
8035
+ assert_eq ! ( nodes[ 0 ] . node. pending_peers_awaiting_removal. lock( ) . unwrap( ) . len( ) , 0 ) ;
8036
+ }
8037
+ }
8038
+
7936
8039
#[ test]
7937
8040
fn bad_inbound_payment_hash ( ) {
7938
8041
// Add coverage for checking that a user-provided payment hash matches the payment secret.
0 commit comments