Skip to content

Commit 6923600

Browse files
authored
Introduce RequestsScheduler to track (and deduplicate) all requests and validators' scores (#4856)
Backport of #4752 ## Motivation The Linera client needs to interact with multiple validator nodes efficiently. Previously, the client would make individual requests to validators without: 1. Performance tracking: No mechanism to prefer faster, more reliable validators 2. Request deduplication: Concurrent requests for the same data would all hit the network, wasting bandwidth and validator resources 3. Response caching: Repeated requests for the same data would always go to validators 4. Load balancing: No rate limiting per validator, risking overload 5. Resilience: No fallback mechanism when a validator is slow or unresponsive This led to: - Unnecessary network traffic and validator load - Poor user experience with redundant waiting - No optimization based on validator performance - Risk of overwhelming validators with too many concurrent requests - No recovery mechanism when validators are slow ## Proposal This PR introduces `RequestsScheduler`, a sophisticated request orchestration layer that provides intelligent peer selection, request deduplication, caching, and performance-based routing. Key Features 1. Performance Tracking with Exponential Moving Averages (EMA) - Tracks latency, success rate, and current load for each validator - Uses configurable weights to compute a composite performance score - Intelligently selects the best available validator for each request - Weighted random selection from top performers to avoid hotspots 2. Request Deduplication - Exact matching: Multiple concurrent requests for identical data are deduplicated - Subsumption-based matching: Smaller requests are satisfied by larger in-flight requests that contain the needed data (e.g., a request for blocks 10-12 can be satisfied by an in-flight request for blocks 10-20) - Broadcast mechanism ensures all waiting requesters receive the result when the request completes - Timeout handling: Stale in-flight requests (>200ms) are not deduplicated against, allowing fresh attempts 3. Response Caching - Successfully completed requests are cached with configurable TTL (default: 2 seconds) - LRU eviction when cache reaches maximum size (default: 1000 entries) - Works with both exact and subsumption matching - Only successful results are cached 4. Slot-Based Rate Limiting - Each validator has a maximum concurrent request limit (default: 100) - Async await mechanism: requests wait for available slots without polling - Prevents overloading individual validators - Automatic slot release on request completion 5. Alternative Peer Handling - When multiple callers request the same data, they register as "alternative peers" - If the original request times out (>200ms), any alternative peer can complete the request - The result is broadcast to all waiting requesters - Provides resilience against slow validators 6. Modular Architecture Created a new ` requests_scheduler` module with clear separation of concerns: ``` requests_scheduler/ ├── mod.rs - Module exports and constants ├── scheduler.rs - RequestsScheduler orchestration logic ├── in_flight_tracker.rs - In-flight request tracking and deduplication ├── node_info.rs - Per-validator performance tracking ├── request.rs - Request types and result extraction └── scoring.rs - Configurable scoring weights ``` API High-level APIs: ```rust // Execute with best available validator scheduler.with_best(request_key, |peer| async { peer.download_certificates(chain_id, start, limit).await }).await // Execute with specific validator scheduler.with_peer(request_key, peer, |peer| async { peer.download_blob(blob_id).await }).await Configuration: let manager = RequestsScheduler::with_config( validator_nodes, max_requests_per_node: 100, weights: ScoringWeights { latency: 0.4, success: 0.4, load: 0.2 }, alpha: 0.1, // EMA smoothing factor max_expected_latency_ms: 5000.0, cache_ttl: Duration::from_secs(2), max_cache_size: 1000, ); ``` Benefits - Reduced network load: Deduplication and caching eliminate redundant requests - Better performance: Intelligent peer selection routes to fastest validators - Improved reliability: Alternative peer mechanism provides resilience - Protection for validators: Rate limiting prevents overload - Efficient resource usage: EMA-based scoring optimizes validator selection - Clean architecture: Modular design makes code maintainable and testable Metrics In production usage, this should significantly reduce: - Network traffic between clients and validators - Validator CPU/memory usage from redundant requests - Client request latency through caching and smart routing - Failed requests through performance tracking and rate limiting The following metrics have been added to Prometheus (with compiled with `--features metrics`): - `requests_scheduler_response_time_ms` - Response time for requests to validators in milliseconds - `requests_scheduler_request_total` - Total number of requests made to each validator - `requests_scheduler_request_success` - Number of successful requests to each validator (`(requests_scheduler_request_total - requests_scheduler_request_success) / requests_scheduler_request_total` is an error rate) - `requests_scheduler_request_deduplication_total` - Number of requests that were deduplicated by joining an in-flight request - `requests_scheduler_request_cache_hit_total` - Number of requests that were served from cache ## Test Plan Existing CI makes sure we maintain backwards compatibility. Some tests have been added to the new modules. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links <!-- Optional section for related PRs, related issues, and other references. If needed, please create issues to track future improvements and link them here. --> - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent e290d34 commit 6923600

