@@ -13,7 +13,7 @@ use libp2p::swarm::{
1313use libp2p:: swarm:: { FromSwarm , SubstreamProtocol , THandlerInEvent } ;
1414use libp2p:: PeerId ;
1515use parking_lot:: Mutex ;
16- use rate_limiter:: { RPCRateLimiter as RateLimiter , RateLimitedErr } ;
16+ use rate_limiter:: RPCRateLimiter as RateLimiter ;
1717use slog:: { crit, debug, o} ;
1818use std:: marker:: PhantomData ;
1919use std:: sync:: Arc ;
@@ -25,6 +25,7 @@ pub(crate) use handler::{HandlerErr, HandlerEvent};
2525pub ( crate ) use methods:: { MetaData , MetaDataV1 , MetaDataV2 , Ping , RPCCodedResponse , RPCResponse } ;
2626pub ( crate ) use protocol:: InboundRequest ;
2727
28+ use crate :: rpc:: rate_limiter:: InboundRequestSizeLimiter ;
2829pub use handler:: SubstreamId ;
2930pub use methods:: {
3031 BlocksByRangeRequest , BlocksByRootRequest , GoodbyeReason , LightClientBootstrapRequest ,
@@ -124,6 +125,8 @@ pub struct RPC<Id: ReqId, E: EthSpec> {
124125 response_limiter : Option < Arc < Mutex < RateLimiter > > > ,
125126 /// Rate limiter for our own requests.
126127 self_limiter : Option < SelfRateLimiter < Id , E > > ,
128+ /// Limiter for our inbound requests, which checks the request size.
129+ inbound_request_size_limiter : Option < InboundRequestSizeLimiter > ,
127130 /// Queue of events to be processed.
128131 events : Vec < BehaviourAction < Id , E > > ,
129132 fork_context : Arc < ForkContext > ,
@@ -152,14 +155,19 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
152155 // });
153156 let inbound_limiter = None ; // TODO
154157
155- let response_limiter = inbound_rate_limiter_config. map ( |config| {
158+ let response_limiter = inbound_rate_limiter_config. clone ( ) . map ( |config| {
156159 debug ! ( log, "Using response rate limiting params" ; "config" => ?config) ;
157160 Arc :: new ( Mutex :: new (
158161 RateLimiter :: new_with_config ( config. 0 )
159162 . expect ( "Inbound limiter configuration parameters are valid" ) ,
160163 ) )
161164 } ) ;
162165
166+ let inbound_request_size_limiter = inbound_rate_limiter_config. map ( |config| {
167+ InboundRequestSizeLimiter :: new_with_config ( config. 0 )
168+ . expect ( "Inbound limiter configuration parameters are valid" )
169+ } ) ;
170+
163171 let self_limiter = outbound_rate_limiter_config. map ( |config| {
164172 SelfRateLimiter :: new ( config, log. clone ( ) ) . expect ( "Configuration parameters are valid" )
165173 } ) ;
@@ -168,6 +176,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
168176 limiter : inbound_limiter,
169177 response_limiter,
170178 self_limiter,
179+ inbound_request_size_limiter,
171180 events : Vec :: new ( ) ,
172181 fork_context,
173182 enable_light_client_server,
@@ -315,57 +324,42 @@ where
315324 ) {
316325 match event {
317326 HandlerEvent :: Ok ( RPCReceived :: Request ( ref id, ref req) ) => {
318- if let Some ( limiter) = self . limiter . as_mut ( ) {
319- // check if the request is conformant to the quota
320- match limiter. allows ( & peer_id, req) {
321- Ok ( ( ) ) => {
322- // send the event to the user
323- self . events . push ( ToSwarm :: GenerateEvent ( RPCMessage {
324- peer_id,
325- conn_id,
326- event,
327- } ) )
328- }
329- Err ( RateLimitedErr :: TooLarge ) => {
330- // we set the batch sizes, so this is a coding/config err for most protocols
331- let protocol = req. versioned_protocol ( ) . protocol ( ) ;
332- if matches ! (
333- protocol,
334- Protocol :: BlocksByRange
335- | Protocol :: BlobsByRange
336- | Protocol :: BlocksByRoot
337- | Protocol :: BlobsByRoot
338- ) {
339- debug ! ( self . log, "Request too large to process" ; "request" => %req, "protocol" => %protocol) ;
340- } else {
341- // Other protocols shouldn't be sending large messages, we should flag the peer kind
342- crit ! ( self . log, "Request size too large to ever be processed" ; "protocol" => %protocol) ;
343- }
344- // send an error code to the peer.
345- // the handler upon receiving the error code will send it back to the behaviour
346- self . send_response (
347- peer_id,
348- ( conn_id, * id) ,
349- RPCCodedResponse :: Error (
350- RPCResponseErrorCode :: RateLimited ,
351- "Rate limited. Request too large" . into ( ) ,
352- ) ,
353- ) ;
354- }
355- Err ( RateLimitedErr :: TooSoon ( wait_time) ) => {
356- debug ! ( self . log, "Request exceeds the rate limit" ;
357- "request" => %req, "peer_id" => %peer_id, "wait_time_ms" => wait_time. as_millis( ) ) ;
358- // send an error code to the peer.
359- // the handler upon receiving the error code will send it back to the behaviour
360- self . send_response (
361- peer_id,
362- ( conn_id, * id) ,
363- RPCCodedResponse :: Error (
364- RPCResponseErrorCode :: RateLimited ,
365- format ! ( "Wait {:?}" , wait_time) . into ( ) ,
366- ) ,
367- ) ;
327+ // TODO: Send error response if there is ongoing request with the same protocol.
328+
329+ if let Some ( limiter) = self . inbound_request_size_limiter . as_ref ( ) {
330+ // Check if the request is conformant to the quota
331+ if limiter. allows ( req) {
332+ // Send the event to the user
333+ self . events . push ( ToSwarm :: GenerateEvent ( RPCMessage {
334+ peer_id,
335+ conn_id,
336+ event,
337+ } ) )
338+ } else {
339+ // We set the batch sizes, so this is a coding/config err for most protocols
340+ let protocol = req. versioned_protocol ( ) . protocol ( ) ;
341+ if matches ! (
342+ protocol,
343+ Protocol :: BlocksByRange
344+ | Protocol :: BlobsByRange
345+ | Protocol :: BlocksByRoot
346+ | Protocol :: BlobsByRoot
347+ ) {
348+ debug ! ( self . log, "Request too large to process" ; "request" => %req, "protocol" => %protocol) ;
349+ } else {
350+ // Other protocols shouldn't be sending large messages, we should flag the peer kind
351+ crit ! ( self . log, "Request size too large to ever be processed" ; "protocol" => %protocol) ;
368352 }
353+ // Send an error code to the peer.
354+ // The handler upon receiving the error code will send it back to the behaviour
355+ self . send_response (
356+ peer_id,
357+ ( conn_id, * id) ,
358+ RPCCodedResponse :: Error (
359+ RPCResponseErrorCode :: RateLimited ,
360+ "Rate limited. Request too large" . into ( ) ,
361+ ) ,
362+ ) ;
369363 }
370364 } else {
371365 // No rate limiting, send the event to the user
0 commit comments