Skip to content

Commit 8368fdd

Browse files
authored
Merge pull request #661 from tnull/2025-10-MUP
Use `MonitorUpdatingPersister`
2 parents 121dadb + fc6a7ff commit 8368fdd

File tree

3 files changed

+128
-72
lines changed

3 files changed

+128
-72
lines changed

src/builder.rs

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use lightning::routing::scoring::{
3131
};
3232
use lightning::sign::{EntropySource, NodeSigner};
3333
use lightning::util::persist::{
34-
read_channel_monitors, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
35-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
34+
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
35+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
3636
};
3737
use lightning::util::ser::ReadableArgs;
3838
use lightning::util::sweep::OutputSweeper;
@@ -66,7 +66,7 @@ use crate::runtime::Runtime;
6666
use crate::tx_broadcaster::TransactionBroadcaster;
6767
use crate::types::{
6868
ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
69-
OnionMessenger, PaymentStore, PeerManager,
69+
OnionMessenger, PaymentStore, PeerManager, Persister,
7070
};
7171
use crate::wallet::persist::KVStoreWalletPersister;
7272
use crate::wallet::Wallet;
@@ -75,6 +75,7 @@ use crate::{Node, NodeMetrics};
7575
const VSS_HARDENED_CHILD_INDEX: u32 = 877;
7676
const VSS_LNURL_AUTH_HARDENED_CHILD_INDEX: u32 = 138;
7777
const LSPS_HARDENED_CHILD_INDEX: u32 = 577;
78+
const PERSISTER_MAX_PENDING_UPDATES: u64 = 100;
7879

7980
#[derive(Debug, Clone)]
8081
enum ChainDataSourceConfig {
@@ -1317,14 +1318,36 @@ fn build_with_store_internal(
13171318
));
13181319

13191320
let peer_storage_key = keys_manager.get_peer_storage_key();
1321+
let persister = Arc::new(Persister::new(
1322+
Arc::clone(&kv_store),
1323+
Arc::clone(&logger),
1324+
PERSISTER_MAX_PENDING_UPDATES,
1325+
Arc::clone(&keys_manager),
1326+
Arc::clone(&keys_manager),
1327+
Arc::clone(&tx_broadcaster),
1328+
Arc::clone(&fee_estimator),
1329+
));
1330+
1331+
// Read ChannelMonitor state from store
1332+
let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
1333+
Ok(monitors) => monitors,
1334+
Err(e) => {
1335+
if e.kind() == lightning::io::ErrorKind::NotFound {
1336+
Vec::new()
1337+
} else {
1338+
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
1339+
return Err(BuildError::ReadFailed);
1340+
}
1341+
},
1342+
};
13201343

13211344
// Initialize the ChainMonitor
13221345
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
13231346
Some(Arc::clone(&chain_source)),
13241347
Arc::clone(&tx_broadcaster),
13251348
Arc::clone(&logger),
13261349
Arc::clone(&fee_estimator),
1327-
Arc::clone(&kv_store),
1350+
Arc::clone(&persister),
13281351
Arc::clone(&keys_manager),
13291352
peer_storage_key,
13301353
));
@@ -1371,23 +1394,6 @@ fn build_with_store_internal(
13711394
scoring_fee_params,
13721395
));
13731396

1374-
// Read ChannelMonitor state from store
1375-
let channel_monitors = match read_channel_monitors(
1376-
Arc::clone(&kv_store),
1377-
Arc::clone(&keys_manager),
1378-
Arc::clone(&keys_manager),
1379-
) {
1380-
Ok(monitors) => monitors,
1381-
Err(e) => {
1382-
if e.kind() == lightning::io::ErrorKind::NotFound {
1383-
Vec::new()
1384-
} else {
1385-
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
1386-
return Err(BuildError::ReadFailed);
1387-
}
1388-
},
1389-
};
1390-
13911397
let mut user_config = default_user_config(&config);
13921398

