@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636use lightning:: routing:: gossip:: { NetworkGraph , P2PGossipSync } ;
3737use lightning:: routing:: scoring:: { ScoreUpdate , WriteableScore } ;
3838use lightning:: routing:: utxo:: UtxoLookup ;
39+ use lightning:: sign:: { ChangeDestinationSource , OutputSpender } ;
3940use lightning:: util:: logger:: Logger ;
40- use lightning:: util:: persist:: Persister ;
41+ use lightning:: util:: persist:: { KVStore , Persister } ;
42+ use lightning:: util:: sweep:: OutputSweeper ;
4143#[ cfg( feature = "std" ) ]
4244use lightning:: util:: wakers:: Sleeper ;
4345use lightning_rapid_gossip_sync:: RapidGossipSync ;
@@ -132,6 +134,11 @@ const REBROADCAST_TIMER: u64 = 30;
132134#[ cfg( test) ]
133135const REBROADCAST_TIMER : u64 = 1 ;
134136
137+ #[ cfg( not( test) ) ]
138+ const SWEEPER_TIMER : u64 = 30 ;
139+ #[ cfg( test) ]
140+ const SWEEPER_TIMER : u64 = 1 ;
141+
135142#[ cfg( feature = "futures" ) ]
136143/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
137144const fn min_u64 ( a : u64 , b : u64 ) -> u64 {
@@ -308,6 +315,7 @@ macro_rules! define_run_body {
308315 $channel_manager: ident, $process_channel_manager_events: expr,
309316 $onion_messenger: ident, $process_onion_message_handler_events: expr,
310317 $peer_manager: ident, $gossip_sync: ident,
318+ $process_sweeper: expr,
311319 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
312320 $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
313321 ) => { {
@@ -322,6 +330,7 @@ macro_rules! define_run_body {
322330 let mut last_prune_call = $get_timer( FIRST_NETWORK_PRUNE_TIMER ) ;
323331 let mut last_scorer_persist_call = $get_timer( SCORER_PERSIST_TIMER ) ;
324332 let mut last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
333+ let mut last_sweeper_call = $get_timer( SWEEPER_TIMER ) ;
325334 let mut have_pruned = false ;
326335 let mut have_decayed_scorer = false ;
327336
@@ -465,6 +474,12 @@ macro_rules! define_run_body {
465474 $chain_monitor. rebroadcast_pending_claims( ) ;
466475 last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
467476 }
477+
478+ if $timer_elapsed( & mut last_sweeper_call, SWEEPER_TIMER ) {
479+ log_trace!( $logger, "Regenerating sweeper spends if necessary" ) ;
480+ let _ = $process_sweeper;
481+ last_sweeper_call = $get_timer( SWEEPER_TIMER ) ;
482+ }
468483 }
469484
470485 // After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -627,6 +642,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627642/// ```
628643/// # use lightning::io;
629644/// # use lightning::events::ReplayEvent;
645+ /// # use lightning::util::sweep::OutputSweeper;
630646/// # use std::sync::{Arc, RwLock};
631647/// # use std::sync::atomic::{AtomicBool, Ordering};
632648/// # use std::time::SystemTime;
@@ -666,6 +682,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666682/// # F: lightning::chain::Filter + Send + Sync + 'static,
667683/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
668684/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
685+ /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
686+ /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
687+ /// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
669688/// # > {
670689/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
671690/// # event_handler: Arc<EventHandler>,
@@ -677,14 +696,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677696/// # persister: Arc<Store>,
678697/// # logger: Arc<Logger>,
679698/// # scorer: Arc<Scorer>,
699+ /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
680700/// # }
681701/// #
682702/// # async fn setup_background_processing<
683703/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
684704/// # F: lightning::chain::Filter + Send + Sync + 'static,
685705/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
686706/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
687- /// # >(node: Node<B, F, FE, UL>) {
707+ /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
708+ /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
709+ /// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
710+ /// # >(node: Node<B, F, FE, UL, D, K, O>) {
688711/// let background_persister = Arc::clone(&node.persister);
689712/// let background_event_handler = Arc::clone(&node.event_handler);
690713/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -695,7 +718,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
695718/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
696719/// let background_logger = Arc::clone(&node.logger);
697720/// let background_scorer = Arc::clone(&node.scorer);
698- ///
721+ /// let background_sweeper = Arc::clone(&node.sweeper);
722+
699723/// // Setup the sleeper.
700724#[ cfg_attr(
701725 feature = "std" ,
@@ -729,6 +753,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
729753/// background_gossip_sync,
730754/// background_peer_man,
731755/// Some(background_liquidity_manager),
756+ /// Some(background_sweeper),
732757/// background_logger,
733758/// Some(background_scorer),
734759/// sleeper,
@@ -767,6 +792,10 @@ pub async fn process_events_async<
767792 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
768793 PM : ' static + Deref ,
769794 LM : ' static + Deref ,
795+ D : ' static + Deref ,
796+ O : ' static + Deref ,
797+ K : ' static + Deref ,
798+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > ,
770799 S : ' static + Deref < Target = SC > + Send + Sync ,
771800 SC : for < ' b > WriteableScore < ' b > ,
772801 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
@@ -775,12 +804,12 @@ pub async fn process_events_async<
775804> (
776805 persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
777806 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
778- liquidity_manager : Option < LM > , logger : L , scorer : Option < S > , sleeper : Sleeper ,
779- mobile_interruptable_platform : bool , fetch_time : FetchTime ,
807+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
808+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
780809) -> Result < ( ) , lightning:: io:: Error >
781810where
782811 UL :: Target : ' static + UtxoLookup ,
783- CF :: Target : ' static + chain:: Filter ,
812+ CF :: Target : ' static + chain:: Filter + Sync + Send ,
784813 T :: Target : ' static + BroadcasterInterface ,
785814 F :: Target : ' static + FeeEstimator ,
786815 L :: Target : ' static + Logger ,
@@ -790,6 +819,9 @@ where
790819 OM :: Target : AOnionMessenger ,
791820 PM :: Target : APeerManager ,
792821 LM :: Target : ALiquidityManager ,
822+ O :: Target : ' static + OutputSpender ,
823+ D :: Target : ' static + ChangeDestinationSource ,
824+ K :: Target : ' static + KVStore ,
793825{
794826 let mut should_break = false ;
795827 let async_event_handler = |event| {
@@ -833,6 +865,13 @@ where
833865 } ,
834866 peer_manager,
835867 gossip_sync,
868+ {
869+ if let Some ( ref sweeper) = sweeper {
870+ sweeper. regenerate_and_broadcast_spend_if_necessary( )
871+ } else {
872+ Ok ( ( ) )
873+ }
874+ } ,
836875 logger,
837876 scorer,
838877 should_break,
@@ -953,14 +992,18 @@ impl BackgroundProcessor {
953992 LM : ' static + Deref + Send ,
954993 S : ' static + Deref < Target = SC > + Send + Sync ,
955994 SC : for < ' b > WriteableScore < ' b > ,
995+ D : ' static + Deref ,
996+ O : ' static + Deref ,
997+ K : ' static + Deref ,
998+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > + Send + Sync ,
956999 > (
9571000 persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
9581001 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
959- liquidity_manager : Option < LM > , logger : L , scorer : Option < S > ,
1002+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
9601003 ) -> Self
9611004 where
9621005 UL :: Target : ' static + UtxoLookup ,
963- CF :: Target : ' static + chain:: Filter ,
1006+ CF :: Target : ' static + chain:: Filter + Sync + Send ,
9641007 T :: Target : ' static + BroadcasterInterface ,
9651008 F :: Target : ' static + FeeEstimator ,
9661009 L :: Target : ' static + Logger ,
@@ -970,6 +1013,9 @@ impl BackgroundProcessor {
9701013 OM :: Target : AOnionMessenger ,
9711014 PM :: Target : APeerManager ,
9721015 LM :: Target : ALiquidityManager ,
1016+ O :: Target : ' static + OutputSpender ,
1017+ D :: Target : ' static + ChangeDestinationSource ,
1018+ K :: Target : ' static + KVStore ,
9731019 {
9741020 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
9751021 let stop_thread_clone = stop_thread. clone ( ) ;
@@ -1005,6 +1051,13 @@ impl BackgroundProcessor {
10051051 } ,
10061052 peer_manager,
10071053 gossip_sync,
1054+ {
1055+ if let Some ( ref sweeper) = sweeper {
1056+ sweeper. regenerate_and_broadcast_spend_if_necessary( )
1057+ } else {
1058+ Ok ( ( ) )
1059+ }
1060+ } ,
10081061 logger,
10091062 scorer,
10101063 stop_thread. load( Ordering :: Acquire ) ,
@@ -1269,7 +1322,7 @@ mod tests {
12691322 Arc < test_utils:: TestBroadcaster > ,
12701323 Arc < TestWallet > ,
12711324 Arc < test_utils:: TestFeeEstimator > ,
1272- Arc < dyn Filter + Sync + Send > ,
1325+ Arc < test_utils :: TestChainSource > ,
12731326 Arc < FilesystemStore > ,
12741327 Arc < test_utils:: TestLogger > ,
12751328 Arc < KeysManager > ,
@@ -1648,7 +1701,7 @@ mod tests {
16481701 best_block,
16491702 Arc :: clone ( & tx_broadcaster) ,
16501703 Arc :: clone ( & fee_estimator) ,
1651- None :: < Arc < dyn Filter + Sync + Send > > ,
1704+ None :: < Arc < test_utils :: TestChainSource > > ,
16521705 Arc :: clone ( & keys_manager) ,
16531706 wallet,
16541707 Arc :: clone ( & kv_store) ,
@@ -1888,6 +1941,7 @@ mod tests {
18881941 nodes[ 0 ] . p2p_gossip_sync ( ) ,
18891942 nodes[ 0 ] . peer_manager . clone ( ) ,
18901943 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
1944+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
18911945 nodes[ 0 ] . logger . clone ( ) ,
18921946 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
18931947 ) ;
@@ -1982,6 +2036,7 @@ mod tests {
19822036 nodes[ 0 ] . no_gossip_sync ( ) ,
19832037 nodes[ 0 ] . peer_manager . clone ( ) ,
19842038 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2039+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
19852040 nodes[ 0 ] . logger . clone ( ) ,
19862041 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
19872042 ) ;
@@ -2025,6 +2080,7 @@ mod tests {
20252080 nodes[ 0 ] . no_gossip_sync ( ) ,
20262081 nodes[ 0 ] . peer_manager . clone ( ) ,
20272082 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2083+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20282084 nodes[ 0 ] . logger . clone ( ) ,
20292085 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20302086 ) ;
@@ -2058,6 +2114,7 @@ mod tests {
20582114 nodes[ 0 ] . rapid_gossip_sync ( ) ,
20592115 nodes[ 0 ] . peer_manager . clone ( ) ,
20602116 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2117+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20612118 nodes[ 0 ] . logger . clone ( ) ,
20622119 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20632120 move |dur : Duration | {
@@ -2095,6 +2152,7 @@ mod tests {
20952152 nodes[ 0 ] . p2p_gossip_sync ( ) ,
20962153 nodes[ 0 ] . peer_manager . clone ( ) ,
20972154 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2155+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20982156 nodes[ 0 ] . logger . clone ( ) ,
20992157 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21002158 ) ;
@@ -2125,6 +2183,7 @@ mod tests {
21252183 nodes[ 0 ] . no_gossip_sync ( ) ,
21262184 nodes[ 0 ] . peer_manager . clone ( ) ,
21272185 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2186+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
21282187 nodes[ 0 ] . logger . clone ( ) ,
21292188 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21302189 ) ;
@@ -2172,6 +2231,7 @@ mod tests {
21722231 nodes[ 0 ] . no_gossip_sync ( ) ,
21732232 nodes[ 0 ] . peer_manager . clone ( ) ,
21742233 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2234+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
21752235 nodes[ 0 ] . logger . clone ( ) ,
21762236 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21772237 ) ;
@@ -2235,6 +2295,7 @@ mod tests {
22352295 nodes[ 0 ] . no_gossip_sync ( ) ,
22362296 nodes[ 0 ] . peer_manager . clone ( ) ,
22372297 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2298+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
22382299 nodes[ 0 ] . logger . clone ( ) ,
22392300 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
22402301 ) ;
@@ -2280,10 +2341,22 @@ mod tests {
22802341
22812342 advance_chain ( & mut nodes[ 0 ] , 3 ) ;
22822343
2344+ let tx_broadcaster = nodes[ 0 ] . tx_broadcaster . clone ( ) ;
2345+ let wait_for_sweep_tx = || -> Transaction {
2346+ loop {
2347+ let sweep_tx = tx_broadcaster. txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) ;
2348+ if let Some ( sweep_tx) = sweep_tx {
2349+ return sweep_tx;
2350+ }
2351+
2352+ std:: thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
2353+ }
2354+ } ;
2355+
22832356 // Check we generate an initial sweeping tx.
22842357 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2358+ let sweep_tx_0 = wait_for_sweep_tx ( ) ;
22852359 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2286- let sweep_tx_0 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
22872360 match tracked_output. status {
22882361 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
22892362 assert_eq ! ( sweep_tx_0. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2294,8 +2367,8 @@ mod tests {
22942367 // Check we regenerate and rebroadcast the sweeping tx each block.
22952368 advance_chain ( & mut nodes[ 0 ] , 1 ) ;
22962369 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2370+ let sweep_tx_1 = wait_for_sweep_tx ( ) ;
22972371 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2298- let sweep_tx_1 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
22992372 match tracked_output. status {
23002373 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
23012374 assert_eq ! ( sweep_tx_1. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2306,8 +2379,8 @@ mod tests {
23062379
23072380 advance_chain ( & mut nodes[ 0 ] , 1 ) ;
23082381 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2382+ let sweep_tx_2 = wait_for_sweep_tx ( ) ;
23092383 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2310- let sweep_tx_2 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
23112384 match tracked_output. status {
23122385 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
23132386 assert_eq ! ( sweep_tx_2. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2387,6 +2460,7 @@ mod tests {
23872460 nodes[ 0 ] . no_gossip_sync ( ) ,
23882461 nodes[ 0 ] . peer_manager . clone ( ) ,
23892462 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2463+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
23902464 nodes[ 0 ] . logger . clone ( ) ,
23912465 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
23922466 ) ;
@@ -2417,6 +2491,7 @@ mod tests {
24172491 nodes[ 0 ] . no_gossip_sync ( ) ,
24182492 nodes[ 0 ] . peer_manager . clone ( ) ,
24192493 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2494+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
24202495 nodes[ 0 ] . logger . clone ( ) ,
24212496 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24222497 ) ;
@@ -2513,6 +2588,7 @@ mod tests {
25132588 nodes[ 0 ] . rapid_gossip_sync ( ) ,
25142589 nodes[ 0 ] . peer_manager . clone ( ) ,
25152590 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2591+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
25162592 nodes[ 0 ] . logger . clone ( ) ,
25172593 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
25182594 ) ;
@@ -2546,6 +2622,7 @@ mod tests {
25462622 nodes[ 0 ] . rapid_gossip_sync ( ) ,
25472623 nodes[ 0 ] . peer_manager . clone ( ) ,
25482624 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2625+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
25492626 nodes[ 0 ] . logger . clone ( ) ,
25502627 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
25512628 move |dur : Duration | {
@@ -2709,6 +2786,7 @@ mod tests {
27092786 nodes[ 0 ] . no_gossip_sync ( ) ,
27102787 nodes[ 0 ] . peer_manager . clone ( ) ,
27112788 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2789+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
27122790 nodes[ 0 ] . logger . clone ( ) ,
27132791 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
27142792 ) ;
@@ -2760,6 +2838,7 @@ mod tests {
27602838 nodes[ 0 ] . no_gossip_sync ( ) ,
27612839 nodes[ 0 ] . peer_manager . clone ( ) ,
27622840 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2841+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
27632842 nodes[ 0 ] . logger . clone ( ) ,
27642843 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
27652844 move |dur : Duration | {
0 commit comments