Skip to content

Commit 5290a64

Browse files
committed
Implement sync_onchain_wallet for ChainSource::Electrum
1 parent 1323533 commit 5290a64

File tree

3 files changed

+166
-3
lines changed

3 files changed

+166
-3
lines changed

src/chain/electrum.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
// accordance with one or both of these licenses.
77

88
use crate::config::{
9-
Config, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS,
10-
TX_BROADCAST_TIMEOUT_SECS,
9+
Config, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS, TX_BROADCAST_TIMEOUT_SECS,
1111
};
1212
use crate::error::Error;
1313
use crate::fee_estimator::{
@@ -20,6 +20,12 @@ use lightning::chain::{Confirm, Filter, WatchedOutput};
2020
use lightning::util::ser::Writeable;
2121
use lightning_transaction_sync::ElectrumSyncClient;
2222

23+
use bdk_chain::bdk_core::spk_client::FullScanRequest as BdkFullScanRequest;
24+
use bdk_chain::bdk_core::spk_client::FullScanResponse as BdkFullScanResponse;
25+
use bdk_chain::bdk_core::spk_client::SyncRequest as BdkSyncRequest;
26+
use bdk_chain::bdk_core::spk_client::SyncResponse as BdkSyncResponse;
27+
use bdk_wallet::KeychainKind as BdkKeyChainKind;
28+
2329
use bdk_electrum::BdkElectrumClient;
2430

2531
use electrum_client::{Batch, Client as ElectrumClient, ElectrumApi};
@@ -30,6 +36,8 @@ use std::collections::HashMap;
3036
use std::sync::Arc;
3137
use std::time::{Duration, Instant};
3238

39+
const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
40+
3341
pub(crate) struct ElectrumRuntimeClient {
3442
electrum_client: Arc<ElectrumClient>,
3543
bdk_electrum_client: Arc<BdkElectrumClient<ElectrumClient>>,
@@ -96,6 +104,69 @@ impl ElectrumRuntimeClient {
96104
Ok(res)
97105
}
98106

107+
pub(crate) async fn get_full_scan_wallet_update(
108+
&self, request: BdkFullScanRequest<BdkKeyChainKind>,
109+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
110+
) -> Result<BdkFullScanResponse<BdkKeyChainKind>, Error> {
111+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
112+
bdk_electrum_client.populate_tx_cache(cached_txs);
113+
114+
let spawn_fut = self.runtime.spawn_blocking(move || {
115+
bdk_electrum_client.full_scan(
116+
request,
117+
BDK_CLIENT_STOP_GAP,
118+
BDK_ELECTRUM_CLIENT_BATCH_SIZE,
119+
true,
120+
)
121+
});
122+
let wallet_sync_timeout_fut =
123+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
124+
125+
wallet_sync_timeout_fut
126+
.await
127+
.map_err(|e| {
128+
log_error!(self.logger, "Sync of on-chain wallet timed out: {}", e);
129+
Error::WalletOperationTimeout
130+
})?
131+
.map_err(|e| {
132+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
133+
Error::WalletOperationFailed
134+
})?
135+
.map_err(|e| {
136+
log_error!(self.logger, "Sync of on-chain wallet failed: {}", e);
137+
Error::WalletOperationFailed
138+
})
139+
}
140+
141+
pub(crate) async fn get_incremental_sync_wallet_update(
142+
&self, request: BdkSyncRequest<(BdkKeyChainKind, u32)>,
143+
cached_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
144+
) -> Result<BdkSyncResponse, Error> {
145+
let bdk_electrum_client = Arc::clone(&self.bdk_electrum_client);
146+
bdk_electrum_client.populate_tx_cache(cached_txs);
147+
148+
let spawn_fut = self.runtime.spawn_blocking(move || {
149+
bdk_electrum_client.sync(request, BDK_ELECTRUM_CLIENT_BATCH_SIZE, true)
150+
});
151+
let wallet_sync_timeout_fut =
152+
tokio::time::timeout(Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), spawn_fut);
153+
154+
wallet_sync_timeout_fut
155+
.await
156+
.map_err(|e| {
157+
log_error!(self.logger, "Incremental sync of on-chain wallet timed out: {}", e);
158+
Error::WalletOperationTimeout
159+
})?
160+
.map_err(|e| {
161+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
162+
Error::WalletOperationFailed
163+
})?
164+
.map_err(|e| {
165+
log_error!(self.logger, "Incremental sync of on-chain wallet failed: {}", e);
166+
Error::WalletOperationFailed
167+
})
168+
}
169+
99170
pub(crate) async fn broadcast(&self, tx: Transaction) {
100171
let electrum_client = Arc::clone(&self.electrum_client);
101172

src/chain/mod.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,95 @@ impl ChainSource {
658658

659659
res
660660
},
661-
Self::Electrum { .. } => todo!(),
661+
Self::Electrum {
662+
electrum_runtime_client,
663+
onchain_wallet,
664+
onchain_wallet_sync_status,
665+
kv_store,
666+
logger,
667+
node_metrics,
668+
..
669+
} => {
670+
let electrum_client: Arc<ElectrumRuntimeClient> =
671+
if let Some(client) = electrum_runtime_client.read().unwrap().as_ref() {
672+
Arc::clone(client)
673+
} else {
674+
debug_assert!(
675+
false,
676+
"We should have started the chain source before syncing the onchain wallet"
677+
);
678+
return Err(Error::FeerateEstimationUpdateFailed);
679+
};
680+
let receiver_res = {
681+
let mut status_lock = onchain_wallet_sync_status.lock().unwrap();
682+
status_lock.register_or_subscribe_pending_sync()
683+
};
684+
if let Some(mut sync_receiver) = receiver_res {
685+
log_info!(logger, "Sync in progress, skipping.");
686+
return sync_receiver.recv().await.map_err(|e| {
687+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
688+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
689+
Error::WalletOperationFailed
690+
})?;
691+
}
692+
693+
// If this is our first sync, do a full scan with the configured gap limit.
694+
// Otherwise just do an incremental sync.
695+
let incremental_sync =
696+
node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();
697+
698+
macro_rules! get_and_apply_wallet_update {
699+
($sync_future: expr) => {{
700+
let now = Instant::now();
701+
let update = $sync_future.await?;
702+
703+
match onchain_wallet.apply_update(update) {
704+
Ok(()) => {
705+
log_info!(
706+
logger,
707+
"{} of on-chain wallet finished in {}ms.",
708+
if incremental_sync { "Incremental sync" } else { "Sync" },
709+
now.elapsed().as_millis()
710+
);
711+
let unix_time_secs_opt = SystemTime::now()
712+
.duration_since(UNIX_EPOCH)
713+
.ok()
714+
.map(|d| d.as_secs());
715+
{
716+
let mut locked_node_metrics = node_metrics.write().unwrap();
717+
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
718+
unix_time_secs_opt;
719+
write_node_metrics(
720+
&*locked_node_metrics,
721+
Arc::clone(&kv_store),
722+
Arc::clone(&logger),
723+
)?;
724+
}
725+
Ok(())
726+
},
727+
Err(e) => Err(e),
728+
}
729+
}};
730+
}
731+
732+
let cached_txs = onchain_wallet.get_cached_txs();
733+
734+
let res = if incremental_sync {
735+
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
736+
let incremental_sync_fut = electrum_client
737+
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs);
738+
get_and_apply_wallet_update!(incremental_sync_fut)
739+
} else {
740+
let full_scan_request = onchain_wallet.get_full_scan_request();
741+
let full_scan_fut =
742+
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs);
743+
get_and_apply_wallet_update!(full_scan_fut)
744+
};
745+
746+
onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
747+
748+
res
749+
},
662750
Self::BitcoindRpc { .. } => {
663751
// In BitcoindRpc mode we sync lightning and onchain wallet in one go by via
664752
// `ChainPoller`. So nothing to do here.

src/wallet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ where
9898
self.inner.lock().unwrap().start_sync_with_revealed_spks().build()
9999
}
100100

101+
pub(crate) fn get_cached_txs(&self) -> Vec<Arc<Transaction>> {
102+
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
103+
}
104+
101105
pub(crate) fn current_best_block(&self) -> BestBlock {
102106
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
103107
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }

0 commit comments

Comments
 (0)