File tree

20 files changed

+2686
-127
lines changed

20 files changed

+2686
-127
lines changed

CLI.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,21 @@ Client implementation and command-line tool for the Linera blockchain
190190
* `--max-joined-tasks <MAX_JOINED_TASKS>` — Maximum number of tasks that can are joined concurrently in the client
191191

192192
Default value: `100`
193+
* `--max-accepted-latency-ms <MAX_ACCEPTED_LATENCY_MS>` — Maximum expected latency in milliseconds for score normalization
194+
195+
Default value: `5000`
196+
* `--cache-ttl-ms <CACHE_TTL_MS>` — Time-to-live for cached responses in milliseconds
197+
198+
Default value: `2000`
199+
* `--cache-max-size <CACHE_MAX_SIZE>` — Maximum number of entries in the cache
200+
201+
Default value: `1000`
202+
* `--max-request-ttl-ms <MAX_REQUEST_TTL_MS>` — Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds)
203+
204+
Default value: `200`
205+
* `--alpha <ALPHA>` — Smoothing factor for Exponential Moving Averages (0 < alpha < 1). Higher values give more weight to recent observations. Typical values are between 0.01 and 0.5. A value of 0.1 means that 10% of the new observation is considered and 90% of the previous average is retained
206+
207+
Default value: `0.1`
193208
* `--storage <STORAGE_CONFIG>` — Storage configuration for the blockchain history
194209
* `--storage-max-concurrent-queries <STORAGE_MAX_CONCURRENT_QUERIES>` — The maximal number of simultaneous queries to the database
195210
* `--storage-max-stream-queries <STORAGE_MAX_STREAM_QUERIES>` — The maximal number of simultaneous stream queries to the database

linera-client/src/client_context.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ where
301301
options.to_chain_client_options(),
302302
block_cache_size,
303303
execution_state_cache_size,
304+
options.to_requests_scheduler_config(),
304305
);
305306

306307
#[cfg(not(web))]
@@ -369,6 +370,7 @@ where
369370
},
370371
block_cache_size,
371372
execution_state_cache_size,
373+
linera_core::client::RequestsSchedulerConfig::default(),
372374
);
373375

374376
ClientContext {

linera-client/src/client_options.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,50 @@ pub struct ClientContextOptions {
186186
/// Maximum number of tasks that can are joined concurrently in the client.
187187
#[arg(long, default_value = "100")]
188188
pub max_joined_tasks: usize,
189+
190+
/// Maximum expected latency in milliseconds for score normalization.
191+
#[arg(
192+
long,
193+
default_value_t = linera_core::client::requests_scheduler::MAX_ACCEPTED_LATENCY_MS,
194+
env = "LINERA_REQUESTS_SCHEDULER_MAX_ACCEPTED_LATENCY_MS"
195+
)]
196+
pub max_accepted_latency_ms: f64,
197+
198+
/// Time-to-live for cached responses in milliseconds.
199+
#[arg(
200+
long,
201+
default_value_t = linera_core::client::requests_scheduler::CACHE_TTL_MS,
202+
env = "LINERA_REQUESTS_SCHEDULER_CACHE_TTL_MS"
203+
)]
204+
pub cache_ttl_ms: u64,
205+
206+
/// Maximum number of entries in the cache.
207+
#[arg(
208+
long,
209+
default_value_t = linera_core::client::requests_scheduler::CACHE_MAX_SIZE,
210+
env = "LINERA_REQUESTS_SCHEDULER_CACHE_MAX_SIZE"
211+
)]
212+
pub cache_max_size: usize,
213+
214+
/// Maximum latency for an in-flight request before we stop deduplicating it (in milliseconds).
215+
#[arg(
216+
long,
217+
default_value_t = linera_core::client::requests_scheduler::MAX_REQUEST_TTL_MS,
218+
env = "LINERA_REQUESTS_SCHEDULER_MAX_REQUEST_TTL_MS"
219+
)]
220+
pub max_request_ttl_ms: u64,
221+
222+
/// Smoothing factor for Exponential Moving Averages (0 < alpha < 1).
223+
/// Higher values give more weight to recent observations.
224+
/// Typical values are between 0.01 and 0.5.
225+
/// A value of 0.1 means that 10% of the new observation is considered
226+
/// and 90% of the previous average is retained.
227+
#[arg(
228+
long,
229+
default_value_t = linera_core::client::requests_scheduler::ALPHA_SMOOTHING_FACTOR,
230+
env = "LINERA_REQUESTS_SCHEDULER_ALPHA"
231+
)]
232+
pub alpha: f64,
189233
}
190234

