Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 65 additions & 56 deletions lightning/src/ln/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2064,64 +2064,66 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
{
let peers_lock = self.peers.read().unwrap();

let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let chan_events = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
let route_events = self.message_handler.route_handler.get_and_clear_pending_msg_events();

let peers = &*peers_lock;
macro_rules! get_peer_for_forwarding {
($node_id: expr) => {
{
if peers_to_disconnect.get($node_id).is_some() {
// If we've "disconnected" this peer, do not send to it.
continue;
}
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
match descriptor_opt {
Some(descriptor) => match peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
if !peer_lock.handshake_complete() {
continue;
None
} else {
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
match descriptor_opt {
Some(descriptor) => match peers.get(&descriptor) {
Some(peer_mutex) => {
let peer_lock = peer_mutex.lock().unwrap();
if !peer_lock.handshake_complete() {
None
} else {
Some(peer_lock)
}
},
None => {
debug_assert!(false, "Inconsistent peers set state!");
None
}
peer_lock
},
None => {
debug_assert!(false, "Inconsistent peers set state!");
continue;
}
},
None => {
continue;
},
None
},
}
}
}
}
}
for event in events_generated.drain(..) {
let mut handle_event = |event, from_chan_handler| {
match event {
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.common_fields.temporary_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
Expand All @@ -2130,107 +2132,107 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
// TODO: If the peer is gone we should generate a DiscardFunding event
// indicating to the wallet that they should just throw away this funding transaction
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendStfu { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
Expand All @@ -2239,7 +2241,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
update_fulfill_htlcs.len(),
update_fail_htlcs.len(),
&commitment_signed.channel_id);
let mut peer = get_peer_for_forwarding!(node_id);
let mut peer = get_peer_for_forwarding!(node_id)?;
for msg in update_add_htlcs {
self.enqueue_message(&mut *peer, msg);
}
Expand All @@ -2261,32 +2263,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
log_pubkey!(node_id),
msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg);
},
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
Expand Down Expand Up @@ -2322,7 +2324,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
log_pubkey!(node_id), msg.contents.short_channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::HandleError { node_id, action } => {
let logger = WithContext::from(&self.logger, Some(node_id), None, None);
Expand Down Expand Up @@ -2360,21 +2362,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
},
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
log_pubkey!(node_id),
msg.data);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
},
}
},
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
},
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
Expand All @@ -2383,17 +2385,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
msg.first_blocknum,
msg.number_of_blocks,
msg.sync_complete);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
}
}
Some(())
};
for event in chan_events {
handle_event(event, true);
}
for event in route_events {
handle_event(event, false);
}

for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
if peers_to_disconnect.get(&node_id).is_some() { continue; }
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really worth voiding the approvals for, but does this have to all be in one line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rustfmt will figure it out eventually 🤷‍♂️

}

for (descriptor, peer_mutex) in peers.iter() {
Expand Down