Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ Client implementation and command-line tool for the Linera blockchain
* `--max-retries <MAX_RETRIES>` — Number of times to retry connecting to a validator

Default value: `10`
* `--max-backoff-ms <MAX_BACKOFF>` — Maximum backoff delay for retrying to connect to a validator

Default value: `30000`
* `--wait-for-outgoing-messages` — Whether to wait until a quorum of validators has confirmed that all sent cross-chain messages have been delivered
* `--allow-fast-blocks` — Whether to allow creating blocks in the fast round. Fast blocks have lower latency but must be used carefully so that there are never any conflicting fast block proposals
* `--long-lived-services` — (EXPERIMENTAL) Whether application services can persist in some cases between queries
Expand Down Expand Up @@ -1214,6 +1217,9 @@ Start a Local Linera Network
* `--cross-chain-retry-delay-ms <RETRY_DELAY_MS>` — Delay before retrying of cross-chain message

Default value: `2000`
* `--cross-chain-max-backoff-ms <MAX_BACKOFF_MS>` — Maximum backoff delay for cross-chain message retries

Default value: `30000`
* `--cross-chain-sender-delay-ms <SENDER_DELAY_MS>` — Introduce a delay before sending every cross-chain message (e.g. for testing purpose)

Default value: `0`
Expand Down
4 changes: 4 additions & 0 deletions linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub struct ClientContext<Env: Environment> {
pub recv_timeout: Duration,
pub retry_delay: Duration,
pub max_retries: u32,
pub max_backoff: Duration,
pub chain_listeners: JoinSet,
// TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
pub default_chain: Option<ChainId>,
Expand Down Expand Up @@ -284,6 +285,7 @@ where
recv_timeout: options.recv_timeout,
retry_delay: options.retry_delay,
max_retries: options.max_retries,
max_backoff: options.max_backoff,
});
let chain_modes: Vec<_> = wallet
.items()
Expand Down Expand Up @@ -342,6 +344,7 @@ where
recv_timeout: options.recv_timeout,
retry_delay: options.retry_delay,
max_retries: options.max_retries,
max_backoff: options.max_backoff,
chain_listeners: JoinSet::default(),
#[cfg(not(web))]
client_metrics,
Expand Down Expand Up @@ -397,6 +400,7 @@ impl<Env: Environment> ClientContext<Env> {
recv_timeout: self.recv_timeout,
retry_delay: self.retry_delay,
max_retries: self.max_retries,
max_backoff: self.max_backoff,
}
}

Expand Down
8 changes: 8 additions & 0 deletions linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ pub struct Options {
#[arg(long, default_value = "10")]
pub max_retries: u32,

/// Maximum backoff delay for retrying to connect to a validator.
#[arg(
long = "max-backoff-ms",
default_value = "30000",
value_parser = util::parse_millis
)]
pub max_backoff: Duration,

