Skip to content

Commit 64bec70

Browse files
authored
apollo_consensus: Handle errors of p2p state sync update (#11075)
1 parent 9c26b35 commit 64bec70

File tree

3 files changed

+205
-128
lines changed

3 files changed

+205
-128
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
433433
self.wait_until_sync_reaches_height(height, context).await;
434434
RunHeightRes::Sync
435435
}
436-
e @ ConsensusError::BlockInfoConversion(_)
437-
| e @ ConsensusError::InternalNetworkError(_) => {
436+
e @ ConsensusError::InternalNetworkError(_) => {
438437
// The node is missing required components/data and cannot continue
439438
// participating in the consensus. A fix and node restart are required.
440439
return Err(e);

crates/apollo_consensus/src/types.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,4 @@ pub enum ConsensusError {
154154
// As opposed to an error between this node and peer nodes.
155155
#[error("{0}")]
156156
InternalNetworkError(String),
157-
#[error("Block info conversion error: {0}")]
158-
BlockInfoConversion(#[from] starknet_api::StarknetApiError),
159157
}

crates/apollo_consensus_orchestrator/src/sequencer_consensus_context.rs

Lines changed: 204 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex};
1111
use std::time::Duration;
1212

1313
use apollo_batcher_types::batcher_types::{
14+
CentralObjects,
1415
DecisionReachedInput,
1516
DecisionReachedResponse,
1617
ProposalId,
@@ -48,6 +49,7 @@ use futures::future::ready;
4849
use futures::SinkExt;
4950
use starknet_api::block::{
5051
BlockHeaderWithoutHash,
52+
BlockInfo,
5153
BlockNumber,
5254
BlockTimestamp,
5355
GasPrice,
@@ -57,6 +59,8 @@ use starknet_api::block::{
5759
use starknet_api::consensus_transaction::InternalConsensusTransaction;
5860
use starknet_api::core::SequencerContractAddress;
5961
use starknet_api::data_availability::L1DataAvailabilityMode;
62+
use starknet_api::execution_resources::GasAmount;
63+
use starknet_api::state::ThinStateDiff;
6064
use starknet_api::transaction::TransactionHash;
6165
use starknet_api::versioned_constants_logic::VersionedConstantsTrait;
6266
use tokio::task::JoinHandle;
@@ -65,7 +69,7 @@ use tokio_util::task::AbortOnDropHandle;
6569
use tracing::{error, error_span, info, instrument, trace, warn, Instrument};
6670

6771
use crate::build_proposal::{build_proposal, BuildProposalError, ProposalBuildArguments};
68-
use crate::cende::{BlobParameters, CendeContext};
72+
use crate::cende::{BlobParameters, CendeAmbassadorError, CendeContext};
6973
use crate::fee_market::{calculate_next_base_gas_price, FeeMarketInfo};
7074
use crate::metrics::{
7175
record_build_proposal_failure,
@@ -243,6 +247,192 @@ impl SequencerConsensusContext {
243247
None => false,
244248
}
245249
}
250+
251+
async fn update_state_sync_with_new_block(
252+
&self,
253+
height: BlockNumber,
254+
state_diff: &ThinStateDiff,
255+
transactions: &[InternalConsensusTransaction],
256+
block_info: &ConsensusBlockInfo,
257+
cende_block_info: &BlockInfo,
258+
l2_gas_used: GasAmount,
259+
) -> Result<(), StateSyncClientError> {
260+
// Divide transactions hashes to L1Handler and RpcTransaction hashes.
261+
let account_transaction_hashes = transactions
262+
.iter()
263+
.filter_map(|tx| match tx {
264+
InternalConsensusTransaction::RpcTransaction(_) => Some(tx.tx_hash()),
265+
_ => None,
266+
})
267+
.collect::<Vec<TransactionHash>>();
268+
let l1_transaction_hashes = transactions
269+
.iter()
270+
.filter_map(|tx| match tx {
271+
InternalConsensusTransaction::L1Handler(_) => Some(tx.tx_hash()),
272+
_ => None,
273+
})
274+
.collect::<Vec<TransactionHash>>();
275+
276+
let l1_gas_price = GasPricePerToken {
277+
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l1_gas_price.get(),
278+
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l1_gas_price.get(),
279+
};
280+
let l1_data_gas_price = GasPricePerToken {
281+
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l1_data_gas_price.get(),
282+
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l1_data_gas_price.get(),
283+
};
284+
let l2_gas_price = GasPricePerToken {
285+
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l2_gas_price.get(),
286+
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l2_gas_price.get(),
287+
};
288+
let sequencer = SequencerContractAddress(block_info.builder);
289+
290+
let block_header_without_hash = BlockHeaderWithoutHash {
291+
block_number: height,
292+
l1_gas_price,
293+
l1_data_gas_price,
294+
l2_gas_price,
295+
l2_gas_consumed: l2_gas_used,
296+
next_l2_gas_price: self.l2_gas_price,
297+
sequencer,
298+
timestamp: BlockTimestamp(block_info.timestamp),
299+
l1_da_mode: block_info.l1_da_mode,
300+
// TODO(guy.f): Figure out where/if to get the values below from and fill them.
301+
..Default::default()
302+
};
303+
304+
let sync_block = SyncBlock {
305+
state_diff: state_diff.clone(),
306+
account_transaction_hashes,
307+
l1_transaction_hashes,
308+
block_header_without_hash,
309+
};
310+
311+
self.deps.state_sync_client.add_new_block(sync_block).await
312+
}
313+
314+
async fn prepare_blob_for_next_height(
315+
&self,
316+
cende_block_info: BlockInfo,
317+
state_diff: ThinStateDiff,
318+
central_objects: CentralObjects,
319+
transactions: Vec<InternalConsensusTransaction>,
320+
l2_gas_used: GasAmount,
321+
commitment: ProposalCommitment,
322+
) -> Result<(), CendeAmbassadorError> {
323+
// Strip the transaction hashes from `execution_infos`, since we don't use it in the blob
324+
// version of `execution_infos`.
325+
let stripped_execution_infos =
326+
central_objects.execution_infos.into_iter().map(|(_, info)| info).collect();
327+
328+
self.deps
329+
.cende_ambassador
330+
.prepare_blob_for_next_height(BlobParameters {
331+
block_info: cende_block_info,
332+
state_diff,
333+
compressed_state_diff: central_objects.compressed_state_diff,
334+
transactions,
335+
execution_infos: stripped_execution_infos,
336+
bouncer_weights: central_objects.bouncer_weights,
337+
casm_hash_computation_data_sierra_gas: central_objects
338+
.casm_hash_computation_data_sierra_gas,
339+
casm_hash_computation_data_proving_gas: central_objects
340+
.casm_hash_computation_data_proving_gas,
341+
fee_market_info: FeeMarketInfo {
342+
l2_gas_consumed: l2_gas_used,
343+
next_l2_gas_price: self.l2_gas_price,
344+
},
345+
compiled_class_hashes_for_migration: central_objects
346+
.compiled_class_hashes_for_migration,
347+
proposal_commitment: commitment,
348+
parent_proposal_commitment: central_objects
349+
.parent_proposal_commitment
350+
.map(|commitment| ProposalCommitment(commitment.state_diff_commitment.0.0)),
351+
})
352+
.await
353+
}
354+
355+
fn update_l2_gas_price(&mut self, l2_gas_used: GasAmount) {
356+
let gas_target = VersionedConstants::latest_constants().gas_target;
357+
if let Some(override_value) = self.config.override_l2_gas_price_fri {
358+
info!(
359+
"L2 gas price ({}) is not updated, remains on override value of {override_value} \
360+
fri",
361+
self.l2_gas_price.0
362+
);
363+
self.l2_gas_price = GasPrice(override_value);
364+
} else {
365+
self.l2_gas_price =
366+
calculate_next_base_gas_price(self.l2_gas_price, l2_gas_used, gas_target);
367+
}
368+
369+
let gas_price_u64 = u64::try_from(self.l2_gas_price.0).unwrap_or(u64::MAX);
370+
CONSENSUS_L2_GAS_PRICE.set_lossy(gas_price_u64);
371+
}
372+
373+
async fn finalize_decision(
374+
&mut self,
375+
height: BlockNumber,
376+
block_info: &ConsensusBlockInfo,
377+
commitment: ProposalCommitment,
378+
// Accepts transactions as a vector of batches, as stored in the `BuiltProposals` map.
379+
transactions: Vec<Vec<InternalConsensusTransaction>>,
380+
decision_reached_response: DecisionReachedResponse,
381+
) {
382+
let DecisionReachedResponse { state_diff, l2_gas_used, central_objects } =
383+
decision_reached_response;
384+
385+
self.update_l2_gas_price(l2_gas_used);
386+
387+
// Remove transactions that were not accepted by the Batcher, so `transactions` and
388+
// `central_objects.execution_infos` correspond to the same list of (only accepted)
389+
// transactions.
390+
let transactions: Vec<InternalConsensusTransaction> = transactions
391+
.concat()
392+
.into_iter()
393+
.filter(|tx| central_objects.execution_infos.contains_key(&tx.tx_hash()))
394+
.collect();
395+
396+
// The conversion should never fail, if we already managed to get a decision.
397+
let Ok(cende_block_info) = convert_to_sn_api_block_info(block_info) else {
398+
warn!(
399+
"Failed to convert block info to SN API block info at height {height}: \
400+
{block_info:?}"
401+
);
402+
return;
403+
};
404+
405+
if let Err(e) = self
406+
.update_state_sync_with_new_block(
407+
height,
408+
&state_diff,
409+
&transactions,
410+
block_info,
411+
&cende_block_info,
412+
l2_gas_used,
413+
)
414+
.await
415+
{
416+
// TODO(Shahak): Decide how to handle this error once p2p state sync is
417+
// production-ready. At this point, the block has already been committed to
418+
// the state.
419+
warn!("Failed to update state sync with new block at height {height}: {e:?}");
420+
}
421+
422+
if let Err(e) = self
423+
.prepare_blob_for_next_height(
424+
cende_block_info,
425+
state_diff,
426+
central_objects,
427+
transactions,
428+
l2_gas_used,
429+
commitment,
430+
)
431+
.await
432+
{
433+
error!("Failed to prepare blob for next height at height {height}: {e:?}");
434+
}
435+
}
246436
}
247437

248438
#[async_trait]
@@ -504,123 +694,24 @@ impl ConsensusContext for SequencerConsensusContext {
504694
proposals.remove_proposals_below_or_at_height(&height);
505695
}
506696

507-
let DecisionReachedResponse { state_diff, l2_gas_used, central_objects } =
697+
let decision_reached_response =
508698
self.deps.batcher.decision_reached(DecisionReachedInput { proposal_id }).await?;
509699

510-
// Remove transactions that were not accepted by the Batcher, so `transactions` and
511-
// `central_objects.execution_infos` correspond to the same list of (only accepted)
512-
// transactions.
513-
let transactions: Vec<InternalConsensusTransaction> = transactions
514-
.concat()
515-
.into_iter()
516-
.filter(|tx| central_objects.execution_infos.contains_key(&tx.tx_hash()))
517-
.collect();
518-
519-
let gas_target = VersionedConstants::latest_constants().gas_target;
520-
if let Some(override_value) = self.config.override_l2_gas_price_fri {
521-
info!(
522-
"L2 gas price ({}) is not updated, remains on override value of {override_value} \
523-
fri",
524-
self.l2_gas_price.0
525-
);
526-
self.l2_gas_price = GasPrice(override_value);
527-
} else {
528-
self.l2_gas_price =
529-
calculate_next_base_gas_price(self.l2_gas_price, l2_gas_used, gas_target);
530-
}
531-
532-
let gas_price_u64 = u64::try_from(self.l2_gas_price.0).unwrap_or(u64::MAX);
533-
CONSENSUS_L2_GAS_PRICE.set_lossy(gas_price_u64);
534-
535-
// The conversion should never fail, if we already managed to get a decision.
536-
let cende_block_info = convert_to_sn_api_block_info(&block_info)?;
537-
let l1_gas_price = GasPricePerToken {
538-
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l1_gas_price.get(),
539-
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l1_gas_price.get(),
540-
};
541-
let l1_data_gas_price = GasPricePerToken {
542-
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l1_data_gas_price.get(),
543-
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l1_data_gas_price.get(),
544-
};
545-
let l2_gas_price = GasPricePerToken {
546-
price_in_fri: cende_block_info.gas_prices.strk_gas_prices.l2_gas_price.get(),
547-
price_in_wei: cende_block_info.gas_prices.eth_gas_prices.l2_gas_price.get(),
548-
};
549-
let sequencer = SequencerContractAddress(block_info.builder);
550-
551-
let block_header_without_hash = BlockHeaderWithoutHash {
552-
block_number: height,
553-
l1_gas_price,
554-
l1_data_gas_price,
555-
l2_gas_price,
556-
l2_gas_consumed: l2_gas_used,
557-
next_l2_gas_price: self.l2_gas_price,
558-
sequencer,
559-
timestamp: BlockTimestamp(block_info.timestamp),
560-
l1_da_mode: block_info.l1_da_mode,
561-
// TODO(guy.f): Figure out where/if to get the values below from and fill them.
562-
..Default::default()
563-
};
564-
565-
// Divide transactions hashes to L1Handler and RpcTransaction hashes.
566-
let account_transaction_hashes = transactions
567-
.iter()
568-
.filter_map(|tx| match tx {
569-
InternalConsensusTransaction::RpcTransaction(_) => Some(tx.tx_hash()),
570-
_ => None,
571-
})
572-
.collect::<Vec<TransactionHash>>();
573-
let l1_transaction_hashes = transactions
574-
.iter()
575-
.filter_map(|tx| match tx {
576-
InternalConsensusTransaction::L1Handler(_) => Some(tx.tx_hash()),
577-
_ => None,
578-
})
579-
.collect::<Vec<TransactionHash>>();
580-
581-
let sync_block = SyncBlock {
582-
state_diff: state_diff.clone(),
583-
account_transaction_hashes,
584-
l1_transaction_hashes,
585-
block_header_without_hash,
586-
};
587-
self.sync_add_new_block(sync_block).await;
700+
// CRITICAL: The block is now committed. This function must not fail beyond this point
701+
// unless the state is fully reverted, otherwise the node will be left in an
702+
// inconsistent state.
588703

589-
// Strip the transaction hashes from `execution_infos`, since we don't use it in the blob
590-
// version of `execution_infos`.
591-
let stripped_execution_infos =
592-
central_objects.execution_infos.into_iter().map(|(_, info)| info).collect();
704+
self.finalize_decision(
705+
height,
706+
&block_info,
707+
commitment,
708+
transactions,
709+
decision_reached_response,
710+
)
711+
.await;
593712

594-
let _ = self
595-
.deps
596-
.cende_ambassador
597-
.prepare_blob_for_next_height(BlobParameters {
598-
block_info: cende_block_info,
599-
state_diff,
600-
compressed_state_diff: central_objects.compressed_state_diff,
601-
transactions,
602-
execution_infos: stripped_execution_infos,
603-
bouncer_weights: central_objects.bouncer_weights,
604-
casm_hash_computation_data_sierra_gas: central_objects
605-
.casm_hash_computation_data_sierra_gas,
606-
casm_hash_computation_data_proving_gas: central_objects
607-
.casm_hash_computation_data_proving_gas,
608-
fee_market_info: FeeMarketInfo {
609-
l2_gas_consumed: l2_gas_used,
610-
next_l2_gas_price: self.l2_gas_price,
611-
},
612-
compiled_class_hashes_for_migration: central_objects
613-
.compiled_class_hashes_for_migration,
614-
proposal_commitment: commitment,
615-
parent_proposal_commitment: central_objects
616-
.parent_proposal_commitment
617-
.map(|commitment| ProposalCommitment(commitment.state_diff_commitment.0.0)),
618-
})
619-
.await
620-
.inspect_err(|e| {
621-
error!("Failed to prepare blob for next height: {e:?}");
622-
});
623713
self.previous_block_info = Some(block_info);
714+
624715
Ok(())
625716
}
626717

@@ -803,17 +894,6 @@ impl SequencerConsensusContext {
803894
handle.await.expect("Proposal task failed, propagating panic");
804895
}
805896
}
806-
807-
// `add_new_block` returns immediately, it doesn't wait for sync to fully process the block.
808-
async fn sync_add_new_block(&mut self, sync_block: SyncBlock) {
809-
// TODO(Dafna): Properly handle errors. Not all errors should be propagated as panics. We
810-
// should have a way to report an error and continue to the next height.
811-
self.deps
812-
.state_sync_client
813-
.add_new_block(sync_block.clone())
814-
.await
815-
.expect("Failed to add new block due to sync error: {e:?}");
816-
}
817897
}
818898

819899
async fn validate_and_send(

0 commit comments

Comments
 (0)