@@ -479,6 +479,21 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
479479/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
480480pub ( crate ) const EXPIRE_PREV_CONFIG_TICKS : usize = 5 ;
481481
482+ struct PendingChannelMonitorUpdate {
483+ update : ChannelMonitorUpdate ,
484+ /// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
485+ /// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
486+ /// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
487+ ///
488+ /// [`ChannelManager`]: super::channelmanager::ChannelManager
489+ blocked : bool ,
490+ }
491+
492+ impl_writeable_tlv_based ! ( PendingChannelMonitorUpdate , {
493+ ( 0 , update, required) ,
494+ ( 2 , blocked, required) ,
495+ } ) ;
496+
482497// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
483498// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
484499// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +759,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
744759 /// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
745760 /// completes we still need to be able to complete the persistence. Thus, we have to keep a
746761 /// copy of the [`ChannelMonitorUpdate`] here until it is complete.
747- pending_monitor_updates : Vec < ChannelMonitorUpdate > ,
762+ pending_monitor_updates : Vec < PendingChannelMonitorUpdate > ,
748763}
749764
750765#[ cfg( any( test, fuzzing) ) ]
@@ -1995,28 +2010,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
19952010 }
19962011
19972012 pub fn get_update_fulfill_htlc_and_commit < L : Deref > ( & mut self , htlc_id : u64 , payment_preimage : PaymentPreimage , logger : & L ) -> UpdateFulfillCommitFetch where L :: Target : Logger {
2013+ let release_cs_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
19982014 match self . get_update_fulfill_htlc ( htlc_id, payment_preimage, logger) {
1999- UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg : Some ( _) } => {
2000- let mut additional_update = self . build_commitment_no_status_check ( logger) ;
2001- // build_commitment_no_status_check may bump latest_monitor_id but we want them to be
2002- // strictly increasing by one, so decrement it here.
2003- self . latest_monitor_update_id = monitor_update. update_id ;
2004- monitor_update. updates . append ( & mut additional_update. updates ) ;
2005- self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
2006- self . pending_monitor_updates . push ( monitor_update) ;
2015+ UpdateFulfillFetch :: NewClaim { mut monitor_update, htlc_value_msat, msg } => {
2016+ // Even if we aren't supposed to let new monitor updates with commitment state
2017+ // updates run, we still need to push the preimage ChannelMonitorUpdateStep no
2018+ // matter what. Sadly, to push a new monitor update which flies before others
2019+ // already queued, we have to insert it into the pending queue and update the
2020+ // update_ids of all the following monitors.
2021+ let unblocked_update_pos = if release_cs_monitor && msg. is_some ( ) {
2022+ let mut additional_update = self . build_commitment_no_status_check ( logger) ;
2023+ // build_commitment_no_status_check may bump latest_monitor_id but we want them
2024+ // to be strictly increasing by one, so decrement it here.
2025+ self . latest_monitor_update_id = monitor_update. update_id ;
2026+ monitor_update. updates . append ( & mut additional_update. updates ) ;
2027+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2028+ update : monitor_update, blocked : false ,
2029+ } ) ;
2030+ self . pending_monitor_updates . len ( ) - 1
2031+ } else {
2032+ let insert_pos = self . pending_monitor_updates . iter ( ) . position ( |upd| upd. blocked )
2033+ . unwrap_or ( self . pending_monitor_updates . len ( ) ) ;
2034+ let new_mon_id = self . pending_monitor_updates . get ( insert_pos)
2035+ . map ( |upd| upd. update . update_id ) . unwrap_or ( monitor_update. update_id ) ;
2036+ monitor_update. update_id = new_mon_id;
2037+ self . pending_monitor_updates . insert ( insert_pos, PendingChannelMonitorUpdate {
2038+ update : monitor_update, blocked : false ,
2039+ } ) ;
2040+ for held_update in self . pending_monitor_updates . iter_mut ( ) . skip ( insert_pos + 1 ) {
2041+ held_update. update . update_id += 1 ;
2042+ }
2043+ if msg. is_some ( ) {
2044+ debug_assert ! ( false , "If there is a pending blocked monitor we should have MonitorUpdateInProgress set" ) ;
2045+ let update = self . build_commitment_no_status_check ( logger) ;
2046+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
2047+ update, blocked : true ,
2048+ } ) ;
2049+ }
2050+ insert_pos
2051+ } ;
2052+ self . monitor_updating_paused ( false , msg. is_some ( ) , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
20072053 UpdateFulfillCommitFetch :: NewClaim {
2008- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
2054+ monitor_update : & self . pending_monitor_updates . get ( unblocked_update_pos)
2055+ . expect ( "We just pushed the monitor update" ) . update ,
20092056 htlc_value_msat,
20102057 }
20112058 } ,
2012- UpdateFulfillFetch :: NewClaim { monitor_update, htlc_value_msat, msg : None } => {
2013- self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
2014- self . pending_monitor_updates . push ( monitor_update) ;
2015- UpdateFulfillCommitFetch :: NewClaim {
2016- monitor_update : self . pending_monitor_updates . last ( ) . unwrap ( ) ,
2017- htlc_value_msat,
2018- }
2019- }
20202059 UpdateFulfillFetch :: DuplicateClaim { } => UpdateFulfillCommitFetch :: DuplicateClaim { } ,
20212060 }
20222061 }
@@ -3084,7 +3123,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
30843123 Ok ( ( ) )
30853124 }
30863125
3087- pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < & ChannelMonitorUpdate , ChannelError >
3126+ pub fn commitment_signed < L : Deref > ( & mut self , msg : & msgs:: CommitmentSigned , logger : & L ) -> Result < Option < & ChannelMonitorUpdate > , ChannelError >
30883127 where L :: Target : Logger
30893128 {
30903129 if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3284,8 +3323,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
32843323 }
32853324 log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply." ,
32863325 log_bytes!( self . channel_id) ) ;
3287- self . pending_monitor_updates . push ( monitor_update) ;
3288- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3326+ return Ok ( self . push_ret_blockable_mon_update ( monitor_update) ) ;
32893327 }
32903328
32913329 let need_commitment_signed = if need_commitment && ( self . channel_state & ( ChannelState :: AwaitingRemoteRevoke as u32 ) ) == 0 {
@@ -3302,9 +3340,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
33023340
33033341 log_debug ! ( logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack." ,
33043342 log_bytes!( self . channel_id( ) ) , if need_commitment_signed { " our own commitment_signed and" } else { "" } ) ;
3305- self . pending_monitor_updates . push ( monitor_update) ;
33063343 self . monitor_updating_paused ( true , need_commitment_signed, false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3307- return Ok ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) ;
3344+ return Ok ( self . push_ret_blockable_mon_update ( monitor_update ) ) ;
33083345 }
33093346
33103347 /// Public version of the below, checking relevant preconditions first.
@@ -3419,8 +3456,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34193456 update_add_htlcs. len( ) , update_fulfill_htlcs. len( ) , update_fail_htlcs. len( ) ) ;
34203457
34213458 self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
3422- self . pending_monitor_updates . push ( monitor_update) ;
3423- ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) , htlcs_to_fail)
3459+ ( self . push_ret_blockable_mon_update ( monitor_update) , htlcs_to_fail)
34243460 } else {
34253461 ( None , Vec :: new ( ) )
34263462 }
@@ -3431,7 +3467,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
34313467 /// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
34323468 /// generating an appropriate error *after* the channel state has been updated based on the
34333469 /// revoke_and_ack message.
3434- pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , & ChannelMonitorUpdate ) , ChannelError >
3470+ pub fn revoke_and_ack < L : Deref > ( & mut self , msg : & msgs:: RevokeAndACK , logger : & L ) -> Result < ( Vec < ( HTLCSource , PaymentHash ) > , Option < & ChannelMonitorUpdate > ) , ChannelError >
34353471 where L :: Target : Logger ,
34363472 {
34373473 if ( self . channel_state & ( ChannelState :: ChannelReady as u32 ) ) != ( ChannelState :: ChannelReady as u32 ) {
@@ -3628,21 +3664,19 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36283664 self . monitor_pending_failures . append ( & mut revoked_htlcs) ;
36293665 self . monitor_pending_finalized_fulfills . append ( & mut finalized_claimed_htlcs) ;
36303666 log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply." , log_bytes!( self . channel_id( ) ) ) ;
3631- self . pending_monitor_updates . push ( monitor_update) ;
3632- return Ok ( ( Vec :: new ( ) , self . pending_monitor_updates . last ( ) . unwrap ( ) ) ) ;
3667+ return Ok ( ( Vec :: new ( ) , self . push_ret_blockable_mon_update ( monitor_update) ) ) ;
36333668 }
36343669
36353670 match self . free_holding_cell_htlcs ( logger) {
36363671 ( Some ( _) , htlcs_to_fail) => {
3637- let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) ;
3672+ let mut additional_update = self . pending_monitor_updates . pop ( ) . unwrap ( ) . update ;
36383673 // free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
36393674 // strictly increasing by one, so decrement it here.
36403675 self . latest_monitor_update_id = monitor_update. update_id ;
36413676 monitor_update. updates . append ( & mut additional_update. updates ) ;
36423677
36433678 self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3644- self . pending_monitor_updates . push ( monitor_update) ;
3645- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3679+ Ok ( ( htlcs_to_fail, self . push_ret_blockable_mon_update ( monitor_update) ) )
36463680 } ,
36473681 ( None , htlcs_to_fail) => {
36483682 if require_commitment {
@@ -3656,13 +3690,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
36563690 log_debug ! ( logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed." ,
36573691 log_bytes!( self . channel_id( ) ) , update_fail_htlcs. len( ) + update_fail_malformed_htlcs. len( ) ) ;
36583692 self . monitor_updating_paused ( false , true , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3659- self . pending_monitor_updates . push ( monitor_update) ;
3660- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3693+ Ok ( ( htlcs_to_fail, self . push_ret_blockable_mon_update ( monitor_update) ) )
36613694 } else {
36623695 log_debug ! ( logger, "Received a valid revoke_and_ack for channel {} with no reply necessary." , log_bytes!( self . channel_id( ) ) ) ;
36633696 self . monitor_updating_paused ( false , false , false , to_forward_infos, revoked_htlcs, finalized_claimed_htlcs) ;
3664- self . pending_monitor_updates . push ( monitor_update) ;
3665- Ok ( ( htlcs_to_fail, self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
3697+ Ok ( ( htlcs_to_fail, self . push_ret_blockable_mon_update ( monitor_update) ) )
36663698 }
36673699 }
36683700 }
@@ -3851,7 +3883,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
38513883 {
38523884 assert_eq ! ( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 , ChannelState :: MonitorUpdateInProgress as u32 ) ;
38533885 self . channel_state &= !( ChannelState :: MonitorUpdateInProgress as u32 ) ;
3854- self . pending_monitor_updates . clear ( ) ;
3886+ let mut found_blocked = false ;
3887+ self . pending_monitor_updates . retain ( |upd| {
3888+ if found_blocked { debug_assert ! ( upd. blocked, "No mons may be unblocked after a blocked one" ) ; }
3889+ if upd. blocked { found_blocked = true ; }
3890+ upd. blocked
3891+ } ) ;
38553892
38563893 // If we're past (or at) the FundingSent stage on an outbound channel, try to
38573894 // (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4392,8 +4429,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
43924429 } ] ,
43934430 } ;
43944431 self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
4395- self . pending_monitor_updates . push ( monitor_update) ;
4396- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
4432+ if self . push_blockable_mon_update ( monitor_update) {
4433+ self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update )
4434+ } else { None }
43974435 } else { None } ;
43984436 let shutdown = if send_shutdown {
43994437 Some ( msgs:: Shutdown {
@@ -4965,8 +5003,49 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
49655003 ( self . channel_state & ChannelState :: MonitorUpdateInProgress as u32 ) != 0
49665004 }
49675005
4968- pub fn get_next_monitor_update ( & self ) -> Option < & ChannelMonitorUpdate > {
4969- self . pending_monitor_updates . first ( )
5006+ pub fn get_latest_complete_monitor_update_id ( & self ) -> u64 {
5007+ if self . pending_monitor_updates . is_empty ( ) { return self . get_latest_monitor_update_id ( ) ; }
5008+ self . pending_monitor_updates [ 0 ] . update . update_id - 1
5009+ }
5010+
5011+ /// Returns the next blocked monitor update, if one exists, and a bool which indicates a
5012+ /// further blocked monitor update exists after the next.
5013+ pub fn unblock_next_blocked_monitor_update ( & mut self ) -> Option < ( & ChannelMonitorUpdate , bool ) > {
5014+ for i in 0 ..self . pending_monitor_updates . len ( ) {
5015+ if self . pending_monitor_updates [ i] . blocked {
5016+ self . pending_monitor_updates [ i] . blocked = false ;
5017+ return Some ( ( & self . pending_monitor_updates [ i] . update ,
5018+ self . pending_monitor_updates . len ( ) > i + 1 ) ) ;
5019+ }
5020+ }
5021+ None
5022+ }
5023+
5024+ /// Pushes a new monitor update into our monitor update queue, returning whether it should be
5025+ /// immediately given to the user for persisting or if it should be held as blocked.
5026+ fn push_blockable_mon_update ( & mut self , update : ChannelMonitorUpdate ) -> bool {
5027+ let release_monitor = self . pending_monitor_updates . iter ( ) . all ( |upd| !upd. blocked ) ;
5028+ self . pending_monitor_updates . push ( PendingChannelMonitorUpdate {
5029+ update, blocked : !release_monitor
5030+ } ) ;
5031+ release_monitor
5032+ }
5033+
5034+ /// Pushes a new monitor update into our monitor update queue, returning a reference to it if
5035+ /// it should be immediately given to the user for persisting or `None` if it should be held as
5036+ /// blocked.
5037+ fn push_ret_blockable_mon_update ( & mut self , update : ChannelMonitorUpdate )
5038+ -> Option < & ChannelMonitorUpdate > {
5039+ let release_monitor = self . push_blockable_mon_update ( update) ;
5040+ if release_monitor { self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update ) } else { None }
5041+ }
5042+
5043+ pub fn no_monitor_updates_pending ( & self ) -> bool {
5044+ self . pending_monitor_updates . is_empty ( )
5045+ }
5046+
5047+ pub fn complete_one_mon_update ( & mut self , update_id : u64 ) {
5048+ self . pending_monitor_updates . retain ( |upd| upd. update . update_id != update_id) ;
49705049 }
49715050
49725051 /// Returns true if funding_created was sent/received.
@@ -6009,8 +6088,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
60096088 Some ( _) => {
60106089 let monitor_update = self . build_commitment_no_status_check ( logger) ;
60116090 self . monitor_updating_paused ( false , true , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6012- self . pending_monitor_updates . push ( monitor_update) ;
6013- Ok ( Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) ) )
6091+ Ok ( self . push_ret_blockable_mon_update ( monitor_update) )
60146092 } ,
60156093 None => Ok ( None )
60166094 }
@@ -6112,8 +6190,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
61126190 } ] ,
61136191 } ;
61146192 self . monitor_updating_paused ( false , false , false , Vec :: new ( ) , Vec :: new ( ) , Vec :: new ( ) ) ;
6115- self . pending_monitor_updates . push ( monitor_update) ;
6116- Some ( self . pending_monitor_updates . last ( ) . unwrap ( ) )
6193+ if self . push_blockable_mon_update ( monitor_update) {
6194+ self . pending_monitor_updates . last ( ) . map ( |upd| & upd. update )
6195+ } else { None }
61176196 } else { None } ;
61186197 let shutdown = msgs:: Shutdown {
61196198 channel_id : self . channel_id ,
@@ -6550,6 +6629,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
65506629 ( 28 , holder_max_accepted_htlcs, option) ,
65516630 ( 29 , self . temporary_channel_id, option) ,
65526631 ( 31 , channel_pending_event_emitted, option) ,
6632+ ( 33 , self . pending_monitor_updates, vec_type) ,
65536633 } ) ;
65546634
65556635 Ok ( ( ) )
@@ -6826,6 +6906,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
68266906 let mut temporary_channel_id: Option < [ u8 ; 32 ] > = None ;
68276907 let mut holder_max_accepted_htlcs: Option < u16 > = None ;
68286908
6909+ let mut pending_monitor_updates = Some ( Vec :: new ( ) ) ;
6910+
68296911 read_tlv_fields ! ( reader, {
68306912 ( 0 , announcement_sigs, option) ,
68316913 ( 1 , minimum_depth, option) ,
@@ -6848,6 +6930,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
68486930 ( 28 , holder_max_accepted_htlcs, option) ,
68496931 ( 29 , temporary_channel_id, option) ,
68506932 ( 31 , channel_pending_event_emitted, option) ,
6933+ ( 33 , pending_monitor_updates, vec_type) ,
68516934 } ) ;
68526935
68536936 let ( channel_keys_id, holder_signer) = if let Some ( channel_keys_id) = channel_keys_id {
@@ -7017,7 +7100,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
70177100 channel_type : channel_type. unwrap ( ) ,
70187101 channel_keys_id,
70197102
7020- pending_monitor_updates : Vec :: new ( ) ,
7103+ pending_monitor_updates : pending_monitor_updates . unwrap ( ) ,
70217104 } )
70227105 }
70237106}
0 commit comments