@@ -771,10 +771,9 @@ where
771771 Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > ,
772772 io:: Error ,
773773 > {
774- let monitor_list = self . 0 . kv_store . list (
775- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
776- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
777- ) . await ?;
774+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
775+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
776+ let monitor_list = self . 0 . kv_store . list ( primary, secondary) . await ?;
778777 let mut res = Vec :: with_capacity ( monitor_list. len ( ) ) ;
779778 // TODO: Parallelize this loop
780779 for monitor_key in monitor_list {
@@ -874,11 +873,10 @@ where
874873 & self , monitor_name : & MonitorName , monitor_key : & str ,
875874 ) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error >
876875 {
877- let mut monitor_cursor = io:: Cursor :: new ( self . kv_store . read (
878- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
879- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
880- monitor_key,
881- ) . await ?) ;
876+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
877+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
878+ let monitor_bytes = self . kv_store . read ( primary, secondary, monitor_key) . await ?;
879+ let mut monitor_cursor = io:: Cursor :: new ( monitor_bytes) ;
882880 // Discard the sentinel bytes if found.
883881 if monitor_cursor. get_ref ( ) . starts_with ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) {
884882 monitor_cursor. set_position ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL . len ( ) as u64 ) ;
@@ -918,12 +916,9 @@ where
918916 async fn read_monitor_update (
919917 & self , monitor_key : & str , update_name : & UpdateName ,
920918 ) -> Result < ChannelMonitorUpdate , io:: Error > {
921- let update_bytes = self . kv_store . read (
922- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
923- monitor_key,
924- update_name. as_str ( ) ,
925- ) . await ?;
926- ChannelMonitorUpdate :: read ( & mut io:: Cursor :: new ( update_bytes) ) . map_err ( |e| {
919+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
920+ let update_bytes = self . kv_store . read ( primary, monitor_key, update_name. as_str ( ) ) . await ?;
921+ ChannelMonitorUpdate :: read ( & mut & update_bytes[ ..] ) . map_err ( |e| {
927922 log_error ! (
928923 self . logger,
929924 "Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}" ,
@@ -937,27 +932,19 @@ where
937932 }
938933
939934 async fn cleanup_stale_updates ( & self , lazy : bool ) -> Result < ( ) , io:: Error > {
940- let monitor_keys = self . kv_store . list (
941- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
942- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
943- ) . await ?;
935+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
936+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
937+ let monitor_keys = self . kv_store . list ( primary, secondary) . await ?;
944938 for monitor_key in monitor_keys {
945939 let monitor_name = MonitorName :: from_str ( & monitor_key) ?;
946940 let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) . await ?;
947- let updates = self
948- . kv_store
949- . list ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key. as_str ( ) )
950- . await ?;
941+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
942+ let updates = self . kv_store . list ( primary, monitor_key. as_str ( ) ) . await ?;
951943 for update in updates {
952944 let update_name = UpdateName :: new ( update) ?;
953945 // if the update_id is lower than the stored monitor, delete
954946 if update_name. 0 <= current_monitor. get_latest_update_id ( ) {
955- self . kv_store . remove (
956- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
957- monitor_key. as_str ( ) ,
958- update_name. as_str ( ) ,
959- lazy,
960- ) . await ?;
947+ self . kv_store . remove ( primary, & monitor_key, update_name. as_str ( ) , lazy) . await ?;
961948 }
962949 }
963950 }
@@ -980,12 +967,9 @@ where
980967 monitor_bytes. extend_from_slice ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) ;
981968 }
982969 monitor. write ( & mut monitor_bytes) . unwrap ( ) ;
983- self . kv_store . write (
984- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
985- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
986- monitor_key. as_str ( ) ,
987- monitor_bytes,
988- ) . await
970+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
971+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
972+ self . kv_store . write ( primary, secondary, monitor_key. as_str ( ) , monitor_bytes) . await
989973 }
990974
991975 async fn update_persisted_channel < ChannelSigner : EcdsaChannelSigner > (
@@ -1000,12 +984,10 @@ where
1000984 if persist_update {
1001985 let monitor_key = monitor_name. to_string ( ) ;
1002986 let update_name = UpdateName :: from ( update. update_id ) ;
1003- self . kv_store . write (
1004- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
1005- monitor_key. as_str ( ) ,
1006- update_name. as_str ( ) ,
1007- update. encode ( ) ,
1008- ) . await
987+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
988+ self . kv_store
989+ . write ( primary, & monitor_key, update_name. as_str ( ) , update. encode ( ) )
990+ . await
1009991 } else {
1010992 // In case of channel-close monitor update, we need to read old monitor before persisting
1011993 // the new one in order to determine the cleanup range.
@@ -1059,34 +1041,25 @@ where
10591041 Ok ( ( _block_hash, monitor) ) => monitor,
10601042 Err ( _) => return ,
10611043 } ;
1062- match self . kv_store . write (
1063- ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
1064- ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
1065- monitor_key. as_str ( ) ,
1066- monitor. encode ( ) ,
1067- ) . await {
1044+ let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
1045+ let secondary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
1046+ match self . kv_store . write ( primary, secondary, & monitor_key, monitor. encode ( ) ) . await {
10681047 Ok ( ( ) ) => { } ,
10691048 Err ( _e) => return ,
10701049 } ;
1071- let _ = self . kv_store . remove (
1072- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
1073- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
1074- monitor_key. as_str ( ) ,
1075- true ,
1076- ) . await ;
1050+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
1051+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
1052+ let _ = self . kv_store . remove ( primary, secondary, & monitor_key, true ) . await ;
10771053 }
10781054
10791055 // Cleans up monitor updates for given monitor in range `start..=end`.
10801056 async fn cleanup_in_range ( & self , monitor_name : MonitorName , start : u64 , end : u64 ) {
10811057 let monitor_key = monitor_name. to_string ( ) ;
10821058 for update_id in start..=end {
10831059 let update_name = UpdateName :: from ( update_id) ;
1084- if let Err ( e) = self . kv_store . remove (
1085- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
1086- monitor_key. as_str ( ) ,
1087- update_name. as_str ( ) ,
1088- true ,
1089- ) . await {
1060+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
1061+ let res = self . kv_store . remove ( primary, & monitor_key, update_name. as_str ( ) , true ) . await ;
1062+ if let Err ( e) = res {
10901063 log_error ! (
10911064 self . logger,
10921065 "Failed to clean up channel monitor updates for monitor {}, reason: {}" ,
0 commit comments