Skip to content

Commit b2cecb3

Browse files
committed
fix(node/bft): wake up on proposal task on max leader delay
1 parent 1dc921e commit b2cecb3

File tree

2 files changed

+77
-2
lines changed

2 files changed

+77
-2
lines changed

node/bft/src/primary.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub use proposal_task::ProposalTask;
1919
use crate::{
2020
Gateway,
2121
MAX_BATCH_DELAY,
22+
MAX_LEADER_CERTIFICATE_DELAY,
2223
MAX_WORKERS,
2324
MIN_BATCH_DELAY,
2425
PRIMARY_PING_INTERVAL,
@@ -1485,6 +1486,11 @@ impl<N: Network> Primary<N> {
14851486
{
14861487
futures.push(Box::pin(tokio::time::sleep(remaining_delay)));
14871488
}
1489+
// Always ensure a wakeup no later than MAX_LEADER_CERTIFICATE_DELAY so that
1490+
// try_advance_to_next_round is called after the leader-certificate timer
1491+
// expires, even when no further certificates arrive (e.g. an even round where
1492+
// the elected leader was absent and quorum was reached without their cert).
1493+
futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY)));
14881494
if !self_.sync.is_synced() {
14891495
futures.push(Box::pin(self_.sync.wait_for_synced()));
14901496
}

node/bft/src/primary/proposal_task.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,12 @@ impl<N: Network> ProposalTask<N> {
171171
let timeout = MAX_BATCH_DELAY.saturating_sub(round_start.elapsed());
172172
futures.push(tokio::time::sleep(timeout).boxed());
173173

174-
// All conditions must hold for us to advance.
175-
futures::future::join_all(futures).await;
174+
// Any condition completing is sufficient to attempt a proposal.
175+
// Using select_all (not join_all) ensures that once MAX_BATCH_DELAY elapses
176+
// we call propose_batch() even if signal() was never fired — which happens
177+
// when an even round has no leader cert. propose_batch() calls
178+
// try_advance_to_next_round, which checks the leader-certificate timer.
179+
futures::future::select_all(futures).await;
176180
reached_min_batch_delay = true;
177181
}
178182

@@ -371,6 +375,71 @@ mod tests {
371375
assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called despite round advancement");
372376
}
373377

378+
/// Tests the following scenario
379+
///
380+
/// 1. A batch was already certified for the current round, so `is_proposal_ready` is `false`.
381+
/// 2. `signal()` is **never** called externally — the BFT cannot advance the round until
382+
/// `propose_batch()` is called (which internally checks the leader-certificate timer).
383+
#[test_log::test(tokio::test)]
384+
async fn test_proposal_task_advances_without_leader_cert() {
385+
// Start NOT ready: simulates a batch that was already certified for the round but the
386+
// round has not yet advanced (the even-round leader cert was missing — e.g. the elected
387+
// leader was one of the freshly-reset minority validators).
388+
let task = ProposalTask::<MainnetV0> {
389+
inner: Arc::new(ProposalTaskInner {
390+
is_proposal_ready: RwLock::new(false),
391+
is_ready_notify: Notify::new(),
392+
}),
393+
_phantom: PhantomData,
394+
};
395+
396+
let proposed_notify = Arc::new(Notify::new());
397+
let propose_count = Arc::new(AtomicU32::new(0));
398+
399+
// A proposer that stays on round 1 and returns Ok(true) on every call to
400+
// propose_batch(), simulating try_advance_to_next_round finding the leader-certificate
401+
// timer expired and advancing the round without an external signal().
402+
struct NoSignalProposer {
403+
propose_count: Arc<AtomicU32>,
404+
proposed_notify: Arc<Notify>,
405+
}
406+
407+
#[async_trait::async_trait]
408+
impl BatchPropose for NoSignalProposer {
409+
fn current_round(&self) -> u64 {
410+
1
411+
}
412+
413+
fn is_synced(&self) -> bool {
414+
true
415+
}
416+
417+
fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
418+
None
419+
}
420+
421+
async fn propose_batch(&self) -> Result<bool> {
422+
self.propose_count.fetch_add(1, Ordering::SeqCst);
423+
self.proposed_notify.notify_one();
424+
Ok(true)
425+
}
426+
}
427+
428+
let proposer =
429+
NoSignalProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() };
430+
431+
// signal() is intentionally never called — the task must retry on its own.
432+
tokio::spawn(task.run(proposer));
433+
434+
// Allow enough time for MAX_BATCH_DELAY (2.5 s) to elapse plus the CREATE_BATCH_INTERVAL
435+
// (250 ms) retry window. Use 10 s to give generous headroom on slow CI machines.
436+
tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified())
437+
.await
438+
.expect("propose_batch was not called");
439+
440+
assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch should have been called at least once");
441+
}
442+
374443
/// When `propose_batch` returns `Ok(false)`, the task retries within the same round until
375444
/// it succeeds.
376445
///

0 commit comments

Comments
 (0)