Skip to content

Commit 6ed6780

Browse files
authored
[consensus] reduce retry timeout for submission and subscription (#24412)
## Description Besides the PR title, also refactor `ExponentialBackoff` to accept an `initial_delay`, and improve logging a bit. ## Test plan CI `SIM_STRESS_TEST_DURATION_SECS=300 python ./scripts/simtest/seed-search.py test_mainnet_config --test simtest --num-seeds 1000`
1 parent f16d5ec commit 6ed6780

File tree

6 files changed

+60
-73
lines changed

6 files changed

+60
-73
lines changed

consensus/core/src/subscriber.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,15 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
118118
last_received: Round,
119119
) {
120120
const IMMEDIATE_RETRIES: i64 = 3;
121+
const MIN_TIMEOUT: Duration = Duration::from_millis(500);
121122
// When not immediately retrying, limit retry delay between 100ms and 10s.
122-
const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
123-
const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
124-
const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
123+
let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
124+
Duration::from_millis(100),
125+
Duration::from_secs(10),
126+
);
127+
125128
let peer_hostname = &context.committee.authority(peer).hostname;
126129
let mut retries: i64 = 0;
127-
let mut delay = INITIAL_RETRY_INTERVAL;
128130
'subscription: loop {
129131
context
130132
.metrics
@@ -133,29 +135,26 @@ impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
133135
.with_label_values(&[peer_hostname])
134136
.set(0);
135137

138+
let mut delay = Duration::ZERO;
136139
if retries > IMMEDIATE_RETRIES {
140+
delay = backoff.next().unwrap();
137141
debug!(
138142
"Delaying retry {} of peer {} subscription, in {} seconds",
139143
retries,
140144
peer_hostname,
141145
delay.as_secs_f32(),
142146
);
143147
sleep(delay).await;
144-
// Update delay for the next retry.
145-
delay = delay
146-
.mul_f32(RETRY_INTERVAL_MULTIPLIER)
147-
.min(MAX_RETRY_INTERVAL);
148148
} else if retries > 0 {
149149
// Retry immediately, but still yield to avoid monopolizing the thread.
150150
tokio::task::yield_now().await;
151-
} else {
152-
// First attempt, reset delay for next retries but no waiting.
153-
delay = INITIAL_RETRY_INTERVAL;
154151
}
155152
retries += 1;
156153

