Skip to content

Commit 0ae087d

Browse files
committed
Merge upstream/main
2 parents 11e275f + 650fd19 commit 0ae087d

File tree

11 files changed

+592
-446
lines changed

11 files changed

+592
-446
lines changed

src/chain/electrum.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use bdk_chain::bdk_core::spk_client::{
1414
SyncRequest as BdkSyncRequest, SyncResponse as BdkSyncResponse,
1515
};
1616
use bdk_electrum::BdkElectrumClient;
17+
use bdk_wallet::event::WalletEvent;
1718
use bdk_wallet::{KeychainKind as BdkKeyChainKind, Update as BdkUpdate};
1819
use bitcoin::{FeeRate, Network, Script, ScriptBuf, Transaction, Txid};
1920
use electrum_client::{
@@ -39,8 +40,6 @@ use crate::runtime::Runtime;
3940
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
4041
use crate::NodeMetrics;
4142

42-
use bdk_wallet::event::WalletEvent;
43-
4443
const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
4544
const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3;
4645
const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10;
@@ -103,21 +102,30 @@ impl ElectrumChainSource {
103102
};
104103
if let Some(mut sync_receiver) = receiver_res {
105104
log_info!(self.logger, "Sync in progress, skipping.");
106-
return sync_receiver.recv().await.map(|res| res.map(|_| Vec::new())).map_err(|e| {
107-
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
108-
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
109-
Error::WalletOperationFailed
110-
})?;
105+
match sync_receiver.recv().await {
106+
Ok(Ok(())) => return Ok(Vec::new()),
107+
Ok(Err(e)) => return Err(e),
108+
Err(e) => {
109+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
110+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
111+
return Err(Error::WalletOperationFailed);
112+
},
113+
}
111114
}
112115

113116
let res = self.sync_onchain_wallet_inner(onchain_wallet).await;
114117

115-
self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res.as_ref().map(|_| ()).map_err(|e| e.clone()));
118+
self.onchain_wallet_sync_status
119+
.lock()
120+
.unwrap()
121+
.propagate_result_to_subscribers(res.as_ref().map(|_| ()).map_err(|e| e.clone()));
116122

117123
res
118124
}
119125

