Skip to content

Commit e5e120c

Browse files
committed
Add RBF replacement tracking with new persisted lookup table
Introduce a new lookup `ReplacedTransactionStore` that maps old/replaced transaction IDs to their current replacement transaction IDs, enabling reliable tracking of replaced transactions throughout the replacement chain. Key changes: - Add persisted storage for RBF replacement relationships - Link transactions in replacement trees using payment IDs - Remove entire replacement chains from persistence when any transaction in the tree is confirmed
1 parent a3dded1 commit e5e120c

File tree

7 files changed

+257
-73
lines changed

7 files changed

+257
-73
lines changed

src/builder.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,13 @@ use crate::gossip::GossipSource;
5555
use crate::io::sqlite_store::SqliteStore;
5656
use crate::io::utils::{
5757
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
58-
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
59-
write_node_metrics,
58+
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_replaced_txs,
59+
read_scorer, write_node_metrics,
6060
};
6161
use crate::io::vss_store::VssStoreBuilder;
6262
use crate::io::{
6363
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
64+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE, REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
6465
};
6566
use crate::liquidity::{
6667
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
@@ -73,7 +74,8 @@ use crate::runtime::Runtime;
7374
use crate::tx_broadcaster::TransactionBroadcaster;
7475
use crate::types::{
7576
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
76-
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
77+
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, ReplacedTransactionStore,
78+
SyncAndAsyncKVStore,
7779
};
7880
use crate::wallet::persist::KVStoreWalletPersister;
7981
use crate::wallet::Wallet;
@@ -1235,6 +1237,21 @@ fn build_with_store_internal(
12351237
},
12361238
};
12371239

