@@ -33,10 +33,16 @@ use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature};
3333use bitcoin:: secp256k1:: { PublicKey , Scalar , Secp256k1 , SecretKey , Signing } ;
3434use bitcoin:: { ScriptBuf , Transaction , TxOut , Txid } ;
3535
36- use std:: ops:: Deref ;
37- use std:: sync:: { Arc , Condvar , Mutex , RwLock } ;
36+ use std:: mem;
37+ use std:: ops:: { Deref , DerefMut } ;
38+ use std:: sync:: { Arc , Mutex , RwLock } ;
3839use std:: time:: Duration ;
3940
41+ enum WalletSyncStatus {
42+ Completed ,
43+ InProgress { subscribers : Vec < tokio:: sync:: oneshot:: Sender < Result < ( ) , Error > > > } ,
44+ }
45+
4046pub struct Wallet < D , B : Deref , E : Deref , L : Deref >
4147where
4248 D : BatchDatabase ,
5157 // A cache storing the most recently retrieved fee rate estimations.
5258 broadcaster : B ,
5359 fee_estimator : E ,
54- sync_lock : ( Mutex < ( ) > , Condvar ) ,
60+ // A Mutex holding the current sync status.
61+ sync_status : Mutex < WalletSyncStatus > ,
5562 // TODO: Drop this workaround after BDK 1.0 upgrade.
5663 balance_cache : RwLock < Balance > ,
5764 logger : L ,
@@ -76,69 +83,67 @@ where
7683 } ) ;
7784
7885 let inner = Mutex :: new ( wallet) ;
79- let sync_lock = ( Mutex :: new ( ( ) ) , Condvar :: new ( ) ) ;
86+ let sync_status = Mutex :: new ( WalletSyncStatus :: Completed ) ;
8087 let balance_cache = RwLock :: new ( start_balance) ;
81- Self { blockchain, inner, broadcaster, fee_estimator, sync_lock , balance_cache, logger }
88+ Self { blockchain, inner, broadcaster, fee_estimator, sync_status , balance_cache, logger }
8289 }
8390
8491 pub ( crate ) async fn sync ( & self ) -> Result < ( ) , Error > {
85- let ( lock, cvar) = & self . sync_lock ;
86-
87- let guard = match lock. try_lock ( ) {
88- Ok ( guard) => guard,
89- Err ( _) => {
90- log_info ! ( self . logger, "Sync in progress, skipping." ) ;
91- let guard = cvar. wait ( lock. lock ( ) . unwrap ( ) ) ;
92- drop ( guard) ;
93- cvar. notify_all ( ) ;
94- return Ok ( ( ) ) ;
95- } ,
96- } ;
92+ if let Some ( sync_receiver) = self . register_or_subscribe_pending_sync ( ) {
93+ log_info ! ( self . logger, "Sync in progress, skipping." ) ;
94+ return sync_receiver. await . map_err ( |e| {
95+ debug_assert ! ( false , "Failed to receive wallet sync result: {:?}" , e) ;
96+ log_error ! ( self . logger, "Failed to receive wallet sync result: {:?}" , e) ;
97+ Error :: WalletOperationFailed
98+ } ) ?;
99+ }
97100
98- let sync_options = SyncOptions { progress : None } ;
99- let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
100- let res = match wallet_lock. sync ( & self . blockchain , sync_options) . await {
101- Ok ( ( ) ) => {
102- // TODO: Drop this workaround after BDK 1.0 upgrade.
103- // Update balance cache after syncing.
104- if let Ok ( balance) = wallet_lock. get_balance ( ) {
105- * self . balance_cache . write ( ) . unwrap ( ) = balance;
106- }
107- Ok ( ( ) )
108- } ,
109- Err ( e) => match e {
110- bdk:: Error :: Esplora ( ref be) => match * * be {
111- bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
112- // Drop lock, sleep for a second, retry.
113- drop ( wallet_lock) ;
114- tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
115- log_error ! (
116- self . logger,
117- "Sync failed due to HTTP connection error, retrying: {}" ,
118- e
119- ) ;
120- let sync_options = SyncOptions { progress : None } ;
121- self . inner
122- . lock ( )
123- . unwrap ( )
124- . sync ( & self . blockchain , sync_options)
125- . await
126- . map_err ( From :: from)
101+ let res = {
102+ let sync_options = SyncOptions { progress : None } ;
103+ let wallet_lock = self . inner . lock ( ) . unwrap ( ) ;
104+ match wallet_lock. sync ( & self . blockchain , sync_options) . await {
105+ Ok ( ( ) ) => {
106+ // TODO: Drop this workaround after BDK 1.0 upgrade.
107+ // Update balance cache after syncing.
108+ if let Ok ( balance) = wallet_lock. get_balance ( ) {
109+ * self . balance_cache . write ( ) . unwrap ( ) = balance;
110+ }
111+ Ok ( ( ) )
112+ } ,
113+ Err ( e) => match e {
114+ bdk:: Error :: Esplora ( ref be) => match * * be {
115+ bdk:: blockchain:: esplora:: EsploraError :: Reqwest ( _) => {
116+ // Drop lock, sleep for a second, retry.
117+ drop ( wallet_lock) ;
118+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
119+ log_error ! (
120+ self . logger,
121+ "Sync failed due to HTTP connection error, retrying: {}" ,
122+ e
123+ ) ;
124+ let sync_options = SyncOptions { progress : None } ;
125+ self . inner
126+ . lock ( )
127+ . unwrap ( )
128+ . sync ( & self . blockchain , sync_options)
129+ . await
130+ . map_err ( From :: from)
131+ } ,
132+ _ => {
133+ log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
134+ Err ( From :: from ( e) )
135+ } ,
127136 } ,
128137 _ => {
129- log_error ! ( self . logger, "Sync failed due to Esplora error: {}" , e) ;
138+ log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
130139 Err ( From :: from ( e) )
131140 } ,
132141 } ,
133- _ => {
134- log_error ! ( self . logger, "Wallet sync error: {}" , e) ;
135- Err ( From :: from ( e) )
136- } ,
137- } ,
142+ }
138143 } ;
139144
140- drop ( guard ) ;
141- cvar . notify_all ( ) ;
145+ self . propagate_result_to_subscribers ( res ) ;
146+
142147 res
143148 }
144149
@@ -303,6 +308,55 @@ where
303308
304309 Ok ( txid)
305310 }
311+
312+ fn register_or_subscribe_pending_sync (
313+ & self ,
314+ ) -> Option < tokio:: sync:: oneshot:: Receiver < Result < ( ) , Error > > > {
315+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
316+ match sync_status_lock. deref_mut ( ) {
317+ WalletSyncStatus :: Completed => {
318+ // We're first to register for a sync.
319+ * sync_status_lock = WalletSyncStatus :: InProgress { subscribers : Vec :: new ( ) } ;
320+ None
321+ } ,
322+ WalletSyncStatus :: InProgress { subscribers } => {
323+ // A sync is in-progress, we subscribe.
324+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
325+ subscribers. push ( tx) ;
326+ Some ( rx)
327+ } ,
328+ }
329+ }
330+
331+ fn propagate_result_to_subscribers ( & self , res : Result < ( ) , Error > ) {
332+ // Send the notification to any other tasks that might be waiting on it by now.
333+ let mut waiting_subscribers = Vec :: new ( ) ;
334+ {
335+ let mut sync_status_lock = self . sync_status . lock ( ) . unwrap ( ) ;
336+ match sync_status_lock. deref_mut ( ) {
337+ WalletSyncStatus :: Completed => {
338+ // No sync in-progress, do nothing.
339+ return ;
340+ } ,
341+ WalletSyncStatus :: InProgress { subscribers } => {
342+ // A sync is in-progress, we notify subscribers.
343+ mem:: swap ( & mut waiting_subscribers, subscribers) ;
344+ * sync_status_lock = WalletSyncStatus :: Completed ;
345+ } ,
346+ }
347+ }
348+
349+ for sender in waiting_subscribers {
350+ sender. send ( res) . unwrap_or_else ( |e| {
351+ debug_assert ! ( false , "Failed to send wallet sync result to subscribers: {:?}" , e) ;
352+ log_error ! (
353+ self . logger,
354+ "Failed to send wallet sync result to subscribers: {:?}" ,
355+ e
356+ ) ;
357+ } ) ;
358+ }
359+ }
306360}
307361
308362impl < D , B : Deref , E : Deref , L : Deref > WalletSource for Wallet < D , B , E , L >
0 commit comments