@@ -11,14 +11,28 @@ use std::path::PathBuf;
1111use lightning:: events:: ClosureReason ;
1212use lightning:: ln:: functional_test_utils:: {
1313 connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
14- create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
14+ create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg ,
1515} ;
16- use lightning:: util:: persist:: { read_channel_monitors, KVStoreSync , KVSTORE_NAMESPACE_KEY_MAX_LEN } ;
16+ use lightning:: util:: persist:: {
17+ KVStoreSync , MonitorUpdatingPersister , KVSTORE_NAMESPACE_KEY_MAX_LEN ,
18+ } ;
19+
1720use lightning:: util:: test_utils;
1821use lightning:: { check_added_monitors, check_closed_broadcast, check_closed_event} ;
1922use rand:: distributions:: Alphanumeric ;
2023use rand:: { thread_rng, Rng } ;
2124
25+ type TestMonitorUpdatePersister < ' a , K > = MonitorUpdatingPersister <
26+ & ' a K ,
27+ & ' a test_utils:: TestLogger ,
28+ & ' a test_utils:: TestKeysInterface ,
29+ & ' a test_utils:: TestKeysInterface ,
30+ & ' a test_utils:: TestBroadcaster ,
31+ & ' a test_utils:: TestFeeEstimator ,
32+ > ;
33+
34+ const EXPECTED_UPDATES_PER_PAYMENT : u64 = 5 ;
35+
2236pub ( crate ) fn random_storage_path ( ) -> PathBuf {
2337 let mut temp_path = std:: env:: temp_dir ( ) ;
2438 let mut rng = thread_rng ( ) ;
@@ -77,54 +91,73 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
7791 assert_eq ! ( listed_keys. len( ) , 0 ) ;
7892}
7993
94+ pub ( crate ) fn create_persister < ' a , K : KVStoreSync + Sync > (
95+ store : & ' a K , chanmon_cfg : & ' a TestChanMonCfg , max_pending_updates : u64 ,
96+ ) -> TestMonitorUpdatePersister < ' a , K > {
97+ let persister: TestMonitorUpdatePersister < ' a , K > = MonitorUpdatingPersister :: new (
98+ store,
99+ & chanmon_cfg. logger ,
100+ max_pending_updates,
101+ & chanmon_cfg. keys_manager ,
102+ & chanmon_cfg. keys_manager ,
103+ & chanmon_cfg. tx_broadcaster ,
104+ & chanmon_cfg. fee_estimator ,
105+ ) ;
106+ return persister;
107+ }
108+
109+ pub ( crate ) fn create_chain_monitor < ' a , K : KVStoreSync + Sync > (
110+ chanmon_cfg : & ' a TestChanMonCfg , persister : & ' a TestMonitorUpdatePersister < ' a , K > ,
111+ ) -> test_utils:: TestChainMonitor < ' a > {
112+ let chain_mon = test_utils:: TestChainMonitor :: new (
113+ Some ( & chanmon_cfg. chain_source ) ,
114+ & chanmon_cfg. tx_broadcaster ,
115+ & chanmon_cfg. logger ,
116+ & chanmon_cfg. fee_estimator ,
117+ persister,
118+ & chanmon_cfg. keys_manager ,
119+ ) ;
120+ return chain_mon;
121+ }
122+
80123// Integration-test the given KVStore implementation. Test relaying a few payments and check that
81124// the persisted data is updated the appropriate number of times.
82125pub ( crate ) fn do_test_store < K : KVStoreSync + Sync > ( store_0 : & K , store_1 : & K ) {
126+ // This value is used later to limit how many iterations we perform.
127+ let persister_0_max_pending_updates = 7 ;
128+ // Intentionally set this to a smaller value to test a different alignment.
129+ let persister_1_max_pending_updates = 3 ;
130+
83131 let chanmon_cfgs = create_chanmon_cfgs ( 2 ) ;
132+
133+ let persister_0 = create_persister ( store_0, & chanmon_cfgs[ 0 ] , persister_0_max_pending_updates) ;
134+ let persister_1 = create_persister ( store_1, & chanmon_cfgs[ 1 ] , persister_1_max_pending_updates) ;
135+
136+ let chain_mon_0 = create_chain_monitor ( & chanmon_cfgs[ 0 ] , & persister_0) ;
137+ let chain_mon_1 = create_chain_monitor ( & chanmon_cfgs[ 1 ] , & persister_1) ;
138+
84139 let mut node_cfgs = create_node_cfgs ( 2 , & chanmon_cfgs) ;
85- let chain_mon_0 = test_utils:: TestChainMonitor :: new (
86- Some ( & chanmon_cfgs[ 0 ] . chain_source ) ,
87- & chanmon_cfgs[ 0 ] . tx_broadcaster ,
88- & chanmon_cfgs[ 0 ] . logger ,
89- & chanmon_cfgs[ 0 ] . fee_estimator ,
90- store_0,
91- node_cfgs[ 0 ] . keys_manager ,
92- ) ;
93- let chain_mon_1 = test_utils:: TestChainMonitor :: new (
94- Some ( & chanmon_cfgs[ 1 ] . chain_source ) ,
95- & chanmon_cfgs[ 1 ] . tx_broadcaster ,
96- & chanmon_cfgs[ 1 ] . logger ,
97- & chanmon_cfgs[ 1 ] . fee_estimator ,
98- store_1,
99- node_cfgs[ 1 ] . keys_manager ,
100- ) ;
101140 node_cfgs[ 0 ] . chain_monitor = chain_mon_0;
102141 node_cfgs[ 1 ] . chain_monitor = chain_mon_1;
103142 let node_chanmgrs = create_node_chanmgrs ( 2 , & node_cfgs, & [ None , None ] ) ;
104143 let nodes = create_network ( 2 , & node_cfgs, & node_chanmgrs) ;
105144
106145 // Check that the persisted channel data is empty before any channels are
107146 // open.
108- let mut persisted_chan_data_0 =
109- read_channel_monitors ( store_0, nodes[ 0 ] . keys_manager , nodes[ 0 ] . keys_manager ) . unwrap ( ) ;
147+ let mut persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
110148 assert_eq ! ( persisted_chan_data_0. len( ) , 0 ) ;
111- let mut persisted_chan_data_1 =
112- read_channel_monitors ( store_1, nodes[ 1 ] . keys_manager , nodes[ 1 ] . keys_manager ) . unwrap ( ) ;
149+ let mut persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates ( ) . unwrap ( ) ;
113150 assert_eq ! ( persisted_chan_data_1. len( ) , 0 ) ;
114151
115152 // Helper to make sure the channel is on the expected update ID.
116153 macro_rules! check_persisted_data {
117154 ( $expected_update_id: expr) => {
118- persisted_chan_data_0 =
119- read_channel_monitors( store_0, nodes[ 0 ] . keys_manager, nodes[ 0 ] . keys_manager)
120- . unwrap( ) ;
155+ persisted_chan_data_0 = persister_0. read_all_channel_monitors_with_updates( ) . unwrap( ) ;
121156 assert_eq!( persisted_chan_data_0. len( ) , 1 ) ;
122157 for ( _, mon) in persisted_chan_data_0. iter( ) {
123158 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
124159 }
125- persisted_chan_data_1 =
126- read_channel_monitors( store_1, nodes[ 1 ] . keys_manager, nodes[ 1 ] . keys_manager)
127- . unwrap( ) ;
160+ persisted_chan_data_1 = persister_1. read_all_channel_monitors_with_updates( ) . unwrap( ) ;
128161 assert_eq!( persisted_chan_data_1. len( ) , 1 ) ;
129162 for ( _, mon) in persisted_chan_data_1. iter( ) {
130163 assert_eq!( mon. get_latest_update_id( ) , $expected_update_id) ;
@@ -137,10 +170,29 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
137170 check_persisted_data ! ( 0 ) ;
138171
139172 // Send a few payments and make sure the monitors are updated to the latest.
140- send_payment ( & nodes[ 0 ] , & vec ! [ & nodes[ 1 ] ] [ ..] , 8000000 ) ;
141- check_persisted_data ! ( 5 ) ;
142- send_payment ( & nodes[ 1 ] , & vec ! [ & nodes[ 0 ] ] [ ..] , 4000000 ) ;
143- check_persisted_data ! ( 10 ) ;
173+ let expected_route = & [ & nodes[ 1 ] ] [ ..] ;
174+ send_payment ( & nodes[ 0 ] , expected_route, 8_000_000 ) ;
175+ check_persisted_data ! ( EXPECTED_UPDATES_PER_PAYMENT ) ;
176+ let expected_route = & [ & nodes[ 0 ] ] [ ..] ;
177+ send_payment ( & nodes[ 1 ] , expected_route, 4_000_000 ) ;
178+ check_persisted_data ! ( 2 * EXPECTED_UPDATES_PER_PAYMENT ) ;
179+
180+ // Send a few more payments to try all the alignments of max pending updates with
181+ // updates for a payment sent and received.
182+ let mut sender = 0 ;
183+ for i in 3 ..=persister_0_max_pending_updates * 2 {
184+ let receiver;
185+ if sender == 0 {
186+ sender = 1 ;
187+ receiver = 0 ;
188+ } else {
189+ sender = 0 ;
190+ receiver = 1 ;
191+ }
192+ let expected_route = & [ & nodes[ receiver] ] [ ..] ;
193+ send_payment ( & nodes[ sender] , expected_route, 21_000 ) ;
194+ check_persisted_data ! ( i * EXPECTED_UPDATES_PER_PAYMENT ) ;
195+ }
144196
145197 // Force close because cooperative close doesn't result in any persisted
146198 // updates.
@@ -163,27 +215,18 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
163215 check_closed_broadcast ! ( nodes[ 0 ] , true ) ;
164216 check_added_monitors ! ( nodes[ 0 ] , 1 ) ;
165217
166- let node_txn = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) ;
218+ let node_txn = nodes[ 0 ] . tx_broadcaster . txn_broadcast ( ) ;
167219 assert_eq ! ( node_txn. len( ) , 1 ) ;
220+ let txn = vec ! [ node_txn[ 0 ] . clone( ) , node_txn[ 0 ] . clone( ) ] ;
221+ let dummy_block = create_dummy_block ( nodes[ 0 ] . best_block_hash ( ) , 42 , txn) ;
222+ connect_block ( & nodes[ 1 ] , & dummy_block) ;
168223
169- connect_block (
170- & nodes[ 1 ] ,
171- & create_dummy_block (
172- nodes[ 0 ] . best_block_hash ( ) ,
173- 42 ,
174- vec ! [ node_txn[ 0 ] . clone( ) , node_txn[ 0 ] . clone( ) ] ,
175- ) ,
176- ) ;
177224 check_closed_broadcast ! ( nodes[ 1 ] , true ) ;
178- check_closed_event ! (
179- nodes[ 1 ] ,
180- 1 ,
181- ClosureReason :: CommitmentTxConfirmed ,
182- [ nodes[ 0 ] . node. get_our_node_id( ) ] ,
183- 100000
184- ) ;
225+ let reason = ClosureReason :: CommitmentTxConfirmed ;
226+ let node_id_0 = nodes[ 0 ] . node . get_our_node_id ( ) ;
227+ check_closed_event ! ( nodes[ 1 ] , 1 , reason, false , [ node_id_0] , 100000 ) ;
185228 check_added_monitors ! ( nodes[ 1 ] , 1 ) ;
186229
187230 // Make sure everything is persisted as expected after close.
188- check_persisted_data ! ( 11 ) ;
231+ check_persisted_data ! ( persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1 ) ;
189232}
0 commit comments