@@ -1380,6 +1380,8 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
 	/// [`ChannelMessageHandler::peer_connected`] and no corresponding
 	/// [`ChannelMessageHandler::peer_disconnected`].
 	pub is_connected: bool,
+	/// Holds the latest peer storage data received from the channel partner, which we
+	/// return to them upon reconnection.
+	peer_storage: Vec<u8>,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -8170,9 +8172,65 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 		}
 	}
 
-	fn internal_peer_storage_retrieval(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageRetrievalMessage) {}
+	fn internal_peer_storage_retrieval(&self, counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageRetrievalMessage) {
+		// TODO: Decrypt and check if we have any stale or missing ChannelMonitors.
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
+
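+		// We do not yet distribute peer storage, so any retrieval message is
+		// unsolicited; warn the peer rather than silently ignoring it.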
+		log_debug!(logger, "Received unexpected peer_storage_retrieval from {}. This is unusual since we do not yet distribute peer storage. Sending a warning.", log_pubkey!(counterparty_node_id));
+		peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+			node_id: counterparty_node_id.clone(),
+			action: msgs::ErrorAction::SendWarningMessage {
+				msg: msgs::WarningMessage {
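+					// Per BOLT 1, an all-zero channel_id addresses the warning to the
+					// connection rather than to a specific channel.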
+					channel_id: ChannelId([0; 32]),
+					data: "Invalid peer_storage_retrieval message received.".to_owned()
+				},
+				log_level: Level::Trace,
+			}
+		});
+	}
 
-	fn internal_peer_storage(&self, _counterparty_node_id: &PublicKey, _msg: &msgs::PeerStorageMessage) {}
+	fn internal_peer_storage(&self, counterparty_node_id: &PublicKey, msg: &msgs::PeerStorageMessage) {
+		let per_peer_state = self.per_peer_state.read().unwrap();
+		let peer_state_mutex = match per_peer_state.get(counterparty_node_id) {
+			Some(peer_state_mutex) => peer_state_mutex,
+			None => return,
+		};
+		let mut peer_state_lock = peer_state_mutex.lock().unwrap();
+		let peer_state = &mut *peer_state_lock;
+		let logger = WithContext::from(&self.logger, Some(*counterparty_node_id), None, None);
+
+		// Check if we have any funded channels with the peer (currently we only provide
+		// the peer storage service to peers we have a channel with).
+		if !peer_state.channel_by_id.values().any(|phase| phase.is_funded()) {
+			log_debug!(logger, "Ignoring peer storage request from {} as we don't have any funded channels with them.", log_pubkey!(counterparty_node_id));
+			return;
+		}
+
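+		// Cap stored blobs at 1 KiB; the check is compiled out in tests.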
+		#[cfg(not(test))]
+		if msg.data.len() > 1024 {
+			log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as it's over 1 KiB", log_pubkey!(counterparty_node_id));
+			peer_state.pending_msg_events.push(events::MessageSendEvent::HandleError {
+				node_id: counterparty_node_id.clone(),
+				action: msgs::ErrorAction::SendWarningMessage {
+					msg: msgs::WarningMessage {
+						channel_id: ChannelId([0; 32]),
+						data: "Supports only data up to 1 KiB in peer storage.".to_owned()
+					},
+					log_level: Level::Trace,
+				}
+			});
+			return;
+		}
+
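+		// Keep only the most recent blob, replacing anything previously stored for
+		// this peer.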
+		log_trace!(logger, "Received Peer Storage from {}", log_pubkey!(counterparty_node_id));
+		peer_state.peer_storage = msg.data.clone();
+	}
 
 	fn internal_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) -> Result<(), MsgHandleErrInternal> {
 		let best_block = *self.best_block.read().unwrap();
@@ -11664,6 +11722,7 @@ where
 					actions_blocking_raa_monitor_updates: BTreeMap::new(),
 					closed_channel_monitor_update_ids: BTreeMap::new(),
 					is_connected: true,
+					peer_storage: Vec::new(),
 				}));
 			},
 			hash_map::Entry::Occupied(e) => {
@@ -11693,6 +11752,15 @@ where
 			let peer_state = &mut *peer_state_lock;
 			let pending_msg_events = &mut peer_state.pending_msg_events;
 
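+			// If we are holding a peer storage blob for this peer from a previous
+			// connection, return it now so the peer can check for stale or missing
+			// channel state.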
+			if !peer_state.peer_storage.is_empty() {
+				pending_msg_events.push(events::MessageSendEvent::SendPeerStorageRetrievalMessage {
+					node_id: counterparty_node_id.clone(),
+					msg: msgs::PeerStorageRetrievalMessage {
+						data: peer_state.peer_storage.clone()
+					},
+				});
+			}
+
 			for (_, chan) in peer_state.channel_by_id.iter_mut() {
 				let logger = WithChannelContext::from(&self.logger, &chan.context(), None);
 				match chan.peer_connected_get_handshake(self.chain_hash, &&logger) {
@@ -12850,6 +12918,8 @@ where
 			peer_states.push(peer_state_mutex.unsafe_well_ordered_double_lock_self());
 		}
 
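+		// Collect each remaining peer's storage blob so it can be written below as an
+		// odd TLV (type 19).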
+		let mut peer_storage_dir: Vec<(&PublicKey, &Vec<u8>)> = Vec::new();
+
 		(serializable_peer_count).write(writer)?;
 		for ((peer_pubkey, _), peer_state) in per_peer_state.iter().zip(peer_states.iter()) {
 			// Peers which we have no channels to should be dropped once disconnected. As we
@@ -12859,6 +12929,8 @@ where
 			if !peer_state.ok_to_remove(false) {
 				peer_pubkey.write(writer)?;
 				peer_state.latest_features.write(writer)?;
+				peer_storage_dir.push((peer_pubkey, &peer_state.peer_storage));
+
 				if !peer_state.monitor_update_blocked_actions.is_empty() {
 					monitor_update_blocked_actions_per_peer
 						.get_or_insert_with(Vec::new)
@@ -12980,6 +13052,7 @@ where
 			(14, decode_update_add_htlcs_opt, option),
 			(15, self.inbound_payment_id_secret, required),
 			(17, in_flight_monitor_updates, required),
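+			// Type 19 is odd, so readers that predate peer storage can safely ignore it.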
+			(19, peer_storage_dir, optional_vec),
 		});
 
 		Ok(())
@@ -13212,6 +13285,7 @@ where
 				monitor_update_blocked_actions: BTreeMap::new(),
 				actions_blocking_raa_monitor_updates: BTreeMap::new(),
 				closed_channel_monitor_update_ids: BTreeMap::new(),
+				peer_storage: Vec::new(),
 				is_connected: false,
 			}
 		};
@@ -13507,6 +13581,7 @@ where
 		let mut in_flight_monitor_updates: Option<HashMap<(PublicKey, ChannelId), Vec<ChannelMonitorUpdate>>> = None;
 		let mut decode_update_add_htlcs: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> = None;
 		let mut inbound_payment_id_secret = None;
+		let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
@@ -13523,8 +13598,10 @@ where
 			(14, decode_update_add_htlcs, option),
 			(15, inbound_payment_id_secret, option),
 			(17, in_flight_monitor_updates, required),
+			(19, peer_storage_dir, optional_vec),
 		});
 		let mut decode_update_add_htlcs = decode_update_add_htlcs.unwrap_or_else(|| new_hash_map());
+		let peer_storage_dir: Vec<(PublicKey, Vec<u8>)> = peer_storage_dir.unwrap_or_else(Vec::new);
 		if fake_scid_rand_bytes.is_none() {
 			fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes());
 		}
@@ -13556,6 +13633,12 @@ where
 		}
 		let pending_outbounds = OutboundPayments::new(pending_outbound_payments.unwrap());
 
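+		// Re-attach each peer's stored blob to the reconstructed per-peer state,
+		// dropping blobs for peers we no longer know about.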
+		for (peer_pubkey, peer_storage) in peer_storage_dir {
+			if let Some(peer_state) = per_peer_state.get_mut(&peer_pubkey) {
+				peer_state.get_mut().unwrap().peer_storage = peer_storage;
+			}
+		}
+
 		// Handle transitioning from the legacy TLV to the new one on upgrades.
 		if let Some(legacy_in_flight_upds) = legacy_in_flight_monitor_updates {
 			// We should never serialize an empty map.