191235
impl ClientContextOptions {
@@ -218,6 +262,19 @@ impl ClientContextOptions {
218262
report_interval_secs: self.timing_interval,
219263
}
220264
}
265+
266+
/// Creates [`RequestsSchedulerConfig`] with the corresponding values.
267+
pub(crate) fn to_requests_scheduler_config(
268+
&self,
269+
) -> linera_core::client::RequestsSchedulerConfig {
270+
linera_core::client::RequestsSchedulerConfig {
271+
max_accepted_latency_ms: self.max_accepted_latency_ms,
272+
cache_ttl_ms: self.cache_ttl_ms,
273+
cache_max_size: self.cache_max_size,
274+
max_request_ttl_ms: self.max_request_ttl_ms,
275+
alpha: self.alpha,
276+
}
277+
}
221278
}
222279

223280
#[derive(Debug, Clone, clap::Args)]

linera-client/src/unit_tests/chain_listener.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
125125
chain_client::Options::test_default(),
126126
5_000,
127127
10_000,
128+
linera_core::client::RequestsSchedulerConfig::default(),
128129
)),
129130
};
130131
context
@@ -213,6 +214,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
213214
chain_client::Options::test_default(),
214215
5_000,
215216
10_000,
217+
linera_core::client::RequestsSchedulerConfig::default(),
216218
)),
217219
};
218220
let context = Arc::new(Mutex::new(context));

linera-core/src/client/mod.rs

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use crate::{
5151
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse, RoundTimeout},
5252
environment::Environment,
5353
local_node::{LocalChainInfoExt as _, LocalNodeClient, LocalNodeError},
54-
node::{CrossChainMessageDelivery, NodeError, ValidatorNode, ValidatorNodeProvider as _},
54+
node::{CrossChainMessageDelivery, NodeError, ValidatorNodeProvider as _},
5555
notifier::ChannelNotifier,
5656
remote_node::RemoteNode,
5757
updater::{communicate_with_quorum, CommunicateAction, ValidatorUpdater},
@@ -67,6 +67,9 @@ pub use crate::data_types::ClientOutcome;
6767
#[cfg(test)]
6868
#[path = "../unit_tests/client_tests.rs"]
6969
mod client_tests;
70+
pub mod requests_scheduler;
71+
72+
pub use requests_scheduler::{RequestsScheduler, RequestsSchedulerConfig, ScoringWeights};
7073
mod received_log;
7174
mod validator_trackers;
7275

