Skip to content

Commit 8f85b23

Browse files
committed
Move on-chain syncing to ChainSource
1 parent 833940a commit 8f85b23

File tree

4 files changed

+240
-202
lines changed

4 files changed

+240
-202
lines changed

src/builder.rs

Lines changed: 72 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::chain::{DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL};
8+
use crate::chain::{ChainSource, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL};
99
use crate::config::{default_user_config, Config, WALLET_KEYS_SEED_LEN};
1010

1111
use crate::connection::ConnectionManager;
@@ -562,58 +562,78 @@ fn build_with_store_internal(
562562
})?,
563563
};
564564

565-
let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
566-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
567-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
568-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
569-
let esplora_client = client_builder.build_async().unwrap();
570-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
571-
esplora_client.clone(),
572-
Arc::clone(&logger),
573-
));
574-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
575-
tx_sync.client().clone(),
576-
Arc::clone(&logger),
577-
));
578-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
579-
tx_sync.client().clone(),
580-
Arc::clone(&config),
581-
Arc::clone(&logger),
582-
));
583-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
584-
},
585-
None => {
586-
// Default to Esplora client.
587-
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
588-
let mut client_builder = esplora_client::Builder::new(&server_url);
589-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
590-
let esplora_client = client_builder.build_async().unwrap();
591-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
592-
esplora_client.clone(),
593-
Arc::clone(&logger),
594-
));
595-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
596-
tx_sync.client().clone(),
597-
Arc::clone(&logger),
598-
));
599-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
600-
tx_sync.client().clone(),
601-
Arc::clone(&config),
602-
Arc::clone(&logger),
603-
));
604-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
605-
},
606-
};
565+
let (wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator) =
566+
match chain_data_source_config {
567+
Some(ChainDataSourceConfig::Esplora(server_url)) => {
568+
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
569+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
570+
let esplora_client = client_builder.build_async().unwrap();
571+
let tx_sync =
572+
Arc::new(EsploraSyncClient::from_client(esplora_client, Arc::clone(&logger)));
573+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
574+
tx_sync.client().clone(),
575+
Arc::clone(&logger),
576+
));
577+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
578+
tx_sync.client().clone(),
579+
Arc::clone(&config),
580+
Arc::clone(&logger),
581+
));
582+
583+
let wallet = Arc::new(Wallet::new(
584+
bdk_wallet,
585+
wallet_persister,
586+
Arc::clone(&tx_broadcaster),
587+
Arc::clone(&fee_estimator),
588+
Arc::clone(&logger),
589+
));
590+
591+
let chain_source = Arc::new(ChainSource::new_esplora(
592+
server_url.clone(),
593+
Arc::clone(&wallet),
594+
Arc::clone(&logger),
595+
));
596+
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
597+
},
598+
None => {
599+
// Default to Esplora client.
600+
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
601+
let mut client_builder = esplora_client::Builder::new(&server_url);
602+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
603+
let esplora_client = client_builder.build_async().unwrap();
604+
let tx_sync = Arc::new(EsploraSyncClient::from_client(
605+
esplora_client.clone(),
606+
Arc::clone(&logger),
607+
));
608+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
609+
tx_sync.client().clone(),
610+
Arc::clone(&logger),
611+
));
612+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
613+
tx_sync.client().clone(),
614+
Arc::clone(&config),
615+
Arc::clone(&logger),
616+
));
617+
618+
let wallet = Arc::new(Wallet::new(
619+
bdk_wallet,
620+
wallet_persister,
621+
Arc::clone(&tx_broadcaster),
622+
Arc::clone(&fee_estimator),
623+
Arc::clone(&logger),
624+
));
625+
626+
let chain_source = Arc::new(ChainSource::new_esplora(
627+
server_url.clone(),
628+
Arc::clone(&wallet),
629+
Arc::clone(&logger),
630+
));
631+
632+
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
633+
},
634+
};
607635

