Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 53 additions & 25 deletions consensus/core/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::{
use tracing::{debug, error, info, trace, warn};

use crate::{
BlockAPI,
BlockAPI, CommitIndex,
block::{ExtendedBlock, SignedBlock, VerifiedBlock},
block_verifier::BlockVerifier,
commit_vote_monitor::CommitVoteMonitor,
Expand All @@ -57,6 +57,9 @@ const MAX_AUTHORITIES_TO_FETCH_PER_BLOCK: usize = 2;
// Max number of peers to request missing blocks concurrently in periodic sync.
const MAX_PERIODIC_SYNC_PEERS: usize = 3;

// If commit index hasn't progressed for this duration, run periodic sync despite lagging.
const COMMIT_PROGRESS_THRESHOLD: Duration = Duration::from_secs(10);

struct BlocksGuard {
map: Arc<InflightBlocksMap>,
block_refs: BTreeSet<BlockRef>,
Expand Down Expand Up @@ -245,6 +248,8 @@ pub(crate) struct Synchronizer<C: NetworkClient, V: BlockVerifier, D: CoreThread
round_tracker: Arc<RwLock<RoundTracker>>,
inflight_blocks_map: Arc<InflightBlocksMap>,
commands_sender: Sender<Command>,
last_commit_index_change_time: Instant,
last_seen_commit_index: CommitIndex,
}

impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C, V, D> {
Expand Down Expand Up @@ -298,6 +303,7 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
}

// Spawn the task to listen to the requests & periodic runs
let initial_commit_index = dag_state.read().last_commit_index();
tasks.spawn(monitored_future!(async move {
let mut s = Self {
context,
Expand All @@ -314,6 +320,8 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commands_sender: commands_sender_clone,
dag_state,
round_tracker,
last_seen_commit_index: initial_commit_index,
last_commit_index_change_time: Instant::now(),
};
s.run().await;
}));
Expand Down Expand Up @@ -908,8 +916,22 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C

// If we are commit lagging, then we don't want to enable the scheduler. As the node is sycnhronizing via the commit syncer, the certified commits
// will bring all the necessary blocks to run the commits. As the commits are certified, we are guaranteed that all the necessary causal history is present.
// However, if local commit index has not progressed for too long, we run periodic sync anyway to help unblock progress, in case large commit sync
// requests are failing too frequently.
Comment on lines +919 to +920
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we actually observed this in any of our environments? Because I would expect this to not significantly help and maybe even create some race between the synchronizer and commit syncer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I have observed this in tests where sometimes commit sync took a long time. Let me optimize and test it out a bit more.

if self.is_commit_lagging() {
return Ok(());
if self.is_commit_progressing() {
debug!(
"Local commit index {} is lagging behind quorum commit index {}, not running periodic sync",
self.last_seen_commit_index,
self.commit_vote_monitor.quorum_commit_index(),
);
return Ok(());
}
debug!(
"Local commit index {} has not progressed for {:?}, running periodic sync despite commit lagging",
self.last_seen_commit_index,
self.last_commit_index_change_time.elapsed()
);
}

self.fetch_blocks_scheduler_task
Expand Down Expand Up @@ -992,6 +1014,22 @@ impl<C: NetworkClient, V: BlockVerifier, D: CoreThreadDispatcher> Synchronizer<C
commit_threshold < quorum_commit_index
}

/// Returns true if last_commit_index has progressed, or has not progressed for less than COMMIT_PROGRESS_THRESHOLD.
/// Updates tracking state when commit index changes.
fn is_commit_progressing(&mut self) -> bool {
let current_commit_index = self.dag_state.read().last_commit_index();

if current_commit_index != self.last_seen_commit_index {
// Commit index progressed, update tracking
self.last_seen_commit_index = current_commit_index;
self.last_commit_index_change_time = Instant::now();
return true;
}

// Check if still progressing
self.last_commit_index_change_time.elapsed() < COMMIT_PROGRESS_THRESHOLD
}

/// Fetches the `missing_blocks` from peers. Requests the same number of authorities with missing blocks from each peer.
/// Each response from peer can contain the requested blocks, and additional blocks from the last accepted round for
/// authorities with missing blocks.
Expand Down Expand Up @@ -1195,7 +1233,8 @@ mod tests {
use parking_lot::RwLock;
use tokio::{sync::Mutex, time::sleep};

use crate::commit::{CommitVote, TrustedCommit};
use super::COMMIT_PROGRESS_THRESHOLD;
use crate::commit::CommitVote;
use crate::{
CommitDigest, CommitIndex,
block::{TestBlock, VerifiedBlock},
Expand Down Expand Up @@ -1733,38 +1772,27 @@ mod tests {
false,
);

sleep(4 * FETCH_REQUEST_TIMEOUT).await;
// Wait for less than COMMIT_PROGRESS_THRESHOLD (10s) - periodic sync should be disabled
sleep(COMMIT_PROGRESS_THRESHOLD / 2).await;

// Since we should be in commit lag mode none of the missed blocks should have been fetched - hence nothing should be
// sent to core for processing.
// Since we should be in commit lag mode and commit is still "progressing",
// none of the missed blocks should have been fetched
let added_blocks = core_dispatcher.get_add_blocks().await;
assert_eq!(added_blocks, vec![]);

// AND advance now the local commit index by adding a new commit that matches the commit index
// of quorum
{
let mut d = dag_state.write();
for index in 1..=commit_index {
let commit =
TrustedCommit::new_for_test(index, CommitDigest::MIN, 0, BlockRef::MIN, vec![]);
// Now wait past COMMIT_PROGRESS_THRESHOLD total - commit is no longer progressing.
sleep(COMMIT_PROGRESS_THRESHOLD).await;

d.add_commit(commit);
}

assert_eq!(
d.last_commit_index(),
commit_vote_monitor.quorum_commit_index()
);
}

// Now stub again the missing blocks to fetch the exact same ones.
// Re-stub missing blocks NOW (after commit is stale). The previous stub was consumed by
// get_missing_blocks() during periodic checks while commit was still progressing.
core_dispatcher
.stub_missing_blocks(missing_blocks.clone())
.await;

sleep(2 * FETCH_REQUEST_TIMEOUT).await;
// Wait for periodic sync to run and complete the fetch (needs > FETCH_FROM_PEERS_TIMEOUT)
sleep(4 * FETCH_REQUEST_TIMEOUT + Duration::from_millis(500)).await;

// THEN the missing blocks should now be fetched and added to core
// THEN the missing blocks should now be fetched despite still being commit lagging
let mut added_blocks = core_dispatcher.get_add_blocks().await;

added_blocks.sort_by_key(|block| block.reference());
Expand Down
Loading