diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 076d7fc9b..af2c90f48 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -13,6 +13,7 @@ dictionary Config { u64 probing_liquidity_limit_multiplier; AnchorChannelsConfig? anchor_channels_config; SendingParameters? sending_parameters; + boolean auto_rebroadcast_unconfirmed_tx; }; dictionary AnchorChannelsConfig { @@ -231,6 +232,10 @@ interface OnchainPayment { Txid send_to_address([ByRef]Address address, u64 amount_sats, FeeRate? fee_rate); [Throws=NodeError] Txid send_all_to_address([ByRef]Address address, boolean retain_reserve, FeeRate? fee_rate); + [Throws=NodeError] + void rebroadcast_transaction(PaymentId payment_id); + [Throws=NodeError] + Txid bump_fee_rbf(PaymentId payment_id); }; interface FeeRate { @@ -311,6 +316,7 @@ enum NodeError { "InsufficientFunds", "LiquiditySourceUnavailable", "LiquidityFeeTooHigh", + "InvalidTransaction", }; dictionary NodeStatus { @@ -409,7 +415,7 @@ interface ClosureReason { [Enum] interface PaymentKind { - Onchain(Txid txid, ConfirmationStatus status); + Onchain(Txid txid, ConfirmationStatus status, Transaction? raw_tx, u64? last_broadcast_time, u32? broadcast_attempts); Bolt11(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret); Bolt11Jit(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret, u64? counterparty_skimmed_fee_msat, LSPFeeLimits lsp_fee_limits); Bolt12Offer(PaymentHash? hash, PaymentPreimage? preimage, PaymentSecret? secret, OfferId offer_id, UntrustedString? payer_note, u64? quantity); @@ -865,3 +871,6 @@ typedef string OrderId; [Custom] typedef string DateTime; + +[Custom] +typedef string Transaction; diff --git a/src/config.rs b/src/config.rs index 02df8bbc7..7a8394b7f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,6 +29,9 @@ const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30; const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10; const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3; const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000; +const DEFAULT_MIN_REBROADCAST_INTERVAL_SECS: u64 = 300; +const DEFAULT_MAX_BROADCAST_ATTEMPTS: u32 = 24; +const DEFAULT_BACKOFF_FACTOR: f32 = 1.5; /// The default log level. pub const DEFAULT_LOG_LEVEL: LogLevel = LogLevel::Debug; @@ -94,6 +97,9 @@ pub(crate) const RGS_SYNC_TIMEOUT_SECS: u64 = 5; /// The length in bytes of our wallets' keys seed. pub const WALLET_KEYS_SEED_LEN: usize = 64; +// The time in-between unconfirmed transaction broadcasts. +pub(crate) const UNCONFIRMED_TX_BROADCAST_INTERVAL: Duration = Duration::from_secs(300); + #[derive(Debug, Clone)] /// Represents the configuration of an [`Node`] instance. /// @@ -115,6 +121,7 @@ pub const WALLET_KEYS_SEED_LEN: usize = 64; /// | `log_level` | Debug | /// | `anchor_channels_config` | Some(..) | /// | `sending_parameters` | None | +/// | `auto_rebroadcast_unconfirmed_tx` | true | /// /// See [`AnchorChannelsConfig`] and [`SendingParameters`] for more information regarding their /// respective default values. @@ -179,6 +186,16 @@ pub struct Config { /// **Note:** If unset, default parameters will be used, and you will be able to override the /// parameters on a per-payment basis in the corresponding method calls. pub sending_parameters: Option, + /// This will determine whether to automatically rebroadcast unconfirmed transactions + /// (e.g., channel funding or sweep transactions). + /// + /// If enabled, the node will periodically attempt to rebroadcast any unconfirmed transactions to + /// increase propagation and confirmation likelihood. This is helpful in cases where transactions + /// were dropped by the mempool or not widely propagated. + /// + /// Defaults to `true`. Disabling this may be desired for privacy-sensitive use cases or low-bandwidth + /// environments, but may result in slower or failed confirmations if transactions are not re-announced. + pub auto_rebroadcast_unconfirmed_tx: bool, } impl Default for Config { @@ -193,6 +210,7 @@ impl Default for Config { anchor_channels_config: Some(AnchorChannelsConfig::default()), sending_parameters: None, node_alias: None, + auto_rebroadcast_unconfirmed_tx: true, } } } @@ -534,6 +552,49 @@ impl From for LdkMaxDustHTLCExposure { } } +/// Policy for controlling transaction rebroadcasting behavior. +/// +/// Determines the strategy for resending unconfirmed transactions to the network +/// to ensure they remain in mempools and eventually get confirmed. +#[derive(Clone, Debug)] +pub struct RebroadcastPolicy { + /// Minimum time between rebroadcast attempts in seconds. + /// + /// This prevents excessive network traffic by ensuring a minimum delay + /// between consecutive rebroadcast attempts. + /// + /// **Recommended values**: 60-600 seconds (1-10 minutes) + pub min_rebroadcast_interval_secs: u64, + /// Maximum number of broadcast attempts before giving up. + /// + /// After reaching this limit, the transaction will no longer be rebroadcast + /// automatically. Manual intervention may be required. + /// + /// **Recommended values**: 12-48 attempts + pub max_broadcast_attempts: u32, + /// Exponential backoff factor for increasing intervals between attempts. + /// + /// Each subsequent rebroadcast wait time is multiplied by this factor, + /// creating an exponential backoff pattern. + /// + /// - `1.0`: No backoff (constant interval) + /// - `1.5`: 50% increase each attempt + /// - `2.0`: 100% increase (doubling) each attempt + /// + /// **Recommended values**: 1.2-2.0 + pub backoff_factor: f32, +} + +impl Default for RebroadcastPolicy { + fn default() -> Self { + Self { + min_rebroadcast_interval_secs: DEFAULT_MIN_REBROADCAST_INTERVAL_SECS, + max_broadcast_attempts: DEFAULT_MAX_BROADCAST_ATTEMPTS, + backoff_factor: DEFAULT_BACKOFF_FACTOR, + } + } +} + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/src/error.rs b/src/error.rs index 2cb71186d..6ef3c6343 100644 --- a/src/error.rs +++ b/src/error.rs @@ -120,6 +120,8 @@ pub enum Error { LiquiditySourceUnavailable, /// The given operation failed due to the LSP's required opening fee being too high. LiquidityFeeTooHigh, + /// The given transaction is invalid. + InvalidTransaction, } impl fmt::Display for Error { @@ -193,6 +195,7 @@ impl fmt::Display for Error { Self::LiquidityFeeTooHigh => { write!(f, "The given operation failed due to the LSP's required opening fee being too high.") }, + Self::InvalidTransaction => write!(f, "The given transaction is invalid."), } } } diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 984e4da8f..ebf77ba5f 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -36,7 +36,7 @@ pub use lightning_invoice::{Description, SignedRawBolt11Invoice}; pub use lightning_liquidity::lsps1::msgs::ChannelInfo as ChannelOrderInfo; pub use lightning_liquidity::lsps1::msgs::{OrderId, OrderParameters, PaymentState}; -pub use bitcoin::{Address, BlockHash, FeeRate, Network, OutPoint, Txid}; +pub use bitcoin::{Address, BlockHash, FeeRate, Network, OutPoint, Transaction, Txid}; pub use bip39::Mnemonic; @@ -1117,6 +1117,21 @@ impl UniffiCustomTypeConverter for DateTime { } } +impl UniffiCustomTypeConverter for Transaction { + type Builtin = String; + fn into_custom(val: Self::Builtin) -> uniffi::Result { + if let Some(bytes) = hex_utils::to_vec(&val) { + if let Ok(tx) = bitcoin::consensus::deserialize::(&bytes) { + return Ok(tx); + } + } + Err(Error::InvalidTransaction.into()) + } + fn from_custom(obj: Self) -> Self::Builtin { + hex_utils::to_string(&bitcoin::consensus::serialize(&obj)) + } +} + #[cfg(test)] mod tests { use std::{ diff --git a/src/lib.rs b/src/lib.rs index da86fce73..82a3b2e3d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,7 +129,7 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, - PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, UNCONFIRMED_TX_BROADCAST_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -402,6 +402,33 @@ impl Node { } }); + // Regularly rebroadcast unconfirmed transactions. + let rebroadcast_wallet = Arc::clone(&self.wallet); + let rebroadcast_logger = Arc::clone(&self.logger); + let mut stop_rebroadcast = self.stop_sender.subscribe(); + if self.config.auto_rebroadcast_unconfirmed_tx { + self.runtime.spawn_cancellable_background_task(async move { + let mut interval = tokio::time::interval(UNCONFIRMED_TX_BROADCAST_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = stop_rebroadcast.changed() => { + log_debug!( + rebroadcast_logger, + "Stopping rebroadcasting unconfirmed transactions." + ); + return; + } + _ = interval.tick() => { + if let Err(e) = rebroadcast_wallet.rebroadcast_unconfirmed_transactions() { + log_error!(rebroadcast_logger, "Background rebroadcast failed: {}", e); + } + } + } + } + }); + } + // Regularly broadcast node announcements. let bcast_cm = Arc::clone(&self.channel_manager); let bcast_pm = Arc::clone(&self.peer_manager); diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index 2614e55ce..1cdd495dd 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -14,6 +14,7 @@ use crate::types::{ChannelManager, Wallet}; use crate::wallet::OnchainSendAmount; use bitcoin::{Address, Txid}; +use lightning::ln::channelmanager::PaymentId; use std::sync::{Arc, RwLock}; @@ -120,4 +121,29 @@ impl OnchainPayment { let fee_rate_opt = maybe_map_fee_rate_opt!(fee_rate); self.wallet.send_to_address(address, send_amount, fee_rate_opt) } + + /// Manually trigger a rebroadcast of a specific transaction according to the default policy. + /// + /// This is useful if you suspect a transaction may not have propagated properly through the + /// network and you want to attempt to rebroadcast it immediately rather than waiting for the + /// automatic background job to handle it. + /// + /// updating the attempt count and last broadcast time for the transaction in the payment store. + pub fn rebroadcast_transaction(&self, payment_id: PaymentId) -> Result<(), Error> { + self.wallet.rebroadcast_transaction(payment_id) + } + + /// Attempt to bump the fee of an unconfirmed transaction using Replace-by-Fee (RBF). + /// + /// This creates a new transaction that replaces the original one, increasing the fee by the + /// specified increment to improve its chances of confirmation. The original transaction must + /// be signaling RBF replaceability for this to succeed. + /// + /// The new transaction will have the same outputs as the original but with a + /// higher fee, resulting in faster confirmation potential. + /// + /// Returns the Txid of the new replacement transaction if successful. + pub fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + self.wallet.bump_fee_rbf(payment_id) + } } diff --git a/src/payment/store.rs b/src/payment/store.rs index 75b2b1b2a..d87a8067a 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -17,7 +17,7 @@ use lightning::{ use lightning_types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; -use bitcoin::{BlockHash, Txid}; +use bitcoin::{BlockHash, Transaction, Txid}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -293,6 +293,33 @@ impl StorableObject for PaymentDetails { } } + if let Some(tx) = &update.raw_tx { + match self.kind { + PaymentKind::Onchain { ref mut raw_tx, .. } => { + update_if_necessary!(*raw_tx, tx.clone()); + }, + _ => {}, + } + } + + if let Some(attempts) = update.broadcast_attempts { + match self.kind { + PaymentKind::Onchain { ref mut broadcast_attempts, .. } => { + update_if_necessary!(*broadcast_attempts, attempts); + }, + _ => {}, + } + } + + if let Some(broadcast_time) = update.last_broadcast_time { + match self.kind { + PaymentKind::Onchain { ref mut last_broadcast_time, .. } => { + update_if_necessary!(*last_broadcast_time, broadcast_time); + }, + _ => {}, + } + } + if updated { self.latest_update_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -353,6 +380,12 @@ pub enum PaymentKind { txid: Txid, /// The confirmation status of this payment. status: ConfirmationStatus, + /// The raw transaction for rebroadcasting + raw_tx: Option, + /// Last broadcast attempt timestamp (UNIX seconds) + last_broadcast_time: Option, + /// Number of broadcast attempts + broadcast_attempts: Option, }, /// A [BOLT 11] payment. /// @@ -450,7 +483,10 @@ pub enum PaymentKind { impl_writeable_tlv_based_enum!(PaymentKind, (0, Onchain) => { (0, txid, required), + (1, raw_tx, option), (2, status, required), + (3, last_broadcast_time, option), + (5, broadcast_attempts, option), }, (2, Bolt11) => { (0, hash, required), @@ -542,6 +578,9 @@ pub(crate) struct PaymentDetailsUpdate { pub direction: Option, pub status: Option, pub confirmation_status: Option, + pub raw_tx: Option>, + pub last_broadcast_time: Option>, + pub broadcast_attempts: Option>, } impl PaymentDetailsUpdate { @@ -557,6 +596,9 @@ impl PaymentDetailsUpdate { direction: None, status: None, confirmation_status: None, + raw_tx: None, + last_broadcast_time: None, + broadcast_attempts: None, } } } @@ -572,10 +614,17 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { _ => (None, None, None), }; - let confirmation_status = match value.kind { - PaymentKind::Onchain { status, .. } => Some(status), - _ => None, - }; + let (confirmation_status, raw_tx, last_broadcast_time, broadcast_attempts) = + match &value.kind { + PaymentKind::Onchain { + status, + raw_tx, + last_broadcast_time, + broadcast_attempts, + .. + } => (Some(*status), raw_tx.clone(), *last_broadcast_time, *broadcast_attempts), + _ => (None, None, None, None), + }; let counterparty_skimmed_fee_msat = match value.kind { PaymentKind::Bolt11Jit { counterparty_skimmed_fee_msat, .. } => { @@ -595,6 +644,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { direction: Some(value.direction), status: Some(value.status), confirmation_status, + raw_tx: Some(raw_tx), + last_broadcast_time: Some(last_broadcast_time), + broadcast_attempts: Some(broadcast_attempts), } } } diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index fbac1d1b6..2d3ba278e 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -5,14 +5,15 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +use bdk_wallet::error::{BuildFeeBumpError, CreateTxError}; use persist::KVStoreWalletPersister; -use crate::config::Config; +use crate::config::{Config, RebroadcastPolicy}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator}; use crate::payment::store::ConfirmationStatus; -use crate::payment::{PaymentDetails, PaymentDirection, PaymentStatus}; +use crate::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::PaymentStore; use crate::Error; @@ -53,7 +54,7 @@ use bitcoin::{ use std::ops::Deref; use std::str::FromStr; use std::sync::{Arc, Mutex}; - +use std::time::{Duration, SystemTime, UNIX_EPOCH}; pub(crate) enum OnchainSendAmount { ExactRetainingReserve { amount_sats: u64, cur_anchor_reserve_sats: u64 }, AllRetainingReserve { cur_anchor_reserve_sats: u64 }, @@ -200,7 +201,27 @@ where // create and persist a list of 'static pending outputs' that we could use // here to determine the `PaymentKind`, but that's not really satisfactory, so // we're punting on it until we can come up with a better solution. - let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status }; + let existing_payment = self.payment_store.get(&id); + let (last_broadcast_time, broadcast_attempts) = if let Some(existing) = existing_payment + { + if let PaymentKind::Onchain { last_broadcast_time, broadcast_attempts, .. } = + existing.kind + { + (last_broadcast_time, broadcast_attempts) + } else { + (None, None) + } + } else { + (None, None) + }; + + let kind = crate::payment::PaymentKind::Onchain { + txid, + status: confirmation_status, + raw_tx: Some((*wtx.tx_node.tx).clone()), + last_broadcast_time, + broadcast_attempts, + }; let fee = locked_wallet.calculate_fee(&wtx.tx_node.tx).unwrap_or(Amount::ZERO); let (sent, received) = locked_wallet.sent_and_received(&wtx.tx_node.tx); let (direction, amount_msat) = if sent > received { @@ -533,9 +554,68 @@ where })? }; + let txid = tx.compute_txid(); + + // Calculate amounts for payment tracking + let (amount_msat, fee_paid_msat) = { + let locked_wallet = self.inner.lock().unwrap(); + let (sent, received) = locked_wallet.sent_and_received(&tx); + let fee = locked_wallet.calculate_fee(&tx).unwrap_or(Amount::ZERO); + + let amount_msat = + sent.checked_sub(received) + .unwrap_or(Amount::ZERO) + .to_sat() + .checked_sub(fee.to_sat()) + .unwrap_or(0) * 1000; + + let fee_paid_msat = fee.to_sat() * 1000; + + (Some(amount_msat), Some(fee_paid_msat)) + }; + + // Create payment details + let payment_details = PaymentDetails { + id: PaymentId(txid.to_byte_array()), + kind: PaymentKind::Onchain { + txid, + status: ConfirmationStatus::Unconfirmed, + raw_tx: Some(tx.clone()), + last_broadcast_time: None, + broadcast_attempts: None, + }, + amount_msat, + fee_paid_msat, + direction: PaymentDirection::Outbound, + status: PaymentStatus::Pending, + latest_update_timestamp: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + }; + + // Store payment details before broadcasting + self.payment_store.insert_or_update(payment_details)?; + self.broadcaster.broadcast_transactions(&[&tx]); - let txid = tx.compute_txid(); + if let Some(mut payment) = self.payment_store.get(&PaymentId(txid.to_byte_array())) { + if let PaymentKind::Onchain { + ref mut last_broadcast_time, + ref mut broadcast_attempts, + .. + } = &mut payment.kind + { + *last_broadcast_time = Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + ); + *broadcast_attempts = Some(broadcast_attempts.unwrap_or(0) + 1); + self.payment_store.insert_or_update(payment)?; + } + } match send_amount { OnchainSendAmount::ExactRetainingReserve { amount_sats, .. } => { @@ -568,6 +648,309 @@ where Ok(txid) } + + pub(crate) fn rebroadcast_unconfirmed_transactions(&self) -> Result<(), Error> { + let policy = RebroadcastPolicy::default(); + let unconfirmed_txids = self.get_unconfirmed_txids(); + + log_debug!(self.logger, "Found {} unconfirmed transactions", unconfirmed_txids.len()); + + if unconfirmed_txids.is_empty() { + log_info!(self.logger, "No unconfirmed transactions to rebroadcast"); + return Ok(()); + } + + let mut rebroadcast_count = 0; + let locked_wallet = self.inner.lock().unwrap(); + + for txid in unconfirmed_txids { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + if let Some(mut payment) = self.payment_store.get(&PaymentId(txid.to_byte_array())) { + if let PaymentKind::Onchain { + ref mut last_broadcast_time, + ref mut broadcast_attempts, + ref raw_tx, + ref mut status, + .. + } = &mut payment.kind + { + if !matches!(status, ConfirmationStatus::Unconfirmed) { + log_info!(self.logger, "Skipping confirmed transaction {}", txid); + continue; + } + + let Some(raw_tx) = raw_tx else { + log_info!(self.logger, "No raw transaction data for {}", txid); + continue; + }; + + let should_rebroadcast = match last_broadcast_time { + Some(last_time) => { + let next_allowed_time = *last_time + + self.calculate_backoff_interval( + (*broadcast_attempts).unwrap_or(0), + &policy, + ); + now >= next_allowed_time + && (*broadcast_attempts).unwrap_or(0) + < policy.max_broadcast_attempts + }, + None => true, + }; + + if !should_rebroadcast { + continue; + } + + *last_broadcast_time = Some(now); + *broadcast_attempts = Some(broadcast_attempts.unwrap_or(0) + 1); + + self.broadcaster.broadcast_transactions(&[&raw_tx]); + rebroadcast_count += 1; + + log_info!(self.logger, "Rebroadcast unconfirmed transaction {}", txid); + + if let Err(e) = self.payment_store.insert_or_update(payment) { + log_error!( + self.logger, + "Failed to update payment store for {}: {}", + txid, + e + ); + } + } + } else { + log_info!(self.logger, "No details found for payment {} in store", txid); + + if let Some(tx_entry) = locked_wallet.get_tx(txid) { + self.broadcaster.broadcast_transactions(&[&tx_entry.tx_node.tx]); + rebroadcast_count += 1; + log_info!( + self.logger, + "Rebroadcast unconfirmed transaction {} (from wallet)", + txid + ); + } else { + log_info!( + self.logger, + "Transaction {} not found in wallet or payment store", + txid + ); + } + } + } + + if rebroadcast_count > 0 { + log_info!(self.logger, "Successfully rebroadcast {} transactions", rebroadcast_count); + } + + Ok(()) + } + + fn calculate_backoff_interval(&self, attempt: u32, policy: &RebroadcastPolicy) -> u64 { + let base_interval = policy.min_rebroadcast_interval_secs as f32; + let interval = base_interval * policy.backoff_factor.powi(attempt as i32); + interval.round() as u64 + } + + pub(crate) fn rebroadcast_transaction(&self, payment_id: PaymentId) -> Result<(), Error> { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(); + + if let Some(mut payment) = self.payment_store.get(&payment_id) { + if let PaymentKind::Onchain { + ref mut last_broadcast_time, + ref mut broadcast_attempts, + ref mut raw_tx, + .. + } = &mut payment.kind + { + if let Some(raw_tx) = raw_tx { + *last_broadcast_time = Some(now); + *broadcast_attempts = Some(broadcast_attempts.unwrap_or(0) + 1); + + self.broadcaster.broadcast_transactions(&[raw_tx]); + log_info!(self.logger, "Rebroadcast transaction {}", payment_id); + + self.payment_store.insert_or_update(payment)?; + return Ok(()); + } else { + log_info!(self.logger, "No details found for payment {} in store", payment_id); + return Err(Error::InvalidPaymentId); + } + } + } + + log_info!(self.logger, "No details found for payment {} in store", payment_id); + return Err(Error::InvalidPaymentId); + } + + pub(crate) fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + let old_payment = + self.payment_store.get(&payment_id).ok_or(Error::InvalidPaymentId)?.clone(); + + let mut locked_wallet = self.inner.lock().unwrap(); + + let txid = Txid::from_slice(&payment_id.0).expect("32 bytes"); + + let wallet_tx = locked_wallet.get_tx(txid).ok_or(Error::InvalidPaymentId)?; + let (sent, received) = locked_wallet.sent_and_received(&wallet_tx.tx_node.tx); + + if sent <= received { + log_error!( + self.logger, + "Transaction {} is not an outbound payment (sent: {}, received: {})", + txid, + sent, + received + ); + return Err(Error::InvalidPaymentId); + } + + if old_payment.direction != PaymentDirection::Outbound { + log_error!(self.logger, "Transaction {} is not an outbound payment", txid); + return Err(Error::InvalidPaymentId); + } + + if let PaymentKind::Onchain { status, .. } = &old_payment.kind { + match status { + ConfirmationStatus::Confirmed { .. } => { + log_error!( + self.logger, + "Transaction {} is already confirmed and cannot be fee bumped", + txid + ); + return Err(Error::InvalidPaymentId); + }, + ConfirmationStatus::Unconfirmed => {}, + } + } + + let confirmation_target = ConfirmationTarget::OnchainPayment; + let estimated_fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target); + + log_info!(self.logger, "Bumping fee to {}", estimated_fee_rate); + + let mut psbt = { + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump failed for {}: {:?}", txid, e); + match e { + BuildFeeBumpError::TransactionNotFound(_) => Error::InvalidPaymentId, + BuildFeeBumpError::TransactionConfirmed(_) => Error::InvalidPaymentId, + BuildFeeBumpError::IrreplaceableTransaction(_) => Error::InvalidPaymentId, + BuildFeeBumpError::FeeRateUnavailable => Error::InvalidPaymentId, + _ => Error::InvalidFeeRate, + } + })?; + + builder.fee_rate(estimated_fee_rate); + + match builder.finish() { + Ok(psbt) => Ok(psbt), + Err(CreateTxError::FeeRateTooLow { required }) => { + log_info!(self.logger, "BDK requires higher fee rate: {}", required); + + // Safety check + const MAX_REASONABLE_FEE_RATE_SAT_VB: u64 = 1000; + if required.to_sat_per_vb_ceil() > MAX_REASONABLE_FEE_RATE_SAT_VB { + log_error!( + self.logger, + "BDK requires unreasonably high fee rate: {} sat/vB", + required.to_sat_per_vb_ceil() + ); + return Err(Error::InvalidFeeRate); + } + + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump retry failed for {}: {:?}", txid, e); + Error::InvalidFeeRate + })?; + + builder.fee_rate(required); + builder.finish().map_err(|e| { + log_error!( + self.logger, + "Failed to finish PSBT with required fee rate: {:?}", + e + ); + Error::InvalidFeeRate + }) + }, + Err(e) => { + log_error!(self.logger, "Failed to create fee bump PSBT: {:?}", e); + Err(Error::InvalidFeeRate) + }, + }? + }; + + match locked_wallet.sign(&mut psbt, SignOptions::default()) { + Ok(finalized) => { + if !finalized { + return Err(Error::OnchainTxCreationFailed); + } + }, + Err(err) => { + log_error!(self.logger, "Failed to create transaction: {}", err); + return Err(err.into()); + }, + } + + let mut locked_persister = self.persister.lock().unwrap(); + locked_wallet.persist(&mut locked_persister).map_err(|e| { + log_error!(self.logger, "Failed to persist wallet: {}", e); + Error::PersistenceFailed + })?; + + let fee_bumped_tx = psbt.extract_tx().map_err(|e| { + log_error!(self.logger, "Failed to extract transaction: {}", e); + e + })?; + + let new_txid = fee_bumped_tx.compute_txid(); + + self.broadcaster.broadcast_transactions(&[&fee_bumped_tx]); + + let new_fee = locked_wallet.calculate_fee(&fee_bumped_tx).unwrap_or(Amount::ZERO); + let new_fee_sats = new_fee.to_sat(); + + let payment_details = PaymentDetails { + id: PaymentId(new_txid.to_byte_array()), + kind: PaymentKind::Onchain { + txid: new_txid, + status: ConfirmationStatus::Unconfirmed, + raw_tx: Some(fee_bumped_tx), + last_broadcast_time: Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + ), + broadcast_attempts: Some(1), + }, + amount_msat: old_payment.amount_msat, + fee_paid_msat: Some(new_fee_sats * 1000), + direction: old_payment.direction, + status: PaymentStatus::Pending, + latest_update_timestamp: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_secs(), + }; + + self.payment_store.remove(&payment_id)?; + + self.payment_store.insert_or_update(payment_details)?; + + log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); + + Ok(new_txid) + } } impl Listen for Wallet diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 0932116ef..120716380 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -33,9 +33,9 @@ use lightning::util::persist::KVStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; -use bitcoin::address::NetworkUnchecked; use bitcoin::hashes::sha256::Hash as Sha256Hash; use bitcoin::hashes::Hash; +use bitcoin::{address::NetworkUnchecked, Txid}; use bitcoin::{Address, Amount, ScriptBuf}; use log::LevelFilter; @@ -444,7 +444,7 @@ fn onchain_send_receive() { let payment_a = node_a.payment(&payment_id).unwrap(); match payment_a.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, @@ -453,7 +453,7 @@ fn onchain_send_receive() { let payment_b = node_a.payment(&payment_id).unwrap(); match payment_b.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, @@ -671,6 +671,129 @@ fn onchain_wallet_recovery() { ); } +#[test] +fn onchain_fee_bump_rbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + // Fund both nodes + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 500_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ); + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Send a transaction from node_b to node_a that we'll later bump + let amount_to_send_sats = 100_000; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid); + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let original_payment = node_b.payment(&payment_id).unwrap(); + let original_fee = original_payment.fee_paid_msat.unwrap(); + + // Non-existent payment id + let fake_txid = + Txid::from_str("0000000000000000000000000000000000000000000000000000000000000000").unwrap(); + let invalid_payment_id = PaymentId(fake_txid.to_byte_array()); + assert_eq!( + Err(NodeError::InvalidPaymentId), + node_b.onchain_payment().bump_fee_rbf(invalid_payment_id) + ); + + // Bump an inbound payment + assert_eq!(Err(NodeError::InvalidPaymentId), node_a.onchain_payment().bump_fee_rbf(payment_id)); + + // Successful fee bump + let new_txid = node_b.onchain_payment().bump_fee_rbf(payment_id).unwrap(); + wait_for_tx(&electrsd.client, new_txid); + + // Sleep to allow for transaction propagation + std::thread::sleep(std::time::Duration::from_secs(5)); + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify old payment is removed and new payment exists + assert!(node_b.payment(&payment_id).is_none(), "Old payment should be removed"); + + let new_payment_id = PaymentId(new_txid.to_byte_array()); + let new_payment = node_b.payment(&new_payment_id).unwrap(); + + // Verify payment properties + assert_eq!(new_payment.amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(new_payment.direction, PaymentDirection::Outbound); + assert_eq!(new_payment.status, PaymentStatus::Pending); + + // Verify fee increased + assert!( + new_payment.fee_paid_msat > Some(original_fee), + "Fee should increase after RBF bump. Original: {}, New: {}", + original_fee, + new_payment.fee_paid_msat.unwrap() + ); + + // Multiple consecutive bumps + let second_bump_txid = node_b.onchain_payment().bump_fee_rbf(new_payment_id).unwrap(); + wait_for_tx(&electrsd.client, second_bump_txid); + + // Sleep to allow for transaction propagation + std::thread::sleep(std::time::Duration::from_secs(5)); + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + assert!(node_b.payment(&new_payment_id).is_none(), "First bump payment should be removed"); + + let second_payment_id = PaymentId(second_bump_txid.to_byte_array()); + let second_payment = node_b.payment(&second_payment_id).unwrap(); + + assert!( + second_payment.fee_paid_msat > new_payment.fee_paid_msat, + "Second bump should have higher fee than first bump" + ); + + // Confirm the transaction and try to bump again (should fail) + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + assert_eq!( + Err(NodeError::InvalidPaymentId), + node_b.onchain_payment().bump_fee_rbf(second_payment_id) + ); + + // Verify final payment is confirmed + let final_payment = node_b.payment(&second_payment_id).unwrap(); + assert_eq!(final_payment.status, PaymentStatus::Succeeded); + match final_payment.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + + // Verify node A received the funds correctly + let node_a_received_payment = node_a.list_payments_with_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == second_bump_txid), + ); + assert_eq!(node_a_received_payment.len(), 1); + assert_eq!(node_a_received_payment[0].amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(node_a_received_payment[0].status, PaymentStatus::Succeeded); +} + #[test] fn test_rbf_via_mempool() { run_rbf_test(false);