@@ -499,7 +499,7 @@ impl PeerState {
499499}
500500
501501macro_rules! get_or_insert_peer_state_entry {
502- ( $self: ident, $outer_state_lock: expr, $counterparty_node_id: expr) => { {
502+ ( $self: ident, $outer_state_lock: expr, $message_queue_notifier : expr , $ counterparty_node_id: expr) => { {
503503 // Return an internal error and abort if we hit the maximum allowed number of total peers.
504504 let is_limited_by_max_total_peers = $outer_state_lock. len( ) >= MAX_TOTAL_PEERS ;
505505 match $outer_state_lock. entry( * $counterparty_node_id) {
@@ -512,7 +512,7 @@ macro_rules! get_or_insert_peer_state_entry {
512512
513513 let msg = LSPSMessage :: Invalid ( error_response) ;
514514 drop( $outer_state_lock) ;
515- $self . pending_messages . enqueue( $counterparty_node_id, msg) ;
515+ $message_queue_notifier . enqueue( $counterparty_node_id, msg) ;
516516
517517 let err = format!(
518518 "Dropping request from peer {} due to reaching maximally allowed number of total peers: {}" ,
@@ -581,6 +581,7 @@ where
581581 pub fn invalid_token_provided (
582582 & self , counterparty_node_id : & PublicKey , request_id : LSPSRequestId ,
583583 ) -> Result < ( ) , APIError > {
584+ let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
584585 let ( result, response) = {
585586 let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
586587
@@ -622,7 +623,7 @@ where
622623
623624 if let Some ( response) = response {
624625 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
625- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
626+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
626627 }
627628
628629 result
@@ -637,6 +638,7 @@ where
637638 & self , counterparty_node_id : & PublicKey , request_id : LSPSRequestId ,
638639 opening_fee_params_menu : Vec < LSPS2RawOpeningFeeParams > ,
639640 ) -> Result < ( ) , APIError > {
641+ let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
640642 let ( result, response) = {
641643 let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
642644
@@ -689,7 +691,7 @@ where
689691
690692 if let Some ( response) = response {
691693 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
692- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
694+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
693695 }
694696
695697 result
@@ -707,6 +709,8 @@ where
707709 & self , counterparty_node_id : & PublicKey , request_id : LSPSRequestId , intercept_scid : u64 ,
708710 cltv_expiry_delta : u32 , client_trusts_lsp : bool , user_channel_id : u128 ,
709711 ) -> Result < ( ) , APIError > {
712+ let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
713+
710714 let ( result, response) = {
711715 let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
712716
@@ -767,7 +771,7 @@ where
767771
768772 if let Some ( response) = response {
769773 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
770- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
774+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
771775 }
772776
773777 result
@@ -1202,11 +1206,16 @@ where
12021206 & self , request_id : LSPSRequestId , counterparty_node_id : & PublicKey ,
12031207 params : LSPS2GetInfoRequest ,
12041208 ) -> Result < ( ) , LightningError > {
1209+ let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
12051210 let event_queue_notifier = self . pending_events . notifier ( ) ;
12061211 let ( result, response) = {
12071212 let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
1208- let inner_state_lock =
1209- get_or_insert_peer_state_entry ! ( self , outer_state_lock, counterparty_node_id) ;
1213+ let inner_state_lock = get_or_insert_peer_state_entry ! (
1214+ self ,
1215+ outer_state_lock,
1216+ message_queue_notifier,
1217+ counterparty_node_id
1218+ ) ;
12101219 let mut peer_state_lock = inner_state_lock. lock ( ) . unwrap ( ) ;
12111220 let request = LSPS2Request :: GetInfo ( params. clone ( ) ) ;
12121221 match self . insert_pending_request (
@@ -1229,7 +1238,7 @@ where
12291238 } ;
12301239
12311240 if let Some ( msg) = response {
1232- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1241+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
12331242 }
12341243
12351244 result
@@ -1238,6 +1247,7 @@ where
12381247 fn handle_buy_request (
12391248 & self , request_id : LSPSRequestId , counterparty_node_id : & PublicKey , params : LSPS2BuyRequest ,
12401249 ) -> Result < ( ) , LightningError > {
1250+ let mut message_queue_notifier = self . pending_messages . notifier ( ) ;
12411251 let event_queue_notifier = self . pending_events . notifier ( ) ;
12421252 if let Some ( payment_size_msat) = params. payment_size_msat {
12431253 if payment_size_msat < params. opening_fee_params . min_payment_size_msat {
@@ -1247,7 +1257,7 @@ where
12471257 data : None ,
12481258 } ) ;
12491259 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
1250- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1260+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
12511261
12521262 return Err ( LightningError {
12531263 err : "payment size is below our minimum supported payment size" . to_string ( ) ,
@@ -1262,7 +1272,7 @@ where
12621272 data : None ,
12631273 } ) ;
12641274 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
1265- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1275+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
12661276 return Err ( LightningError {
12671277 err : "payment size is above our maximum supported payment size" . to_string ( ) ,
12681278 action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
@@ -1283,7 +1293,7 @@ where
12831293 data : None ,
12841294 } ) ;
12851295 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
1286- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1296+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
12871297 return Err ( LightningError {
12881298 err : "payment size is too small to cover the opening fee" . to_string ( ) ,
12891299 action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
@@ -1297,7 +1307,7 @@ where
12971307 data : None ,
12981308 } ) ;
12991309 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
1300- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1310+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
13011311 return Err ( LightningError {
13021312 err : "overflow error when calculating opening_fee" . to_string ( ) ,
13031313 action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
@@ -1314,7 +1324,7 @@ where
13141324 data : None ,
13151325 } ) ;
13161326 let msg = LSPS2Message :: Response ( request_id, response) . into ( ) ;
1317- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1327+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
13181328 return Err ( LightningError {
13191329 err : "invalid opening fee parameters were supplied by client" . to_string ( ) ,
13201330 action : ErrorAction :: IgnoreAndLog ( Level :: Info ) ,
@@ -1323,8 +1333,12 @@ where
13231333
13241334 let ( result, response) = {
13251335 let mut outer_state_lock = self . per_peer_state . write ( ) . unwrap ( ) ;
1326- let inner_state_lock =
1327- get_or_insert_peer_state_entry ! ( self , outer_state_lock, counterparty_node_id) ;
1336+ let inner_state_lock = get_or_insert_peer_state_entry ! (
1337+ self ,
1338+ outer_state_lock,
1339+ message_queue_notifier,
1340+ counterparty_node_id
1341+ ) ;
13281342 let mut peer_state_lock = inner_state_lock. lock ( ) . unwrap ( ) ;
13291343
13301344 let request = LSPS2Request :: Buy ( params. clone ( ) ) ;
@@ -1350,7 +1364,7 @@ where
13501364 } ;
13511365
13521366 if let Some ( msg) = response {
1353- self . pending_messages . enqueue ( counterparty_node_id, msg) ;
1367+ message_queue_notifier . enqueue ( counterparty_node_id, msg) ;
13541368 }
13551369
13561370 result
0 commit comments