Skip to content

Commit 5704fbf

Browse files
committed
Set timeout for the first request
1 parent 6e9cc93 commit 5704fbf

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed

linera-core/src/client/requests_scheduler/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub const CACHE_TTL_MS: u64 = 2000;
2121
pub const CACHE_MAX_SIZE: usize = 1000;
2222
pub const MAX_REQUEST_TTL_MS: u64 = 200;
2323
pub const ALPHA_SMOOTHING_FACTOR: f64 = 0.1;
24-
pub const STAGGERED_DELAY_MS: u64 = 75;
24+
pub const STAGGERED_DELAY_MS: u64 = 150;
2525

2626
/// Configuration for the `RequestsScheduler`.
2727
#[derive(Debug, Clone)]

linera-core/src/client/requests_scheduler/scheduler.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
use std::{cmp::Ordering, collections::BTreeMap, future::Future, sync::Arc};
55

66
use custom_debug_derive::Debug;
7-
use futures::stream::{FuturesUnordered, StreamExt};
7+
use futures::{
8+
stream::{FuturesUnordered, StreamExt},
9+
FutureExt,
10+
};
811
use linera_base::{
912
crypto::ValidatorPublicKey,
1013
data_types::{Blob, BlobContent, BlockHeight},
@@ -663,7 +666,15 @@ impl<Env: Environment> RequestsScheduler<Env> {
663666

664667
// Execute the actual request
665668
tracing::trace!(key = ?key, peer = ?peer, "executing new request");
666-
let result = operation(peer.clone()).await;
669+
let result = futures::select! {
670+
_timeout = linera_base::time::timer::sleep(self.retry_delay).fuse() => {
671+
tracing::trace!(key = ?key, "retry delay elapsed, proceeding with request");
672+
Err(NodeError::WorkerError { error: "timeout".to_string() }) // Placeholder error to trigger retries
673+
},
674+
result = operation(peer.clone()).fuse() => {
675+
result
676+
}
677+
};
667678

668679
// If the first request failed, try alternative peers in staggered parallel
669680
let result = if result.is_err() {
@@ -754,7 +765,8 @@ impl<Env: Environment> RequestsScheduler<Env> {
754765
}
755766

756767
// Wait for first success or collect all failures
757-
let mut last_error = None;
768+
// Placeholder for the last error. If we error, this will be updated.
769+
let mut last_error = NodeError::UnexpectedMessage;
758770

759771
while let Some((peer_idx, result)) = futures.next().await {
760772
match result {
@@ -773,7 +785,7 @@ impl<Env: Environment> RequestsScheduler<Env> {
773785
error = %e,
774786
"staggered parallel retry attempt failed"
775787
);
776-
last_error = Some(e);
788+
last_error = e;
777789
}
778790
}
779791
}
@@ -783,7 +795,7 @@ impl<Env: Environment> RequestsScheduler<Env> {
783795
key = ?key,
784796
"all staggered parallel retry attempts failed"
785797
);
786-
Err(last_error.unwrap_or(NodeError::UnexpectedMessage))
798+
Err(last_error)
787799
}
788800

789801
/// Returns all peers ordered by their score (highest first).

0 commit comments

Comments
 (0)