1240+
let replaced_tx_store = match runtime
1241+
.block_on(async { read_replaced_txs(&*kv_store, Arc::clone(&logger)).await })
1242+
{
1243+
Ok(replaced_txs) => Arc::new(ReplacedTransactionStore::new(
1244+
replaced_txs,
1245+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1246+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1247+
Arc::clone(&kv_store),
1248+
Arc::clone(&logger),
1249+
)),
1250+
Err(e) => {
1251+
log_error!(logger, "Failed to read replaced transaction data from store: {}", e);
1252+
return Err(BuildError::ReadFailed);
1253+
},
1254+
};
12381255
let wallet = Arc::new(Wallet::new(
12391256
bdk_wallet,
12401257
wallet_persister,
@@ -1243,6 +1260,7 @@ fn build_with_store_internal(
12431260
Arc::clone(&payment_store),
12441261
Arc::clone(&config),
12451262
Arc::clone(&logger),
1263+
Arc::clone(&replaced_tx_store),
12461264
));
12471265

12481266
// Initialize the KeysManager

src/io/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
7878
///
7979
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
8080
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";
81+
82+
/// The replaced transaction information will be persisted under this prefix.
83+
pub(crate) const REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE: &str = "replaced_txs";
84+
pub(crate) const REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";

src/io/utils.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use crate::io::{
4444
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
4545
};
4646
use crate::logger::{log_error, LdkLogger, Logger};
47+
use crate::payment::ReplacedOnchainTransactionDetails;
4748
use crate::peer_store::PeerStore;
4849
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
4950
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
@@ -620,6 +621,83 @@ pub(crate) fn read_bdk_wallet_change_set(
620621
Ok(Some(change_set))
621622
}
622623

624+
/// Read previously persisted replaced transaction information from the store.
625+
pub(crate) async fn read_replaced_txs<L: Deref>(
626+
kv_store: &DynStore, logger: L,
627+
) -> Result<Vec<ReplacedOnchainTransactionDetails>, std::io::Error>
628+
where
629+
L::Target: LdkLogger,
630+
{
631+
let mut res = Vec::new();
632+
633+
let mut stored_keys = KVStore::list(
634+
&*kv_store,
635+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE,
636+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
637+
)
638+
.await?;
639+
640+
const BATCH_SIZE: usize = 50;
641+
642+
let mut set = tokio::task::JoinSet::new();
643+
644+
// Fill JoinSet with tasks if possible
645+
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
646+
if let Some(next_key) = stored_keys.pop() {
647+
let fut = KVStore::read(
648+
&*kv_store,
649+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE,
650+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
651+
&next_key,
652+
);
653+
set.spawn(fut);
654+
debug_assert!(set.len() <= BATCH_SIZE);
655+
}
656+
}
657+
658+
while let Some(read_res) = set.join_next().await {
659+
// Exit early if we get an IO error.
660+
let reader = read_res
661+
.map_err(|e| {
662+
log_error!(logger, "Failed to read ReplacedOnchainTransactionDetails: {}", e);
663+
set.abort_all();
664+
e
665+
})?
666+
.map_err(|e| {
667+
log_error!(logger, "Failed to read ReplacedOnchainTransactionDetails: {}", e);
668+
set.abort_all();
669+
e
670+
})?;
671+
672+
// Refill set for every finished future, if we still have something to do.
673+
if let Some(next_key) = stored_keys.pop() {
674+
let fut = KVStore::read(
675+
&*kv_store,
676+
REPLACED_TX_PERSISTENCE_PRIMARY_NAMESPACE,
677+
REPLACED_TX_PERSISTENCE_SECONDARY_NAMESPACE,
678+
&next_key,
679+
);
680+
set.spawn(fut);
681+
debug_assert!(set.len() <= BATCH_SIZE);
682+
}
683+
684+
// Handle result.
685+
let payment = ReplacedOnchainTransactionDetails::read(&mut &*reader).map_err(|e| {
686+
log_error!(logger, "Failed to deserialize ReplacedOnchainTransactionDetails: {}", e);
687+
std::io::Error::new(
688+
std::io::ErrorKind::InvalidData,
689+
"Failed to deserialize ReplacedOnchainTransactionDetails",
690+
)
691+
})?;
692+
res.push(payment);
693+
}
694+
695+
debug_assert!(set.is_empty());
696+
debug_assert!(stored_keys.is_empty());
697+
698+
Ok(res)
699+
}
700+
623701
#[cfg(test)]
624702
mod tests {
625703
use super::read_or_generate_seed_file;

src/payment/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
1111
mod bolt11;
1212
mod bolt12;
1313
mod onchain;
14+
mod replaced_transaction_store;
1415
mod spontaneous;
1516
pub(crate) mod store;
1617
mod unified;
1718

1819
pub use bolt11::Bolt11Payment;
1920
pub use bolt12::Bolt12Payment;
2021
pub use onchain::OnchainPayment;
22+
pub use replaced_transaction_store::ReplacedOnchainTransactionDetails;
2123
pub use spontaneous::SpontaneousPayment;
2224
pub use store::{
2325
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// This file is Copyright its original authors, visible in version control history.
2+
//
3+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4+
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5+
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
6+
// accordance with one or both of these licenses.
7+
8+
use bitcoin::Txid;
9+
use lightning::impl_writeable_tlv_based;
10+
use lightning::ln::channelmanager::PaymentId;
11+
12+
use crate::data_store::{StorableObject, StorableObjectId, StorableObjectUpdate};
13+
14+
/// Details of an on-chain transaction that has replaced a previous transaction (e.g., via RBF).
15+
#[derive(Clone, Debug, PartialEq, Eq)]
16+
pub struct ReplacedOnchainTransactionDetails {
17+
/// The new transaction ID.
18+
pub new_txid: Txid,
19+
/// The original transaction ID that was replaced.
20+
pub original_txid: Txid,
21+
/// The payment ID associated with the transaction.
22+
pub payment_id: PaymentId,
23+
}
24+
25+
impl ReplacedOnchainTransactionDetails {
26+
pub(crate) fn new(new_txid: Txid, original_txid: Txid, payment_id: PaymentId) -> Self {
27+
Self { new_txid, original_txid, payment_id }
28+
}
29+
}
30+
31+
impl_writeable_tlv_based!(ReplacedOnchainTransactionDetails,{
32+
(0, new_txid, required),
33+
(2, original_txid, required),
34+
(4, payment_id, required),
35+
});
36+
37+
impl StorableObjectId for Txid {
38+
fn encode_to_hex_str(&self) -> String {
39+
self.to_string()
40+
}
41+
}
42+
impl StorableObject for ReplacedOnchainTransactionDetails {
43+
type Id = Txid;
44+
type Update = ReplacedOnchainTransactionDetailsUpdate;
45+
46+
fn id(&self) -> Self::Id {
47+
self.new_txid
48+
}
49+
50+
fn update(&mut self, _update: &Self::Update) -> bool {
51+
// We don't update, we delete on confirmation
52+
false
53+
}
54+
55+
fn to_update(&self) -> Self::Update {
56+
self.into()
57+
}
58+
}
59+
60+
#[derive(Clone, Debug, PartialEq, Eq)]
61+
pub(crate) struct ReplacedOnchainTransactionDetailsUpdate {
62+
pub id: Txid,
63+
}
64+
65+
impl From<&ReplacedOnchainTransactionDetails> for ReplacedOnchainTransactionDetailsUpdate {
66+
fn from(value: &ReplacedOnchainTransactionDetails) -> Self {
67+
Self { id: value.new_txid }
68+
}
69+
}
70+
71+
impl StorableObjectUpdate<ReplacedOnchainTransactionDetails>
72+
for ReplacedOnchainTransactionDetailsUpdate
73+
{
74+
fn id(&self) -> <ReplacedOnchainTransactionDetails as StorableObject>::Id {
75+
self.id
76+
}
77+
}

src/types.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::fee_estimator::OnchainFeeEstimator;
3838
use crate::gossip::RuntimeSpawner;
3939
use crate::logger::Logger;
4040
use crate::message_handler::NodeCustomMessageHandler;
41-
use crate::payment::PaymentDetails;
41+
use crate::payment::{PaymentDetails, ReplacedOnchainTransactionDetails};
4242

4343
/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
4444
/// same time.
@@ -609,3 +609,6 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
609609
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
610610
}
611611
}
612+
613+
pub(crate) type ReplacedTransactionStore =
614+
DataStore<ReplacedOnchainTransactionDetails, Arc<Logger>>;

0 commit comments

Comments
 (0)