Skip to content

Commit 78d330e

Browse files
authored
Consolidate reqresp_pre_import_cache into data_availability_checker (#8045)
This PR consolidates the `reqresp_pre_import_cache` into the `data_availability_checker` for the following reasons: - The `reqresp_pre_import_cache` suffers from the same TOCTOU bug we had with the `data_availability_checker` earlier, which leads to an unbounded memory leak that we have observed on some nodes over the last 6 months. - The `reqresp_pre_import_cache` is no longer necessary, because we now hold blocks in the `data_availability_checker` for longer since (#7961), and recent blocks can be served from the DA checker. This PR also maintains the following functionality: - Serving pre-executed blocks over RPC; they are now served from the `data_availability_checker` instead. - Using the cache for de-duplicating lookup requests. Co-Authored-By: Jimmy Chen <[email protected]> Co-Authored-By: Jimmy Chen <[email protected]>
1 parent 4111bcb commit 78d330e

File tree

9 files changed

+239
-208
lines changed

9 files changed

+239
-208
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 40 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -340,10 +340,6 @@ pub enum BlockProcessStatus<E: EthSpec> {
340340
ExecutionValidated(Arc<SignedBeaconBlock<E>>),
341341
}
342342

343-
pub struct BeaconChainMetrics {
344-
pub reqresp_pre_import_cache_len: usize,
345-
}
346-
347343
pub type LightClientProducerEvent<T> = (Hash256, Slot, SyncAggregate<T>);
348344

349345
pub type BeaconForkChoice<T> = ForkChoice<
@@ -363,9 +359,6 @@ pub type BeaconStore<T> = Arc<
363359
>,
364360
>;
365361

366-
/// Cache gossip verified blocks to serve over ReqResp before they are imported
367-
type ReqRespPreImportCache<E> = HashMap<Hash256, Arc<SignedBeaconBlock<E>>>;
368-
369362
/// Represents the "Beacon Chain" component of Ethereum 2.0. Allows import of blocks and block
370363
/// operations and chooses a canonical head.
371364
pub struct BeaconChain<T: BeaconChainTypes> {
@@ -462,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
462455
pub(crate) attester_cache: Arc<AttesterCache>,
463456
/// A cache used when producing attestations whilst the head block is still being imported.
464457
pub early_attester_cache: EarlyAttesterCache<T::EthSpec>,
465-
/// Cache gossip verified blocks to serve over ReqResp before they are imported
466-
pub reqresp_pre_import_cache: Arc<RwLock<ReqRespPreImportCache<T::EthSpec>>>,
467458
/// A cache used to keep track of various block timings.
468459
pub block_times_cache: Arc<RwLock<BlockTimesCache>>,
469460
/// A cache used to track pre-finalization block roots for quick rejection.
@@ -1289,18 +1280,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
12891280
/// chain. Used by sync to learn the status of a block and prevent repeated downloads /
12901281
/// processing attempts.
12911282
pub fn get_block_process_status(&self, block_root: &Hash256) -> BlockProcessStatus<T::EthSpec> {
1292-
if let Some(block) = self
1293-
.data_availability_checker
1294-
.get_execution_valid_block(block_root)
1295-
{
1296-
return BlockProcessStatus::ExecutionValidated(block);
1297-
}
1298-
1299-
if let Some(block) = self.reqresp_pre_import_cache.read().get(block_root) {
1300-
// A block is on the `reqresp_pre_import_cache` but NOT in the
1301-
// `data_availability_checker` only if it is actively processing. We can expect a future
1302-
// event with the result of processing
1303-
return BlockProcessStatus::NotValidated(block.clone());
1283+
if let Some(cached_block) = self.data_availability_checker.get_cached_block(block_root) {
1284+
return cached_block;
13041285
}
13051286

13061287
BlockProcessStatus::Unknown
@@ -3054,8 +3035,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30543035

30553036
self.emit_sse_blob_sidecar_events(&block_root, std::iter::once(blob.as_blob()));
30563037

3057-
let r = self.check_gossip_blob_availability_and_import(blob).await;
3058-
self.remove_notified(&block_root, r)
3038+
self.check_gossip_blob_availability_and_import(blob).await
30593039
}
30603040

30613041
/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
@@ -3092,15 +3072,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30923072
data_columns.iter().map(|column| column.as_data_column()),
30933073
);
30943074

3095-
let r = self
3096-
.check_gossip_data_columns_availability_and_import(
3097-
slot,
3098-
block_root,
3099-
data_columns,
3100-
publish_fn,
3101-
)
3102-
.await;
3103-
self.remove_notified(&block_root, r)
3075+
self.check_gossip_data_columns_availability_and_import(
3076+
slot,
3077+
block_root,
3078+
data_columns,
3079+
publish_fn,
3080+
)
3081+
.await
31043082
}
31053083

31063084
/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
@@ -3139,10 +3117,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31393117

31403118
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
31413119

3142-
let r = self
3143-
.check_rpc_blob_availability_and_import(slot, block_root, blobs)
3144-
.await;
3145-
self.remove_notified(&block_root, r)
3120+
self.check_rpc_blob_availability_and_import(slot, block_root, blobs)
3121+
.await
31463122
}
31473123

31483124
/// Process blobs retrieved from the EL and returns the `AvailabilityProcessingStatus`.
@@ -3174,10 +3150,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
31743150
}
31753151
}
31763152

