Skip to content

Commit 51b8a2f

Browse files
apollo_consensus: fix: ensure BroadcastVote is sent before DecisionReached is processed
1 parent fd180dd commit 51b8a2f

File tree

5 files changed

+235
-202
lines changed

5 files changed

+235
-202
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 45 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@ use crate::metrics::{
4242
CONSENSUS_PROPOSALS_RECEIVED,
4343
CONSENSUS_REPROPOSALS,
4444
};
45-
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
45+
use crate::single_height_consensus::{Requests, SingleHeightConsensus};
4646
use crate::state_machine::{SMRequest, StateMachineEvent, Step};
4747
use crate::storage::HeightVotedStorageTrait;
4848
use crate::types::{
@@ -547,49 +547,29 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
547547
self.cache.report_cached_votes_metric(height);
548548
let mut pending_requests = {
549549
let leader_fn = make_leader_fn(context, height);
550-
match shc.start(&leader_fn) {
551-
ShcReturn::Decision(decision) => {
552-
// Start should generate either StartValidateProposal (validator) or
553-
// StartBuildProposal (proposer). We do not enforce this
554-
// since the Manager is intentionally not meant to
555-
// understand consensus in detail.
556-
return Ok(Some(decision));
557-
}
558-
ShcReturn::Requests(requests) => requests,
559-
}
550+
shc.start(&leader_fn)
560551
};
561552

562553
let cached_proposals = self.cache.get_current_height_proposals(height);
563554
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
564555
for (init, content_receiver) in cached_proposals {
565-
match self
566-
.handle_proposal_known_init(context, height, shc, init, content_receiver)
567-
.await
568-
{
569-
ShcReturn::Decision(decision) => {
570-
return Ok(Some(decision));
571-
}
572-
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
573-
}
556+
let new_requests =
557+
self.handle_proposal_known_init(context, height, shc, init, content_receiver).await;
558+
pending_requests.extend(new_requests);
574559
}
575560

576561
let cached_votes = self.cache.get_current_height_votes(height);
577562
trace!("Cached votes for height {}: {:?}", height, cached_votes);
578563
for msg in cached_votes {
579564
let leader_fn = make_leader_fn(context, height);
580-
match shc.handle_vote(&leader_fn, msg) {
581-
ShcReturn::Decision(decision) => {
582-
return Ok(Some(decision));
583-
}
584-
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
585-
}
565+
let new_requests = shc.handle_vote(&leader_fn, msg);
566+
pending_requests.extend(new_requests);
586567
}
587568

588569
// Reflect initial height/round to context before executing requests.
589570
context.set_height_and_round(height, shc.current_round()).await;
590571
self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels)
591-
.await?;
592-
Ok(None)
572+
.await
593573
}
594574

