diff --git a/Cargo.lock b/Cargo.lock index 1bd65e1721a..9986fe530a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1651,7 +1651,7 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fde0e0ec90c9dfb3b4b1a0891a7dcd0e2bffde2f7efed5fe7c9bb00e5bfb915e" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -5108,7 +5108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -5961,8 +5961,7 @@ dependencies = [ [[package]] name = "milhouse" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bdb104e38d3a8c5ffb7e9d2c43c522e6bcc34070edbadba565e722f0dee56c7" +source = "git+https://github.com/sigp/milhouse?branch=main#6f67c0440e39438fbeecd80375feb34840a6aaca" dependencies = [ "alloy-primitives", "arbitrary", @@ -7356,7 +7355,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.100", @@ -8975,6 +8974,7 @@ dependencies = [ "logging", "lru", "metrics", + "milhouse", "parking_lot 0.12.3", "rand 0.9.0", "redb", @@ -10585,7 +10585,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8588be49c0c..81ad8b7d0c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,7 +188,7 @@ malloc_utils = { path = "common/malloc_utils" } maplit = "1" merkle_proof = { path = "consensus/merkle_proof" } metrics = { path = "common/metrics" } -milhouse = { version = "0.7", 
default-features = false }
+milhouse = { git = "https://github.com/sigp/milhouse", branch = "main", default-features = false }
 mockall = "0.13"
 mockall_double = "0.3"
 mockito = "1.5.0"
diff --git a/beacon_node/beacon_chain/src/chain_config.rs b/beacon_node/beacon_chain/src/chain_config.rs
index d6be96afe94..9a9d646484b 100644
--- a/beacon_node/beacon_chain/src/chain_config.rs
+++ b/beacon_node/beacon_chain/src/chain_config.rs
@@ -10,7 +10,10 @@ pub const DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION: Epoch = Epoch::new(2);
 /// Default to 1/12th of the slot, which is 1 second on mainnet.
 pub const DEFAULT_RE_ORG_CUTOFF_DENOMINATOR: u32 = 12;
 pub const DEFAULT_FORK_CHOICE_BEFORE_PROPOSAL_TIMEOUT: u64 = 250;
+/// Default memory budget for the store's in-memory state cache: 512 MiB.
+/// Keep in sync with `DEFAULT_STATE_CACHE_MAX_BYTES` in `beacon_node/store/src/config.rs`.
+pub const DEFAULT_STATE_CACHE_MAX_BYTES: usize = 512 * 1024 * 1024;
 
 /// Default fraction of a slot lookahead for payload preparation (12/3 = 4 seconds on mainnet).
 pub const DEFAULT_PREPARE_PAYLOAD_LOOKAHEAD_FACTOR: u32 = 3;
 
diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs
index 386eb721a04..a9c08b0bc70 100644
--- a/beacon_node/src/cli.rs
+++ b/beacon_node/src/cli.rs
@@ -816,6 +816,15 @@ pub fn cli_app() -> Command {
                 .action(ArgAction::Set)
                 .display_order(0)
         )
+        .arg(
+            Arg::new("state-cache-max-bytes")
+                .long("state-cache-max-bytes")
+                .value_name("BYTES")
+                .help("Specifies the maximum size of the state cache in bytes")
+                .default_value("536870912")
+                .action(ArgAction::Set)
+                .display_order(0)
+        )
         /*
          * Execution Layer Integration
          */
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 7e4b77e9aaf..c5cf5777f4c 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -377,6 +377,10 @@ pub fn get_config(
             .map_err(|_| "state-cache-size is not a valid integer".to_string())?;
     }
 
+    if let Some(max_bytes) = clap_utils::parse_optional(cli_args, "state-cache-max-bytes")? {
+        client_config.store.max_state_cache_bytes = max_bytes;
+    }
+
     if let