3177-
let r = self
3178-
.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
3179-
.await;
3180-
self.remove_notified(&block_root, r)
3153+
self.check_engine_blobs_availability_and_import(slot, block_root, engine_get_blobs_output)
3154+
.await
31813155
}
31823156

31833157
fn emit_sse_blob_sidecar_events<'a, I>(self: &Arc<Self>, block_root: &Hash256, blobs_iter: I)
@@ -3270,10 +3244,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32703244
custody_columns.iter().map(|column| column.as_ref()),
32713245
);
32723246

3273-
let r = self
3274-
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
3275-
.await;
3276-
self.remove_notified(&block_root, r)
3247+
self.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
3248+
.await
32773249
}
32783250

32793251
pub async fn reconstruct_data_columns(
@@ -3320,10 +3292,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33203292
return Ok(None);
33213293
};
33223294

3323-
let r = self
3324-
.process_availability(slot, availability, || Ok(()))
3325-
.await;
3326-
self.remove_notified(&block_root, r)
3295+
self.process_availability(slot, availability, || Ok(()))
3296+
.await
33273297
.map(|availability_processing_status| {
33283298
Some((availability_processing_status, data_columns_to_publish))
33293299
})
@@ -3340,46 +3310,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33403310
}
33413311
}
33423312

3343-
/// Remove any block components from the *processing cache* if we no longer require them. If the
3344-
/// block was imported full or erred, we no longer require them.
3345-
fn remove_notified(
3346-
&self,
3347-
block_root: &Hash256,
3348-
r: Result<AvailabilityProcessingStatus, BlockError>,
3349-
) -> Result<AvailabilityProcessingStatus, BlockError> {
3350-
let has_missing_components =
3351-
matches!(r, Ok(AvailabilityProcessingStatus::MissingComponents(_, _)));
3352-
if !has_missing_components {
3353-
self.reqresp_pre_import_cache.write().remove(block_root);
3354-
}
3355-
r
3356-
}
3357-
3358-
/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
3359-
/// and evict if the block was imported or errored.
3360-
pub async fn process_block_with_early_caching<B: IntoExecutionPendingBlock<T>>(
3361-
self: &Arc<Self>,
3362-
block_root: Hash256,
3363-
unverified_block: B,
3364-
block_source: BlockImportSource,
3365-
notify_execution_layer: NotifyExecutionLayer,
3366-
) -> Result<AvailabilityProcessingStatus, BlockError> {
3367-
self.reqresp_pre_import_cache
3368-
.write()
3369-
.insert(block_root, unverified_block.block_cloned());
3370-
3371-
let r = self
3372-
.process_block(
3373-
block_root,
3374-
unverified_block,
3375-
notify_execution_layer,
3376-
block_source,
3377-
|| Ok(()),
3378-
)
3379-
.await;
3380-
self.remove_notified(&block_root, r)
3381-
}
3382-
33833313
/// Check for known and configured invalid block roots before processing.
33843314
pub fn check_invalid_block_roots(&self, block_root: Hash256) -> Result<(), BlockError> {
33853315
if self.config.invalid_block_roots.contains(&block_root) {
@@ -3411,12 +3341,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34113341
block_source: BlockImportSource,
34123342
publish_fn: impl FnOnce() -> Result<(), BlockError>,
34133343
) -> Result<AvailabilityProcessingStatus, BlockError> {
3414-
// Start the Prometheus timer.
3415-
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
3416-
3417-
// Increment the Prometheus counter for block processing requests.
3418-
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
3419-
34203344
let block_slot = unverified_block.block().slot();
34213345

34223346
// Set observed time if not already set. Usually this should be set by gossip or RPC,
@@ -3431,6 +3355,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34313355
);
34323356
}
34333357

3358+
self.data_availability_checker
3359+
.put_pre_execution_block(block_root, unverified_block.block_cloned())?;
3360+
3361+
// Start the Prometheus timer.
3362+
let _full_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_TIMES);
3363+
3364+
// Increment the Prometheus counter for block processing requests.
3365+
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);
3366+
34343367
// A small closure to group the verification and import errors.
34353368
let chain = self.clone();
34363369
let import_block = async move {
@@ -3448,7 +3381,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34483381
.set_time_consensus_verified(block_root, block_slot, timestamp)
34493382
}
34503383

