@@ -28,6 +28,8 @@ use bitcoin::hash_types::{BlockHash, Txid};
28
28
29
29
use crate :: chain;
30
30
use crate :: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
31
+ #[ cfg( peer_storage) ]
32
+ use crate :: chain:: channelmonitor:: write_chanmon_internal;
31
33
use crate :: chain:: channelmonitor:: {
32
34
Balance , ChannelMonitor , ChannelMonitorUpdate , MonitorEvent , TransactionOutputs ,
33
35
WithChannelMonitor ,
@@ -36,8 +38,11 @@ use crate::chain::transaction::{OutPoint, TransactionData};
36
38
use crate :: chain:: { ChannelMonitorUpdateStatus , Filter , WatchedOutput } ;
37
39
use crate :: events:: { self , Event , EventHandler , ReplayEvent } ;
38
40
use crate :: ln:: channel_state:: ChannelDetails ;
39
- use crate :: ln:: msgs:: { self , BaseMessageHandler , Init , MessageSendEvent , SendOnlyMessageHandler } ;
40
- use crate :: ln:: our_peer_storage:: DecryptedOurPeerStorage ;
41
+ #[ cfg( peer_storage) ]
42
+ use crate :: ln:: msgs:: PeerStorage ;
43
+ use crate :: ln:: msgs:: { BaseMessageHandler , Init , MessageSendEvent , SendOnlyMessageHandler } ;
44
+ #[ cfg( peer_storage) ]
45
+ use crate :: ln:: our_peer_storage:: { DecryptedOurPeerStorage , PeerStorageMonitorHolder } ;
41
46
use crate :: ln:: types:: ChannelId ;
42
47
use crate :: prelude:: * ;
43
48
use crate :: sign:: ecdsa:: EcdsaChannelSigner ;
@@ -47,8 +52,12 @@ use crate::types::features::{InitFeatures, NodeFeatures};
47
52
use crate :: util:: errors:: APIError ;
48
53
use crate :: util:: logger:: { Logger , WithContext } ;
49
54
use crate :: util:: persist:: MonitorName ;
55
+ #[ cfg( peer_storage) ]
56
+ use crate :: util:: ser:: { VecWriter , Writeable } ;
50
57
use crate :: util:: wakers:: { Future , Notifier } ;
51
58
use bitcoin:: secp256k1:: PublicKey ;
59
+ #[ cfg( peer_storage) ]
60
+ use core:: iter:: Cycle ;
52
61
use core:: ops:: Deref ;
53
62
use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
54
63
@@ -264,7 +273,7 @@ pub struct ChainMonitor<
264
273
logger : L ,
265
274
fee_estimator : F ,
266
275
persister : P ,
267
- entropy_source : ES ,
276
+ _entropy_source : ES ,
268
277
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
269
278
/// from the user and not from a [`ChannelMonitor`].
270
279
pending_monitor_events : Mutex < Vec < ( OutPoint , ChannelId , Vec < MonitorEvent > , PublicKey ) > > ,
@@ -278,6 +287,7 @@ pub struct ChainMonitor<
278
287
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
279
288
pending_send_only_events : Mutex < Vec < MessageSendEvent > > ,
280
289
290
+ #[ cfg( peer_storage) ]
281
291
our_peerstorage_encryption_key : PeerStorageKey ,
282
292
}
283
293
@@ -477,7 +487,7 @@ where
477
487
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
478
488
pub fn new (
479
489
chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ,
480
- entropy_source : ES , our_peerstorage_encryption_key : PeerStorageKey ,
490
+ _entropy_source : ES , _our_peerstorage_encryption_key : PeerStorageKey ,
481
491
) -> Self {
482
492
Self {
483
493
monitors : RwLock :: new ( new_hash_map ( ) ) ,
@@ -486,12 +496,13 @@ where
486
496
logger,
487
497
fee_estimator : feeest,
488
498
persister,
489
- entropy_source ,
499
+ _entropy_source ,
490
500
pending_monitor_events : Mutex :: new ( Vec :: new ( ) ) ,
491
501
highest_chain_height : AtomicUsize :: new ( 0 ) ,
492
502
event_notifier : Notifier :: new ( ) ,
493
503
pending_send_only_events : Mutex :: new ( Vec :: new ( ) ) ,
494
- our_peerstorage_encryption_key,
504
+ #[ cfg( peer_storage) ]
505
+ our_peerstorage_encryption_key : _our_peerstorage_encryption_key,
495
506
}
496
507
}
497
508
@@ -804,23 +815,90 @@ where
804
815
805
816
/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
806
817
/// ensuring unique IDs are returned.
818
+ #[ cfg( peer_storage) ]
807
819
fn all_counterparty_node_ids ( & self ) -> HashSet < PublicKey > {
808
820
let mon = self . monitors . read ( ) . unwrap ( ) ;
809
821
mon. values ( ) . map ( |monitor| monitor. monitor . get_counterparty_node_id ( ) ) . collect ( )
810
822
}
811
823
824
+ #[ cfg( peer_storage) ]
812
825
fn send_peer_storage ( & self , their_node_id : PublicKey ) {
813
- // TODO: Serialize `ChannelMonitor`s inside `our_peer_storage`.
826
+ let mut monitors_list: Vec < PeerStorageMonitorHolder > = Vec :: new ( ) ;
827
+ let random_bytes = self . _entropy_source . get_secure_random_bytes ( ) ;
828
+
829
+ const MAX_PEER_STORAGE_SIZE : usize = 65531 ;
830
+ const USIZE_LEN : usize = core:: mem:: size_of :: < usize > ( ) ;
831
+ let mut random_bytes_cycle_iter = random_bytes. iter ( ) . cycle ( ) ;
832
+
833
+ let mut current_size = 0 ;
834
+ let monitors_lock = self . monitors . read ( ) . unwrap ( ) ;
835
+ let mut channel_ids = monitors_lock. keys ( ) . copied ( ) . collect ( ) ;
836
+
837
+ fn next_random_id (
838
+ channel_ids : & mut Vec < ChannelId > ,
839
+ random_bytes_cycle_iter : & mut Cycle < core:: slice:: Iter < u8 > > ,
840
+ ) -> Option < ChannelId > {
841
+ if channel_ids. is_empty ( ) {
842
+ return None ;
843
+ }
844
+ let random_idx = {
845
+ let mut usize_bytes = [ 0u8 ; USIZE_LEN ] ;
846
+ usize_bytes. iter_mut ( ) . for_each ( |b| {
847
+ * b = * random_bytes_cycle_iter. next ( ) . expect ( "A cycle never ends" )
848
+ } ) ;
849
+ // Take one more to introduce a slight misalignment.
850
+ random_bytes_cycle_iter. next ( ) . expect ( "A cycle never ends" ) ;
851
+ usize:: from_le_bytes ( usize_bytes) % channel_ids. len ( )
852
+ } ;
853
+ Some ( channel_ids. swap_remove ( random_idx) )
854
+ }
855
+
856
+ while let Some ( channel_id) = next_random_id ( & mut channel_ids, & mut random_bytes_cycle_iter)
857
+ {
858
+ let monitor_holder = if let Some ( monitor_holder) = monitors_lock. get ( & channel_id) {
859
+ monitor_holder
860
+ } else {
861
+ debug_assert ! (
862
+ false ,
863
+ "Tried to access non-existing monitor, this should never happen"
864
+ ) ;
865
+ break ;
866
+ } ;
867
+
868
+ let mut serialized_channel = VecWriter ( Vec :: new ( ) ) ;
869
+ let min_seen_secret = monitor_holder. monitor . get_min_seen_secret ( ) ;
870
+ let counterparty_node_id = monitor_holder. monitor . get_counterparty_node_id ( ) ;
871
+ {
872
+ let inner_lock = monitor_holder. monitor . inner . lock ( ) . unwrap ( ) ;
873
+
874
+ write_chanmon_internal ( & inner_lock, true , & mut serialized_channel)
875
+ . expect ( "can not write Channel Monitor for peer storage message" ) ;
876
+ }
877
+ let peer_storage_monitor = PeerStorageMonitorHolder {
878
+ channel_id,
879
+ min_seen_secret,
880
+ counterparty_node_id,
881
+ monitor_bytes : serialized_channel. 0 ,
882
+ } ;
883
+
884
+ let serialized_length = peer_storage_monitor. serialized_length ( ) ;
885
+
886
+ if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
887
+ continue ;
888
+ } else {
889
+ current_size += serialized_length;
890
+ monitors_list. push ( peer_storage_monitor) ;
891
+ }
892
+ }
814
893
815
- let random_bytes = self . entropy_source . get_secure_random_bytes ( ) ;
816
- let serialised_channels = Vec :: new ( ) ;
894
+ let serialised_channels = monitors_list. encode ( ) ;
817
895
let our_peer_storage = DecryptedOurPeerStorage :: new ( serialised_channels) ;
818
896
let cipher = our_peer_storage. encrypt ( & self . our_peerstorage_encryption_key , & random_bytes) ;
819
897
820
898
log_debug ! ( self . logger, "Sending Peer Storage to {}" , log_pubkey!( their_node_id) ) ;
821
899
let send_peer_storage_event = MessageSendEvent :: SendPeerStorage {
822
900
node_id : their_node_id,
823
- msg : msgs :: PeerStorage { data : cipher. into_vec ( ) } ,
901
+ msg : PeerStorage { data : cipher. into_vec ( ) } ,
824
902
} ;
825
903
826
904
self . pending_send_only_events . lock ( ) . unwrap ( ) . push ( send_peer_storage_event)
@@ -920,6 +998,7 @@ where
920
998
)
921
999
} ) ;
922
1000
1001
+ #[ cfg( peer_storage) ]
923
1002
// Send peer storage everytime a new block arrives.
924
1003
for node_id in self . all_counterparty_node_ids ( ) {
925
1004
self . send_peer_storage ( node_id) ;
@@ -1021,6 +1100,7 @@ where
1021
1100
)
1022
1101
} ) ;
1023
1102
1103
+ #[ cfg( peer_storage) ]
1024
1104
// Send peer storage everytime a new block arrives.
1025
1105
for node_id in self . all_counterparty_node_ids ( ) {
1026
1106
self . send_peer_storage ( node_id) ;
0 commit comments