Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions crates/consensus/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ use malachite_consensus::{
State,
};
use malachite_signing_ed25519::Signature;
use malachite_types::{Height as _, SignedMessage, ThresholdParams, Timeout, ValuePayload};
use malachite_types::{
Height as _,
SignedMessage,
ThresholdParams,
Timeout,
TimeoutKind,
ValuePayload,
};
use tokio::time::Instant;
use wal::*;

Expand All @@ -30,6 +37,7 @@ use crate::{
NetworkMessage,
Proposal,
ProposerSelector,
Round,
SignedProposal,
SignedVote,
ValidatorSet,
Expand Down Expand Up @@ -99,7 +107,7 @@ impl<
}

/// Recover the consensus from a list of write-ahead log entries.
pub fn recover_from_wal(&mut self, entries: Vec<WalEntry<V, A>>) {
pub fn recover_from_wal(&mut self, entries: Vec<WalEntry<V, A>>) -> Option<Round> {
tracing::debug!(
validator = %self.state.address(),
entry_count = entries.len(),
Expand All @@ -117,12 +125,16 @@ impl<
}

// Now process the entries.
let mut max_round: Option<u32> = None;
for (i, entry) in entries.into_iter().enumerate() {
// We skip Decision entries as they're just markers.
if matches!(entry, WalEntry::Decision { .. }) {
continue;
}

// Track the highest round for which the WAL contains a vote (non-vote entries contribute `None`).
max_round = max_round.max(Self::get_vote_round(&entry));

let input = convert_wal_entry_to_input(entry);
if let Err(e) = self.process_input(input) {
tracing::warn!(
Expand All @@ -142,6 +154,16 @@ impl<
validator = %self.state.address(),
"Completed WAL recovery"
);

max_round.map(Round::from)
}

/// Schedule a rebroadcast timeout for the given round.
///
/// Builds a [`Timeout`] of kind [`TimeoutKind::Rebroadcast`] for `round`
/// and hands it to this engine's timeout manager.
pub fn schedule_rebroadcast(&mut self, round: Round) {
    let rebroadcast = Timeout {
        kind: TimeoutKind::Rebroadcast,
        round: round.into(),
    };
    self.timeout_manager.schedule_timeout(rebroadcast);
}

/// Check if this consensus engine has been finalized (i.e., a decision has
Expand Down Expand Up @@ -243,6 +265,14 @@ impl<
self.output_queue.pop_front()
}

/// Extract the round number from a WAL entry, if it is a signed vote.
///
/// Returns the inner optional value of the vote's round for
/// `WalEntry::SignedVote` entries, and `None` for every other entry kind.
fn get_vote_round(entry: &WalEntry<V, A>) -> Option<u32> {
    match entry {
        WalEntry::SignedVote(signed_vote) => signed_vote.vote.round.0,
        _ => None,
    }
}

#[allow(clippy::result_large_err)]
fn process_input(
&mut self,
Expand Down
13 changes: 11 additions & 2 deletions crates/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,12 @@ impl<
let mut internal_consensus = consensus.create_consensus(height, &validator_set);

// Recover from WAL first to restore the engine state.
internal_consensus.recover_from_wal(entries);
let vote_round = internal_consensus.recover_from_wal(entries);
if let Some(round) = vote_round {
// Schedule rebroadcast timeout.
// See https://github.com/eqlabs/pathfinder/issues/3286 for motivation.
internal_consensus.schedule_rebroadcast(round);
}

// Only call StartHeight if the height is not already finalized.
if !internal_consensus.is_finalized() {
Expand Down Expand Up @@ -578,7 +583,11 @@ impl<
let validator_set = validator_sets.get_validator_set(height)?;
let mut internal_consensus = consensus.create_consensus(height, &validator_set);
internal_consensus.handle_command(ConsensusCommand::StartHeight(height, validator_set));
internal_consensus.recover_from_wal(entries);
let vote_round = internal_consensus.recover_from_wal(entries);
if let Some(round) = vote_round {
// Schedule rebroadcast timeout.
internal_consensus.schedule_rebroadcast(round);
}
consensus.internal.insert(height, internal_consensus);
}

Expand Down
3 changes: 3 additions & 0 deletions crates/pathfinder/src/config/integration_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub enum InjectFailureTrigger {
ProposalDecided,
ProposalCommitted,
OutdatedVote,
ProposalCommittedSilently,
}

impl InjectFailureTrigger {
Expand All @@ -42,6 +43,7 @@ impl InjectFailureTrigger {
InjectFailureTrigger::ProposalDecided => "proposal_decided",
InjectFailureTrigger::ProposalCommitted => "proposal_committed",
InjectFailureTrigger::OutdatedVote => "outdated_vote",
InjectFailureTrigger::ProposalCommittedSilently => "proposal_committed_silently",
}
}
}
Expand All @@ -62,6 +64,7 @@ impl FromStr for InjectFailureTrigger {
"proposal_decided" => Ok(InjectFailureTrigger::ProposalDecided),
"proposal_committed" => Ok(InjectFailureTrigger::ProposalCommitted),
"outdated_vote" => Ok(InjectFailureTrigger::OutdatedVote),
"proposal_committed_silently" => Ok(InjectFailureTrigger::ProposalCommittedSilently),
_ => Err(format!("Unknown inject failure event: {s}")),
}
}
Expand Down
34 changes: 33 additions & 1 deletion crates/pathfinder/src/consensus/inner/integration_testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,13 @@ pub fn debug_fail_on_proposal_committed(
data_directory: &Path,
) {
debug_fail_on(
|trigger| matches!(trigger, InjectFailureTrigger::ProposalCommitted),
|trigger| {
matches!(
trigger,
InjectFailureTrigger::ProposalCommitted
| InjectFailureTrigger::ProposalCommittedSilently
)
},
height,
inject_failure,
data_directory,
Expand Down Expand Up @@ -215,3 +221,29 @@ pub fn send_outdated_vote(
) -> bool {
false
}

#[cfg(all(
feature = "p2p",
feature = "consensus-integration-tests",
debug_assertions
))]
pub fn do_not_send_vote(vote_height: u64, inject_failure: Option<InjectFailureConfig>) -> bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn do_not_send_vote(vote_height: u64, inject_failure: Option<InjectFailureConfig>) -> bool {
pub fn debug_do_not_send_vote(vote_height: u64, inject_failure: Option<InjectFailureConfig>) -> bool {

matches!(inject_failure,
Some(InjectFailureConfig {
height,
trigger: InjectFailureTrigger::ProposalCommittedSilently,
}) if vote_height >= height
)
}

#[cfg(not(all(
feature = "p2p",
feature = "consensus-integration-tests",
debug_assertions
)))]
pub fn do_not_send_vote(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub fn do_not_send_vote(
pub fn debug_do_not_send_vote(

_proposal_height: u64,
_inject_failure: Option<InjectFailureConfig>,
) -> bool {
false
}
5 changes: 4 additions & 1 deletion crates/pathfinder/src/consensus/inner/p2p_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,10 @@ pub fn spawn(
.await?;
}
ComputationSuccess::GossipVote(vote) => {
gossip_handler.gossip_vote(&p2p_client, vote).await?;
// `do_not_send_vote` always returns `false` outside integration-test builds, so production always gossips the vote.
if !integration_testing::do_not_send_vote(vote.height, inject_failure) {
gossip_handler.gossip_vote(&p2p_client, vote).await?;
}
}
ComputationSuccess::PreviouslyDeferredProposalIsFinalized(hnr, commitment) => {
send_proposal_to_consensus(&tx_to_consensus, hnr, commitment).await;
Expand Down
6 changes: 2 additions & 4 deletions crates/pathfinder/tests/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ mod test {
#[case::fail_on_prevote_rx(Some(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::PrevoteRx }))]
#[case::fail_on_precommit_rx(Some(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::PrecommitRx }))]
#[case::fail_on_proposal_decided(Some(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::ProposalDecided }))]
#[case::fail_on_proposal_committed(Some(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::ProposalCommitted }))]
#[tokio::test]
async fn consensus_3_nodes_with_failures(#[case] inject_failure: Option<InjectFailureConfig>) {
const NUM_NODES: usize = 3;
Expand Down Expand Up @@ -233,10 +234,7 @@ mod test {
}

#[rstest]
// Cannot be tested with just 3 nodes b/c Bob might break the
// Gossip communication property when restarting - see
// https://github.com/eqlabs/pathfinder/issues/3286
#[case::fail_on_proposal_committed(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::ProposalCommitted })]
#[case::fail_on_proposal_committed(InjectFailureConfig { height: 4, trigger: InjectFailureTrigger::ProposalCommittedSilently })]
#[tokio::test]
async fn consensus_4_nodes_with_failures(#[case] inject_failure: InjectFailureConfig) {
const NUM_NODES: usize = 4;
Expand Down
Loading