-
Notifications
You must be signed in to change notification settings - Fork 421
feat: Make MonitorUpdatingPersister change persist type based on size #3834
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
34fda75
b313e39
9b4508b
c7397bd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -446,6 +446,13 @@ where | |
| /// If you have many stale updates stored (such as after a crash with pending lazy deletes), and | ||
| /// would like to get rid of them, consider using the | ||
| /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. | ||
| /// | ||
| /// # Size-based persistence optimization | ||
| /// | ||
| /// For small channel monitors (below `min_monitor_size_for_updates_bytes` bytes when serialized), | ||
| /// this persister will always write the full monitor instead of individual updates. This avoids | ||
| /// the overhead of managing update files and later compaction for tiny monitors that don't benefit | ||
| /// from differential updates. | ||
| pub struct MonitorUpdatingPersister<K: Deref, L: Deref, ES: Deref, SP: Deref, BI: Deref, FE: Deref> | ||
| where | ||
| K::Target: KVStore, | ||
|
|
@@ -458,6 +465,7 @@ where | |
| kv_store: K, | ||
| logger: L, | ||
| maximum_pending_updates: u64, | ||
| min_monitor_size_for_updates_bytes: usize, | ||
| entropy_source: ES, | ||
| signer_provider: SP, | ||
| broadcaster: BI, | ||
|
|
@@ -475,7 +483,7 @@ where | |
| BI::Target: BroadcasterInterface, | ||
| FE::Target: FeeEstimator, | ||
| { | ||
| /// Constructs a new [`MonitorUpdatingPersister`]. | ||
| /// Constructs a new [`MonitorUpdatingPersister`] with a default minimum monitor size threshold. | ||
| /// | ||
| /// The `maximum_pending_updates` parameter controls how many updates may be stored before a | ||
| /// [`MonitorUpdatingPersister`] consolidates updates by writing a full monitor. Note that | ||
|
|
@@ -491,14 +499,45 @@ where | |
| /// less frequent "waves." | ||
| /// - [`MonitorUpdatingPersister`] will potentially have more listing to do if you need to run | ||
| /// [`MonitorUpdatingPersister::cleanup_stale_updates`]. | ||
| /// | ||
| /// This sets `min_monitor_size_for_updates_bytes` to 8192 bytes (8 KiB), which is a reasonable | ||
Prabhat1308 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| /// default for most use cases. Monitors smaller than this will be persisted in full rather than | ||
| /// using update-based persistence. Use [`MonitorUpdatingPersister::new_with_monitor_size_threshold`] | ||
| /// if you need a custom threshold. | ||
| pub fn new( | ||
| kv_store: K, logger: L, maximum_pending_updates: u64, entropy_source: ES, | ||
| signer_provider: SP, broadcaster: BI, fee_estimator: FE, | ||
| ) -> Self { | ||
| Self::new_with_monitor_size_threshold( | ||
| kv_store, | ||
| logger, | ||
| maximum_pending_updates, | ||
| 8192, | ||
Prabhat1308 marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| entropy_source, | ||
| signer_provider, | ||
| broadcaster, | ||
| fee_estimator, | ||
| ) | ||
| } | ||
|
|
||
| /// Constructs a new [`MonitorUpdatingPersister`] with a custom minimum monitor size threshold. | ||
| /// | ||
| /// The `min_monitor_size_for_updates_bytes` parameter sets the minimum serialized size (in bytes) | ||
| /// for a [`ChannelMonitor`] to use update-based persistence. Monitors smaller than this threshold | ||
| /// will always be persisted in full, avoiding the overhead of managing update files for tiny | ||
| /// monitors. Set to 0 to always use update-based persistence regardless of size. | ||
| /// | ||
| /// For other parameters, see [`MonitorUpdatingPersister::new`]. | ||
| pub fn new_with_monitor_size_threshold( | ||
|
||
| kv_store: K, logger: L, maximum_pending_updates: u64, | ||
| min_monitor_size_for_updates_bytes: usize, entropy_source: ES, signer_provider: SP, | ||
| broadcaster: BI, fee_estimator: FE, | ||
| ) -> Self { | ||
| MonitorUpdatingPersister { | ||
| kv_store, | ||
| logger, | ||
| maximum_pending_updates, | ||
| min_monitor_size_for_updates_bytes, | ||
| entropy_source, | ||
| signer_provider, | ||
| broadcaster, | ||
|
|
@@ -752,7 +791,12 @@ where | |
| ) -> chain::ChannelMonitorUpdateStatus { | ||
| const LEGACY_CLOSED_CHANNEL_UPDATE_ID: u64 = u64::MAX; | ||
| if let Some(update) = update { | ||
| let persist_update = update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID | ||
| // Check if monitor is too small for update-based persistence | ||
| let monitor_size = monitor.serialized_length(); | ||
| let use_full_persistence = monitor_size < self.min_monitor_size_for_updates_bytes; | ||
|
|
||
| let persist_update = !use_full_persistence | ||
| && update.update_id != LEGACY_CLOSED_CHANNEL_UPDATE_ID | ||
| && update.update_id % self.maximum_pending_updates != 0; | ||
| if persist_update { | ||
| let monitor_key = monitor_name.to_string(); | ||
|
|
@@ -1156,6 +1200,7 @@ mod tests { | |
| kv_store: &TestStore::new(false), | ||
| logger: &TestLogger::new(), | ||
| maximum_pending_updates: persister_0_max_pending_updates, | ||
| min_monitor_size_for_updates_bytes: 0, | ||
| entropy_source: &chanmon_cfgs[0].keys_manager, | ||
| signer_provider: &chanmon_cfgs[0].keys_manager, | ||
| broadcaster: &chanmon_cfgs[0].tx_broadcaster, | ||
|
|
@@ -1165,6 +1210,7 @@ mod tests { | |
| kv_store: &TestStore::new(false), | ||
| logger: &TestLogger::new(), | ||
| maximum_pending_updates: persister_1_max_pending_updates, | ||
| min_monitor_size_for_updates_bytes: 0, | ||
| entropy_source: &chanmon_cfgs[1].keys_manager, | ||
| signer_provider: &chanmon_cfgs[1].keys_manager, | ||
| broadcaster: &chanmon_cfgs[1].tx_broadcaster, | ||
|
|
@@ -1330,6 +1376,7 @@ mod tests { | |
| kv_store: &TestStore::new(true), | ||
| logger: &TestLogger::new(), | ||
| maximum_pending_updates: 11, | ||
| min_monitor_size_for_updates_bytes: 0, | ||
| entropy_source: node_cfgs[0].keys_manager, | ||
| signer_provider: node_cfgs[0].keys_manager, | ||
| broadcaster: node_cfgs[0].tx_broadcaster, | ||
|
|
@@ -1372,24 +1419,36 @@ mod tests { | |
| fn clean_stale_updates_works() { | ||
| let test_max_pending_updates = 7; | ||
| let chanmon_cfgs = create_chanmon_cfgs(3); | ||
| let persister_0 = MonitorUpdatingPersister { | ||
| kv_store: &TestStore::new(false), | ||
| logger: &TestLogger::new(), | ||
| maximum_pending_updates: test_max_pending_updates, | ||
| entropy_source: &chanmon_cfgs[0].keys_manager, | ||
| signer_provider: &chanmon_cfgs[0].keys_manager, | ||
| broadcaster: &chanmon_cfgs[0].tx_broadcaster, | ||
| fee_estimator: &chanmon_cfgs[0].fee_estimator, | ||
| }; | ||
| let persister_1 = MonitorUpdatingPersister { | ||
| kv_store: &TestStore::new(false), | ||
| logger: &TestLogger::new(), | ||
| maximum_pending_updates: test_max_pending_updates, | ||
| entropy_source: &chanmon_cfgs[1].keys_manager, | ||
| signer_provider: &chanmon_cfgs[1].keys_manager, | ||
| broadcaster: &chanmon_cfgs[1].tx_broadcaster, | ||
| fee_estimator: &chanmon_cfgs[1].fee_estimator, | ||
| }; | ||
| let store_0 = TestStore::new(false); | ||
| let logger_0 = TestLogger::new(); | ||
| let store_1 = TestStore::new(false); | ||
| let logger_1 = TestLogger::new(); | ||
|
|
||
| // Test the default new() constructor (uses 8192 byte threshold) | ||
| let persister_0 = MonitorUpdatingPersister::new( | ||
| &store_0, | ||
| &logger_0, | ||
| test_max_pending_updates, | ||
| &chanmon_cfgs[0].keys_manager, | ||
| &chanmon_cfgs[0].keys_manager, | ||
| &chanmon_cfgs[0].tx_broadcaster, | ||
| &chanmon_cfgs[0].fee_estimator, | ||
| ); | ||
| // Test the custom threshold constructor with zero threshold | ||
| let persister_1 = MonitorUpdatingPersister::new_with_monitor_size_threshold( | ||
| &store_1, | ||
| &logger_1, | ||
| test_max_pending_updates, | ||
| 0, // 0 byte threshold for maximum update usage | ||
| &chanmon_cfgs[1].keys_manager, | ||
| &chanmon_cfgs[1].keys_manager, | ||
| &chanmon_cfgs[1].tx_broadcaster, | ||
| &chanmon_cfgs[1].fee_estimator, | ||
| ); | ||
|
|
||
| // Verify the constructors set the thresholds correctly | ||
| assert_eq!(persister_0.min_monitor_size_for_updates_bytes, 8192); | ||
| assert_eq!(persister_1.min_monitor_size_for_updates_bytes, 0); | ||
| let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); | ||
| let chain_mon_0 = test_utils::TestChainMonitor::new( | ||
| Some(&chanmon_cfgs[0].chain_source), | ||
|
|
@@ -1452,6 +1511,118 @@ mod tests { | |
| .is_err()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_size_based_optimization() { | ||
| let chanmon_cfgs = create_chanmon_cfgs(2); | ||
| let store_0 = TestStore::new(false); | ||
| let logger_0 = TestLogger::new(); | ||
| let store_1 = TestStore::new(false); | ||
| let logger_1 = TestLogger::new(); | ||
|
|
||
| // Create a persister with a huge minimum size threshold (100KB) | ||
| // This should force all monitors to use full persistence instead of updates | ||
| // Test the new_with_monitor_size_threshold constructor with large threshold | ||
| let large_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold( | ||
| &store_0, | ||
| &logger_0, | ||
| 5, | ||
| 100_000, | ||
| &chanmon_cfgs[0].keys_manager, | ||
| &chanmon_cfgs[0].keys_manager, | ||
| &chanmon_cfgs[0].tx_broadcaster, | ||
| &chanmon_cfgs[0].fee_estimator, | ||
| ); | ||
|
|
||
| // Create a persister with zero minimum size threshold | ||
| // This should allow all monitors to use update-based persistence | ||
| // Test the new_with_monitor_size_threshold constructor with zero threshold | ||
| let small_threshold_persister = MonitorUpdatingPersister::new_with_monitor_size_threshold( | ||
| &store_1, | ||
| &logger_1, | ||
| 5, | ||
| 0, // allows all monitors to use updates | ||
| &chanmon_cfgs[1].keys_manager, | ||
| &chanmon_cfgs[1].keys_manager, | ||
| &chanmon_cfgs[1].tx_broadcaster, | ||
| &chanmon_cfgs[1].fee_estimator, | ||
| ); | ||
|
|
||
| // Verify the constructors set the thresholds correctly | ||
| assert_eq!(large_threshold_persister.min_monitor_size_for_updates_bytes, 100_000); | ||
| assert_eq!(small_threshold_persister.min_monitor_size_for_updates_bytes, 0); | ||
|
|
||
| let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs); | ||
| let chain_mon_0 = test_utils::TestChainMonitor::new( | ||
| Some(&chanmon_cfgs[0].chain_source), | ||
| &chanmon_cfgs[0].tx_broadcaster, | ||
| &chanmon_cfgs[0].logger, | ||
| &chanmon_cfgs[0].fee_estimator, | ||
| &large_threshold_persister, | ||
| &chanmon_cfgs[0].keys_manager, | ||
| ); | ||
| let chain_mon_1 = test_utils::TestChainMonitor::new( | ||
| Some(&chanmon_cfgs[1].chain_source), | ||
| &chanmon_cfgs[1].tx_broadcaster, | ||
| &chanmon_cfgs[1].logger, | ||
| &chanmon_cfgs[1].fee_estimator, | ||
| &small_threshold_persister, | ||
| &chanmon_cfgs[1].keys_manager, | ||
| ); | ||
| node_cfgs[0].chain_monitor = chain_mon_0; | ||
| node_cfgs[1].chain_monitor = chain_mon_1; | ||
| let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); | ||
| let nodes = create_network(2, &node_cfgs, &node_chanmgrs); | ||
|
|
||
| // Create a channel and make a payment to trigger monitor updates | ||
| let _ = create_announced_chan_between_nodes(&nodes, 0, 1); | ||
| send_payment(&nodes[0], &vec![&nodes[1]][..], 1_000_000); | ||
|
|
||
| // Test passes if we can create the channels and send payments without issues. | ||
| // The actual verification is that the different persisters behave differently | ||
| // based on their thresholds, which we can verify by ensuring no panics occur. | ||
|
|
||
| // Verify that monitors were created | ||
| let persisted_data_0 = | ||
| large_threshold_persister.read_all_channel_monitors_with_updates().unwrap(); | ||
| let persisted_data_1 = | ||
| small_threshold_persister.read_all_channel_monitors_with_updates().unwrap(); | ||
|
|
||
| assert_eq!(persisted_data_0.len(), 1); | ||
| assert_eq!(persisted_data_1.len(), 1); | ||
|
|
||
| // Verify the monitors exist and are of reasonable size | ||
| for (_, monitor) in persisted_data_0.iter() { | ||
| let monitor_size = monitor.serialized_length(); | ||
| // Verify the monitor is not empty and reasonably sized | ||
| assert!( | ||
| monitor_size > 1000, | ||
| "Monitor should be at least 1KB in size, got {} bytes", | ||
| monitor_size | ||
| ); | ||
| assert!( | ||
| monitor_size < 100_000, | ||
| "Monitor should be smaller than 100KB threshold, got {} bytes", | ||
| monitor_size | ||
| ); | ||
| } | ||
|
|
||
| for (_, monitor) in persisted_data_1.iter() { | ||
| let monitor_size = monitor.serialized_length(); | ||
| // Verify the monitor is not empty and reasonably sized | ||
| assert!( | ||
| monitor_size > 1000, | ||
| "Monitor should be at least 1KB in size, got {} bytes", | ||
| monitor_size | ||
| ); | ||
| // Since threshold is 0, this monitor should be large enough to use updates | ||
| assert!( | ||
| monitor_size > 0, | ||
| "Monitor should be larger than 0 byte threshold, got {} bytes", | ||
| monitor_size | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| fn persist_fn<P: Deref, ChannelSigner: EcdsaChannelSigner>(_persist: P) -> bool | ||
| where | ||
| P::Target: Persist<ChannelSigner>, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is still not clear to me how much the gain is of this in practice. Also worried that disabling the incremental path initially allow certain bugs to linger for longer, just because the path isn't hit as much, or rarely.