
Commit 14494e1

Use MonitorUpdatingPersister
1 parent 3b36020 commit 14494e1

File tree

    src/builder.rs
    src/io/test_utils.rs
    src/types.rs

3 files changed: +150 −83 lines changed

src/builder.rs

Lines changed: 34 additions & 28 deletions
@@ -26,7 +26,7 @@ use crate::peer_store::PeerStore;
 use crate::tx_broadcaster::TransactionBroadcaster;
 use crate::types::{
     ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
-    OnionMessenger, PeerManager,
+    OnionMessenger, PeerManager, Persister,
 };
 use crate::wallet::persist::KVStoreWalletPersister;
 use crate::wallet::Wallet;
@@ -46,7 +46,7 @@ use lightning::routing::scoring::{
 use lightning::sign::EntropySource;
 
 use lightning::util::persist::{
-    read_channel_monitors, CHANNEL_MANAGER_PERSISTENCE_KEY,
+    CHANNEL_MANAGER_PERSISTENCE_KEY,
     CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use lightning::util::ser::ReadableArgs;
@@ -908,15 +908,6 @@ fn build_with_store_internal(
 
     let runtime = Arc::new(RwLock::new(None));
 
-    // Initialize the ChainMonitor
-    let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
-        Some(Arc::clone(&chain_source)),
-        Arc::clone(&tx_broadcaster),
-        Arc::clone(&logger),
-        Arc::clone(&fee_estimator),
-        Arc::clone(&kv_store),
-    ));
-
     // Initialize the KeysManager
     let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
         log_error!(logger, "Failed to get current time: {}", e);
@@ -932,6 +923,38 @@ fn build_with_store_internal(
         Arc::clone(&logger),
     ));
 
+    let persister = Arc::new(Persister::new(
+        Arc::clone(&kv_store),
+        Arc::clone(&logger),
+        10, // (?)
+        Arc::clone(&keys_manager),
+        Arc::clone(&keys_manager),
+        Arc::clone(&tx_broadcaster),
+        Arc::clone(&fee_estimator),
+    ));
+
+    // Read ChannelMonitor state from store
+    let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
+        Ok(monitors) => monitors,
+        Err(e) => {
+            if e.kind() == lightning::io::ErrorKind::NotFound {
+                Vec::new()
+            } else {
+                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
+                return Err(BuildError::ReadFailed);
+            }
+        },
+    };
+
+    // Initialize the ChainMonitor
+    let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
+        Some(Arc::clone(&chain_source)),
+        Arc::clone(&tx_broadcaster),
+        Arc::clone(&logger),
+        Arc::clone(&fee_estimator),
+        Arc::clone(&persister),
+    ));
+
     // Initialize the network graph, scorer, and router
     let network_graph =
         match io::utils::read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)) {
@@ -974,23 +997,6 @@ fn build_with_store_internal(
         scoring_fee_params,
     ));
 
-    // Read ChannelMonitor state from store
-    let channel_monitors = match read_channel_monitors(
-        Arc::clone(&kv_store),
-        Arc::clone(&keys_manager),
-        Arc::clone(&keys_manager),
-    ) {
-        Ok(monitors) => monitors,
-        Err(e) => {
-            if e.kind() == lightning::io::ErrorKind::NotFound {
-                Vec::new()
-            } else {
-                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
-                return Err(BuildError::ReadFailed);
-            }
-        },
-    };
-
     let mut user_config = default_user_config(&config);
     if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
         // Generally allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll

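Note on the new Persister wiring in src/builder.rs: the third argument to `Persister::new` (the `10` still marked `// (?)`) is LDK's `maximum_pending_updates`. Rather than rewriting the full `ChannelMonitor` on every change, the `MonitorUpdatingPersister` persists each `ChannelMonitorUpdate` as its own KVStore entry and writes a consolidated full monitor once the update id reaches a multiple of that limit, pruning the now-redundant update entries; `read_all_channel_monitors_with_updates()` then rebuilds each monitor at startup from the last full write plus any pending updates. The following is a toy model of that consolidation policy, not the LDK implementation, included only to illustrate why the number of stored update entries works out to `latest_update_id % maximum_pending_updates` (the relation the test changes below assert):

use std::collections::BTreeSet;

// Toy model (illustrative only): incremental updates are stored individually;
// every `max_pending_updates`-th update a full monitor is written and the
// pending update entries are pruned.
struct ToyMonitorStore {
    max_pending_updates: u64,
    latest_update_id: u64,
    pending_update_keys: BTreeSet<u64>,
}

impl ToyMonitorStore {
    fn new(max_pending_updates: u64) -> Self {
        Self { max_pending_updates, latest_update_id: 0, pending_update_keys: BTreeSet::new() }
    }

    fn apply_update(&mut self) {
        self.latest_update_id += 1;
        if self.latest_update_id % self.max_pending_updates == 0 {
            // Full monitor write: the stored updates are now redundant.
            self.pending_update_keys.clear();
        } else {
            // Incremental write: one new update entry.
            self.pending_update_keys.insert(self.latest_update_id);
        }
    }
}

fn main() {
    let mut store = ToyMonitorStore::new(10); // mirrors the builder's `10, // (?)`
    for _ in 0..37 {
        store.apply_update();
        // Invariant the tests below check against the KVStore listing.
        assert_eq!(
            store.pending_update_keys.len() as u64,
            store.latest_update_id % store.max_pending_updates
        );
    }
}
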
src/io/test_utils.rs

Lines changed: 105 additions & 53 deletions
@@ -7,20 +7,22 @@
 
 use lightning::ln::functional_test_utils::{
     connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
-    create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+    create_network, create_node_cfgs, create_node_chanmgrs, send_payment, check_closed_event,
 };
-use lightning::util::persist::{read_channel_monitors, KVStore, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::persist::{MonitorUpdatingPersister, MonitorName, KVStore, CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, KVSTORE_NAMESPACE_KEY_MAX_LEN};
 
 use lightning::events::ClosureReason;
 use lightning::util::test_utils;
-use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
+use lightning::{check_added_monitors, check_closed_broadcast};
 
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 
 use std::panic::RefUnwindSafe;
 use std::path::PathBuf;
 
+const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
+
 pub(crate) fn random_storage_path() -> PathBuf {
     let mut temp_path = std::env::temp_dir();
     let mut rng = thread_rng();
@@ -81,54 +83,104 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStore + RefUnwindSafe>(kv_s
 // Integration-test the given KVStore implementation. Test relaying a few payments and check that
 // the persisted data is updated the appropriate number of times.
 pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
+    // This value is used later to limit how many iterations we perform.
+    let persister_0_max_pending_updates = 7;
+    // Intentionally set this to a smaller value to test a different alignment.
+    let persister_1_max_pending_updates = 3;
+
     let chanmon_cfgs = create_chanmon_cfgs(2);
+
+    let persister_0 = MonitorUpdatingPersister::new(
+        store_0,
+        &chanmon_cfgs[0].logger,
+        persister_0_max_pending_updates,
+        &chanmon_cfgs[0].keys_manager,
+        &chanmon_cfgs[0].keys_manager,
+        &chanmon_cfgs[0].tx_broadcaster,
+        &chanmon_cfgs[0].fee_estimator,
+    );
+
+    let persister_1 = MonitorUpdatingPersister::new(
+        store_1,
+        &chanmon_cfgs[1].logger,
+        persister_1_max_pending_updates,
+        &chanmon_cfgs[1].keys_manager,
+        &chanmon_cfgs[1].keys_manager,
+        &chanmon_cfgs[1].tx_broadcaster,
+        &chanmon_cfgs[1].fee_estimator,
+    );
+
     let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+
     let chain_mon_0 = test_utils::TestChainMonitor::new(
         Some(&chanmon_cfgs[0].chain_source),
         &chanmon_cfgs[0].tx_broadcaster,
         &chanmon_cfgs[0].logger,
         &chanmon_cfgs[0].fee_estimator,
-        store_0,
-        node_cfgs[0].keys_manager,
+        &persister_0,
+        &chanmon_cfgs[0].keys_manager,
     );
+
     let chain_mon_1 = test_utils::TestChainMonitor::new(
         Some(&chanmon_cfgs[1].chain_source),
         &chanmon_cfgs[1].tx_broadcaster,
         &chanmon_cfgs[1].logger,
         &chanmon_cfgs[1].fee_estimator,
-        store_1,
-        node_cfgs[1].keys_manager,
+        &persister_1,
+        &chanmon_cfgs[1].keys_manager,
     );
+
     node_cfgs[0].chain_monitor = chain_mon_0;
     node_cfgs[1].chain_monitor = chain_mon_1;
     let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
     let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
     // Check that the persisted channel data is empty before any channels are
     // open.
-    let mut persisted_chan_data_0 =
-        read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+    let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_0.len(), 0);
-    let mut persisted_chan_data_1 =
-        read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+    let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_1.len(), 0);
 
     // Helper to make sure the channel is on the expected update ID.
     macro_rules! check_persisted_data {
        ($expected_update_id: expr) => {
-            persisted_chan_data_0 =
-                read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
-                    .unwrap();
+            persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
+            // check that we stored only one monitor
             assert_eq!(persisted_chan_data_0.len(), 1);
             for (_, mon) in persisted_chan_data_0.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+
+                let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+                assert_eq!(
+                    store_0
+                        .list(
+                            CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                            monitor_name.as_str()
+                        )
+                        .unwrap()
+                        .len() as u64,
+                    mon.get_latest_update_id() % persister_0_max_pending_updates,
+                    "Wrong number of updates stored in persister 0",
+                );
             }
-            persisted_chan_data_1 =
-                read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
-                    .unwrap();
+            persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
             assert_eq!(persisted_chan_data_1.len(), 1);
             for (_, mon) in persisted_chan_data_1.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
+
+                let monitor_name = MonitorName::from(mon.get_funding_txo().0);
+                assert_eq!(
+                    store_1
+                        .list(
+                            CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
+                            monitor_name.as_str()
+                        )
+                        .unwrap()
+                        .len() as u64,
+                    mon.get_latest_update_id() % persister_1_max_pending_updates,
+                    "Wrong number of updates stored in persister 1",
+                );
             }
         };
     }
@@ -138,52 +190,52 @@ pub(crate) fn do_test_store<K: KVStore>(store_0: &K, store_1: &K) {
     check_persisted_data!(0);
 
     // Send a few payments and make sure the monitors are updated to the latest.
-    send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
-    check_persisted_data!(5);
-    send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
-    check_persisted_data!(10);
+    send_payment(&nodes[0], &vec![&nodes[1]][..], 8_000_000);
+    check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+    send_payment(&nodes[1], &vec![&nodes[0]][..], 4_000_000);
+    check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
+
+    // Send a few more payments to try all the alignments of max pending updates with
+    // updates for a payment sent and received.
+    let mut sender = 0;
+    for i in 3..=persister_0_max_pending_updates * 2 {
+        let receiver;
+        if sender == 0 {
+            sender = 1;
+            receiver = 0;
+        } else {
+            sender = 0;
+            receiver = 1;
+        }
+        send_payment(&nodes[sender], &vec![&nodes[receiver]][..], 21_000);
+        check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+    }
 
     // Force close because cooperative close doesn't result in any persisted
     // updates.
-    nodes[0]
-        .node
-        .force_close_broadcasting_latest_txn(
-            &nodes[0].node.list_channels()[0].channel_id,
-            &nodes[1].node.get_our_node_id(),
-            "whoops".to_string(),
-        )
-        .unwrap();
-    check_closed_event!(
-        nodes[0],
-        1,
-        ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) },
-        [nodes[1].node.get_our_node_id()],
-        100000
-    );
+
+    let node_id_1 = nodes[1].node.get_our_node_id();
+    let chan_id = nodes[0].node.list_channels()[0].channel_id;
+    let err_msg = "Channel force-closed".to_string();
+    nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &node_id_1, err_msg).unwrap();
+
+    let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true) };
+    check_closed_event(&nodes[0], 1, reason, false, &[node_id_1], 100000);
     check_closed_broadcast!(nodes[0], true);
     check_added_monitors!(nodes[0], 1);
 
-    let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+    let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
     assert_eq!(node_txn.len(), 1);
+    let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
+    let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
+    connect_block(&nodes[1], &dummy_block);
 
-    connect_block(
-        &nodes[1],
-        &create_dummy_block(
-            nodes[0].best_block_hash(),
-            42,
-            vec![node_txn[0].clone(), node_txn[0].clone()],
-        ),
-    );
     check_closed_broadcast!(nodes[1], true);
-    check_closed_event!(
-        nodes[1],
-        1,
-        ClosureReason::CommitmentTxConfirmed,
-        [nodes[0].node.get_our_node_id()],
-        100000
-    );
+    let reason = ClosureReason::CommitmentTxConfirmed;
+    let node_id_0 = nodes[0].node.get_our_node_id();
+    check_closed_event(&nodes[1], 1, reason, false, &[node_id_0], 100000);
     check_added_monitors!(nodes[1], 1);
 
     // Make sure everything is persisted as expected after close.
-    check_persisted_data!(11);
+    check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
 }

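To make the final `check_persisted_data!` argument concrete: each `send_payment` produces `EXPECTED_UPDATES_PER_PAYMENT` (5) monitor updates per node, the loop brings the payment count up to `2 * persister_0_max_pending_updates` (14), and the force close adds one more update, giving a latest update id of 71. The macro additionally checks that the number of update entries listed under `CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE` equals that id modulo each persister's limit. A small worked example of those numbers, using the test's constants (illustration only, not part of the test):

fn main() {
    let expected_updates_per_payment = 5u64; // EXPECTED_UPDATES_PER_PAYMENT
    let payments = 2 * 7; // 2 * persister_0_max_pending_updates
    let latest_update_id = payments * expected_updates_per_payment + 1; // +1 for the force close
    assert_eq!(latest_update_id, 71);

    // Pending ChannelMonitorUpdate entries each store should list at that point,
    // per the modulo relation asserted in check_persisted_data!:
    assert_eq!(latest_update_id % 7, 1); // persister_0, max_pending_updates = 7
    assert_eq!(latest_update_id % 3, 2); // persister_1, max_pending_updates = 3
}
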
src/types.rs

Lines changed: 11 additions & 2 deletions
@@ -23,7 +23,7 @@ use lightning::routing::gossip;
 use lightning::routing::router::DefaultRouter;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use lightning::sign::InMemorySigner;
-use lightning::util::persist::KVStore;
+use lightning::util::persist::{MonitorUpdatingPersister, KVStore};
 use lightning::util::ser::{Readable, Writeable, Writer};
 use lightning::util::sweep::OutputSweeper;
 
@@ -38,13 +38,22 @@ use std::sync::{Arc, Mutex};
 
 pub(crate) type DynStore = dyn KVStore + Sync + Send;
 
+pub type Persister = MonitorUpdatingPersister<
+    Arc<DynStore>,
+    Arc<Logger>,
+    Arc<KeysManager>,
+    Arc<KeysManager>,
+    Arc<Broadcaster>,
+    Arc<OnchainFeeEstimator>,
+>;
+
 pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
     InMemorySigner,
     Arc<ChainSource>,
     Arc<Broadcaster>,
     Arc<OnchainFeeEstimator>,
     Arc<Logger>,
-    Arc<DynStore>,
+    Arc<Persister>,
 >;
 
 pub(crate) type PeerManager = lightning::ln::peer_handler::PeerManager<
