Skip to content

Commit b53330c

Browse files
f - remove peers through timer_tick_occurred
1 parent abdb9f6 commit b53330c

File tree

1 file changed

+32
-54
lines changed

1 file changed

+32
-54
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 32 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -595,8 +595,6 @@ pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = C
595595
// | |
596596
// | |__`best_block`
597597
// | |
598-
// | |__`pending_peers_awaiting_removal`
599-
// | |
600598
// | |__`pending_events`
601599
// | |
602600
// | |__`pending_background_events`
@@ -764,16 +762,6 @@ where
764762

765763
/// See `ChannelManager` struct-level documentation for lock order requirements.
766764
pending_events: Mutex<Vec<events::Event>>,
767-
/// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
768-
/// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels of
769-
/// to that peer is later closed while still being disconnected (i.e. force closed), we
770-
/// therefore need to remove the peer from `peer_state` separately.
771-
/// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
772-
/// instead store such peers awaiting removal in this field, and remove them on a timer to
773-
/// limit the negative effects on parallelism as much as possible.
774-
///
775-
/// See `ChannelManager` struct-level documentation for lock order requirements.
776-
pending_peers_awaiting_removal: Mutex<HashSet<PublicKey>>,
777765
/// See `ChannelManager` struct-level documentation for lock order requirements.
778766
pending_background_events: Mutex<Vec<BackgroundEvent>>,
779767
/// Used when we have to take a BIG lock to make sure everything is self-consistent.
@@ -1304,11 +1292,10 @@ macro_rules! try_chan_entry {
13041292
}
13051293

13061294
macro_rules! remove_channel {
1307-
($self: expr, $entry: expr, $peer_state: expr) => {
1295+
($self: expr, $entry: expr) => {
13081296
{
13091297
let channel = $entry.remove_entry().1;
13101298
update_maps_on_chan_removal!($self, channel);
1311-
$self.add_pending_peer_to_be_removed(channel.get_counterparty_node_id(), $peer_state);
13121299
channel
13131300
}
13141301
}
@@ -1481,7 +1468,6 @@ where
14811468
per_peer_state: FairRwLock::new(HashMap::new()),
14821469

14831470
pending_events: Mutex::new(Vec::new()),
1484-
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
14851471
pending_background_events: Mutex::new(Vec::new()),
14861472
total_consistency_lock: RwLock::new(()),
14871473
persistence_notifier: Notifier::new(),
@@ -1720,7 +1706,7 @@ where
17201706
let (result, is_permanent) =
17211707
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
17221708
if is_permanent {
1723-
remove_channel!(self, chan_entry, peer_state);
1709+
remove_channel!(self, chan_entry);
17241710
break result;
17251711
}
17261712
}
@@ -1731,7 +1717,7 @@ where
17311717
});
17321718

17331719
if chan_entry.get().is_shutdown() {
1734-
let channel = remove_channel!(self, chan_entry, peer_state);
1720+
let channel = remove_channel!(self, chan_entry);
17351721
if let Ok(channel_update) = self.get_channel_update_for_broadcast(&channel) {
17361722
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
17371723
msg: channel_update
@@ -1834,7 +1820,7 @@ where
18341820
} else {
18351821
self.issue_channel_close_events(chan.get(),ClosureReason::HolderForceClosed);
18361822
}
1837-
remove_channel!(self, chan, peer_state)
1823+
remove_channel!(self, chan)
18381824
} else {
18391825
return Err(APIError::ChannelUnavailable{ err: format!("Channel with id {} not found for the passed counterparty node_id {}", log_bytes!(*channel_id), peer_node_id) });
18401826
}
@@ -1873,13 +1859,6 @@ where
18731859
}
18741860
}
18751861

1876-
fn add_pending_peer_to_be_removed(&self, counterparty_node_id: PublicKey, peer_state: &mut PeerState<<SP::Target as SignerProvider>::Signer>) {
1877-
let peer_should_be_removed = !peer_state.connected && peer_state.channel_by_id.len() == 0;
1878-
if peer_should_be_removed {
1879-
self.pending_peers_awaiting_removal.lock().unwrap().insert(counterparty_node_id);
1880-
}
1881-
}
1882-
18831862
/// Force closes a channel, immediately broadcasting the latest local transaction(s) and
18841863
/// rejecting new HTLCs on the given channel. Fails if `channel_id` is unknown to
18851864
/// the manager, or if the `counterparty_node_id` isn't the counterparty of the corresponding
@@ -3304,16 +3283,20 @@ where
33043283
true
33053284
}
33063285