608636
let runtime = Arc::new(RwLock::new(None));
609-
let wallet = Arc::new(Wallet::new(
610-
bdk_wallet,
611-
wallet_persister,
612-
esplora_client,
613-
Arc::clone(&tx_broadcaster),
614-
Arc::clone(&fee_estimator),
615-
Arc::clone(&logger),
616-
));
617637

618638
// Initialize the ChainMonitor
619639
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
@@ -989,6 +1009,7 @@ fn build_with_store_internal(
9891009
event_handling_stopped_sender,
9901010
config,
9911011
wallet,
1012+
chain_source,
9921013
tx_sync,
9931014
tx_broadcaster,
9941015
fee_estimator,

src/chain/mod.rs

Lines changed: 137 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,165 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::logger::FilesystemLogger;
8+
use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS};
9+
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
10+
use crate::types::Wallet;
11+
use crate::Error;
12+
13+
use bdk_esplora::EsploraAsyncExt;
914

1015
use esplora_client::AsyncClient as EsploraAsyncClient;
1116

12-
use std::sync::Arc;
17+
use std::sync::{Arc, Mutex};
18+
use std::time::Duration;
1319

1420
// The default Esplora server we're using.
1521
pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api";
1622

1723
// The default Esplora client timeout we're using.
1824
pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10;
1925

26+
pub(crate) enum WalletSyncStatus {
27+
Completed,
28+
InProgress { subscribers: tokio::sync::broadcast::Sender<Result<(), Error>> },
29+
}
30+
31+
impl WalletSyncStatus {
32+
fn register_or_subscribe_pending_sync(
33+
&mut self,
34+
) -> Option<tokio::sync::broadcast::Receiver<Result<(), Error>>> {
35+
match self {
36+
WalletSyncStatus::Completed => {
37+
// We're first to register for a sync.
38+
let (tx, _) = tokio::sync::broadcast::channel(1);
39+
*self = WalletSyncStatus::InProgress { subscribers: tx };
40+
None
41+
},
42+
WalletSyncStatus::InProgress { subscribers } => {
43+
// A sync is in-progress, we subscribe.
44+
let rx = subscribers.subscribe();
45+
Some(rx)
46+
},
47+
}
48+
}
49+
50+
fn propagate_result_to_subscribers(&mut self, res: Result<(), Error>) {
51+
// Send the notification to any other tasks that might be waiting on it by now.
52+
{
53+
match self {
54+
WalletSyncStatus::Completed => {
55+
// No sync in-progress, do nothing.
56+
return;
57+
},
58+
WalletSyncStatus::InProgress { subscribers } => {
59+
// A sync is in-progress, we notify subscribers.
60+
if subscribers.receiver_count() > 0 {
61+
match subscribers.send(res) {
62+
Ok(_) => (),
63+
Err(e) => {
64+
debug_assert!(
65+
false,
66+
"Failed to send wallet sync result to subscribers: {:?}",
67+
e
68+
);
69+
},
70+
}
71+
}
72+
*self = WalletSyncStatus::Completed;
73+
},
74+
}
75+
}
76+
}
77+
}
78+
2079
pub(crate) enum ChainSource {
2180
Esplora {
2281
esplora_client: EsploraAsyncClient,
82+
onchain_wallet: Arc<Wallet>,
83+
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
2384
logger: Arc<FilesystemLogger>,
2485
},
2586
}
2687

