@@ -382,8 +382,10 @@ where
382382 output_spender : O , change_destination_source : D , kv_store : K , logger : L ,
383383 ) -> Self {
384384 let outputs = Vec :: new ( ) ;
385- let sweeper_state =
386- Mutex :: new ( SweeperState { persistent : PersistentSweeperState { outputs, best_block } } ) ;
385+ let sweeper_state = Mutex :: new ( SweeperState {
386+ persistent : PersistentSweeperState { outputs, best_block } ,
387+ dirty : false ,
388+ } ) ;
387389 Self {
388390 sweeper_state,
389391 pending_sweep : AtomicBool :: new ( false ) ,
@@ -450,7 +452,7 @@ where
450452
451453 state_lock. persistent . outputs . push ( output_info) ;
452454 }
453- self . persist_state ( & state_lock) . map_err ( |e| {
455+ self . persist_state ( & mut state_lock) . map_err ( |e| {
454456 log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
455457 } )
456458 }
@@ -478,7 +480,19 @@ where
478480 return Ok ( ( ) ) ;
479481 }
480482
481- let result = self . regenerate_and_broadcast_spend_if_necessary_internal ( ) . await ;
483+ let result = {
484+ self . regenerate_and_broadcast_spend_if_necessary_internal ( ) . await ?;
485+
486+ // If there is still dirty state, we need to persist it.
487+ let mut sweeper_state = self . sweeper_state . lock ( ) . unwrap ( ) ;
488+ if !sweeper_state. dirty {
489+ return Ok ( ( ) ) ;
490+ }
491+
492+ self . persist_state ( & mut sweeper_state) . map_err ( |e| {
493+ log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
494+ } )
495+ } ;
482496
483497 // Release the pending sweep flag again, regardless of result.
484498 self . pending_sweep . store ( false , Ordering :: Release ) ;
@@ -571,7 +585,7 @@ where
571585 output_info. status . broadcast ( cur_hash, cur_height, spending_tx. clone ( ) ) ;
572586 }
573587
574- self . persist_state ( & sweeper_state) . map_err ( |e| {
588+ self . persist_state ( & mut sweeper_state) . map_err ( |e| {
575589 log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
576590 } ) ?;
577591
@@ -599,9 +613,11 @@ where
599613 }
600614 true
601615 } ) ;
616+
617+ sweeper_state. dirty = true ;
602618 }
603619
604- fn persist_state ( & self , sweeper_state : & SweeperState ) -> Result < ( ) , io:: Error > {
620+ fn persist_state ( & self , sweeper_state : & mut SweeperState ) -> Result < ( ) , io:: Error > {
605621 self . kv_store
606622 . write (
607623 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE ,
@@ -620,6 +636,9 @@ where
620636 ) ;
621637 e
622638 } )
639+ . map ( |_| {
640+ sweeper_state. dirty = false ;
641+ } )
623642 }
624643
625644 fn spend_outputs (
@@ -652,13 +671,17 @@ where
652671 }
653672 }
654673 }
674+
675+ sweeper_state. dirty = true ;
655676 }
656677
657678 fn best_block_updated_internal (
658679 & self , sweeper_state : & mut SweeperState , header : & Header , height : u32 ,
659680 ) {
660681 sweeper_state. persistent . best_block = BestBlock :: new ( header. block_hash ( ) , height) ;
661682 self . prune_confirmed_outputs ( sweeper_state) ;
683+
684+ sweeper_state. dirty = true ;
662685 }
663686}
664687
@@ -682,12 +705,8 @@ where
682705 assert_eq ! ( state_lock. persistent. best_block. height, height - 1 ,
683706 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height" ) ;
684707
685- self . transactions_confirmed_internal ( & mut * state_lock, header, txdata, height) ;
686- self . best_block_updated_internal ( & mut * state_lock, header, height) ;
687-
688- let _ = self . persist_state ( & * state_lock) . map_err ( |e| {
689- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
690- } ) ;
708+ self . transactions_confirmed_internal ( & mut state_lock, header, txdata, height) ;
709+ self . best_block_updated_internal ( & mut state_lock, header, height) ;
691710 }
692711
693712 fn block_disconnected ( & self , header : & Header , height : u32 ) {
@@ -709,9 +728,7 @@ where
709728 }
710729 }
711730
712- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
713- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
714- } ) ;
731+ state_lock. dirty = true ;
715732 }
716733}
717734
@@ -731,9 +748,6 @@ where
731748 ) {
732749 let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
733750 self . transactions_confirmed_internal ( & mut * state_lock, header, txdata, height) ;
734- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
735- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
736- } ) ;
737751 }
738752
739753 fn transaction_unconfirmed ( & self , txid : & Txid ) {
@@ -756,24 +770,18 @@ where
756770 . filter ( |o| o. status . confirmation_height ( ) >= Some ( unconf_height) )
757771 . for_each ( |o| o. status . unconfirmed ( ) ) ;
758772
759- self . persist_state ( & * state_lock) . unwrap_or_else ( |e| {
760- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
761- } ) ;
773+ state_lock. dirty = true ;
762774 }
763775 }
764776
765777 fn best_block_updated ( & self , header : & Header , height : u32 ) {
766778 let mut state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
767- self . best_block_updated_internal ( & mut * state_lock, header, height) ;
768- let _ = self . persist_state ( & * state_lock) . map_err ( |e| {
769- log_error ! ( self . logger, "Error persisting OutputSweeper: {:?}" , e) ;
770- } ) ;
779+ self . best_block_updated_internal ( & mut state_lock, header, height) ;
771780 }
772781
773782 fn get_relevant_txids ( & self ) -> Vec < ( Txid , u32 , Option < BlockHash > ) > {
774- let state_lock = self . sweeper_state . lock ( ) . unwrap ( ) ;
783+ let state_lock = & self . sweeper_state . lock ( ) . unwrap ( ) . persistent ;
775784 state_lock
776- . persistent
777785 . outputs
778786 . iter ( )
779787 . filter_map ( |o| match o. status {
@@ -796,6 +804,7 @@ where
796804#[ derive( Debug ) ]
797805struct SweeperState {
798806 persistent : PersistentSweeperState ,
807+ dirty : bool ,
799808}
800809
801810#[ derive( Debug , Clone ) ]
@@ -860,7 +869,7 @@ where
860869 }
861870 }
862871
863- let sweeper_state = Mutex :: new ( SweeperState { persistent : state } ) ;
872+ let sweeper_state = Mutex :: new ( SweeperState { persistent : state, dirty : false } ) ;
864873 Ok ( Self {
865874 sweeper_state,
866875 pending_sweep : AtomicBool :: new ( false ) ,
@@ -909,7 +918,7 @@ where
909918 }
910919 }
911920
912- let sweeper_state = Mutex :: new ( SweeperState { persistent : state } ) ;
921+ let sweeper_state = Mutex :: new ( SweeperState { persistent : state, dirty : false } ) ;
913922 Ok ( (
914923 best_block,
915924 OutputSweeper {
0 commit comments