13931399
if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {

src/io/test_utils.rs

Lines changed: 90 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,28 @@ use std::path::PathBuf;
1111
use lightning::events::ClosureReason;
1212
use lightning::ln::functional_test_utils::{
1313
connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
14-
create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
14+
create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg,
1515
};
16-
use lightning::util::persist::{read_channel_monitors, KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN};
16+
use lightning::util::persist::{
17+
KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN,
18+
};
19+
1720
use lightning::util::test_utils;
1821
use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
1922
use rand::distributions::Alphanumeric;
2023
use rand::{thread_rng, Rng};
2124

25+
type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
26+
&'a K,
27+
&'a test_utils::TestLogger,
28+
&'a test_utils::TestKeysInterface,
29+
&'a test_utils::TestKeysInterface,
30+
&'a test_utils::TestBroadcaster,
31+
&'a test_utils::TestFeeEstimator,
32+
>;
33+
34+
const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
35+
2236
pub(crate) fn random_storage_path() -> PathBuf {
2337
let mut temp_path = std::env::temp_dir();
2438
let mut rng = thread_rng();
@@ -77,54 +91,71 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
7791
assert_eq!(listed_keys.len(), 0);
7892
}
7993

94+
pub(crate) fn create_persister<'a, K: KVStoreSync + Sync>(
95+
store: &'a K, chanmon_cfg: &'a TestChanMonCfg, max_pending_updates: u64,
96+
) -> TestMonitorUpdatePersister<'a, K> {
97+
MonitorUpdatingPersister::new(
98+
store,
99+
&chanmon_cfg.logger,
100+
max_pending_updates,
101+
&chanmon_cfg.keys_manager,
102+
&chanmon_cfg.keys_manager,
103+
&chanmon_cfg.tx_broadcaster,
104+
&chanmon_cfg.fee_estimator,
105+
)
106+
}
107+
108+
pub(crate) fn create_chain_monitor<'a, K: KVStoreSync + Sync>(
109+
chanmon_cfg: &'a TestChanMonCfg, persister: &'a TestMonitorUpdatePersister<'a, K>,
110+
) -> test_utils::TestChainMonitor<'a> {
111+
test_utils::TestChainMonitor::new(
112+
Some(&chanmon_cfg.chain_source),
113+
&chanmon_cfg.tx_broadcaster,
114+
&chanmon_cfg.logger,
115+
&chanmon_cfg.fee_estimator,
116+
persister,
117+
&chanmon_cfg.keys_manager,
118+
)
119+
}
120+
80121
// Integration-test the given KVStore implementation. Test relaying a few payments and check that
81122
// the persisted data is updated the appropriate number of times.
82123
pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
124+
// This value is used later to limit how many iterations we perform.
125+
let persister_0_max_pending_updates = 7;
126+
// Intentionally set this to a smaller value to test a different alignment.
127+
let persister_1_max_pending_updates = 3;
128+
83129
let chanmon_cfgs = create_chanmon_cfgs(2);
130+
131+
let persister_0 = create_persister(store_0, &chanmon_cfgs[0], persister_0_max_pending_updates);
132+
let persister_1 = create_persister(store_1, &chanmon_cfgs[1], persister_1_max_pending_updates);
133+
134+
let chain_mon_0 = create_chain_monitor(&chanmon_cfgs[0], &persister_0);
135+
let chain_mon_1 = create_chain_monitor(&chanmon_cfgs[1], &persister_1);
136+
84137
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
85-
let chain_mon_0 = test_utils::TestChainMonitor::new(
86-
Some(&chanmon_cfgs[0].chain_source),
87-
&chanmon_cfgs[0].tx_broadcaster,
88-
&chanmon_cfgs[0].logger,
89-
&chanmon_cfgs[0].fee_estimator,
90-
store_0,
91-
node_cfgs[0].keys_manager,
92-
);
93-
let chain_mon_1 = test_utils::TestChainMonitor::new(
94-
Some(&chanmon_cfgs[1].chain_source),
95-
&chanmon_cfgs[1].tx_broadcaster,
96-
&chanmon_cfgs[1].logger,
97-
&chanmon_cfgs[1].fee_estimator,
98-
store_1,
99-
node_cfgs[1].keys_manager,
100-
);
101138
node_cfgs[0].chain_monitor = chain_mon_0;
102139
node_cfgs[1].chain_monitor = chain_mon_1;
103140
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
104141
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
105142

106143
// Check that the persisted channel data is empty before any channels are
107144
// open.
108-
let mut persisted_chan_data_0 =
109-
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
145+
let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
110146
assert_eq!(persisted_chan_data_0.len(), 0);
111-
let mut persisted_chan_data_1 =
112-
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
147+
let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
113148
assert_eq!(persisted_chan_data_1.len(), 0);
114149

