@@ -346,9 +346,10 @@ where
346346/// 
347347/// # Pruning stale channel updates 
348348/// 
349- /// Stale updates are pruned when a full monitor is written. The old monitor is first read, and if 
350- /// that succeeds, updates in the range between the old and new monitors are deleted. The `lazy` 
351- /// flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions 
349+ /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`. 
350+ /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates` 
351+ /// are deleted. 
352+ /// The `lazy` flag is used on the [`KVStore::remove`] method, so there are no guarantees that the deletions 
352353/// will complete. However, stale updates are not a problem for data integrity, since updates are 
353354/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`. 
354355/// 
@@ -610,24 +611,6 @@ where
610611	)  -> chain:: ChannelMonitorUpdateStatus  { 
611612		// Determine the proper key for this monitor 
612613		let  monitor_name = MonitorName :: from ( funding_txo) ; 
613- 		let  maybe_old_monitor = self . read_monitor ( & monitor_name) ; 
614- 		match  maybe_old_monitor { 
615- 			Ok ( ( _,  ref  old_monitor) )  => { 
616- 				// Check that this key isn't already storing a monitor with a higher update_id 
617- 				// (collision) 
618- 				if  old_monitor. get_latest_update_id ( )  > monitor. get_latest_update_id ( )  { 
619- 					log_error ! ( 
620- 						self . logger, 
621- 						"Tried to write a monitor at the same outpoint {} with a higher update_id!" , 
622- 						monitor_name. as_str( ) 
623- 					) ; 
624- 					return  chain:: ChannelMonitorUpdateStatus :: UnrecoverableError ; 
625- 				} 
626- 			} 
627- 			// This means the channel monitor is new. 
628- 			Err ( ref  e)  if  e. kind ( )  == io:: ErrorKind :: NotFound  => { } 
629- 			_ => return  chain:: ChannelMonitorUpdateStatus :: UnrecoverableError , 
630- 		} 
631614		// Serialize and write the new monitor 
632615		let  mut  monitor_bytes = Vec :: with_capacity ( 
633616			MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL . len ( )  + monitor. serialized_length ( ) , 
@@ -641,65 +624,12 @@ where
641624			& monitor_bytes, 
642625		)  { 
643626			Ok ( _)  => { 
644- 				// Assess cleanup. Typically, we'll clean up only between the last two known full 
645- 				// monitors. 
646- 				if  let  Ok ( ( _,  old_monitor) )  = maybe_old_monitor { 
647- 					let  start = old_monitor. get_latest_update_id ( ) ; 
648- 					let  end = if  monitor. get_latest_update_id ( )  == CLOSED_CHANNEL_UPDATE_ID  { 
649- 						// We don't want to clean the rest of u64, so just do possible pending 
650- 						// updates. Note that we never write updates at 
651- 						// `CLOSED_CHANNEL_UPDATE_ID`. 
652- 						cmp:: min ( 
653- 							start. saturating_add ( self . maximum_pending_updates ) , 
654- 							CLOSED_CHANNEL_UPDATE_ID  - 1 , 
655- 						) 
656- 					}  else  { 
657- 						monitor. get_latest_update_id ( ) . saturating_sub ( 1 ) 
658- 					} ; 
659- 					// We should bother cleaning up only if there's at least one update 
660- 					// expected. 
661- 					for  update_id in  start..=end { 
662- 						let  update_name = UpdateName :: from ( update_id) ; 
663- 						#[ cfg( debug_assertions) ]  
664- 						{ 
665- 							if  let  Ok ( update)  =
666- 								self . read_monitor_update ( & monitor_name,  & update_name) 
667- 							{ 
668- 								// Assert that we are reading what we think we are. 
669- 								debug_assert_eq ! ( update. update_id,  update_name. 0 ) ; 
670- 							}  else  if  update_id != start && monitor. get_latest_update_id ( )  != CLOSED_CHANNEL_UPDATE_ID 
671- 							{ 
672- 								// We're deleting something we should know doesn't exist. 
673- 								panic ! ( 
674- 									"failed to read monitor update {}" , 
675- 									update_name. as_str( ) 
676- 								) ; 
677- 							} 
678- 							// On closed channels, we will unavoidably try to read 
679- 							// non-existent updates since we have to guess at the range of 
680- 							// stale updates, so do nothing. 
681- 						} 
682- 						if  let  Err ( e)  = self . kv_store . remove ( 
683- 							CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , 
684- 							monitor_name. as_str ( ) , 
685- 							update_name. as_str ( ) , 
686- 							true , 
687- 						)  { 
688- 							log_error ! ( 
689- 								self . logger, 
690- 								"error cleaning up channel monitor updates for monitor {}, reason: {}" , 
691- 								monitor_name. as_str( ) , 
692- 								e
693- 							) ; 
694- 						} ; 
695- 					} 
696- 				} ; 
697627				chain:: ChannelMonitorUpdateStatus :: Completed 
698628			} 
699629			Err ( e)  => { 
700630				log_error ! ( 
701631					self . logger, 
702- 					"error writing channel monitor  {}/{}/{} reason: {}" , 
632+ 					"Failed to write ChannelMonitor  {}/{}/{} reason: {}" , 
703633					CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE , 
704634					CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE , 
705635					monitor_name. as_str( ) , 
@@ -741,7 +671,7 @@ where
741671					Err ( e)  => { 
742672						log_error ! ( 
743673							self . logger, 
744- 							"error writing channel monitor update  {}/{}/{} reason: {}" , 
674+ 							"Failed to write ChannelMonitorUpdate  {}/{}/{} reason: {}" , 
745675							CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , 
746676							monitor_name. as_str( ) , 
747677							update_name. as_str( ) , 
@@ -751,8 +681,41 @@ where
751681					} 
752682				} 
753683			}  else  { 
754- 				// We could write this update, but it meets criteria of our design that call for a full monitor write. 
755- 				self . persist_new_channel ( funding_txo,  monitor,  monitor_update_call_id) 
684+ 				let  monitor_name = MonitorName :: from ( funding_txo) ; 
685+ 				// In case of channel-close monitor update, we need to read old monitor before persisting 
686+ 				// the new one in order to determine the cleanup range. 
687+ 				let  maybe_old_monitor = match  monitor. get_latest_update_id ( )  { 
688+ 					CLOSED_CHANNEL_UPDATE_ID  => self . read_monitor ( & monitor_name) . ok ( ) , 
689+ 					_ => None 
690+ 				} ; 
691+ 
692+ 				// We could write this update, but it meets criteria of our design that calls for a full monitor write. 
693+ 				let  monitor_update_status = self . persist_new_channel ( funding_txo,  monitor,  monitor_update_call_id) ; 
694+ 
695+ 				if  let  chain:: ChannelMonitorUpdateStatus :: Completed  = monitor_update_status { 
696+ 					let  cleanup_range = if  monitor. get_latest_update_id ( )  == CLOSED_CHANNEL_UPDATE_ID  { 
697+ 						// If there is an error while reading old monitor, we skip clean up. 
698+ 						maybe_old_monitor. map ( |( _,  ref  old_monitor) | { 
699+ 							let  start = old_monitor. get_latest_update_id ( ) ; 
700+ 							// We never persist an update with update_id = CLOSED_CHANNEL_UPDATE_ID 
701+ 							let  end = cmp:: min ( 
702+ 								start. saturating_add ( self . maximum_pending_updates ) , 
703+ 								CLOSED_CHANNEL_UPDATE_ID  - 1 , 
704+ 							) ; 
705+ 							( start,  end) 
706+ 						} ) 
707+ 					}  else  { 
708+ 						let  end = monitor. get_latest_update_id ( ) ; 
709+ 						let  start = end. saturating_sub ( self . maximum_pending_updates ) ; 
710+ 						Some ( ( start,  end) ) 
711+ 					} ; 
712+ 
713+ 					if  let  Some ( ( start,  end) )  = cleanup_range { 
714+ 						self . cleanup_in_range ( monitor_name,  start,  end) ; 
715+ 					} 
716+ 				} 
717+ 
718+ 				monitor_update_status
756719			} 
757720		}  else  { 
758721			// There is no update given, so we must persist a new monitor. 
@@ -761,6 +724,34 @@ where
761724	} 
762725} 
763726
727+ impl < K :  Deref ,  L :  Deref ,  ES :  Deref ,  SP :  Deref >  MonitorUpdatingPersister < K ,  L ,  ES ,  SP > 
728+ where 
729+ 	ES :: Target :  EntropySource  + Sized , 
730+ 	K :: Target :  KVStore , 
731+ 	L :: Target :  Logger , 
732+ 	SP :: Target :  SignerProvider  + Sized 
733+ { 
734+ 	// Cleans up monitor updates for given monitor in range `start..=end`. 
735+ 	fn  cleanup_in_range ( & self ,  monitor_name :  MonitorName ,  start :  u64 ,  end :  u64 )  { 
736+ 		for  update_id in  start..=end { 
737+ 			let  update_name = UpdateName :: from ( update_id) ; 
738+ 			if  let  Err ( e)  = self . kv_store . remove ( 
739+ 				CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , 
740+ 				monitor_name. as_str ( ) , 
741+ 				update_name. as_str ( ) , 
742+ 				true , 
743+ 			)  { 
744+ 				log_error ! ( 
745+ 					self . logger, 
746+ 					"Failed to clean up channel monitor updates for monitor {}, reason: {}" , 
747+ 					monitor_name. as_str( ) , 
748+ 					e
749+ 				) ; 
750+ 			} ; 
751+ 		} 
752+ 	} 
753+ } 
754+ 
764755/// A struct representing a name for a monitor. 
765756#[ derive( Debug ) ]  
766757struct  MonitorName ( String ) ; 
@@ -896,20 +887,21 @@ mod tests {
896887	#[ test]  
897888	fn  persister_with_real_monitors ( )  { 
898889		// This value is used later to limit how many iterations we perform. 
899- 		let  test_max_pending_updates = 7 ; 
890+ 		let  persister_0_max_pending_updates = 7 ; 
891+ 		// Intentionally set this to a smaller value to test a different alignment. 
892+ 		let  persister_1_max_pending_updates = 3 ; 
900893		let  chanmon_cfgs = create_chanmon_cfgs ( 4 ) ; 
901894		let  persister_0 = MonitorUpdatingPersister  { 
902895			kv_store :  & TestStore :: new ( false ) , 
903896			logger :  & TestLogger :: new ( ) , 
904- 			maximum_pending_updates :  test_max_pending_updates , 
897+ 			maximum_pending_updates :  persister_0_max_pending_updates , 
905898			entropy_source :  & chanmon_cfgs[ 0 ] . keys_manager , 
906899			signer_provider :  & chanmon_cfgs[ 0 ] . keys_manager , 
907900		} ; 
908901		let  persister_1 = MonitorUpdatingPersister  { 
909902			kv_store :  & TestStore :: new ( false ) , 
910903			logger :  & TestLogger :: new ( ) , 
911- 			// Intentionally set this to a smaller value to test a different alignment. 
912- 			maximum_pending_updates :  3 , 
904+ 			maximum_pending_updates :  persister_1_max_pending_updates, 
913905			entropy_source :  & chanmon_cfgs[ 1 ] . keys_manager , 
914906			signer_provider :  & chanmon_cfgs[ 1 ] . keys_manager , 
915907		} ; 
@@ -934,7 +926,6 @@ mod tests {
934926		node_cfgs[ 1 ] . chain_monitor  = chain_mon_1; 
935927		let  node_chanmgrs = create_node_chanmgrs ( 2 ,  & node_cfgs,  & [ None ,  None ] ) ; 
936928		let  nodes = create_network ( 2 ,  & node_cfgs,  & node_chanmgrs) ; 
937- 
938929		let  broadcaster_0 = & chanmon_cfgs[ 2 ] . tx_broadcaster ; 
939930		let  broadcaster_1 = & chanmon_cfgs[ 3 ] . tx_broadcaster ; 
940931
@@ -957,10 +948,11 @@ mod tests {
957948				for  ( _,  mon)  in persisted_chan_data_0. iter( )  { 
958949					// check that when we read it, we got the right update id 
959950					assert_eq!( mon. get_latest_update_id( ) ,  $expected_update_id) ; 
960- 					// if the CM is at the correct update id without updates, ensure no updates are stored 
951+ 
952+ 					// if the CM is at consolidation threshold, ensure no updates are stored. 
961953					let  monitor_name = MonitorName :: from( mon. get_funding_txo( ) . 0 ) ; 
962- 					let   ( _ ,  cm_0 )  = persister_0 . read_monitor ( & monitor_name ) . unwrap ( ) ; 
963- 					if  cm_0 . get_latest_update_id( )  == $expected_update_id  { 
954+ 					if  mon . get_latest_update_id ( )  % persister_0_max_pending_updates ==  0 
955+ 							|| mon . get_latest_update_id( )  == CLOSED_CHANNEL_UPDATE_ID  { 
964956						assert_eq!( 
965957							persister_0. kv_store. list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , 
966958								monitor_name. as_str( ) ) . unwrap( ) . len( ) , 
@@ -975,8 +967,9 @@ mod tests {
975967				for  ( _,  mon)  in persisted_chan_data_1. iter( )  { 
976968					assert_eq!( mon. get_latest_update_id( ) ,  $expected_update_id) ; 
977969					let  monitor_name = MonitorName :: from( mon. get_funding_txo( ) . 0 ) ; 
978- 					let  ( _,  cm_1)  = persister_1. read_monitor( & monitor_name) . unwrap( ) ; 
979- 					if  cm_1. get_latest_update_id( )  == $expected_update_id { 
970+ 					// if the CM is at consolidation threshold, ensure no updates are stored. 
971+ 					if  mon. get_latest_update_id( )  % persister_1_max_pending_updates == 0 
972+ 							|| mon. get_latest_update_id( )  == CLOSED_CHANNEL_UPDATE_ID  { 
980973						assert_eq!( 
981974							persister_1. kv_store. list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , 
982975								monitor_name. as_str( ) ) . unwrap( ) . len( ) , 
@@ -1001,7 +994,7 @@ mod tests {
1001994		// Send a few more payments to try all the alignments of max pending updates with 
1002995		// updates for a payment sent and received. 
1003996		let  mut  sender = 0 ; 
1004- 		for  i in  3 ..=test_max_pending_updates  *  2  { 
997+ 		for  i in  3 ..=persister_0_max_pending_updates  *  2  { 
1005998			let  receiver; 
1006999			if  sender == 0  { 
10071000				sender = 1 ; 
0 commit comments