Skip to content

Commit 664e0b8

Browse files
committed
Move Lightning wallet syncing to ChainSource
1 parent 8f85b23 commit 664e0b8

File tree

5 files changed

+183
-150
lines changed

5 files changed

+183
-150
lines changed

src/builder.rs

Lines changed: 58 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ use lightning::util::sweep::OutputSweeper;
5050

5151
use lightning_persister::fs_store::FilesystemStore;
5252

53-
use lightning_transaction_sync::EsploraSyncClient;
54-
5553
use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
5654
use lightning_liquidity::{LiquidityClientConfig, LiquidityManager};
5755

@@ -562,82 +560,71 @@ fn build_with_store_internal(
562560
})?,
563561
};
564562

565-
let (wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator) =
566-
match chain_data_source_config {
567-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
568-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
569-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
570-
let esplora_client = client_builder.build_async().unwrap();
571-
let tx_sync =
572-
Arc::new(EsploraSyncClient::from_client(esplora_client, Arc::clone(&logger)));
573-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
574-
tx_sync.client().clone(),
575-
Arc::clone(&logger),
576-
));
577-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
578-
tx_sync.client().clone(),
579-
Arc::clone(&config),
580-
Arc::clone(&logger),
581-
));
563+
let (wallet, chain_source, tx_broadcaster, fee_estimator) = match chain_data_source_config {
564+
Some(ChainDataSourceConfig::Esplora(server_url)) => {
565+
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
566+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
567+
let esplora_client = client_builder.build_async().unwrap();
568+
let tx_broadcaster =
569+
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
570+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
571+
esplora_client,
572+
Arc::clone(&config),
573+
Arc::clone(&logger),
574+
));
582575

583-
let wallet = Arc::new(Wallet::new(
584-
bdk_wallet,
585-
wallet_persister,
586-
Arc::clone(&tx_broadcaster),
587-
Arc::clone(&fee_estimator),
588-
Arc::clone(&logger),
589-
));
576+
let wallet = Arc::new(Wallet::new(
577+
bdk_wallet,
578+
wallet_persister,
579+
Arc::clone(&tx_broadcaster),
580+
Arc::clone(&fee_estimator),
581+
Arc::clone(&logger),
582+
));
590583

591-
let chain_source = Arc::new(ChainSource::new_esplora(
592-
server_url.clone(),
593-
Arc::clone(&wallet),
594-
Arc::clone(&logger),
595-
));
596-
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
597-
},
598-
None => {
599-
// Default to Esplora client.
600-
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
601-
let mut client_builder = esplora_client::Builder::new(&server_url);
602-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
603-
let esplora_client = client_builder.build_async().unwrap();
604-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
605-
esplora_client.clone(),
606-
Arc::clone(&logger),
607-
));
608-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
609-
tx_sync.client().clone(),
610-
Arc::clone(&logger),
611-
));
612-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
613-
tx_sync.client().clone(),
614-
Arc::clone(&config),
615-
Arc::clone(&logger),
616-
));
584+
let chain_source = Arc::new(ChainSource::new_esplora(
585+
server_url.clone(),
586+
Arc::clone(&wallet),
587+
Arc::clone(&logger),
588+
));
589+
(wallet, chain_source, tx_broadcaster, fee_estimator)
590+
},
591+
None => {
592+
// Default to Esplora client.
593+
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
594+
let mut client_builder = esplora_client::Builder::new(&server_url);
595+
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
596+
let esplora_client = client_builder.build_async().unwrap();
597+
let tx_broadcaster =
598+
Arc::new(TransactionBroadcaster::new(esplora_client.clone(), Arc::clone(&logger)));
599+
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
600+
esplora_client,
601+
Arc::clone(&config),
602+
Arc::clone(&logger),
603+
));
617604

618-
let wallet = Arc::new(Wallet::new(
619-
bdk_wallet,
620-
wallet_persister,
621-
Arc::clone(&tx_broadcaster),
622-
Arc::clone(&fee_estimator),
623-
Arc::clone(&logger),
624-
));
605+
let wallet = Arc::new(Wallet::new(
606+
bdk_wallet,
607+
wallet_persister,
608+
Arc::clone(&tx_broadcaster),
609+
Arc::clone(&fee_estimator),
610+
Arc::clone(&logger),
611+
));
625612

626-
let chain_source = Arc::new(ChainSource::new_esplora(
627-
server_url.clone(),
628-
Arc::clone(&wallet),
629-
Arc::clone(&logger),
630-
));
613+
let chain_source = Arc::new(ChainSource::new_esplora(
614+
server_url.clone(),
615+
Arc::clone(&wallet),
616+
Arc::clone(&logger),
617+
));
631618

632-
(wallet, chain_source, tx_sync, tx_broadcaster, fee_estimator)
633-
},
634-
};
619+
(wallet, chain_source, tx_broadcaster, fee_estimator)
620+
},
621+
};
635622

636623
let runtime = Arc::new(RwLock::new(None));
637624

