@@ -32,6 +32,7 @@ use lightning_types::payment::PaymentHash;
3232use bitcoin:: secp256k1:: PublicKey ;
3333
3434use core:: ops:: Deref ;
35+ use core:: sync:: atomic:: { AtomicUsize , Ordering } ;
3536
3637use crate :: lsps2:: msgs:: {
3738 BuyRequest , BuyResponse , GetInfoRequest , GetInfoResponse , LSPS2Message , LSPS2Request ,
@@ -43,6 +44,7 @@ use crate::lsps2::msgs::{
4344} ;
4445
4546const MAX_PENDING_REQUESTS_PER_PEER : usize = 10 ;
47+ const MAX_TOTAL_PENDING_REQUESTS : usize = 1000 ;
4648
4749/// Server-side configuration options for JIT channels.
4850#[ derive( Clone , Debug ) ]
@@ -470,6 +472,7 @@ where
470472 per_peer_state : RwLock < HashMap < PublicKey , Mutex < PeerState > > > ,
471473 peer_by_intercept_scid : RwLock < HashMap < u64 , PublicKey > > ,
472474 peer_by_channel_id : RwLock < HashMap < ChannelId , PublicKey > > ,
475+ total_pending_requests : AtomicUsize ,
473476 config : LSPS2ServiceConfig ,
474477}
475478
@@ -488,6 +491,7 @@ where
488491 per_peer_state : RwLock :: new ( new_hash_map ( ) ) ,
489492 peer_by_intercept_scid : RwLock :: new ( new_hash_map ( ) ) ,
490493 peer_by_channel_id : RwLock :: new ( new_hash_map ( ) ) ,
494+ total_pending_requests : AtomicUsize :: new ( 0 ) ,
491495 channel_manager,
492496 config,
493497 }
@@ -1145,8 +1149,27 @@ where
11451149 & self , peer_state_lock : & mut MutexGuard < ' a , PeerState > , request_id : RequestId ,
11461150 counterparty_node_id : PublicKey , request : LSPS2Request ,
11471151 ) -> ( Result < ( ) , LightningError > , Option < LSPSMessage > ) {
1152+ if self . total_pending_requests . load ( Ordering :: Relaxed ) >= MAX_TOTAL_PENDING_REQUESTS {
1153+ let response = LSPS2Response :: BuyError ( ResponseError {
1154+ code : LSPS0_CLIENT_REJECTED_ERROR_CODE ,
1155+ message : "Reached maximum number of pending requests. Please try again later."
1156+ . to_string ( ) ,
1157+ data : None ,
1158+ } ) ;
1159+ let msg = Some ( LSPS2Message :: Response ( request_id, response) . into ( ) ) ;
1160+
1161+ let err = format ! (
1162+ "Peer {} reached maximum number of total pending requests: {}" ,
1163+ counterparty_node_id, MAX_TOTAL_PENDING_REQUESTS
1164+ ) ;
1165+ let result =
1166+ Err ( LightningError { err, action : ErrorAction :: IgnoreAndLog ( Level :: Debug ) } ) ;
1167+ return ( result, msg) ;
1168+ }
1169+
11481170 if peer_state_lock. pending_requests . len ( ) < MAX_PENDING_REQUESTS_PER_PEER {
11491171 peer_state_lock. pending_requests . insert ( request_id, request) ;
1172+ self . total_pending_requests . fetch_add ( 1 , Ordering :: Relaxed ) ;
11501173 ( Ok ( ( ) ) , None )
11511174 } else {
11521175 let response = LSPS2Response :: BuyError ( ResponseError {
@@ -1171,7 +1194,43 @@ where
11711194 fn remove_pending_request < ' a > (
11721195 & self , peer_state_lock : & mut MutexGuard < ' a , PeerState > , request_id : & RequestId ,
11731196 ) -> Option < LSPS2Request > {
1174- peer_state_lock. pending_requests . remove ( request_id)
1197+ match peer_state_lock. pending_requests . remove ( request_id) {
1198+ Some ( req) => {
1199+ let res = self . total_pending_requests . fetch_update (
1200+ Ordering :: Relaxed ,
1201+ Ordering :: Relaxed ,
1202+ |x| Some ( x. saturating_sub ( 1 ) ) ,
1203+ ) ;
1204+ match res {
1205+ Ok ( previous_value) if previous_value == 0 => debug_assert ! (
1206+ false ,
1207+ "total_pending_requests counter out-of-sync! This should never happen!"
1208+ ) ,
1209+ Err ( previous_value) if previous_value == 0 => debug_assert ! (
1210+ false ,
1211+ "total_pending_requests counter out-of-sync! This should never happen!"
1212+ ) ,
1213+ _ => { } ,
1214+ }
1215+ Some ( req)
1216+ } ,
1217+ res => res,
1218+ }
1219+ }
1220+
1221+ #[ cfg( debug_assertions) ]
1222+ fn verify_pending_request_counter ( & self ) {
1223+ let mut num_requests = 0 ;
1224+ let outer_state_lock = self . per_peer_state . read ( ) . unwrap ( ) ;
1225+ for ( _, inner) in outer_state_lock. iter ( ) {
1226+ let inner_state_lock = inner. lock ( ) . unwrap ( ) ;
1227+ num_requests += inner_state_lock. pending_requests . len ( ) ;
1228+ }
1229+ debug_assert_eq ! (
1230+ num_requests,
1231+ self . total_pending_requests. load( Ordering :: Relaxed ) ,
1232+ "total_pending_requests counter out-of-sync! This should never happen!"
1233+ ) ;
11751234 }
11761235}
11771236
@@ -1186,13 +1245,18 @@ where
11861245 & self , message : Self :: ProtocolMessage , counterparty_node_id : & PublicKey ,
11871246 ) -> Result < ( ) , LightningError > {
11881247 match message {
1189- LSPS2Message :: Request ( request_id, request) => match request {
1190- LSPS2Request :: GetInfo ( params) => {
1191- self . handle_get_info_request ( request_id, counterparty_node_id, params)
1192- } ,
1193- LSPS2Request :: Buy ( params) => {
1194- self . handle_buy_request ( request_id, counterparty_node_id, params)
1195- } ,
1248+ LSPS2Message :: Request ( request_id, request) => {
1249+ let res = match request {
1250+ LSPS2Request :: GetInfo ( params) => {
1251+ self . handle_get_info_request ( request_id, counterparty_node_id, params)
1252+ } ,
1253+ LSPS2Request :: Buy ( params) => {
1254+ self . handle_buy_request ( request_id, counterparty_node_id, params)
1255+ } ,
1256+ } ;
1257+ #[ cfg( debug_assertions) ]
1258+ self . verify_pending_request_counter ( ) ;
1259+ res
11961260 } ,
11971261 _ => {
11981262 debug_assert ! (
0 commit comments