@@ -17,9 +17,8 @@ use crate::lsps5::msgs::{
1717 SetWebhookRequest , SetWebhookResponse , WebhookNotification , WebhookNotificationMethod ,
1818} ;
1919use crate :: message_queue:: MessageQueue ;
20- use crate :: prelude:: hash_map:: Entry ;
2120use crate :: prelude:: * ;
22- use crate :: sync:: { Arc , Mutex } ;
21+ use crate :: sync:: { Arc , Mutex , RwLock , RwLockWriteGuard } ;
2322use crate :: utils:: time:: TimeProvider ;
2423
2524use bitcoin:: secp256k1:: PublicKey ;
@@ -117,7 +116,7 @@ where
117116 TP :: Target : TimeProvider ,
118117{
119118 config : LSPS5ServiceConfig ,
120- webhooks : Mutex < HashMap < PublicKey , HashMap < LSPS5AppName , Webhook > > > ,
119+ per_peer_state : RwLock < HashMap < PublicKey , PeerState > > ,
121120 event_queue : Arc < EventQueue > ,
122121 pending_messages : Arc < MessageQueue > ,
123122 time_provider : TP ,
@@ -140,7 +139,7 @@ where
140139 assert ! ( config. max_webhooks_per_client > 0 , "`max_webhooks_per_client` must be > 0" ) ;
141140 Self {
142141 config,
143- webhooks : Mutex :: new ( new_hash_map ( ) ) ,
142+ per_peer_state : RwLock :: new ( new_hash_map ( ) ) ,
144143 event_queue,
145144 pending_messages,
146145 time_provider,
@@ -150,18 +149,26 @@ where
150149 }
151150 }
152151
153- fn check_prune_stale_webhooks ( & self ) {
152+ fn check_prune_stale_webhooks < ' a > (
153+ & self , outer_state_lock : & mut RwLockWriteGuard < ' a , HashMap < PublicKey , PeerState > > ,
154+ ) {
155+ let mut last_pruning = self . last_pruning . lock ( ) . unwrap ( ) ;
154156 let now =
155157 LSPSDateTime :: new_from_duration_since_epoch ( self . time_provider . duration_since_epoch ( ) ) ;
156- let should_prune = {
157- let last_pruning = self . last_pruning . lock ( ) . unwrap ( ) ;
158- last_pruning. as_ref ( ) . map_or ( true , |last_time| {
159- now. duration_since ( & last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
160- } )
161- } ;
158+
159+ let should_prune = last_pruning. as_ref ( ) . map_or ( true , |last_time| {
160+ now. duration_since ( & last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
161+ } ) ;
162162
163163 if should_prune {
164- self . prune_stale_webhooks ( ) ;
164+ outer_state_lock. retain ( |client_id, peer_state| {
165+ if self . client_has_open_channel ( client_id) {
166+ // Don't prune clients with open channels
167+ return true ;
168+ }
169+ !peer_state. prune_stale_webhooks ( now)
170+ } ) ;
171+ * last_pruning = Some ( now) ;
165172 }
166173 }
167174
@@ -171,58 +178,56 @@ where
171178 ) -> Result < ( ) , LightningError > {
172179 let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
173180
174- self . check_prune_stale_webhooks ( ) ;
181+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
182+ self . check_prune_stale_webhooks ( & mut outer_state_lock) ;
175183
176- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
184+ let peer_state =
185+ outer_state_lock. entry ( counterparty_node_id) . or_insert_with ( PeerState :: default) ;
177186
178- let client_webhooks = webhooks. entry ( counterparty_node_id) . or_insert_with ( new_hash_map) ;
179187 let now =
180188 LSPSDateTime :: new_from_duration_since_epoch ( self . time_provider . duration_since_epoch ( ) ) ;
181189
182- let num_webhooks = client_webhooks . len ( ) ;
190+ let num_webhooks = peer_state . webhooks_len ( ) ;
183191 let mut no_change = false ;
184- match client_webhooks. entry ( params. app_name . clone ( ) ) {
185- Entry :: Occupied ( mut entry) => {
186- no_change = entry. get ( ) . url == params. webhook ;
187- let last_used = if no_change { entry. get ( ) . last_used } else { now } ;
188- let last_notification_sent = entry. get ( ) . last_notification_sent ;
189- entry. insert ( Webhook {
190- _app_name : params. app_name . clone ( ) ,
191- url : params. webhook . clone ( ) ,
192- _counterparty_node_id : counterparty_node_id,
193- last_used,
194- last_notification_sent,
195- } ) ;
196- } ,
197- Entry :: Vacant ( entry) => {
198- if num_webhooks >= self . config . max_webhooks_per_client as usize {
199- let error = LSPS5ProtocolError :: TooManyWebhooks ;
200- let msg = LSPS5Message :: Response (
201- request_id,
202- LSPS5Response :: SetWebhookError ( error. clone ( ) . into ( ) ) ,
203- )
204- . into ( ) ;
205- message_queue_notifier. enqueue ( & counterparty_node_id, msg) ;
206- return Err ( LightningError {
207- err : error. message ( ) . into ( ) ,
208- action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
209- } ) ;
210- }
211192
212- entry. insert ( Webhook {
213- _app_name : params. app_name . clone ( ) ,
214- url : params. webhook . clone ( ) ,
215- _counterparty_node_id : counterparty_node_id,
216- last_used : now,
217- last_notification_sent : None ,
193+ if let Some ( webhook) = peer_state. webhook_mut ( & params. app_name ) {
194+ no_change = webhook. url == params. webhook ;
195+ if !no_change {
196+ // The URL was updated.
197+ webhook. url = params. webhook . clone ( ) ;
198+ webhook. last_used = now;
199+ webhook. last_notification_sent = None ;
200+ }
201+ } else {
202+ if num_webhooks >= self . config . max_webhooks_per_client as usize {
203+ let error = LSPS5ProtocolError :: TooManyWebhooks ;
204+ let msg = LSPS5Message :: Response (
205+ request_id,
206+ LSPS5Response :: SetWebhookError ( error. clone ( ) . into ( ) ) ,
207+ )
208+ . into ( ) ;
209+ message_queue_notifier. enqueue ( & counterparty_node_id, msg) ;
210+ return Err ( LightningError {
211+ err : error. message ( ) . into ( ) ,
212+ action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
218213 } ) ;
219- } ,
214+ }
215+
216+ let webhook = Webhook {
217+ _app_name : params. app_name . clone ( ) ,
218+ url : params. webhook . clone ( ) ,
219+ _counterparty_node_id : counterparty_node_id,
220+ last_used : now,
221+ last_notification_sent : None ,
222+ } ;
223+
224+ peer_state. insert_webhook ( params. app_name . clone ( ) , webhook) ;
220225 }
221226
222227 if !no_change {
223228 self . send_webhook_registered_notification (
224229 counterparty_node_id,
225- params. app_name ,
230+ params. app_name . clone ( ) ,
226231 params. webhook ,
227232 )
228233 . map_err ( |e| {
@@ -242,7 +247,7 @@ where
242247 let msg = LSPS5Message :: Response (
243248 request_id,
244249 LSPS5Response :: SetWebhook ( SetWebhookResponse {
245- num_webhooks : client_webhooks . len ( ) as u32 ,
250+ num_webhooks : peer_state . webhooks_len ( ) as u32 ,
246251 max_webhooks : self . config . max_webhooks_per_client ,
247252 no_change,
248253 } ) ,
@@ -258,14 +263,11 @@ where
258263 ) -> Result < ( ) , LightningError > {
259264 let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
260265
261- self . check_prune_stale_webhooks ( ) ;
262-
263- let webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
266+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
267+ self . check_prune_stale_webhooks ( & mut outer_state_lock) ;
264268
265- let app_names = webhooks
266- . get ( & counterparty_node_id)
267- . map ( |client_webhooks| client_webhooks. keys ( ) . cloned ( ) . collect :: < Vec < _ > > ( ) )
268- . unwrap_or_else ( Vec :: new) ;
269+ let app_names =
270+ outer_state_lock. get ( & counterparty_node_id) . map ( |p| p. app_names ( ) ) . unwrap_or_default ( ) ;
269271
270272 let max_webhooks = self . config . max_webhooks_per_client ;
271273
@@ -282,12 +284,11 @@ where
282284 ) -> Result < ( ) , LightningError > {
283285 let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
284286
285- self . check_prune_stale_webhooks ( ) ;
287+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
288+ self . check_prune_stale_webhooks ( & mut outer_state_lock) ;
286289
287- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
288-
289- if let Some ( client_webhooks) = webhooks. get_mut ( & counterparty_node_id) {
290- if client_webhooks. remove ( & params. app_name ) . is_some ( ) {
290+ if let Some ( peer_state) = outer_state_lock. get_mut ( & counterparty_node_id) {
291+ if peer_state. remove_webhook ( & params. app_name ) {
291292 let response = RemoveWebhookResponse { } ;
292293 let msg =
293294 LSPS5Message :: Response ( request_id, LSPS5Response :: RemoveWebhook ( response) )
@@ -408,11 +409,13 @@ where
408409 fn send_notifications_to_client_webhooks (
409410 & self , client_id : PublicKey , notification : WebhookNotification ,
410411 ) -> Result < ( ) , LSPS5ProtocolError > {
411- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
412+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
413+ self . check_prune_stale_webhooks ( & mut outer_state_lock) ;
412414
413- let client_webhooks = match webhooks. get_mut ( & client_id) {
414- Some ( webhooks) if !webhooks. is_empty ( ) => webhooks,
415- _ => return Ok ( ( ) ) ,
415+ let peer_state = if let Some ( peer_state) = outer_state_lock. get_mut ( & client_id) {
416+ peer_state
417+ } else {
418+ return Ok ( ( ) ) ;
416419 } ;
417420
418421 let now =
@@ -421,7 +424,7 @@ where
421424 // We must avoid sending multiple notifications of the same method
422425 // (other than lsps5.webhook_registered) close in time.
423426 if notification. method != WebhookNotificationMethod :: LSPS5WebhookRegistered {
424- let rate_limit_applies = client_webhooks . iter ( ) . any ( |( _, webhook) | {
427+ let rate_limit_applies = peer_state . webhooks ( ) . iter ( ) . any ( |( _, webhook) | {
425428 webhook. last_notification_sent . as_ref ( ) . map_or ( false , |last_sent| {
426429 now. duration_since ( & last_sent) < NOTIFICATION_COOLDOWN_TIME
427430 } )
@@ -432,7 +435,7 @@ where
432435 }
433436 }
434437
435- for ( app_name, webhook) in client_webhooks . iter_mut ( ) {
438+ for ( app_name, webhook) in peer_state . webhooks_mut ( ) . iter_mut ( ) {
436439 self . send_notification (
437440 client_id,
438441 app_name. clone ( ) ,
@@ -490,26 +493,6 @@ where
490493 . map_err ( |_| LSPS5ProtocolError :: UnknownError )
491494 }
492495
493- fn prune_stale_webhooks ( & self ) {
494- let now =
495- LSPSDateTime :: new_from_duration_since_epoch ( self . time_provider . duration_since_epoch ( ) ) ;
496- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
497-
498- webhooks. retain ( |client_id, client_webhooks| {
499- if !self . client_has_open_channel ( client_id) {
500- client_webhooks. retain ( |_, webhook| {
501- now. duration_since ( & webhook. last_used ) < MIN_WEBHOOK_RETENTION_DAYS
502- } ) ;
503- !client_webhooks. is_empty ( )
504- } else {
505- true
506- }
507- } ) ;
508-
509- let mut last_pruning = self . last_pruning . lock ( ) . unwrap ( ) ;
510- * last_pruning = Some ( now) ;
511- }
512-
513496 fn client_has_open_channel ( & self , client_id : & PublicKey ) -> bool {
514497 self . channel_manager
515498 . get_cm ( )
@@ -519,20 +502,16 @@ where
519502 }
520503
521504 pub ( crate ) fn peer_connected ( & self , counterparty_node_id : & PublicKey ) {
522- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
523- if let Some ( client_webhooks) = webhooks. get_mut ( counterparty_node_id) {
524- for webhook in client_webhooks. values_mut ( ) {
525- webhook. last_notification_sent = None ;
526- }
505+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
506+ if let Some ( peer_state) = outer_state_lock. get_mut ( counterparty_node_id) {
507+ peer_state. reset_notification_cooldown ( ) ;
527508 }
528509 }
529510
530511 pub ( crate ) fn peer_disconnected ( & self , counterparty_node_id : & PublicKey ) {
531- let mut webhooks = self . webhooks . lock ( ) . unwrap ( ) ;
532- if let Some ( client_webhooks) = webhooks. get_mut ( counterparty_node_id) {
533- for webhook in client_webhooks. values_mut ( ) {
534- webhook. last_notification_sent = None ;
535- }
512+ let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
513+ if let Some ( peer_state) = outer_state_lock. get_mut ( counterparty_node_id) {
514+ peer_state. reset_notification_cooldown ( ) ;
536515 }
537516 }
538517}
@@ -578,3 +557,69 @@ where
578557 }
579558 }
580559}
560+
561+ #[ derive( Debug , Default ) ]
562+ struct PeerState {
563+ webhooks : Vec < ( LSPS5AppName , Webhook ) > ,
564+ }
565+
566+ impl PeerState {
567+ fn webhook_mut ( & mut self , name : & LSPS5AppName ) -> Option < & mut Webhook > {
568+ self . webhooks . iter_mut ( ) . find_map ( |( n, h) | if n == name { Some ( h) } else { None } )
569+ }
570+
571+ fn webhooks ( & self ) -> & Vec < ( LSPS5AppName , Webhook ) > {
572+ & self . webhooks
573+ }
574+
575+ fn webhooks_mut ( & mut self ) -> & mut Vec < ( LSPS5AppName , Webhook ) > {
576+ & mut self . webhooks
577+ }
578+
579+ fn webhooks_len ( & self ) -> usize {
580+ self . webhooks . len ( )
581+ }
582+
583+ fn app_names ( & self ) -> Vec < LSPS5AppName > {
584+ self . webhooks . iter ( ) . map ( |( n, _) | n) . cloned ( ) . collect ( )
585+ }
586+
587+ fn insert_webhook ( & mut self , name : LSPS5AppName , hook : Webhook ) {
588+ for ( n, h) in self . webhooks . iter_mut ( ) {
589+ if * n == name {
590+ * h = hook;
591+ return ;
592+ }
593+ }
594+
595+ self . webhooks . push ( ( name, hook) ) ;
596+ }
597+
598+ fn remove_webhook ( & mut self , name : & LSPS5AppName ) -> bool {
599+ let mut removed = false ;
600+ self . webhooks . retain ( |( n, _) | {
601+ if n != name {
602+ true
603+ } else {
604+ removed = true ;
605+ false
606+ }
607+ } ) ;
608+ removed
609+ }
610+
611+ fn reset_notification_cooldown ( & mut self ) {
612+ for ( _, h) in self . webhooks . iter_mut ( ) {
613+ h. last_notification_sent = None ;
614+ }
615+ }
616+
617+ // Returns whether the entire state is empty and can be pruned.
618+ fn prune_stale_webhooks ( & mut self , now : LSPSDateTime ) -> bool {
619+ self . webhooks . retain ( |( _, webhook) | {
620+ now. duration_since ( & webhook. last_used ) < MIN_WEBHOOK_RETENTION_DAYS
621+ } ) ;
622+
623+ self . webhooks . is_empty ( )
624+ }
625+ }
0 commit comments