
Commit 1d4c9c1

apollo_consensus: fix: ensure BroadcastVote is sent before DecisionReached processing
1 parent d676798 commit 1d4c9c1

File tree

5 files changed: +238 −205 lines


crates/apollo_consensus/src/manager.rs

Lines changed: 46 additions & 61 deletions
@@ -42,7 +42,7 @@ use crate::metrics::{
     CONSENSUS_PROPOSALS_RECEIVED,
     CONSENSUS_REPROPOSALS,
 };
-use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
+use crate::single_height_consensus::{Requests, SingleHeightConsensus};
 use crate::state_machine::{SMRequest, StateMachineEvent, Step};
 use crate::storage::HeightVotedStorageTrait;
 use crate::types::{
@@ -572,49 +572,29 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         self.cache.report_cached_votes_metric(height);
         let mut pending_requests = {
             let leader_fn = make_leader_fn(context, height);
-            match shc.start(&leader_fn) {
-                ShcReturn::Decision(decision) => {
-                    // Start should generate either StartValidateProposal (validator) or
-                    // StartBuildProposal (proposer). We do not enforce this
-                    // since the Manager is intentionally not meant to
-                    // understand consensus in detail.
-                    return Ok(Some(decision));
-                }
-                ShcReturn::Requests(requests) => requests,
-            }
+            shc.start(&leader_fn)
         };
 
         let cached_proposals = self.cache.get_current_height_proposals(height);
         trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
         for (init, content_receiver) in cached_proposals {
-            match self
-                .handle_proposal_known_init(context, height, shc, init, content_receiver)
-                .await
-            {
-                ShcReturn::Decision(decision) => {
-                    return Ok(Some(decision));
-                }
-                ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
-            }
+            let new_requests =
+                self.handle_proposal_known_init(context, height, shc, init, content_receiver).await;
+            pending_requests.extend(new_requests);
         }
 
         let cached_votes = self.cache.get_current_height_votes(height);
         trace!("Cached votes for height {}: {:?}", height, cached_votes);
         for msg in cached_votes {
             let leader_fn = make_leader_fn(context, height);
-            match shc.handle_vote(&leader_fn, msg) {
-                ShcReturn::Decision(decision) => {
-                    return Ok(Some(decision));
-                }
-                ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
-            }
+            let new_requests = shc.handle_vote(&leader_fn, msg);
+            pending_requests.extend(new_requests);
         }
 
         // Reflect initial height/round to context before executing requests.
         context.set_height_and_round(height, shc.current_round()).await?;
         self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels)
-            .await?;
-        Ok(None)
+            .await
     }
 
     /// Main consensus loop: handles incoming proposals, votes, events, and sync checks.
@@ -632,7 +612,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         let mut sync_poll_deadline = clock.now() + sync_retry_interval;
         loop {
             self.cache.report_max_cached_block_number_metric(height);
-            let shc_return = tokio::select! {
+            let requests = tokio::select! {
                 message = broadcast_channels.broadcasted_messages_receiver.next() => {
                     self.handle_vote(context, height, Some(shc), message, broadcast_channels).await?
                 },
@@ -661,18 +641,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             };
             // Reflect current height/round to context.
             context.set_height_and_round(height, shc.current_round()).await?;
-            match shc_return {
-                ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
-                ShcReturn::Requests(requests) => {
-                    self.execute_requests(
-                        context,
-                        height,
-                        requests,
-                        shc_events,
-                        broadcast_channels,
-                    )
-                    .await?;
-                }
+            if let Some(decision) = self
+                .execute_requests(context, height, requests, shc_events, broadcast_channels)
+                .await?
+            {
+                return Ok(RunHeightRes::Decision(decision));
             }
         }
     }
@@ -714,7 +687,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         height: BlockNumber,
         shc: Option<&mut SingleHeightConsensus>,
         content_receiver: Option<mpsc::Receiver<ContextT::ProposalPart>>,
-    ) -> Result<ShcReturn, ConsensusError> {
+    ) -> Result<Requests, ConsensusError> {
         CONSENSUS_PROPOSALS_RECEIVED.increment(1);
         // Get the first message to verify the init was sent.
         let Some(mut content_receiver) = content_receiver else {
@@ -748,11 +721,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
                     // When moving to version 1.0 make sure this is addressed.
                     self.cache.cache_future_proposal(proposal_init, content_receiver);
                 }
-                Ok(ShcReturn::Requests(VecDeque::new()))
+                Ok(VecDeque::new())
             }
             std::cmp::Ordering::Less => {
                 trace!("Drop proposal from past height. {:?}", proposal_init);
-                Ok(ShcReturn::Requests(VecDeque::new()))
+                Ok(VecDeque::new())
             }
             std::cmp::Ordering::Equal => match shc {
                 Some(shc) => {
@@ -771,12 +744,12 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
                         )
                         .await)
                     } else {
-                        Ok(ShcReturn::Requests(VecDeque::new()))
+                        Ok(VecDeque::new())
                     }
                 }
                 None => {
                     trace!("Drop proposal from just completed height. {:?}", proposal_init);
-                    Ok(ShcReturn::Requests(VecDeque::new()))
+                    Ok(VecDeque::new())
                 }
             },
         }
@@ -789,7 +762,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         shc: &mut SingleHeightConsensus,
         proposal_init: ProposalInit,
         content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
-    ) -> ShcReturn {
+    ) -> Requests {
         // Store the stream; requests will reference it by (height, round)
         self.current_height_proposals_streams
             .insert((height, proposal_init.round), content_receiver);
@@ -806,7 +779,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         shc: Option<&mut SingleHeightConsensus>,
         vote: Option<(Result<Vote, ProtobufConversionError>, BroadcastedMessageMetadata)>,
         broadcast_channels: &mut BroadcastVoteChannel,
-    ) -> Result<ShcReturn, ConsensusError> {
+    ) -> Result<Requests, ConsensusError> {
         let message = match vote {
             None => Err(ConsensusError::InternalNetworkError(
                 "NetworkReceiver should never be closed".to_string(),
@@ -846,24 +819,24 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
                     trace!("Cache message for a future height. {:?}", message);
                     self.cache.cache_future_vote(message);
                 }
-                Ok(ShcReturn::Requests(VecDeque::new()))
+                Ok(VecDeque::new())
            }
            std::cmp::Ordering::Less => {
                trace!("Drop message from past height. {:?}", message);
-                Ok(ShcReturn::Requests(VecDeque::new()))
+                Ok(VecDeque::new())
            }
            std::cmp::Ordering::Equal => match shc {
                Some(shc) => {
                    if self.cache.should_cache_vote(&height, shc.current_round(), &message) {
                        let leader_fn = make_leader_fn(context, height);
                        Ok(shc.handle_vote(&leader_fn, message))
                    } else {
-                        Ok(ShcReturn::Requests(VecDeque::new()))
+                        Ok(VecDeque::new())
                    }
                }
                None => {
                    trace!("Drop message from just completed height. {:?}", message);
-                    Ok(ShcReturn::Requests(VecDeque::new()))
+                    Ok(VecDeque::new())
                }
            },
        }
@@ -876,15 +849,28 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         mut requests: VecDeque<SMRequest>,
         shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
         broadcast_channels: &mut BroadcastVoteChannel,
-    ) -> Result<(), ConsensusError> {
+    ) -> Result<Option<Decision>, ConsensusError> {
+        // Extract decision requests to handle after processing all other requests.
+        let mut decision_request: Option<Decision> = None;
+
         while let Some(request) = requests.pop_front() {
-            if let Some(fut) =
-                self.run_request(context, height, request, broadcast_channels).await?
-            {
-                shc_events.push(fut);
+            match request {
+                SMRequest::DecisionReached(decision) => {
+                    assert!(decision_request.is_none(), "multiple DecisionReached in one batch");
+                    decision_request = Some(decision);
+                }
+                _ => {
+                    if let Some(fut) =
+                        self.run_request(context, height, request, broadcast_channels).await?
+                    {
+                        shc_events.push(fut);
+                    }
+                }
             }
         }
-        Ok(())
+
+        // Return decision after all other requests are processed.
+        Ok(decision_request)
     }
 
     async fn run_request(
@@ -986,9 +972,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
                 CONSENSUS_REPROPOSALS.increment(1);
                 Ok(None)
             }
-            SMRequest::DecisionReached(_, _) => {
-                // Should be handled by SHC, not manager.
-                unreachable!("Manager received DecisionReached request");
+            SMRequest::DecisionReached(_) => {
+                unreachable!("DecisionReached request should be handled in execute_requests");
             }
         }
     }
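
The core of this change is in execute_requests: instead of the manager surfacing a decision the moment the state machine reports one, it drains the whole request batch, executes every non-decision request (including any BroadcastVote), and only then returns the deferred DecisionReached to the caller. The sketch below illustrates that ordering guarantee in isolation; the Request, Decision, and execute_requests names here are simplified stand-ins for this example, not the real apollo_consensus types, and pushing the vote into a Vec stands in for actually broadcasting it over the network.

// Minimal sketch (assumed types, not the crate's API): defer the decision
// until all other requests in the batch have been executed.
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
struct Decision(u64);

#[derive(Debug)]
enum Request {
    BroadcastVote(String),
    DecisionReached(Decision),
}

/// Executes every non-decision request first, then reports the decision (if
/// any), mirroring the deferral introduced in this commit.
fn execute_requests(mut requests: VecDeque<Request>, sent: &mut Vec<String>) -> Option<Decision> {
    let mut decision = None;
    while let Some(request) = requests.pop_front() {
        match request {
            Request::DecisionReached(d) => {
                assert!(decision.is_none(), "multiple DecisionReached in one batch");
                decision = Some(d);
            }
            Request::BroadcastVote(vote) => {
                // Stand-in for sending the vote on the network.
                sent.push(vote);
            }
        }
    }
    // Only returned after every vote in the batch has been sent.
    decision
}

fn main() {
    let mut sent = Vec::new();
    let batch = VecDeque::from([
        Request::DecisionReached(Decision(7)),
        Request::BroadcastVote("precommit".to_string()),
    ]);
    let decision = execute_requests(batch, &mut sent);
    // Even though DecisionReached appeared first in the batch, the vote went out.
    assert_eq!(sent, vec!["precommit".to_string()]);
    assert_eq!(decision, Some(Decision(7)));
    println!("decision: {decision:?}, votes sent: {sent:?}");
}

Deferring the decision rather than returning early means the deciding precommit still reaches peers before the height is torn down, which is the ordering the commit title describes.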
