diff --git a/beacon_node/beacon_chain/src/canonical_head.rs b/beacon_node/beacon_chain/src/canonical_head.rs index 7dd4c88c513..474fc11ec8c 100644 --- a/beacon_node/beacon_chain/src/canonical_head.rs +++ b/beacon_node/beacon_chain/src/canonical_head.rs @@ -802,6 +802,7 @@ impl BeaconChain { // Update the state cache so it doesn't mistakenly prune the new head. self.store .state_cache + .inner .lock() .update_head_block_root(new_cached_head.head_block_root()); diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 0d8a65e0644..cc00d98ef90 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -74,7 +74,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// Cache of beacon states. /// /// LOCK ORDERING: this lock must always be locked *after* the `split` if both are required. - pub state_cache: Mutex>, + pub state_cache: StateCache, /// Cache of historic states and hierarchical diff buffers. /// /// This cache is never pruned. It is only populated in response to historical queries from the @@ -232,11 +232,11 @@ impl HotColdDB, MemoryStore> { block_cache: NonZeroUsize::new(config.block_cache_size) .map(BlockCache::new) .map(Mutex::new), - state_cache: Mutex::new(StateCache::new( + state_cache: StateCache::new( config.state_cache_size, config.state_cache_headroom, config.hot_hdiff_buffer_cache_size, - )), + ), historic_state_cache: Mutex::new(HistoricStateCache::new( config.cold_hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -286,11 +286,11 @@ impl HotColdDB, BeaconNodeBackend> { block_cache: NonZeroUsize::new(config.block_cache_size) .map(BlockCache::new) .map(Mutex::new), - state_cache: Mutex::new(StateCache::new( + state_cache: StateCache::new( config.state_cache_size, config.state_cache_headroom, config.hot_hdiff_buffer_cache_size, - )), + ), historic_state_cache: Mutex::new(HistoricStateCache::new( config.cold_hdiff_buffer_cache_size, config.historic_state_cache_size, @@ -477,7 +477,7 @@ impl, Cold: ItemStore> HotColdDB let pre_finalized_slots_to_retain = self .hierarchy .closest_layer_points(state.slot(), start_slot); - self.state_cache.lock().update_finalized_state( + self.state_cache.inner.lock().update_finalized_state( state_root, block_root, state, @@ -486,7 +486,7 @@ impl, Cold: ItemStore> HotColdDB } pub fn state_cache_len(&self) -> usize { - self.state_cache.lock().len() + self.state_cache.inner.lock().len() } pub fn register_metrics(&self) { @@ -503,7 +503,7 @@ impl, Cold: ItemStore> HotColdDB cache.blob_cache.len() as i64, ); } - let state_cache = self.state_cache.lock(); + let state_cache = self.state_cache.inner.lock(); metrics::set_gauge( &metrics::STORE_BEACON_STATE_CACHE_SIZE, state_cache.len() as i64, @@ -1109,6 +1109,7 @@ impl, Cold: ItemStore> HotColdDB state.build_all_caches(&self.spec)?; if let PutStateOutcome::New(deleted_states) = self.state_cache + .inner .lock() .put_state(*state_root, block_root, state)? { @@ -1137,6 +1138,7 @@ impl, Cold: ItemStore> HotColdDB max_slot: Slot, ) -> Option<(Hash256, BeaconState)> { self.state_cache + .inner .lock() .get_by_block_root(block_root, max_slot) } @@ -1467,10 +1469,13 @@ impl, Cold: ItemStore> HotColdDB for op in &hot_db_cache_ops { match op { StoreOp::DeleteBlock(block_root) => { - self.state_cache.lock().delete_block_states(block_root); + self.state_cache + .inner + .lock() + .delete_block_states(block_root); } StoreOp::DeleteState(state_root, _) => { - self.state_cache.lock().delete_state(state_root) + self.state_cache.inner.lock().delete_state(state_root) } _ => (), } @@ -1538,7 +1543,7 @@ impl, Cold: ItemStore> HotColdDB state: &BeaconState, ops: &mut Vec, ) -> Result<(), Error> { - match self.state_cache.lock().put_state( + match self.state_cache.inner.lock().put_state( *state_root, state.get_latest_block_root(*state_root), state, @@ -1688,7 +1693,7 @@ impl, Cold: ItemStore> HotColdDB state_root: &Hash256, update_cache: bool, ) -> Result>, Error> { - if let Some(state) = self.state_cache.lock().get_by_state_root(*state_root) { + if let Some(state) = self.state_cache.inner.lock().get_by_state_root(*state_root) { return Ok(Some(state)); } @@ -1705,10 +1710,11 @@ impl, Cold: ItemStore> HotColdDB state.update_tree_hash_cache()?; state.build_all_caches(&self.spec)?; if update_cache { - if let PutStateOutcome::New(deleted_states) = - self.state_cache - .lock() - .put_state(*state_root, block_root, &state)? + if let PutStateOutcome::New(deleted_states) = self + .state_cache + .inner + .lock() + .put_state(*state_root, block_root, &state)? { debug!( ?state_root, @@ -1733,11 +1739,7 @@ impl, Cold: ItemStore> HotColdDB } fn load_hot_hdiff_buffer(&self, state_root: Hash256) -> Result { - if let Some(buffer) = self - .state_cache - .lock() - .get_hdiff_buffer_by_state_root(state_root) - { + if let Some(buffer) = self.state_cache.get_hdiff_buffer_by_state_root(state_root) { return Ok(buffer); } @@ -1794,6 +1796,7 @@ impl, Cold: ItemStore> HotColdDB // Add buffer to cache for future calls. self.state_cache + .inner .lock() .put_hdiff_buffer(state_root, slot, &buffer); @@ -1853,6 +1856,7 @@ impl, Cold: ItemStore> HotColdDB // Immediately rebase the state from diffs on the finalized state so that we // can utilise structural sharing and don't consume excess memory. self.state_cache + .inner .lock() .rebase_on_finalized(&mut state, &self.spec)?; @@ -1878,6 +1882,7 @@ impl, Cold: ItemStore> HotColdDB // Immediately rebase the state from disk on the finalized state so that we can // reuse parts of the tree for state root calculation in `replay_blocks`. self.state_cache + .inner .lock() .rebase_on_finalized(&mut base_state, &self.spec)?; @@ -1925,6 +1930,7 @@ impl, Cold: ItemStore> HotColdDB let latest_block_root = state.get_latest_block_root(state_root); if let PutStateOutcome::New(_) = self.state_cache + .inner .lock() .put_state(state_root, latest_block_root, state)? { diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs index 4b0d1ee0160..052dc7b0e71 100644 --- a/beacon_node/store/src/state_cache.rs +++ b/beacon_node/store/src/state_cache.rs @@ -4,8 +4,10 @@ use crate::{ metrics::{self, HOT_METRIC}, }; use lru::LruCache; +use parking_lot::Mutex; use std::collections::{BTreeMap, HashMap, HashSet}; use std::num::NonZeroUsize; +use std::sync::Arc; use tracing::instrument; use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot}; @@ -37,6 +39,11 @@ pub struct SlotMap { #[derive(Debug)] pub struct StateCache { + pub inner: Mutex>, +} + +#[derive(Debug)] +pub struct StateCacheInner { finalized_state: Option>, // Stores the tuple (state_root, state) as LruCache only returns the value on put and we need // the state_root @@ -60,7 +67,7 @@ pub struct HotHDiffBufferCache { /// Cache of HDiffBuffers for states *prior* to the `finalized_state`. /// /// Maps state_root -> (slot, buffer). - hdiff_buffers: LruCache, + hdiff_buffers: LruCache>, } #[derive(Debug)] @@ -77,14 +84,57 @@ pub enum PutStateOutcome { New(Vec), } -#[allow(clippy::len_without_is_empty)] impl StateCache { pub fn new( state_capacity: NonZeroUsize, headroom: NonZeroUsize, hdiff_capacity: NonZeroUsize, ) -> Self { - StateCache { + Self { + inner: Mutex::new(StateCacheInner::new( + state_capacity, + headroom, + hdiff_capacity, + )), + } + } + + pub fn get_hdiff_buffer_by_state_root(&self, state_root: Hash256) -> Option { + let mut inner = self.inner.lock(); + + let opt_buffer = inner.hdiff_buffers.hdiff_buffers.get(&state_root).cloned(); + + if let Some((_, buffer)) = opt_buffer.as_deref() { + drop(inner); + + metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC); + let _timer = + metrics::start_timer_vec(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIME, HOT_METRIC); + Some(buffer.clone()) + } else if let Some(state) = inner.get_by_state_root(state_root) { + // Drop the lock before performing the conversion from state to buffer, as this + // conversion is somewhat expensive. + drop(inner); + + metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC); + let buffer = HDiffBuffer::from_state(state); + + Some(buffer) + } else { + metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_MISS, HOT_METRIC); + None + } + } +} + +#[allow(clippy::len_without_is_empty)] +impl StateCacheInner { + pub fn new( + state_capacity: NonZeroUsize, + headroom: NonZeroUsize, + hdiff_capacity: NonZeroUsize, + ) -> Self { + StateCacheInner { finalized_state: None, states: LruCache::new(state_capacity), block_map: BlockMap::default(), @@ -143,9 +193,10 @@ impl StateCache { // at some slots in the case of long forks without finality. let new_hdiff_cache = HotHDiffBufferCache::new(self.hdiff_buffers.cap()); let old_hdiff_cache = std::mem::replace(&mut self.hdiff_buffers, new_hdiff_cache); - for (state_root, (slot, buffer)) in old_hdiff_cache.hdiff_buffers { + for (state_root, arc) in old_hdiff_cache.hdiff_buffers { + let slot = arc.0; if pre_finalized_slots_to_retain.contains(&slot) { - self.hdiff_buffers.put(state_root, slot, buffer); + self.hdiff_buffers.hdiff_buffers.put(state_root, arc); } } @@ -279,26 +330,6 @@ impl StateCache { self.hdiff_buffers.put(state_root, slot, buffer.clone()); } - pub fn get_hdiff_buffer_by_state_root(&mut self, state_root: Hash256) -> Option { - if let Some(buffer) = self.hdiff_buffers.get(&state_root) { - metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC); - let timer = - metrics::start_timer_vec(&metrics::BEACON_HDIFF_BUFFER_CLONE_TIME, HOT_METRIC); - let result = Some(buffer.clone()); - drop(timer); - return result; - } - if let Some(buffer) = self - .get_by_state_root(state_root) - .map(HDiffBuffer::from_state) - { - metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_HIT, HOT_METRIC); - return Some(buffer); - } - metrics::inc_counter_vec(&metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_MISS, HOT_METRIC); - None - } - #[instrument(skip_all, fields(?block_root, %slot), level = "debug")] pub fn get_by_block_root( &mut self, @@ -444,9 +475,7 @@ impl HotHDiffBufferCache { } pub fn get(&mut self, state_root: &Hash256) -> Option { - self.hdiff_buffers - .get(state_root) - .map(|(_, buffer)| buffer.clone()) + self.hdiff_buffers.get(state_root).map(|arc| arc.1.clone()) } /// Put a value in the cache, making room for it if necessary. @@ -455,7 +484,7 @@ impl HotHDiffBufferCache { pub fn put(&mut self, state_root: Hash256, slot: Slot, buffer: HDiffBuffer) -> bool { // If the cache is not full, simply insert the value. if self.hdiff_buffers.len() != self.hdiff_buffers.cap().get() { - self.hdiff_buffers.put(state_root, (slot, buffer)); + self.hdiff_buffers.put(state_root, Arc::new((slot, buffer))); return true; } @@ -466,28 +495,26 @@ impl HotHDiffBufferCache { // cache. This is a simplified way of retaining the snapshot in the cache. We don't need // to worry about inserting/retaining states older than the snapshot because these are // pruned on finalization and never reinserted. - let Some(min_slot) = self.hdiff_buffers.iter().map(|(_, (slot, _))| *slot).min() else { + let Some(min_slot) = self.hdiff_buffers.iter().map(|(_, arc)| arc.0).min() else { // Unreachable: cache is full so should have >0 entries. return false; }; if self.hdiff_buffers.cap().get() > 1 || slot < min_slot { // Remove LRU value. Cache is now at size `cap - 1`. - let Some((removed_state_root, (removed_slot, removed_buffer))) = - self.hdiff_buffers.pop_lru() - else { + let Some((removed_state_root, removed_arc)) = self.hdiff_buffers.pop_lru() else { // Unreachable: cache is full so should have at least one entry to pop. return false; }; // Insert new value. Cache size is now at size `cap`. - self.hdiff_buffers.put(state_root, (slot, buffer)); + self.hdiff_buffers.put(state_root, Arc::new((slot, buffer))); // If the removed value had the min slot and we didn't intend to replace it (cap=1) // then we reinsert it. + let removed_slot = removed_arc.0; if removed_slot == min_slot && slot >= min_slot { - self.hdiff_buffers - .put(removed_state_root, (removed_slot, removed_buffer)); + self.hdiff_buffers.put(removed_state_root, removed_arc); } true } else { @@ -506,9 +533,6 @@ impl HotHDiffBufferCache { } pub fn mem_usage(&self) -> usize { - self.hdiff_buffers - .iter() - .map(|(_, (_, buffer))| buffer.size()) - .sum() + self.hdiff_buffers.iter().map(|(_, arc)| arc.1.size()).sum() } }