3307-
/// Removes peers which have been been added to `pending_peers_awaiting_removal` which are
3308-
/// still disconnected and we have no channels to.
3286+
/// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
3287+
/// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels
3288+
/// to that peer are later closed while still being disconnected (i.e. force closed), we
3289+
/// therefore need to remove the peer from `peer_state` separately.
3290+
/// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
3291+
/// instead remove such peers awaiting removal through this function, which is called on a
3292+
/// timer through `timer_tick_occurred`, passing the disconnected peers with no channels,
3293+
/// to limit the negative effects on parallelism as much as possible.
33093294
///
33103295
/// Must be called without the `per_peer_state` lock acquired.
3311-
fn remove_peers_awaiting_removal(&self) {
3312-
let mut pending_peers_awaiting_removal = HashSet::new();
3313-
mem::swap(&mut *self.pending_peers_awaiting_removal.lock().unwrap(), &mut pending_peers_awaiting_removal);
3296+
fn remove_peers_awaiting_removal(&self, pending_peers_awaiting_removal: HashSet<PublicKey>) {
33143297
if pending_peers_awaiting_removal.len() > 0 {
33153298
let mut per_peer_state = self.per_peer_state.write().unwrap();
3316-
for counterparty_node_id in pending_peers_awaiting_removal.drain() {
3299+
for counterparty_node_id in pending_peers_awaiting_removal {
33173300
match per_peer_state.entry(counterparty_node_id) {
33183301
hash_map::Entry::Occupied(entry) => {
33193302
// Remove the entry if the peer is still disconnected and we still
@@ -3392,6 +3375,7 @@ where
33923375
/// the channel.
33933376
/// * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
33943377
/// with the current `ChannelConfig`.
3378+
/// * Removing peers which have disconnected but no longer have any channels.
33953379
///
33963380
/// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
33973381
/// estimate fetches.
@@ -3404,6 +3388,7 @@ where
34043388

34053389
let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
34063390
let mut timed_out_mpp_htlcs = Vec::new();
3391+
let mut pending_peers_awaiting_removal = HashSet::new();
34073392
{
34083393
let per_peer_state = self.per_peer_state.read().unwrap();
34093394
for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
@@ -3451,10 +3436,13 @@ where
34513436

34523437
true
34533438
});
3454-
self.add_pending_peer_to_be_removed(counterparty_node_id, peer_state);
3439+
let peer_should_be_removed = !peer_state.connected && peer_state.channel_by_id.len() == 0;
3440+
if peer_should_be_removed {
3441+
pending_peers_awaiting_removal.insert(counterparty_node_id);
3442+
}
34553443
}
34563444
}
3457-
self.remove_peers_awaiting_removal();
3445+
self.remove_peers_awaiting_removal(pending_peers_awaiting_removal);
34583446

34593447
self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
34603448
if htlcs.is_empty() {
@@ -4190,7 +4178,7 @@ where
41904178
}
41914179
};
41924180
peer_state.pending_msg_events.push(send_msg_err_event);
4193-
let _ = remove_channel!(self, channel, peer_state);
4181+
let _ = remove_channel!(self, channel);
41944182
return Err(APIError::APIMisuseError { err: "Please use accept_inbound_channel_from_trusted_peer_0conf to accept channels with zero confirmations.".to_owned() });
41954183
}
41964184

@@ -4476,7 +4464,7 @@ where
44764464
let (result, is_permanent) =
44774465
handle_monitor_update_res!(self, update_res, chan_entry.get_mut(), RAACommitmentOrder::CommitmentFirst, chan_entry.key(), NO_UPDATE);
44784466
if is_permanent {
4479-
remove_channel!(self, chan_entry, peer_state);
4467+
remove_channel!(self, chan_entry);
44804468
break result;
44814469
}
44824470
}
@@ -4525,7 +4513,7 @@ where
45254513
// also implies there are no pending HTLCs left on the channel, so we can
45264514
// fully delete it from tracking (the channel monitor is still around to
45274515
// watch for old state broadcasts)!
4528-
(tx, Some(remove_channel!(self, chan_entry, peer_state)))
4516+
(tx, Some(remove_channel!(self, chan_entry)))
45294517
} else { (tx, None) }
45304518
},
45314519
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5028,11 +5016,12 @@ where
50285016
if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) {
50295017
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
50305018
let peer_state = &mut *peer_state_lock;
5019+
let pending_msg_events = &mut peer_state.pending_msg_events;
50315020
if let hash_map::Entry::Occupied(chan_entry) = peer_state.channel_by_id.entry(funding_outpoint.to_channel_id()) {
5032-
let mut chan = remove_channel!(self, chan_entry, peer_state);
5021+
let mut chan = remove_channel!(self, chan_entry);
50335022
failed_channels.push(chan.force_shutdown(false));
50345023
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
5035-
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
5024+
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
50365025
msg: update
50375026
});
50385027
}
@@ -5042,7 +5031,7 @@ where
50425031
ClosureReason::CommitmentTxConfirmed
50435032
};
50445033
self.issue_channel_close_events(&chan, reason);
5045-
peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
5034+
pending_msg_events.push(events::MessageSendEvent::HandleError {
50465035
node_id: chan.get_counterparty_node_id(),
50475036
action: msgs::ErrorAction::SendErrorMessage {
50485037
msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
@@ -5084,7 +5073,7 @@ where
50845073
{
50855074
let per_peer_state = self.per_peer_state.read().unwrap();
50865075

5087-
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
5076+
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
50885077
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
50895078
let peer_state = &mut *peer_state_lock;
50905079
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5124,7 +5113,6 @@ where
51245113
}
51255114
}
51265115
});
5127-
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
51285116
}
51295117
}
51305118

