@@ -771,10 +771,9 @@ where
771
771
Vec < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) > ,
772
772
io:: Error ,
773
773
> {
774
- let monitor_list = self . 0 . kv_store . list (
775
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
776
- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
777
- ) . await ?;
774
+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
775
+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
776
+ let monitor_list = self . 0 . kv_store . list ( primary, secondary) . await ?;
778
777
let mut res = Vec :: with_capacity ( monitor_list. len ( ) ) ;
779
778
// TODO: Parallelize this loop
780
779
for monitor_key in monitor_list {
@@ -874,11 +873,10 @@ where
874
873
& self , monitor_name : & MonitorName , monitor_key : & str ,
875
874
) -> Result < ( BlockHash , ChannelMonitor < <SP :: Target as SignerProvider >:: EcdsaSigner > ) , io:: Error >
876
875
{
877
- let mut monitor_cursor = io:: Cursor :: new ( self . kv_store . read (
878
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
879
- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
880
- monitor_key,
881
- ) . await ?) ;
876
+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
877
+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
878
+ let monitor_bytes = self . kv_store . read ( primary, secondary, monitor_key) . await ?;
879
+ let mut monitor_cursor = io:: Cursor :: new ( monitor_bytes) ;
882
880
// Discard the sentinel bytes if found.
883
881
if monitor_cursor. get_ref ( ) . starts_with ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) {
884
882
monitor_cursor. set_position ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL . len ( ) as u64 ) ;
@@ -918,12 +916,9 @@ where
918
916
async fn read_monitor_update (
919
917
& self , monitor_key : & str , update_name : & UpdateName ,
920
918
) -> Result < ChannelMonitorUpdate , io:: Error > {
921
- let update_bytes = self . kv_store . read (
922
- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
923
- monitor_key,
924
- update_name. as_str ( ) ,
925
- ) . await ?;
926
- ChannelMonitorUpdate :: read ( & mut io:: Cursor :: new ( update_bytes) ) . map_err ( |e| {
919
+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
920
+ let update_bytes = self . kv_store . read ( primary, monitor_key, update_name. as_str ( ) ) . await ?;
921
+ ChannelMonitorUpdate :: read ( & mut & update_bytes[ ..] ) . map_err ( |e| {
927
922
log_error ! (
928
923
self . logger,
929
924
"Failed to read ChannelMonitorUpdate {}/{}/{}, reason: {}" ,
@@ -937,27 +932,19 @@ where
937
932
}
938
933
939
934
async fn cleanup_stale_updates ( & self , lazy : bool ) -> Result < ( ) , io:: Error > {
940
- let monitor_keys = self . kv_store . list (
941
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
942
- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
943
- ) . await ?;
935
+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
936
+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
937
+ let monitor_keys = self . kv_store . list ( primary, secondary) . await ?;
944
938
for monitor_key in monitor_keys {
945
939
let monitor_name = MonitorName :: from_str ( & monitor_key) ?;
946
940
let ( _, current_monitor) = self . read_monitor ( & monitor_name, & monitor_key) . await ?;
947
- let updates = self
948
- . kv_store
949
- . list ( CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE , monitor_key. as_str ( ) )
950
- . await ?;
941
+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
942
+ let updates = self . kv_store . list ( primary, monitor_key. as_str ( ) ) . await ?;
951
943
for update in updates {
952
944
let update_name = UpdateName :: new ( update) ?;
953
945
// if the update_id is lower than the stored monitor, delete
954
946
if update_name. 0 <= current_monitor. get_latest_update_id ( ) {
955
- self . kv_store . remove (
956
- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
957
- monitor_key. as_str ( ) ,
958
- update_name. as_str ( ) ,
959
- lazy,
960
- ) . await ?;
947
+ self . kv_store . remove ( primary, & monitor_key, update_name. as_str ( ) , lazy) . await ?;
961
948
}
962
949
}
963
950
}
@@ -980,12 +967,9 @@ where
980
967
monitor_bytes. extend_from_slice ( MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL ) ;
981
968
}
982
969
monitor. write ( & mut monitor_bytes) . unwrap ( ) ;
983
- self . kv_store . write (
984
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
985
- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
986
- monitor_key. as_str ( ) ,
987
- monitor_bytes,
988
- ) . await
970
+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
971
+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
972
+ self . kv_store . write ( primary, secondary, monitor_key. as_str ( ) , monitor_bytes) . await
989
973
}
990
974
991
975
async fn update_persisted_channel < ChannelSigner : EcdsaChannelSigner > (
@@ -1000,12 +984,10 @@ where
1000
984
if persist_update {
1001
985
let monitor_key = monitor_name. to_string ( ) ;
1002
986
let update_name = UpdateName :: from ( update. update_id ) ;
1003
- self . kv_store . write (
1004
- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
1005
- monitor_key. as_str ( ) ,
1006
- update_name. as_str ( ) ,
1007
- update. encode ( ) ,
1008
- ) . await
987
+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
988
+ self . kv_store
989
+ . write ( primary, & monitor_key, update_name. as_str ( ) , update. encode ( ) )
990
+ . await
1009
991
} else {
1010
992
// In case of channel-close monitor update, we need to read old monitor before persisting
1011
993
// the new one in order to determine the cleanup range.
@@ -1059,34 +1041,25 @@ where
1059
1041
Ok ( ( _block_hash, monitor) ) => monitor,
1060
1042
Err ( _) => return ,
1061
1043
} ;
1062
- match self . kv_store . write (
1063
- ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
1064
- ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
1065
- monitor_key. as_str ( ) ,
1066
- monitor. encode ( ) ,
1067
- ) . await {
1044
+ let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
1045
+ let secondary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
1046
+ match self . kv_store . write ( primary, secondary, & monitor_key, monitor. encode ( ) ) . await {
1068
1047
Ok ( ( ) ) => { } ,
1069
1048
Err ( _e) => return ,
1070
1049
} ;
1071
- let _ = self . kv_store . remove (
1072
- CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ,
1073
- CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ,
1074
- monitor_key. as_str ( ) ,
1075
- true ,
1076
- ) . await ;
1050
+ let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE ;
1051
+ let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE ;
1052
+ let _ = self . kv_store . remove ( primary, secondary, & monitor_key, true ) . await ;
1077
1053
}
1078
1054
1079
1055
// Cleans up monitor updates for given monitor in range `start..=end`.
1080
1056
async fn cleanup_in_range ( & self , monitor_name : MonitorName , start : u64 , end : u64 ) {
1081
1057
let monitor_key = monitor_name. to_string ( ) ;
1082
1058
for update_id in start..=end {
1083
1059
let update_name = UpdateName :: from ( update_id) ;
1084
- if let Err ( e) = self . kv_store . remove (
1085
- CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ,
1086
- monitor_key. as_str ( ) ,
1087
- update_name. as_str ( ) ,
1088
- true ,
1089
- ) . await {
1060
+ let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE ;
1061
+ let res = self . kv_store . remove ( primary, & monitor_key, update_name. as_str ( ) , true ) . await ;
1062
+ if let Err ( e) = res {
1090
1063
log_error ! (
1091
1064
self . logger,
1092
1065
"Failed to clean up channel monitor updates for monitor {}, reason: {}" ,
0 commit comments