diff --git a/Cargo.lock b/Cargo.lock index 352ff779752..d0accea7039 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -862,6 +862,7 @@ dependencies = [ "bitvec 1.0.1", "bls", "criterion", + "dashmap", "derivative", "eth2", "eth2_network_config", @@ -2140,6 +2141,20 @@ dependencies = [ "libc", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", +] + [[package]] name = "data-encoding" version = "2.8.0" diff --git a/Cargo.toml b/Cargo.toml index 66378a16c46..6f55ada9c7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,6 +129,7 @@ context_deserialize = { path = "consensus/context_deserialize/context_deserializ ] } context_deserialize_derive = { path = "consensus/context_deserialize/context_deserialize_derive" } criterion = "0.5" +dashmap = "6.1" delay_map = "0.4" deposit_contract = { path = "common/deposit_contract" } derivative = "2" diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index dca351cbac6..2e2280c5b96 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -18,6 +18,7 @@ test_backfill = [] alloy-primitives = { workspace = true } bitvec = { workspace = true } bls = { workspace = true } +dashmap = { workspace = true } derivative = { workspace = true } eth2 = { workspace = true } eth2_network_config = { workspace = true } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index afbf3278fe0..939373cbc3a 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -9,6 +9,7 @@ use crate::beacon_proposer_cache::{ BeaconProposerCache, EpochBlockProposers, ensure_state_can_determine_proposers_for_epoch, }; use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::block_status_table::{BlockState, BlockStatus, BlockStatusTable}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ @@ -475,6 +476,8 @@ pub struct BeaconChain { pub validator_monitor: RwLock>, /// The slot at which blocks are downloaded back to. pub genesis_backfill_slot: Slot, + //// A table to track the status of blocks during data availability import + pub block_status_table: Arc, /// Provides a KZG verification and temporary storage for blocks and blobs as /// they are collected and combined. pub data_availability_checker: Arc>, @@ -2927,9 +2930,7 @@ impl BeaconChain { ); return ChainSegmentResult::Failed { imported_blocks, - error: BlockError::AvailabilityCheck( - AvailabilityCheckError::MissingBlobs, - ), + error: AvailabilityCheckError::MissingBlobs.into(), }; } } @@ -3018,15 +3019,25 @@ impl BeaconChain { blob: GossipVerifiedBlob, ) -> Result { let block_root = blob.block_root(); + let slot = blob.slot(); - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its blobs again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - return Err(BlockError::DuplicateFullyImported(blob.block_root())); + // TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked? 
+ let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } } // No need to process and import blobs beyond the PeerDAS epoch. @@ -3034,6 +3045,9 @@ impl BeaconChain { return Err(BlockError::BlobNotRequired(blob.slot())); } + // TODO: there appears to be no check on the parent block being in fork choice here.. + // TODO: again.. this could be a check on the block status table rather than fork choice + self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob())); self.check_gossip_blob_availability_and_import(blob).await @@ -3058,16 +3072,28 @@ impl BeaconChain { )); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its samples again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - return Err(BlockError::DuplicateFullyImported(block_root)); + // TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked? + let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } } + // TODO: there appears to be no check on the parent block being in fork choice here.. + // TODO: again.. this could be a check on the block status table rather than fork choice + self.emit_sse_data_column_sidecar_events( &block_root, data_columns.iter().map(|column| column.as_data_column()), @@ -3091,14 +3117,23 @@ impl BeaconChain { block_root: Hash256, blobs: FixedBlobSidecarList, ) -> Result { - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its blobs again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - return Err(BlockError::DuplicateFullyImported(block_root)); + // TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked? + let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } } // Reject RPC blobs referencing unknown parents. Otherwise we allow potentially invalid data @@ -3113,6 +3148,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { + // TODO: This could be a check on the block status table rather than fork choice return Err(BlockError::ParentUnknown { parent_root }); } @@ -3131,14 +3167,28 @@ impl BeaconChain { ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. 
- if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - return Err(BlockError::DuplicateFullyImported(block_root)); + // TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked? + let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } } + // TODO do we need to check if the parent block is in fork choice here? + // TODO this could be a check on the block status table rather than fork choice + match &engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().map(|b| b.as_blob())); @@ -3218,14 +3268,23 @@ impl BeaconChain { )); }; - // If this block has already been imported to forkchoice it must have been available, so - // we don't need to process its columns again. - if self - .canonical_head - .fork_choice_read_lock() - .contains_block(&block_root) - { - return Err(BlockError::DuplicateFullyImported(block_root)); + // TODO: if somehow this block is older than finalization, we need to not insert it - where is this checked? + let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } } // Reject RPC columns referencing unknown parents. Otherwise we allow potentially invalid data @@ -3237,6 +3296,7 @@ impl BeaconChain { .fork_choice_read_lock() .contains_block(&parent_root) { + // TODO: This could be a check on the block status table rather than fork choice return Err(BlockError::ParentUnknown { parent_root }); } @@ -3340,6 +3400,36 @@ impl BeaconChain { ) -> Result { let block_slot = unverified_block.block().slot(); + // only process a block once + let (_, state) = self + .block_status_table + .insert(block_root, block_slot, BlockStatus::Seen); + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_seen() { + match self.block_status_table.transition( + &block_root, + BlockStatus::Seen, + BlockStatus::Processing, + ) { + // we're the first to process the block; continue + Ok(_) => {} + Err(_) => { + // someone else beat us to it + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } + } else if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } else { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } + } + // Set observed time if not already set. Usually this should be set by gossip or RPC, // but just in case we set it again here (useful for tests). if let Some(seen_timestamp) = self.slot_clock.now_duration() { @@ -3373,6 +3463,7 @@ impl BeaconChain { notify_execution_layer, )?; publish_fn()?; + let block_status_table = chain.block_status_table.clone(); // Record the time it took to complete consensus verification. 
if let Some(timestamp) = self.slot_clock.now_duration() { @@ -3401,6 +3492,16 @@ impl BeaconChain { .set_time_executed(block_root, block_slot, timestamp) } + // transition block status to pending + block_status_table + .transition(&block_root, BlockStatus::Processing, BlockStatus::Pending) + .map_err(|e| { + BlockError::InternalError(format!( + "BlockStatusTable failed on transition: {:?}", + e + )) + })?; + match executed_block { ExecutedBlock::Available(block) => { self.import_available_block(Box::new(block)).await @@ -3433,6 +3534,7 @@ impl BeaconChain { Ok(status) } Err(BlockError::BeaconChainError(e)) => { + // TODO: process errors here and mutate block status table accordingly! match e.as_ref() { BeaconChainError::TokioJoin(e) => { debug!( @@ -3582,7 +3684,7 @@ impl BeaconChain { header.message.proposer_index, block_root, ) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(header); } @@ -3614,6 +3716,25 @@ impl BeaconChain { block_root: Hash256, engine_get_blobs_output: EngineGetBlobsOutput, ) -> Result { + // TODO: if somehow this blob is older than finalization, we need to not insert it - where is this checked? + let (_, state) = self + .block_status_table + .insert(block_root, slot, BlockStatus::Seen); + + match state { + BlockState::Invalid(e) => { + return Err(e); + } + BlockState::Valid(status) => { + if status.is_imported() { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + if status.is_past_pending() { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + } + } + let availability = match engine_get_blobs_output { EngineGetBlobsOutput::Blobs(blobs) => { self.check_blobs_for_slashability(block_root, blobs.iter().map(|b| b.as_blob()))?; @@ -3680,7 +3801,7 @@ impl BeaconChain { header.message.proposer_index, block_root, ) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; if let Some(slasher) = self.slasher.as_ref() { slasher.accept_block_header(header); } @@ -3689,6 +3810,10 @@ impl BeaconChain { Ok(()) } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` + /// + /// An error is returned if the block was unable to be imported. It may be partially imported + /// (i.e., this function is not atomic). /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. 
It may be partially imported @@ -3716,6 +3841,35 @@ impl BeaconChain { self: &Arc, block: Box>, ) -> Result { + // transition block status to importing + let block_root = block.block_root(); + match self.block_status_table.transition( + &block_root, + BlockStatus::Pending, + BlockStatus::Importing, + ) { + Ok(_) => {} + Err(_) => { + // someone else beat us to it + let Some(state) = self.block_status_table.get_state(&block_root) else { + return Err(BlockError::InternalError( + "Block not in status table at import".to_string(), + )); + }; + match state { + BlockState::Valid(status) if status.is_imported() => { + return Err(BlockError::DuplicateFullyImported(block_root)); + } + BlockState::Valid(status) => { + return Err(BlockError::DuplicateImportStatusKnown(block_root, status)); + } + BlockState::Invalid(e) => { + return Err(e); + } + } + } + } + let AvailableExecutedBlock { block, import_data, @@ -3740,6 +3894,7 @@ impl BeaconChain { // TODO(das) record custody column available timestamp + // TODO process errors here and mutate block status table accordingly! let block_root = { // Capture the current span before moving into the blocking task let current_span = tracing::Span::current(); @@ -3762,6 +3917,13 @@ impl BeaconChain { .await?? }; + // transition block status to imported + self.block_status_table + .transition(&block_root, BlockStatus::Importing, BlockStatus::Imported) + .map_err(|e| { + BlockError::InternalError(format!("BlockStatusTable failed on transition: {:?}", e)) + })?; + Ok(AvailabilityProcessingStatus::Imported(block_root)) } @@ -3868,7 +4030,7 @@ impl BeaconChain { payload_verification_status, &self.spec, ) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; } // If the block is recent enough and it was not optimistically imported, check to see if it @@ -4065,7 +4227,7 @@ impl BeaconChain { warning = "The database is likely corrupt now, consider --purge-db", "No stored fork choice found to restore from" ); - Err(BlockError::BeaconChainError(Box::new(e))) + Err(BlockError::BeaconChainError(Arc::new(e))) } else { Ok(()) } @@ -4119,7 +4281,7 @@ impl BeaconChain { Provided block root is not a checkpoint.", )) .map_err(|err| { - BlockError::BeaconChainError(Box::new( + BlockError::BeaconChainError(Arc::new( BeaconChainError::WeakSubjectivtyShutdownError(err), )) })?; diff --git a/beacon_node/beacon_chain/src/block_status_table.rs b/beacon_node/beacon_chain/src/block_status_table.rs new file mode 100644 index 00000000000..fc00a8ea756 --- /dev/null +++ b/beacon_node/beacon_chain/src/block_status_table.rs @@ -0,0 +1,938 @@ +use crate::block_verification::BlockError; +use dashmap::DashMap; +use dashmap::mapref::entry::Entry; +use std::sync::Arc; +use std::sync::atomic::{AtomicU8, Ordering}; +use tokio::sync::broadcast; +use types::{Hash256, Slot}; + +#[derive(Debug)] +pub enum Error { + BlockNotFound, + InvalidStatusTransition { + expected: BlockStatus, + found: BlockStatus, + }, + InvalidStatus(u8), + BlockAlreadyInvalid(BlockError), +} + +/// Represents the status of a block during the data availability import process. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[repr(u8)] +pub enum BlockStatus { + /// Block (or blob) has been seen + Seen = 0, + /// Someone is running process_block() on the block + Processing = 1, + /// process_block() finished, now waiting for blobs to be available + Pending = 2, + /// Block is currently being executed + Executing = 3, + /// Block is currently being imported + Importing = 4, + /// Block has been successfully imported + Imported = 5, + /// Block status was invalid (this should never happen) + /// This keeps us from needing to return Results everywhere + Unknown = 255, +} + +impl BlockStatus { + /// Channel capacity - sufficient for all possible state transitions + pub const CHANNEL_CAPACITY: usize = 16; + + /// Converts a u8 value to a Status enum. + pub fn from_u8(value: u8) -> Self { + match value { + 0 => BlockStatus::Seen, + 1 => BlockStatus::Processing, + 2 => BlockStatus::Pending, + 3 => BlockStatus::Executing, + 4 => BlockStatus::Importing, + 5 => BlockStatus::Imported, + _ => BlockStatus::Unknown, + } + } + + /// Converts the Status to its u8 representation. + pub fn to_u8(self) -> u8 { + match self { + BlockStatus::Seen => 0, + BlockStatus::Processing => 1, + BlockStatus::Pending => 2, + BlockStatus::Executing => 3, + BlockStatus::Importing => 4, + BlockStatus::Imported => 5, + BlockStatus::Unknown => 255, + } + } + + pub fn is_seen(&self) -> bool { + self == &BlockStatus::Seen + } + + pub fn is_processing(&self) -> bool { + self == &BlockStatus::Processing + } + + pub fn is_pending(&self) -> bool { + self == &BlockStatus::Pending + } + + pub fn is_past_pending(&self) -> bool { + self > &BlockStatus::Pending + } + + pub fn is_imported(&self) -> bool { + self == &BlockStatus::Imported + } +} + +struct ValidEntry { + status: AtomicU8, + slot: Slot, + sender: broadcast::Sender, +} + +struct InvalidEntry { + slot: Slot, + block_error: BlockError, +} + +enum BlockStatusEntry { + Valid(ValidEntry), + Invalid(InvalidEntry), +} + +#[derive(Clone, Debug)] +pub enum BlockState { + Valid(BlockStatus), + Invalid(BlockError), +} + +impl BlockStatusEntry { + pub fn new(status: BlockStatus, slot: Slot) -> Self { + let (sender, _rx) = broadcast::channel(BlockStatus::CHANNEL_CAPACITY); + Self::Valid(ValidEntry { + status: AtomicU8::new(status.to_u8()), + slot, + sender, + }) + } + + pub fn new_invalid(slot: Slot, block_error: BlockError) -> Self { + Self::Invalid(InvalidEntry { slot, block_error }) + } + + pub fn new_subscribe( + status: BlockStatus, + slot: Slot, + ) -> (Self, broadcast::Receiver) { + let (sender, rx) = broadcast::channel(BlockStatus::CHANNEL_CAPACITY); + ( + Self::Valid(ValidEntry { + status: AtomicU8::new(status.to_u8()), + slot, + sender, + }), + rx, + ) + } + + pub fn state(&self) -> BlockState { + match self { + Self::Valid(entry) => { + BlockState::Valid(BlockStatus::from_u8(entry.status.load(Ordering::Acquire))) + } + Self::Invalid(entry) => BlockState::Invalid(entry.block_error.clone()), + } + } + + pub fn slot(&self) -> Slot { + match self { + Self::Valid(entry) => entry.slot, + Self::Invalid(entry) => entry.slot, + } + } + + // transition to a valid status + pub fn transition(&self, expected: BlockStatus, new: BlockStatus) -> Result<(), Error> { + match self { + Self::Valid(entry) => { + match entry + .status + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |value| { + if value == expected.to_u8() { + Some(new.to_u8()) + } else { + None + } + }) { + Ok(_) => { + // Notify subscribers of the new status + // Ignore send 
errors - means no active receivers
+                        let _ = entry.sender.send(BlockState::Valid(new));
+                        Ok(())
+                    }
+                    Err(actual_value) => Err(Error::InvalidStatusTransition {
+                        expected,
+                        found: BlockStatus::from_u8(actual_value),
+                    }),
+                }
+            }
+            Self::Invalid(entry) => Err(Error::BlockAlreadyInvalid(entry.block_error.clone())),
+        }
+    }
+
+    pub fn subscribe(&self) -> (BlockState, broadcast::Receiver<BlockState>) {
+        match self {
+            Self::Valid(v) => {
+                // By subscribing before reading the current status, a race condition is possible where
+                // the subscribing thread reads a status and then, when it awaits the channel, receives
+                // the same status again. This is preferable to the reverse race condition, where an old
+                // status is read and the update to the latest status is never received.
+                let rx = v.sender.subscribe();
+                let cur = BlockState::Valid(BlockStatus::from_u8(v.status.load(Ordering::Acquire)));
+                (cur, rx)
+            }
+            Self::Invalid(inv) => {
+                // Provide a terminal, already-invalid stream: send once, then close.
+                let (tx, rx) = broadcast::channel(1);
+                let _ = tx.send(BlockState::Invalid(inv.block_error.clone()));
+                drop(tx);
+                (BlockState::Invalid(inv.block_error.clone()), rx)
+            }
+        }
+    }
+}
+
+/// A thread-safe status tracking table for Ethereum blocks during data availability import.
+///
+/// This table uses DashMap for lock-free concurrent access and AtomicU8 values for
+/// efficient atomic operations on status transitions to prevent race conditions during
+/// block processing.
+pub struct BlockStatusTable {
+    /// The underlying concurrent hash map storing block status as AtomicU8
+    table: DashMap<Hash256, Arc<BlockStatusEntry>>,
+}
+
+impl BlockStatusTable {
+    /// Creates a new empty status table.
+    pub fn new() -> Self {
+        Self {
+            table: DashMap::new(),
+        }
+    }
+
+    /// Inserts a new block with the given status if it is not already tracked.
+    ///
+    /// Returns `true` and the newly inserted state if the block was not already present,
+    /// or `false` and the existing state if the block already exists in the table.
+    pub fn insert(
+        &self,
+        block_hash: Hash256,
+        slot: Slot,
+        status: BlockStatus,
+    ) -> (bool, BlockState) {
+        match self.table.entry(block_hash) {
+            Entry::Occupied(o) => (false, o.get().state()),
+            Entry::Vacant(entry) => {
+                entry.insert(Arc::new(BlockStatusEntry::new(status, slot)));
+                (true, BlockState::Valid(status))
+            }
+        }
+    }
+
+    pub fn insert_subscribe(
+        &self,
+        block_hash: Hash256,
+        slot: Slot,
+        status: BlockStatus,
+    ) -> (bool, BlockState, broadcast::Receiver<BlockState>) {
+        match self.table.entry(block_hash) {
+            Entry::Occupied(o) => {
+                let (state, rx) = o.get().subscribe();
+                (false, state, rx)
+            }
+            Entry::Vacant(entry) => {
+                let (status_entry, rx) = BlockStatusEntry::new_subscribe(status, slot);
+                entry.insert(Arc::new(status_entry));
+                (true, BlockState::Valid(status), rx)
+            }
+        }
+    }
+
+    /// Atomically compares and swaps the status of a block.
+    ///
+    /// This operation is atomic and will only succeed if the current status
+    /// matches the expected status. Returns `Ok(())` if the swap succeeded,
+    /// `Err` otherwise.
+    ///
+    /// # Arguments
+    /// * `block_hash` - The hash of the block to update
+    /// * `expected` - The expected current status
+    /// * `new` - The new status to set
+    pub fn transition(
+        &self,
+        block_hash: &Hash256,
+        expected: BlockStatus,
+        new: BlockStatus,
+    ) -> Result<(), Error> {
+        let entry = self
+            .table
+            .get(block_hash)
+            // clone the arc and drop the dashmap guard quickly
+            .map(|guard| guard.clone())
+            .ok_or(Error::BlockNotFound)?;
+
+        entry.transition(expected, new)
+    }
+
+    /// Marks a block as invalid.
+ /// + /// This method will transition the block to Invalid and store the error. + pub fn mark_invalid(&self, block_hash: Hash256, block_error: BlockError) -> Result<(), Error> { + match self.table.entry(block_hash) { + Entry::Occupied(mut o) => { + match o.get().as_ref() { + BlockStatusEntry::Valid(entry) => { + let tx = entry.sender.clone(); + // send the invalid state to the subscribers + let _ = tx.send(BlockState::Invalid(block_error.clone())); + // swap the valid entry for an invalid entry + o.insert(Arc::new(BlockStatusEntry::new_invalid( + entry.slot, + block_error, + ))); + Ok(()) + } + BlockStatusEntry::Invalid(entry) => { + Err(Error::BlockAlreadyInvalid(entry.block_error.clone())) + } + } + } + Entry::Vacant(_) => Err(Error::BlockNotFound), + } + } + + /// Subscribe to status updates for a specific block. + /// Returns None if the block is not being tracked. + /// + /// Late subscribers will receive all future status changes but not past ones. + pub fn subscribe( + &self, + block_hash: &Hash256, + ) -> Option<(BlockState, broadcast::Receiver)> { + let entry = self + .table + .get(block_hash) + // clone the arc and drop the dashmap guard quickly + .map(|guard| guard.clone())?; + + Some(entry.subscribe()) + } + + /// Gets the current status of a block. + /// + /// Returns `None` if the block is not tracked in the table. + pub fn get_state(&self, block_hash: &Hash256) -> Option { + let entry = self + .table + .get(block_hash) + // clone the arc and quickly drop dashmap guard + .map(|guard| guard.clone())?; + + Some(entry.state()) + } + + /// Removes a block from the status table. + /// + /// Returns the previous status if the block was present, `None` otherwise. + pub fn remove(&self, block_hash: &Hash256) -> Option { + // Remove entry from table + self.table + .remove(block_hash) + .map(|(_, entry)| entry.state()) + } + + /// Prunes entries with a slot lower than the given slot. + pub fn prune_finalized(&self, finalized_slot: Slot) { + self.table.retain(|_, entry| entry.slot() >= finalized_slot); + } + + /// Returns the number of entries currently in the table. + /// + /// Note: This is an approximate count that may be slightly inaccurate + /// due to concurrent operations, but is useful for monitoring and debugging. + pub fn len(&self) -> usize { + self.table.len() + } + + /// Returns true if the table is empty. + pub fn is_empty(&self) -> bool { + self.table.is_empty() + } + + /// Clears all entries from the table. + /// + /// This method should be used with caution as it removes all tracking information. 
+ pub fn clear(&self) { + self.table.clear(); + } +} + +impl Default for BlockStatusTable { + fn default() -> Self { + Self::new() + } +} + +/* +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + + /// Helper function to create a test block hash + fn test_hash(n: u64) -> Hash256 { + // this needs to be padded to 32 bytes + let mut bytes = [0; 32]; + bytes[0..8].copy_from_slice(&n.to_be_bytes()); + Hash256::from_slice(&bytes) + } + + #[test] + fn test_new_status_table() { + let table = BlockStatusTable::new(); + assert!(table.is_empty()); + assert_eq!(table.len(), 0); + } + + #[test] + fn test_insert() { + let table = BlockStatusTable::new(); + let hash = test_hash(1); + let slot = Slot::new(1); + + // First insertion should succeed + let (inserted, status) = table.insert(hash, slot, BlockStatus::Pending); + assert!(inserted); + assert_eq!(status, BlockStatus::Pending); + assert_eq!(table.get_status(&hash), Some(BlockStatus::Pending)); + assert_eq!(table.len(), 1); + + // Second insertion of same hash should fail + let (inserted, status) = table.insert(hash, slot, BlockStatus::Pending); + assert!(!inserted); + assert_eq!(status, BlockStatus::Pending); + assert_eq!(table.len(), 1); + } + + #[test] + fn test_get_status() { + let table = BlockStatusTable::new(); + let hash = test_hash(1); + let slot = Slot::new(1); + + // Non-existent hash should return None + assert_eq!(table.get_status(&hash), None); + + // Insert and verify + table.insert(hash, slot, BlockStatus::Pending); + assert_eq!(table.get_status(&hash), Some(BlockStatus::Pending)); + } + + #[test] + fn test_transition() { + let table = BlockStatusTable::new(); + let hash = test_hash(1); + let slot = Slot::new(1); + + // CAS on non-existent entry should fail + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_err() + ); + + // Insert entry + table.insert(hash, slot, BlockStatus::Pending); + + // Successful CAS + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_ok() + ); + assert_eq!(table.get_status(&hash), Some(BlockStatus::Executing)); + + // Failed CAS with wrong expected value + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Invalid) + .is_err() + ); + assert_eq!(table.get_status(&hash), Some(BlockStatus::Executing)); + + // Successful CAS to terminal state + assert!( + table + .transition(&hash, BlockStatus::Executing, BlockStatus::Imported) + .is_ok() + ); + assert_eq!(table.get_status(&hash), Some(BlockStatus::Imported)); + } + + #[test] + fn test_remove() { + let table = BlockStatusTable::new(); + let hash = test_hash(1); + let slot = Slot::new(1); + + // Remove non-existent entry + assert_eq!(table.remove(&hash), None); + + // Insert and remove + table.insert(hash, slot, BlockStatus::Pending); + assert_eq!(table.len(), 1); + assert_eq!(table.remove(&hash), Some(BlockStatus::Pending)); + assert_eq!(table.len(), 0); + assert_eq!(table.get_status(&hash), None); + } + + #[test] + fn test_status_methods() { + assert!(BlockStatus::Invalid.is_terminal()); + assert!(BlockStatus::Imported.is_terminal()); + assert!(!BlockStatus::Pending.is_terminal()); + assert!(!BlockStatus::Executing.is_terminal()); + assert!(!BlockStatus::Importing.is_terminal()); + + assert!(BlockStatus::Executing.is_active()); + assert!(BlockStatus::Importing.is_active()); + assert!(!BlockStatus::Pending.is_active()); + assert!(!BlockStatus::Invalid.is_active()); + 
assert!(!BlockStatus::Imported.is_active()); + } + + #[test] + fn test_clear() { + let table = BlockStatusTable::new(); + let hash1 = test_hash(1); + let hash2 = test_hash(2); + let slot1 = Slot::new(1); + let slot2 = Slot::new(2); + + table.insert(hash1, slot1, BlockStatus::Pending); + table.insert(hash2, slot2, BlockStatus::Pending); + assert_eq!(table.len(), 2); + + table.clear(); + assert!(table.is_empty()); + assert_eq!(table.len(), 0); + assert_eq!(table.get_status(&hash1), None); + assert_eq!(table.get_status(&hash2), None); + } + + #[test] + fn test_prune_finalized() { + let table = BlockStatusTable::new(); + let blocks = 32usize; + for i in 0..blocks { + let hash = test_hash(i as u64); + let slot = Slot::new(i as u64); + table.insert(hash, slot, BlockStatus::Pending); + } + table.prune_finalized(Slot::new(16)); + assert_eq!(table.len(), 16); + for i in 0..blocks { + let hash = test_hash(i as u64); + if i < 16 { + assert_eq!(table.get_status(&hash), None); + } else { + assert_eq!(table.get_status(&hash), Some(BlockStatus::Pending)); + } + } + } + + #[test] + fn test_concurrent_access() { + let table = Arc::new(BlockStatusTable::new()); + let num_threads = 10; + let operations_per_thread = 100; + + let mut handles = Vec::new(); + + // Spawn threads that perform concurrent operations + for thread_id in 0..num_threads { + let table_clone = Arc::clone(&table); + let handle = thread::spawn(move || { + for op_id in 0..operations_per_thread { + let hash = test_hash((thread_id * operations_per_thread + op_id) as u64); + let slot = Slot::new(1); + + // Insert + table_clone.insert(hash, slot, BlockStatus::Pending); + + // Try to transition through states + assert!( + table_clone + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing,) + .is_ok() + ); + + // Small delay to increase chance of contention + thread::sleep(Duration::from_nanos(1)); + + assert!( + table_clone + .transition(&hash, BlockStatus::Executing, BlockStatus::Imported,) + .is_ok() + ); + } + }); + handles.push(handle); + } + + // Wait for all threads to complete + for handle in handles { + handle.join().expect("Thread should complete successfully"); + } + + // Verify final state + assert_eq!(table.len(), num_threads * operations_per_thread); + } + + #[test] + fn test_atomic_operations_performance() { + let table = BlockStatusTable::new(); + let hash = test_hash(1); + let slot = Slot::new(1); + + // Test that atomic operations work correctly + table.insert(hash, slot, BlockStatus::Pending); + + // Test multiple rapid transitions + for _ in 0..1000 { + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_ok() + ); + assert!( + table + .transition(&hash, BlockStatus::Executing, BlockStatus::Pending) + .is_ok() + ); + } + + // Verify final state + assert_eq!(table.get_status(&hash), Some(BlockStatus::Pending)); + } + + #[test] + fn test_status_conversion() { + // Test all valid conversions + assert_eq!(BlockStatus::from_u8(0), Ok(BlockStatus::Seen)); + assert_eq!(BlockStatus::from_u8(1), Ok(BlockStatus::Processing)); + assert_eq!(BlockStatus::from_u8(2), Ok(BlockStatus::Pending)); + assert_eq!(BlockStatus::from_u8(3), Ok(BlockStatus::Executing)); + assert_eq!(BlockStatus::from_u8(4), Ok(BlockStatus::Invalid)); + assert_eq!(BlockStatus::from_u8(5), Ok(BlockStatus::Importing)); + assert_eq!(BlockStatus::from_u8(6), Ok(BlockStatus::Imported)); + assert_eq!(BlockStatus::from_u8(7), Err(Error::InvalidStatus(7))); + assert_eq!(BlockStatus::from_u8(255), Err(Error::InvalidStatus(255))); + 
+ // Test round-trip conversions + for &status in &[ + BlockStatus::Seen, + BlockStatus::Processing, + BlockStatus::Pending, + BlockStatus::Executing, + BlockStatus::Invalid, + BlockStatus::Importing, + BlockStatus::Imported, + ] { + assert_eq!(BlockStatus::from_u8(status.to_u8()), Ok(status)); + } + } + + #[test] + fn test_concurrent_cas_contention() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + table.insert(hash, slot, BlockStatus::Pending); + + let num_threads = 10; + let operations_per_thread = 100; + let mut handles = Vec::new(); + + // Spawn threads that compete to transition the same block + for _thread_id in 0..num_threads { + let table_clone = Arc::clone(&table); + let handle = thread::spawn(move || { + let mut successful_cas = 0; + for _ in 0..operations_per_thread { + // Try to transition from Pending to Executing + if table_clone + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_ok() + { + successful_cas += 1; + // Immediately transition back to allow other threads to compete + assert!( + table_clone + .transition(&hash, BlockStatus::Executing, BlockStatus::Pending,) + .is_ok() + ); + } + } + successful_cas + }); + handles.push(handle); + } + + // Collect results + let mut total_successful = 0; + for handle in handles { + total_successful += handle.join().expect("Thread should complete"); + } + + // Each thread should have had some successful CAS operations + // The exact number depends on scheduling, but should be reasonable + assert!(total_successful > 0); + assert!(total_successful <= num_threads * operations_per_thread); + + // Final state should be consistent + let final_status = table.get_status(&hash); + assert!( + final_status == Some(BlockStatus::Pending) + || final_status == Some(BlockStatus::Executing) + ); + } + + #[tokio::test] + async fn test_subscribe_notifications() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + + // Insert pending block + table.insert(hash, slot, BlockStatus::Pending); + + // Subscribe to updates + let (current_status, mut rx) = table.subscribe(&hash).expect("Should be able to subscribe"); + assert_eq!(current_status, BlockStatus::Pending); + + // Transition to Executing + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_ok() + ); + + // Should receive notification + let status = rx.recv().await.expect("Should receive notification"); + assert_eq!(status, BlockStatus::Executing); + + // Transition to Importing + assert!( + table + .transition(&hash, BlockStatus::Executing, BlockStatus::Importing) + .is_ok() + ); + + let status = rx.recv().await.expect("Should receive notification"); + assert_eq!(status, BlockStatus::Importing); + + // Transition to Imported (terminal state) + assert!( + table + .transition(&hash, BlockStatus::Importing, BlockStatus::Imported) + .is_ok() + ); + + let status = rx.recv().await.expect("Should receive notification"); + assert_eq!(status, BlockStatus::Imported); + } + + #[tokio::test] + async fn test_subscribe_race_condition() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + + table.insert(hash, slot, BlockStatus::Pending); + + // Spawn a task that will transition the status immediately + let table_clone = Arc::clone(&table); + let transition_handle = tokio::spawn(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + assert!( + table_clone + 
.transition(&hash, BlockStatus::Pending, BlockStatus::Executing,) + .is_ok() + ); + }); + + // Subscribe (might race with the transition above) + let (current_status, mut rx) = table.subscribe(&hash).expect("Should be able to subscribe"); + + transition_handle + .await + .expect("Transition task should complete"); + + // Depending on race outcome: + // - If we subscribed before transition: current_status = Pending, rx will receive Executing + // - If we subscribed after transition: current_status = Executing, rx might immediately have Executing + + if current_status == BlockStatus::Pending { + // Should receive the Executing notification + let status = tokio::time::timeout(tokio::time::Duration::from_secs(1), rx.recv()) + .await + .expect("Should not timeout") + .expect("Should receive notification"); + assert_eq!(status, BlockStatus::Executing); + } else { + // Already at Executing, might receive duplicate or might not + assert_eq!(current_status, BlockStatus::Executing); + } + + // Verify final state + assert_eq!(table.get_status(&hash), Some(BlockStatus::Executing)); + } + + #[tokio::test] + async fn test_multiple_subscribers() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + + table.insert(hash, slot, BlockStatus::Pending); + + // Create multiple subscribers + let (status1, mut rx1) = table.subscribe(&hash).expect("Should subscribe"); + let (status2, mut rx2) = table.subscribe(&hash).expect("Should subscribe"); + let (status3, mut rx3) = table.subscribe(&hash).expect("Should subscribe"); + + assert_eq!(status1, BlockStatus::Pending); + assert_eq!(status2, BlockStatus::Pending); + assert_eq!(status3, BlockStatus::Pending); + + // Transition state + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Executing) + .is_ok() + ); + + // All subscribers should receive the notification + let s1 = rx1.recv().await.expect("rx1 should receive"); + let s2 = rx2.recv().await.expect("rx2 should receive"); + let s3 = rx3.recv().await.expect("rx3 should receive"); + + assert_eq!(s1, BlockStatus::Executing); + assert_eq!(s2, BlockStatus::Executing); + assert_eq!(s3, BlockStatus::Executing); + } + + #[tokio::test] + async fn test_subscribe_to_nonexistent_block() { + let table = BlockStatusTable::new(); + let hash = test_hash(999); + + // Should return None for non-existent block + assert!(table.subscribe(&hash).is_none()); + } + + #[tokio::test] + async fn test_subscribe_after_removal() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + + table.insert(hash, slot, BlockStatus::Pending); + let (_status, mut rx) = table.subscribe(&hash).expect("Should subscribe"); + + // Transition to terminal state and remove + assert!( + table + .transition(&hash, BlockStatus::Pending, BlockStatus::Imported) + .is_ok() + ); + + // Receive the notification + let status = rx.recv().await.expect("Should receive Imported"); + assert_eq!(status, BlockStatus::Imported); + + // Remove from table + table.remove(&hash); + + // Channel should close (no more senders) + match rx.recv().await { + Err(broadcast::error::RecvError::Closed) => { + // Expected - channel closed after removal + } + other => panic!("Expected channel to be closed, got: {:?}", other), + } + } + + #[tokio::test] + async fn test_subscribe_with_late_subscriber() { + let table = Arc::new(BlockStatusTable::new()); + let hash = test_hash(1); + let slot = Slot::new(1); + + table.insert(hash, slot, BlockStatus::Pending); + + // First 
subscriber
+        let (_status1, mut rx1) = table.subscribe(&hash).expect("Should subscribe");
+
+        // Transition through multiple states
+        assert!(
+            table
+                .transition(&hash, BlockStatus::Pending, BlockStatus::Executing)
+                .is_ok()
+        );
+        assert!(
+            table
+                .transition(&hash, BlockStatus::Executing, BlockStatus::Importing)
+                .is_ok()
+        );
+
+        // Late subscriber should get current state
+        let (status2, mut rx2) = table.subscribe(&hash).expect("Should subscribe");
+        assert_eq!(status2, BlockStatus::Importing);
+
+        // First subscriber should have received both notifications
+        assert_eq!(
+            rx1.recv().await.expect("Should receive"),
+            BlockStatus::Executing
+        );
+        assert_eq!(
+            rx1.recv().await.expect("Should receive"),
+            BlockStatus::Importing
+        );
+
+        // Now both should receive future transitions
+        assert!(
+            table
+                .transition(&hash, BlockStatus::Importing, BlockStatus::Imported)
+                .is_ok()
+        );
+
+        assert_eq!(
+            rx1.recv().await.expect("Should receive"),
+            BlockStatus::Imported
+        );
+        assert_eq!(
+            rx2.recv().await.expect("Should receive"),
+            BlockStatus::Imported
+        );
+    }
+}
+*/
diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs
index d0ed8258e55..443daeb9a5b 100644
--- a/beacon_node/beacon_chain/src/block_verification.rs
+++ b/beacon_node/beacon_chain/src/block_verification.rs
@@ -50,6 +50,7 @@ use crate::beacon_snapshot::PreProcessingSnapshot;
 use crate::blob_verification::GossipBlobError;
