@@ -28,7 +28,7 @@ use crate::{impl_writeable_tlv_based, log_debug, log_error};
2828use bitcoin:: block:: Header ;
2929use bitcoin:: locktime:: absolute:: LockTime ;
3030use bitcoin:: secp256k1:: Secp256k1 ;
31- use bitcoin:: { BlockHash , Transaction , Txid } ;
31+ use bitcoin:: { BlockHash , ScriptBuf , Transaction , Txid } ;
3232
3333use core:: ops:: Deref ;
3434
@@ -372,7 +372,7 @@ where
372372 output_spender : O , change_destination_source : D , kv_store : K , logger : L ,
373373 ) -> Self {
374374 let outputs = Vec :: new ( ) ;
375- let sweeper_state = Mutex :: new ( SweeperState { outputs, best_block } ) ;
375+ let sweeper_state = Mutex :: new ( SweeperState { outputs, best_block, sweep_pending : false } ) ;
376376 Self {
377377 sweeper_state,
378378 broadcaster,
@@ -451,14 +451,71 @@ where
451451
452452 /// Regenerates and broadcasts the spending transaction for any outputs that are pending
453453 pub async fn regenerate_and_broadcast_spend_if_necessary_locked ( & self ) -> Result < ( ) , ( ) > {
454- let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
455- self . regenerate_and_broadcast_spend_if_necessary ( & mut * sweeper_state) . await
454+ // Collect spendable output descriptors.
455+ let respend_descriptors_clones: Vec < SpendableOutputDescriptor > ;
456+ let respend_descriptors: Vec < & SpendableOutputDescriptor > ;
457+ {
458+ let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
459+
460+ // Prevent concurrent sweeping.
461+ if sweeper_state. sweep_pending {
462+ return Ok ( ( ) ) ;
463+ }
464+
465+ let cur_height = sweeper_state. best_block . height ;
466+ let filter_fn = |o : & TrackedSpendableOutput | {
467+ if o. status . is_confirmed ( ) {
468+ // Don't rebroadcast confirmed txs.
469+ return false ;
470+ }
471+
472+ if o. status . is_delayed ( cur_height) {
473+ // Don't generate and broadcast if still delayed
474+ return false ;
475+ }
476+
477+ if o. status . latest_broadcast_height ( ) >= Some ( cur_height) {
478+ // Only broadcast once per block height.
479+ return false ;
480+ }
481+
482+ true
483+ } ;
484+
485+ // Clone first, otherwise we can't take references that outlive the lock.
486+ respend_descriptors_clones =
487+ sweeper_state. outputs . iter ( ) . filter ( |o| filter_fn ( * o) ) . map ( |o| o. descriptor . clone ( ) ) . collect ( ) ;
488+
489+ respend_descriptors = respend_descriptors_clones. iter ( ) . collect ( ) ;
490+
491+ if respend_descriptors. is_empty ( ) {
492+ // Nothing to do.
493+ return Ok ( ( ) ) ;
494+ }
495+
496+ // There is something to sweep. Block concurrent sweeps.
497+ sweeper_state. sweep_pending = true ;
498+ }
499+
500+ // Request a new change address outside of the mutex to avoid the mutex crossing await.
501+ let change_destination_script_result = self . change_destination_source . get_change_destination_script ( ) . await ;
502+
503+ // Sweep the outputs.
504+ {
505+ let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
506+
507+ // Always allow a new sweep after this spend.
508+ sweeper_state. sweep_pending = false ;
509+
510+ let change_destination_script = change_destination_script_result?;
511+ self . regenerate_and_broadcast_spend_if_necessary ( & mut * sweeper_state, respend_descriptors, change_destination_script) . await
512+ }
456513 }
457514
458515 async fn regenerate_and_broadcast_spend_if_necessary (
459- & self , sweeper_state : & mut SweeperState ,
516+ & self , sweeper_state : & mut SweeperState , respend_descriptors : Vec < & SpendableOutputDescriptor > , change_destination_script : ScriptBuf ,
460517 ) -> Result < ( ) , ( ) > {
461- let spending_tx_opt = self . regenerate_spend_if_necessary ( sweeper_state) . await ;
518+ let spending_tx_opt = self . regenerate_spend_if_necessary ( sweeper_state, respend_descriptors , change_destination_script ) . await ;
462519 if let Some ( spending_tx) = spending_tx_opt {
463520 self . persist_state ( & * sweeper_state) . map_err ( |e| {
464521 log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
@@ -471,38 +528,12 @@ where
471528 }
472529
473530 async fn regenerate_spend_if_necessary (
474- & self , sweeper_state : & mut SweeperState ,
531+ & self , sweeper_state : & mut SweeperState , respend_descriptors : Vec < & SpendableOutputDescriptor > , change_destination_script : ScriptBuf ,
475532 ) -> Option < Transaction > {
476533 let cur_height = sweeper_state. best_block . height ;
477534 let cur_hash = sweeper_state. best_block . block_hash ;
478- let filter_fn = |o : & TrackedSpendableOutput | {
479- if o. status . is_confirmed ( ) {
480- // Don't rebroadcast confirmed txs.
481- return false ;
482- }
483535
484- if o. status . is_delayed ( cur_height) {
485- // Don't generate and broadcast if still delayed
486- return false ;
487- }
488-
489- if o. status . latest_broadcast_height ( ) >= Some ( cur_height) {
490- // Only broadcast once per block height.
491- return false ;
492- }
493-
494- true
495- } ;
496-
497- let respend_descriptors: Vec < & SpendableOutputDescriptor > =
498- sweeper_state. outputs . iter ( ) . filter ( |o| filter_fn ( * o) ) . map ( |o| & o. descriptor ) . collect ( ) ;
499-
500- if respend_descriptors. is_empty ( ) {
501- // Nothing to do.
502- return None ;
503- }
504-
505- let spending_tx = match self . spend_outputs ( & * sweeper_state, respend_descriptors) . await {
536+ let spending_tx = match self . spend_outputs ( & * sweeper_state, & respend_descriptors, change_destination_script) {
506537 Ok ( spending_tx) => {
507538 log_debug ! (
508539 self . logger,
@@ -517,10 +548,14 @@ where
517548 } ,
518549 } ;
519550
520- // As we didn't modify the state so far, the same filter_fn yields the same elements as
521- // above.
522- let respend_outputs = sweeper_state. outputs . iter_mut ( ) . filter ( |o| filter_fn ( & * * o) ) ;
523- for output_info in respend_outputs {
551+ // Watch outputs and update status.
552+ for output_info in respend_descriptors {
553+ let output_info = sweeper_state
554+ . outputs
555+ . iter_mut ( )
556+ . find ( |o| o. descriptor == * output_info)
557+ . expect ( "We should have found the output info" ) ;
558+
524559 if let Some ( filter) = self . chain_data_source . as_ref ( ) {
525560 let watched_output = output_info. to_watched_output ( cur_hash) ;
526561 filter. register_output ( watched_output) ;
@@ -573,17 +608,15 @@ where
573608 } )
574609 }
575610
576- async fn spend_outputs (
577- & self , sweeper_state : & SweeperState , descriptors : Vec < & SpendableOutputDescriptor > ,
611+ fn spend_outputs (
612+ & self , sweeper_state : & SweeperState , descriptors : & Vec < & SpendableOutputDescriptor > , change_destination_script : ScriptBuf ,
578613 ) -> Result < Transaction , ( ) > {
579614 let tx_feerate =
580615 self . fee_estimator . get_est_sat_per_1000_weight ( ConfirmationTarget :: OutputSpendingFee ) ;
581- let change_destination_script =
582- self . change_destination_source . get_change_destination_script ( ) . await ?;
583616 let cur_height = sweeper_state. best_block . height ;
584617 let locktime = Some ( LockTime :: from_height ( cur_height) . unwrap_or ( LockTime :: ZERO ) ) ;
585618 self . output_spender . spend_spendable_outputs (
586- & descriptors,
619+ descriptors,
587620 Vec :: new ( ) ,
588621 change_destination_script,
589622 tx_feerate,
@@ -746,11 +779,15 @@ where
746779struct SweeperState {
747780 outputs : Vec < TrackedSpendableOutput > ,
748781 best_block : BestBlock ,
782+ sweep_pending : bool ,
749783}
750784
751785impl_writeable_tlv_based ! ( SweeperState , {
752786 ( 0 , outputs, required_vec) ,
753787 ( 2 , best_block, required) ,
788+
789+ // TODO: Do not persist this field.
790+ ( 4 , sweep_pending, required) ,
754791} ) ;
755792
756793/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
0 commit comments