@@ -36,8 +36,10 @@ use lightning::onion_message::messenger::AOnionMessenger;
3636use lightning:: routing:: gossip:: { NetworkGraph , P2PGossipSync } ;
3737use lightning:: routing:: scoring:: { ScoreUpdate , WriteableScore } ;
3838use lightning:: routing:: utxo:: UtxoLookup ;
39+ use lightning:: sign:: { ChangeDestinationSource , OutputSpender } ;
3940use lightning:: util:: logger:: Logger ;
40- use lightning:: util:: persist:: Persister ;
41+ use lightning:: util:: persist:: { KVStore , Persister } ;
42+ use lightning:: util:: sweep:: OutputSweeper ;
4143#[ cfg( feature = "std" ) ]
4244use lightning:: util:: wakers:: Sleeper ;
4345use lightning_rapid_gossip_sync:: RapidGossipSync ;
@@ -132,6 +134,11 @@ const REBROADCAST_TIMER: u64 = 30;
132134#[ cfg( test) ]
133135const REBROADCAST_TIMER : u64 = 1 ;
134136
137+ #[ cfg( not( test) ) ]
138+ const SWEEPER_TIMER : u64 = 30 ;
139+ #[ cfg( test) ]
140+ const SWEEPER_TIMER : u64 = 1 ;
141+
135142#[ cfg( feature = "futures" ) ]
136143/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
137144const fn min_u64 ( a : u64 , b : u64 ) -> u64 {
@@ -308,6 +315,7 @@ macro_rules! define_run_body {
308315 $channel_manager: ident, $process_channel_manager_events: expr,
309316 $onion_messenger: ident, $process_onion_message_handler_events: expr,
310317 $peer_manager: ident, $gossip_sync: ident,
318+ $process_sweeper: expr,
311319 $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
312320 $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
313321 ) => { {
@@ -322,6 +330,7 @@ macro_rules! define_run_body {
322330 let mut last_prune_call = $get_timer( FIRST_NETWORK_PRUNE_TIMER ) ;
323331 let mut last_scorer_persist_call = $get_timer( SCORER_PERSIST_TIMER ) ;
324332 let mut last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
333+ let mut last_sweeper_call = $get_timer( SWEEPER_TIMER ) ;
325334 let mut have_pruned = false ;
326335 let mut have_decayed_scorer = false ;
327336
@@ -465,6 +474,12 @@ macro_rules! define_run_body {
465474 $chain_monitor. rebroadcast_pending_claims( ) ;
466475 last_rebroadcast_call = $get_timer( REBROADCAST_TIMER ) ;
467476 }
477+
478+ if $timer_elapsed( & mut last_sweeper_call, SWEEPER_TIMER ) {
479+ log_trace!( $logger, "Regenerating sweeper spends if necessary" ) ;
480+ let _ = $process_sweeper;
481+ last_sweeper_call = $get_timer( SWEEPER_TIMER ) ;
482+ }
468483 }
469484
470485 // After we exit, ensure we persist the ChannelManager one final time - this avoids
@@ -627,6 +642,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
627642/// ```
628643/// # use lightning::io;
629644/// # use lightning::events::ReplayEvent;
645+ /// # use lightning::util::sweep::OutputSweeper;
630646/// # use std::sync::{Arc, RwLock};
631647/// # use std::sync::atomic::{AtomicBool, Ordering};
632648/// # use std::time::SystemTime;
@@ -666,6 +682,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
666682/// # F: lightning::chain::Filter + Send + Sync + 'static,
667683/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
668684/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
685+ /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
686+ /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
687+ /// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
669688/// # > {
670689/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
671690/// # event_handler: Arc<EventHandler>,
@@ -677,14 +696,18 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
677696/// # persister: Arc<Store>,
678697/// # logger: Arc<Logger>,
679698/// # scorer: Arc<Scorer>,
699+ /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
680700/// # }
681701/// #
682702/// # async fn setup_background_processing<
683703/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
684704/// # F: lightning::chain::Filter + Send + Sync + 'static,
685705/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
686706/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
687- /// # >(node: Node<B, F, FE, UL>) {
707+ /// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
708+ /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
709+ /// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
710+ /// # >(node: Node<B, F, FE, UL, D, K, O>) {
688711/// let background_persister = Arc::clone(&node.persister);
689712/// let background_event_handler = Arc::clone(&node.event_handler);
690713/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -695,7 +718,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
695718/// let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
696719/// let background_logger = Arc::clone(&node.logger);
697720/// let background_scorer = Arc::clone(&node.scorer);
698- ///
721+ /// let background_sweeper = Arc::clone(&node.sweeper);
699722/// // Setup the sleeper.
700723#[ cfg_attr(
701724 feature = "std" ,
@@ -729,6 +752,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
729752/// background_gossip_sync,
730753/// background_peer_man,
731754/// Some(background_liquidity_manager),
755+ /// Some(background_sweeper),
732756/// background_logger,
733757/// Some(background_scorer),
734758/// sleeper,
@@ -767,6 +791,10 @@ pub async fn process_events_async<
767791 RGS : ' static + Deref < Target = RapidGossipSync < G , L > > ,
768792 PM : ' static + Deref ,
769793 LM : ' static + Deref ,
794+ D : ' static + Deref ,
795+ O : ' static + Deref ,
796+ K : ' static + Deref ,
797+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > ,
770798 S : ' static + Deref < Target = SC > + Send + Sync ,
771799 SC : for < ' b > WriteableScore < ' b > ,
772800 SleepFuture : core:: future:: Future < Output = bool > + core:: marker:: Unpin ,
@@ -775,12 +803,12 @@ pub async fn process_events_async<
775803> (
776804 persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
777805 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
778- liquidity_manager : Option < LM > , logger : L , scorer : Option < S > , sleeper : Sleeper ,
779- mobile_interruptable_platform : bool , fetch_time : FetchTime ,
806+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
807+ sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
780808) -> Result < ( ) , lightning:: io:: Error >
781809where
782810 UL :: Target : ' static + UtxoLookup ,
783- CF :: Target : ' static + chain:: Filter ,
811+ CF :: Target : ' static + chain:: Filter + Sync + Send ,
784812 T :: Target : ' static + BroadcasterInterface ,
785813 F :: Target : ' static + FeeEstimator ,
786814 L :: Target : ' static + Logger ,
@@ -790,6 +818,9 @@ where
790818 OM :: Target : AOnionMessenger ,
791819 PM :: Target : APeerManager ,
792820 LM :: Target : ALiquidityManager ,
821+ O :: Target : ' static + OutputSpender ,
822+ D :: Target : ' static + ChangeDestinationSource ,
823+ K :: Target : ' static + KVStore ,
793824{
794825 let mut should_break = false ;
795826 let async_event_handler = |event| {
@@ -833,6 +864,13 @@ where
833864 } ,
834865 peer_manager,
835866 gossip_sync,
867+ {
868+ if let Some ( ref sweeper) = sweeper {
869+ sweeper. regenerate_and_broadcast_spend_if_necessary( )
870+ } else {
871+ Ok ( ( ) )
872+ }
873+ } ,
836874 logger,
837875 scorer,
838876 should_break,
@@ -953,14 +991,18 @@ impl BackgroundProcessor {
953991 LM : ' static + Deref + Send ,
954992 S : ' static + Deref < Target = SC > + Send + Sync ,
955993 SC : for < ' b > WriteableScore < ' b > ,
994+ D : ' static + Deref ,
995+ O : ' static + Deref ,
996+ K : ' static + Deref ,
997+ OS : ' static + Deref < Target = OutputSweeper < T , D , F , CF , K , L , O > > + Send + Sync ,
956998 > (
957999 persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
9581000 onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
959- liquidity_manager : Option < LM > , logger : L , scorer : Option < S > ,
1001+ liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
9601002 ) -> Self
9611003 where
9621004 UL :: Target : ' static + UtxoLookup ,
963- CF :: Target : ' static + chain:: Filter ,
1005+ CF :: Target : ' static + chain:: Filter + Sync + Send ,
9641006 T :: Target : ' static + BroadcasterInterface ,
9651007 F :: Target : ' static + FeeEstimator ,
9661008 L :: Target : ' static + Logger ,
@@ -970,6 +1012,9 @@ impl BackgroundProcessor {
9701012 OM :: Target : AOnionMessenger ,
9711013 PM :: Target : APeerManager ,
9721014 LM :: Target : ALiquidityManager ,
1015+ O :: Target : ' static + OutputSpender ,
1016+ D :: Target : ' static + ChangeDestinationSource ,
1017+ K :: Target : ' static + KVStore ,
9731018 {
9741019 let stop_thread = Arc :: new ( AtomicBool :: new ( false ) ) ;
9751020 let stop_thread_clone = stop_thread. clone ( ) ;
@@ -1005,6 +1050,13 @@ impl BackgroundProcessor {
10051050 } ,
10061051 peer_manager,
10071052 gossip_sync,
1053+ {
1054+ if let Some ( ref sweeper) = sweeper {
1055+ sweeper. regenerate_and_broadcast_spend_if_necessary( )
1056+ } else {
1057+ Ok ( ( ) )
1058+ }
1059+ } ,
10081060 logger,
10091061 scorer,
10101062 stop_thread. load( Ordering :: Acquire ) ,
@@ -1269,7 +1321,7 @@ mod tests {
12691321 Arc < test_utils:: TestBroadcaster > ,
12701322 Arc < TestWallet > ,
12711323 Arc < test_utils:: TestFeeEstimator > ,
1272- Arc < dyn Filter + Sync + Send > ,
1324+ Arc < test_utils :: TestChainSource > ,
12731325 Arc < FilesystemStore > ,
12741326 Arc < test_utils:: TestLogger > ,
12751327 Arc < KeysManager > ,
@@ -1648,7 +1700,7 @@ mod tests {
16481700 best_block,
16491701 Arc :: clone ( & tx_broadcaster) ,
16501702 Arc :: clone ( & fee_estimator) ,
1651- None :: < Arc < dyn Filter + Sync + Send > > ,
1703+ None :: < Arc < test_utils :: TestChainSource > > ,
16521704 Arc :: clone ( & keys_manager) ,
16531705 wallet,
16541706 Arc :: clone ( & kv_store) ,
@@ -1888,6 +1940,7 @@ mod tests {
18881940 nodes[ 0 ] . p2p_gossip_sync ( ) ,
18891941 nodes[ 0 ] . peer_manager . clone ( ) ,
18901942 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
1943+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
18911944 nodes[ 0 ] . logger . clone ( ) ,
18921945 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
18931946 ) ;
@@ -1982,6 +2035,7 @@ mod tests {
19822035 nodes[ 0 ] . no_gossip_sync ( ) ,
19832036 nodes[ 0 ] . peer_manager . clone ( ) ,
19842037 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2038+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
19852039 nodes[ 0 ] . logger . clone ( ) ,
19862040 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
19872041 ) ;
@@ -2025,6 +2079,7 @@ mod tests {
20252079 nodes[ 0 ] . no_gossip_sync ( ) ,
20262080 nodes[ 0 ] . peer_manager . clone ( ) ,
20272081 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2082+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20282083 nodes[ 0 ] . logger . clone ( ) ,
20292084 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20302085 ) ;
@@ -2058,6 +2113,7 @@ mod tests {
20582113 nodes[ 0 ] . rapid_gossip_sync ( ) ,
20592114 nodes[ 0 ] . peer_manager . clone ( ) ,
20602115 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2116+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20612117 nodes[ 0 ] . logger . clone ( ) ,
20622118 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
20632119 move |dur : Duration | {
@@ -2095,6 +2151,7 @@ mod tests {
20952151 nodes[ 0 ] . p2p_gossip_sync ( ) ,
20962152 nodes[ 0 ] . peer_manager . clone ( ) ,
20972153 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2154+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
20982155 nodes[ 0 ] . logger . clone ( ) ,
20992156 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21002157 ) ;
@@ -2125,6 +2182,7 @@ mod tests {
21252182 nodes[ 0 ] . no_gossip_sync ( ) ,
21262183 nodes[ 0 ] . peer_manager . clone ( ) ,
21272184 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2185+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
21282186 nodes[ 0 ] . logger . clone ( ) ,
21292187 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21302188 ) ;
@@ -2172,6 +2230,7 @@ mod tests {
21722230 nodes[ 0 ] . no_gossip_sync ( ) ,
21732231 nodes[ 0 ] . peer_manager . clone ( ) ,
21742232 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2233+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
21752234 nodes[ 0 ] . logger . clone ( ) ,
21762235 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
21772236 ) ;
@@ -2235,6 +2294,7 @@ mod tests {
22352294 nodes[ 0 ] . no_gossip_sync ( ) ,
22362295 nodes[ 0 ] . peer_manager . clone ( ) ,
22372296 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2297+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
22382298 nodes[ 0 ] . logger . clone ( ) ,
22392299 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
22402300 ) ;
@@ -2280,10 +2340,22 @@ mod tests {
22802340
22812341 advance_chain ( & mut nodes[ 0 ] , 3 ) ;
22822342
2343+ let tx_broadcaster = nodes[ 0 ] . tx_broadcaster . clone ( ) ;
2344+ let wait_for_sweep_tx = || -> Transaction {
2345+ loop {
2346+ let sweep_tx = tx_broadcaster. txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) ;
2347+ if let Some ( sweep_tx) = sweep_tx {
2348+ return sweep_tx;
2349+ }
2350+
2351+ std:: thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
2352+ }
2353+ } ;
2354+
22832355 // Check we generate an initial sweeping tx.
22842356 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2357+ let sweep_tx_0 = wait_for_sweep_tx ( ) ;
22852358 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2286- let sweep_tx_0 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
22872359 match tracked_output. status {
22882360 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
22892361 assert_eq ! ( sweep_tx_0. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2294,8 +2366,8 @@ mod tests {
22942366 // Check we regenerate and rebroadcast the sweeping tx each block.
22952367 advance_chain ( & mut nodes[ 0 ] , 1 ) ;
22962368 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2369+ let sweep_tx_1 = wait_for_sweep_tx ( ) ;
22972370 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2298- let sweep_tx_1 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
22992371 match tracked_output. status {
23002372 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
23012373 assert_eq ! ( sweep_tx_1. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2306,8 +2378,8 @@ mod tests {
23062378
23072379 advance_chain ( & mut nodes[ 0 ] , 1 ) ;
23082380 assert_eq ! ( nodes[ 0 ] . sweeper. tracked_spendable_outputs( ) . len( ) , 1 ) ;
2381+ let sweep_tx_2 = wait_for_sweep_tx ( ) ;
23092382 let tracked_output = nodes[ 0 ] . sweeper . tracked_spendable_outputs ( ) . first ( ) . unwrap ( ) . clone ( ) ;
2310- let sweep_tx_2 = nodes[ 0 ] . tx_broadcaster . txn_broadcasted . lock ( ) . unwrap ( ) . pop ( ) . unwrap ( ) ;
23112383 match tracked_output. status {
23122384 OutputSpendStatus :: PendingFirstConfirmation { latest_spending_tx, .. } => {
23132385 assert_eq ! ( sweep_tx_2. compute_txid( ) , latest_spending_tx. compute_txid( ) ) ;
@@ -2387,6 +2459,7 @@ mod tests {
23872459 nodes[ 0 ] . no_gossip_sync ( ) ,
23882460 nodes[ 0 ] . peer_manager . clone ( ) ,
23892461 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2462+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
23902463 nodes[ 0 ] . logger . clone ( ) ,
23912464 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
23922465 ) ;
@@ -2417,6 +2490,7 @@ mod tests {
24172490 nodes[ 0 ] . no_gossip_sync ( ) ,
24182491 nodes[ 0 ] . peer_manager . clone ( ) ,
24192492 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2493+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
24202494 nodes[ 0 ] . logger . clone ( ) ,
24212495 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
24222496 ) ;
@@ -2513,6 +2587,7 @@ mod tests {
25132587 nodes[ 0 ] . rapid_gossip_sync ( ) ,
25142588 nodes[ 0 ] . peer_manager . clone ( ) ,
25152589 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2590+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
25162591 nodes[ 0 ] . logger . clone ( ) ,
25172592 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
25182593 ) ;
@@ -2546,6 +2621,7 @@ mod tests {
25462621 nodes[ 0 ] . rapid_gossip_sync ( ) ,
25472622 nodes[ 0 ] . peer_manager . clone ( ) ,
25482623 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2624+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
25492625 nodes[ 0 ] . logger . clone ( ) ,
25502626 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
25512627 move |dur : Duration | {
@@ -2709,6 +2785,7 @@ mod tests {
27092785 nodes[ 0 ] . no_gossip_sync ( ) ,
27102786 nodes[ 0 ] . peer_manager . clone ( ) ,
27112787 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2788+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
27122789 nodes[ 0 ] . logger . clone ( ) ,
27132790 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
27142791 ) ;
@@ -2760,6 +2837,7 @@ mod tests {
27602837 nodes[ 0 ] . no_gossip_sync ( ) ,
27612838 nodes[ 0 ] . peer_manager . clone ( ) ,
27622839 Some ( Arc :: clone ( & nodes[ 0 ] . liquidity_manager ) ) ,
2840+ Some ( nodes[ 0 ] . sweeper . clone ( ) ) ,
27632841 nodes[ 0 ] . logger . clone ( ) ,
27642842 Some ( nodes[ 0 ] . scorer . clone ( ) ) ,
27652843 move |dur : Duration | {
0 commit comments