@@ -33,10 +33,15 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333use bitcoin:: secp256k1:: { PublicKey , Scalar , Secp256k1 , SecretKey , Signing } ;
3434use bitcoin:: { ScriptBuf , Transaction , TxOut , Txid } ;
3535
36- use std:: ops:: Deref ;
37- use std:: sync:: { Arc , Condvar , Mutex , RwLock } ;
36+ use std:: ops:: { Deref , DerefMut } ;
37+ use std:: sync:: { Arc , Mutex , RwLock } ;
3838use std:: time:: Duration ;
3939
40+ enum WalletSyncStatus {
41+ Completed ,
42+ InProgress { subscribers : tokio:: sync:: broadcast:: Sender < Result < ( ) , Error > > } ,
43+ }
44+
4045pub struct Wallet < D , B : Deref , E : Deref , L : Deref >
4146where
4247 D : BatchDatabase ,
5156 // A cache storing the most recently retrieved fee rate estimations.
5257 broadcaster : B ,
5358 fee_estimator : E ,
54- sync_lock : ( Mutex < ( ) > , Condvar ) ,
59+ // A Mutex holding the current sync status.
60+ sync_status : Mutex < WalletSyncStatus > ,
5561 // TODO: Drop this workaround after BDK 1.0 upgrade.
5662 balance_cache : RwLock < Balance > ,
5763 logger : L ,
@@ -76,69 +82,66 @@ where
7682 } ) ;
7783
7884 let inner = Mutex :: new ( wallet) ;
79- let sync_lock = ( Mutex :: new ( ( ) ) , Condvar :: new ( ) ) ;
85+ let sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
8086 let balance_cache = RwLock :: new ( start_balance) ;
81- Self { blockchain, inner, broadcaster, fee_estimator, sync_lock , balance_cache, logger }
87+ Self { blockchain, inner, broadcaster, fee_estimator, sync_status , balance_cache, logger }
8288 }
8389
8490 pub ( crate ) async fn sync ( & self ) -> Result < ( ) , Error > {
85- let ( lock, cvar) = & self . sync_lock ;
86-
87- let guard = match lock. try_lock ( ) {
88- Ok ( guard) => guard,
89- Err ( _) => {
90- log_info ! ( self . logger, "Sync in progress, skipping." ) ;
91- let guard = cvar. wait ( lock. lock ( ) . unwrap ( ) ) ;
92- drop ( guard) ;
93- cvar. notify_all ( ) ;
94- return Ok ( ( ) ) ;
95- } ,
96- } ;
91+ if let Some ( mut sync_receiver) = self . register_or_subscribe_pending_sync ( ) {
92+ log_info ! ( self . logger, "Sync in progress, skipping." ) ;
93+ return sync_receiver. recv ( ) . await . map_err ( |e| {
94+ debug_assert ! ( false , "Failed to receive wallet sync result: {:?}" , e) ;
95+ log_error ! ( self . logger, "Failed to receive wallet sync result: {:?}" , e) ;
96+ Error :: WalletOperationFailed
97+ } ) ?;
98+ }
9799
98- let sync_options = SyncOptions { progress : None } ;
99- let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
100- let res = match wallet_lock. sync ( & self . blockchain , sync_options) . await {
101- Ok ( ( ) ) => {
102- // TODO: Drop this workaround after BDK 1.0 upgrade.
103- // Update balance cache after syncing.
104- if let Ok ( balance) = wallet_lock. get_balance ( ) {
105- * self . balance_cache . write ( ) . unwrap ( ) = balance;
106- }
107- Ok ( ( ) )
108- } ,
109- Err ( e) => match e {
110- bdk:: Error :: Esplora ( ref be) => match * * be {
111- bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
112- // Drop lock, sleep for a second, retry.
113- drop ( wallet_lock) ;
114- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
115- log_error ! (
116- self . logger,
117- "Sync failed due to HTTP connection error, retrying: {}" ,
118- e
119- ) ;
120- let sync_options = SyncOptions { progress : None } ;
121- self . inner
122- . lock ( )
123- . unwrap ( )
124- . sync ( & self . blockchain , sync_options)
125- . await
126- . map_err ( From :: from)
100+ let res = {
101+ let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
102+ match wallet_lock. sync ( & self . blockchain , SyncOptions { progress : None } ) . await {
103+ Ok ( ( ) ) => {
104+ // TODO: Drop this workaround after BDK 1.0 upgrade.
105+ // Update balance cache after syncing.
106+ if let Ok ( balance) = wallet_lock. get_balance ( ) {
107+ * self . balance_cache . write ( ) . unwrap ( ) = balance;
108+ }
109+ Ok ( ( ) )
110+ } ,
111+ Err ( e) => match e {
112+ bdk:: Error :: Esplora ( ref be) => match * * be {
113+ bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
114+ // Drop lock, sleep for a second, retry.
115+ drop ( wallet_lock) ;
116+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
117+ log_error ! (
118+ self . logger,
119+ "Sync failed due to HTTP connection error, retrying: {}" ,
120+ e
121+ ) ;
122+ let sync_options = SyncOptions { progress : None } ;
123+ self . inner
124+ . lock ( )
125+ . unwrap ( )
126+ . sync ( & self . blockchain , sync_options)
127+ . await
128+ . map_err ( From :: from)
129+ } ,
130+ _ => {
131+ log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
132+ Err ( From :: from ( e) )
133+ } ,
127134 } ,
128135 _ => {
129- log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
136+ log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
130137 Err ( From :: from ( e) )
131138 } ,
132139 } ,
133- _ => {
134- log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
135- Err ( From :: from ( e) )
136- } ,
137- } ,
140+ }
138141 } ;
139142
140- drop ( guard ) ;
141- cvar . notify_all ( ) ;
143+ self . propagate_result_to_subscribers ( res ) ;
144+
142145 res
143146 }
144147
@@ -303,6 +306,59 @@ where
303306
304307 Ok ( txid)
305308 }
309+
310+ fn register_or_subscribe_pending_sync (
311+ & self ,
312+ ) -> Option < tokio:: sync:: broadcast:: Receiver < Result < ( ) , Error > > > {
313+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
314+ match sync_status_lock. deref_mut ( ) {
315+ WalletSyncStatus :: Completed => {
316+ // We're first to register for a sync.
317+ let ( tx, _) = tokio:: sync:: broadcast:: channel ( 1 ) ;
318+ * sync_status_lock = WalletSyncStatus :: InProgress { subscribers : tx } ;
319+ None
320+ } ,
321+ WalletSyncStatus :: InProgress { subscribers } => {
322+ // A sync is in-progress, we subscribe.
323+ let rx = subscribers. subscribe ( ) ;
324+ Some ( rx)
325+ } ,
326+ }
327+ }
328+
329+ fn propagate_result_to_subscribers ( & self , res : Result < ( ) , Error > ) {
330+ // Send the notification to any other tasks that might be waiting on it by now.
331+ {
332+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
333+ match sync_status_lock. deref_mut ( ) {
334+ WalletSyncStatus :: Completed => {
335+ // No sync in-progress, do nothing.
336+ return ;
337+ } ,
338+ WalletSyncStatus :: InProgress { subscribers } => {
339+ // A sync is in-progress, we notify subscribers.
340+ if subscribers. receiver_count ( ) > 0 {
341+ match subscribers. send ( res) {
342+ Ok ( _) => ( ) ,
343+ Err ( e) => {
344+ debug_assert ! (
345+ false ,
346+ "Failed to send wallet sync result to subscribers: {:?}" ,
347+ e
348+ ) ;
349+ log_error ! (
350+ self . logger,
351+ "Failed to send wallet sync result to subscribers: {:?}" ,
352+ e
353+ ) ;
354+ } ,
355+ }
356+ }
357+ * sync_status_lock = WalletSyncStatus :: Completed ;
358+ } ,
359+ }
360+ }
361+ }
306362}
307363
308364impl < D , B : Deref , E : Deref , L : Deref > WalletSource for Wallet < D , B , E , L >
0 commit comments