@@ -5149,7 +5137,7 @@ where
51495137
{
51505138
let per_peer_state = self.per_peer_state.read().unwrap();
51515139

5152-
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
5140+
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
51535141
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
51545142
let peer_state = &mut *peer_state_lock;
51555143
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5187,7 +5175,6 @@ where
51875175
}
51885176
}
51895177
});
5190-
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
51915178
}
51925179
}
51935180

@@ -5751,7 +5738,7 @@ where
57515738
let mut timed_out_htlcs = Vec::new();
57525739
{
57535740
let per_peer_state = self.per_peer_state.read().unwrap();
5754-
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
5741+
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
57555742
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
57565743
let peer_state = &mut *peer_state_lock;
57575744
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -5835,7 +5822,6 @@ where
58355822
}
58365823
true
58375824
});
5838-
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
58395825
}
58405826
}
58415827

@@ -6161,7 +6147,7 @@ where
61616147

61626148
let per_peer_state = self.per_peer_state.read().unwrap();
61636149

6164-
for (cp_id, peer_state_mutex) in per_peer_state.iter() {
6150+
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
61656151
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
61666152
let peer_state = &mut *peer_state_lock;
61676153
let pending_msg_events = &mut peer_state.pending_msg_events;
@@ -6193,7 +6179,6 @@ where
61936179
}
61946180
retain
61956181
});
6196-
self.add_pending_peer_to_be_removed(*cp_id, peer_state);
61976182
}
61986183
//TODO: Also re-broadcast announcement_signatures
61996184
Ok(())
@@ -6707,8 +6692,6 @@ where
67076692

67086693
write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION);
67096694

6710-
self.remove_peers_awaiting_removal();
6711-
67126695
self.genesis_hash.write(writer)?;
67136696
{
67146697
let best_block = self.best_block.read().unwrap();
@@ -7530,7 +7513,6 @@ where
75307513
per_peer_state: FairRwLock::new(per_peer_state),
75317514

75327515
pending_events: Mutex::new(pending_events_read),
7533-
pending_peers_awaiting_removal: Mutex::new(HashSet::new()),
75347516
pending_background_events: Mutex::new(pending_background_events_read),
75357517
total_consistency_lock: RwLock::new(()),
75367518
persistence_notifier: Notifier::new(),
@@ -8019,9 +8001,6 @@ mod tests {
80198001
// Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
80208002
// disconnected and the channel between has been force closed.
80218003
let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
8022-
let nodes_0_pending_peers_awaiting_removal = nodes[0].node.pending_peers_awaiting_removal.lock().unwrap();
8023-
assert_eq!(nodes_0_pending_peers_awaiting_removal.len(), 1);
8024-
assert!(nodes_0_pending_peers_awaiting_removal.get(&nodes[1].node.get_our_node_id()).is_some());
80258004
// Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
80268005
assert_eq!(nodes_0_per_peer_state.len(), 1);
80278006
assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
@@ -8032,7 +8011,6 @@ mod tests {
80328011
{
80338012
// Assert that nodes[1] has now been removed.
80348013
assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
8035-
assert_eq!(nodes[0].node.pending_peers_awaiting_removal.lock().unwrap().len(), 0);
80368014
}
80378015
}
80388016

0 commit comments

Comments
 (0)