1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -802,6 +802,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Update the state cache so it doesn't mistakenly prune the new head.
self.store
.state_cache
.inner
.lock()
.update_head_block_root(new_cached_head.head_block_root());

48 changes: 27 additions & 21 deletions beacon_node/store/src/hot_cold_store.rs
@@ -74,7 +74,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
/// Cache of beacon states.
///
/// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required.
pub state_cache: Mutex<StateCache<E>>,
pub state_cache: StateCache<E>,
/// Cache of historic states and hierarchical diff buffers.
///
/// This cache is never pruned. It is only populated in response to historical queries from the
@@ -232,11 +232,11 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
block_cache: NonZeroUsize::new(config.block_cache_size)
.map(BlockCache::new)
.map(Mutex::new),
state_cache: Mutex::new(StateCache::new(
state_cache: StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
config.hot_hdiff_buffer_cache_size,
)),
),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.cold_hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -286,11 +286,11 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
block_cache: NonZeroUsize::new(config.block_cache_size)
.map(BlockCache::new)
.map(Mutex::new),
state_cache: Mutex::new(StateCache::new(
state_cache: StateCache::new(
config.state_cache_size,
config.state_cache_headroom,
config.hot_hdiff_buffer_cache_size,
)),
),
historic_state_cache: Mutex::new(HistoricStateCache::new(
config.cold_hdiff_buffer_cache_size,
config.historic_state_cache_size,
@@ -477,7 +477,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let pre_finalized_slots_to_retain = self
.hierarchy
.closest_layer_points(state.slot(), start_slot);
self.state_cache.lock().update_finalized_state(
self.state_cache.inner.lock().update_finalized_state(
state_root,
block_root,
state,
@@ -486,7 +486,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

pub fn state_cache_len(&self) -> usize {
self.state_cache.lock().len()
self.state_cache.inner.lock().len()
}

pub fn register_metrics(&self) {
@@ -503,7 +503,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
cache.blob_cache.len() as i64,
);
}
let state_cache = self.state_cache.lock();
let state_cache = self.state_cache.inner.lock();
metrics::set_gauge(
&metrics::STORE_BEACON_STATE_CACHE_SIZE,
state_cache.len() as i64,
@@ -1109,6 +1109,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state.build_all_caches(&self.spec)?;
if let PutStateOutcome::New(deleted_states) =
self.state_cache
.inner
.lock()
.put_state(*state_root, block_root, state)?
{
@@ -1137,6 +1138,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
max_slot: Slot,
) -> Option<(Hash256, BeaconState<E>)> {
self.state_cache
.inner
.lock()
.get_by_block_root(block_root, max_slot)
}
@@ -1467,10 +1469,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
for op in &hot_db_cache_ops {
match op {
StoreOp::DeleteBlock(block_root) => {
self.state_cache.lock().delete_block_states(block_root);
self.state_cache
.inner
.lock()
.delete_block_states(block_root);
}
StoreOp::DeleteState(state_root, _) => {
self.state_cache.lock().delete_state(state_root)
self.state_cache.inner.lock().delete_state(state_root)
}
_ => (),
}
@@ -1538,7 +1543,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state: &BeaconState<E>,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
match self.state_cache.lock().put_state(
match self.state_cache.inner.lock().put_state(
*state_root,
state.get_latest_block_root(*state_root),
state,
@@ -1688,7 +1693,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state_root: &Hash256,
update_cache: bool,
) -> Result<Option<BeaconState<E>>, Error> {
if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) {
if let Some(state) = self.state_cache.inner.lock().get_by_state_root(*state_root) {
return Ok(Some(state));
}

@@ -1705,10 +1710,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
state.update_tree_hash_cache()?;
state.build_all_caches(&self.spec)?;
if update_cache {
if let PutStateOutcome::New(deleted_states) =
self.state_cache
.lock()
.put_state(*state_root, block_root, &state)?
if let PutStateOutcome::New(deleted_states) = self
.state_cache
.inner
.lock()
.put_state(*state_root, block_root, &state)?
{
debug!(
?state_root,
@@ -1733,11 +1739,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result<HDiffBuffer, Error> {
if let Some(buffer) = self
.state_cache
.lock()
.get_hdiff_buffer_by_state_root(state_root)
{
if let Some(buffer) = self.state_cache.get_hdiff_buffer_by_state_root(state_root) {
return Ok(buffer);
}

@@ -1794,6 +1796,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

// Add buffer to cache for future calls.
self.state_cache
.inner
.lock()
.put_hdiff_buffer(state_root, slot, &buffer);

@@ -1853,6 +1856,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// Immediately rebase the state from diffs on the finalized state so that we
// can utilise structural sharing and don't consume excess memory.
self.state_cache
.inner
.lock()
.rebase_on_finalized(&mut state, &self.spec)?;

@@ -1878,6 +1882,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// Immediately rebase the state from disk on the finalized state so that we can
// reuse parts of the tree for state root calculation in `replay_blocks`.
self.state_cache
.inner
.lock()
.rebase_on_finalized(&mut base_state, &self.spec)?;

@@ -1925,6 +1930,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let latest_block_root = state.get_latest_block_root(state_root);
if let PutStateOutcome::New(_) =
self.state_cache
.inner
.lock()
.put_state(state_root, latest_block_root, state)?
{
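Note how `load_hot_hdiff_buffer` in this file now calls `self.state_cache.get_hdiff_buffer_by_state_root(...)` without locking at the call site: the mutex has moved inside `StateCache`, which locks its inner cache, clones a cheap handle, and drops the lock before doing expensive work. A standalone sketch of that pattern follows (not Lighthouse code: the `Cache`/`InnerCache` names and `Vec<u8>` payload are made up for illustration, and `std::sync::Mutex` stands in for the `parking_lot::Mutex` used by the PR):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Entries are stored behind `Arc` so a cache hit can be cloned cheaply while
// the lock is held; the expensive deep copy happens after the lock is dropped.
#[derive(Default)]
struct InnerCache {
    entries: HashMap<u64, Arc<Vec<u8>>>,
}

#[derive(Default)]
struct Cache {
    // The lock lives inside the cache type, so callers never lock it themselves.
    inner: Mutex<InnerCache>,
}

impl Cache {
    fn put(&self, key: u64, value: Vec<u8>) {
        self.inner.lock().unwrap().entries.insert(key, Arc::new(value));
    }

    // Hold the lock only long enough to clone the `Arc` handle.
    fn get_owned(&self, key: u64) -> Option<Vec<u8>> {
        let inner = self.inner.lock().unwrap();
        let handle = inner.entries.get(&key).cloned();
        drop(inner); // lock released before the potentially large copy below
        handle.map(|arc| arc.as_ref().clone())
    }
}

fn main() {
    let cache = Cache::default();
    cache.put(1, vec![0u8; 1024]);
    assert_eq!(cache.get_owned(1).map(|v| v.len()), Some(1024));
    assert!(cache.get_owned(2).is_none());
}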
104 changes: 64 additions & 40 deletions beacon_node/store/src/state_cache.rs
@@ -4,8 +4,10 @@ use crate::{
metrics::{self, HOT_METRIC},
};
use lru::LruCache;
use parking_lot::Mutex;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;
use std::sync::Arc;
use tracing::instrument;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};

@@ -37,6 +39,11 @@ pub struct SlotMap {

#[derive(Debug)]
pub struct StateCache<E: EthSpec> {
pub inner: Mutex<StateCacheInner<E>>,
}

#[derive(Debug)]
pub struct StateCacheInner<E: EthSpec> {
finalized_state: Option<FinalizedState<E>>,
// Stores the tuple (state_root, state) as LruCache only returns the value on put and we need
// the state_root
@@ -60,7 +67,7 @@ pub struct HotHDiffBufferCache {
/// Cache of HDiffBuffers for states *prior* to the `finalized_state`.
///
/// Maps state_root -> (slot, buffer).
hdiff_buffers: LruCache<Hash256, (Slot, HDiffBuffer)>,
hdiff_buffers: LruCache<Hash256, Arc<(Slot, HDiffBuffer)>>,
}

#[derive(Debug)]
@@ -77,14 +84,57 @@ pub enum PutStateOutcome {
New(Vec<Hash256>),
}

#[allow(clippy::len_without_is_empty)]
impl<E: EthSpec> StateCache<E> {
pub fn new(
state_capacity: NonZeroUsize,
headroom: NonZeroUsize,
hdiff_capacity: NonZeroUsize,
) -> Self {
StateCache {
Self {
inner: Mutex::new(StateCacheInner::new(
state_capacity,
headroom,
hdiff_capacity,
)),
}
}

pub fn get_hdiff_buffer_by_state_root(&self, state_root: Hash256) -> Option<HDiffBuffer> {
let mut inner = self.inner.lock();

let opt_buffer = inner.hdiff_buffers.hdiff_buffers.get(&state_root).cloned();

if let Some((_, buffer)) = opt_buffer.as_deref() {
drop(inner);

metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC);
let _timer =
metrics::start_timer_vec(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIME, HOT_METRIC);
Some(buffer.clone())
} else if let Some(state) = inner.get_by_state_root(state_root) {
// Drop the lock before performing the conversion from state to buffer, as this
// conversion is somewhat expensive.
drop(inner);

metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC);
let buffer = HDiffBuffer::from_state(state);

Some(buffer)
} else {
metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_MISS, HOT_METRIC);
None
}
}
}

#[allow(clippy::len_without_is_empty)]
impl<E: EthSpec> StateCacheInner<E> {
pub fn new(
state_capacity: NonZeroUsize,
headroom: NonZeroUsize,
hdiff_capacity: NonZeroUsize,
) -> Self {
StateCacheInner {
finalized_state: None,
states: LruCache::new(state_capacity),
block_map: BlockMap::default(),
@@ -143,9 +193,10 @@ impl<E: EthSpec> StateCache<E> {
// at some slots in the case of long forks without finality.
let new_hdiff_cache = HotHDiffBufferCache::new(self.hdiff_buffers.cap());
let old_hdiff_cache = std::mem::replace(&mut self.hdiff_buffers, new_hdiff_cache);
for (state_root, (slot, buffer)) in old_hdiff_cache.hdiff_buffers {
for (state_root, arc) in old_hdiff_cache.hdiff_buffers {
let slot = arc.0;
if pre_finalized_slots_to_retain.contains(&slot) {
self.hdiff_buffers.put(state_root, slot, buffer);
self.hdiff_buffers.hdiff_buffers.put(state_root, arc);
}
}

@@ -279,26 +330,6 @@ impl<E: EthSpec> StateCache<E> {
self.hdiff_buffers.put(state_root, slot, buffer.clone());
}

pub fn get_hdiff_buffer_by_state_root(&mut self, state_root: Hash256) -> Option<HDiffBuffer> {
if let Some(buffer) = self.hdiff_buffers.get(&state_root) {
metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC);
let timer =
metrics::start_timer_vec(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIME, HOT_METRIC);
let result = Some(buffer.clone());
drop(timer);
return result;
}
if let Some(buffer) = self
.get_by_state_root(state_root)
.map(HDiffBuffer::from_state)
{
metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC);
return Some(buffer);
}
metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_MISS, HOT_METRIC);
None
}

#[instrument(skip_all, fields(?block_root, %slot), level = "debug")]
pub fn get_by_block_root(
&mut self,
@@ -444,9 +475,7 @@ impl HotHDiffBufferCache {
}

pub fn get(&mut self, state_root: &Hash256) -> Option<HDiffBuffer> {
self.hdiff_buffers
.get(state_root)
.map(|(_, buffer)| buffer.clone())
self.hdiff_buffers.get(state_root).map(|arc| arc.1.clone())
}

/// Put a value in the cache, making room for it if necessary.
Expand All @@ -455,7 +484,7 @@ impl HotHDiffBufferCache {
pub fn put(&mut self, state_root: Hash256, slot: Slot, buffer: HDiffBuffer) -> bool {
// If the cache is not full, simply insert the value.
if self.hdiff_buffers.len() != self.hdiff_buffers.cap().get() {
self.hdiff_buffers.put(state_root, (slot, buffer));
self.hdiff_buffers.put(state_root, Arc::new((slot, buffer)));
return true;
}

@@ -466,28 +495,26 @@ impl HotHDiffBufferCache {
// cache. This is a simplified way of retaining the snapshot in the cache. We don't need
// to worry about inserting/retaining states older than the snapshot because these are
// pruned on finalization and never reinserted.
let Some(min_slot) = self.hdiff_buffers.iter().map(|(_, (slot, _))| *slot).min() else {
let Some(min_slot) = self.hdiff_buffers.iter().map(|(_, arc)| arc.0).min() else {
// Unreachable: cache is full so should have >0 entries.
return false;
};

if self.hdiff_buffers.cap().get() > 1 || slot < min_slot {
// Remove LRU value. Cache is now at size `cap - 1`.
let Some((removed_state_root, (removed_slot, removed_buffer))) =
self.hdiff_buffers.pop_lru()
else {
let Some((removed_state_root, removed_arc)) = self.hdiff_buffers.pop_lru() else {
// Unreachable: cache is full so should have at least one entry to pop.
return false;
};

// Insert new value. Cache size is now at size `cap`.
self.hdiff_buffers.put(state_root, (slot, buffer));
self.hdiff_buffers.put(state_root, Arc::new((slot, buffer)));

// If the removed value had the min slot and we didn't intend to replace it (cap=1)
// then we reinsert it.
let removed_slot = removed_arc.0;
if removed_slot == min_slot && slot >= min_slot {
self.hdiff_buffers
.put(removed_state_root, (removed_slot, removed_buffer));
self.hdiff_buffers.put(removed_state_root, removed_arc);
}
true
} else {
Expand All @@ -506,9 +533,6 @@ impl HotHDiffBufferCache {
}

pub fn mem_usage(&self) -> usize {
self.hdiff_buffers
.iter()
.map(|(_, (_, buffer))| buffer.size())
.sum()
self.hdiff_buffers.iter().map(|(_, arc)| arc.1.size()).sum()
}
}
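The eviction rule in `HotHDiffBufferCache::put` above is easy to miss on a first read: the entry with the minimum slot acts as the retained snapshot and must survive eviction unless the new entry's slot precedes it. A standalone sketch of that rule with simplified types (u64 keys and byte-vector payloads instead of state roots and `HDiffBuffer`s; it uses the same `lru` crate as the real cache):

use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::Arc;

/// Insert into an LRU cache without ever evicting the minimum-slot entry
/// (the "snapshot"), unless the new entry has an even smaller slot.
fn put_keep_min(
    cache: &mut LruCache<u64, Arc<(u64, Vec<u8>)>>,
    key: u64,
    slot: u64,
    value: Vec<u8>,
) -> bool {
    // Not full: plain insert.
    if cache.len() != cache.cap().get() {
        cache.put(key, Arc::new((slot, value)));
        return true;
    }
    let Some(min_slot) = cache.iter().map(|(_, entry)| entry.0).min() else {
        return false; // unreachable: a full cache has entries
    };
    if cache.cap().get() > 1 || slot < min_slot {
        let Some((evicted_key, evicted)) = cache.pop_lru() else {
            return false;
        };
        cache.put(key, Arc::new((slot, value)));
        // If the LRU entry we just evicted was the snapshot, put it back,
        // displacing the next least-recently-used entry instead.
        if evicted.0 == min_slot && slot >= min_slot {
            cache.put(evicted_key, evicted);
        }
        true
    } else {
        false
    }
}

fn main() {
    let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());
    put_keep_min(&mut cache, 1, 10, vec![]); // snapshot (minimum slot)
    put_keep_min(&mut cache, 2, 20, vec![]);
    put_keep_min(&mut cache, 3, 30, vec![]); // evicts key 2, keeps the snapshot
    assert!(cache.contains(&1));
    assert!(!cache.contains(&2));
    assert!(cache.contains(&3));
}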