Skip to content

Commit e13e503

Browse files
committed
Add RBF replacement tracking with new persisted lookup table
Introduce a new lookup `ReplacedTransactionStore` that maps old/replaced transaction IDs to their current replacement transaction IDs, enabling reliable tracking of replaced transactions throughout the replacement chain. Key changes: - Add persisted storage for RBF replacement relationships - Link transactions in replacement trees using payment IDs - Remove entire replacement chains from persistence when any transaction in the tree is confirmed
1 parent 9eb5584 commit e13e503

File tree

7 files changed

+185
-54
lines changed

7 files changed

+185
-54
lines changed

src/builder.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::io::utils::{
5959
use crate::io::vss_store::VssStoreBuilder;
6060
use crate::io::{
6161
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
62+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE, REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
6263
};
6364
use crate::liquidity::{
6465
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
@@ -71,7 +72,8 @@ use crate::runtime::Runtime;
7172
use crate::tx_broadcaster::TransactionBroadcaster;
7273
use crate::types::{
7374
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
74-
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
75+
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, ReplacedTransactionStore,
76+
SyncAndAsyncKVStore,
7577
};
7678
use crate::wallet::persist::KVStoreWalletPersister;
7779
use crate::wallet::Wallet;
@@ -1230,6 +1232,21 @@ fn build_with_store_internal(
12301232
},
12311233
};
12321234

1235+
let replaced_tx_store =
1236+
match io::utils::read_replaced_txs(Arc::clone(&kv_store), Arc::clone(&logger)) {
1237+
Ok(replaced_txs) => Arc::new(ReplacedTransactionStore::new(
1238+
replaced_txs,
1239+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1240+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1241+
Arc::clone(&kv_store),
1242+
Arc::clone(&logger),
1243+
)),
1244+
Err(e) => {
1245+
log_error!(logger, "Failed to read replaced transaction data from store: {}", e);
1246+
return Err(BuildError::ReadFailed);
1247+
},
1248+
};
1249+
12331250
let wallet = Arc::new(Wallet::new(
12341251
bdk_wallet,
12351252
wallet_persister,
@@ -1238,6 +1255,7 @@ fn build_with_store_internal(
12381255
Arc::clone(&payment_store),
12391256
Arc::clone(&config),
12401257
Arc::clone(&logger),
1258+
Arc::clone(&replaced_tx_store),
12411259
));
12421260

12431261
// Initialize the KeysManager

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
7878
///
7979
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
8080
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
81+
82+
/// The replaced transaction information will be persisted under this prefix.
83+
pub(crate) const REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE: &str = "replaced_txs";
84+
pub(crate) const REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::io::{
4545
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4646
};
4747
use crate::logger::{log_error, LdkLogger, Logger};
48+
use crate::payment::ReplacedOnchainTransactionDetails;
4849
use crate::peer_store::PeerStore;
4950
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
5051
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
@@ -576,6 +577,38 @@ pub(crate) fn read_bdk_wallet_change_set(
576577
Ok(Some(change_set))
577578
}
578579

580+
/// Read previously persisted replaced transaction information from the store.
581+
pub(crate) fn read_replaced_txs<L: Deref>(
582+
kv_store: Arc<DynStore>, logger: L,
583+
) -> Result<Vec<ReplacedOnchainTransactionDetails>, std::io::Error>
584+
where
585+
L::Target: LdkLogger,
586+
{
587+
let mut res = Vec::new();
588+
589+
for stored_key in KVStoreSync::list(
590+
&*kv_store,
591+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE,
592+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
593+
)? {
594+
let mut reader = Cursor::new(KVStoreSync::read(
595+
&*kv_store,
596+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE,
597+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
598+
&stored_key,
599+
)?);
600+
let payment = ReplacedOnchainTransactionDetails::read(&mut reader).map_err(|e| {
601+
log_error!(logger, "Failed to deserialize ReplacedOnchainTransactionDetails: {}", e);
602+
std::io::Error::new(
603+
std::io::ErrorKind::InvalidData,
604+
"Failed to deserialize ReplacedOnchainTransactionDetails",
605+
)
606+
})?;
607+
res.push(payment);
608+
}
609+
Ok(res)
610+
}
611+
579612
#[cfg(test)]
580613
mod tests {
581614
use super::read_or_generate_seed_file;

src/payment/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
1111
mod bolt11;
1212
mod bolt12;
1313
mod onchain;
14+
mod replaced_transaction_store;
1415
mod spontaneous;
1516
pub(crate) mod store;
1617
mod unified_qr;
1718

1819
pub use bolt11::Bolt11Payment;
1920
pub use bolt12::Bolt12Payment;
2021
pub use onchain::OnchainPayment;
22+
pub use replaced_transaction_store::ReplacedOnchainTransactionDetails;
2123
pub use spontaneous::SpontaneousPayment;
2224
pub use store::{
2325
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use bitcoin::Txid;
9+
use lightning::impl_writeable_tlv_based;
10+
use lightning::ln::channelmanager::PaymentId;
11+
12+
use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate};
13+
14+
/// Details of an on-chain transaction that has replaced a previous transaction (e.g., via RBF).
15+
#[derive(Clone, Debug, PartialEq, Eq)]
16+
pub struct ReplacedOnchainTransactionDetails {
17+
/// The new transaction ID.
18+
pub new_txid: Txid,
19+
/// The original transaction ID that was replaced.
20+
pub original_txid: Txid,
21+
/// The payment ID associated with the transaction.
22+
pub payment_id: PaymentId,
23+
}
24+
25+
impl ReplacedOnchainTransactionDetails {
26+
pub(crate) fn new(new_txid: Txid, original_txid: Txid, payment_id: PaymentId) -> Self {
27+
Self { new_txid, original_txid, payment_id }
28+
}
29+
}
30+
31+
impl_writeable_tlv_based!(ReplacedOnchainTransactionDetails,{
32+
(0, new_txid, required),
33+
(2, original_txid, required),
34+
(4, payment_id, required),
35+
});
36+
37+
impl StorableObjectId for Txid {
38+
fn encode_to_hex_str(&self) -> String {
39+
self.to_string()
40+
}
41+
}
42+
impl StorableObject for ReplacedOnchainTransactionDetails {
43+
type Id = Txid;
44+
type Update = ReplacedOnchainTransactionDetailsUpdate;
45+
46+
fn id(&self) -> Self::Id {
47+
self.new_txid
48+
}
49+
50+
fn update(&mut self, _update: &Self::Update) -> bool {
51+
// We don't update, we delete on confirmation
52+
false
53+
}
54+
55+
fn to_update(&self) -> Self::Update {
56+
self.into()
57+
}
58+
}
59+
60+
#[derive(Clone, Debug, PartialEq, Eq)]
61+
pub(crate) struct ReplacedOnchainTransactionDetailsUpdate {
62+
pub id: Txid,
63+
}
64+
65+
impl From<&ReplacedOnchainTransactionDetails> for ReplacedOnchainTransactionDetailsUpdate {
66+
fn from(value: &ReplacedOnchainTransactionDetails) -> Self {
67+
Self { id: value.new_txid }
68+
}
69+
}
70+
71+
impl StorableObjectUpdate<ReplacedOnchainTransactionDetails>
72+
for ReplacedOnchainTransactionDetailsUpdate
73+
{
74+
fn id(&self) -> <ReplacedOnchainTransactionDetails as StorableObject>::Id {
75+
self.id
76+
}
77+
}

src/types.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::fee_estimator::OnchainFeeEstimator;
3737
use crate::gossip::RuntimeSpawner;
3838
use crate::logger::Logger;
3939
use crate::message_handler::NodeCustomMessageHandler;
40-
use crate::payment::PaymentDetails;
40+
use crate::payment::{PaymentDetails, ReplacedOnchainTransactionDetails};
4141

4242
/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
4343
/// same time.
@@ -596,3 +596,6 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
596596
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
597597
}
598598
}
599+
600+
pub(crate) type ReplacedTransactionStore =
601+
DataStore<ReplacedOnchainTransactionDetails, Arc<Logger>>;

src/wallet/mod.rs

Lines changed: 46 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,10 @@ use crate::config::Config;
5050
use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator};
5151
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
5252
use crate::payment::store::ConfirmationStatus;
53-
use crate::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus};
54-
use crate::types::{Broadcaster, PaymentStore};
53+
use crate::payment::{
54+
PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, ReplacedOnchainTransactionDetails,
55+
};
56+
use crate::types::{Broadcaster, PaymentStore, ReplacedTransactionStore};
5557
use crate::Error;
5658