@@ -257,6 +260,8 @@ pub struct Client<Env: Environment> {
257260
/// Local node to manage the execution state and the local storage of the chains that we are
258261
/// tracking.
259262
local_node: LocalNodeClient<Env::Storage>,
263+
/// Manages the requests sent to validator nodes.
264+
requests_scheduler: RequestsScheduler<Env>,
260265
/// The admin chain ID.
261266
admin_id: ChainId,
262267
/// Chains that should be tracked by the client.
@@ -285,6 +290,7 @@ impl<Env: Environment> Client<Env> {
285290
options: chain_client::Options,
286291
block_cache_size: usize,
287292
execution_state_cache_size: usize,
293+
requests_scheduler_config: requests_scheduler::RequestsSchedulerConfig,
288294
) -> Self {
289295
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
290296
let state = WorkerState::new_for_client(
@@ -300,10 +306,12 @@ impl<Env: Environment> Client<Env> {
300306
.with_chain_worker_ttl(chain_worker_ttl)
301307
.with_sender_chain_worker_ttl(sender_chain_worker_ttl);
302308
let local_node = LocalNodeClient::new(state);
309+
let requests_scheduler = RequestsScheduler::new(vec![], requests_scheduler_config);
303310

304311
Self {
305312
environment,
306313
local_node,
314+
requests_scheduler,
307315
chains: papaya::HashMap::new(),
308316
admin_id,
309317
tracked_chains,
@@ -484,8 +492,10 @@ impl<Env: Environment> Client<Env> {
484492
.checked_sub(u64::from(next_height))
485493
.ok_or(ArithmeticError::Overflow)?
486494
.min(self.options.certificate_download_batch_size);
487-
let certificates = remote_node
488-
.query_certificates_from(chain_id, next_height, limit)
495+
496+
let certificates = self
497+
.requests_scheduler
498+
.download_certificates(remote_node, chain_id, next_height, limit)
489499
.await?;
490500
let Some(info) = self.process_certificates(remote_node, certificates).await? else {
491501
break;
@@ -499,28 +509,25 @@ impl<Env: Environment> Client<Env> {
499509

500510
async fn download_blobs(
501511
&self,
502-
remote_node: &RemoteNode<impl ValidatorNode>,
503-
blob_ids: impl IntoIterator<Item = BlobId>,
512+
remote_nodes: &[RemoteNode<Env::ValidatorNode>],
513+
blob_ids: &[BlobId],
504514
) -> Result<(), chain_client::Error> {
505-
self.local_node
506-
.store_blobs(
507-
&futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move {
508-
remote_node.try_download_blob(blob_id).await.unwrap()
509-
}))
510-
.buffer_unordered(self.options.max_joined_tasks)
511-
.collect::<Vec<_>>()
512-
.await,
513-
)
514-
.await
515-
.map_err(Into::into)
515+
let blobs = &self
516+
.requests_scheduler
517+
.download_blobs(remote_nodes, blob_ids, self.options.blob_download_timeout)
518+
.await?
519+
.ok_or_else(|| {
520+
chain_client::Error::RemoteNodeError(NodeError::BlobsNotFound(blob_ids.to_vec()))
521+
})?;
522+
self.local_node.store_blobs(blobs).await.map_err(Into::into)
516523
}
517524

518525
/// Tries to process all the certificates, requesting any missing blobs from the given node.
519526
/// Returns the chain info of the last successfully processed certificate.
520527
#[instrument(level = "trace", skip_all)]
521528
async fn process_certificates(
522529
&self,
523-
remote_node: &RemoteNode<impl ValidatorNode>,
530+
remote_node: &RemoteNode<Env::ValidatorNode>,
524531
certificates: Vec<ConfirmedBlockCertificate>,
525532
) -> Result<Option<Box<ChainInfo>>, chain_client::Error> {
526533
let mut info = None;
@@ -535,7 +542,8 @@ impl<Env: Environment> Client<Env> {
535542
.await
536543
{
537544
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
538-
self.download_blobs(remote_node, blob_ids).await?;
545+
self.download_blobs(&[remote_node.clone()], &blob_ids)
546+
.await?;
539547
}
540548
x => {
541549
x?;
@@ -546,7 +554,8 @@ impl<Env: Environment> Client<Env> {
546554
info = Some(
547555
match self.handle_certificate(certificate.clone()).await {
548556
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
549-
self.download_blobs(remote_node, blob_ids).await?;
557+
self.download_blobs(&[remote_node.clone()], &blob_ids)
558+
.await?;
550559
self.handle_certificate(certificate).await?
551560
}
552561
x => x?,
@@ -802,7 +811,6 @@ impl<Env: Environment> Client<Env> {
802811
) -> Result<(), chain_client::Error> {
803812
let certificate = Box::new(certificate);
804813
let block = certificate.block();
805-
806814
// Recover history from the network.
807815
self.download_certificates(block.header.chain_id, block.header.height)
808816
.await?;
@@ -811,14 +819,9 @@ impl<Env: Environment> Client<Env> {
811819
if let Err(err) = self.process_certificate(certificate.clone()).await {
812820
match &err {
813821
LocalNodeError::BlobsNotFound(blob_ids) => {
814-
let blobs = RemoteNode::download_blobs(
815-
blob_ids,
816-
&self.validator_nodes().await?,
817-
self.options.blob_download_timeout,
818-
)
819-
.await
820-
.ok_or(err)?;
821-
self.local_node.store_blobs(&blobs).await?;
822+
self.download_blobs(&self.validator_nodes().await?, blob_ids)
823+
.await
824+
.map_err(|_| err)?;
822825
self.process_certificate(certificate).await?;
823826
}
824827
_ => {
@@ -855,14 +858,7 @@ impl<Env: Environment> Client<Env> {
855858
if let Err(err) = self.handle_certificate(certificate.clone()).await {
856859
match &err {
857860
LocalNodeError::BlobsNotFound(blob_ids) => {
858-
let blobs = RemoteNode::download_blobs(
859-
blob_ids,
860-
&nodes,
861-
self.options.blob_download_timeout,
862-
)
863-
.await
864-
.ok_or(err)?;
865-
self.local_node.store_blobs(&blobs).await?;
861+
self.download_blobs(&nodes, blob_ids).await?;
866862
self.handle_certificate(certificate.clone()).await?;
867863
}
868864
_ => {
@@ -916,8 +912,13 @@ impl<Env: Environment> Client<Env> {
916912
// anything from the validator - let the function try the other validators
917913
return Err(());
918914
}
919-
let certificates = remote_node
920-
.download_certificates_by_heights(sender_chain_id, remote_heights)
915+
let certificates = self
916+
.requests_scheduler
917+
.download_certificates_by_heights(
918+
&remote_node,
919+
sender_chain_id,
920+
remote_heights,
921+
)
921922
.await
922923
.map_err(|_| ())?;
923924
let mut certificates_with_check_results = vec![];
@@ -1073,8 +1074,13 @@ impl<Env: Environment> Client<Env> {
10731074
// Stop if we've reached the height we've already processed.
10741075
while current_height >= next_outbox_height {
10751076
// Download the certificate for this height.
1076-
let downloaded = remote_node
1077-
.download_certificates_by_heights(sender_chain_id, vec![current_height])
1077+
let downloaded = self
1078+
.requests_scheduler
1079+
.download_certificates_by_heights(
1080+
remote_node,
1081+
sender_chain_id,
1082+
vec![current_height],
1083+
)
10781084
.await?;
10791085
let Some(certificate) = downloaded.into_iter().next() else {
10801086
return Err(chain_client::Error::CannotDownloadMissingSenderBlock {
@@ -1258,9 +1264,9 @@ impl<Env: Environment> Client<Env> {
12581264
if !required_blob_ids.is_empty() {
12591265
let mut blobs = Vec::new();
12601266
for blob_id in required_blob_ids {
1261-
let blob_content = match remote_node
1262-
.node
1263-
.download_pending_blob(chain_id, blob_id)
1267+
let blob_content = match self
1268+
.requests_scheduler
1269+
.download_pending_blob(remote_node, chain_id, blob_id)
12641270
.await
12651271
{
12661272
Ok(content) => content,
@@ -1356,9 +1362,9 @@ impl<Env: Environment> Client<Env> {
13561362
Err(LocalNodeError::BlobsNotFound(blob_ids)) => {
13571363
let mut blobs = Vec::new();
13581364
for blob_id in blob_ids {
1359-
let blob_content = remote_node
1360-
.node
1361-
.download_pending_blob(chain_id, blob_id)
1365+
let blob_content = self
1366+
.requests_scheduler
1367+
.download_pending_blob(remote_node, chain_id, blob_id)
13621368
.await?;
13631369
blobs.push(Blob::new(blob_content));
13641370
}
@@ -1387,7 +1393,10 @@ impl<Env: Environment> Client<Env> {
13871393
communicate_concurrently(
13881394
remote_nodes,
13891395
async move |remote_node| {
1390-
let certificate = remote_node.download_certificate_for_blob(blob_id).await?;
1396+
let certificate = self
1397+
.requests_scheduler
1398+
.download_certificate_for_blob(&remote_node, blob_id)
1399+
.await?;
13911400
self.receive_sender_certificate(
13921401
certificate,
13931402
ReceiveCertificateMode::NeedsCheck,
@@ -1494,7 +1503,7 @@ impl<Env: Environment> Client<Env> {
14941503
}
14951504

14961505
/// Performs `f` in parallel on multiple nodes, starting with a quadratically increasing delay on
1497-
/// each subsequent node. Returns error `err` is all of the nodes fail.
1506+
/// each subsequent node. Returns error `err` if all of the nodes fail.
14981507
async fn communicate_concurrently<'a, A, E1, E2, F, G, R, V>(
14991508
nodes: &[RemoteNode<A>],
15001509
f: F,

0 commit comments

Comments
 (0)