115150
// Helper to make sure the channel is on the expected update ID.
116151
macro_rules! check_persisted_data {
117152
($expected_update_id: expr) => {
118-
persisted_chan_data_0 =
119-
read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
120-
.unwrap();
153+
persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
121154
assert_eq!(persisted_chan_data_0.len(), 1);
122155
for (_, mon) in persisted_chan_data_0.iter() {
123156
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
124157
}
125-
persisted_chan_data_1 =
126-
read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
127-
.unwrap();
158+
persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
128159
assert_eq!(persisted_chan_data_1.len(), 1);
129160
for (_, mon) in persisted_chan_data_1.iter() {
130161
assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -137,10 +168,29 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
137168
check_persisted_data!(0);
138169

139170
// Send a few payments and make sure the monitors are updated to the latest.
140-
send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
141-
check_persisted_data!(5);
142-
send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
143-
check_persisted_data!(10);
171+
let expected_route = &[&nodes[1]][..];
172+
send_payment(&nodes[0], expected_route, 8_000_000);
173+
check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
174+
let expected_route = &[&nodes[0]][..];
175+
send_payment(&nodes[1], expected_route, 4_000_000);
176+
check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
177+
178+
// Send a few more payments to try all the alignments of max pending updates with
179+
// updates for a payment sent and received.
180+
let mut sender = 0;
181+
for i in 3..=persister_0_max_pending_updates * 2 {
182+
let receiver;
183+
if sender == 0 {
184+
sender = 1;
185+
receiver = 0;
186+
} else {
187+
sender = 0;
188+
receiver = 1;
189+
}
190+
let expected_route = &[&nodes[receiver]][..];
191+
send_payment(&nodes[sender], expected_route, 21_000);
192+
check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
193+
}
144194

145195
// Force close because cooperative close doesn't result in any persisted
146196
// updates.
@@ -163,27 +213,18 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
163213
check_closed_broadcast!(nodes[0], true);
164214
check_added_monitors!(nodes[0], 1);
165215

166-
let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
216+
let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
167217
assert_eq!(node_txn.len(), 1);
218+
let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
219+
let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
220+
connect_block(&nodes[1], &dummy_block);
168221

169-
connect_block(
170-
&nodes[1],
171-
&create_dummy_block(
172-
nodes[0].best_block_hash(),
173-
42,
174-
vec![node_txn[0].clone(), node_txn[0].clone()],
175-
),
176-
);
177222
check_closed_broadcast!(nodes[1], true);
178-
check_closed_event!(
179-
nodes[1],
180-
1,
181-
ClosureReason::CommitmentTxConfirmed,
182-
[nodes[0].node.get_our_node_id()],
183-
100000
184-
);
223+
let reason = ClosureReason::CommitmentTxConfirmed;
224+
let node_id_0 = nodes[0].node.get_our_node_id();
225+
check_closed_event!(nodes[1], 1, reason, false, [node_id_0], 100000);
185226
check_added_monitors!(nodes[1], 1);
186227

187228
// Make sure everything is persisted as expected after close.
188-
check_persisted_data!(11);
229+
check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
189230
}

src/types.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use lightning::routing::gossip;
1919
use lightning::routing::router::DefaultRouter;
2020
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
2121
use lightning::sign::InMemorySigner;
22-
use lightning::util::persist::{KVStore, KVStoreSync};
22+
use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister};
2323
use lightning::util::ser::{Readable, Writeable, Writer};
2424
use lightning::util::sweep::OutputSweeper;
2525
use lightning_block_sync::gossip::{GossipVerifier, UtxoSource};
@@ -49,13 +49,22 @@ where
4949
/// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers;
5050
pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send;
5151

52+
pub type Persister = MonitorUpdatingPersister<
53+
Arc<DynStore>,
54+
Arc<Logger>,
55+
Arc<KeysManager>,
56+
Arc<KeysManager>,
57+
Arc<Broadcaster>,
58+
Arc<OnchainFeeEstimator>,
59+
>;
60+
5261
pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
5362
InMemorySigner,
5463
Arc<ChainSource>,
5564
Arc<Broadcaster>,
5665
Arc<OnchainFeeEstimator>,
5766
Arc<Logger>,
58-
Arc<DynStore>,
67+
Arc<Persister>,
5968
Arc<KeysManager>,
6069
>;
6170

0 commit comments

Comments
 (0)