@@ -171,6 +171,16 @@ const SWEEPER_TIMER: Duration = Duration::from_secs(30);
 #[cfg(test)]
 const SWEEPER_TIMER: Duration = Duration::from_secs(1);
 
+#[cfg(not(test))]
+const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(15);
+#[cfg(test)]
+const FIRST_ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::ZERO;
+
+#[cfg(not(test))]
+const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(60 * 10);
+#[cfg(test)]
+const ARCHIVE_STALE_MONITORS_TIMER: Duration = Duration::from_secs(1);
+
 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
 const fn min_duration(a: Duration, b: Duration) -> Duration {
 	if a.as_nanos() < b.as_nanos() {
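
Editor's note: the `min_duration` helper shown in context above exists only because `core::cmp::min` is not a `const fn`, so it cannot be used to compute timer constants at compile time. The following standalone sketch (not part of this patch; the constant names are made up for illustration) shows the same pattern in isolation:

use core::time::Duration;

// Hand-rolled const fn, equivalent to core::cmp::min for Durations.
const fn min_duration(a: Duration, b: Duration) -> Duration {
	if a.as_nanos() < b.as_nanos() { a } else { b }
}

// Hypothetical constants, purely for demonstration.
const LONG_TIMER: Duration = Duration::from_secs(60 * 10);
const SHORT_TIMER: Duration = Duration::from_secs(15);
const EFFECTIVE_TIMER: Duration = min_duration(LONG_TIMER, SHORT_TIMER);

fn main() {
	// Evaluated at compile time; the shorter timer wins.
	assert_eq!(EFFECTIVE_TIMER, Duration::from_secs(15));
}
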
@@ -1018,8 +1028,10 @@ where
 	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
 	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
 	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
+	let mut last_archive_call = sleeper(FIRST_ARCHIVE_STALE_MONITORS_TIMER);
 	let mut have_pruned = false;
 	let mut have_decayed_scorer = false;
+	let mut have_archived = false;
 
 	let mut last_forwards_processing_call = sleeper(batch_delay.get());
 
@@ -1147,11 +1159,31 @@ where
 			log_trace!(logger, "Done persisting ChannelManager.");
 		}
 
-		// Note that we want to run a graph prune once not long after startup before
-		// falling back to our usual hourly prunes. This avoids short-lived clients never
-		// pruning their network graph. We run once 60 seconds after startup before
-		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
-		// we prune after an initial sync completes.
+		// Note that we want to archive stale ChannelMonitors and run a network graph prune once
+		// not long after startup before falling back to their usual infrequent runs. This avoids
+		// short-lived clients never archiving stale ChannelMonitors or pruning their network
+		// graph. For network graph pruning, in the case of RGS sync, we run a prune immediately
+		// after the initial sync completes; otherwise we do so on a timer that should be long
+		// enough to give us a chance to get most of the network graph from our peers.
+		let archive_timer = if have_archived {
+			ARCHIVE_STALE_MONITORS_TIMER
+		} else {
+			FIRST_ARCHIVE_STALE_MONITORS_TIMER
+		};
+		let archive_timer_elapsed = {
+			match check_and_reset_sleeper(&mut last_archive_call, || sleeper(archive_timer)) {
+				Some(false) => true,
+				Some(true) => break,
+				None => false,
+			}
+		};
+		if archive_timer_elapsed {
+			log_trace!(logger, "Archiving stale ChannelMonitors.");
+			chain_monitor.archive_fully_resolved_channel_monitors();
+			have_archived = true;
+			log_trace!(logger, "Archived stale ChannelMonitors.");
+		}
+
 		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
 			NETWORK_PRUNE_TIMER
 		} else {
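
Editor's note on the three-way match above: `check_and_reset_sleeper` is LDK-internal, and its semantics are inferred here from the call site alone, so treat the following as an assumption. It appears to return `Some(exit_requested)` when the sleeper future has completed (resetting the sleeper via the provided closure) and `None` while it is still pending. A minimal, self-contained stand-in illustrating that contract:

// Hypothetical stand-in for the internal helper; a real "sleeper" is a future,
// here reduced to two flags so the control flow is visible at a glance.
struct FakeSleeper {
	done: bool,
	exit: bool,
}

// Returns Some(exit_requested) if the sleeper completed (and replaces it via
// `new_sleeper`), or None if it is still pending.
fn check_and_reset_sleeper(
	sleeper: &mut FakeSleeper, new_sleeper: impl FnOnce() -> FakeSleeper,
) -> Option<bool> {
	if sleeper.done {
		let exit = sleeper.exit;
		*sleeper = new_sleeper();
		Some(exit)
	} else {
		None
	}
}

fn main() {
	let mut sleeper = FakeSleeper { done: true, exit: false };
	// Some(false): the timer fired and no shutdown was requested, so run the task.
	assert_eq!(
		check_and_reset_sleeper(&mut sleeper, || FakeSleeper { done: false, exit: false }),
		Some(false)
	);
	// None: the freshly reset timer has not fired yet, so skip the task this iteration.
	assert_eq!(
		check_and_reset_sleeper(&mut sleeper, || FakeSleeper { done: false, exit: false }),
		None
	);
}
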
@@ -1601,8 +1633,10 @@ impl BackgroundProcessor {
 			let mut last_scorer_persist_call = Instant::now();
 			let mut last_rebroadcast_call = Instant::now();
 			let mut last_sweeper_call = Instant::now();
+			let mut last_archive_call = Instant::now();
 			let mut have_pruned = false;
 			let mut have_decayed_scorer = false;
+			let mut have_archived = false;
 
 			let mut cur_batch_delay = batch_delay.get();
 			let mut last_forwards_processing_call = Instant::now();
@@ -1691,11 +1725,26 @@ impl BackgroundProcessor {
 					});
 				}
 
-				// Note that we want to run a graph prune once not long after startup before
-				// falling back to our usual hourly prunes. This avoids short-lived clients never
-				// pruning their network graph. We run once 60 seconds after startup before
-				// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
-				// we prune after an initial sync completes.
+				// Note that we want to archive stale ChannelMonitors and run a network graph prune once
+				// not long after startup before falling back to their usual infrequent runs. This avoids
+				// short-lived clients never archiving stale ChannelMonitors or pruning their network
+				// graph. For network graph pruning, in the case of RGS sync, we run a prune immediately
+				// after the initial sync completes; otherwise we do so on a timer that should be long
+				// enough to give us a chance to get most of the network graph from our peers.
+				let archive_timer = if have_archived {
+					ARCHIVE_STALE_MONITORS_TIMER
+				} else {
+					FIRST_ARCHIVE_STALE_MONITORS_TIMER
+				};
+				let archive_timer_elapsed = last_archive_call.elapsed() > archive_timer;
+				if archive_timer_elapsed {
+					log_trace!(logger, "Archiving stale ChannelMonitors.");
+					chain_monitor.archive_fully_resolved_channel_monitors();
+					have_archived = true;
+					last_archive_call = Instant::now();
+					log_trace!(logger, "Archived stale ChannelMonitors.");
+				}
+
 				let prune_timer =
 					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
 				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
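
Editor's note: the synchronous variant above uses plain `Instant` arithmetic rather than sleeper futures. The cadence of "run sooner after startup, then fall back to the long steady-state interval" can be factored out as in the sketch below (illustrative only; `FIRST_INTERVAL`, `REGULAR_INTERVAL`, and `maybe_run` are made-up names, not part of the patch):

use std::time::{Duration, Instant};

// Made-up intervals standing in for FIRST_ARCHIVE_STALE_MONITORS_TIMER and
// ARCHIVE_STALE_MONITORS_TIMER.
const FIRST_INTERVAL: Duration = Duration::from_secs(15);
const REGULAR_INTERVAL: Duration = Duration::from_secs(60 * 10);

// One pass of the background loop's check: run `work` on the short interval
// until it has run once, then only on the long interval.
fn maybe_run(last_call: &mut Instant, have_run: &mut bool, work: impl FnOnce()) {
	let interval = if *have_run { REGULAR_INTERVAL } else { FIRST_INTERVAL };
	if last_call.elapsed() > interval {
		work();
		*have_run = true;
		*last_call = Instant::now();
	}
}

fn main() {
	let mut last_call = Instant::now();
	let mut have_run = false;
	// In the real processor this check happens on every pass of the main loop.
	maybe_run(&mut last_call, &mut have_run, || println!("archiving stale monitors"));
}
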
@@ -3698,4 +3747,127 @@ mod tests {
 		exit_sender.send(()).unwrap();
 		t1.await.unwrap().unwrap();
 	}
+
+	#[test]
+	fn test_monitor_archive() {
+		let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive");
+		// Open a channel, but don't confirm it so that it prunes immediately on FC.
+		open_channel!(nodes[0], nodes[1], 100000);
+
+		let data_dir = nodes[1].kv_store.get_data_dir();
+		let persister = Arc::new(Persister::new(data_dir));
+		let event_handler = |_: _| Ok(());
+		let bp = BackgroundProcessor::start(
+			persister,
+			event_handler,
+			Arc::clone(&nodes[1].chain_monitor),
+			Arc::clone(&nodes[1].node),
+			Some(Arc::clone(&nodes[1].messenger)),
+			nodes[1].p2p_gossip_sync(),
+			Arc::clone(&nodes[1].peer_manager),
+			Some(Arc::clone(&nodes[1].liquidity_manager)),
+			Some(Arc::clone(&nodes[1].sweeper)),
+			Arc::clone(&nodes[1].logger),
+			Some(Arc::clone(&nodes[1].scorer)),
+		);
+
+		let dir = format!("{}_persister_1/monitors", &persist_dir);
+		let mut mons = std::fs::read_dir(&dir).unwrap();
+		let mut mon = mons.next().unwrap().unwrap();
+		if mon.path().to_str().unwrap().ends_with(".tmp") {
+			mon = mons.next().unwrap().unwrap();
+			assert_eq!(mon.path().extension(), None);
+		}
+		assert!(mons.next().is_none());
+
+		// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
+		// it's force-closed (at least on node B, which didn't put their money into it).
+		nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
+		loop {
+			let mut mons = std::fs::read_dir(&dir).unwrap();
+			if let Some(new_mon) = mons.next() {
+				let mut new_mon = new_mon.unwrap();
+				if new_mon.path().to_str().unwrap().ends_with(".tmp") {
+					new_mon = mons.next().unwrap().unwrap();
+					assert_eq!(new_mon.path().extension(), None);
+				}
+				assert_eq!(new_mon.path(), mon.path());
+				assert!(mons.next().is_none());
+			} else {
+				break;
+			}
+		}
+
+		bp.stop().unwrap();
+	}
3803+
+	#[tokio::test]
+	#[cfg(not(c_bindings))]
+	async fn test_monitor_archive_async() {
+		let (persist_dir, nodes) = create_nodes(2, "test_monitor_archive_async");
+		// Open a channel, but don't confirm it so that it prunes immediately on FC.
+		open_channel!(nodes[0], nodes[1], 100000);
+
+		let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
+		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
+			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
+				as &'static OutputSweeper<_, _, _, _, _, _, _>
+		};
+		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
+		let bp_future = tokio::spawn(super::process_events_async(
+			kv_store,
+			move |_: Event| async move { Ok(()) },
+			Arc::clone(&nodes[1].chain_monitor),
+			Arc::clone(&nodes[1].node),
+			crate::NO_ONION_MESSENGER,
+			nodes[1].no_gossip_sync(),
+			Arc::clone(&nodes[1].peer_manager),
+			crate::NO_LIQUIDITY_MANAGER,
+			Some(sweeper_async),
+			Arc::clone(&nodes[1].logger),
+			Some(Arc::clone(&nodes[1].scorer)),
+			move |dur: Duration| {
+				let mut exit_receiver = exit_receiver.clone();
+				Box::pin(async move {
+					tokio::select! {
+						_ = tokio::time::sleep(dur) => false,
+						_ = exit_receiver.changed() => true,
+					}
+				})
+			},
+			false,
+			|| Some(Duration::ZERO),
+		));
+
+		let dir = format!("{}_persister_1/monitors", &persist_dir);
+		let mut mons = std::fs::read_dir(&dir).unwrap();
+		let mut mon = mons.next().unwrap().unwrap();
+		if mon.path().to_str().unwrap().ends_with(".tmp") {
+			mon = mons.next().unwrap().unwrap();
+			assert_eq!(mon.path().extension(), None);
+		}
+		assert!(mons.next().is_none());
+
+		// Because the channel wasn't funded, we'll archive the ChannelMonitor immediately after
+		// it's force-closed (at least on node B, which didn't put their money into it).
+		nodes[1].node.force_close_all_channels_broadcasting_latest_txn("".to_owned());
+		loop {
+			let mut mons = std::fs::read_dir(&dir).unwrap();
+			if let Some(new_mon) = mons.next() {
+				let mut new_mon = new_mon.unwrap();
+				if new_mon.path().to_str().unwrap().ends_with(".tmp") {
+					new_mon = mons.next().unwrap().unwrap();
+					assert_eq!(new_mon.path().extension(), None);
+				}
+				assert_eq!(new_mon.path(), mon.path());
+				assert!(mons.next().is_none());
+			} else {
+				break;
+			}
+			tokio::task::yield_now().await;
+		}
+
+		exit_sender.send(()).unwrap();
+		bp_future.await.unwrap().unwrap();
+	}
 }
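
Editor's note: both tests poll the node's `monitors` directory until the background processor archives the only `ChannelMonitor` and the directory becomes empty. Outside a test harness such a poll would normally be bounded; the helper below is a hypothetical illustration of the same wait with an explicit timeout, not part of the patch:

use std::path::Path;
use std::time::{Duration, Instant};

// Poll until `dir` contains no entries, giving up after `timeout`.
fn wait_until_empty(dir: &Path, timeout: Duration) -> bool {
	let start = Instant::now();
	while start.elapsed() < timeout {
		let empty = std::fs::read_dir(dir).map(|mut d| d.next().is_none()).unwrap_or(false);
		if empty {
			return true;
		}
		std::thread::sleep(Duration::from_millis(10));
	}
	false
}

fn main() {
	let dir = std::env::temp_dir().join("wait_until_empty_demo");
	std::fs::create_dir_all(&dir).unwrap();
	std::fs::write(dir.join("placeholder"), b"x").unwrap();
	// Directory still has a file, so the wait times out.
	assert!(!wait_until_empty(&dir, Duration::from_millis(50)));
	std::fs::remove_file(dir.join("placeholder")).unwrap();
	// Now empty, so the wait succeeds.
	assert!(wait_until_empty(&dir, Duration::from_secs(1)));
	std::fs::remove_dir(&dir).unwrap();
}
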