Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 93 additions & 13 deletions substrate/client/network/light/src/light_client_requests/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,37 @@ use sp_core::{
storage::{ChildInfo, ChildType, PrefixedStorageKey},
};
use sp_runtime::traits::Block;
use std::{marker::PhantomData, sync::Arc};
use std::{collections::HashMap, marker::PhantomData, sync::Arc, time::{Duration, Instant}};
use tokio::time::timeout;

const LOG_TARGET: &str = "light-client-request-handler";

/// Incoming requests bounded queue size. For now due to lack of data on light client request
/// handling in production systems, this value is chosen to match the block request limit.
const MAX_LIGHT_REQUEST_QUEUE: usize = 20;

/// Maximum number of requests allowed per peer within the rate limiting window.
const MAX_REQUESTS_PER_PEER: usize = 10;

/// Rate limiting window duration in seconds. Resets request counts after this duration.
const RATE_LIMIT_WINDOW_SECS: u64 = 60;

/// Timeout for individual request processing in seconds. Prevents long-running operations from blocking the handler.
const REQUEST_TIMEOUT_SECS: u64 = 30;

/// Per-peer rate limiting state.
struct PeerRateLimit {
request_count: usize,
window_start: Instant,
}

/// Handler for incoming light client requests from a remote peer.
pub struct LightClientRequestHandler<B, Client> {
request_receiver: async_channel::Receiver<IncomingRequest>,
/// Blockchain client.
client: Arc<Client>,
/// Per-peer rate limiting state.
peer_rate_limits: HashMap<PeerId, PeerRateLimit>,
_block: PhantomData<B>,
}

Expand Down Expand Up @@ -79,14 +97,69 @@ where
tx,
);

(Self { client, request_receiver, _block: PhantomData::default() }, protocol_config)
(Self { client, request_receiver, peer_rate_limits: HashMap::new(), _block: PhantomData::default() }, protocol_config)
}

/// Check if a peer has exceeded the rate limit. Returns true if the request should be allowed.
fn check_peer_rate_limit(&mut self, peer: &PeerId) -> bool {
let now = Instant::now();
let window_duration = Duration::from_secs(RATE_LIMIT_WINDOW_SECS);

match self.peer_rate_limits.get_mut(peer) {
Some(rate_limit) => {
// Check if we've exceeded the rate limit window
if now.duration_since(rate_limit.window_start) > window_duration {
// Reset the window
rate_limit.request_count = 1;
rate_limit.window_start = now;
true
} else if rate_limit.request_count < MAX_REQUESTS_PER_PEER {
// Still within limit for this window
rate_limit.request_count += 1;
true
} else {
// Exceeded limit in current window
false
}
},
None => {
// First request from this peer
self.peer_rate_limits.insert(*peer, PeerRateLimit { request_count: 1, window_start: now });
true
},
}
}

/// Run [`LightClientRequestHandler`].
pub async fn run(mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest { peer, payload, pending_response } = request;

// Check rate limit before processing the request
if !self.check_peer_rate_limit(&peer) {
debug!(
target: LOG_TARGET,
"Rate limit exceeded for peer {}: dropping request",
peer,
);

let response = OutgoingResponse {
result: Err(()),
reputation_changes: vec![ReputationChange::new(-(1 << 10), "rate limit exceeded")],
sent_feedback: None,
};

if pending_response.send(response).is_err() {
debug!(
target: LOG_TARGET,
"Failed to send rate limit response to {}: {}",
peer,
HandleRequestError::SendResponse,
);
};
continue;
}

match self.handle_request(peer, payload) {
Ok(response_data) => {
let response = OutgoingResponse {
Expand Down Expand Up @@ -141,7 +214,7 @@ where
}
}

fn handle_request(
async fn handle_request(
&mut self,
peer: PeerId,
payload: Vec<u8>,
Expand All @@ -150,11 +223,11 @@ where

let response = match &request.request {
Some(schema::v1::light::request::Request::RemoteCallRequest(r)) =>
self.on_remote_call_request(&peer, r)?,
self.on_remote_call_request(&peer, r).await?,
Some(schema::v1::light::request::Request::RemoteReadRequest(r)) =>
self.on_remote_read_request(&peer, r)?,
self.on_remote_read_request(&peer, r).await?,
Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) =>
self.on_remote_read_child_request(&peer, r)?,
self.on_remote_read_child_request(&peer, r).await?,
None =>
return Err(HandleRequestError::BadRequest("Remote request without request data.")),
};
Expand All @@ -165,24 +238,31 @@ where
Ok(data)
}

fn on_remote_call_request(
async fn on_remote_call_request(
&mut self,
peer: &PeerId,
request: &schema::v1::light::RemoteCallRequest,
) -> Result<schema::v1::light::Response, HandleRequestError> {
trace!("Remote call request from {} ({} at {:?}).", peer, request.method, request.block,);

let block = Decode::decode(&mut request.block.as_ref())?;

let response = match self.client.execution_proof(block, &request.method, &request.data) {
Ok((_, proof)) => schema::v1::light::RemoteCallResponse { proof: Some(proof.encode()) },
Err(e) => {
let client = self.client.clone();
let method = request.method.clone();
let data = request.data.clone();

let response = match timeout(
Duration::from_secs(REQUEST_TIMEOUT_SECS),
tokio::task::spawn_blocking(move || {
client.execution_proof(block, &method, &data)
})
).await {
Ok(Ok(Ok((_,proof)))) => schema::v1::light::RemoteCallResponse { proof: Some(proof.encode()) },
_ => {
trace!(
"remote call request from {} ({} at {:?}) failed with: {}",
"remote call request from {} ({} at {:?}) timed out or failed",
peer,
request.method,
request.block,
e,
);
schema::v1::light::RemoteCallResponse { proof: None }
},
Expand Down