638625
// Initialize the ChainMonitor
639626
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
640-
Some(Arc::clone(&tx_sync)),
627+
Some(Arc::clone(&chain_source)),
641628
Arc::clone(&tx_broadcaster),
642629
Arc::clone(&logger),
643630
Arc::clone(&fee_estimator),
@@ -853,7 +840,7 @@ fn build_with_store_internal(
853840
let liquidity_manager = Arc::new(LiquidityManager::new(
854841
Arc::clone(&keys_manager),
855842
Arc::clone(&channel_manager),
856-
Some(Arc::clone(&tx_sync)),
843+
Some(Arc::clone(&chain_source)),
857844
None,
858845
None,
859846
liquidity_client_config,
@@ -921,7 +908,7 @@ fn build_with_store_internal(
921908
let output_sweeper = match io::utils::read_output_sweeper(
922909
Arc::clone(&tx_broadcaster),
923910
Arc::clone(&fee_estimator),
924-
Arc::clone(&tx_sync),
911+
Arc::clone(&chain_source),
925912
Arc::clone(&keys_manager),
926913
Arc::clone(&kv_store),
927914
Arc::clone(&logger),
@@ -933,7 +920,7 @@ fn build_with_store_internal(
933920
channel_manager.current_best_block(),
934921
Arc::clone(&tx_broadcaster),
935922
Arc::clone(&fee_estimator),
936-
Some(Arc::clone(&tx_sync)),
923+
Some(Arc::clone(&chain_source)),
937924
Arc::clone(&keys_manager),
938925
Arc::clone(&keys_manager),
939926
Arc::clone(&kv_store),
@@ -1010,7 +997,6 @@ fn build_with_store_internal(
1010997
config,
1011998
wallet,
1012999
chain_source,
1013-
tx_sync,
10141000
tx_broadcaster,
10151001
fee_estimator,
10161002
event_queue,

src/chain/mod.rs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,18 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::config::{BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS};
8+
use crate::config::{
9+
BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS,
10+
LDK_WALLET_SYNC_TIMEOUT_SECS,
11+
};
912
use crate::logger::{log_error, log_info, FilesystemLogger, Logger};
1013
use crate::types::Wallet;
1114
use crate::Error;
1215

16+
use lightning::chain::{Confirm, Filter};
17+
18+
use lightning_transaction_sync::EsploraSyncClient;
19+
1320
use bdk_esplora::EsploraAsyncExt;
1421

1522
use esplora_client::AsyncClient as EsploraAsyncClient;
@@ -81,6 +88,8 @@ pub(crate) enum ChainSource {
8188
esplora_client: EsploraAsyncClient,
8289
onchain_wallet: Arc<Wallet>,
8390
onchain_wallet_sync_status: Mutex<WalletSyncStatus>,
91+
tx_sync: Arc<EsploraSyncClient<Arc<FilesystemLogger>>>,
92+
lightning_wallet_sync_status: Mutex<WalletSyncStatus>,
8493
logger: Arc<FilesystemLogger>,
8594
},
8695
}
@@ -92,11 +101,16 @@ impl ChainSource {
92101
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
93102
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
94103
let esplora_client = client_builder.build_async().unwrap();
104+
let tx_sync =
105+
Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger)));
95106
let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
107+
let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
96108
Self::Esplora {
97109
esplora_client,
98110
onchain_wallet,
99111
onchain_wallet_sync_status,
112+
tx_sync,
113+
lightning_wallet_sync_status,
100114
logger,
101115
}
102116
}
@@ -166,4 +180,57 @@ impl ChainSource {
166180
},
167181
}
168182
}
183+
184+
pub(crate) async fn sync_lightning_wallet(
185+
&self, confirmables: Vec<&(dyn Confirm + Send + Sync)>,
186+
) -> Result<(), Error> {
187+
match self {
188+
Self::Esplora { tx_sync, lightning_wallet_sync_status, logger, .. } => {
189+
let receiver_res = {
190+
let mut status_lock = lightning_wallet_sync_status.lock().unwrap();
191+
status_lock.register_or_subscribe_pending_sync()
192+
};
193+
if let Some(mut sync_receiver) = receiver_res {
194+
log_info!(logger, "Sync in progress, skipping.");
195+
return sync_receiver.recv().await.map_err(|e| {
196+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
197+
log_error!(logger, "Failed to receive wallet sync result: {:?}", e);
198+
Error::WalletOperationFailed
199+
})?;
200+
}
201+
let res = {
202+
let timeout_fut = tokio::time::timeout(
203+
Duration::from_secs(LDK_WALLET_SYNC_TIMEOUT_SECS),
204+
tx_sync.sync(confirmables),
205+
);
206+
match timeout_fut.await {
207+
Ok(res) => res.map_err(|_| Error::TxSyncFailed),
208+
Err(e) => {
209+
log_error!(logger, "Lightning wallet sync timed out: {}", e);
210+
Err(Error::TxSyncTimeout)
211+
},
212+
}
213+
};
214+
215+
lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res);
216+
217+
res
218+
},
219+
}
220+
}
221+
}
222+
223+
impl Filter for ChainSource {
224+
fn register_tx(&self, txid: &bitcoin::Txid, script_pubkey: &bitcoin::Script) {
225+
{
226+
match self {
227+
Self::Esplora { tx_sync, .. } => tx_sync.register_tx(txid, script_pubkey),
228+
}
229+
}
230+
}
231+
fn register_output(&self, output: lightning::chain::WatchedOutput) {
232+
match self {
233+
Self::Esplora { tx_sync, .. } => tx_sync.register_output(output),
234+
}
235+
}
169236
}

src/io/utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
use super::*;
99
use crate::config::WALLET_KEYS_SEED_LEN;
1010

11+
use crate::chain::ChainSource;
1112
use crate::logger::{log_error, FilesystemLogger};
1213
use crate::peer_store::PeerStore;
1314
use crate::sweep::DeprecatedSpendableOutputInfo;
14-
use crate::types::{Broadcaster, ChainSource, DynStore, FeeEstimator, KeysManager, Sweeper};
15+
use crate::types::{Broadcaster, DynStore, FeeEstimator, KeysManager, Sweeper};
1516
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
1617
use crate::{Error, EventQueue, PaymentDetails};
1718

0 commit comments

Comments
 (0)