5759
pub(crate) enum OnchainSendAmount {
@@ -72,18 +74,28 @@ pub(crate) struct Wallet {
7274
payment_store: Arc<PaymentStore>,
7375
config: Arc<Config>,
7476
logger: Arc<Logger>,
77+
replaced_tx_store: Arc<ReplacedTransactionStore>,
7578
}
7679

7780
impl Wallet {
7881
pub(crate) fn new(
7982
wallet: bdk_wallet::PersistedWallet<KVStoreWalletPersister>,
8083
wallet_persister: KVStoreWalletPersister, broadcaster: Arc<Broadcaster>,
8184
fee_estimator: Arc<OnchainFeeEstimator>, payment_store: Arc<PaymentStore>,
82-
config: Arc<Config>, logger: Arc<Logger>,
85+
config: Arc<Config>, logger: Arc<Logger>, replaced_tx_store: Arc<ReplacedTransactionStore>,
8386
) -> Self {
8487
let inner = Mutex::new(wallet);
8588
let persister = Mutex::new(wallet_persister);
86-
Self { inner, persister, broadcaster, fee_estimator, payment_store, config, logger }
89+
Self {
90+
inner,
91+
persister,
92+
broadcaster,
93+
fee_estimator,
94+
payment_store,
95+
config,
96+
logger,
97+
replaced_tx_store,
98+
}
8799
}
88100

89101
pub(crate) fn get_full_scan_request(&self) -> FullScanRequest<KeychainKind> {
@@ -225,9 +237,21 @@ impl Wallet {
225237
..
226238
} = payment.kind
227239
{
240+
let payment_id = payment.id;
228241
if new_tip.height >= height + ANTI_REORG_DELAY - 1 {
229242
payment.status = PaymentStatus::Succeeded;
230243
self.payment_store.insert_or_update(payment)?;
244+
245+
// Remove any replaced transactions associated with this payment
246+
let replaced_txids = self
247+
.replaced_tx_store
248+
.list_filter(|r| r.payment_id == payment_id)
249+
.iter()
250+
.map(|p| p.new_txid)
251+
.collect::<Vec<Txid>>();
252+
for replaced_txid in replaced_txids {
253+
self.replaced_tx_store.remove(&replaced_txid)?;
254+
}
231255
}
232256
}
233257
}
@@ -248,47 +272,21 @@ impl Wallet {
248272
);
249273
self.payment_store.insert_or_update(payment)?;
250274
},
251-
WalletEvent::TxReplaced { txid, tx, conflicts } => {
275+
WalletEvent::TxReplaced { txid, conflicts, .. } => {
252276
let payment_id = self
253277
.find_payment_by_txid(txid)
254278
.unwrap_or_else(|| PaymentId(txid.to_byte_array()));
255279

256-
if let Some(mut payment) = self.payment_store.get(&payment_id) {
257-
if let PaymentKind::Onchain {
258-
ref mut conflicting_txids,
259-
txid: current_txid,
260-
..
261-
} = payment.kind
262-
{
263-
let existing_set: std::collections::HashSet<_> =
264-
conflicting_txids.iter().collect();
265-
266-
let new_conflicts: Vec<_> = conflicts
267-
.iter()
268-
.map(|(_, conflict_txid)| *conflict_txid)
269-
.filter(|conflict_txid| {
270-
*conflict_txid != current_txid
271-
&& !existing_set.contains(conflict_txid)
272-
})
273-
.collect();
274-
275-
conflicting_txids.extend(new_conflicts);
276-
}
277-
self.payment_store.insert_or_update(payment)?;
278-
} else {
279-
let conflicting_txids =
280-
Some(conflicts.iter().map(|(_, txid)| *txid).collect());
281-
282-
let payment = self.create_payment_from_tx(
283-
locked_wallet,
284-
txid,
285-
payment_id,
286-
&tx,
287-
PaymentStatus::Pending,
288-
ConfirmationStatus::Unconfirmed,
289-
conflicting_txids,
290-
);
291-
self.payment_store.insert_or_update(payment)?;
280+
// Collect all conflict txids
281+
let conflict_txids: Vec<Txid> =
282+
conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect();
283+
284+
for conflict_txid in conflict_txids {
285+
// Update the replaced transaction store
286+
let replaced_tx_details =
287+
ReplacedOnchainTransactionDetails::new(conflict_txid, txid, payment_id);
288+
289+
self.replaced_tx_store.insert_or_update(replaced_tx_details)?;
292290
}
293291
},
294292
WalletEvent::TxDropped { txid, tx } => {
@@ -954,20 +952,16 @@ impl Wallet {
954952

955953
fn find_payment_by_txid(&self, target_txid: Txid) -> Option<PaymentId> {
956954
let direct_payment_id = PaymentId(target_txid.to_byte_array());
957-
if self.payment_store.get(&direct_payment_id).is_some() {
955+
if self.payment_store.contains_key(&direct_payment_id) {
958956
return Some(direct_payment_id);
959957
}
960958

961-
self.payment_store
962-
.list_filter(|p| {
963-
if let PaymentKind::Onchain { txid, conflicting_txids, .. } = &p.kind {
964-
*txid == target_txid || conflicting_txids.contains(&target_txid)
965-
} else {
966-
false
967-
}
968-
})
969-
.first()
970-
.map(|p| p.id)
959+
// Check if this txid is a replaced transaction
960+
if let Some(replaced_details) = self.replaced_tx_store.get(&target_txid) {
961+
return Some(replaced_details.payment_id);
962+
}
963+
964+
None
971965
}
972966
}
973967

0 commit comments

Comments
 (0)