Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions crates/apollo_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ pub enum RunHeightRes {

type ProposalReceiverTuple<T> = (ConsensusBlockInfo, mpsc::Receiver<T>);

// TODO(guyn): remove allow dead code once we use the duplicate vote report.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct DuplicateVoteReport {
pub cached_vote: Vote,
pub new_vote: Vote,
}

/// Manages votes and proposals for future heights.
#[derive(Debug)]
struct ConsensusCache<ContextT: ConsensusContext> {
Expand Down Expand Up @@ -256,8 +264,27 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
}

/// Caches a vote for a future height.
fn cache_future_vote(&mut self, vote: Vote) {
self.future_votes.entry(vote.height).or_default().push(vote);
fn cache_future_vote(&mut self, vote: Vote) -> Option<DuplicateVoteReport> {
let votes = self.future_votes.entry(vote.height).or_default();
// Find a vote in the list with the same type, round, and voter. If found, do not add it to
// list.
let duplicate_vote = votes.iter().find(|v| {
v.vote_type == vote.vote_type && v.round == vote.round && v.voter == vote.voter
});
if let Some(duplicate_vote) = duplicate_vote {
// If the two votes are identical, we just ignore this.
if duplicate_vote == &vote {
return None;
}
// Otherwise, we report a duplicate vote.
return Some(DuplicateVoteReport {
cached_vote: duplicate_vote.clone(),
new_vote: vote,
});
}
// If no duplicate vote was found, we add the vote to the list.
votes.push(vote);
None
}

/// Caches a proposal for a future height.
Expand Down Expand Up @@ -822,7 +849,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
vote: (Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata),
broadcast_channels: &mut BroadcastVoteChannel,
) -> Result<Requests, ConsensusError> {
let message = match vote {
let (message, metadata) = match vote {
(Ok(message), metadata) => {
// TODO(matan): Hold onto report_sender for use in later errors by SHC.
if broadcast_channels
Expand All @@ -833,7 +860,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
{
error!("Unable to send continue_propagation. {:?}", metadata);
}
message
(message, metadata)
}
(Err(e), metadata) => {
// Failed to parse consensus message. Report the peer and drop the vote.
Expand All @@ -853,7 +880,12 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
std::cmp::Ordering::Greater => {
if self.cache.should_cache_vote(&height, 0, &message) {
trace!("Cache message for a future height. {:?}", message);
self.cache.cache_future_vote(message);
let duplicate_report = self.cache.cache_future_vote(message);
if let Some(duplicate_report) = duplicate_report {
warn!("Duplicate vote found: {:?}", duplicate_report);
self.report_peer(broadcast_channels, &metadata);
// TODO(guyn): send back a report with the duplicate information
}
}
Ok(VecDeque::new())
}
Expand Down
75 changes: 75 additions & 0 deletions crates/apollo_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,81 @@ async fn manager_fallback_to_sync_on_height_level_errors(consensus_config: Conse
assert_eq!(res, Ok(RunHeightRes::Sync));
}

#[rstest]
#[tokio::test]
async fn cache_future_vote_deduplication(consensus_config: ConsensusConfig) {
let TestSubscriberChannels { mock_network, subscriber_channels } =
mock_register_broadcast_topic().unwrap();
let mut sender = mock_network.broadcasted_messages_sender;
let mut reported_messages_receiver = mock_network.reported_messages_receiver;

let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
mpsc::channel(CHANNEL_SIZE);

// Send a prevote for HEIGHT_1 (future height - will be cached during HEIGHT_0 processing).
let original_vote = prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID);
send(&mut sender, original_vote.clone()).await;

// Send the exact same vote again (duplicate - should be silently ignored, no report).
send(&mut sender, original_vote).await;

// Send a conflicting vote: same type/round/voter but different proposal commitment
// (equivocation - should trigger report_peer).
let equivocating_vote = prevote(Some(Felt::TWO), HEIGHT_1, ROUND_0, *PROPOSER_ID);
send(&mut sender, equivocating_vote).await;

// Send proposal and votes for HEIGHT_0 to reach a decision.
send_proposal(
&mut proposal_receiver_sender,
vec![TestProposalPart::BlockInfo(block_info(HEIGHT_0, ROUND_0, *PROPOSER_ID))],
)
.await;
send(&mut sender, prevote(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await;
send(&mut sender, precommit(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await;

let mut context = MockTestContext::new();
context.expect_try_sync().returning(|_| false);
expect_validate_proposal(&mut context, Felt::ZERO, 1);
context.expect_validators().returning(move |_| Ok(vec![*PROPOSER_ID, *VALIDATOR_ID]));
context.expect_proposer().returning(move |_, _| Ok(*PROPOSER_ID));
context.expect_virtual_proposer().returning(move |_, _| Ok(*PROPOSER_ID));
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
context.expect_broadcast().returning(move |_| Ok(()));
context
.expect_decision_reached()
.withf(move |_, c| *c == ProposalCommitment(Felt::ZERO))
.return_once(move |_, _| Ok(()));

let mut manager = MultiHeightManager::new(
consensus_config,
QuorumType::Byzantine,
Arc::new(Mutex::new(NoOpHeightVotedStorage)),
)
.await;
let mut subscriber_channels = subscriber_channels.into();
let decision = manager
.run_height(
&mut context,
HEIGHT_0,
&mut subscriber_channels,
&mut proposal_receiver_receiver,
)
.await
.unwrap();
assert_decision(decision, Felt::ZERO, ROUND_0);

// Verify that report_peer was called exactly once (for the equivocation).
assert!(
matches!(reported_messages_receiver.try_next(), Ok(Some(_))),
"Expected report_peer to be called for the equivocation"
);
// The duplicate (identical vote) should not have triggered a report.
assert!(
reported_messages_receiver.try_next().is_err(),
"Expected no additional report_peer calls (duplicate should be silently ignored)"
);
}

#[rstest]
#[tokio::test]
async fn manager_ignores_invalid_network_messages(consensus_config: ConsensusConfig) {
Expand Down
Loading