Skip to content

Commit 5417741

Browse files
committed
[consensus] disable periodic sync only when commit sync is not progressing
1 parent e0c653e commit 5417741

File tree

1 file changed

+53
-25
lines changed

1 file changed

+53
-25
lines changed

consensus/core/src/synchronizer.rs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use tokio::{
3030
use tracing::{debug, error, info, trace, warn};
3131

3232
use crate::{
33-
BlockAPI,
33+
BlockAPI, CommitIndex,
3434
block::{ExtendedBlock, SignedBlock, VerifiedBlock},
3535
block_verifier::BlockVerifier,
3636
commit_vote_monitor::CommitVoteMonitor,
@@ -57,6 +57,9 @@ const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
5757
// Max number of peers to request missing blocks concurrently in periodic sync.
5858
const MAX_PERIODIC_SYNC_PEERS: usize = 3;
5959

60+
// If commit index hasn't progressed for this duration, run periodic sync despite lagging.
61+
const COMMIT_PROGRESS_THRESHOLD: Duration = Duration::from_secs(10);
62+
6063
struct BlocksGuard {
6164
map: Arc<InflightBlocksMap>,
6265
block_refs: BTreeSet<BlockRef>,
@@ -245,6 +248,8 @@ pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThread
245248
round_tracker: Arc<RwLock<RoundTracker>>,
246249
inflight_blocks_map: Arc<InflightBlocksMap>,
247250
commands_sender: Sender<Command>,
251+
last_commit_index_change_time: Instant,
252+
last_seen_commit_index: CommitIndex,
248253
}
249254

250255
impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
@@ -298,6 +303,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
298303
}
299304

300305
// Spawn the task to listen to the requests & periodic runs
306+
let initial_commit_index = dag_state.read().last_commit_index();
301307
tasks.spawn(monitored_future!(async move {
302308
let mut s = Self {
303309
context,
@@ -314,6 +320,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
314320
commands_sender: commands_sender_clone,
315321
dag_state,
316322
round_tracker,
323+
last_seen_commit_index: initial_commit_index,
324+
last_commit_index_change_time: Instant::now(),
317325
};
318326
s.run().await;
319327
}));
@@ -908,8 +916,22 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
908916

909917
// If we are commit lagging, then we don't want to enable the scheduler. As the node is sycnhronizing via the commit syncer, the certified commits
910918
// will bring all the necessary blocks to run the commits. As the commits are certified, we are guaranteed that all the necessary causal history is present.
919+
// However, if local commit index has not progressed for too long, we run periodic sync anyway to help unblock progress, in case large commit sync
920+
// requests are failing too frequently.
911921
if self.is_commit_lagging() {
912-
return Ok(());
922+
if self.is_commit_progressing() {
923+
debug!(
924+
"Local commit index {} is lagging behind quorum commit index {}, not running periodic sync",
925+
self.last_seen_commit_index,
926+
self.commit_vote_monitor.quorum_commit_index(),
927+
);
928+
return Ok(());
929+
}
930+
debug!(
931+
"Local commit index {} has not progressed for {:?}, running periodic sync despite commit lagging",
932+
self.last_seen_commit_index,
933+
self.last_commit_index_change_time.elapsed()
934+
);
913935
}
914936

915937
self.fetch_blocks_scheduler_task
@@ -992,6 +1014,22 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
9921014
commit_threshold < quorum_commit_index
9931015
}
9941016

1017+
/// Returns true if last_commit_index has progressed, or has not progressed for less than COMMIT_PROGRESS_THRESHOLD.
1018+
/// Updates tracking state when commit index changes.
1019+
fn is_commit_progressing(&mut self) -> bool {
1020+
let current_commit_index = self.dag_state.read().last_commit_index();
1021+
1022+
if current_commit_index != self.last_seen_commit_index {
1023+
// Commit index progressed, update tracking
1024+
self.last_seen_commit_index = current_commit_index;
1025+
self.last_commit_index_change_time = Instant::now();
1026+
return true;
1027+
}
1028+
1029+
// Check if still progressing
1030+
self.last_commit_index_change_time.elapsed() < COMMIT_PROGRESS_THRESHOLD
1031+
}
1032+
9951033
/// Fetches the `missing_blocks` from peers. Requests the same number of authorities with missing blocks from each peer.
9961034
/// Each response from peer can contain the requested blocks, and additional blocks from the last accepted round for
9971035
/// authorities with missing blocks.
@@ -1195,7 +1233,8 @@ mod tests {
11951233
use parking_lot::RwLock;
11961234
use tokio::{sync::Mutex, time::sleep};
11971235

1198-
use crate::commit::{CommitVote, TrustedCommit};
1236+
use super::COMMIT_PROGRESS_THRESHOLD;
1237+
use crate::commit::CommitVote;
11991238
use crate::{
12001239
CommitDigest, CommitIndex,
12011240
block::{TestBlock, VerifiedBlock},
@@ -1733,38 +1772,27 @@ mod tests {
17331772
false,
17341773
);
17351774

1736-
sleep(4 * FETCH_REQUEST_TIMEOUT).await;
1775+
// Wait for less than COMMIT_PROGRESS_THRESHOLD (10s) - periodic sync should be disabled
1776+
sleep(COMMIT_PROGRESS_THRESHOLD / 2).await;
17371777

1738-
// Since we should be in commit lag mode none of the missed blocks should have been fetched - hence nothing should be
1739-
// sent to core for processing.
1778+
// Since we should be in commit lag mode and commit is still "progressing",
1779+
// none of the missed blocks should have been fetched
17401780
let added_blocks = core_dispatcher.get_add_blocks().await;
17411781
assert_eq!(added_blocks, vec![]);
17421782

1743-
// AND advance now the local commit index by adding a new commit that matches the commit index
1744-
// of quorum
1745-
{
1746-
let mut d = dag_state.write();
1747-
for index in 1..=commit_index {
1748-
let commit =
1749-
TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
1783+
// Now wait past COMMIT_PROGRESS_THRESHOLD total - commit is no longer progressing.
1784+
sleep(COMMIT_PROGRESS_THRESHOLD).await;
17501785

1751-
d.add_commit(commit);
1752-
}
1753-
1754-
assert_eq!(
1755-
d.last_commit_index(),
1756-
commit_vote_monitor.quorum_commit_index()
1757-
);
1758-
}
1759-
1760-
// Now stub again the missing blocks to fetch the exact same ones.
1786+
// Re-stub missing blocks NOW (after commit is stale). The previous stub was consumed by
1787+
// get_missing_blocks() during periodic checks while commit was still progressing.
17611788
core_dispatcher
17621789
.stub_missing_blocks(missing_blocks.clone())
17631790
.await;
17641791

1765-
sleep(2 * FETCH_REQUEST_TIMEOUT).await;
1792+
// Wait for periodic sync to run and complete the fetch (needs > FETCH_FROM_PEERS_TIMEOUT)
1793+
sleep(4 * FETCH_REQUEST_TIMEOUT + Duration::from_millis(500)).await;
17661794

1767-
// THEN the missing blocks should now be fetched and added to core
1795+
// THEN the missing blocks should now be fetched despite still being commit lagging
17681796
let mut added_blocks = core_dispatcher.get_add_blocks().await;
17691797

17701798
added_blocks.sort_by_key(|block| block.reference());

0 commit comments

Comments
 (0)