Skip to content

Commit c9015c3

Browse files
committed
Add RBF replacement tracking with new persisted lookup table
Introduce a new lookup `ReplacedTransactionStore` that maps old/replaced transaction IDs to their current replacement transaction IDs, enabling reliable tracking of replaced transactions throughout the replacement chain. Key changes: - Add persisted storage for RBF replacement relationships - Link transactions in replacement trees using payment IDs - Remove entire replacement chains from persistence when any transaction in the tree is confirmed
1 parent a3dded1 commit c9015c3

File tree

9 files changed

+299
-109
lines changed

9 files changed

+299
-109
lines changed

bindings/ldk_node.udl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ interface ClosureReason {
447447

448448
[Enum]
449449
interface PaymentKind {
450-
Onchain(Txid txid, ConfirmationStatus status, sequence<Txid> conflicting_txids);
450+
Onchain(Txid txid, ConfirmationStatus status);
451451
Bolt11(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret);
452452
Bolt11Jit(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret, u64? counterparty_skimmed_fee_msat, LSPFeeLimits lsp_fee_limits);
453453
Bolt12Offer(PaymentHash? hash, PaymentPreimage? preimage, PaymentSecret? secret, OfferId offer_id, UntrustedString? payer_note, u64? quantity);

src/builder.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ use crate::gossip::GossipSource;
5555
use crate::io::sqlite_store::SqliteStore;
5656
use crate::io::utils::{
5757
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
58-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
59-
write_node_metrics,
58+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
59+
read_scorer, write_node_metrics,
6060
};
6161
use crate::io::vss_store::VssStoreBuilder;
6262
use crate::io::{
6363
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
64+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
65+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
6466
};
6567
use crate::liquidity::{
6668
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
@@ -73,7 +75,8 @@ use crate::runtime::Runtime;
7375
use crate::tx_broadcaster::TransactionBroadcaster;
7476
use crate::types::{
7577
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
76-
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
78+
MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister,
79+
SyncAndAsyncKVStore,
7780
};
7881
use crate::wallet::persist::KVStoreWalletPersister;
7982
use crate::wallet::Wallet;
@@ -1235,6 +1238,22 @@ fn build_with_store_internal(
12351238
},
12361239
};
12371240

1241+
let pending_payment_store = match runtime
1242+
.block_on(async { read_pending_payments(&*kv_store, Arc::clone(&logger)).await })
1243+
{
1244+
Ok(pending_payments) => Arc::new(PendingPaymentStore::new(
1245+
pending_payments,
1246+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1247+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1248+
Arc::clone(&kv_store),
1249+
Arc::clone(&logger),
1250+
)),
1251+
Err(e) => {
1252+
log_error!(logger, "Failed to read pending payment data from store: {}", e);
1253+
return Err(BuildError::ReadFailed);
1254+
},
1255+
};
1256+
12381257
let wallet = Arc::new(Wallet::new(
12391258
bdk_wallet,
12401259
wallet_persister,
@@ -1243,6 +1262,7 @@ fn build_with_store_internal(
12431262
Arc::clone(&payment_store),
12441263
Arc::clone(&config),
12451264
Arc::clone(&logger),
1265+
Arc::clone(&pending_payment_store),
12461266
));
12471267

12481268
// Initialize the KeysManager

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
7878
///
7979
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
8080
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
81+
82+
/// The pending payment information will be persisted under this prefix.
83+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments";
84+
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47+
use crate::payment::PendingPaymentDetails;
4748
use crate::peer_store::PeerStore;
4849
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
4950
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
@@ -620,6 +621,83 @@ pub(crate) fn read_bdk_wallet_change_set(
620621
Ok(Some(change_set))
621622
}
622623

624+
/// Read previously persisted pending payments information from the store.
625+
pub(crate) async fn read_pending_payments<L: Deref>(
626+
kv_store: &DynStore, logger: L,
627+
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
628+
where
629+
L::Target: LdkLogger,
630+
{
631+
let mut res = Vec::new();
632+
633+
let mut stored_keys = KVStore::list(
634+
&*kv_store,
635+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
636+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
637+
)
638+
.await?;
639+
640+
const BATCH_SIZE: usize = 50;
641+
642+
let mut set = tokio::task::JoinSet::new();
643+
644+
// Fill JoinSet with tasks if possible
645+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
646+
if let Some(next_key) = stored_keys.pop() {
647+
let fut = KVStore::read(
648+
&*kv_store,
649+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
650+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
651+
&next_key,
652+
);
653+
set.spawn(fut);
654+
debug_assert!(set.len() <= BATCH_SIZE);
655+
}
656+
}
657+
658+
while let Some(read_res) = set.join_next().await {
659+
// Exit early if we get an IO error.
660+
let reader = read_res
661+
.map_err(|e| {
662+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
663+
set.abort_all();
664+
e
665+
})?
666+
.map_err(|e| {
667+
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
668+
set.abort_all();
669+
e
670+
})?;
671+
672+
// Refill set for every finished future, if we still have something to do.
673+
if let Some(next_key) = stored_keys.pop() {
674+
let fut = KVStore::read(
675+
&*kv_store,
676+
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
677+
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
678+
&next_key,
679+
);
680+
set.spawn(fut);
681+
debug_assert!(set.len() <= BATCH_SIZE);
682+
}
683+
684+
// Handle result.
685+
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
686+
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
687+
std::io::Error::new(
688+
std::io::ErrorKind::InvalidData,
689+
"Failed to deserialize PendingPaymentDetails",
690+
)
691+
})?;
692+
res.push(pending_payment);
693+
}
694+
695+
debug_assert!(set.is_empty());
696+
debug_assert!(stored_keys.is_empty());
697+
698+
Ok(res)
699+
}
700+
623701
#[cfg(test)]
624702
mod tests {
625703
use super::read_or_generate_seed_file;

src/payment/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
1111
mod bolt11;
1212
mod bolt12;
1313
mod onchain;
14+
pub(crate) mod pending_payment_store;
1415
mod spontaneous;
1516
pub(crate) mod store;
1617
mod unified;
1718

1819
pub use bolt11::Bolt11Payment;
1920
pub use bolt12::Bolt12Payment;
2021
pub use onchain::OnchainPayment;
22+
pub use pending_payment_store::PendingPaymentDetails;
2123
pub use spontaneous::SpontaneousPayment;
2224
pub use store::{
2325
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use bitcoin::Txid;
9+
use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId};
10+
11+
use crate::{
12+
data_store::{StorableObject, StorableObjectUpdate},
13+
payment::{store::PaymentDetailsUpdate, PaymentDetails},
14+
};
15+
16+
/// Represents a pending payment
17+
#[derive(Clone, Debug, PartialEq, Eq)]
18+
pub struct PendingPaymentDetails {
19+
/// The full payment details
20+
pub details: PaymentDetails,
21+
/// Cached timestamp for efficient cleanup queries
22+
pub created_at: u64,
23+
/// Transaction IDs that have replaced or conflict with this payment.
24+
pub conflicting_txids: Vec<Txid>,
25+
}
26+
27+
impl PendingPaymentDetails {
28+
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
29+
Self { created_at: details.latest_update_timestamp, details, conflicting_txids }
30+
}
31+
32+
/// Convert to finalized payment for the main payment store
33+
pub fn into_payment_details(self) -> PaymentDetails {
34+
self.details
35+
}
36+
}
37+
38+
impl_writeable_tlv_based!(PendingPaymentDetails, {
39+
(0, details, required),
40+
(2, created_at, required),
41+
(4, conflicting_txids, optional_vec),
42+
});
43+
44+
#[derive(Clone, Debug, PartialEq, Eq)]
45+
pub(crate) struct PendingPaymentDetailsUpdate {
46+
pub id: PaymentId,
47+
pub payment_update: Option<PaymentDetailsUpdate>,
48+
pub conflicting_txids: Option<Vec<Txid>>,
49+
}
50+
51+
impl StorableObject for PendingPaymentDetails {
52+
type Id = PaymentId;
53+
type Update = PendingPaymentDetailsUpdate;
54+
55+
fn id(&self) -> Self::Id {
56+
self.details.id
57+
}
58+
59+
fn update(&mut self, update: &Self::Update) -> bool {
60+
let mut updated = false;
61+
62+
// Update the underlying payment details if present
63+
if let Some(payment_update) = &update.payment_update {
64+
updated |= self.details.update(payment_update);
65+
}
66+
67+
if let Some(new_conflicting_txids) = &update.conflicting_txids {
68+
if &self.conflicting_txids != new_conflicting_txids {
69+
self.conflicting_txids = new_conflicting_txids.clone();
70+
updated = true;
71+
}
72+
}
73+
74+
updated
75+
}
76+
77+
fn to_update(&self) -> Self::Update {
78+
self.into()
79+
}
80+
}
81+
82+
impl StorableObjectUpdate<PendingPaymentDetails> for PendingPaymentDetailsUpdate {
83+
fn id(&self) -> <PendingPaymentDetails as StorableObject>::Id {
84+
self.id
85+
}
86+
}
87+
88+
impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
89+
fn from(value: &PendingPaymentDetails) -> Self {
90+
Self {
91+
id: value.id(),
92+
payment_update: Some(value.details.to_update()),
93+
conflicting_txids: Some(value.conflicting_txids.clone()),
94+
}
95+
}
96+
}

