diff --git a/CLI.md b/CLI.md
index e5e2bb7e5d3..63ab7da9eae 100644
--- a/CLI.md
+++ b/CLI.md
@@ -201,6 +201,9 @@ Client implementation and command-line tool for the Linera blockchain
 * `--alpha ` — Smoothing factor for Exponential Moving Averages (0 < alpha < 1). Higher values give more weight to recent observations. Typical values are between 0.01 and 0.5. A value of 0.1 means that 10% of the new observation is considered and 90% of the previous average is retained
 
   Default value: `0.1`
+* `--alternative-peers-retry-delay-ms ` — Delay in milliseconds between starting requests to different peers. This helps to stagger requests and avoid overwhelming the network
+
+  Default value: `150`
 * `--storage ` — Storage configuration for the blockchain history
 * `--storage-max-concurrent-queries ` — The maximal number of simultaneous queries to the database
 * `--storage-max-stream-queries ` — The maximal number of simultaneous stream queries to the database
diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs
index 48f0fd31d4e..c2fc8aa0374 100644
--- a/linera-client/src/client_options.rs
+++ b/linera-client/src/client_options.rs
@@ -230,6 +230,15 @@ pub struct ClientContextOptions {
         env = "LINERA_REQUESTS_SCHEDULER_ALPHA"
     )]
     pub alpha: f64,
+
+    /// Delay in milliseconds between starting requests to different peers.
+    /// This helps to stagger requests and avoid overwhelming the network.
+    #[arg(
+        long,
+        default_value_t = linera_core::client::requests_scheduler::STAGGERED_DELAY_MS,
+        env = "LINERA_REQUESTS_SCHEDULER_ALTERNATIVE_PEERS_RETRY_DELAY_MS"
+    )]
+    pub alternative_peers_retry_delay_ms: u64,
 }
 
 impl ClientContextOptions {
@@ -273,6 +282,7 @@ impl ClientContextOptions {
             cache_max_size: self.cache_max_size,
             max_request_ttl_ms: self.max_request_ttl_ms,
             alpha: self.alpha,
+            retry_delay_ms: self.alternative_peers_retry_delay_ms,
         }
     }
 }
diff --git a/linera-core/src/client/requests_scheduler/in_flight_tracker.rs b/linera-core/src/client/requests_scheduler/in_flight_tracker.rs
index 33786b3ee14..6a90198483d 100644
--- a/linera-core/src/client/requests_scheduler/in_flight_tracker.rs
+++ b/linera-core/src/client/requests_scheduler/in_flight_tracker.rs
@@ -173,6 +173,21 @@ impl InFlightTracker {
         let peers = entry.alternative_peers.read().await;
         Some(peers.clone())
     }
+
+    /// Removes a specific peer from the alternative peers list.
+    ///
+    /// # Arguments
+    /// - `key`: The request key to look up
+    /// - `peer`: The peer to remove from alternatives
+    pub(super) async fn remove_alternative_peer(&self, key: &RequestKey, peer: &N)
+    where
+        N: PartialEq + Eq,
+    {
+        if let Some(entry) = self.entries.read().await.get(key) {
+            let mut alt_peers = entry.alternative_peers.write().await;
+            alt_peers.retain(|p| p != peer);
+        }
+    }
 }
 
 /// Type of in-flight request match found.
diff --git a/linera-core/src/client/requests_scheduler/mod.rs b/linera-core/src/client/requests_scheduler/mod.rs
index 97888a213e6..b34c19bee6a 100644
--- a/linera-core/src/client/requests_scheduler/mod.rs
+++ b/linera-core/src/client/requests_scheduler/mod.rs
@@ -21,6 +21,7 @@ pub const CACHE_TTL_MS: u64 = 2000;
 pub const CACHE_MAX_SIZE: usize = 1000;
 pub const MAX_REQUEST_TTL_MS: u64 = 200;
 pub const ALPHA_SMOOTHING_FACTOR: f64 = 0.1;
+pub const STAGGERED_DELAY_MS: u64 = 150;
 
 /// Configuration for the `RequestsScheduler`.
 #[derive(Debug, Clone)]
