@@ -36,7 +36,7 @@ use crate::ln::types::ChannelId;
3636use crate :: sign:: { ecdsa:: EcdsaChannelSigner , EntropySource , SignerProvider } ;
3737use crate :: sync:: Mutex ;
3838use crate :: util:: async_poll:: {
39- dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture ,
39+ dummy_waker, MaybeSend , MaybeSync , MultiResultFuturePoller , ResultFuture , TwoFutureJoiner ,
4040} ;
4141use crate :: util:: logger:: Logger ;
4242use crate :: util:: native_async:: FutureSpawner ;
@@ -576,15 +576,6 @@ fn poll_sync_future<F: Future>(future: F) -> F::Output {
576576/// list channel monitors themselves and load channels individually using
577577/// [`MonitorUpdatingPersister::read_channel_monitor_with_updates`].
578578///
579- /// ## EXTREMELY IMPORTANT
580- ///
581- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
582- /// [`io::ErrorKind::NotFound`] variant correctly: that is, when a file is not found, and _only_ in
583- /// that circumstance (not when there is really a permissions error, for example). This is because
584- /// neither channel monitor reading function lists updates. Instead, either reads the monitor, and
585- /// using its stored `update_id`, synthesizes update storage keys, and tries them in sequence until
586- /// one is not found. All _other_ errors will be bubbled up in the function's [`Result`].
587- ///
588579/// # Pruning stale channel updates
589580///
590581/// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`.
@@ -658,10 +649,6 @@ where
658649 }
659650
660651 /// Reads all stored channel monitors, along with any stored updates for them.
661- ///
662- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
663- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
664- /// documentation for [`MonitorUpdatingPersister`].
665652 pub fn read_all_channel_monitors_with_updates (
666653 & self ,
667654 ) -> Result <
@@ -673,10 +660,6 @@ where
673660
674661 /// Read a single channel monitor, along with any stored updates for it.
675662 ///
676- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
677- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
678- /// documentation for [`MonitorUpdatingPersister`].
679- ///
680663 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
681664 /// underscore `_` between txid and index for v1 channels. For example, given:
682665 ///
@@ -877,10 +860,6 @@ where
877860 /// If you can move this object into an `Arc`, consider using
878861 /// [`Self::read_all_channel_monitors_with_updates_parallel`] to parallelize the CPU-bound
879862 /// deserialization as well.
880- ///
881- /// It is extremely important that your [`KVStore::read`] implementation uses the
882- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
883- /// documentation for [`MonitorUpdatingPersister`].
884863 pub async fn read_all_channel_monitors_with_updates (
885864 & self ,
886865 ) -> Result <
@@ -915,10 +894,6 @@ where
915894 /// Because [`FutureSpawner`] requires that the spawned future be `'static` (matching `tokio`
916895 /// and other multi-threaded runtime requirements), this method requires that `self` be an
917896 /// `Arc` that can live for `'static` and be sent and accessed across threads.
918- ///
919- /// It is extremely important that your [`KVStore::read`] implementation uses the
920- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
921- /// documentation for [`MonitorUpdatingPersister`].
922897 pub async fn read_all_channel_monitors_with_updates_parallel (
923898 self : & Arc < Self > ,
924899 ) -> Result <
@@ -959,10 +934,6 @@ where
959934
960935 /// Read a single channel monitor, along with any stored updates for it.
961936 ///
962- /// It is extremely important that your [`KVStoreSync::read`] implementation uses the
963- /// [`io::ErrorKind::NotFound`] variant correctly. For more information, please see the
964- /// documentation for [`MonitorUpdatingPersister`].
965- ///
966937 /// For `monitor_key`, channel storage keys can be the channel's funding [`OutPoint`], with an
967938 /// underscore `_` between txid and index for v1 channels. For example, given:
968939 ///
@@ -1121,40 +1092,37 @@ where
11211092 io:: Error ,
11221093 > {
11231094 let monitor_name = MonitorName :: from_str ( monitor_key) ?;
1124- let read_res = self . maybe_read_monitor ( & monitor_name, monitor_key) . await ?;
1125- let ( block_hash, monitor) = match read_res {
1095+ let read_future = pin ! ( self . maybe_read_monitor( & monitor_name, monitor_key) ) ;
1096+ let list_future = pin ! ( self
1097+ . kv_store
1098+ . list( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key) ) ;
1099+ let ( read_res, list_res) = TwoFutureJoiner :: new ( read_future, list_future) . await ;
1100+ let ( block_hash, monitor) = match read_res? {
11261101 Some ( res) => res,
11271102 None => return Ok ( None ) ,
11281103 } ;
1129- let mut current_update_id = monitor. get_latest_update_id ( ) ;
1130- // TODO: Parallelize this loop by speculatively reading a batch of updates
1131- loop {
1132- current_update_id = match current_update_id. checked_add ( 1 ) {
1133- Some ( next_update_id) => next_update_id,
1134- None => break ,
1135- } ;
1136- let update_name = UpdateName :: from ( current_update_id) ;
1137- let update = match self . read_monitor_update ( monitor_key, & update_name) . await {
1138- Ok ( update) => update,
1139- Err ( err) if err. kind ( ) == io:: ErrorKind :: NotFound => {
1140- // We can't find any more updates, so we are done.
1141- break ;
1142- } ,
1143- Err ( err) => return Err ( err) ,
1144- } ;
1145-
1146- monitor
1147- . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1148- . map_err ( |e| {
1149- log_error ! (
1150- self . logger,
1151- "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1152- monitor_key,
1153- update_name. as_str( ) ,
1154- e
1155- ) ;
1156- io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1157- } ) ?;
1104+ let current_update_id = monitor. get_latest_update_id ( ) ;
1105+ let updates: Result < Vec < _ > , _ > =
1106+ list_res?. into_iter ( ) . map ( |name| UpdateName :: new ( name) ) . collect ( ) ;
1107+ let mut updates = updates?;
1108+ updates. sort_unstable ( ) ;
1109+ // TODO: Parallelize this loop
1110+ for update_name in updates {
1111+ if update_name. 0 > current_update_id {
1112+ let update = self . read_monitor_update ( monitor_key, & update_name) . await ?;
1113+ monitor
1114+ . update_monitor ( & update, & self . broadcaster , & self . fee_estimator , & self . logger )
1115+ . map_err ( |e| {
1116+ log_error ! (
1117+ self . logger,
1118+ "Monitor update failed. monitor: {} update: {} reason: {:?}" ,
1119+ monitor_key,
1120+ update_name. as_str( ) ,
1121+ e
1122+ ) ;
1123+ io:: Error :: new ( io:: ErrorKind :: Other , "Monitor update failed" )
1124+ } ) ?;
1125+ }
11581126 }
11591127 Ok ( Some ( ( block_hash, monitor) ) )
11601128 }
@@ -1529,7 +1497,7 @@ impl core::fmt::Display for MonitorName {
15291497/// let monitor_name = "some_monitor_name";
15301498/// let storage_key = format!("channel_monitor_updates/{}/{}", monitor_name, update_name.as_str());
15311499/// ```
1532- #[ derive( Debug ) ]
1500+ #[ derive( Debug , PartialEq , Eq , PartialOrd , Ord ) ]
15331501pub struct UpdateName ( pub u64 , String ) ;
15341502
15351503impl UpdateName {
0 commit comments