@@ -17,6 +17,9 @@ use crate::lsps5::msgs::{
	SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
};
use crate::message_queue::MessageQueue;
+use crate::persist::{
+	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+};
use crate::prelude::*;
use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use crate::utils::time::TimeProvider;
@@ -28,10 +31,15 @@ use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::{ErrorAction, LightningError};
use lightning::sign::NodeSigner;
use lightning::util::logger::Level;
+use lightning::util::persist::KVStore;
+use lightning::util::ser::Writeable;

+use core::future::Future;
use core::ops::Deref;
+use core::pin::Pin;
use core::time::Duration;

+use alloc::boxed::Box;
use alloc::string::String;
use alloc::vec::Vec;
@@ -131,6 +139,7 @@ where
	time_provider: TP,
	channel_manager: CM,
	node_signer: NS,
+	kv_store: Arc<dyn KVStore + Send + Sync>,
	last_pruning: Mutex<Option<LSPSDateTime>>,
}
@@ -143,7 +152,8 @@ where
	/// Constructs a `LSPS5ServiceHandler` using the given time provider.
	pub(crate) fn new_with_time_provider(
		event_queue: Arc<EventQueue>, pending_messages: Arc<MessageQueue>, channel_manager: CM,
-		node_signer: NS, config: LSPS5ServiceConfig, time_provider: TP,
+		kv_store: Arc<dyn KVStore + Send + Sync>, node_signer: NS, config: LSPS5ServiceConfig,
+		time_provider: TP,
	) -> Self {
		assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
		Self {
@@ -154,6 +164,7 @@ where
			time_provider,
			channel_manager,
			node_signer,
+			kv_store,
			last_pruning: Mutex::new(None),
		}
	}
@@ -186,6 +197,44 @@ where
		}
	}

+	fn persist_peer_state(
+		&self, counterparty_node_id: PublicKey, peer_state: &PeerState,
+	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
+		let key = counterparty_node_id.to_string();
+		let encoded = peer_state.encode();
+		self.kv_store.write(
+			LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+			LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+			&key,
+			encoded,
+		)
+	}
+
+	pub(crate) fn persist(
+		&self,
+	) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>> {
+		let outer_state_lock = self.per_peer_state.read().unwrap();
+		let mut futures = Vec::new();
+		for (counterparty_node_id, peer_state) in outer_state_lock.iter() {
+			let fut = self.persist_peer_state(*counterparty_node_id, peer_state);
+			futures.push(fut);
+		}
+
+		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
+		// introduce some batching to upper-bound the number of requests inflight at any given
+		// time.
+		Box::pin(async move {
+			let mut ret = Ok(());
+			for fut in futures {
+				let res = fut.await;
+				if res.is_err() {
+					ret = res;
+				}
+			}
+			ret
+		})
+	}
+
	fn check_prune_stale_webhooks<'a>(
		&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
	) {
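The TODO in `persist()` notes that per-peer writes could eventually run in parallel, with batching to upper-bound the number of in-flight requests. A rough sketch of what that could look like (not part of this PR) is below; it assumes the `futures` crate is available for `join_all`, and `BATCH_SIZE` and `persist_batched` are hypothetical names:

```rust
use core::future::Future;
use core::pin::Pin;

use alloc::boxed::Box;
use alloc::vec::Vec;

use futures::future::join_all;

// Illustrative cap on concurrent KVStore writes; not taken from the PR.
const BATCH_SIZE: usize = 8;

type PersistFuture = Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + Send>>;

// Hypothetical helper: await the collected persistence futures in fixed-size
// batches so only BATCH_SIZE writes are in flight at any given time.
async fn persist_batched(futs: Vec<PersistFuture>) -> Result<(), lightning::io::Error> {
	let mut ret = Ok(());
	let mut iter = futs.into_iter();
	loop {
		// Take the next chunk of futures and drive them concurrently.
		let batch: Vec<PersistFuture> = iter.by_ref().take(BATCH_SIZE).collect();
		if batch.is_empty() {
			break;
		}
		for res in join_all(batch).await {
			// Mirror the PR's current behavior: remember a failure but keep persisting.
			if res.is_err() {
				ret = res;
			}
		}
	}
	ret
}
```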