@@ -3819,3 +3819,225 @@ fn test_claim_to_closed_channel_blocks_claimed_event() {
 	nodes[1].chain_monitor.complete_sole_pending_chan_update(&chan_a.2);
 	expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
 }
+
+#[test]
+#[cfg(all(feature = "std", not(target_os = "windows")))]
+fn test_single_channel_multiple_mpp() {
+	use std::sync::atomic::{AtomicBool, Ordering};
+
+	// Test what happens when we attempt to claim an MPP with many parts that came to us through
+	// the same channel with a synchronous persistence interface which has very high latency.
+	//
+	// Previously, if a `revoke_and_ack` came in while we were still running in
+	// `ChannelManager::claim_funds` we'd end up hanging waiting to apply a
+	// `ChannelMonitorUpdate` until after it completed. See the commit which introduced this test
+	// for more info.
+	let chanmon_cfgs = create_chanmon_cfgs(9);
+	let node_cfgs = create_node_cfgs(9, &chanmon_cfgs);
+	let configs = [None, None, None, None, None, None, None, None, None];
+	let node_chanmgrs = create_node_chanmgrs(9, &node_cfgs, &configs);
+	let mut nodes = create_network(9, &node_cfgs, &node_chanmgrs);
+
+	let node_7_id = nodes[7].node.get_our_node_id();
+	let node_8_id = nodes[8].node.get_our_node_id();
+
+	// Send an MPP payment in six parts along the path shown from top to bottom
+	//      0
+	// 1 2 3 4 5 6
+	//      7
+	//      8
+	//
+	// We can in theory reproduce this issue with fewer channels/HTLCs, but getting this test
+	// robust is rather challenging. We rely on having the main test thread wait on locks held in
+	// the background `claim_funds` thread and unlocking when the `claim_funds` thread completes a
+	// single `ChannelMonitorUpdate`.
+	// This thread calls `get_and_clear_pending_msg_events()` and `handle_revoke_and_ack()`, both
+	// of which require `ChannelManager` locks, but we have to make sure this thread gets a chance
+	// to be blocked on the mutexes before we let the background thread wake `claim_funds` so that
+	// the mutex can switch to this main thread.
+	// This relies on our locks being fair, but also on our threads getting runtime during the
+	// test run, which can be pretty competitive. Thus we do a dumb dance to be as conservative as
+	// possible - we have a background thread which completes a `ChannelMonitorUpdate` (by sending
+	// into the `write_blocker` mpsc) but it doesn't run until an mpsc channel sends from this main
+	// thread to the background thread, and then we let it sleep a while before we send the
+	// `ChannelMonitorUpdate` unblocker.
+	// Further, we give ourselves two chances each time, needing 4 HTLCs just to unlock our two
+	// `ChannelManager` calls. We then need a few remaining HTLCs to actually trigger the bug, so
+	// we use 6 HTLCs.
+	// Finally, we do not run this test on Windows, as its thread scheduling makes the timing
+	// assumptions above too unreliable for the test to stay robust.
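+	// `MAX_THREAD_INIT_TIME` is the sleep the background threads use below before releasing a
+	// monitor write, giving this main thread time to reach (and block on) the `ChannelManager`
+	// locks first.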
+	const MAX_THREAD_INIT_TIME: std::time::Duration = std::time::Duration::from_secs(1);
+
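+	// Build the topology from the diagram above: six 0 <-> n channels and six n <-> 7 channels
+	// (for n in 1..=6), plus a single larger 7 <-> 8 channel which all six MPP parts (and thus
+	// all six claims) traverse.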
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 2, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 3, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 4, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 5, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 0, 6, 100_000, 0);
+
+	create_announced_chan_between_nodes_with_value(&nodes, 1, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 2, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 3, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 4, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 5, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 6, 7, 100_000, 0);
+	create_announced_chan_between_nodes_with_value(&nodes, 7, 8, 1_000_000, 0);
+
+	let (mut route, payment_hash, payment_preimage, payment_secret) =
+		get_route_and_payment_hash!(&nodes[0], nodes[8], 50_000_000);
+
+	send_along_route_with_secret(&nodes[0], route,
+		&[&[&nodes[1], &nodes[7], &nodes[8]], &[&nodes[2], &nodes[7], &nodes[8]],
+		  &[&nodes[3], &nodes[7], &nodes[8]], &[&nodes[4], &nodes[7], &nodes[8]],
+		  &[&nodes[5], &nodes[7], &nodes[8]], &[&nodes[6], &nodes[7], &nodes[8]]],
+		50_000_000, payment_hash, payment_secret);
+
+	let (do_a_write, blocker) = std::sync::mpsc::sync_channel(0);
+	*nodes[8].chain_monitor.write_blocker.lock().unwrap() = Some(blocker);
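+	// From here on, every `ChannelMonitorUpdate` persistence on node 8 blocks until we send a
+	// `()` into `do_a_write`, letting us release the writes performed by `claim_funds` one at a
+	// time.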
+
+	// Until we have std::thread::scope we have to unsafe { turn off the borrow checker }.
+	// We do this by casting a pointer to a `TestChannelManager` to a pointer to a
+	// `TestChannelManager` with different (in this case 'static) lifetime.
+	// This is even suggested in the second example at
+	// https://doc.rust-lang.org/std/mem/fn.transmute.html#examples
+	let claim_node: &'static TestChannelManager<'static, 'static> =
+		unsafe { std::mem::transmute(nodes[8].node as &TestChannelManager) };
+	let thrd = std::thread::spawn(move || {
+		// Initiate the claim in a background thread as it will immediately block waiting on the
+		// `write_blocker` we set above.
+		claim_node.claim_funds(payment_preimage);
+	});
+
+	// First unlock one monitor so that we have a pending
+	// `update_fulfill_htlc`/`commitment_signed` pair to pass to our counterparty.
+	do_a_write.send(()).unwrap();
+
+	// Then fetch the `update_fulfill_htlc`/`commitment_signed`. Note that the
+	// `get_and_clear_pending_msg_events` will immediately hang trying to take a peer lock which
+	// `claim_funds` is holding. Thus, we release a second write after a small sleep in the
+	// background to give `claim_funds` a chance to step forward, unblocking
+	// `get_and_clear_pending_msg_events`.
+	let do_a_write_background = do_a_write.clone();
+	let block_thrd2 = AtomicBool::new(true);
+	let block_thrd2_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd2) };
+	let thrd2 = std::thread::spawn(move || {
+		while block_thrd2_read.load(Ordering::Acquire) {
+			std::thread::yield_now();
+		}
+		std::thread::sleep(MAX_THREAD_INIT_TIME);
+		do_a_write_background.send(()).unwrap();
+		std::thread::sleep(MAX_THREAD_INIT_TIME);
+		do_a_write_background.send(()).unwrap();
+	});
+	block_thrd2.store(false, Ordering::Release);
+	let first_updates = get_htlc_update_msgs(&nodes[8], &nodes[7].node.get_our_node_id());
+	thrd2.join().unwrap();
+
+	// Disconnect node 7 from all its peers so it doesn't bother to fail the HTLCs back
+	nodes[7].node.peer_disconnected(nodes[1].node.get_our_node_id());
+	nodes[7].node.peer_disconnected(nodes[2].node.get_our_node_id());
+	nodes[7].node.peer_disconnected(nodes[3].node.get_our_node_id());
+	nodes[7].node.peer_disconnected(nodes[4].node.get_our_node_id());
+	nodes[7].node.peer_disconnected(nodes[5].node.get_our_node_id());
+	nodes[7].node.peer_disconnected(nodes[6].node.get_our_node_id());
+
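+	// Deliver the first fulfill/`commitment_signed` to node 7 and run its half of the commitment
+	// dance so we have a `revoke_and_ack` ready to hand back to node 8 while its `claim_funds`
+	// call is still in flight.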
+	nodes[7].node.handle_update_fulfill_htlc(node_8_id, &first_updates.update_fulfill_htlcs[0]);
+	check_added_monitors(&nodes[7], 1);
+	expect_payment_forwarded!(nodes[7], nodes[1], nodes[8], Some(1000), false, false);
+	nodes[7].node.handle_commitment_signed(node_8_id, &first_updates.commitment_signed);
+	check_added_monitors(&nodes[7], 1);
+	let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);
+
+	// Now, handle the `revoke_and_ack` from node 7. Note that `claim_funds` is still blocked on
+	// our peer lock, so we have to release a write to let it process.
+	// After this call completes, the channel previously would be locked up and should not be able
+	// to make further progress.
+	let do_a_write_background = do_a_write.clone();
+	let block_thrd3 = AtomicBool::new(true);
+	let block_thrd3_read: &'static AtomicBool = unsafe { std::mem::transmute(&block_thrd3) };
+	let thrd3 = std::thread::spawn(move || {
+		while block_thrd3_read.load(Ordering::Acquire) {
+			std::thread::yield_now();
+		}
+		std::thread::sleep(MAX_THREAD_INIT_TIME);
+		do_a_write_background.send(()).unwrap();
+		std::thread::sleep(MAX_THREAD_INIT_TIME);
+		do_a_write_background.send(()).unwrap();
+	});
+	block_thrd3.store(false, Ordering::Release);
+	nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
+	thrd3.join().unwrap();
+	assert!(!thrd.is_finished());
+
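+	// Release the final two monitor writes so the background `claim_funds` call can finish
+	// claiming the remaining HTLCs and return.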
+	let thrd4 = std::thread::spawn(move || {
+		do_a_write.send(()).unwrap();
+		do_a_write.send(()).unwrap();
+	});
+
+	thrd4.join().unwrap();
+	thrd.join().unwrap();
+
+	expect_payment_claimed!(nodes[8], payment_hash, 50_000_000);
+
+	// At the end, we should have 7 ChannelMonitorUpdates - 6 for HTLC claims, and one for the
+	// above `revoke_and_ack`.
+	check_added_monitors(&nodes[8], 7);
+
+	// Now drive everything to the end, at least as far as node 7 is concerned...
+	*nodes[8].chain_monitor.write_blocker.lock().unwrap() = None;
+	nodes[8].node.handle_commitment_signed(node_7_id, &cs);
+	check_added_monitors(&nodes[8], 1);
+
+	let (updates, raa) = get_updates_and_revoke(&nodes[8], &nodes[7].node.get_our_node_id());
+
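+	// Depending on how the background claim interleaved with the dance above, this batch may
+	// carry either two or three `update_fulfill_htlc`s; handle whichever are present and use
+	// `next_source` to track which upstream node each forwarded claim corresponds to.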
+	nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[0]);
+	expect_payment_forwarded!(nodes[7], nodes[2], nodes[8], Some(1000), false, false);
+	nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[1]);
+	expect_payment_forwarded!(nodes[7], nodes[3], nodes[8], Some(1000), false, false);
+	let mut next_source = 4;
+	if let Some(update) = updates.update_fulfill_htlcs.get(2) {
+		nodes[7].node.handle_update_fulfill_htlc(node_8_id, update);
+		expect_payment_forwarded!(nodes[7], nodes[4], nodes[8], Some(1000), false, false);
+		next_source += 1;
+	}
+
+	nodes[7].node.handle_commitment_signed(node_8_id, &updates.commitment_signed);
+	nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
+	if updates.update_fulfill_htlcs.get(2).is_some() {
+		check_added_monitors(&nodes[7], 5);
+	} else {
+		check_added_monitors(&nodes[7], 4);
+	}
+
+	let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);
+
+	nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
+	nodes[8].node.handle_commitment_signed(node_7_id, &cs);
+	check_added_monitors(&nodes[8], 2);
+
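+	// Second pass of the same dance: fetch node 8's next batch of fulfills (plus its
+	// `revoke_and_ack`) and forward them to node 7, again allowing for two or three fulfills.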
+	let (updates, raa) = get_updates_and_revoke(&nodes[8], &node_7_id);
+
+	nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[0]);
+	expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
+	next_source += 1;
+	nodes[7].node.handle_update_fulfill_htlc(node_8_id, &updates.update_fulfill_htlcs[1]);
+	expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
+	next_source += 1;
+	if let Some(update) = updates.update_fulfill_htlcs.get(2) {
+		nodes[7].node.handle_update_fulfill_htlc(node_8_id, update);
+		expect_payment_forwarded!(nodes[7], nodes[next_source], nodes[8], Some(1000), false, false);
+	}
+
+	nodes[7].node.handle_commitment_signed(node_8_id, &updates.commitment_signed);
+	nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
+	if updates.update_fulfill_htlcs.get(2).is_some() {
+		check_added_monitors(&nodes[7], 5);
+	} else {
+		check_added_monitors(&nodes[7], 4);
+	}
+
+	let (raa, cs) = get_revoke_commit_msgs(&nodes[7], &node_8_id);
+	nodes[8].node.handle_revoke_and_ack(node_7_id, &raa);
+	nodes[8].node.handle_commitment_signed(node_7_id, &cs);
+	check_added_monitors(&nodes[8], 2);
+
+	let raa = get_event_msg!(nodes[8], MessageSendEvent::SendRevokeAndACK, node_7_id);
+	nodes[7].node.handle_revoke_and_ack(node_8_id, &raa);
+	check_added_monitors(&nodes[7], 1);
+}