src/payment/store.rs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -291,15 +291,6 @@ impl StorableObject for PaymentDetails {
291291
}
292292
}
293293

294-
if let Some(conflicting_txids_opt) = &update.conflicting_txids {
295-
match self.kind {
296-
PaymentKind::Onchain { ref mut conflicting_txids, .. } => {
297-
update_if_necessary!(*conflicting_txids, conflicting_txids_opt.to_vec());
298-
},
299-
_ => {},
300-
}
301-
}
302-
303294
if updated {
304295
self.latest_update_timestamp = SystemTime::now()
305296
.duration_since(UNIX_EPOCH)
@@ -360,8 +351,6 @@ pub enum PaymentKind {
360351
txid: Txid,
361352
/// The confirmation status of this payment.
362353
status: ConfirmationStatus,
363-
/// Transaction IDs that have replaced or conflict with this payment.
364-
conflicting_txids: Vec<Txid>,
365354
},
366355
/// A [BOLT 11] payment.
367356
///
@@ -459,7 +448,6 @@ pub enum PaymentKind {
459448
impl_writeable_tlv_based_enum!(PaymentKind,
460449
(0, Onchain) => {
461450
(0, txid, required),
462-
(1, conflicting_txids, optional_vec),
463451
(2, status, required),
464452
},
465453
(2, Bolt11) => {
@@ -552,7 +540,6 @@ pub(crate) struct PaymentDetailsUpdate {
552540
pub direction: Option<PaymentDirection>,
553541
pub status: Option<PaymentStatus>,
554542
pub confirmation_status: Option<ConfirmationStatus>,
555-
pub conflicting_txids: Option<Vec<Txid>>,
556543
}
557544

558545
impl PaymentDetailsUpdate {
@@ -568,7 +555,6 @@ impl PaymentDetailsUpdate {
568555
direction: None,
569556
status: None,
570557
confirmation_status: None,
571-
conflicting_txids: None,
572558
}
573559
}
574560
}
@@ -584,11 +570,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate {
584570
_ => (None, None, None),
585571
};
586572

587-
let (confirmation_status, conflicting_txids) = match &value.kind {
588-
PaymentKind::Onchain { status, conflicting_txids, .. } => {
589-
(Some(*status), conflicting_txids.clone())
590-
},
591-
_ => (None, Vec::new()),
573+
let confirmation_status = match &value.kind {
574+
PaymentKind::Onchain { status, .. } => Some(*status),
575+
_ => None,
592576
};
593577

594578
let counterparty_skimmed_fee_msat = match value.kind {
@@ -609,7 +593,6 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate {
609593
direction: Some(value.direction),
610594
status: Some(value.status),
611595
confirmation_status,
612-
conflicting_txids: Some(conflicting_txids),
613596
}
614597
}
615598
}

src/types.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::fee_estimator::OnchainFeeEstimator;
3838
use crate::gossip::RuntimeSpawner;
3939
use crate::logger::Logger;
4040
use crate::message_handler::NodeCustomMessageHandler;
41-
use crate::payment::PaymentDetails;
41+
use crate::payment::{PaymentDetails, PendingPaymentDetails};
4242

4343
/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
4444
/// same time.
@@ -609,3 +609,5 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
609609
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
610610
}
611611
}
612+
613+
pub(crate) type PendingPaymentStore = DataStore<PendingPaymentDetails, Arc<Logger>>;

0 commit comments

Comments
 (0)