154+
// Use longer timeout when retry delay is long, to adapt to slow network.
155+
let request_timeout = MIN_TIMEOUT.max(delay);
157156
let mut blocks = match network_client
158-
.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
157+
.subscribe_blocks(peer, last_received, request_timeout)
159158
.await
160159
{
161160
Ok(blocks) => {

crates/mysten-common/src/backoff.rs

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@ use rand::Rng as _;
1616
/// use mysten_common::backoff::ExponentialBackoff;
1717
///
1818
/// // Basic example:
19-
/// let mut backoff = ExponentialBackoff::new(Duration::from_secs(10));
19+
/// let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(10));
2020
/// for (attempt, delay) in backoff.enumerate() {
2121
/// println!("Attempt {attempt}: Delay: {:?}", delay);
2222
/// }
2323
///
24-
/// // Specifying initial, maximum delay and jitter:
25-
/// let mut backoff = ExponentialBackoff::new(Duration::from_secs(60))
26-
/// .base(Duration::from_secs(5))
27-
/// .factor(1.2)
24+
/// // Specifying backoff factor and maximum jitter:
25+
/// let mut backoff = ExponentialBackoff::new(Duration::from_secs(5), Duration::from_secs(60))
26+
/// .factor(2.0)
2827
/// .max_jitter(Duration::from_secs(1));
2928
/// loop {
3029
/// // next() should always return a Some(Duration).
@@ -34,42 +33,35 @@ use rand::Rng as _;
3433
/// ```
3534
#[derive(Debug, Clone)]
3635
pub struct ExponentialBackoff {
37-
current: Duration,
36+
next: Duration,
3837
factor: f64,
3938
max_delay: Duration,
4039
max_jitter: Duration,
4140
}
4241

4342
impl ExponentialBackoff {
4443
/// Constructs a new exponential backoff generator, specifying the maximum delay.
45-
pub fn new(max_delay: Duration) -> ExponentialBackoff {
44+
pub fn new(initial_delay: Duration, max_delay: Duration) -> ExponentialBackoff {
4645
ExponentialBackoff {
47-
current: Duration::from_millis(50),
48-
factor: 1.5,
46+
next: initial_delay,
47+
factor: 1.2,
4948
max_delay,
50-
max_jitter: Duration::from_millis(50),
49+
max_jitter: initial_delay,
5150
}
5251
}
5352

54-
/// Sets the base delay for computing the next delay, before increasing by the exponential factor and adding jitter.
55-
///
56-
/// Default base delay is 50ms.
57-
pub fn base(mut self, base: Duration) -> ExponentialBackoff {
58-
self.current = base;
59-
self
60-
}
61-
6253
/// Sets the approximate ratio of consecutive backoff delays, before jitters are applied.
6354
/// Setting this to Duration::ZERO disables jittering.
6455
///
65-
/// Default factor is 1.5.
56+
/// Default is 1.2.
6657
pub fn factor(mut self, factor: f64) -> ExponentialBackoff {
6758
self.factor = factor;
6859
self
6960
}
7061

7162
/// Sets the maximum jitter per delay.
72-
/// Default maximum jitter is 50ms.
63+
///
64+
/// Default is the initial delay.
7365
pub fn max_jitter(mut self, max_jitter: Duration) -> ExponentialBackoff {
7466
self.max_jitter = max_jitter;
7567
self
@@ -81,32 +73,31 @@ impl Iterator for ExponentialBackoff {
8173

8274
/// Yields backoff delays. Never terminates.
8375
fn next(&mut self) -> Option<Duration> {
76+
let current = self.next;
77+
8478
let jitter = if self.max_jitter.is_zero() {
8579
Duration::ZERO
8680
} else {
8781
Duration::from_secs_f64(
8882
rand::thread_rng().gen_range(0.0..self.max_jitter.as_secs_f64()),
8983
)
9084
};
91-
let next = self
92-
.current
85+
self.next = current
9386
.mul_f64(self.factor)
9487
.min(self.max_delay)
9588
.saturating_add(jitter);
9689

97-
self.current = next;
98-
99-
Some(next)
90+
Some(current)
10091
}
10192
}
10293

10394
#[test]
10495
fn test_exponential_backoff_default() {
105-
let mut backoff = ExponentialBackoff::new(Duration::from_secs(10));
96+
let mut backoff = ExponentialBackoff::new(Duration::from_millis(50), Duration::from_secs(10));
10697

10798
let bounds = vec![
108-
(Duration::from_millis(75), Duration::from_millis(125)),
109-
(Duration::from_millis(110), Duration::from_millis(250)),
99+
(Duration::from_millis(50), Duration::from_millis(100)),
100+
(Duration::from_millis(60), Duration::from_millis(170)),
110101
];
111102
for ((lower, upper), delay) in bounds.into_iter().zip(backoff.next()) {
112103
assert!(delay >= lower && delay <= upper);
@@ -115,23 +106,23 @@ fn test_exponential_backoff_default() {
115106

116107
#[test]
117108
fn test_exponential_backoff_base_100_factor_2_no_jitter() {
118-
let mut backoff = ExponentialBackoff::new(Duration::from_secs(10))
119-
.base(Duration::from_millis(100))
109+
let mut backoff = ExponentialBackoff::new(Duration::from_millis(100), Duration::from_secs(10))
120110
.factor(2.0)
121111
.max_jitter(Duration::ZERO);
122112

113+
assert_eq!(backoff.next(), Some(Duration::from_millis(100)));
123114
assert_eq!(backoff.next(), Some(Duration::from_millis(200)));
124115
assert_eq!(backoff.next(), Some(Duration::from_millis(400)));
125116
assert_eq!(backoff.next(), Some(Duration::from_millis(800)));
126117
}
127118

128119
#[test]
129120
fn test_exponential_backoff_max_delay() {
130-
let mut backoff = ExponentialBackoff::new(Duration::from_secs(1))
131-
.base(Duration::from_millis(200))
121+
let mut backoff = ExponentialBackoff::new(Duration::from_millis(200), Duration::from_secs(1))
132122
.factor(3.0)
133123
.max_jitter(Duration::ZERO);
134124

125+
assert_eq!(backoff.next(), Some(Duration::from_millis(200)));
135126
assert_eq!(backoff.next(), Some(Duration::from_millis(600)));
136127

137128
for _ in 0..10 {

crates/sui-core/src/consensus_adapter.rs

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -739,9 +739,11 @@ impl ConsensusAdapter {
739739
// In addition to that, within_alive_epoch ensures that all pending consensus
740740
// adapter tasks are stopped before reconfiguration can proceed.
741741
//
742-
// This is essential because narwhal workers reuse same ports when narwhal restarts,
743-
// this means we might be sending transactions from previous epochs to narwhal of
744-
// new epoch if we have not had this barrier.
742+
// This is essential because after epoch change, this validator may exit the committee and become a full node.
743+
// So it is no longer able to submit to consensus.
744+
//
745+
// Also, submission to consensus is not gated on epoch. Although it is ok to submit user transactions
746+
// to the new epoch, we want to cancel system transaction submissions from the current epoch to the new epoch.
745747
epoch_store
746748
.within_alive_epoch(self.submit_and_wait_inner(
747749
transactions,
@@ -769,7 +771,7 @@ impl ConsensusAdapter {
769771
"Performing a ping check, pinging consensus to get a consensus position in next block"
770772
);
771773
let (consensus_positions, _status_waiter) = self
772-
.submit_inner(&transactions, epoch_store, &[], "ping", false)
774+
.submit_inner(&transactions, epoch_store, &[], "ping")
773775
.await;
774776

775777
if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
@@ -917,13 +919,7 @@ impl ConsensusAdapter {
917919
loop {
918920
// Submit the transaction to consensus and return the submit result with a status waiter
919921
let (consensus_positions, status_waiter) = self
920-
.submit_inner(
921-
&transactions,
922-
epoch_store,
923-
&transaction_keys,
924-
tx_type,
925-
is_soft_bundle,
926-
)
922+
.submit_inner(&transactions, epoch_store, &transaction_keys, tx_type)
927923
.await;
928924

929925
if let Some(tx_consensus_positions) = tx_consensus_positions.take() {
@@ -1050,11 +1046,13 @@ impl ConsensusAdapter {
10501046
epoch_store: &Arc<AuthorityPerEpochStore>,
10511047
transaction_keys: &[SequencedConsensusTransactionKey],
10521048
tx_type: &str,
1053-
is_soft_bundle: bool,
10541049
) -> (Vec<ConsensusPosition>, BlockStatusReceiver) {
10551050
let ack_start = Instant::now();
10561051
let mut retries: u32 = 0;
1057-
let is_dkg = !transactions.is_empty() && transactions[0].is_dkg();
1052+
let mut backoff = mysten_common::backoff::ExponentialBackoff::new(
1053+
Duration::from_millis(100),
1054+
Duration::from_secs(10),
1055+
);
10581056

10591057
let (consensus_positions, status_waiter) = loop {
10601058
let span = debug_span!("client_submit");
@@ -1065,11 +1063,10 @@ impl ConsensusAdapter {
10651063
.await
10661064
{
10671065
Err(err) => {
1068-
// This can happen during reconfig, or when consensus has full internal buffers
1069-
// and needs to back pressure, so retry a few times before logging warnings.
1070-
if retries > 30 || (retries > 3 && (is_soft_bundle || !is_dkg)) {
1066+
// This can happen during reconfig, so keep retrying until succeed.
1067+
if cfg!(msim) || retries > 3 {
10711068
warn!(
1072-
"Failed to submit transactions {transaction_keys:?} to consensus: {err:?}. Retry #{retries}"
1069+
"Failed to submit transactions {transaction_keys:?} to consensus: {err}. Retry #{retries}"
10731070
);
10741071
}
10751072
self.metrics
@@ -1078,13 +1075,7 @@ impl ConsensusAdapter {
10781075
.inc();
10791076
retries += 1;
10801077

1081-
if is_dkg {
1082-
// Shorter delay for DKG messages, which are time-sensitive and happen at
1083-
// start-of-epoch when submit errors due to active reconfig are likely.
1084-
time::sleep(Duration::from_millis(100)).await;
1085-
} else {
1086-
time::sleep(Duration::from_secs(10)).await;
1087-
};
1078+
time::sleep(backoff.next().unwrap()).await;
10881079
}
10891080
Ok((consensus_positions, status_waiter)) => {
10901081
break (consensus_positions, status_waiter);
@@ -1462,7 +1453,7 @@ impl SubmitToConsensus for Arc<ConsensusAdapter> {
14621453

14631454
let result = tokio::time::timeout(
14641455
timeout,
1465-
this.submit_inner(&[transaction], &epoch_store, &[key], tx_type, false),
1456+
this.submit_inner(&[transaction], &epoch_store, &[key], tx_type),
14661457
)
14671458
.await;
14681459

crates/sui-core/src/transaction_driver/effects_certifier.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,8 @@ impl EffectsCertifier {
586586
A: AuthorityAPI + Send + Sync + 'static + Clone,
587587
{
588588
let effects_start = Instant::now();
589-
let backoff = ExponentialBackoff::new(MAX_WAIT_FOR_EFFECTS_RETRY_DELAY);
589+
let backoff =
590+
ExponentialBackoff::new(Duration::from_millis(100), MAX_WAIT_FOR_EFFECTS_RETRY_DELAY);
590591
let ping_type = raw_request.get_ping_type();
591592
// This loop should only retry errors that are retriable without new submission.
592593
for (attempt, delay) in backoff.enumerate() {

crates/sui-core/src/transaction_driver/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ where
178178
.with_label_values(&[tx_type.as_str(), ping_label])
179179
.inc();
180180

181-
let mut backoff = ExponentialBackoff::new(MAX_DRIVE_TRANSACTION_RETRY_DELAY);
181+
let mut backoff = ExponentialBackoff::new(
182+
Duration::from_millis(100),
183+
MAX_DRIVE_TRANSACTION_RETRY_DELAY,
184+
);
182185
let mut attempts = 0;
183186
let mut latest_retriable_error = None;
184187

crates/sui-surfer/src/surfer_state.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ impl SurferState {
344344
rgp,
345345
);
346346
let tx = self.cluster.wallet.sign_transaction(&tx_data).await;
347+
let tx_digest = *tx.digest();
348+
info!(?tx_digest, "Publishing package");
347349
let start = Instant::now();
348350
let response = loop {
349351
match self
@@ -358,12 +360,12 @@ impl SurferState {
358360
Err(err) => {
359361
if start.elapsed() > Duration::from_secs(120) {
360362
fatal!(
361-
"Failed to publish package after 120 seconds: {:?} {}",
363+
"Failed to publish package after 120 seconds: {} {}",
362364
err,
363365
tx.digest()
364366
);
365367
}
366-
error!("Failed to publish package: {:?} {}", err, tx.digest());
368+
error!(?tx_digest, "Failed to publish package: {}", err);
367369
tokio::time::sleep(Duration::from_secs(1)).await;
368370
}
369371
}

0 commit comments

Comments
 (0)