
Commit 5cd5202

Introduce RequestsScheduler to track (and deduplicate) all requests and validators' scores (#4752)
## Motivation

The Linera client needs to interact with multiple validator nodes efficiently. Previously, the client made individual requests to validators without:

1. **Performance tracking**: no mechanism to prefer faster, more reliable validators.
2. **Request deduplication**: concurrent requests for the same data all hit the network, wasting bandwidth and validator resources.
3. **Response caching**: repeated requests for the same data always went to validators.
4. **Load balancing**: no rate limiting per validator, risking overload.
5. **Resilience**: no fallback mechanism when a validator is slow or unresponsive.

This led to:

- unnecessary network traffic and validator load;
- poor user experience from redundant waiting;
- no optimization based on validator performance;
- a risk of overwhelming validators with too many concurrent requests;
- no recovery mechanism when validators are slow.

## Proposal

This PR introduces `RequestsScheduler`, a request orchestration layer that provides intelligent peer selection, request deduplication, caching, and performance-based routing.

### Key Features

**1. Performance Tracking with Exponential Moving Averages (EMA)**
- Tracks latency, success rate, and current load for each validator.
- Uses configurable weights to compute a composite performance score.
- Selects the best available validator for each request.
- Uses weighted random selection from the top performers to avoid hotspots.

**2. Request Deduplication**
- *Exact matching*: multiple concurrent requests for identical data are deduplicated.
- *Subsumption-based matching*: smaller requests are satisfied by larger in-flight requests that contain the needed data (e.g., a request for blocks 10–12 can be satisfied by an in-flight request for blocks 10–20).
- A broadcast mechanism ensures all waiting requesters receive the result when the request completes.
- *Timeout handling*: stale in-flight requests (>200 ms) are not deduplicated against, allowing fresh attempts.
**3. Response Caching**
- Successfully completed requests are cached with a configurable TTL (default: 2 seconds).
- LRU eviction when the cache reaches its maximum size (default: 1000 entries).
- Works with both exact and subsumption matching.
- Only successful results are cached.

**4. Slot-Based Rate Limiting**
- Each validator has a maximum concurrent request limit (default: 100).
- Async await mechanism: requests wait for available slots without polling.
- Prevents overloading individual validators.
- Slots are released automatically on request completion.

**5. Alternative Peer Handling**
- When multiple callers request the same data, they register as "alternative peers".
- If the original request times out (>200 ms), any alternative peer can complete the request.
- The result is broadcast to all waiting requesters.
- Provides resilience against slow validators.

**6. Modular Architecture**

The new `requests_scheduler` module has a clear separation of concerns:

```
requests_scheduler/
├── mod.rs               - Module exports and constants
├── scheduler.rs         - RequestsScheduler orchestration logic
├── in_flight_tracker.rs - In-flight request tracking and deduplication
├── node_info.rs         - Per-validator performance tracking
├── request.rs           - Request types and result extraction
└── scoring.rs           - Configurable scoring weights
```

### API

High-level APIs:

```rust
// Execute with the best available validator.
scheduler.with_best(request_key, |peer| async {
    peer.download_certificates(chain_id, start, limit).await
}).await

// Execute with a specific validator.
scheduler.with_peer(request_key, peer, |peer| async {
    peer.download_blob(blob_id).await
}).await
```

Configuration:

```rust
let manager = RequestsScheduler::with_config(
    validator_nodes,
    max_requests_per_node: 100,
    weights: ScoringWeights { latency: 0.4, success: 0.4, load: 0.2 },
    alpha: 0.1, // EMA smoothing factor
    max_expected_latency_ms: 5000.0,
    cache_ttl: Duration::from_secs(2),
    max_cache_size: 1000,
);
```

### Benefits

- **Reduced network load**: deduplication and caching eliminate redundant requests.
- **Better performance**: intelligent peer selection routes requests to the fastest validators.
- **Improved reliability**: the alternative-peer mechanism provides resilience against slow validators.
- **Protection for validators**: rate limiting prevents overload.
- **Efficient resource usage**: EMA-based scoring optimizes validator selection.
- **Clean architecture**: the modular design keeps the code maintainable and testable.

### Metrics

In production usage, this should significantly reduce:

- network traffic between clients and validators;
- validator CPU/memory usage from redundant requests;
- client request latency, through caching and smart routing;
- failed requests, through performance tracking and rate limiting.

The following metrics have been added to Prometheus (when compiled with `--features metrics`):

- `requests_scheduler_response_time_ms` — Response time for requests to validators, in milliseconds.
- `requests_scheduler_request_total` — Total number of requests made to each validator.
- `requests_scheduler_request_success` — Number of successful requests to each validator (`(requests_scheduler_request_total - requests_scheduler_request_success) / requests_scheduler_request_total` gives an error rate).
- `requests_scheduler_request_deduplication_total` — Number of requests that were deduplicated by joining an in-flight request.
- `requests_scheduler_request_cache_hit_total` — Number of requests that were served from cache.

## Test Plan

Existing CI ensures that we maintain backwards compatibility. Tests have been added to the new modules.

## Release Plan

- Nothing to do / These changes follow the usual release cycle.

## Links

- [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
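The EMA-based composite scoring can be sketched as follows. This is an illustrative, self-contained model: the struct names, field names, and normalization details are assumptions, not the actual code in `node_info.rs` and `scoring.rs`.

```rust
// Sketch of EMA-based validator scoring. Each normalized component is mapped
// so that higher is better, then combined with configurable weights.

struct ScoringWeights {
    latency: f64,
    success: f64,
    load: f64,
}

struct NodeStats {
    latency_ema_ms: f64, // EMA of observed request latency
    success_ema: f64,    // EMA of success (1.0) / failure (0.0) outcomes
    in_flight: usize,    // current number of outstanding requests
}

impl NodeStats {
    /// Fold a new observation into the EMAs with smoothing factor `alpha`:
    /// new_avg = alpha * observation + (1 - alpha) * old_avg.
    fn record(&mut self, latency_ms: f64, success: bool, alpha: f64) {
        self.latency_ema_ms = alpha * latency_ms + (1.0 - alpha) * self.latency_ema_ms;
        let outcome = if success { 1.0 } else { 0.0 };
        self.success_ema = alpha * outcome + (1.0 - alpha) * self.success_ema;
    }

    /// Composite score in [0, 1]; higher is better.
    fn score(&self, w: &ScoringWeights, max_latency_ms: f64, max_in_flight: usize) -> f64 {
        // Normalize latency and load to [0, 1], where 0 is best.
        let latency_norm = (self.latency_ema_ms / max_latency_ms).min(1.0);
        let load_norm = self.in_flight as f64 / max_in_flight as f64;
        w.latency * (1.0 - latency_norm) + w.success * self.success_ema + w.load * (1.0 - load_norm)
    }
}

fn main() {
    let w = ScoringWeights { latency: 0.4, success: 0.4, load: 0.2 };
    let mut fast = NodeStats { latency_ema_ms: 100.0, success_ema: 1.0, in_flight: 10 };
    let slow = NodeStats { latency_ema_ms: 4000.0, success_ema: 0.5, in_flight: 90 };
    assert!(fast.score(&w, 5000.0, 100) > slow.score(&w, 5000.0, 100));
    // With alpha = 0.1, one slow observation pulls the latency EMA
    // 10% of the way toward the new value: 0.1 * 1100 + 0.9 * 100 = 200.
    fast.record(1100.0, true, 0.1);
    assert!((fast.latency_ema_ms - 200.0).abs() < 1e-9);
}
```

The defaults mentioned in this PR (`latency: 0.4, success: 0.4, load: 0.2`, `alpha: 0.1`, `max_expected_latency_ms: 5000.0`) plug directly into such a formula.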
1 parent 4713776 commit 5cd5202

File tree

20 files changed

+2903
-126
lines changed


CLI.md

Lines changed: 18 additions & 0 deletions
```diff
@@ -184,6 +184,24 @@ Client implementation and command-line tool for the Linera blockchain
 * `--max-joined-tasks <MAX_JOINED_TASKS>` — Maximum number of tasks that can are joined concurrently in the client
 
   Default value: `100`
+* `--max-in-flight-requests <MAX_IN_FLIGHT_REQUESTS>` — Maximum concurrent requests per validator node
+
+  Default value: `100`
+* `--max-accepted-latency-ms <MAX_ACCEPTED_LATENCY_MS>` — Maximum expected latency in milliseconds for score normalization
+
+  Default value: `5000`
+* `--cache-ttl-ms <CACHE_TTL_MS>` — Time-to-live for cached responses in milliseconds
+
+  Default value: `2000`
+* `--cache-max-size <CACHE_MAX_SIZE>` — Maximum number of entries in the cache
+
+  Default value: `1000`
+* `--max-request-ttl-ms <MAX_REQUEST_TTL_MS>` — Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds)
+
+  Default value: `200`
+* `--alpha <ALPHA>` — Smoothing factor for Exponential Moving Averages (0 < alpha < 1). Higher values give more weight to recent observations. Typical values are between 0.01 and 0.5. A value of 0.1 means that 10% of the new observation is considered and 90% of the previous average is retained
+
+  Default value: `0.1`
 * `--storage <STORAGE_CONFIG>` — Storage configuration for the blockchain history
 * `--storage-max-concurrent-queries <STORAGE_MAX_CONCURRENT_QUERIES>` — The maximal number of simultaneous queries to the database
 * `--storage-max-stream-queries <STORAGE_MAX_STREAM_QUERIES>` — The maximal number of simultaneous stream queries to the database
```
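The `--cache-ttl-ms` and `--cache-max-size` options above configure the TTL-plus-LRU response cache described in the proposal. A minimal std-only sketch of that policy follows; the type, field names, and LRU bookkeeping are assumptions for illustration, not the actual implementation:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative response cache: entries expire after `ttl`, and the least
/// recently used entry is evicted once `max_size` is reached.
struct ResponseCache<K, V> {
    entries: HashMap<K, (V, Instant, u64)>, // value, insertion time, last-use counter
    ttl: Duration,
    max_size: usize,
    clock: u64, // logical clock used for LRU ordering
}

impl<K: std::hash::Hash + Eq + Clone, V: Clone> ResponseCache<K, V> {
    fn new(ttl: Duration, max_size: usize) -> Self {
        Self { entries: HashMap::new(), ttl, max_size, clock: 0 }
    }

    fn get(&mut self, key: &K) -> Option<V> {
        self.clock += 1;
        let clock = self.clock;
        let expired = match self.entries.get(key) {
            Some((_, inserted, _)) => inserted.elapsed() >= self.ttl,
            None => return None,
        };
        if expired {
            self.entries.remove(key); // drop stale entries lazily on lookup
            return None;
        }
        let (value, _, last_used) = self.entries.get_mut(key)?;
        *last_used = clock; // refresh LRU position
        Some(value.clone())
    }

    fn insert(&mut self, key: K, value: V) {
        self.clock += 1;
        if self.entries.len() >= self.max_size && !self.entries.contains_key(&key) {
            // Evict the least recently used entry (smallest last-use counter).
            if let Some(lru) = self
                .entries
                .iter()
                .min_by_key(|(_, (_, _, used))| *used)
                .map(|(k, _)| k.clone())
            {
                self.entries.remove(&lru);
            }
        }
        self.entries.insert(key, (value, Instant::now(), self.clock));
    }
}

fn main() {
    // Defaults from this PR: 2000 ms TTL; max size shrunk to 2 for the demo.
    let mut cache = ResponseCache::new(Duration::from_millis(2000), 2);
    cache.insert("a", 1);
    cache.insert("b", 2);
    assert_eq!(cache.get(&"a"), Some(1)); // "a" is now more recent than "b"
    cache.insert("c", 3); // evicts "b", the LRU entry
    assert_eq!(cache.get(&"b"), None);
    assert_eq!(cache.get(&"a"), Some(1));
    assert_eq!(cache.get(&"c"), Some(3));
}
```

Note that only successful results would be inserted, matching the behavior described in the proposal.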

linera-client/src/client_context.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -291,6 +291,7 @@ where
             options.chain_worker_ttl,
             options.sender_chain_worker_ttl,
             options.to_chain_client_options(),
+            options.to_requests_scheduler_config(),
         );
 
         #[cfg(not(web))]
@@ -351,6 +352,7 @@ where
                 cross_chain_message_delivery: CrossChainMessageDelivery::Blocking,
                 ..ChainClientOptions::test_default()
             },
+            linera_core::client::RequestsSchedulerConfig::default(),
         );
 
         ClientContext {
```
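The config threaded through here includes `max_in_flight_requests`, the per-validator slot limit from the proposal's slot-based rate limiting. A blocking sketch of the slot accounting, built as a counting semaphore from `Mutex` and `Condvar` (the real scheduler awaits slots asynchronously; the type and method names here are illustrative):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Illustrative per-validator slot limiter: at most `max_in_flight`
/// requests may hold a slot at once; others wait without polling.
struct Slots {
    available: Mutex<usize>,
    freed: Condvar,
}

impl Slots {
    fn new(max_in_flight: usize) -> Self {
        Self { available: Mutex::new(max_in_flight), freed: Condvar::new() }
    }

    /// Wait until a slot is free, then take it.
    fn acquire(&self) {
        let mut available = self.available.lock().unwrap();
        while *available == 0 {
            available = self.freed.wait(available).unwrap();
        }
        *available -= 1;
    }

    /// Return the slot and wake one waiter.
    fn release(&self) {
        *self.available.lock().unwrap() += 1;
        self.freed.notify_one();
    }
}

fn main() {
    // Eight "requests" contend for two slots; all complete, and every
    // slot is returned at the end.
    let slots = Arc::new(Slots::new(2));
    let mut handles = Vec::new();
    for _ in 0..8 {
        let slots = Arc::clone(&slots);
        handles.push(thread::spawn(move || {
            slots.acquire();
            // ... the request to the validator would run here ...
            slots.release();
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    assert_eq!(*slots.available.lock().unwrap(), 2); // all slots released
}
```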

linera-client/src/client_options.rs

Lines changed: 66 additions & 0 deletions
```diff
@@ -186,6 +186,58 @@ pub struct ClientContextOptions {
     /// Maximum number of tasks that can are joined concurrently in the client.
     #[arg(long, default_value = "100")]
     pub max_joined_tasks: usize,
+
+    /// Maximum concurrent requests per validator node.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::MAX_IN_FLIGHT_REQUESTS,
+        env = "LINERA_REQUESTS_SCHEDULER_MAX_IN_FLIGHT_REQUESTS"
+    )]
+    pub max_in_flight_requests: usize,
+
+    /// Maximum expected latency in milliseconds for score normalization.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::MAX_ACCEPTED_LATENCY_MS,
+        env = "LINERA_REQUESTS_SCHEDULER_MAX_ACCEPTED_LATENCY_MS"
+    )]
+    pub max_accepted_latency_ms: f64,
+
+    /// Time-to-live for cached responses in milliseconds.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::CACHE_TTL_MS,
+        env = "LINERA_REQUESTS_SCHEDULER_CACHE_TTL_MS"
+    )]
+    pub cache_ttl_ms: u64,
+
+    /// Maximum number of entries in the cache.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::CACHE_MAX_SIZE,
+        env = "LINERA_REQUESTS_SCHEDULER_CACHE_MAX_SIZE"
+    )]
+    pub cache_max_size: usize,
+
+    /// Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds).
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::MAX_REQUEST_TTL_MS,
+        env = "LINERA_REQUESTS_SCHEDULER_MAX_REQUEST_TTL_MS"
+    )]
+    pub max_request_ttl_ms: u64,
+
+    /// Smoothing factor for Exponential Moving Averages (0 < alpha < 1).
+    /// Higher values give more weight to recent observations.
+    /// Typical values are between 0.01 and 0.5.
+    /// A value of 0.1 means that 10% of the new observation is considered
+    /// and 90% of the previous average is retained.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::ALPHA_SMOOTHING_FACTOR,
+        env = "LINERA_REQUESTS_SCHEDULER_ALPHA"
+    )]
+    pub alpha: f64,
 }
 
 impl ClientContextOptions {
@@ -218,6 +270,20 @@ impl ClientContextOptions {
             report_interval_secs: self.timing_interval,
         }
     }
+
+    /// Creates [`RequestsSchedulerConfig`] with the corresponding values.
+    pub(crate) fn to_requests_scheduler_config(
+        &self,
+    ) -> linera_core::client::RequestsSchedulerConfig {
+        linera_core::client::RequestsSchedulerConfig {
+            max_in_flight_requests: self.max_in_flight_requests,
+            max_accepted_latency_ms: self.max_accepted_latency_ms,
+            cache_ttl_ms: self.cache_ttl_ms,
+            cache_max_size: self.cache_max_size,
+            max_request_ttl_ms: self.max_request_ttl_ms,
+            alpha: self.alpha,
+        }
+    }
 }
 
 #[derive(Debug, Clone, clap::Args)]
```
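Among these options, the scoring weights feed the "weighted random selection from top performers" that the proposal uses to avoid hotspots. A self-contained sketch of that selection step; the candidate layout is an assumption, and the tiny LCG merely stands in for a real RNG:

```rust
/// Pick one validator id from `scores` (pairs of id and composite score),
/// with probability proportional to its score, so the best node is favored
/// without monopolizing traffic.
fn pick_weighted(scores: &[(usize, f64)], rng_state: &mut u64) -> usize {
    // Simple linear congruential generator producing a value in [0, 1).
    *rng_state = rng_state
        .wrapping_mul(6364136223846793005)
        .wrapping_add(1442695040888963407);
    let r = (*rng_state >> 11) as f64 / (1u64 << 53) as f64;

    // Walk the cumulative score distribution until the threshold is crossed.
    let total: f64 = scores.iter().map(|(_, s)| s).sum();
    let mut threshold = r * total;
    for &(id, score) in scores {
        if threshold < score {
            return id;
        }
        threshold -= score;
    }
    scores.last().expect("non-empty candidate list").0 // floating-point edge case
}

fn main() {
    // The top three validators by composite score.
    let top = [(7, 0.9), (2, 0.6), (5, 0.3)];
    let mut rng = 42u64;
    let mut counts = [0usize; 10];
    for _ in 0..10_000 {
        counts[pick_weighted(&top, &mut rng)] += 1;
    }
    // All picks come from the candidate set; the best node wins most often,
    // but lower-scoring candidates still receive some traffic.
    assert_eq!(counts[7] + counts[2] + counts[5], 10_000);
    assert!(counts[7] > counts[2] && counts[2] > counts[5]);
    assert!(counts[5] > 0);
}
```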

linera-client/src/unit_tests/chain_listener.rs

Lines changed: 2 additions & 0 deletions
```diff
@@ -119,6 +119,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
             Duration::from_secs(30),
             Duration::from_secs(1),
             ChainClientOptions::test_default(),
+            linera_core::client::RequestsSchedulerConfig::default(),
         )),
     };
     context
@@ -204,6 +205,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
             Duration::from_secs(30),
             Duration::from_secs(1),
             ChainClientOptions::test_default(),
+            linera_core::client::RequestsSchedulerConfig::default(),
         )),
     };
     let context = Arc::new(Mutex::new(context));
```

linera-core/src/client/mod.rs

Lines changed: 56 additions & 47 deletions
```diff
@@ -86,6 +86,9 @@ mod chain_client_state;
 #[cfg(test)]
 #[path = "../unit_tests/client_tests.rs"]
 mod client_tests;
+pub mod requests_scheduler;
+
+pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
 mod received_log;
 mod validator_trackers;
 
@@ -149,6 +152,8 @@ pub struct Client<Env: Environment> {
     /// Local node to manage the execution state and the local storage of the chains that we are
     /// tracking.
     local_node: LocalNodeClient<Env::Storage>,
+    /// Manages the requests sent to validator nodes.
+    requests_scheduler: RequestsScheduler<Env>,
     /// The admin chain ID.
     admin_id: ChainId,
     /// Chains that should be tracked by the client.
@@ -175,6 +180,7 @@ impl<Env: Environment> Client<Env> {
         chain_worker_ttl: Duration,
         sender_chain_worker_ttl: Duration,
         options: ChainClientOptions,
+        requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
     ) -> Self {
         let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
         let state = WorkerState::new_for_client(
@@ -188,10 +194,12 @@ impl<Env: Environment> Client<Env> {
             .with_chain_worker_ttl(chain_worker_ttl)
             .with_sender_chain_worker_ttl(sender_chain_worker_ttl);
         let local_node = LocalNodeClient::new(state);
+        let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
 
         Self {
             environment,
             local_node,
+            requests_scheduler,
             chains: papaya::HashMap::new(),
             admin_id,
             tracked_chains,
@@ -347,8 +355,10 @@ impl<Env: Environment> Client<Env> {
             .checked_sub(u64::from(next_height))
             .ok_or(ArithmeticError::Overflow)?
             .min(self.options.certificate_download_batch_size);
-        let certificates = remote_node
-            .query_certificates_from(chain_id, next_height, limit)
+
+        let certificates = self
+            .requests_scheduler
+            .download_certificates(remote_node, chain_id, next_height, limit)
             .await?;
         let Some(info) = self.process_certificates(remote_node, certificates).await? else {
             break;
@@ -362,28 +372,25 @@ impl<Env: Environment> Client<Env> {
 
     async fn download_blobs(
         &self,
-        remote_node: &RemoteNode<impl ValidatorNode>,
-        blob_ids: impl IntoIterator<Item = BlobId>,
+        remote_nodes: &[RemoteNode<Env::ValidatorNode>],
+        blob_ids: &[BlobId],
     ) -> Result<(), ChainClientError> {
-        self.local_node
-            .store_blobs(
-                &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
-                    remote_node.try_download_blob(blob_id).await.unwrap()
-                }))
-                .buffer_unordered(self.options.max_joined_tasks)
-                .collect::<Vec<_>>()
-                .await,
-            )
-            .await
-            .map_err(Into::into)
+        let blobs = &self
+            .requests_scheduler
+            .download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
+            .await?
+            .ok_or_else(|| {
+                ChainClientError::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
+            })?;
+        self.local_node.store_blobs(blobs).await.map_err(Into::into)
     }
 
     /// Tries to process all the certificates, requesting any missing blobs from the given node.
     /// Returns the chain info of the last successfully processed certificate.
     #[instrument(level = "trace", skip_all)]
     async fn process_certificates(
         &self,
-        remote_node: &RemoteNode<impl ValidatorNode>,
+        remote_node: &RemoteNode<Env::ValidatorNode>,
         certificates: Vec<ConfirmedBlockCertificate>,
     ) -> Result<Option<Box<ChainInfo>>, ChainClientError> {
         let mut info = None;
@@ -398,7 +405,8 @@ impl<Env: Environment> Client<Env> {
             .await
         {
             Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
-                self.download_blobs(remote_node, blob_ids).await?;
+                self.download_blobs(&[remote_node.clone()], &blob_ids)
+                    .await?;
             }
             x => {
                 x?;
@@ -409,7 +417,8 @@ impl<Env: Environment> Client<Env> {
             info = Some(
                 match self.handle_certificate(certificate.clone()).await {
                     Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
-                        self.download_blobs(remote_node, blob_ids).await?;
+                        self.download_blobs(&[remote_node.clone()], &blob_ids)
+                            .await?;
                         self.handle_certificate(certificate).await?
                     }
                     x => x?,
@@ -663,7 +672,6 @@ impl<Env: Environment> Client<Env> {
     ) -> Result<(), ChainClientError> {
         let certificate = Box::new(certificate);
         let block = certificate.block();
-
         // Recover history from the network.
         self.download_certificates(block.header.chain_id, block.header.height)
             .await?;
@@ -672,14 +680,9 @@ impl<Env: Environment> Client<Env> {
         if let Err(err) = self.process_certificate(certificate.clone()).await {
             match &err {
                 LocalNodeError::BlobsNotFound(blob_ids) => {
-                    let blobs = RemoteNode::download_blobs(
-                        blob_ids,
-                        &self.validator_nodes().await?,
-                        self.options.blob_download_timeout,
-                    )
-                    .await
-                    .ok_or(err)?;
-                    self.local_node.store_blobs(&blobs).await?;
+                    self.download_blobs(&self.validator_nodes().await?, blob_ids)
+                        .await
+                        .map_err(|_| err)?;
                     self.process_certificate(certificate).await?;
                 }
                 _ => {
@@ -716,14 +719,7 @@ impl<Env: Environment> Client<Env> {
         if let Err(err) = self.handle_certificate(certificate.clone()).await {
             match &err {
                 LocalNodeError::BlobsNotFound(blob_ids) => {
-                    let blobs = RemoteNode::download_blobs(
-                        blob_ids,
-                        &nodes,
-                        self.options.blob_download_timeout,
-                    )
-                    .await
-                    .ok_or(err)?;
-                    self.local_node.store_blobs(&blobs).await?;
+                    self.download_blobs(&nodes, blob_ids).await?;
                     self.handle_certificate(certificate.clone()).await?;
                 }
                 _ => {
@@ -777,8 +773,13 @@ impl<Env: Environment> Client<Env> {
             // anything from the validator - let the function try the other validators
             return Err(());
         }
-        let certificates = remote_node
-            .download_certificates_by_heights(sender_chain_id, remote_heights)
+        let certificates = self
+            .requests_scheduler
+            .download_certificates_by_heights(
+                &remote_node,
+                sender_chain_id,
+                remote_heights,
+            )
             .await
             .map_err(|_| ())?;
         let mut certificates_with_check_results = vec![];
@@ -934,8 +935,13 @@ impl<Env: Environment> Client<Env> {
         // Stop if we've reached the height we've already processed.
         while current_height >= next_outbox_height {
             // Download the certificate for this height.
-            let downloaded = remote_node
-                .download_certificates_by_heights(sender_chain_id, vec![current_height])
+            let downloaded = self
+                .requests_scheduler
+                .download_certificates_by_heights(
+                    remote_node,
+                    sender_chain_id,
+                    vec![current_height],
+                )
                 .await?;
             let Some(certificate) = downloaded.into_iter().next() else {
                 return Err(ChainClientError::CannotDownloadMissingSenderBlock {
@@ -1119,9 +1125,9 @@ impl<Env: Environment> Client<Env> {
         if !required_blob_ids.is_empty() {
             let mut blobs = Vec::new();
             for blob_id in required_blob_ids {
-                let blob_content = match remote_node
-                    .node
-                    .download_pending_blob(chain_id, blob_id)
+                let blob_content = match self
+                    .requests_scheduler
+                    .download_pending_blob(remote_node, chain_id, blob_id)
                     .await
                 {
                     Ok(content) => content,
@@ -1217,9 +1223,9 @@ impl<Env: Environment> Client<Env> {
             Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
                 let mut blobs = Vec::new();
                 for blob_id in blob_ids {
-                    let blob_content = remote_node
-                        .node
-                        .download_pending_blob(chain_id, blob_id)
+                    let blob_content = self
+                        .requests_scheduler
+                        .download_pending_blob(remote_node, chain_id, blob_id)
                        .await?;
                     blobs.push(Blob::new(blob_content));
                 }
@@ -1248,7 +1254,10 @@ impl<Env: Environment> Client<Env> {
         communicate_concurrently(
             remote_nodes,
             async move |remote_node| {
-                let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
+                let certificate = self
+                    .requests_scheduler
+                    .download_certificate_for_blob(&remote_node, blob_id)
+                    .await?;
                 self.receive_sender_certificate(
                     certificate,
                     ReceiveCertificateMode::NeedsCheck,
@@ -4123,7 +4132,7 @@ impl<Env: Environment> ChainClient<Env> {
 }
 
 /// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
-/// each subsequent node. Returns error `err` is all of the nodes fail.
+/// each subsequent node. Returns error `err` if all of the nodes fail.
 async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
     nodes: &[RemoteNode<A>],
     f: F,
```