2788
impl ChainSource {
2889
pub(crate) fn new_esplora(
29-
server_url: String,
30-
logger: Arc<FilesystemLogger>,
90+
server_url: String, onchain_wallet: Arc<Wallet>, logger: Arc<FilesystemLogger>,
3191
) -> Self {
3292
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
3393
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
3494
let esplora_client = client_builder.build_async().unwrap();
35-
Self::Esplora { esplora_client, logger }
95+
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
96+
Self::Esplora {
97+
esplora_client,
98+
onchain_wallet,
99+
onchain_wallet_sync_status,
100+
logger,
101+
}
102+
}
103+
104+
pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> {
105+
match self {
106+
Self::Esplora {
107+
esplora_client,
108+
onchain_wallet,
109+
onchain_wallet_sync_status,
110+
logger,
111+
..
112+
} => {
113+
let receiver_res = {
114+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
115+
status_lock.register_or_subscribe_pending_sync()
116+
};
117+
if let Some(mut sync_receiver) = receiver_res {
118+
log_info!(logger, "Sync in progress, skipping.");
119+
return sync_receiver.recv().await.map_err(|e| {
120+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
121+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
122+
Error::WalletOperationFailed
123+
})?;
124+
}
125+
126+
let res = {
127+
let full_scan_request = onchain_wallet.get_full_scan_request();
128+
129+
let wallet_sync_timeout_fut = tokio::time::timeout(
130+
Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS),
131+
esplora_client.full_scan(
132+
full_scan_request,
133+
BDK_CLIENT_STOP_GAP,
134+
BDK_CLIENT_CONCURRENCY,
135+
),
136+
);
137+
138+
match wallet_sync_timeout_fut.await {
139+
Ok(res) => match res {
140+
Ok(update) => onchain_wallet.apply_update(update),
141+
Err(e) => match *e {
142+
esplora_client::Error::Reqwest(he) => {
143+
log_error!(
144+
logger,
145+
"Sync failed due to HTTP connection error: {}",
146+
he
147+
);
148+
Err(Error::WalletOperationFailed)
149+
},
150+
_ => {
151+
log_error!(logger, "Sync failed due to Esplora error: {}", e);
152+
Err(Error::WalletOperationFailed)
153+
},
154+
},
155+
},
156+
Err(e) => {
157+
log_error!(logger, "On-chain wallet sync timed out: {}", e);
158+
Err(Error::WalletOperationTimeout)
159+
},
160+
}
161+
};
162+
163+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
164+
165+
res
166+
},
167+
}
36168
}
37169
}

src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ use config::{
127127
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
128128
};
129129
use connection::ConnectionManager;
130+
use chain::ChainSource;
130131
use event::{EventHandler, EventQueue};
131132
use gossip::GossipSource;
132133
use graph::NetworkGraph;
@@ -179,6 +180,7 @@ pub struct Node {
179180
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
180181
config: Arc<Config>,
181182
wallet: Arc<Wallet>,
183+
chain_source: Arc<ChainSource>,
182184
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
183185
tx_broadcaster: Arc<Broadcaster>,
184186
fee_estimator: Arc<FeeEstimator>,
@@ -273,7 +275,7 @@ impl Node {
273275
})?;
274276

275277
// Setup wallet sync
276-
let wallet = Arc::clone(&self.wallet);
278+
let chain_source = Arc::clone(&self.chain_source);
277279
let sync_logger = Arc::clone(&self.logger);
278280
let sync_onchain_wallet_timestamp = Arc::clone(&self.latest_onchain_wallet_sync_timestamp);
279281
let mut stop_sync = self.stop_sender.subscribe();
@@ -297,7 +299,7 @@ impl Node {
297299
}
298300
_ = onchain_wallet_sync_interval.tick() => {
299301
let now = Instant::now();
300-
match wallet.sync().await {
302+
match chain_source.sync_onchain_wallet().await {
301303
Ok(()) => {
302304
log_trace!(
303305
sync_logger,
@@ -1299,7 +1301,7 @@ impl Node {
12991301
return Err(Error::NotRunning);
13001302
}
13011303

1302-
let wallet = Arc::clone(&self.wallet);
1304+
let chain_source = Arc::clone(&self.chain_source);
13031305
let tx_sync = Arc::clone(&self.tx_sync);
13041306
let sync_cman = Arc::clone(&self.channel_manager);
13051307
let archive_cman = Arc::clone(&self.channel_manager);
@@ -1325,7 +1327,7 @@ impl Node {
13251327
let now = Instant::now();
13261328
// We don't add an additional timeout here, as `Wallet::sync` already returns
13271329
// after a timeout.
1328-
match wallet.sync().await {
1330+
match chain_source.sync_onchain_wallet().await {
13291331
Ok(()) => {
13301332
log_info!(
13311333
sync_logger,

0 commit comments

Comments
 (0)