@@ -11,14 +11,28 @@ use std::path::PathBuf;
 use lightning::events::ClosureReason;
 use lightning::ln::functional_test_utils::{
 	connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
-	create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+	create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg,
 };
-use lightning::util::persist::{read_channel_monitors, KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::persist::{
+	KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN,
+};
+
 use lightning::util::test_utils;
 use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 
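+// Convenience alias for the concrete persister used in these tests. The
+// generics are, in order: store, logger, entropy source and signer provider
+// (both `TestKeysInterface` here), broadcaster, and fee estimator.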
+type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
+	&'a K,
+	&'a test_utils::TestLogger,
+	&'a test_utils::TestKeysInterface,
+	&'a test_utils::TestKeysInterface,
+	&'a test_utils::TestBroadcaster,
+	&'a test_utils::TestFeeEstimator,
+>;
+
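+// Number of `ChannelMonitorUpdate`s a full send+claim payment round trip is
+// expected to generate per node, i.e. how far the persisted update ID should
+// advance with each payment.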
+const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
+
 pub(crate) fn random_storage_path() -> PathBuf {
 	let mut temp_path = std::env::temp_dir();
 	let mut rng = thread_rng();
@@ -77,54 +91,71 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
 	assert_eq!(listed_keys.len(), 0);
 }
 
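+// Build a `MonitorUpdatingPersister` over the given store. The persister
+// writes each `ChannelMonitorUpdate` individually and only rewrites the full
+// `ChannelMonitor` once `max_pending_updates` updates have accumulated, which
+// is the batching behavior `do_test_store` exercises below.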
+pub(crate) fn create_persister<'a, K: KVStoreSync + Sync>(
+	store: &'a K, chanmon_cfg: &'a TestChanMonCfg, max_pending_updates: u64,
+) -> TestMonitorUpdatePersister<'a, K> {
+	MonitorUpdatingPersister::new(
+		store,
+		&chanmon_cfg.logger,
+		max_pending_updates,
+		&chanmon_cfg.keys_manager,
+		&chanmon_cfg.keys_manager,
+		&chanmon_cfg.tx_broadcaster,
+		&chanmon_cfg.fee_estimator,
+	)
+}
+
+pub(crate) fn create_chain_monitor<'a, K: KVStoreSync + Sync>(
+	chanmon_cfg: &'a TestChanMonCfg, persister: &'a TestMonitorUpdatePersister<'a, K>,
+) -> test_utils::TestChainMonitor<'a> {
+	test_utils::TestChainMonitor::new(
+		Some(&chanmon_cfg.chain_source),
+		&chanmon_cfg.tx_broadcaster,
+		&chanmon_cfg.logger,
+		&chanmon_cfg.fee_estimator,
+		persister,
+		&chanmon_cfg.keys_manager,
+	)
+}
+
 // Integration-test the given KVStore implementation. Test relaying a few payments and check that
 // the persisted data is updated the appropriate number of times.
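+// For example, a backend test might exercise its `KVStoreSync` implementation
+// as follows (illustrative sketch; `MyStore` stands in for the store under
+// test):
+//   do_test_store(&MyStore::new(random_storage_path()), &MyStore::new(random_storage_path()));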
 pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
+	// This value is used later to limit how many iterations we perform.
+	let persister_0_max_pending_updates = 7;
+	// Intentionally set this to a smaller value to test a different alignment.
+	let persister_1_max_pending_updates = 3;
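+	// With five updates per payment, neither 7 nor 3 lines up with payment
+	// boundaries, so successive payments land at different offsets within each
+	// persister's pending-update window.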
+
 	let chanmon_cfgs = create_chanmon_cfgs(2);
+
+	let persister_0 = create_persister(store_0, &chanmon_cfgs[0], persister_0_max_pending_updates);
+	let persister_1 = create_persister(store_1, &chanmon_cfgs[1], persister_1_max_pending_updates);
+
+	let chain_mon_0 = create_chain_monitor(&chanmon_cfgs[0], &persister_0);
+	let chain_mon_1 = create_chain_monitor(&chanmon_cfgs[1], &persister_1);
+
 	let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-	let chain_mon_0 = test_utils::TestChainMonitor::new(
-		Some(&chanmon_cfgs[0].chain_source),
-		&chanmon_cfgs[0].tx_broadcaster,
-		&chanmon_cfgs[0].logger,
-		&chanmon_cfgs[0].fee_estimator,
-		store_0,
-		node_cfgs[0].keys_manager,
-	);
-	let chain_mon_1 = test_utils::TestChainMonitor::new(
-		Some(&chanmon_cfgs[1].chain_source),
-		&chanmon_cfgs[1].tx_broadcaster,
-		&chanmon_cfgs[1].logger,
-		&chanmon_cfgs[1].fee_estimator,
-		store_1,
-		node_cfgs[1].keys_manager,
-	);
 	node_cfgs[0].chain_monitor = chain_mon_0;
 	node_cfgs[1].chain_monitor = chain_mon_1;
 	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
 	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
 	// Check that the persisted channel data is empty before any channels are
 	// open.
-	let mut persisted_chan_data_0 =
-		read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+	let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
 	assert_eq!(persisted_chan_data_0.len(), 0);
-	let mut persisted_chan_data_1 =
-		read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+	let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
 	assert_eq!(persisted_chan_data_1.len(), 0);
 
 	// Helper to make sure the channel is on the expected update ID.
 	macro_rules! check_persisted_data {
 		($expected_update_id: expr) => {
-			persisted_chan_data_0 =
-				read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
-					.unwrap();
+			persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
 			assert_eq!(persisted_chan_data_0.len(), 1);
 			for (_, mon) in persisted_chan_data_0.iter() {
 				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
 			}
-			persisted_chan_data_1 =
-				read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
-					.unwrap();
+			persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
 			assert_eq!(persisted_chan_data_1.len(), 1);
 			for (_, mon) in persisted_chan_data_1.iter() {
 				assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -137,10 +168,29 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
 	check_persisted_data!(0);
 
 	// Send a few payments and make sure the monitors are updated to the latest.
-	send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
-	check_persisted_data!(5);
-	send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
-	check_persisted_data!(10);
+	let expected_route = &[&nodes[1]][..];
+	send_payment(&nodes[0], expected_route, 8_000_000);
+	check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+	let expected_route = &[&nodes[0]][..];
+	send_payment(&nodes[1], expected_route, 4_000_000);
+	check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
+
+	// Send a few more payments to try all the alignments of max pending updates with
+	// updates for a payment sent and received.
+	let mut sender = 0;
+	for i in 3..=persister_0_max_pending_updates * 2 {
+		let receiver;
+		if sender == 0 {
+			sender = 1;
+			receiver = 0;
+		} else {
+			sender = 0;
+			receiver = 1;
+		}
+		let expected_route = &[&nodes[receiver]][..];
+		send_payment(&nodes[sender], expected_route, 21_000);
+		check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+	}
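+	// The two payments above plus the loop's twelve give
+	// 2 * persister_0_max_pending_updates = 14 payments in total, leaving both
+	// monitors at update ID 14 * EXPECTED_UPDATES_PER_PAYMENT = 70.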
 
 	// Force close because cooperative close doesn't result in any persisted
 	// updates.
@@ -163,27 +213,18 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
 	check_closed_broadcast!(nodes[0], true);
 	check_added_monitors!(nodes[0], 1);
 
-	let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+	let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
 	assert_eq!(node_txn.len(), 1);
+	let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
+	let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
+	connect_block(&nodes[1], &dummy_block);
 
-	connect_block(
-		&nodes[1],
-		&create_dummy_block(
-			nodes[0].best_block_hash(),
-			42,
-			vec![node_txn[0].clone(), node_txn[0].clone()],
-		),
-	);
 	check_closed_broadcast!(nodes[1], true);
-	check_closed_event!(
-		nodes[1],
-		1,
-		ClosureReason::CommitmentTxConfirmed,
-		[nodes[0].node.get_our_node_id()],
-		100000
-	);
+	let reason = ClosureReason::CommitmentTxConfirmed;
+	let node_id_0 = nodes[0].node.get_our_node_id();
+	check_closed_event!(nodes[1], 1, reason, false, [node_id_0], 100000);
 	check_added_monitors!(nodes[1], 1);
 
 	// Make sure everything is persisted as expected after close.
-	check_persisted_data!(11);
+	check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
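+	// i.e. 14 payments * 5 updates each, plus one further monitor update from
+	// the force-close above.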
 }