595575
/// Main consensus loop: handles incoming proposals, votes, events, and sync checks.
@@ -636,18 +616,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
636616
};
637617
// Reflect current height/round to context.
638618
context.set_height_and_round(height, shc.current_round()).await;
639-
match shc_return {
640-
ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
641-
ShcReturn::Requests(requests) => {
642-
self.execute_requests(
643-
context,
644-
height,
645-
requests,
646-
shc_events,
647-
broadcast_channels,
648-
)
649-
.await?;
650-
}
619+
if let Some(decision) = self
620+
.execute_requests(context, height, shc_return, shc_events, broadcast_channels)
621+
.await?
622+
{
623+
return Ok(RunHeightRes::Decision(decision));
651624
}
652625
}
653626
}
@@ -689,7 +662,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
689662
height: BlockNumber,
690663
shc: Option<&mut SingleHeightConsensus>,
691664
content_receiver: Option<mpsc::Receiver<ContextT::ProposalPart>>,
692-
) -> Result<ShcReturn, ConsensusError> {
665+
) -> Result<Requests, ConsensusError> {
693666
CONSENSUS_PROPOSALS_RECEIVED.increment(1);
694667
// Get the first message to verify the init was sent.
695668
let Some(mut content_receiver) = content_receiver else {
@@ -723,11 +696,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
723696
// When moving to version 1.0 make sure this is addressed.
724697
self.cache.cache_future_proposal(proposal_init, content_receiver);
725698
}
726-
Ok(ShcReturn::Requests(VecDeque::new()))
699+
Ok(VecDeque::new())
727700
}
728701
std::cmp::Ordering::Less => {
729702
trace!("Drop proposal from past height. {:?}", proposal_init);
730-
Ok(ShcReturn::Requests(VecDeque::new()))
703+
Ok(VecDeque::new())
731704
}
732705
std::cmp::Ordering::Equal => match shc {
733706
Some(shc) => {
@@ -746,12 +719,12 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
746719
)
747720
.await)
748721
} else {
749-
Ok(ShcReturn::Requests(VecDeque::new()))
722+
Ok(VecDeque::new())
750723
}
751724
}
752725
None => {
753726
trace!("Drop proposal from just completed height. {:?}", proposal_init);
754-
Ok(ShcReturn::Requests(VecDeque::new()))
727+
Ok(VecDeque::new())
755728
}
756729
},
757730
}
@@ -764,7 +737,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
764737
shc: &mut SingleHeightConsensus,
765738
proposal_init: ProposalInit,
766739
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
767-
) -> ShcReturn {
740+
) -> Requests {
768741
// Store the stream; requests will reference it by (height, round)
769742
self.current_height_proposals_streams
770743
.insert((height, proposal_init.round), content_receiver);
@@ -781,7 +754,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
781754
shc: Option<&mut SingleHeightConsensus>,
782755
vote: Option<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
783756
broadcast_channels: &mut BroadcastVoteChannel,
784-
) -> Result<ShcReturn, ConsensusError> {
757+
) -> Result<Requests, ConsensusError> {
785758
let message = match vote {
786759
None => Err(ConsensusError::InternalNetworkError(
787760
"NetworkReceiver should never be closed".to_string(),
@@ -821,24 +794,24 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
821794
trace!("Cache message for a future height. {:?}", message);
822795
self.cache.cache_future_vote(message);
823796
}
824-
Ok(ShcReturn::Requests(VecDeque::new()))
797+
Ok(VecDeque::new())
825798
}
826799
std::cmp::Ordering::Less => {
827800
trace!("Drop message from past height. {:?}", message);
828-
Ok(ShcReturn::Requests(VecDeque::new()))
801+
Ok(VecDeque::new())
829802
}
830803
std::cmp::Ordering::Equal => match shc {
831804
Some(shc) => {
832805
if self.cache.should_cache_vote(&height, shc.current_round(), &message) {
833806
let leader_fn = make_leader_fn(context, height);
834807
Ok(shc.handle_vote(&leader_fn, message))
835808
} else {
836-
Ok(ShcReturn::Requests(VecDeque::new()))
809+
Ok(VecDeque::new())
837810
}
838811
}
839812
None => {
840813
trace!("Drop message from just completed height. {:?}", message);
841-
Ok(ShcReturn::Requests(VecDeque::new()))
814+
Ok(VecDeque::new())
842815
}
843816
},
844817
}
@@ -851,15 +824,28 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
851824
mut requests: VecDeque<SMRequest>,
852825
shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
853826
broadcast_channels: &mut BroadcastVoteChannel,
854-
) -> Result<(), ConsensusError> {
827+
) -> Result<Option<Decision>, ConsensusError> {
828+
// Extract decision requests to handle after processing all other requests.
829+
let mut decision_request: Option<Decision> = None;
830+
855831
while let Some(request) = requests.pop_front() {
856-
if let Some(fut) =
857-
self.run_request(context, height, request, broadcast_channels).await?
858-
{
859-
shc_events.push(fut);
832+
match request {
833+
SMRequest::DecisionReached(decision) => {
834+
assert!(decision_request.is_none(), "multiple DecisionReached in one batch");
835+
decision_request = Some(decision);
836+
}
837+
_ => {
838+
if let Some(fut) =
839+
self.run_request(context, height, request, broadcast_channels).await?
840+
{
841+
shc_events.push(fut);
842+
}
843+
}
860844
}
861845
}
862-
Ok(())
846+
847+
// Return decision after all other requests are processed.
848+
Ok(decision_request)
863849
}
864850

865851
async fn run_request(
@@ -961,9 +947,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
961947
CONSENSUS_REPROPOSALS.increment(1);
962948
Ok(None)
963949
}
964-
SMRequest::DecisionReached(_, _) => {
965-
// Should be handled by SHC, not manager.
966-
unreachable!("Manager received DecisionReached request");
950+
SMRequest::DecisionReached(_) => {
951+
unreachable!("DecisionReached request should be handled in execute_requests");
967952
}
968953
}
969954
}

0 commit comments

Comments
 (0)