@@ -36,6 +36,7 @@ use lightning::util::persist::KVStore;
 use lightning::util::ser::Writeable;

 use core::ops::Deref;
+use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;

 use alloc::string::String;
@@ -140,6 +141,7 @@ where
 	node_signer: NS,
 	kv_store: K,
 	last_pruning: Mutex<Option<LSPSDateTime>>,
+	persistence_in_flight: AtomicUsize,
 }

 impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
@@ -167,6 +169,7 @@ where
 			node_signer,
 			kv_store,
 			last_pruning: Mutex::new(None),
+			persistence_in_flight: AtomicUsize::new(0),
 		}
 	}

@@ -245,63 +248,80 @@ where
 		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
 		// introduce some batching to upper-bound the number of requests inflight at any given
 		// time.
-		let mut need_remove = Vec::new();
-		let mut need_persist = Vec::new();
-
-		self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
-		{
-			let outer_state_lock = self.per_peer_state.read().unwrap();
-
-			for (client_id, peer_state) in outer_state_lock.iter() {
-				let is_prunable = peer_state.is_prunable();
-				let has_open_channel = self.client_has_open_channel(client_id);
-				if is_prunable && !has_open_channel {
-					need_remove.push(*client_id);
-				} else if peer_state.needs_persist {
-					need_persist.push(*client_id);
-				}
-			}
-		}

-		for client_id in need_persist.into_iter() {
-			debug_assert!(!need_remove.contains(&client_id));
-			self.persist_peer_state(client_id).await?;
+		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
+			// If we're not the first event processor to get here, just return early, the increment
+			// we just did will be treated as "go around again" at the end.
+			return Ok(());
 		}

-		for client_id in need_remove {
-			let mut future_opt = None;
+		loop {
+			let mut need_remove = Vec::new();
+			let mut need_persist = Vec::new();
+
+			self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
 			{
-				// We need to take the `per_peer_state` write lock to remove an entry, but also
-				// have to hold it until after the `remove` call returns (but not through
-				// future completion) to ensure that writes for the peer's state are
-				// well-ordered with other `persist_peer_state` calls even across the removal
-				// itself.
-				let mut per_peer_state = self.per_peer_state.write().unwrap();
-				if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
-					let state = entry.get_mut();
-					if state.is_prunable() && !self.client_has_open_channel(&client_id) {
-						entry.remove();
-						let key = client_id.to_string();
-						future_opt = Some(self.kv_store.remove(
-							LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
-							LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
-							&key,
-						));
+				let outer_state_lock = self.per_peer_state.read().unwrap();
+
+				for (client_id, peer_state) in outer_state_lock.iter() {
+					let is_prunable = peer_state.is_prunable();
+					let has_open_channel = self.client_has_open_channel(client_id);
+					if is_prunable && !has_open_channel {
+						need_remove.push(*client_id);
+					} else if peer_state.needs_persist {
+						need_persist.push(*client_id);
+					}
+				}
+			}
+
+			for client_id in need_persist.into_iter() {
+				debug_assert!(!need_remove.contains(&client_id));
+				self.persist_peer_state(client_id).await?;
+			}
+
+			for client_id in need_remove {
+				let mut future_opt = None;
+				{
+					// We need to take the `per_peer_state` write lock to remove an entry, but also
+					// have to hold it until after the `remove` call returns (but not through
+					// future completion) to ensure that writes for the peer's state are
+					// well-ordered with other `persist_peer_state` calls even across the removal
+					// itself.
+					let mut per_peer_state = self.per_peer_state.write().unwrap();
+					if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
+						let state = entry.get_mut();
+						if state.is_prunable() && !self.client_has_open_channel(&client_id) {
+							entry.remove();
+							let key = client_id.to_string();
+							future_opt = Some(self.kv_store.remove(
+								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+								LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
+								&key,
+							));
+						} else {
+							// If the peer was re-added, force a re-persist of the current state.
+							state.needs_persist = true;
+						}
 					} else {
-						// If the peer was re-added, force a re-persist of the current state.
-						state.needs_persist = true;
+						// This should never happen, we can only have one `persist` call
+						// in-progress at once and map entries are only removed by it.
+						debug_assert!(false);
 					}
+				}
+				if let Some(future) = future_opt {
+					future.await?;
 				} else {
-					// This should never happen, we can only have one `persist` call
-					// in-progress at once and map entries are only removed by it.
-					debug_assert!(false);
+					self.persist_peer_state(client_id).await?;
 				}
 			}
-			if let Some(future) = future_opt {
-				future.await?;
-			} else {
-				self.persist_peer_state(client_id).await?;
+
+			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
+				// If another thread incremented the state while we were running we should go
+				// around again, but only once.
+				self.persistence_in_flight.store(1, Ordering::Release);
+				continue;
 			}
+			break;
 		}

 		Ok(())
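
The concurrency guard introduced above is easier to follow in isolation. Below is a minimal sketch of the same `AtomicUsize` "one pass in flight, go around again" pattern, detached from the LSPS5 handler; `Flusher` and `flush_dirty_state` are hypothetical names used only for illustration, not part of the rust-lightning API.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct Flusher {
	// Counts callers that have requested a persistence pass. Zero means idle.
	persistence_in_flight: AtomicUsize,
}

impl Flusher {
	fn new() -> Self {
		Self { persistence_in_flight: AtomicUsize::new(0) }
	}

	fn flush_dirty_state(&self) {
		// Register ourselves. If another caller is already running the loop below,
		// return early: our increment tells it to do one more pass before it finishes.
		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			return;
		}

		loop {
			// ... one full persistence pass would go here ...

			// If nobody bumped the counter while we ran, we're done.
			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
				// One or more callers arrived mid-pass; collapse them all into a
				// single extra pass and go around again.
				self.persistence_in_flight.store(1, Ordering::Release);
				continue;
			}
			break;
		}
	}
}

fn main() {
	let flusher = Flusher::new();
	flusher.flush_dirty_state();
}
```

The effect is that any number of concurrent callers arriving during a pass are collapsed into exactly one additional pass, so the persistence logic never runs in two callers at once and no request is silently dropped.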