+use crate::block_status_table::BlockStatus;
 use crate::block_verification_types::{AsBlock, BlockImportData, RpcBlock};
 use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock};
 use crate::data_column_verification::GossipDataColumnError;
@@ -138,7 +139,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
 ///
 /// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
 /// - We encountered an error whilst trying to verify the block (a `BeaconChainError`).
-#[derive(Debug, AsRefStr)]
+#[derive(Debug, AsRefStr, Clone)]
 pub enum BlockError {
     /// The parent block was unknown.
     ///
@@ -193,6 +194,12 @@ pub enum BlockError {
     ///
     /// The block could be valid, or invalid. We don't know.
     DuplicateImportStatusUnknown(Hash256),
+    /// The block is already being processed or imported by another thread; its current import status is known.
+    ///
+    /// ## Peer scoring
+    ///
+    /// The block could be valid, or invalid. We don't know.
+    DuplicateImportStatusKnown(Hash256, BlockStatus),
     /// The block slot exceeds the MAXIMUM_BLOCK_SLOT_NUMBER.
     ///
     /// ## Peer scoring
     ///
@@ -245,14 +252,14 @@ pub enum BlockError {
     /// ## Peer scoring
     ///
     /// The block is invalid and the peer is faulty.
-    PerBlockProcessingError(BlockProcessingError),
+    PerBlockProcessingError(Arc<BlockProcessingError>),
     /// There was an error whilst processing the block. It is not necessarily invalid.
     ///
     /// ## Peer scoring
     ///
     /// We were unable to process this block due to an internal error. It's unclear if the block is
     /// valid.
-    BeaconChainError(Box<BeaconChainError>),
+    BeaconChainError(Arc<BeaconChainError>),
     /// There was an error whilst verifying weak subjectivity. This block conflicts with the
     /// configured weak subjectivity checkpoint and was not imported.
/// @@ -271,7 +278,7 @@ pub enum BlockError { /// ## Peer scoring /// /// See `ExecutionPayloadError` for scoring information - ExecutionPayloadError(ExecutionPayloadError), + ExecutionPayloadError(Arc), /// The block references an parent block which has an execution payload which was found to be /// invalid. /// @@ -308,7 +315,7 @@ pub enum BlockError { /// /// TODO: We may need to penalize the peer that gave us a potentially invalid rpc blob. /// https://github.com/sigp/lighthouse/issues/4546 - AvailabilityCheck(AvailabilityCheckError), + AvailabilityCheck(Arc), /// A Blob with a slot after PeerDAS is received and is not required to be imported. /// This can happen because we stay subscribed to the blob subnet after 2 epochs, as we could /// still receive valid blobs from a Deneb epoch after PeerDAS is activated. @@ -337,7 +344,7 @@ pub enum BlockError { } /// Which specific signature(s) are invalid in a SignedBeaconBlock -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum InvalidSignature { // The outer signature in a SignedBeaconBlock ProposerSignature, @@ -349,7 +356,7 @@ pub enum InvalidSignature { impl From for BlockError { fn from(e: AvailabilityCheckError) -> Self { - Self::AvailabilityCheck(e) + Self::AvailabilityCheck(Arc::new(e)) } } @@ -457,7 +464,7 @@ impl From for ExecutionPayloadError { impl From for BlockError { fn from(e: ExecutionPayloadError) -> Self { - BlockError::ExecutionPayloadError(e) + BlockError::ExecutionPayloadError(Arc::new(e)) } } @@ -494,31 +501,31 @@ impl From for BlockError { impl From for BlockError { fn from(e: BeaconChainError) -> Self { - BlockError::BeaconChainError(e.into()) + BlockError::BeaconChainError(Arc::new(e.into())) } } impl From for BlockError { fn from(e: BeaconStateError) -> Self { - BlockError::BeaconChainError(BeaconChainError::BeaconStateError(e).into()) + BlockError::BeaconChainError(Arc::new(BeaconChainError::BeaconStateError(e).into())) } } impl From for BlockError { fn from(e: SlotProcessingError) -> Self { - BlockError::BeaconChainError(BeaconChainError::SlotProcessingError(e).into()) + BlockError::BeaconChainError(Arc::new(BeaconChainError::SlotProcessingError(e).into())) } } impl From for BlockError { fn from(e: DBError) -> Self { - BlockError::BeaconChainError(BeaconChainError::DBError(e).into()) + BlockError::BeaconChainError(Arc::new(BeaconChainError::DBError(e).into())) } } impl From for BlockError { fn from(e: ArithError) -> Self { - BlockError::BeaconChainError(BeaconChainError::ArithError(e).into()) + BlockError::BeaconChainError(Arc::new(BeaconChainError::ArithError(e).into())) } } @@ -1002,7 +1009,7 @@ impl GossipVerifiedBlock { .observed_slashable .write() .observe_slashable(block.slot(), block.message().proposer_index(), block_root) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; // Now the signature is valid, store the proposal so we don't accept another from this // validator and slot. // @@ -1012,7 +1019,7 @@ impl GossipVerifiedBlock { .observed_block_producers .write() .observe_proposal(block_root, block.message()) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))? 
{ SeenBlock::Slashable => { return Err(BlockError::Slashable); @@ -1299,10 +1306,7 @@ impl IntoExecutionPendingBlock for RpcBlock .data_availability_checker .verify_kzg_for_rpc_block(self.clone()) .map_err(|e| { - BlockSlashInfo::SignatureNotChecked( - self.signed_block_header(), - BlockError::AvailabilityCheck(e), - ) + BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e.into()) })?; SignatureVerifiedBlock::check_slashable(maybe_available, block_root, chain)? .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) @@ -1338,13 +1342,13 @@ impl ExecutionPendingBlock { .observed_slashable .write() .observe_slashable(block.slot(), block.message().proposer_index(), block_root) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; chain .observed_block_producers .write() .observe_proposal(block_root, block.message()) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?; + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))?; if let Some(parent) = chain .canonical_head @@ -1598,7 +1602,7 @@ impl ExecutionPendingBlock { // Capture `BeaconStateError` so that we can easily distinguish between a block // that's invalid and one that caused an internal error. BlockProcessingError::BeaconStateError(e) => return Err(e.into()), - other => return Err(BlockError::PerBlockProcessingError(other)), + other => return Err(BlockError::PerBlockProcessingError(Arc::new(other))), } }; @@ -1645,7 +1649,7 @@ impl ExecutionPendingBlock { for (i, attestation) in block.message().body().attestations().enumerate() { let indexed_attestation = consensus_context .get_indexed_attestation(&state, attestation) - .map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i)))?; + .map_err(|e| BlockError::PerBlockProcessingError(e.into_with_index(i).into()))?; match fork_choice.on_attestation( current_slot, @@ -1656,7 +1660,7 @@ impl ExecutionPendingBlock { // Ignore invalid attestations whilst importing attestations from a block. The // block might be very old and therefore the attestations useless to fork choice. Err(ForkChoiceError::InvalidAttestation(_)) => Ok(()), - Err(e) => Err(BlockError::BeaconChainError(Box::new(e.into()))), + Err(e) => Err(BlockError::BeaconChainError(Arc::new(e.into()))), }?; } drop(fork_choice); @@ -1747,7 +1751,7 @@ pub fn check_block_is_finalized_checkpoint_or_descendant< if chain .store .block_exists(&block.parent_root()) - .map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))? + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))? { Err(BlockError::NotFinalizedDescendant { block_parent_root: block.parent_root(), @@ -1796,6 +1800,7 @@ pub fn check_block_relevancy( // Do not process a block from a finalized slot. check_block_against_finalized_slot(block, block_root, chain)?; + // TODO: this could be a check on the block status table rather than fork choice // Check if the block is already known. We know it is post-finalization, so it is // sufficient to check the fork choice. if chain @@ -1894,7 +1899,7 @@ fn load_parent>( let root = block.parent_root(); let parent_block = chain .get_blinded_block(&block.parent_root()) - .map_err(|e| BlockError::BeaconChainError(Box::new(e)))? + .map_err(|e| BlockError::BeaconChainError(Arc::new(e)))? .ok_or_else(|| { // Return a `MissingBeaconBlock` error instead of a `ParentUnknown` error since // we've already checked fork choice for this block. 
diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 1a0b188fdcd..6545a901b7c 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -295,6 +295,10 @@ impl AvailableExecutedBlock { } blob_ids } + + pub fn block_root(&self) -> Hash256 { + self.import_data.block_root + } } /// A block that has completed all pre-deneb block processing checks, verification diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 5564c7916fa..204db95e5dd 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -4,6 +4,7 @@ use crate::beacon_chain::{ BEACON_CHAIN_DB_KEY, CanonicalHead, LightClientProducerEvent, OP_POOL_DB_KEY, }; use crate::beacon_proposer_cache::BeaconProposerCache; +use crate::block_status_table::BlockStatusTable; use crate::data_availability_checker::DataAvailabilityChecker; use crate::fork_choice_signal::ForkChoiceSignalTx; use crate::fork_revert::{reset_fork_choice_to_finalization, revert_to_fork_boundary}; @@ -1011,6 +1012,7 @@ where slasher: self.slasher.clone(), validator_monitor: RwLock::new(validator_monitor), genesis_backfill_slot, + block_status_table: Arc::new(BlockStatusTable::new()), data_availability_checker: Arc::new( DataAvailabilityChecker::new( complete_blob_backfill, diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index cfc7a9637b2..a938c007e11 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -1020,6 +1020,16 @@ impl BeaconChain { // Take a write-lock on the canonical head and signal for it to prune. 
self.canonical_head.fork_choice_write_lock().prune()?; + // kick off a task to prune the block status table in 2 minutes + let block_status_table = self.block_status_table.clone(); + self.task_executor.spawn_handle( + async move { + tokio::time::sleep(Duration::from_secs(120)).await; + block_status_table.prune_finalized(new_finalized_slot); + }, + "prune_block_status_table", + ); + Ok(()) } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index f0cab06ca3d..722fc3900a0 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -76,7 +76,7 @@ impl PayloadNotifier { block_message.body(), &chain.spec, ) - .map_err(BlockError::PerBlockProcessingError)?; + .map_err(|e| BlockError::PerBlockProcessingError(Arc::new(e)))?; match notify_execution_layer { NotifyExecutionLayer::No if chain.config.optimistic_finalized_sync => { @@ -319,18 +319,18 @@ pub fn validate_execution_payload_for_gossip( .slot_clock .start_of(block.slot()) .map(|d| d.as_secs()) - .ok_or(BlockError::BeaconChainError(Box::new( + .ok_or(BlockError::BeaconChainError(Arc::new( BeaconChainError::UnableToComputeTimeAtSlot, )))?; // The block's execution payload timestamp is correct with respect to the slot if execution_payload.timestamp() != expected_timestamp { - return Err(BlockError::ExecutionPayloadError( + return Err(BlockError::ExecutionPayloadError(Arc::new( ExecutionPayloadError::InvalidPayloadTimestamp { expected: expected_timestamp, found: execution_payload.timestamp(), }, - )); + ))); } } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 9d8c3dba38f..62d9efba259 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -11,6 +11,7 @@ mod beacon_snapshot; pub mod bellatrix_readiness; pub mod blob_verification; pub mod block_reward; +pub mod block_status_table; mod block_times_cache; mod block_verification; pub mod block_verification_types; diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 05a4a4b7a4a..c57deb22e8c 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -133,7 +133,7 @@ pub async fn publish_block>( ); crate::publish_pubsub_message(&sender, PubsubMessage::BeaconBlock(block.clone())).map_err( - |_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)), + |_| BlockError::BeaconChainError(Arc::new(BeaconChainError::UnableToPublish)), )?; Ok(()) @@ -494,7 +494,7 @@ fn publish_blob_sidecars( ) -> Result<(), BlockError> { let pubsub_message = PubsubMessage::BlobSidecar(Box::new((blob.index(), blob.clone_blob()))); crate::publish_pubsub_message(sender_clone, pubsub_message) - .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))) + .map_err(|_| BlockError::BeaconChainError(Arc::new(BeaconChainError::UnableToPublish))) } fn publish_column_sidecars( @@ -527,7 +527,7 @@ fn publish_column_sidecars( }) .collect::>(); crate::publish_pubsub_messages(sender_clone, pubsub_messages) - .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish))) + .map_err(|_| BlockError::BeaconChainError(Arc::new(BeaconChainError::UnableToPublish))) } async fn post_block_import_logging_and_response( @@ -798,7 +798,7 @@ fn check_slashable( block_clone.message().proposer_index(), block_root, ) - .map_err(|e| 
BlockError::BeaconChainError(Box::new(e.into())))? + .map_err(|e| BlockError::BeaconChainError(Arc::new(e.into())))? { warn!( slot = %block_clone.slot(), diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 5fc94c29587..8eadc5d8432 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -1289,7 +1289,8 @@ impl NetworkBeaconProcessor { } Err( BlockError::DuplicateFullyImported(_) - | BlockError::DuplicateImportStatusUnknown(..), + | BlockError::DuplicateImportStatusUnknown(..) + | BlockError::DuplicateImportStatusKnown(..), ) => { debug!( %block_root, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index f8ffd298caf..c413bea5675 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -651,14 +651,19 @@ impl BlockLookups { // Note: currenlty only InvalidColumn errors have index granularity, // but future errors may follow the same pattern. Generalize this // pattern with https://github.com/sigp/lighthouse/pull/6321 - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn((index_opt, _)), - ) => { - match index_opt { - Some(index) => peer_group.of_index(index as usize).collect(), - // If no index supplied this is an un-attributable fault. In practice - // this should never happen. - None => vec![], + BlockError::AvailabilityCheck(arc_err) => { + match arc_err.as_ref() { + AvailabilityCheckError::InvalidColumn((index_opt, _)) => { + match index_opt { + Some(index) => { + peer_group.of_index(*index as usize).collect() + } + // If no index supplied this is an un-attributable fault. In practice + // this should never happen. + None => vec![], + } + } + _ => peer_group.all().collect(), } } _ => peer_group.all().collect(),