/// Whether to wait until a quorum of validators has confirmed that all sent cross-chain
/// messages have been delivered.
#[arg(long)]
Expand Down
6 changes: 6 additions & 0 deletions linera-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ pub struct CrossChainConfig {
#[arg(long = "cross-chain-retry-delay-ms", default_value = "2000")]
pub(crate) retry_delay_ms: u64,

/// Maximum backoff delay for cross-chain message retries.
#[arg(long = "cross-chain-max-backoff-ms", default_value = "30000")]
pub(crate) max_backoff_ms: u64,

/// Introduce a delay before sending every cross-chain message (e.g. for testing purpose).
#[arg(long = "cross-chain-sender-delay-ms", default_value = "0")]
pub(crate) sender_delay_ms: u64,
Expand All @@ -49,6 +53,8 @@ impl CrossChainConfig {
self.max_retries.to_string(),
"--cross-chain-retry-delay-ms".to_string(),
self.retry_delay_ms.to_string(),
"--cross-chain-max-backoff-ms".to_string(),
self.max_backoff_ms.to_string(),
"--cross-chain-sender-delay-ms".to_string(),
self.sender_delay_ms.to_string(),
"--cross-chain-sender-failure-rate".to_string(),
Expand Down
10 changes: 8 additions & 2 deletions linera-rpc/src/cross_chain_message_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use linera_core::data_types::CrossChainRequest;
use rand::Rng as _;
use tracing::{trace, warn};

use crate::config::ShardId;
use crate::{config::ShardId, full_jitter_delay};

#[cfg(with_metrics)]
mod metrics {
Expand Down Expand Up @@ -51,6 +51,7 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
nickname: String,
cross_chain_max_retries: u32,
cross_chain_retry_delay: Duration,
cross_chain_max_backoff: Duration,
cross_chain_sender_delay: Duration,
cross_chain_sender_failure_rate: f32,
this_shard: ShardId,
Expand Down Expand Up @@ -104,7 +105,12 @@ pub(crate) async fn forward_cross_chain_queries<F, G>(
}

Action::Retry => {
linera_base::time::timer::sleep(cross_chain_retry_delay * state.retries).await;
let delay = full_jitter_delay(
cross_chain_retry_delay,
state.retries,
cross_chain_max_backoff,
);
linera_base::time::timer::sleep(delay).await;
Action::Proceed { id: state.id }
}
},
Expand Down
12 changes: 8 additions & 4 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use super::{
#[cfg(feature = "opentelemetry")]
use crate::propagation::{get_context_with_traffic_type, inject_context};
use crate::{
grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
full_jitter_delay, grpc::api::RawCertificate, HandleConfirmedCertificateRequest,
HandleLiteCertRequest, HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
};

#[derive(Clone)]
Expand All @@ -53,6 +53,7 @@ pub struct GrpcClient {
client: ValidatorNodeClient<transport::Channel>,
retry_delay: Duration,
max_retries: u32,
max_backoff: Duration,
}

impl GrpcClient {
Expand All @@ -61,6 +62,7 @@ impl GrpcClient {
channel: transport::Channel,
retry_delay: Duration,
max_retries: u32,
max_backoff: Duration,
) -> Self {
let client = ValidatorNodeClient::new(channel)
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
Expand All @@ -70,6 +72,7 @@ impl GrpcClient {
client,
retry_delay,
max_retries,
max_backoff,
}
}

Expand Down Expand Up @@ -137,7 +140,7 @@ impl GrpcClient {
inject_context(&get_context_with_traffic_type(), request.metadata_mut());
match f(self.client.clone(), request).await {
Err(s) if Self::is_retryable(&s) && retry_count < self.max_retries => {
let delay = self.retry_delay.saturating_mul(retry_count);
let delay = full_jitter_delay(self.retry_delay, retry_count, self.max_backoff);
retry_count += 1;
linera_base::time::timer::sleep(delay).await;
continue;
Expand Down Expand Up @@ -295,6 +298,7 @@ impl ValidatorNode for GrpcClient {
async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError> {
let retry_delay = self.retry_delay;
let max_retries = self.max_retries;
let max_backoff = self.max_backoff;
// Use shared atomic counter so unfold can reset it on successful reconnection.
let retry_count = Arc::new(AtomicU32::new(0));
let subscription_request = SubscriptionRequest {
Expand Down Expand Up @@ -362,7 +366,7 @@ impl ValidatorNode for GrpcClient {
{
return future::Either::Left(future::ready(false));
}
let delay = retry_delay.saturating_mul(current_retry_count);
let delay = full_jitter_delay(retry_delay, current_retry_count, max_backoff);
retry_count.fetch_add(1, Ordering::Relaxed);
future::Either::Right(async move {
linera_base::time::timer::sleep(delay).await;
Expand Down
4 changes: 4 additions & 0 deletions linera-rpc/src/grpc/node_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@ pub struct GrpcNodeProvider {
pool: GrpcConnectionPool,
retry_delay: Duration,
max_retries: u32,
max_backoff: Duration,
}

impl GrpcNodeProvider {
pub fn new(options: NodeOptions) -> Self {
let transport_options = transport::Options::from(&options);
let retry_delay = options.retry_delay;
let max_retries = options.max_retries;
let max_backoff = options.max_backoff;
let pool = GrpcConnectionPool::new(transport_options);
Self {
pool,
retry_delay,
max_retries,
max_backoff,
}
}
}
Expand All @@ -56,6 +59,7 @@ impl ValidatorNodeProvider for GrpcNodeProvider {
channel,
self.retry_delay,
self.max_retries,
self.max_backoff,
))
}
}
3 changes: 3 additions & 0 deletions linera-rpc/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ where
internal_network.clone(),
cross_chain_config.max_retries,
Duration::from_millis(cross_chain_config.retry_delay_ms),
Duration::from_millis(cross_chain_config.max_backoff_ms),
Duration::from_millis(cross_chain_config.sender_delay_ms),
cross_chain_config.sender_failure_rate,
shard_id,
Expand Down Expand Up @@ -609,6 +610,7 @@ where
network: ValidatorInternalNetworkConfig,
cross_chain_max_retries: u32,
cross_chain_retry_delay: Duration,
cross_chain_max_backoff: Duration,
cross_chain_sender_delay: Duration,
cross_chain_sender_failure_rate: f32,
this_shard: ShardId,
Expand All @@ -632,6 +634,7 @@ where
nickname,
cross_chain_max_retries,
cross_chain_retry_delay,
cross_chain_max_backoff,
cross_chain_sender_delay,
cross_chain_sender_failure_rate,
this_shard,
Expand Down
20 changes: 19 additions & 1 deletion linera-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub mod propagation;

pub use client::Client;
pub use message::{RpcMessage, ShardInfo};
pub use node_provider::{NodeOptions, NodeProvider};
pub use node_provider::{NodeOptions, NodeProvider, DEFAULT_MAX_BACKOFF};

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[cfg_attr(with_testing, derive(Eq, PartialEq))]
Expand Down Expand Up @@ -58,3 +58,21 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("file
pub const CERT_PEM: &str = include_str!(concat!(env!("OUT_DIR"), "/self_signed_cert.pem"));
#[cfg(not(target_arch = "wasm32"))]
pub const KEY_PEM: &str = include_str!(concat!(env!("OUT_DIR"), "/private_key.pem"));

/// Computes a Full Jitter delay for exponential backoff.
///
/// Uses the AWS-recommended formula: `sleep = random(0, min(cap, base * 2^attempt))`.
/// Reference: <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
///
/// # Parameters
///
/// * `base_delay` - delay bound used for the first attempt (`attempt == 0`).
/// * `attempt` - zero-based retry counter; the exponential factor saturates instead of
///   overflowing for large values.
/// * `max_backoff` - upper bound applied to the exponential delay before jitter.
///
/// # Returns
///
/// A delay drawn from `0..=cap`, where `cap = min(max_backoff, base_delay * 2^attempt)`.
/// The draw has millisecond granularity, so a cap below one millisecond always yields a
/// zero delay.
pub(crate) fn full_jitter_delay(
    base_delay: std::time::Duration,
    attempt: u32,
    max_backoff: std::time::Duration,
) -> std::time::Duration {
    use rand::Rng as _;
    // `checked_shl` returns `None` once `attempt >= 32`; saturate the factor in that case.
    let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
    let capped_delay = base_delay.saturating_mul(factor).min(max_backoff);
    // `as_millis` returns a `u128` that can exceed `u64::MAX` for very large durations.
    // Saturate rather than truncating with `as`, which would wrap to an arbitrary smaller cap.
    let cap_ms = u64::try_from(capped_delay.as_millis()).unwrap_or(u64::MAX);
    std::time::Duration::from_millis(rand::thread_rng().gen_range(0..=cap_ms))
}
22 changes: 21 additions & 1 deletion linera-rpc/src/node_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,30 @@ impl ValidatorNodeProvider for NodeProvider {
}
}

#[derive(Copy, Clone, Default)]
/// Default maximum backoff delay (30 seconds), following Google Cloud's recommendation.
///
/// Used as the `max_backoff` value of `NodeOptions::default()`.
///
/// References:
/// - <https://cloud.google.com/storage/docs/retry-strategy>
/// - <https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html>
/// - <https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md>
pub const DEFAULT_MAX_BACKOFF: Duration = Duration::from_secs(30);

/// Options used to construct validator node providers.
#[derive(Copy, Clone)]
pub struct NodeOptions {
/// Timeout for sending. NOTE(review): exact semantics are defined by the transport
/// layer, which is not visible here; confirm.
pub send_timeout: Duration,
/// Timeout for receiving. NOTE(review): exact semantics are defined by the transport
/// layer, which is not visible here; confirm.
pub recv_timeout: Duration,
/// Base delay for retries; the gRPC client uses it as the base of its jittered
/// exponential backoff.
pub retry_delay: Duration,
/// Maximum number of times a retryable request is retried.
pub max_retries: u32,
/// Upper bound on the backoff delay between retries.
pub max_backoff: Duration,
}

impl Default for NodeOptions {
fn default() -> Self {
Self {
send_timeout: Duration::ZERO,
recv_timeout: Duration::ZERO,
retry_delay: Duration::ZERO,
max_retries: 0,
max_backoff: DEFAULT_MAX_BACKOFF,
}
}
}
3 changes: 3 additions & 0 deletions linera-rpc/src/simple/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ where
network: ValidatorInternalNetworkPreConfig<TransportProtocol>,
cross_chain_max_retries: u32,
cross_chain_retry_delay: Duration,
cross_chain_max_backoff: Duration,
cross_chain_sender_delay: Duration,
cross_chain_sender_failure_rate: f32,
this_shard: ShardId,
Expand Down Expand Up @@ -111,6 +112,7 @@ where
nickname,
cross_chain_max_retries,
cross_chain_retry_delay,
cross_chain_max_backoff,
cross_chain_sender_delay,
cross_chain_sender_failure_rate,
this_shard,
Expand Down Expand Up @@ -139,6 +141,7 @@ where
self.network.clone(),
self.cross_chain_config.max_retries,
Duration::from_millis(self.cross_chain_config.retry_delay_ms),
Duration::from_millis(self.cross_chain_config.max_backoff_ms),
Duration::from_millis(self.cross_chain_config.sender_delay_ms),
self.cross_chain_config.sender_failure_rate,
self.shard_id,
Expand Down
14 changes: 10 additions & 4 deletions linera-rpc/tests/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ async fn client() {
timeout: Some(Duration::from_millis(100)),
};
let channel = create_channel(address.clone(), &options).unwrap();
GrpcClient::new(address, channel, retry_delay, max_retries)
.get_version_info()
.await
.unwrap();
GrpcClient::new(
address,
channel,
retry_delay,
max_retries,
linera_rpc::node_provider::DEFAULT_MAX_BACKOFF,
)
.get_version_info()
.await
.unwrap();
}
2 changes: 2 additions & 0 deletions linera-service/src/cli_wrappers/local_net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,7 @@ impl LocalNet {
recv_timeout: Duration::from_secs(5),
retry_delay: Duration::from_secs(1),
max_retries: 1,
..Default::default()
};
let provider = linera_rpc::simple::SimpleNodeProvider::new(options);
let address = format!("{protocol}:127.0.0.1:{port}");
Expand Down Expand Up @@ -989,6 +990,7 @@ impl LocalNet {
recv_timeout: Duration::from_secs(1),
retry_delay: Duration::ZERO,
max_retries: 0,
..Default::default()
});

Ok(node_provider.make_node(&self.validator_address(validator))?)
Expand Down
9 changes: 9 additions & 0 deletions linera-service/src/exporter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ struct RunOptions {
#[arg(long, default_value = "10")]
pub max_retries: u32,

/// Maximum backoff delay for retrying to connect to a destination.
#[arg(
long = "max-backoff-ms",
default_value = "30000",
value_parser = util::parse_millis
)]
pub max_backoff: Duration,

/// Port for the metrics server.
#[arg(long)]
pub metrics_port: Option<u16>,
Expand Down Expand Up @@ -217,6 +225,7 @@ impl RunOptions {
recv_timeout: self.recv_timeout,
retry_delay: self.retry_delay,
max_retries: self.max_retries,
max_backoff: self.max_backoff,
};

if let Some(port) = self.metrics_port {
Expand Down
Loading
Loading