3451-
let executed_block = chain.into_executed_block(execution_pending).await?;
3384+
let executed_block = chain
3385+
.into_executed_block(execution_pending)
3386+
.await
3387+
.inspect_err(|_| {
3388+
// If the block fails execution for whatever reason (e.g. engine offline),
3389+
// and we keep it in the cache, then the node will NOT perform lookup and
3390+
// reprocess this block until the block is evicted from the DA checker, causing the
3391+
// chain to get stuck temporarily if the block is canonical. Therefore we remove
3392+
// it from the cache if execution fails.
3393+
self.data_availability_checker
3394+
.remove_block_on_execution_error(&block_root);
3395+
})?;
34523396

34533397
// Record the *additional* time it took to wait for execution layer verification.
34543398
if let Some(timestamp) = self.slot_clock.now_duration() {
@@ -3574,9 +3518,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35743518
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
35753519
) -> Result<AvailabilityProcessingStatus, BlockError> {
35763520
let slot = block.block.slot();
3577-
let availability = self
3578-
.data_availability_checker
3579-
.put_pending_executed_block(block)?;
3521+
let availability = self.data_availability_checker.put_executed_block(block)?;
35803522
self.process_availability(slot, availability, || Ok(()))
35813523
.await
35823524
}
@@ -7156,12 +7098,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
71567098
)
71577099
}
71587100

7159-
pub fn metrics(&self) -> BeaconChainMetrics {
7160-
BeaconChainMetrics {
7161-
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
7162-
}
7163-
}
7164-
71657101
pub(crate) fn get_blobs_or_columns_store_op(
71667102
&self,
71677103
block_root: Hash256,

beacon_node/beacon_chain/src/builder.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -998,7 +998,6 @@ where
998998
validator_pubkey_cache: RwLock::new(validator_pubkey_cache),
999999
attester_cache: <_>::default(),
10001000
early_attester_cache: <_>::default(),
1001-
reqresp_pre_import_cache: <_>::default(),
10021001
light_client_server_cache: LightClientServerCache::new(),
10031002
light_client_server_tx: self.light_client_server_tx,
10041003
shutdown_sender: self

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use crate::block_verification_types::{
77
use crate::data_availability_checker::overflow_lru_cache::{
88
DataAvailabilityCheckerInner, ReconstructColumnsDecision,
99
};
10-
use crate::{BeaconChain, BeaconChainTypes, BeaconStore, CustodyContext, metrics};
10+
use crate::{
11+
BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics,
12+
};
1113
use kzg::Kzg;
1214
use slot_clock::SlotClock;
1315
use std::fmt;
@@ -27,6 +29,7 @@ mod error;
2729
mod overflow_lru_cache;
2830
mod state_lru_cache;
2931

32+
use crate::data_availability_checker::error::Error;
3033
use crate::data_column_verification::{
3134
CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn,
3235
KzgVerifiedDataColumn, verify_kzg_for_data_column_list,
@@ -144,14 +147,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
144147
&self.custody_context
145148
}
146149

147-
/// Checks if the block root is currenlty in the availability cache awaiting import because
150+
/// Checks if the block root is currently in the availability cache awaiting import because
148151
/// of missing components.
149-
pub fn get_execution_valid_block(
150-
&self,
151-
block_root: &Hash256,
152-
) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
153-
self.availability_cache
154-
.get_execution_valid_block(block_root)
152+
///
153+
/// Returns the cached block wrapped in a `BlockProcessStatus` enum if it exists.
154+
pub fn get_cached_block(&self, block_root: &Hash256) -> Option<BlockProcessStatus<T::EthSpec>> {
155+
self.availability_cache.get_cached_block(block_root)
155156
}
156157

157158
/// Return the set of cached blob indexes for `block_root`. Returns None if there is no block
@@ -340,12 +341,29 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
340341

341342
/// Check if we have all the blobs for a block. Returns `Availability` which has information
342343
/// about whether all components have been received or more are required.
343-
pub fn put_pending_executed_block(
344+
pub fn put_executed_block(
344345
&self,
345346
executed_block: AvailabilityPendingExecutedBlock<T::EthSpec>,
346347
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
348+
self.availability_cache.put_executed_block(executed_block)
349+
}
350+
351+
/// Inserts a pre-execution block into the cache.
352+
/// This does NOT override an existing executed block.
353+
pub fn put_pre_execution_block(
354+
&self,
355+
block_root: Hash256,
356+
block: Arc<SignedBeaconBlock<T::EthSpec>>,
357+
) -> Result<(), Error> {
358+
self.availability_cache
359+
.put_pre_execution_block(block_root, block)
360+
}
361+
362+
/// Removes a pre-execution block from the cache.
363+
/// This does NOT remove an existing executed block.
364+
pub fn remove_block_on_execution_error(&self, block_root: &Hash256) {
347365
self.availability_cache
348-
.put_pending_executed_block(executed_block)
366+
.remove_pre_execution_block(block_root);
349367
}
350368

351369
/// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may

0 commit comments

Comments
 (0)