Skip to content

Commit d676798

Browse files
authored
apollo_consensus: fallback to sync in case of Batcher errors (#10973)
1 parent 8cbd2d5 commit d676798

File tree

9 files changed

+154
-100
lines changed

9 files changed

+154
-100
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_batcher_types/src/communication.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ pub enum BatcherResponse {
121121
}
122122
impl_debug_for_infra_requests_and_responses!(BatcherResponse);
123123

124-
#[derive(Clone, Debug, Error)]
124+
#[derive(Clone, Debug, Error, PartialEq)]
125125
pub enum BatcherClientError {
126126
#[error(transparent)]
127127
ClientError(#[from] ClientError),

crates/apollo_consensus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ description = "Reach consensus for Starknet"
1010
testing = ["mockall", "tempfile"]
1111

1212
[dependencies]
13+
apollo_batcher_types.workspace = true
1314
apollo_config_manager_types.workspace = true
1415
apollo_consensus_config.workspace = true
1516
apollo_infra_utils.workspace = true

crates/apollo_consensus/src/manager.rs

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -411,13 +411,38 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
411411
proposals_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
412412
) -> Result<RunHeightRes, ConsensusError> {
413413
info!("Running consensus for height {}.", height);
414-
let res =
415-
self.run_height_inner(context, height, broadcast_channels, proposals_receiver).await?;
416414

417-
// Commit in case of decision.
418-
if let RunHeightRes::Decision(decision) = &res {
419-
context.decision_reached(height, decision.block).await?;
420-
}
415+
let consensus_result =
416+
self.run_height_inner(context, height, broadcast_channels, proposals_receiver).await;
417+
418+
let res = match consensus_result {
419+
Ok(ok) => match ok {
420+
RunHeightRes::Decision(decision) => {
421+
// Commit decision to context.
422+
context.decision_reached(height, decision.block).await?;
423+
RunHeightRes::Decision(decision)
424+
}
425+
RunHeightRes::Sync => RunHeightRes::Sync,
426+
},
427+
428+
Err(err) => match err {
429+
e @ ConsensusError::BatcherError(_) => {
430+
error!(
431+
"Error while running consensus for height {height}, fallback to sync: {e}"
432+
);
433+
self.wait_until_sync_reaches_height(height, context).await;
434+
RunHeightRes::Sync
435+
}
436+
e @ ConsensusError::BlockInfoConversion(_)
437+
| e @ ConsensusError::ProtobufConversionError(_)
438+
| e @ ConsensusError::SendError(_)
439+
| e @ ConsensusError::InternalNetworkError(_) => {
440+
// The node is missing required components/data and cannot continue
441+
// participating in the consensus. A fix and node restart are required.
442+
return Err(e);
443+
}
444+
},
445+
};
421446

422447
// Cleanup after height completion.
423448
self.cleanup_post_height(context, height, broadcast_channels, proposals_receiver).await?;
@@ -586,7 +611,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
586611
}
587612

588613
// Reflect initial height/round to context before executing requests.
589-
context.set_height_and_round(height, shc.current_round()).await;
614+
context.set_height_and_round(height, shc.current_round()).await?;
590615
self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels)
591616
.await?;
592617
Ok(None)
@@ -635,7 +660,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
635660
}
636661
};
637662
// Reflect current height/round to context.
638-
context.set_height_and_round(height, shc.current_round()).await;
663+
context.set_height_and_round(height, shc.current_round()).await?;
639664
match shc_return {
640665
ShcReturn::Decision(decision) => return Ok(RunHeightRes::Decision(decision)),
641666
ShcReturn::Requests(requests) => {
@@ -883,7 +908,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
883908
// (round 0) proposal timeout for building to avoid giving the Batcher more time
884909
// when proposal time is extended for consensus.
885910
let timeout = timeouts.get_proposal_timeout(0);
886-
let receiver = context.build_proposal(init, timeout).await;
911+
let receiver = context.build_proposal(init, timeout).await?;
887912
let fut = async move {
888913
let proposal_id = receiver.await.ok();
889914
StateMachineEvent::FinishedBuilding(proposal_id, round)

crates/apollo_consensus/src/manager_test.rs

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::sync::{Arc, Mutex};
22
use std::time::Duration;
33
use std::vec;
44

5+
use apollo_batcher_types::communication::BatcherClientError;
6+
use apollo_batcher_types::errors::BatcherError;
57
use apollo_config_manager_types::communication::MockConfigManagerClient;
68
use apollo_consensus_config::config::{
79
ConsensusConfig,
@@ -39,7 +41,7 @@ use crate::test_utils::{
3941
NoOpHeightVotedStorage,
4042
TestProposalPart,
4143
};
42-
use crate::types::{Round, ValidatorId};
44+
use crate::types::{ConsensusError, Round, ValidatorId};
4345
use crate::votes_threshold::QuorumType;
4446
use crate::RunConsensusArguments;
4547

@@ -166,7 +168,7 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) {
166168
expect_validate_proposal(&mut context, Felt::ONE, 1);
167169
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
168170
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
169-
context.expect_set_height_and_round().returning(move |_, _| ());
171+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
170172
context.expect_broadcast().returning(move |_| Ok(()));
171173
context
172174
.expect_decision_reached()
@@ -220,7 +222,7 @@ async fn run_consensus_sync(consensus_config: ConsensusConfig) {
220222
expect_validate_proposal(&mut context, Felt::TWO, 1);
221223
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
222224
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
223-
context.expect_set_height_and_round().returning(move |_, _| ());
225+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
224226
context.expect_broadcast().returning(move |_| Ok(()));
225227
context
226228
.expect_decision_reached()
@@ -286,7 +288,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) {
286288
send(&mut sender, precommit(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_3)).await;
287289

288290
let mut context = MockTestContext::new();
289-
context.expect_set_height_and_round().returning(move |_, _| ());
291+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
290292
expect_validate_proposal(&mut context, Felt::ONE, 2);
291293
context
292294
.expect_validators()
@@ -444,7 +446,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu
444446
expect_validate_proposal(&mut context, Felt::ONE, 1); // Height 1 validation
445447
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
446448
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
447-
context.expect_set_height_and_round().returning(move |_, _| ());
449+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
448450
// Set up coordination to detect when node votes Nil for height 2 (indicating proposal was
449451
// dropped, so the node didn't receive the proposal and votes Nil).
450452
let (height2_nil_vote_trigger, height2_nil_vote_wait) = oneshot::channel();
@@ -583,16 +585,18 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C
583585
.times(1)
584586
.return_once(|_, _| {
585587
round1_trigger.send(()).unwrap();
588+
Ok(())
586589
});
587590
context
588591
.expect_set_height_and_round()
589592
.withf(|height, round| *height == HEIGHT_1 && *round == ROUND_2)
590593
.times(1)
591594
.return_once(|_, _| {
592595
round2_trigger.send(()).unwrap();
596+
Ok(())
593597
});
594598
// Handle all other set_height_and_round calls normally.
595-
context.expect_set_height_and_round().returning(move |_, _| ());
599+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
596600
context
597601
.expect_decision_reached()
598602
.withf(move |_, c| *c == ProposalCommitment(Felt::ONE))
@@ -667,7 +671,7 @@ async fn run_consensus_dynamic_client_updates_validator_between_heights(
667671
// Context with expectations: H1 we are the validator, learn height via sync; at H2 we are the
668672
// proposer.
669673
let mut context = MockTestContext::new();
670-
context.expect_set_height_and_round().returning(move |_, _| ());
674+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
671675
context.expect_validators().returning(move |h: BlockNumber| {
672676
if h == HEIGHT_1 { vec![*VALIDATOR_ID] } else { vec![*PROPOSER_ID] }
673677
});
@@ -688,7 +692,7 @@ async fn run_consensus_dynamic_client_updates_validator_between_heights(
688692
.returning(move |_, _| {
689693
let (sender, receiver) = oneshot::channel();
690694
sender.send(ProposalCommitment(Felt::TWO)).unwrap();
691-
receiver
695+
Ok(receiver)
692696
})
693697
.times(1);
694698
// Expect a decision at height 2.
@@ -818,7 +822,7 @@ async fn manager_runs_normally_when_height_is_greater_than_last_voted_height(
818822
expect_validate_proposal(&mut context, Felt::ONE, 1);
819823
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
820824
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
821-
context.expect_set_height_and_round().returning(move |_, _| ());
825+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
822826
context.expect_broadcast().returning(move |_| Ok(()));
823827
context
824828
.expect_decision_reached()
@@ -921,7 +925,7 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) {
921925
context
922926
.expect_validators()
923927
.returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]);
924-
context.expect_set_height_and_round().returning(move |_, _| ());
928+
context.expect_set_height_and_round().returning(move |_, _| Ok(()));
925929
context.expect_try_sync().returning(|_| false);
926930

927931
// Set up storage expectation for prevote - must happen before broadcast
@@ -1047,3 +1051,45 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) {
10471051

10481052
assert_decision(decision, block_id.0, ROUND_0);
10491053
}
1054+
1055+
#[rstest]
1056+
#[tokio::test]
1057+
async fn manager_fallback_to_sync_on_height_level_errors(consensus_config: ConsensusConfig) {
1058+
let TestSubscriberChannels { mock_network: _mock_network, subscriber_channels } =
1059+
mock_register_broadcast_topic().unwrap();
1060+
1061+
let (mut _proposal_receiver_sender, mut proposal_receiver_receiver) =
1062+
mpsc::channel(CHANNEL_SIZE);
1063+
1064+
let mut context = MockTestContext::new();
1065+
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
1066+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
1067+
1068+
// Sync should first fail, so consensus will try to run.
1069+
context.expect_try_sync().times(1).returning(|_| false);
1070+
1071+
// Consensus should fail when context.set_height_and_round fails.
1072+
context.expect_set_height_and_round().times(1).returning(move |_, _| {
1073+
Err(ConsensusError::BatcherError(BatcherClientError::BatcherError(
1074+
BatcherError::InternalError,
1075+
)))
1076+
});
1077+
1078+
// Now sync should be called and succeed.
1079+
context.expect_try_sync().withf(move |height| *height == HEIGHT_1).times(1).returning(|_| true);
1080+
1081+
let mut manager = MultiHeightManager::new(
1082+
consensus_config,
1083+
QuorumType::Byzantine,
1084+
Arc::new(Mutex::new(NoOpHeightVotedStorage)),
1085+
);
1086+
let res = manager
1087+
.run_height(
1088+
&mut context,
1089+
HEIGHT_1,
1090+
&mut subscriber_channels.into(),
1091+
&mut proposal_receiver_receiver,
1092+
)
1093+
.await;
1094+
assert_eq!(res, Ok(RunHeightRes::Sync));
1095+
}

crates/apollo_consensus/src/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ mock! {
6868
&mut self,
6969
init: ProposalInit,
7070
timeout: Duration,
71-
) -> oneshot::Receiver<ProposalCommitment>;
71+
) -> Result<oneshot::Receiver<ProposalCommitment>, ConsensusError>;
7272

7373
async fn validate_proposal(
7474
&mut self,
@@ -97,7 +97,7 @@ mock! {
9797

9898
async fn try_sync(&mut self, height: BlockNumber) -> bool;
9999

100-
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
100+
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round) -> Result<(), ConsensusError>;
101101
}
102102
}
103103

crates/apollo_consensus/src/types.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
use std::fmt::Debug;
33
use std::time::Duration;
44

5+
use apollo_batcher_types::communication::BatcherClientError;
56
use apollo_network::network_manager::{
67
BroadcastTopicChannels,
78
BroadcastTopicClient,
@@ -57,7 +58,7 @@ pub trait ConsensusContext {
5758
&mut self,
5859
init: ProposalInit,
5960
timeout: Duration,
60-
) -> oneshot::Receiver<ProposalCommitment>;
61+
) -> Result<oneshot::Receiver<ProposalCommitment>, ConsensusError>;
6162

6263
/// This function is called by consensus to validate a block. It expects that this call will
6364
/// return immediately and that context can then stream in the block's content in parallel to
@@ -115,7 +116,11 @@ pub trait ConsensusContext {
115116

116117
/// Update the context with the current height and round.
117118
/// Must be called at the beginning of each height.
118-
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
119+
async fn set_height_and_round(
120+
&mut self,
121+
height: BlockNumber,
122+
round: Round,
123+
) -> Result<(), ConsensusError>;
119124
}
120125

121126
#[derive(PartialEq, Debug)]
@@ -147,6 +152,8 @@ pub enum ConsensusError {
147152
ProtobufConversionError(#[from] ProtobufConversionError),
148153
#[error(transparent)]
149154
SendError(#[from] mpsc::SendError),
155+
#[error(transparent)]
156+
BatcherError(#[from] BatcherClientError),
150157
// Indicates an error in communication between consensus and the node's networking component.
151158
// As opposed to an error between this node and peer nodes.
152159
#[error("{0}")]

0 commit comments

Comments
 (0)