120-
async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc<Wallet>) -> Result<Vec<WalletEvent>, Error> {
126+
async fn sync_onchain_wallet_inner(
127+
&self, onchain_wallet: Arc<Wallet>,
128+
) -> Result<Vec<WalletEvent>, Error> {
121129
let electrum_client: Arc<ElectrumRuntimeClient> =
122130
if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() {
123131
Arc::clone(client)

src/chain/esplora.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use std::sync::{Arc, Mutex, RwLock};
1010
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
1111

1212
use bdk_esplora::EsploraAsyncExt;
13+
use bdk_wallet::event::WalletEvent;
1314
use bitcoin::{FeeRate, Network, Script, Transaction, Txid};
1415
use esplora_client::AsyncClient as EsploraAsyncClient;
1516
use lightning::chain::{Confirm, Filter, WatchedOutput};
@@ -31,8 +32,6 @@ use crate::logger::{log_bytes, log_error, log_info, log_trace, LdkLogger, Logger
3132
use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
3233
use crate::{Error, NodeMetrics};
3334

34-
use bdk_wallet::event::WalletEvent;
35-
3635
pub(super) struct EsploraChainSource {
3736
pub(super) sync_config: EsploraSyncConfig,
3837
esplora_client: EsploraAsyncClient,
@@ -88,23 +87,30 @@ impl EsploraChainSource {
8887
};
8988
if let Some(mut sync_receiver) = receiver_res {
9089
log_info!(self.logger, "Sync in progress, skipping.");
91-
sync_receiver.recv().await.map_err(|e| {
92-
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
93-
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
94-
Error::WalletOperationFailed
95-
})??;
96-
// Return empty events since we're just waiting for another sync
97-
return Ok(Vec::new());
90+
match sync_receiver.recv().await {
91+
Ok(Ok(())) => return Ok(Vec::new()),
92+
Ok(Err(e)) => return Err(e),
93+
Err(e) => {
94+
debug_assert!(false, "Failed to receive wallet sync result: {:?}", e);
95+
log_error!(self.logger, "Failed to receive wallet sync result: {:?}", e);
96+
return Err(Error::WalletOperationFailed);
97+
},
98+
}
9899
}
99100

100101
let res = self.sync_onchain_wallet_inner(onchain_wallet).await;
101102

102-
self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res.as_ref().map(|_| ()).map_err(|e| e.clone()));
103+
self.onchain_wallet_sync_status
104+
.lock()
105+
.unwrap()
106+
.propagate_result_to_subscribers(res.as_ref().map(|_| ()).map_err(|e| e.clone()));
103107

104108
res
105109
}
106110

107-
async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc<Wallet>) -> Result<Vec<WalletEvent>, Error> {
111+
async fn sync_onchain_wallet_inner(
112+
&self, onchain_wallet: Arc<Wallet>,
113+
) -> Result<Vec<WalletEvent>, Error> {
108114
// If this is our first sync, do a full scan with the configured gap limit.
109115
// Otherwise just do an incremental sync.
110116
let incremental_sync =

src/chain/mod.rs

Lines changed: 98 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,8 @@ where
256256
// We don't emit an event for chain tip changes as this is too noisy
257257
},
258258
BdkWalletEvent::TxReplaced { txid, conflicts, .. } => {
259-
let conflict_txids: Vec<Txid> = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect();
259+
let conflict_txids: Vec<Txid> =
260+
conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect();
260261
log_info!(
261262
logger,
262263
"Onchain transaction {} was replaced by {} transaction(s)",
@@ -295,7 +296,16 @@ impl ChainSource {
295296
node_metrics,
296297
);
297298
let kind = ChainSourceKind::Esplora(esplora_chain_source);
298-
(Self { kind, tx_broadcaster, logger, onchain_wallet: Arc::new(Mutex::new(None)), event_queue: Arc::new(Mutex::new(None)) }, None)
299+
(
300+
Self {
301+
kind,
302+
tx_broadcaster,
303+
logger,
304+
onchain_wallet: Arc::new(Mutex::new(None)),
305+
event_queue: Arc::new(Mutex::new(None)),
306+
},
307+
None,
308+
)
299309
}
300310

301311
pub(crate) fn new_electrum(
@@ -314,7 +324,16 @@ impl ChainSource {
314324
node_metrics,
315325
);
316326
let kind = ChainSourceKind::Electrum(electrum_chain_source);
317-
(Self { kind, tx_broadcaster, logger, onchain_wallet: Arc::new(Mutex::new(None)), event_queue: Arc::new(Mutex::new(None)) }, None)
327+
(
328+
Self {
329+
kind,
330+
tx_broadcaster,
331+
logger,
332+
onchain_wallet: Arc::new(Mutex::new(None)),
333+
event_queue: Arc::new(Mutex::new(None)),
334+
},
335+
None,
336+
)
318337
}
319338

320339
pub(crate) async fn new_bitcoind_rpc(
@@ -336,7 +355,16 @@ impl ChainSource {
336355
);
337356
let best_block = bitcoind_chain_source.poll_best_block().await.ok();
338357
let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source);
339-
(Self { kind, tx_broadcaster, logger, onchain_wallet: Arc::new(Mutex::new(None)), event_queue: Arc::new(Mutex::new(None)) }, best_block)
358+
(
359+
Self {
360+
kind,
361+
tx_broadcaster,
362+
logger,
363+
onchain_wallet: Arc::new(Mutex::new(None)),
364+
event_queue: Arc::new(Mutex::new(None)),
365+
},
366+
best_block,
367+
)
340368
}
341369

342370
pub(crate) async fn new_bitcoind_rest(
@@ -359,7 +387,16 @@ impl ChainSource {
359387
);
360388
let best_block = bitcoind_chain_source.poll_best_block().await.ok();
361389
let kind = ChainSourceKind::Bitcoind(bitcoind_chain_source);
362-
(Self { kind, tx_broadcaster, logger, onchain_wallet: Arc::new(Mutex::new(None)), event_queue: Arc::new(Mutex::new(None)) }, best_block)
390+
(
391+
Self {
392+
kind,
393+
tx_broadcaster,
394+
logger,
395+
onchain_wallet: Arc::new(Mutex::new(None)),
396+
event_queue: Arc::new(Mutex::new(None)),
397+
},
398+
best_block,
399+
)
363400
}
364401

365402
pub(crate) fn start(&self, runtime: Arc<Runtime>) -> Result<(), Error> {
@@ -468,14 +505,17 @@ impl ChainSource {
468505

469506
// Synchronize the onchain wallet via transaction-based protocols (Esplora, Electrum).
470507
// If event_queue is set, emits onchain events.
471-
pub(crate) async fn sync_onchain_wallet(&self, wallet: Arc<Wallet>) -> Result<(), Error> {
508+
pub(crate) async fn sync_onchain_wallet(
509+
&self, wallet: Arc<Wallet>, channel_manager: Option<Arc<ChannelManager>>,
510+
chain_monitor: Option<Arc<ChainMonitor>>,
511+
) -> Result<(), Error> {
472512
let event_queue = self.event_queue.lock().unwrap().clone();
473513
if let Some(event_queue) = event_queue {
474514
// Use event-emitting sync path
475515
self.sync_onchain_wallet_with_events(
476516
Some(&event_queue),
477-
None,
478-
None,
517+
channel_manager.as_ref(),
518+
chain_monitor.as_ref(),
479519
self.config(),
480520
)
481521
.await
@@ -584,8 +624,8 @@ impl ChainSource {
584624
// etc.) with event emission support.
585625
async fn sync_onchain_wallet_with_events(
586626
&self, event_queue: Option<&Arc<EventQueue<Arc<Logger>>>>,
587-
channel_manager: Option<&Arc<ChannelManager>>,
588-
chain_monitor: Option<&Arc<ChainMonitor>>, config: Option<&Arc<Config>>,
627+
channel_manager: Option<&Arc<ChannelManager>>, chain_monitor: Option<&Arc<ChainMonitor>>,
628+
config: Option<&Arc<Config>>,
589629
) -> Result<(), Error> {
590630
let wallet = self.onchain_wallet.lock().unwrap().clone();
591631
let wallet = wallet.ok_or(Error::WalletOperationFailed)?;
@@ -594,9 +634,10 @@ impl ChainSource {
594634
ChainSourceKind::Esplora(esplora_chain_source) => {
595635
// Track unconfirmed transactions before sync to detect evictions
596636
let prev_unconfirmed_txids = wallet.get_unconfirmed_txids();
597-
598-
let wallet_events = esplora_chain_source.sync_onchain_wallet(Arc::clone(&wallet)).await?;
599-
637+
638+
let wallet_events =
639+
esplora_chain_source.sync_onchain_wallet(Arc::clone(&wallet)).await?;
640+
600641
// Process wallet events if event queue is provided
601642
if let Some(event_queue) = event_queue {
602643
process_wallet_events(
@@ -606,33 +647,41 @@ impl ChainSource {
606647
&self.logger,
607648
channel_manager,
608649
chain_monitor,
609-
).await?;
650+
)
651+
.await?;
610652

611653
// Check for evicted transactions
612654
check_and_emit_evicted_transactions(
613655
prev_unconfirmed_txids,
614656
&wallet,
615657
event_queue,
616658
&self.logger,
617-
).await;
659+
)
660+
.await;
618661

619662
// Emit SyncCompleted event
620663
let synced_height = wallet.current_best_block().height;
621-
event_queue.add_event(Event::SyncCompleted {
622-
sync_type: SyncType::OnchainWallet,
623-
synced_block_height: synced_height,
624-
}).await?;
664+
event_queue
665+
.add_event(Event::SyncCompleted {
666+
sync_type: SyncType::OnchainWallet,
667+
synced_block_height: synced_height,
668+
})
669+
.await?;
625670
// Check for balance changes and emit BalanceChanged event if needed
626-
if let (Some(cm), Some(chain_mon), Some(cfg)) = (channel_manager, chain_monitor, config) {
627-
let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(cm, cfg);
671+
if let (Some(cm), Some(chain_mon), Some(cfg)) =
672+
(channel_manager, chain_monitor, config)
673+
{
674+
let cur_anchor_reserve_sats =
675+
crate::total_anchor_channels_reserve_sats(cm, cfg);
628676
let (total_onchain_balance_sats, spendable_onchain_balance_sats) =
629677
wallet.get_balances(cur_anchor_reserve_sats).unwrap_or((0, 0));
630678

631679
let mut total_lightning_balance_sats = 0;
632680
for channel_id in chain_mon.list_monitors() {
633681
if let Ok(monitor) = chain_mon.get_monitor(channel_id) {
634682
for ldk_balance in monitor.get_claimable_balances() {
635-
total_lightning_balance_sats += ldk_balance.claimable_amount_satoshis();
683+
total_lightning_balance_sats +=
684+
ldk_balance.claimable_amount_satoshis();
636685
}
637686
}
638687
}
@@ -642,7 +691,7 @@ impl ChainSource {
642691
spendable_onchain_balance_sats,
643692
total_anchor_channels_reserve_sats: std::cmp::min(
644693
cur_anchor_reserve_sats,
645-
total_onchain_balance_sats
694+
total_onchain_balance_sats,
646695
),
647696
total_lightning_balance_sats,
648697
lightning_balances: Vec::new(),
@@ -666,17 +715,19 @@ impl ChainSource {
666715
event_queue,
667716
&kv_store,
668717
&self.logger,
669-
).await?;
718+
)
719+
.await?;
670720
}
671721
}
672722
Ok(())
673723
},
674724
ChainSourceKind::Electrum(electrum_chain_source) => {
675725
// Track unconfirmed transactions before sync to detect evictions
676726
let prev_unconfirmed_txids = wallet.get_unconfirmed_txids();
677-
678-
let wallet_events = electrum_chain_source.sync_onchain_wallet(Arc::clone(&wallet)).await?;
679-
727+
728+
let wallet_events =
729+
electrum_chain_source.sync_onchain_wallet(Arc::clone(&wallet)).await?;
730+
680731
// Process wallet events if event queue is provided
681732
if let Some(event_queue) = event_queue {
682733
process_wallet_events(
@@ -686,34 +737,42 @@ impl ChainSource {
686737
&self.logger,
687738
channel_manager,
688739
chain_monitor,
689-
).await?;
740+
)
741+
.await?;
690742

691743
// Check for evicted transactions
692744
check_and_emit_evicted_transactions(
693745
prev_unconfirmed_txids,
694746
&wallet,
695747
event_queue,
696748
&self.logger,
697-
).await;
749+
)
750+
.await;
698751

699752
// Emit SyncCompleted event
700753
let synced_height = wallet.current_best_block().height;
701-
event_queue.add_event(Event::SyncCompleted {
702-
sync_type: SyncType::OnchainWallet,
703-
synced_block_height: synced_height,
704-
}).await?;
754+
event_queue
755+
.add_event(Event::SyncCompleted {
756+
sync_type: SyncType::OnchainWallet,
757+
synced_block_height: synced_height,
758+
})
759+
.await?;
705760

706761
// Check for balance changes and emit BalanceChanged event if needed
707-
if let (Some(cm), Some(chain_mon), Some(cfg)) = (channel_manager, chain_monitor, config) {
708-
let cur_anchor_reserve_sats = crate::total_anchor_channels_reserve_sats(cm, cfg);
762+
if let (Some(cm), Some(chain_mon), Some(cfg)) =
763+
(channel_manager, chain_monitor, config)
764+
{
765+
let cur_anchor_reserve_sats =
766+
crate::total_anchor_channels_reserve_sats(cm, cfg);
709767
let (total_onchain_balance_sats, spendable_onchain_balance_sats) =
710768
wallet.get_balances(cur_anchor_reserve_sats).unwrap_or((0, 0));
711769

712770
let mut total_lightning_balance_sats = 0;
713771
for channel_id in chain_mon.list_monitors() {
714772
if let Ok(monitor) = chain_mon.get_monitor(channel_id) {
715773
for ldk_balance in monitor.get_claimable_balances() {
716-
total_lightning_balance_sats += ldk_balance.claimable_amount_satoshis();
774+
total_lightning_balance_sats +=
775+
ldk_balance.claimable_amount_satoshis();
717776
}
718777
}
719778
}
@@ -723,7 +782,7 @@ impl ChainSource {
723782
spendable_onchain_balance_sats,
724783
total_anchor_channels_reserve_sats: std::cmp::min(
725784
cur_anchor_reserve_sats,
726-
total_onchain_balance_sats
785+
total_onchain_balance_sats,
727786
),
728787
total_lightning_balance_sats,
729788
lightning_balances: Vec::new(),
@@ -747,7 +806,8 @@ impl ChainSource {
747806
event_queue,
748807
&kv_store,
749808
&self.logger,
750-
).await?;
809+
)
810+
.await?;
751811
}
752812
}
753813
Ok(())

0 commit comments

Comments
 (0)