@@ -35,6 +36,8 @@ pub struct RequestsSchedulerConfig {
     pub max_request_ttl_ms: u64,
     /// Smoothing factor for Exponential Moving Averages (0 < alpha < 1)
     pub alpha: f64,
+    /// Delay in milliseconds between starting requests to different peers.
+    pub retry_delay_ms: u64,
 }
 
 impl Default for RequestsSchedulerConfig {
@@ -45,6 +48,7 @@ impl Default for RequestsSchedulerConfig {
             cache_max_size: CACHE_MAX_SIZE,
             max_request_ttl_ms: MAX_REQUEST_TTL_MS,
             alpha: ALPHA_SMOOTHING_FACTOR,
+            retry_delay_ms: STAGGERED_DELAY_MS,
         }
     }
 }
diff --git a/linera-core/src/client/requests_scheduler/scheduler.rs b/linera-core/src/client/requests_scheduler/scheduler.rs
index 4aa1a7d7652..9e7ec1edb0f 100644
--- a/linera-core/src/client/requests_scheduler/scheduler.rs
+++ b/linera-core/src/client/requests_scheduler/scheduler.rs
@@ -28,7 +28,9 @@ use super::{
 use crate::{
     client::{
         communicate_concurrently,
-        requests_scheduler::{in_flight_tracker::Subscribed, request::Cacheable},
+        requests_scheduler::{
+            in_flight_tracker::Subscribed, request::Cacheable, STAGGERED_DELAY_MS,
+        },
         RequestsSchedulerConfig,
     },
     environment::Environment,
@@ -131,6 +133,8 @@ pub struct RequestsScheduler {
     alpha: f64,
     /// Default maximum expected latency in milliseconds for score normalization.
     max_expected_latency: f64,
+    /// Delay between starting requests to alternative peers.
+    retry_delay: Duration,
     /// Tracks in-flight requests to deduplicate concurrent requests for the same data.
     in_flight_tracker: InFlightTracker>,
     /// Cache of recently completed requests with their results and timestamps.
@@ -151,6 +155,7 @@ impl RequestsScheduler {
             Duration::from_millis(config.cache_ttl_ms),
             config.cache_max_size,
             Duration::from_millis(config.max_request_ttl_ms),
+            Duration::from_millis(config.retry_delay_ms),
         )
     }
 
@@ -165,6 +170,8 @@ impl RequestsScheduler {
     /// - `cache_ttl`: Time-to-live for cached responses
     /// - `max_cache_size`: Maximum number of entries in the cache
     /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it
+    /// - `retry_delay`: Delay between starting requests to different peers
+    #[allow(clippy::too_many_arguments)]
     pub fn with_config(
         nodes: impl IntoIterator>,
         weights: ScoringWeights,
@@ -173,6 +180,7 @@
         cache_ttl: Duration,
         max_cache_size: usize,
         max_request_ttl: Duration,
+        retry_delay: Duration,
     ) -> Self {
         assert!(alpha > 0.0 && alpha < 1.0, "Alpha must be in (0, 1) range");
         Self {
@@ -190,6 +198,7 @@
             weights,
             alpha,
             max_expected_latency: max_expected_latency_ms,
+            retry_delay,
             in_flight_tracker: InFlightTracker::new(max_request_ttl),
             cache: RequestsCache::new(cache_ttl, max_cache_size),
         }
     }
@@ -229,7 +238,7 @@ impl RequestsScheduler {
     async fn with_best(&self, key: RequestKey, operation: F) -> Result
     where
         R: Cacheable + Clone + Send + 'static,
-        F: FnOnce(RemoteNode) -> Fut,
+        F: Fn(RemoteNode) -> Fut,
         Fut: Future>,
     {
         // Select the best available peer
@@ -268,15 +277,16 @@
     ) -> Result
     where
         R: Cacheable + Clone + Send + 'static,
-        F: FnOnce(RemoteNode) -> Fut,
+        F: Fn(RemoteNode) -> Fut,
         Fut: Future>,
     {
         self.add_peer(peer.clone()).await;
         self.in_flight_tracker
             .add_alternative_peer(&key, peer.clone())
             .await;
-        self.deduplicated_request(key, peer, |peer| async {
-            self.track_request(peer, operation).await
+        self.deduplicated_request(key, peer, move |peer| {
+            let fut = operation(peer.clone());
+            async move { self.track_request(peer, fut).await }
         })
         .await
     }
@@ -344,8 +354,11 @@ impl RequestsScheduler {
                 heights: heights.clone(),
             },
             peer.clone(),
-            |peer| async move {
-                Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
+            |peer| {
+                let heights = heights.clone();
+                async move {
+                    Box::pin(peer.download_certificates_by_heights(chain_id, heights)).await
+                }
             },
         )
         .await
@@ -363,9 +376,12 @@ impl RequestsScheduler {
                 heights: heights.clone(),
             },
             peer.clone(),
-            |peer| async move {
-                peer.download_certificates_by_heights(chain_id, heights)
-                    .await
+            |peer| {
+                let heights = heights.clone();
+                async move {
+                    peer.download_certificates_by_heights(chain_id, heights)
+                        .await
+                }
             },
         )
         .await
@@ -435,34 +451,28 @@ impl RequestsScheduler {
     /// Wraps a request operation with performance tracking and capacity management.
     ///
     /// This method:
-    /// 1. Acquires a request slot (blocks asynchronously until one is available)
-    /// 2. Executes the provided operation with the selected peer
-    /// 3. Measures response time
-    /// 4. Updates node metrics based on success/failure
-    /// 5. Releases the request slot
+    /// 1. Measures response time
+    /// 2. Updates node metrics based on success/failure
     ///
     /// # Arguments
-    /// - `peer`: The remote node to execute the operation on
-    /// - `operation`: Async closure that performs the actual request with the selected peer
+    /// - `peer`: The remote node to track metrics for
+    /// - `operation`: Future that performs the actual request
    ///
     /// # Behavior
-    /// If no slot is available, this method will wait asynchronously (without polling)
-    /// until another request completes and releases its slot. The task will be efficiently
-    /// suspended and woken by the async runtime using notification mechanisms.
-    async fn track_request(
+    /// Executes the provided future and tracks metrics for the given peer.
+    async fn track_request(
         &self,
         peer: RemoteNode,
-        operation: F,
+        operation: Fut,
     ) -> Result
     where
-        F: FnOnce(RemoteNode) -> Fut,
         Fut: Future>,
     {
         let start_time = Instant::now();
         let public_key = peer.public_key;
 
         // Execute the operation
-        let result = operation(peer).await;
+        let result = operation.await;
 
         // Update metrics and release slot
         let response_time_ms = start_time.elapsed().as_millis() as u64;
@@ -520,15 +530,15 @@
     ///
     /// # Returns
     /// The result from either the in-flight request or the newly executed operation
-    async fn deduplicated_request(
+    async fn deduplicated_request(
         &self,
         key: RequestKey,
-        peer: N,
+        peer: RemoteNode,
         operation: F,
     ) -> Result
     where
         T: Cacheable + Clone + Send + 'static,
-        F: FnOnce(N) -> Fut,
+        F: Fn(RemoteNode) -> Fut,
         Fut: Future>,
     {
         // Check cache for exact or subsuming match
@@ -646,9 +656,42 @@
 
         // Create new in-flight entry for this request
         self.in_flight_tracker.insert_new(key.clone()).await;
 
+        // Remove the peer we're about to use from alternatives (it shouldn't retry with itself)
+        self.in_flight_tracker
+            .remove_alternative_peer(&key, &peer)
+            .await;
+
         // Execute the actual request
-        tracing::trace!(key = ?key, "executing new request");
-        let result = operation(peer).await;
+        tracing::trace!(key = ?key, peer = ?peer, "executing new request");
+        let result = operation(peer.clone()).await;
+
+        // If the first request failed, try alternative peers in staggered parallel
+        let result = if result.is_err() {
+            // Get all alternative peers at once
+            let alternative_peers = self.in_flight_tracker.get_alternative_peers(&key).await;
+
+            if let Some(peers) = alternative_peers {
+                if !peers.is_empty() {
+                    tracing::info!(
+                        key = ?key,
+                        peer_count = peers.len(),
+                        "request failed, trying alternative peers in staggered parallel"
+                    );
+
+                    self.try_staggered_parallel(&key, peers, &operation, STAGGERED_DELAY_MS)
+                        .await
+                } else {
+                    // No alternatives registered, return original result.
+                    result
+                }
+            } else {
+                // No alternatives registered, return original result.
+                result
+            }
+        } else {
+            result
+        };
+
         let result_for_broadcast: Result = result.clone().map(Into::into);
         let shared_result = Arc::new(result_for_broadcast);
@@ -665,6 +708,84 @@
         result
     }
 
+    /// Tries alternative peers in staggered parallel fashion.
+    ///
+    /// Launches requests to alternative peers with a stagger delay between each,
+    /// returning the first successful result. This provides a balance between
+    /// sequential (slow) and fully parallel (wasteful) approaches.
+    ///
+    /// # Arguments
+    /// - `key`: The request key (for logging)
+    /// - `peers`: List of alternative peers to try
+    /// - `operation`: The operation to execute on each peer
+    ///
+    /// # Returns
+    /// The first successful result, or the last error if all fail
+    async fn try_staggered_parallel(
+        &self,
+        key: &RequestKey,
+        peers: Vec>,
+        operation: &F,
+        staggered_delay_ms: u64,
+    ) -> Result
+    where
+        T: Send,
+        F: Fn(RemoteNode) -> Fut,
+        Fut: Future>,
+    {
+        use futures::stream::{FuturesUnordered, StreamExt};
+        use linera_base::time::{timer::sleep, Duration};
+
+        let mut futures = FuturesUnordered::new();
+
+        // Create futures with staggered delays
+        for (idx, peer) in peers.into_iter().enumerate() {
+            let delay = Duration::from_millis(staggered_delay_ms * idx as u64);
+            let fut = operation(peer);
+
+            let delayed_fut = async move {
+                if delay.as_millis() > 0 {
+                    sleep(delay).await;
+                }
+                (idx, fut.await)
+            };
+
+            futures.push(delayed_fut);
+        }
+
+        // Wait for first success or collect all failures
+        let mut last_error = None;
+
+        while let Some((peer_idx, result)) = futures.next().await {
+            match result {
+                Ok(value) => {
+                    tracing::info!(
+                        key = ?key,
+                        peer_index = peer_idx,
+                        "staggered parallel retry succeeded"
+                    );
+                    return Ok(value);
+                }
+                Err(e) => {
+                    tracing::debug!(
+                        key = ?key,
+                        peer_index = peer_idx,
+                        error = %e,
+                        "staggered parallel retry attempt failed"
+                    );
+                    last_error = Some(e);
+                }
+            }
+        }
+
+        // All attempts failed
+        tracing::debug!(
+            key = ?key,
+            "all staggered parallel retry attempts failed"
+        );
+        Err(last_error.unwrap_or(NodeError::UnexpectedMessage))
+    }
+
     /// Returns all peers ordered by their score (highest first).
     ///
     /// Only includes peers that can currently accept requests. Each peer is paired
@@ -770,6 +891,7 @@ mod tests {
             cache_ttl,
             100,
             in_flight_timeout,
+            Duration::from_millis(STAGGERED_DELAY_MS),
         );
         // Replace the tracker with one using the custom timeout
         manager.in_flight_tracker = InFlightTracker::new(in_flight_timeout);
@@ -789,11 +911,33 @@
         }
     }
 
+    /// Helper function to create a dummy peer for testing
+    fn dummy_peer() -> RemoteNode<::ValidatorNode> {
+        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
+
+        // Create a minimal test builder to get a validator node
+        let mut builder = futures::executor::block_on(async {
+            TestBuilder::new(
+                MemoryStorageBuilder::default(),
+                1,
+                0,
+                linera_base::crypto::InMemorySigner::new(None),
+            )
+            .await
+            .unwrap()
+        });
+
+        let node = builder.node(0);
+        let public_key = node.name();
+        RemoteNode { public_key, node }
+    }
+
     #[tokio::test]
     async fn test_cache_hit_returns_cached_result() {
         // Create a manager with standard cache TTL
         let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
         let key = test_key();
+        let peer = dummy_peer();
 
         // Track how many times the operation is executed
         let execution_count = Arc::new(AtomicUsize::new(0));
@@ -801,9 +945,12 @@
         let execution_count_clone = execution_count.clone();
 
         // First call - should execute the operation and cache the result
         let result1: Result, NodeError> = manager
-            .deduplicated_request(key.clone(), (), |_| async move {
-                execution_count_clone.fetch_add(1, Ordering::SeqCst);
-                test_result_ok()
+            .deduplicated_request(key.clone(), peer.clone(), |_| {
+                let count = execution_count_clone.clone();
+                async move {
+                    count.fetch_add(1, Ordering::SeqCst);
+                    test_result_ok()
+                }
             })
             .await;
 
         assert!(result1.is_ok());
         assert_eq!(execution_count.load(Ordering::SeqCst), 1);
 
         // Second call - should return cached result without executing the operation
         let execution_count_clone2 = execution_count.clone();
         let result2: Result, NodeError> = manager
-            .deduplicated_request(key.clone(), (), |_| async move {
-                execution_count_clone2.fetch_add(1, Ordering::SeqCst);
-                test_result_ok()
+            .deduplicated_request(key.clone(), peer.clone(), |_| {
+                let count = execution_count_clone2.clone();
+                async move {
+                    count.fetch_add(1, Ordering::SeqCst);
+                    test_result_ok()
+                }
             })
             .await;
 
@@ -828,24 +978,34 @@
     async fn test_in_flight_request_deduplication() {
         let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
         let key = test_key();
+        let peer = dummy_peer();
 
         // Track how many times the operation is executed
         let execution_count = Arc::new(AtomicUsize::new(0));
 
         // Create a channel to control when the first operation completes
         let (tx, rx) = oneshot::channel();
+        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
 
         // Start first request (will be slow - waits for signal)
         let manager_clone = Arc::clone(&manager);
         let key_clone = key.clone();
         let execution_count_clone = execution_count.clone();
+        let rx_clone = Arc::clone(&rx);
+        let peer_clone = peer.clone();
         let first_request = tokio::spawn(async move {
             manager_clone
-                .deduplicated_request(key_clone, (), |_| async move {
-                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
-                    // Wait for signal before completing
-                    rx.await.unwrap();
-                    test_result_ok()
+                .deduplicated_request(key_clone, peer_clone, |_| {
+                    let count = execution_count_clone.clone();
+                    let rx = Arc::clone(&rx_clone);
+                    async move {
+                        count.fetch_add(1, Ordering::SeqCst);
+                        // Wait for signal before completing
+                        if let Some(receiver) = rx.lock().await.take() {
+                            receiver.await.unwrap();
+                        }
+                        test_result_ok()
+                    }
                 })
                 .await
         });
@@ -854,9 +1014,12 @@
         let execution_count_clone2 = execution_count.clone();
         let second_request = tokio::spawn(async move {
             manager
-                .deduplicated_request(key, (), |_| async move {
-                    execution_count_clone2.fetch_add(1, Ordering::SeqCst);
-                    test_result_ok()
+                .deduplicated_request(key, peer, |_| {
+                    let count = execution_count_clone2.clone();
+                    async move {
+                        count.fetch_add(1, Ordering::SeqCst);
+                        test_result_ok()
+                    }
                 })
                 .await
         });
@@ -881,23 +1044,33 @@
     async fn test_multiple_subscribers_all_notified() {
         let manager = create_test_manager(Duration::from_secs(60), Duration::from_secs(60));
         let key = test_key();
+        let peer = dummy_peer();
 
         // Track how many times the operation is executed
         let execution_count = Arc::new(AtomicUsize::new(0));
 
         // Create a channel to control when the operation completes
         let (tx, rx) = oneshot::channel();
+        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
 
         // Start first request (will be slow - waits for signal)
         let manager_clone1 = Arc::clone(&manager);
         let key_clone1 = key.clone();
         let execution_count_clone = execution_count.clone();
+        let rx_clone = Arc::clone(&rx);
+        let peer_clone = peer.clone();
         let first_request = tokio::spawn(async move {
             manager_clone1
-                .deduplicated_request(key_clone1, (), |_| async move {
-                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
-                    rx.await.unwrap();
-                    test_result_ok()
+                .deduplicated_request(key_clone1, peer_clone, |_| {
+                    let count = execution_count_clone.clone();
+                    let rx = Arc::clone(&rx_clone);
+                    async move {
+                        count.fetch_add(1, Ordering::SeqCst);
+                        if let Some(receiver) = rx.lock().await.take() {
+                            receiver.await.unwrap();
+                        }
+                        test_result_ok()
+                    }
                 })
                 .await
         });
@@ -908,11 +1081,15 @@
             let manager_clone = Arc::clone(&manager);
             let key_clone = key.clone();
             let execution_count_clone = execution_count.clone();
+            let peer_clone = peer.clone();
             let handle = tokio::spawn(async move {
                 manager_clone
-                    .deduplicated_request(key_clone, (), |_| async move {
-                        execution_count_clone.fetch_add(1, Ordering::SeqCst);
-                        test_result_ok()
+                    .deduplicated_request(key_clone, peer_clone, |_| {
+                        let count = execution_count_clone.clone();
+                        async move {
+                            count.fetch_add(1, Ordering::SeqCst);
+                            test_result_ok()
+                        }
                     })
                     .await
             });
@@ -942,23 +1119,33 @@
         let manager = create_test_manager(Duration::from_millis(50), Duration::from_secs(60));
         let key = test_key();
+        let peer = dummy_peer();
 
         // Track how many times the operation is executed
         let execution_count = Arc::new(AtomicUsize::new(0));
 
         // Create a channel to control when the first operation completes
         let (tx, rx) = oneshot::channel();
+        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
 
         // Start first request (will be slow - waits for signal)
         let manager_clone = Arc::clone(&manager);
         let key_clone = key.clone();
         let execution_count_clone = execution_count.clone();
+        let rx_clone = Arc::clone(&rx);
+        let peer_clone = peer.clone();
         let first_request = tokio::spawn(async move {
             manager_clone
-                .deduplicated_request(key_clone, (), |_| async move {
-                    execution_count_clone.fetch_add(1, Ordering::SeqCst);
-                    rx.await.unwrap();
-                    test_result_ok()
+                .deduplicated_request(key_clone, peer_clone, |_| {
+                    let count = execution_count_clone.clone();
+                    let rx = Arc::clone(&rx_clone);
+                    async move {
+                        count.fetch_add(1, Ordering::SeqCst);
+                        if let Some(receiver) = rx.lock().await.take() {
+                            receiver.await.unwrap();
+                        }
+                        test_result_ok()
+                    }
                 })
                 .await
         });
@@ -970,9 +1157,12 @@
         let execution_count_clone2 = execution_count.clone();
         let second_request = tokio::spawn(async move {
             manager
-                .deduplicated_request(key, (), |_| async move {
-                    execution_count_clone2.fetch_add(1, Ordering::SeqCst);
-                    test_result_ok()
+                .deduplicated_request(key, peer, |_| {
+                    let count = execution_count_clone2.clone();
+                    async move {
+                        count.fetch_add(1, Ordering::SeqCst);
+                        test_result_ok()
+                    }
                 })
                 .await
         });
@@ -1027,6 +1217,7 @@
             Duration::from_secs(60),
             100,
             Duration::from_millis(MAX_REQUEST_TTL_MS),
+            Duration::from_millis(STAGGERED_DELAY_MS),
         ));
 
         let key = RequestKey::Blob(BlobId::new(
@@ -1036,17 +1227,24 @@
         // Create a channel to control when first request completes
         let (tx, rx) = oneshot::channel();
+        let rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
 
         // Start first request with node 0 (will block until signaled)
         let manager_clone = Arc::clone(&manager);
         let node_clone = nodes[0].clone();
         let key_clone = key.clone();
+        let rx_clone = Arc::clone(&rx);
         let first_request = tokio::spawn(async move {
             manager_clone
-                .with_peer(key_clone, node_clone, |_peer| async move {
-                    // Wait for signal
-                    rx.await.unwrap();
-                    Ok(None) // Return Option
+                .with_peer(key_clone, node_clone, |_peer| {
+                    let rx = Arc::clone(&rx_clone);
+                    async move {
+                        // Wait for signal
+                        if let Some(receiver) = rx.lock().await.take() {
+                            receiver.await.unwrap();
+                        }
+                        Ok(None) // Return Option
+                    }
                 })
                 .await
         });
@@ -1105,4 +1303,137 @@
             "Expected in-flight entry to be removed after completion"
         );
     }
+
+    #[tokio::test]
+    async fn test_staggered_parallel_retry_on_failure() {
+        use std::sync::atomic::{AtomicU64, Ordering};
+
+        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
+
+        // Create a test environment with four validators
+        let mut builder = TestBuilder::new(
+            MemoryStorageBuilder::default(),
+            4,
+            0,
+            InMemorySigner::new(None),
+        )
.await + .unwrap(); + + // Get validator nodes + let nodes: Vec<_> = (0..4) + .map(|i| { + let node = builder.node(i); + let public_key = node.name(); + RemoteNode { public_key, node } + }) + .collect(); + + let staggered_delay_ms = 10u128; + + // Store public keys for comparison + let node0_key = nodes[0].public_key; + let node2_key = nodes[2].public_key; + + // Create a RequestsScheduler + let manager: Arc> = + Arc::new(RequestsScheduler::with_config( + nodes.clone(), + ScoringWeights::default(), + 0.1, + 1000.0, + Duration::from_secs(60), + 100, + Duration::from_millis(MAX_REQUEST_TTL_MS), + Duration::from_millis(STAGGERED_DELAY_MS), + )); + + let key = test_key(); + + // Track when each peer is called + let call_times = Arc::new(tokio::sync::Mutex::new(Vec::new())); + let start_time = Instant::now(); + + // Track call count per peer + let call_count = Arc::new(AtomicU64::new(0)); + + let call_times_clone = Arc::clone(&call_times); + let call_count_clone = Arc::clone(&call_count); + + // Test the staggered parallel retry logic directly + let operation = |peer: RemoteNode<::ValidatorNode>| { + let times = Arc::clone(&call_times_clone); + let count = Arc::clone(&call_count_clone); + let start = start_time; + async move { + let elapsed = Instant::now().duration_since(start); + times.lock().await.push((peer.public_key, elapsed)); + count.fetch_add(1, Ordering::SeqCst); + + if peer.public_key == node0_key { + // Node 0 fails quickly + Err(NodeError::UnexpectedMessage) + } else if peer.public_key == node2_key { + // Node 2 succeeds after a delay + let delay = staggered_delay_ms / 2; + tokio::time::sleep(Duration::from_millis(delay.try_into().unwrap())).await; + Ok(vec![]) + } else { + // Other nodes take longer or fail + let delay = staggered_delay_ms * 2; + tokio::time::sleep(Duration::from_millis(delay.try_into().unwrap())).await; + Err(NodeError::UnexpectedMessage) + } + } + }; + + // Use nodes 0, 1, 2, 3 as alternatives + let result: Result, NodeError> = manager + .try_staggered_parallel( + &key, + nodes.clone(), + &operation, + staggered_delay_ms.try_into().unwrap(), + ) + .await; + + // Should succeed with result from node 2 + assert!( + result.is_ok(), + "Expected request to succeed with alternative peer" + ); + + // Verify timing: calls should be staggered, not sequential + let times = call_times.lock().await; + // Can't test exactly 2 b/c we sleep _inside_ the operation and increase right at the start of it. + assert!( + times.len() >= 2, + "Should have tried at least 2 peers, got {}", + times.len() + ); + + // First call should be at ~0ms + assert!( + times[0].1.as_millis() < 10, + "First peer should be called immediately, was called at {}ms", + times[0].1.as_millis() + ); + + // Second call should start after a stagger delay (~10ms) + if times.len() > 1 { + let delay = times[1].1.as_millis(); + assert!( + delay >= staggered_delay_ms / 2 && delay <= staggered_delay_ms + 5, + "Second peer should be called after stagger delay (~{}ms), got {}ms", + staggered_delay_ms, + delay + ); + } + + // Total time should be significantly less than sequential. 
+        // With staggered parallel: node0 fails immediately, node1 starts at 10ms (and takes 20ms),
+        // node2 starts at 20ms and succeeds at 25ms total.
+        let total_time = Instant::now().duration_since(start_time).as_millis();
+        assert!(total_time >= staggered_delay_ms && total_time < 50);
+    }
 }
diff --git a/linera-web/src/lib.rs b/linera-web/src/lib.rs
index adba7e3c26d..1f0fc82aadc 100644
--- a/linera-web/src/lib.rs
+++ b/linera-web/src/lib.rs
@@ -97,6 +97,7 @@ pub const OPTIONS: ClientContextOptions = ClientContextOptions {
     cache_max_size: linera_core::client::requests_scheduler::CACHE_MAX_SIZE,
     max_request_ttl_ms: linera_core::client::requests_scheduler::MAX_REQUEST_TTL_MS,
     alpha: linera_core::client::requests_scheduler::ALPHA_SMOOTHING_FACTOR,
+    alternative_peers_retry_delay_ms: linera_core::client::requests_scheduler::STAGGERED_DELAY_MS,
 
     // TODO(linera-protocol#2944): separate these out from the
     // `ClientOptions` struct, since they apply only to the CLI/native