Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ context_deserialize = { path = "consensus/context_deserialize/context_deserializ
] }
context_deserialize_derive = { path = "consensus/context_deserialize/context_deserialize_derive" }
criterion = "0.5"
dashmap = "6.1"
delay_map = "0.4"
deposit_contract = { path = "common/deposit_contract" }
derivative = "2"
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ test_backfill = []
alloy-primitives = { workspace = true }
bitvec = { workspace = true }
bls = { workspace = true }
dashmap = { workspace = true }
derivative = { workspace = true }
eth2 = { workspace = true }
eth2_network_config = { workspace = true }
Expand Down
180 changes: 144 additions & 36 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::beacon_proposer_cache::{
BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch,
};
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::block_status_table::{BlockStatus, BlockStatusTable, Error as BlockStatusTableError};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::POS_PANDA_BANNER;
use crate::block_verification::{
Expand Down Expand Up @@ -475,6 +476,8 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
/// The slot at which blocks are downloaded back to.
pub genesis_backfill_slot: Slot,
//// A table to track the status of blocks during data availability import
pub block_status_table: Arc<BlockStatusTable>,
/// Provides a KZG verification and temporary storage for blocks and blobs as
/// they are collected and combined.
pub data_availability_checker: Arc<DataAvailabilityChecker<T>>,
Expand Down Expand Up @@ -3018,22 +3021,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
blob: GossipVerifiedBlob<T>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_root = blob.block_root();
let slot = blob.slot();

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
return Err(BlockError::DuplicateFullyImported(blob.block_root()));
// TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked?
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why insert here?

.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;

if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

// No need to process and import blobs beyond the PeerDAS epoch.
if self.spec.is_peer_das_enabled_for_epoch(blob.epoch()) {
return Err(BlockError::BlobNotRequired(blob.slot()));
}

// TODO: there appears to be no check on the parent block being in fork choice here..
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's checked here

let Some(parent_block) = fork_choice.get_block(&block_parent_root) else {

// TODO: again.. this could be a check on the block status table rather than fork choice

self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));

self.check_gossip_blob_availability_and_import(blob).await
Expand All @@ -3058,16 +3070,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its samples again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;

// If this block is already available, we don't need to process its columns again.
if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

// TODO: there appears to be no check on the parent block being in fork choice here..
// TODO: again.. this could be a check on the block status table rather than fork choice

self.emit_sse_data_column_sidecar_events(
&block_root,
data_columns.iter().map(|column| column.as_data_column()),
Expand All @@ -3091,13 +3111,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked?
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;

if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

Expand All @@ -3113,6 +3138,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&parent_root)
{
// TODO: This could be a check on the block status table rather than fork choice
return Err(BlockError::ParentUnknown { parent_root });
}

Expand All @@ -3131,14 +3157,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;
if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

// TODO do we need to check if the parent block is in fork choice here?
// TODO this could be a check on the block status table rather than fork choice

match &engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob()));
Expand Down Expand Up @@ -3218,13 +3252,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
));
};

// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its columns again.
if self
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// TODO: if somehow this column is older than finalization, we need to not insert it - where is this checked?
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;

// If this block is already available, we don't need to process its columns again. // we don't need to process its columns again.
if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

Expand All @@ -3237,6 +3277,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.fork_choice_read_lock()
.contains_block(&parent_root)
{
// TODO: This could be a check on the block status table rather than fork choice
return Err(BlockError::ParentUnknown { parent_root });
}

Expand Down Expand Up @@ -3340,6 +3381,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<AvailabilityProcessingStatus, BlockError> {
let block_slot = unverified_block.block().slot();

// ensure the block is in the status table
self.block_status_table
.insert_pending(block_root, block_slot);

// Set observed time if not already set. Usually this should be set by gossip or RPC,
// but just in case we set it again here (useful for tests).
if let Some(seen_timestamp) = self.slot_clock.now_duration() {
Expand Down Expand Up @@ -3614,6 +3659,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
engine_get_blobs_output: EngineGetBlobsOutput<T>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let status = self
.block_status_table
.get_status_or_insert_pending(block_root, slot)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on get_status_or_insert_pending: {:?}",
e
))
})?;
if status.is_past_pending() {
return Err(BlockError::DuplicateFullyImported(block_root));
}

let availability = match engine_get_blobs_output {
EngineGetBlobsOutput::Blobs(blobs) => {
self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?;
Expand Down Expand Up @@ -3702,8 +3760,58 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block).await
let block_root = block.block_root();
match self.block_status_table.transition(
&block_root,
BlockStatus::Pending,
BlockStatus::Importing,
) {
Ok(_) => {
let result = self.import_available_block(block).await;
if let Ok(AvailabilityProcessingStatus::Imported(_)) = result.as_ref() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is equivalent to a match with a default statement, which have been very problematic in the past as we modify the variants. Can you do a full match on result?

self.block_status_table
.transition(
&block_root,
BlockStatus::Importing,
BlockStatus::Imported,
)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on transition: {:?}",
e
))
})?;
} else {
// something went wrong on import, mutate the status table back to pending
// TODO: determine when we would mutate this to Invalid!
self.block_status_table
.transition(
&block_root,
BlockStatus::Importing,
BlockStatus::Pending,
)
.map_err(|e| {
BlockError::InternalError(format!(
"BlockStatusTable failed on transition: {:?}",
e
))
})?;
}

result
}
Err(e) => match e {
// someone beat us to it - the block is already being imported
BlockStatusTableError::InvalidStatusTransition { .. } => {
Err(BlockError::DuplicateFullyImported(block_root))
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not true, this error will trigger if the current status is != Pending. If that status happens to be Invalid you will return DuplicateFullyImported which is incorrect

// other errors should not occur
e => Err(BlockError::InternalError(format!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid default match statement, just match explicitly the variants BlockNotFound / InvalidStatus. Are you sure BlockNotFound can never happen?

"BlockStatusTable failed on transition: {:?}",
e
))),
},
}
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
Expand Down
Loading
Loading