Some(historic_state_cache_size) =
         clap_utils::parse_optional(cli_args, "historic-state-cache-size")?
     {
diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml
index 61a8474a731..ad3a74c693d 100644
--- a/beacon_node/store/Cargo.toml
+++ b/beacon_node/store/Cargo.toml
@@ -20,6 +20,7 @@ leveldb = { version = "0.8.6", optional = true, default-features = false }
 logging = { workspace = true }
 lru = { workspace = true }
 metrics = { workspace = true }
+milhouse = { workspace = true }
 parking_lot = { workspace = true }
 redb = { version = "2.1.3", optional = true }
 safe_arith = { workspace = true }
@@ -44,3 +45,7 @@ tempfile = { workspace = true }
 [[bench]]
 name = "hdiff"
 harness = false
+
+[[bench]]
+name = "state_cache"
+harness = false
diff --git a/beacon_node/store/benches/state_cache.rs b/beacon_node/store/benches/state_cache.rs
new file mode 100644
index 00000000000..fa434aa5954
--- /dev/null
+++ b/beacon_node/store/benches/state_cache.rs
@@ -0,0 +1,117 @@
+//! Benchmarks for `StateCache` insertion, with and without a memory limit.
+
+use bls::{FixedBytesExtended, Hash256, PublicKeyBytes};
+use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
+use rand::Rng;
+use ssz::Decode;
+use std::num::NonZeroUsize;
+use store::state_cache::StateCache;
+
+use types::{
+    BeaconState, ChainSpec, Epoch, Eth1Data, EthSpec, MainnetEthSpec as E, Slot, Validator,
+};
+
+/// Build a state at `slot` holding `validator_count` randomly generated validators.
+fn build_state(
+    spec: &ChainSpec,
+    slot: Slot,
+    validator_count: usize,
+    rng: &mut impl Rng,
+) -> BeaconState<E> {
+    let genesis_time = 0;
+    let eth1_data = Eth1Data::default();
+    let mut state = BeaconState::<E>::new(genesis_time, eth1_data, spec);
+
+    for _ in 0..validator_count {
+        append_validator(&mut state, rng);
+    }
+
+    *state.slot_mut() = slot;
+    state.latest_block_header_mut().slot = slot;
+    state.apply_pending_mutations().unwrap();
+    state
+}
+
+/// Append one random validator together with its balance and, if present, inactivity score.
+fn append_validator(state: &mut BeaconState<E>, rng: &mut impl Rng) {
+    state
+        .balances_mut()
+        .push(32_000_000_000 + rng.random_range(1..=1_000_000_000))
+        .unwrap();
+    if let Ok(inactivity_scores) = state.inactivity_scores_mut() {
+        inactivity_scores.push(0).unwrap();
+    }
+    state.validators_mut().push(rand_validator(rng)).unwrap();
+}
+
+/// Create a validator with a random public key and random withdrawal credentials.
+fn rand_validator(rng: &mut impl Rng) -> Validator {
+    let mut pubkey = [0u8; 48];
+    rng.fill_bytes(&mut pubkey);
+    let withdrawal_credentials: [u8; 32] = rng.random();
+
+    Validator {
+        pubkey: PublicKeyBytes::from_ssz_bytes(&pubkey).unwrap(),
+        withdrawal_credentials: withdrawal_credentials.into(),
+        slashed: false,
+        effective_balance: 32_000_000_000,
+        activation_eligibility_epoch: Epoch::max_value(),
+        activation_epoch: Epoch::max_value(),
+        exit_epoch: Epoch::max_value(),
+        withdrawable_epoch: Epoch::max_value(),
+    }
+}
+
+/// Benchmark inserting the same batch of states into a fresh cache, unbounded and bounded.
+pub fn all_benches(c: &mut Criterion) {
+    let spec = E::default_spec();
+    let mut rng = rand::rng();
+    let num_states = 20;
+    let validator_count = 1024;
+
+    let states: Vec<(Hash256, Hash256, BeaconState<E>)> = (0..num_states)
+        .map(|i| {
+            let slot = Slot::new(i as u64);
+            let state = build_state(&spec, slot, validator_count, &mut rng);
+            let root = Hash256::from_low_u64_le(i as u64 + 1);
+            (root, root, state)
+        })
+        .collect();
+
+    let capacity = NonZeroUsize::new(num_states).unwrap();
+    let headroom = NonZeroUsize::new(1).unwrap();
+    let hdiff_capacity = NonZeroUsize::new(1).unwrap();
+
+    c.bench_function("state_cache_insert_without_memory_limit", |b| {
+        b.iter_batched(
+            || StateCache::new(capacity, headroom, hdiff_capacity, usize::MAX),
+            |mut cache| {
+                for (state_root, block_root, state) in &states {
+                    cache.put_state(*state_root, *block_root, state).unwrap();
+                }
+            },
+            BatchSize::SmallInput,
+        );
+    });
+
+    let low_max_bytes = 1_000_000;
+    c.bench_function("state_cache_insert_with_memory_limit", |b| {
+        b.iter_batched(
+            || StateCache::new(capacity, headroom, hdiff_capacity, low_max_bytes),
+            |mut cache| {
+                for (state_root, block_root, state) in &states {
+                    cache.put_state(*state_root, *block_root, state).unwrap();
+                }
+                assert!(cache.cached_bytes() <= cache.max_cached_bytes());
+            },
+            BatchSize::SmallInput,
+        );
+    });
+}
+
+criterion_group! {
+    name = benches;
+    config = Criterion::default().sample_size(10);
+    targets = all_benches
+}
+criterion_main!(benches);
diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs
index ad81fa6076a..f06b085e920 100644
--- a/beacon_node/store/src/config.rs
+++ b/beacon_node/store/src/config.rs
@@ -21,6 +21,7 @@ pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192;
 pub const DEFAULT_EPOCHS_PER_STATE_DIFF: u64 = 8;
 pub const DEFAULT_BLOCK_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(64);
 pub const DEFAULT_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(128);
+pub const DEFAULT_STATE_CACHE_MAX_BYTES: usize = 512 * 1024 * 1024;
 pub const DEFAULT_STATE_CACHE_HEADROOM: NonZeroUsize = new_non_zero_usize(1);
 pub const DEFAULT_COMPRESSION_LEVEL: i32 = 1;
 pub const DEFAULT_HISTORIC_STATE_CACHE_SIZE: NonZeroUsize = new_non_zero_usize(1);
@@ -37,6 +38,8 @@ pub struct StoreConfig {
     pub block_cache_size: NonZeroUsize,
     /// Maximum number of states to store in the in-memory state cache.
     pub state_cache_size: NonZeroUsize,
+    /// Maximum number of bytes to store in the in-memory state cache.
+    pub max_state_cache_bytes: usize,
     /// Minimum number of states to cull from the state cache upon fullness.
     pub state_cache_headroom: NonZeroUsize,
     /// Compression level for blocks, state diffs and other compressed values.
@@ -107,6 +110,7 @@ impl Default for StoreConfig {
         Self {
             block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
             state_cache_size: DEFAULT_STATE_CACHE_SIZE,
+            max_state_cache_bytes: DEFAULT_STATE_CACHE_MAX_BYTES,
             state_cache_headroom: DEFAULT_STATE_CACHE_HEADROOM,
             historic_state_cache_size: DEFAULT_HISTORIC_STATE_CACHE_SIZE,
             cold_hdiff_buffer_cache_size: DEFAULT_COLD_HDIFF_BUFFER_CACHE_SIZE,
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 7b390b39f33..a2673176899 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -234,6 +234,7 @@ impl HotColdDB, MemoryStore> {
                 config.state_cache_size,
                 config.state_cache_headroom,
                 config.hot_hdiff_buffer_cache_size,
+                config.max_state_cache_bytes,
             )),
             historic_state_cache: Mutex::new(HistoricStateCache::new(
                 config.cold_hdiff_buffer_cache_size,
@@ -286,6 +287,7 @@ impl HotColdDB, BeaconNodeBackend> {
                 config.state_cache_size,
                 config.state_cache_headroom,
                 config.hot_hdiff_buffer_cache_size,
+                config.max_state_cache_bytes,
             )),
             historic_state_cache: Mutex::new(HistoricStateCache::new(
                 config.cold_hdiff_buffer_cache_size,
@@ -501,6 +503,10 @@ impl, Cold: ItemStore> HotColdDB
             &metrics::STORE_BEACON_STATE_CACHE_SIZE,
             state_cache.len() as i64,
         );
+        metrics::set_gauge(
+            &metrics::STORE_BEACON_STATE_CACHE_MEMORY_SIZE,
+            state_cache.cached_bytes() as i64,
+        );
         metrics::set_gauge_vec(
             &metrics::STORE_BEACON_HDIFF_BUFFER_CACHE_SIZE,
             HOT_METRIC,
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index a3d4e4a8cea..9a5d47a432c 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -19,6 +19,7 @@ pub mod historic_state_cache;
 pub mod hot_cold_store;
 mod impls;
 mod memory_store;
+pub mod memsize;
 pub mod metadata;
 pub mod metrics;
 pub mod partial_beacon_state;
diff --git a/beacon_node/store/src/memsize.rs b/beacon_node/store/src/memsize.rs
new file mode 100644
index 00000000000..29c95ea81f5
--- /dev/null
+++ b/beacon_node/store/src/memsize.rs
@@ -0,0 +1,51 @@
+//! Approximate memory accounting for `BeaconState`, based on milhouse's `MemoryTracker`.
+
+use crate::metrics::BEACON_STATE_MEMORY_SIZE_CALCULATION_TIME;
+use milhouse::mem::{MemorySize, MemoryTracker};
+use std::time::Instant;
+use types::{BeaconState, EthSpec};
+
+/// Borrowed view of a `BeaconState` that implements milhouse's `MemorySize`.
+pub struct BeaconStateWrapper<'a, E: EthSpec>(pub &'a BeaconState<E>);
+
+impl<E: EthSpec> MemorySize for BeaconStateWrapper<'_, E> {
+    /// Address of the wrapped state.
+    fn self_pointer(&self) -> usize {
+        self.0 as *const _ as usize
+    }
+
+    /// Child items to be measured in addition to `intrinsic_size`.
+    ///
+    /// NOTE(review): this is empty, so presumably nothing the state owns on the heap is counted
+    /// and every state measures as `intrinsic_size` -- confirm this is intended.
+    fn subtrees(&self) -> Vec<&dyn MemorySize> {
+        vec![]
+    }
+
+    /// Size of the `BeaconState` value itself, as given by `size_of`.
+    fn intrinsic_size(&self) -> usize {
+        std::mem::size_of::<BeaconState<E>>()
+    }
+}
+
+/// Extension trait for approximate memory consumption of a `BeaconState`.
+pub trait BeaconStateMemorySize {
+    /// Measure this state on its own with a fresh `MemoryTracker`.
+    fn memory_size(&self) -> usize;
+}
+
+impl<E: EthSpec> BeaconStateMemorySize for BeaconState<E> {
+    fn memory_size(&self) -> usize {
+        let wrapper = BeaconStateWrapper(self);
+        let timer = Instant::now();
+        let mut tracker = MemoryTracker::default();
+        let stats = tracker.track_item(&wrapper);
+
+        // Record how long the measurement took.
+        metrics::observe(
+            &BEACON_STATE_MEMORY_SIZE_CALCULATION_TIME,
+            timer.elapsed().as_secs_f64(),
+        );
+        stats.differential_size
+    }
+}
diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs
index 93c9840586e..aeddf1f6d6f 100644
--- a/beacon_node/store/src/metrics.rs
+++ b/beacon_node/store/src/metrics.rs
@@ -269,6 +269,19 @@ pub static STORE_BEACON_STATE_CACHE_SIZE: LazyLock> = LazyLock:
         "Current count of items in beacon store state cache",
     )
 });
+pub static STORE_BEACON_STATE_CACHE_MEMORY_SIZE: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
+    try_create_int_gauge(
+        "store_beacon_state_cache_memory_size",
+        "Memory consumed by items in the beacon store state cache",
+    )
+});
+pub static BEACON_STATE_MEMORY_SIZE_CALCULATION_TIME: LazyLock<Result<Histogram>> =
+    LazyLock::new(|| {
+        try_create_histogram(
+            "beacon_state_memory_size_calculation_time",
+            "Time taken to calculate the
memory size of a beacon state.",
+        )
+    });
 pub static STORE_BEACON_HISTORIC_STATE_CACHE_SIZE: LazyLock<Result<IntGauge>> =
     LazyLock::new(|| {
         try_create_int_gauge(
diff --git a/beacon_node/store/src/state_cache.rs b/beacon_node/store/src/state_cache.rs
index 05930c7b71e..b7369b58026 100644
--- a/beacon_node/store/src/state_cache.rs
+++ b/beacon_node/store/src/state_cache.rs
@@ -1,14 +1,17 @@
 use crate::hdiff::HDiffBuffer;
 use crate::{
     Error,
+    memsize::BeaconStateWrapper,
     metrics::{self, HOT_METRIC},
 };
 use lru::LruCache;
+use milhouse::mem::MemoryTracker;
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::num::NonZeroUsize;
+use std::time::{Duration, Instant};
 use tracing::instrument;
 use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, Slot};
 
 /// Fraction of the LRU cache to leave intact during culling.
 const CULL_EXEMPT_NUMERATOR: usize = 1;
 const CULL_EXEMPT_DENOMINATOR: usize = 10;
@@ -16,7 +19,10 @@ const CULL_EXEMPT_DENOMINATOR: usize = 10;
 /// States that are less than or equal to this many epochs old *could* become finalized and will not
 /// be culled from the cache.
 const EPOCH_FINALIZATION_LIMIT: u64 = 4;
 
+/// Number of `put_state` insertions between full re-measurements of the cache's memory usage.
+const RECOMPUTE_INTERVAL: usize = 10;
+
 #[derive(Debug)]
 pub struct FinalizedState {
     state_root: Hash256,
@@ -46,6 +52,12 @@ pub struct StateCache {
     max_epoch: Epoch,
     head_block_root: Hash256,
     headroom: NonZeroUsize,
+    /// Estimated memory used by `states`, as of the most recent re-measurement.
+    cached_bytes: usize,
+    /// Memory budget, in bytes, that re-measurement culls the cache down towards.
+    max_cached_bytes: usize,
+    /// Number of states inserted since `cached_bytes` was last re-measured.
+    put_count: usize,
 }
 
 /// Cache of hdiff buffers for hot states.
@@ -83,6 +95,7 @@ impl StateCache {
         state_capacity: NonZeroUsize,
         headroom: NonZeroUsize,
         hdiff_capacity: NonZeroUsize,
+        max_cached_bytes: usize,
     ) -> Self {
         StateCache {
             finalized_state: None,
@@ -92,9 +105,22 @@ impl StateCache {
             max_epoch: Epoch::new(0),
             head_block_root: Hash256::ZERO,
             headroom,
+            max_cached_bytes,
+            cached_bytes: 0,
+            put_count: 0,
         }
     }
 
+    /// Memory budget for the cached states, in bytes.
+    pub fn max_cached_bytes(&self) -> usize {
+        self.max_cached_bytes
+    }
+
+    /// Estimated memory used by the cached states, as of the most recent re-measurement.
+    pub fn cached_bytes(&self) -> usize {
+        self.cached_bytes
+    }
+
     pub fn len(&self) -> usize {
         self.states.len()
     }
@@ -251,10 +277,19 @@ impl StateCache {
         {
             deleted_states.push(deleted_state_root);
         }
 
         // Record the connection from block root and slot to this state.
         let slot = state.slot();
         self.block_map.insert(block_root, slot, state_root);
+
+        // Measuring walks every cached state, so only do it once every `RECOMPUTE_INTERVAL`
+        // insertions. States culled to meet the memory limit are reported to the caller along
+        // with those evicted above.
+        self.put_count += 1;
+        if self.put_count >= RECOMPUTE_INTERVAL {
+            self.put_count = 0;
+            deleted_states.extend(self.recompute_cached_bytes());
+        }
 
         Ok(PutStateOutcome::New(deleted_states))
     }
@@ -328,7 +363,7 @@ impl StateCache {
     pub fn delete_block_states(&mut self, block_root: &Hash256) {
         if let Some(slot_map) = self.block_map.delete_block_states(block_root) {
             for state_root in slot_map.slots.values() {
-                self.states.pop(state_root);
+                self.delete_state(state_root);
             }
         }
     }
@@ -396,6 +431,59 @@ impl StateCache {
 
         state_roots_to_delete
     }
+
+    /// Measure the memory used by the states in `self.states` with one shared tracker.
+    ///
+    /// Returns the summed `differential_size` of the states and the time the measurement took.
+    fn measure_cached_memory_size(&self) -> (usize, Duration) {
+        let timer = Instant::now();
+        let mut tracker = MemoryTracker::default();
+        let mut total_bytes: usize = 0;
+        // NOTE(review): `finalized_state` is not part of this total -- confirm that is intended.
+        for (_, (_, state)) in &self.states {
+            total_bytes += tracker
+                .track_item(&BeaconStateWrapper(state))
+                .differential_size;
+        }
+        (total_bytes, timer.elapsed())
+    }
+
+    /// Re-measure the cache, culling until it fits within `max_cached_bytes` or `cull`
+    /// removes nothing more.
+    ///
+    /// Each measurement updates `cached_bytes`, the size gauge and the timing histogram.
+    /// Returns the roots of all states culled to enforce the limit.
+    fn recompute_cached_bytes(&mut self) -> Vec<Hash256> {
+        // Number of states to ask `cull` for per round.
+        let batch = self.headroom.get().clamp(5, 64);
+        let mut deleted_states = vec![];
+
+        loop {
+            let (total_bytes, elapsed_time) = self.measure_cached_memory_size();
+            self.cached_bytes = total_bytes;
+            metrics::set_gauge(
+                &metrics::STORE_BEACON_STATE_CACHE_MEMORY_SIZE,
+                total_bytes as i64,
+            );
+            metrics::observe(
+                &metrics::BEACON_STATE_MEMORY_SIZE_CALCULATION_TIME,
+                elapsed_time.as_secs_f64(),
+            );
+
+            if total_bytes <= self.max_cached_bytes {
+                break;
+            }
+
+            let culled = self.cull(batch);
+            if culled.is_empty() {
+                // `cull` has nothing left that it is willing to remove.
+                break;
+            }
+            deleted_states.extend(culled);
+        }
+
+        deleted_states
+    }
 }
 
 impl BlockMap {
diff --git a/book/src/help_bn.md b/book/src/help_bn.md
index ea02b39bee6..448b070d8ac 100644
--- a/book/src/help_bn.md
+++ b/book/src/help_bn.md
@@ -380,6 +380,9 @@ Options:
   --state-cache-headroom
           Minimum number of states to cull from the state cache when it gets
           full [default: 1]
+  --state-cache-max-bytes <BYTES>
+          Specifies the maximum size of the state cache in bytes [default:
+          536870912]
   --state-cache-size
           Specifies the size of the state cache [default: 128]
   --suggested-fee-recipient
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index 38fd54d29dd..af0d81de7b9 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -1,8 +1,8 @@
 use crate::exec::{CommandLineTestExec, CompletedTest};
 use
beacon_node::beacon_chain::chain_config::{
     DEFAULT_RE_ORG_CUTOFF_DENOMINATOR, DEFAULT_RE_ORG_HEAD_THRESHOLD,
-    DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_SYNC_TOLERANCE_EPOCHS,
-    DisallowedReOrgOffsets,
+    DEFAULT_RE_ORG_MAX_EPOCHS_SINCE_FINALIZATION, DEFAULT_STATE_CACHE_MAX_BYTES,
+    DEFAULT_SYNC_TOLERANCE_EPOCHS, DisallowedReOrgOffsets,
 };
 use beacon_node::{
     ClientConfig as Config, beacon_chain::graffiti_calculator::GraffitiOrigin,
@@ -1827,6 +1827,24 @@ fn state_cache_size_flag() {
         .with_config(|config| assert_eq!(config.store.state_cache_size, new_non_zero_usize(64)));
 }
 #[test]
+fn state_cache_max_bytes_default() {
+    CommandLineTest::new()
+        .run_with_zero_port()
+        .with_config(|config| {
+            assert_eq!(
+                config.store.max_state_cache_bytes,
+                DEFAULT_STATE_CACHE_MAX_BYTES
+            );
+        });
+}
+#[test]
+fn state_cache_max_bytes_flag() {
+    CommandLineTest::new()
+        .flag("state-cache-max-bytes", Some("1048576"))
+        .run_with_zero_port()
+        .with_config(|config| assert_eq!(config.store.max_state_cache_bytes, 1_048_576));
+}
+#[test]
 fn state_cache_headroom_default() {
     CommandLineTest::new()
         .run_with_zero_port()