Skip to content

Commit c38d010

Browse files
committed
Use BDK events in update_payment_store instead of scanning all transactions
Replace the full transaction list scan in `update_payment_store` with handling of BDK's `WalletEvent` stream during sync. This leverages the new events in BDK 2.2, reduces redundant work, and prepares the foundation for reliable RBF/CPFP tracking via `WalletEvent::TxReplaced`.
1 parent e4bb615 commit c38d010

File tree

1 file changed

+202
-67
lines changed

1 file changed

+202
-67
lines changed

src/wallet/mod.rs

Lines changed: 202 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::sync::{Arc, Mutex};
1313
use bdk_chain::spk_client::{FullScanRequest, SyncRequest};
1414
#[allow(deprecated)]
1515
use bdk_wallet::SignOptions;
16-
use bdk_wallet::{Balance, KeychainKind, PersistedWallet, Update};
16+
use bdk_wallet::{event::WalletEvent, Balance, KeychainKind, PersistedWallet, Update};
1717
use bitcoin::address::NetworkUnchecked;
1818
use bitcoin::blockdata::constants::WITNESS_SCALE_FACTOR;
1919
use bitcoin::blockdata::locktime::absolute::LockTime;
@@ -112,15 +112,16 @@ impl Wallet {
112112

113113
pub(crate) fn apply_update(&self, update: impl Into<Update>) -> Result<(), Error> {
114114
let mut locked_wallet = self.inner.lock().unwrap();
115-
match locked_wallet.apply_update(update) {
116-
Ok(()) => {
115+
match locked_wallet.apply_update_events(update) {
116+
Ok(events) => {
117117
let mut locked_persister = self.persister.lock().unwrap();
118118
locked_wallet.persist(&mut locked_persister).map_err(|e| {
119119
log_error!(self.logger, "Failed to persist wallet: {}", e);
120120
Error::PersistenceFailed
121121
})?;
122122

123-
self.update_payment_store(&mut *locked_wallet).map_err(|e| {
123+
let mut events_vec: Vec<WalletEvent> = events.into_iter().collect();
124+
self.update_payment_store(&mut *locked_wallet, Some(&events_vec)).map_err(|e| {
124125
log_error!(self.logger, "Failed to update payment store: {}", e);
125126
Error::PersistenceFailed
126127
})?;
@@ -152,75 +153,209 @@ impl Wallet {
152153

153154
fn update_payment_store<'a>(
154155
&self, locked_wallet: &'a mut PersistedWallet<KVStoreWalletPersister>,
156+
events: Option<&'a Vec<WalletEvent>>,
155157
) -> Result<(), Error> {
156-
for wtx in locked_wallet.transactions() {
157-
let id = PaymentId(wtx.tx_node.txid.to_byte_array());
158-
let txid = wtx.tx_node.txid;
159-
let (payment_status, confirmation_status) = match wtx.chain_position {
160-
bdk_chain::ChainPosition::Confirmed { anchor, .. } => {
161-
let confirmation_height = anchor.block_id.height;
162-
let cur_height = locked_wallet.latest_checkpoint().height();
163-
let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1
164-
{
165-
PaymentStatus::Succeeded
166-
} else {
167-
PaymentStatus::Pending
158+
if let Some(events) = events {
159+
if !events.is_empty() {
160+
for event in events.iter() {
161+
let (id, txid, tx, payment_status, confirmation_status) = match event {
162+
WalletEvent::TxConfirmed { txid, tx, block_time, .. } => {
163+
let cur_height = locked_wallet.latest_checkpoint().height();
164+
let confirmation_height = block_time.block_id.height;
165+
let payment_status =
166+
if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 {
167+
PaymentStatus::Succeeded
168+
} else {
169+
PaymentStatus::Pending
170+
};
171+
172+
(
173+
PaymentId(txid.to_byte_array()),
174+
*txid,
175+
tx.clone(),
176+
payment_status,
177+
ConfirmationStatus::Confirmed {
178+
block_hash: block_time.block_id.hash,
179+
height: confirmation_height,
180+
timestamp: block_time.confirmation_time,
181+
},
182+
)
183+
},
184+
WalletEvent::ChainTipChanged { new_tip, .. } => {
185+
// Get all payments that are Pending with Confirmed status
186+
let pending_payments: Vec<PaymentDetails> =
187+
self.payment_store.list_filter(|p| {
188+
p.status == PaymentStatus::Pending
189+
&& matches!(
190+
p.kind,
191+
crate::payment::PaymentKind::Onchain {
192+
status: ConfirmationStatus::Confirmed { .. },
193+
..
194+
}
195+
)
196+
});
197+
198+
for mut payment in pending_payments {
199+
if let crate::payment::PaymentKind::Onchain {
200+
status: ConfirmationStatus::Confirmed { height, .. },
201+
..
202+
} = payment.kind
203+
{
204+
if new_tip.height >= height + ANTI_REORG_DELAY - 1 {
205+
payment.status = PaymentStatus::Succeeded;
206+
self.payment_store.insert_or_update(payment)?;
207+
}
208+
}
209+
}
210+
continue;
211+
},
212+
WalletEvent::TxUnconfirmed { txid, tx, old_block_time: None } => (
213+
PaymentId(txid.to_byte_array()),
214+
*txid,
215+
tx.clone(),
216+
PaymentStatus::Pending,
217+
ConfirmationStatus::Unconfirmed,
218+
),
219+
WalletEvent::TxReplaced { txid, tx, conflicts } => {
220+
println!(
221+
"WalletEvent::TxReplaced: {}, conflicts: {:?}",
222+
txid, conflicts
223+
);
224+
continue;
225+
},
226+
WalletEvent::TxDropped { txid, tx } => {
227+
println!("WalletEvent::TxDropped: {}", txid);
228+
continue;
229+
},
230+
_ => {
231+
// unexpected event, do nothing
232+
continue;
233+
},
168234
};
169-
let confirmation_status = ConfirmationStatus::Confirmed {
170-
block_hash: anchor.block_id.hash,
171-
height: confirmation_height,
172-
timestamp: anchor.confirmation_time,
235+
236+
// TODO: It would be great to introduce additional variants for
237+
// `ChannelFunding` and `ChannelClosing`. For the former, we could just
238+
// take a reference to `ChannelManager` here and check against
239+
// `list_channels`. But for the latter the best approach is much less
240+
// clear: for force-closes/HTLC spends we should be good querying
241+
// `OutputSweeper::tracked_spendable_outputs`, but regular channel closes
242+
// (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly
243+
// spent to a wallet address. The only solution I can come up with is to
244+
// create and persist a list of 'static pending outputs' that we could use
245+
// here to determine the `PaymentKind`, but that's not really satisfactory, so
246+
// we're punting on it until we can come up with a better solution.
247+
let kind =
248+
crate::payment::PaymentKind::Onchain { txid, status: confirmation_status };
249+
let fee = locked_wallet.calculate_fee(&tx).unwrap_or(Amount::ZERO);
250+
let (sent, received) = locked_wallet.sent_and_received(&tx);
251+
let (direction, amount_msat) = if sent > received {
252+
let direction = PaymentDirection::Outbound;
253+
let amount_msat = Some(
254+
sent.to_sat()
255+
.saturating_sub(fee.to_sat())
256+
.saturating_sub(received.to_sat())
257+
* 1000,
258+
);
259+
(direction, amount_msat)
260+
} else {
261+
let direction = PaymentDirection::Inbound;
262+
let amount_msat = Some(
263+
received
264+
.to_sat()
265+
.saturating_sub(sent.to_sat().saturating_sub(fee.to_sat()))
266+
* 1000,
267+
);
268+
(direction, amount_msat)
173269
};
174-
(payment_status, confirmation_status)
175-
},
176-
bdk_chain::ChainPosition::Unconfirmed { .. } => {
177-
(PaymentStatus::Pending, ConfirmationStatus::Unconfirmed)
178-
},
179-
};
180-
// TODO: It would be great to introduce additional variants for
181-
// `ChannelFunding` and `ChannelClosing`. For the former, we could just
182-
// take a reference to `ChannelManager` here and check against
183-
// `list_channels`. But for the latter the best approach is much less
184-
// clear: for force-closes/HTLC spends we should be good querying
185-
// `OutputSweeper::tracked_spendable_outputs`, but regular channel closes
186-
// (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly
187-
// spent to a wallet address. The only solution I can come up with is to
188-
// create and persist a list of 'static pending outputs' that we could use
189-
// here to determine the `PaymentKind`, but that's not really satisfactory, so
190-
// we're punting on it until we can come up with a better solution.
191-
let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status };
192-
let fee = locked_wallet.calculate_fee(&wtx.tx_node.tx).unwrap_or(Amount::ZERO);
193-
let (sent, received) = locked_wallet.sent_and_received(&wtx.tx_node.tx);
194-
let (direction, amount_msat) = if sent > received {
195-
let direction = PaymentDirection::Outbound;
196-
let amount_msat = Some(
197-
sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat())
198-
* 1000,
199-
);
200-
(direction, amount_msat)
201-
} else {
202-
let direction = PaymentDirection::Inbound;
203-
let amount_msat = Some(
204-
received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat()))
205-
* 1000,
206-
);
207-
(direction, amount_msat)
208-
};
209270

210-
let fee_paid_msat = Some(fee.to_sat() * 1000);
271+
let fee_paid_msat = Some(fee.to_sat() * 1000);
211272

212-
let payment = PaymentDetails::new(
213-
id,
214-
kind,
215-
amount_msat,
216-
fee_paid_msat,
217-
direction,
218-
payment_status,
219-
);
273+
let payment = PaymentDetails::new(
274+
id,
275+
kind,
276+
amount_msat,
277+
fee_paid_msat,
278+
direction,
279+
payment_status,
280+
);
220281

221-
self.payment_store.insert_or_update(payment)?;
222-
}
282+
self.payment_store.insert_or_update(payment)?;
283+
}
284+
}
285+
} else {
286+
for wtx in locked_wallet.transactions() {
287+
let id = PaymentId(wtx.tx_node.txid.to_byte_array());
288+
let txid = wtx.tx_node.txid;
289+
let (payment_status, confirmation_status) = match wtx.chain_position {
290+
bdk_chain::ChainPosition::Confirmed { anchor, .. } => {
291+
let confirmation_height = anchor.block_id.height;
292+
let cur_height = locked_wallet.latest_checkpoint().height();
293+
let payment_status =
294+
if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 {
295+
PaymentStatus::Succeeded
296+
} else {
297+
PaymentStatus::Pending
298+
};
299+
let confirmation_status = ConfirmationStatus::Confirmed {
300+
block_hash: anchor.block_id.hash,
301+
height: confirmation_height,
302+
timestamp: anchor.confirmation_time,
303+
};
304+
(payment_status, confirmation_status)
305+
},
306+
bdk_chain::ChainPosition::Unconfirmed { .. } => {
307+
(PaymentStatus::Pending, ConfirmationStatus::Unconfirmed)
308+
},
309+
};
310+
// TODO: It would be great to introduce additional variants for
311+
// `ChannelFunding` and `ChannelClosing`. For the former, we could just
312+
// take a reference to `ChannelManager` here and check against
313+
// `list_channels`. But for the latter the best approach is much less
314+
// clear: for force-closes/HTLC spends we should be good querying
315+
// `OutputSweeper::tracked_spendable_outputs`, but regular channel closes
316+
// (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly
317+
// spent to a wallet address. The only solution I can come up with is to
318+
// create and persist a list of 'static pending outputs' that we could use
319+
// here to determine the `PaymentKind`, but that's not really satisfactory, so
320+
// we're punting on it until we can come up with a better solution.
321+
let kind =
322+
crate::payment::PaymentKind::Onchain { txid, status: confirmation_status };
323+
let fee = locked_wallet.calculate_fee(&wtx.tx_node.tx).unwrap_or(Amount::ZERO);
324+
let (sent, received) = locked_wallet.sent_and_received(&wtx.tx_node.tx);
325+
let (direction, amount_msat) = if sent > received {
326+
let direction = PaymentDirection::Outbound;
327+
let amount_msat = Some(
328+
sent.to_sat()
329+
.saturating_sub(fee.to_sat())
330+
.saturating_sub(received.to_sat())
331+
* 1000,
332+
);
333+
(direction, amount_msat)
334+
} else {
335+
let direction = PaymentDirection::Inbound;
336+
let amount_msat = Some(
337+
received
338+
.to_sat()
339+
.saturating_sub(sent.to_sat().saturating_sub(fee.to_sat()))
340+
* 1000,
341+
);
342+
(direction, amount_msat)
343+
};
344+
345+
let fee_paid_msat = Some(fee.to_sat() * 1000);
346+
347+
let payment = PaymentDetails::new(
348+
id,
349+
kind,
350+
amount_msat,
351+
fee_paid_msat,
352+
direction,
353+
payment_status,
354+
);
223355

356+
self.payment_store.insert_or_update(payment)?;
357+
}
358+
}
224359
Ok(())
225360
}
226361

@@ -723,7 +858,7 @@ impl Listen for Wallet {
723858

724859
match locked_wallet.apply_block(block, height) {
725860
Ok(()) => {
726-
if let Err(e) = self.update_payment_store(&mut *locked_wallet) {
861+
if let Err(e) = self.update_payment_store(&mut *locked_wallet, None) {
727862
log_error!(self.logger, "Failed to update payment store: {}", e);
728863
return;
729864
}

0 commit comments

Comments
 (0)