diff --git a/config/src/config/storage_config.rs b/config/src/config/storage_config.rs index 962740b8bf380..14b79d76e0997 100644 --- a/config/src/config/storage_config.rs +++ b/config/src/config/storage_config.rs @@ -358,7 +358,7 @@ pub struct PrunerConfig { impl Default for LedgerPrunerConfig { fn default() -> Self { LedgerPrunerConfig { - enable: true, + enable: false, prune_window: 90_000_000, batch_size: 5_000, user_pruning_window_offset: 200_000, @@ -369,7 +369,7 @@ impl Default for LedgerPrunerConfig { impl Default for StateMerklePrunerConfig { fn default() -> Self { StateMerklePrunerConfig { - enable: true, + enable: false, // This allows a block / chunk being executed to have access to a non-latest state tree. // It needs to be greater than the number of versions the state committing thread is // able to commit during the execution of the block / chunk. If the bad case indeed @@ -386,7 +386,7 @@ impl Default for StateMerklePrunerConfig { impl Default for EpochSnapshotPrunerConfig { fn default() -> Self { Self { - enable: true, + enable: false, // This is based on ~5K TPS * 2h/epoch * 2 epochs. -- epoch ending snapshots are used // by state sync in fast sync mode. 
// The setting is in versions, not epochs, because this makes it behave more like other diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs index f5691b51891a3..b3ec979727891 100644 --- a/execution/executor-types/src/execution_output.rs +++ b/execution/executor-types/src/execution_output.rs @@ -14,13 +14,15 @@ use aptos_storage_interface::state_store::{ use aptos_types::{ contract_event::ContractEvent, epoch_state::EpochState, - state_store::hot_state::HotStateConfig, + state_store::{ + hot_state::HotStateConfig, state_key::StateKey, state_value::StateValue, NUM_STATE_SHARDS, + }, transaction::{ block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version, }, }; use derive_more::Deref; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; #[derive(Clone, Debug, Deref)] pub struct ExecutionOutput { @@ -41,6 +43,8 @@ impl ExecutionOutput { block_end_info: Option, next_epoch_state: Option, subscribable_events: Planned>, + hot_inserted: [HashMap>; NUM_STATE_SHARDS], + hot_evicted: [HashMap>; NUM_STATE_SHARDS], ) -> Self { let next_version = first_version + to_commit.len() as Version; assert_eq!(next_version, result_state.latest().next_version()); @@ -65,6 +69,8 @@ impl ExecutionOutput { block_end_info, next_epoch_state, subscribable_events, + hot_inserted, + hot_evicted, }) } @@ -81,6 +87,8 @@ impl ExecutionOutput { block_end_info: None, next_epoch_state: None, subscribable_events: Planned::ready(vec![]), + hot_inserted: std::array::from_fn(|_| HashMap::new()), + hot_evicted: std::array::from_fn(|_| HashMap::new()), }) } @@ -99,6 +107,8 @@ impl ExecutionOutput { block_end_info: None, next_epoch_state: None, subscribable_events: Planned::ready(vec![]), + hot_inserted: std::array::from_fn(|_| HashMap::new()), + hot_evicted: std::array::from_fn(|_| HashMap::new()), }) } @@ -119,6 +129,8 @@ impl ExecutionOutput { block_end_info: None, next_epoch_state: 
self.next_epoch_state.clone(), subscribable_events: Planned::ready(vec![]), + hot_inserted: std::array::from_fn(|_| HashMap::new()), + hot_evicted: std::array::from_fn(|_| HashMap::new()), }) } @@ -166,6 +178,8 @@ pub struct Inner { /// state cache. pub next_epoch_state: Option, pub subscribable_events: Planned>, + pub hot_inserted: [HashMap>; NUM_STATE_SHARDS], + pub hot_evicted: [HashMap>; NUM_STATE_SHARDS], } impl Inner { diff --git a/execution/executor/src/block_executor/mod.rs b/execution/executor/src/block_executor/mod.rs index b85aaa2110945..24da011c43563 100644 --- a/execution/executor/src/block_executor/mod.rs +++ b/execution/executor/src/block_executor/mod.rs @@ -315,7 +315,14 @@ where output.set_state_checkpoint_output(DoStateCheckpoint::run( &output.execution_output, parent_block.output.ensure_result_state_summary()?, - &ProvableStateSummary::new_persisted(self.db.reader.as_ref())?, + &ProvableStateSummary::new_persisted( + self.db.reader.as_ref(), + /* is_hot = */ true, + )?, + &ProvableStateSummary::new_persisted( + self.db.reader.as_ref(), + /* is_hot = */ false, + )?, None, )?); output.set_ledger_update_output(DoLedgerUpdate::run( diff --git a/execution/executor/src/chunk_executor/mod.rs b/execution/executor/src/chunk_executor/mod.rs index 08aedbd5811fb..e094ae4db3ef8 100644 --- a/execution/executor/src/chunk_executor/mod.rs +++ b/execution/executor/src/chunk_executor/mod.rs @@ -346,7 +346,8 @@ impl ChunkExecutorInner { let state_checkpoint_output = DoStateCheckpoint::run( &output.execution_output, &parent_state_summary, - &ProvableStateSummary::new_persisted(self.db.reader.as_ref())?, + &ProvableStateSummary::new_persisted(self.db.reader.as_ref(), true)?, + &ProvableStateSummary::new_persisted(self.db.reader.as_ref(), false)?, Some( chunk_verifier .transaction_infos() diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 4881a7975182d..764e214504126 100644 --- 
a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -51,7 +51,7 @@ use aptos_types::{ }; use aptos_vm::VMBlockExecutor; use itertools::Itertools; -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; pub struct DoGetExecutionOutput; @@ -171,7 +171,7 @@ impl DoGetExecutionOutput { } } - Parser::parse( + let parsed = Parser::parse( state_view.next_version(), transactions, transaction_outputs, @@ -182,7 +182,9 @@ impl DoGetExecutionOutput { transaction_slice_metadata .append_state_checkpoint_to_block() .is_some(), - ) + )?; + info!("Finished executing transactions"); + Ok(parsed) } pub fn by_transaction_execution_sharded( @@ -417,12 +419,18 @@ impl Parser { }, )?; - let result_state = parent_state.update_with_memorized_reads( + let (result_state, hot_inserted, hot_evicted) = parent_state.update_with_memorized_reads( base_state_view.persisted_hot_state(), base_state_view.persisted_state(), to_commit.state_update_refs(), base_state_view.memorized_reads(), ); + // for (i, shard) in hot_inserted.iter().enumerate() { + // info!("shard {}: # inserted: {}", i, shard.len()); + // } + // for (i, shard) in hot_evicted.iter().enumerate() { + // info!("shard {}: # evicted: {}", i, shard.len()); + // } let state_reads = base_state_view.into_memorized_reads(); let out = ExecutionOutput::new( @@ -437,6 +445,8 @@ impl Parser { block_end_info, next_epoch_state, Planned::place_holder(), + hot_inserted, + hot_evicted, ); let ret = out.clone(); ret.subscribable_events diff --git a/execution/executor/src/workflow/do_state_checkpoint.rs b/execution/executor/src/workflow/do_state_checkpoint.rs index 69fc9f218a98c..f13cac9eb32f5 100644 --- a/execution/executor/src/workflow/do_state_checkpoint.rs +++ b/execution/executor/src/workflow/do_state_checkpoint.rs @@ -7,6 +7,7 @@ use aptos_crypto::HashValue; use aptos_executor_types::{ execution_output::ExecutionOutput, 
state_checkpoint_output::StateCheckpointOutput, }; +use aptos_logger::info; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::state_store::state_summary::{ LedgerStateSummary, ProvableStateSummary, @@ -18,14 +19,19 @@ impl DoStateCheckpoint { pub fn run( execution_output: &ExecutionOutput, parent_state_summary: &LedgerStateSummary, - persisted_state_summary: &ProvableStateSummary, + hot_persisted_state_summary: &ProvableStateSummary, + cold_persisted_state_summary: &ProvableStateSummary, known_state_checkpoints: Option>>, ) -> Result { let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]); + // info!("Start getting state checkpoint"); let state_summary = parent_state_summary.update( - persisted_state_summary, - execution_output.to_commit.state_update_refs(), + hot_persisted_state_summary, + cold_persisted_state_summary, + &execution_output.hot_inserted, + &execution_output.hot_evicted, + execution_output.next_version(), )?; let state_checkpoint_hashes = Self::get_state_checkpoint_hashes( @@ -34,6 +40,8 @@ impl DoStateCheckpoint { &state_summary, )?; + // info!("Finished getting state checkpoint"); + Ok(StateCheckpointOutput::new( state_summary, state_checkpoint_hashes, diff --git a/execution/executor/src/workflow/mod.rs b/execution/executor/src/workflow/mod.rs index f5ef17c90ccbe..fddb6c1393f78 100644 --- a/execution/executor/src/workflow/mod.rs +++ b/execution/executor/src/workflow/mod.rs @@ -27,7 +27,8 @@ impl ApplyExecutionOutput { let state_checkpoint_output = DoStateCheckpoint::run( &execution_output, &base_view.state_summary, - &ProvableStateSummary::new_persisted(reader)?, + &ProvableStateSummary::new_persisted(reader, true)?, + &ProvableStateSummary::new_persisted(reader, false)?, None, )?; let ledger_update_output = DoLedgerUpdate::run( diff --git a/experimental/storage/layered-map/src/layer.rs b/experimental/storage/layered-map/src/layer.rs index 67a7a1b01a0d7..24cfedac68f32 100644 --- 
a/experimental/storage/layered-map/src/layer.rs +++ b/experimental/storage/layered-map/src/layer.rs @@ -11,11 +11,12 @@ use aptos_crypto::HashValue; use aptos_drop_helper::ArcAsyncDrop; use aptos_infallible::Mutex; use aptos_metrics_core::IntGaugeVecHelper; -use std::{marker::PhantomData, sync::Arc}; +use std::{marker::PhantomData, sync::{Arc, Weak}}; #[derive(Debug)] struct LayerInner { peak: FlattenPerfectTree, + parent: Weak>, children: Mutex>>>, use_case: &'static str, family: HashValue, @@ -50,6 +51,7 @@ impl LayerInner { let family = HashValue::random(); Arc::new(Self { peak: FlattenPerfectTree::new_with_empty_nodes(1), + parent: Weak::new(), children: Mutex::new(Vec::new()), use_case, family, @@ -61,6 +63,7 @@ impl LayerInner { fn spawn(self: &Arc, child_peak: FlattenPerfectTree, base_layer: u64) -> Arc { let child = Arc::new(Self { peak: child_peak, + parent: Arc::downgrade(self), children: Mutex::new(Vec::new()), use_case: self.use_case, family: self.family, @@ -108,6 +111,11 @@ impl MapLayer { } } + pub(crate) fn parent(&self) -> Option { + let parent_inner = self.inner.parent.upgrade(); + parent_inner.map(Self::new) + } + pub fn into_layers_view_after(self, base_layer: MapLayer) -> LayeredMap { assert!(base_layer.is_family(&self)); assert!(base_layer.inner.layer >= self.inner.base_layer); diff --git a/experimental/storage/layered-map/src/map/mod.rs b/experimental/storage/layered-map/src/map/mod.rs index e6038aedf75f6..165d383e7a299 100644 --- a/experimental/storage/layered-map/src/map/mod.rs +++ b/experimental/storage/layered-map/src/map/mod.rs @@ -43,9 +43,13 @@ where (base_layer, top_layer) } - pub(crate) fn base_layer(&self) -> u64 { + pub fn base_layer(&self) -> u64 { self.base_layer.layer() } + + pub fn top_layer(&self) -> u64 { + self.top_layer.layer() + } } impl LayeredMap @@ -114,4 +118,21 @@ where .into_feet_iter() .flat_map(|node| DescendantIterator::new(node, self.base_layer())) } + + pub fn inner_maps(&self) -> Vec { + let mut ret = 
Vec::new(); + + let mut current_layer = self.top_layer.clone(); + let mut current_layer_num = self.top_layer.layer(); + let lowest_layer_num = self.base_layer.layer(); + while current_layer_num > lowest_layer_num { + let next_high = current_layer.parent().expect("the next one must exist."); + ret.push(Self::new(next_high.clone(), current_layer)); + current_layer = next_high; + current_layer_num = current_layer.layer(); + } + ret.reverse(); + + ret + } } diff --git a/storage/aptosdb/src/db/aptosdb_reader.rs b/storage/aptosdb/src/db/aptosdb_reader.rs index 9ec96fd5634f7..cf5185a7fc620 100644 --- a/storage/aptosdb/src/db/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/aptosdb_reader.rs @@ -660,12 +660,13 @@ impl DbReader for AptosDB { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result { gauged_api("get_state_proof_by_version_ext", || { self.error_if_state_merkle_pruned("State merkle", version)?; self.state_store - .get_state_proof_by_version_ext(key_hash, version, root_depth) + .get_state_proof_by_version_ext(key_hash, version, root_depth, is_hot) }) } @@ -674,12 +675,13 @@ impl DbReader for AptosDB { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result<(Option, SparseMerkleProofExt)> { gauged_api("get_state_value_with_proof_by_version_ext", || { self.error_if_state_merkle_pruned("State merkle", version)?; self.state_store - .get_state_value_with_proof_by_version_ext(key_hash, version, root_depth) + .get_state_value_with_proof_by_version_ext(key_hash, version, root_depth, is_hot) }) } diff --git a/storage/aptosdb/src/db/aptosdb_testonly.rs b/storage/aptosdb/src/db/aptosdb_testonly.rs index fe848223318c7..9171c822495b2 100644 --- a/storage/aptosdb/src/db/aptosdb_testonly.rs +++ b/storage/aptosdb/src/db/aptosdb_testonly.rs @@ -130,67 +130,68 @@ impl AptosDB { ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>, sync_commit: bool, ) -> Result<()> { - let (transactions, transaction_outputs, 
transaction_infos) = - Self::disassemble_txns_to_commit(txns_to_commit); - // Keep auxiliary info consistent with what was used to create TransactionInfo - // Use block-relative indices to match what was used during TransactionInfo creation - let persisted_auxiliary_infos = txns_to_commit - .iter() - .map( - |txn_to_commit| match txn_to_commit.transaction_info.auxiliary_info_hash() { - Some(hash) => { - for i in 0..100 { - if hash - == CryptoHash::hash(&PersistedAuxiliaryInfo::V1 { - transaction_index: i as u32, - }) - { - return PersistedAuxiliaryInfo::V1 { - transaction_index: i as u32, - }; - } - } - panic!("Hash not found"); - }, - None => PersistedAuxiliaryInfo::None, - }, - ) - .collect(); - let transactions_to_keep = TransactionsToKeep::make( - first_version, - transactions, - transaction_outputs, - persisted_auxiliary_infos, - ); + unimplemented!(); + // let (transactions, transaction_outputs, transaction_infos) = + // Self::disassemble_txns_to_commit(txns_to_commit); + // // Keep auxiliary info consistent with what was used to create TransactionInfo + // // Use block-relative indices to match what was used during TransactionInfo creation + // let persisted_auxiliary_infos = txns_to_commit + // .iter() + // .map( + // |txn_to_commit| match txn_to_commit.transaction_info.auxiliary_info_hash() { + // Some(hash) => { + // for i in 0..100 { + // if hash + // == CryptoHash::hash(&PersistedAuxiliaryInfo::V1 { + // transaction_index: i as u32, + // }) + // { + // return PersistedAuxiliaryInfo::V1 { + // transaction_index: i as u32, + // }; + // } + // } + // panic!("Hash not found"); + // }, + // None => PersistedAuxiliaryInfo::None, + // }, + // ) + // .collect(); + // let transactions_to_keep = TransactionsToKeep::make( + // first_version, + // transactions, + // transaction_outputs, + // persisted_auxiliary_infos, + // ); - let current = self.state_store.current_state_locked().clone(); - let (hot_state, persisted_state) = 
self.state_store.get_persisted_state()?; - let (new_state, reads) = current.ledger_state().update_with_db_reader( - &persisted_state, - hot_state, - transactions_to_keep.state_update_refs(), - self.state_store.clone(), - )?; - let persisted_summary = self.state_store.get_persisted_state_summary()?; - let new_state_summary = current.ledger_state_summary().update( - &ProvableStateSummary::new(persisted_summary, self), - transactions_to_keep.state_update_refs(), - )?; + // let current = self.state_store.current_state_locked().clone(); + // let (hot_state, persisted_state) = self.state_store.get_persisted_state()?; + // let (new_state, reads) = current.ledger_state().update_with_db_reader( + // &persisted_state, + // hot_state, + // transactions_to_keep.state_update_refs(), + // self.state_store.clone(), + // )?; + // let persisted_summary = self.state_store.get_persisted_state_summary()?; + // let new_state_summary = current.ledger_state_summary().update( + // &ProvableStateSummary::new(persisted_summary, self), + // transactions_to_keep.state_update_refs(), + // )?; - let chunk = ChunkToCommit { - first_version, - transactions: &transactions_to_keep.transactions, - persisted_auxiliary_infos: &transactions_to_keep.persisted_auxiliary_infos, - transaction_outputs: &transactions_to_keep.transaction_outputs, - transaction_infos: &transaction_infos, - state: &new_state, - state_summary: &new_state_summary, - state_update_refs: transactions_to_keep.state_update_refs(), - state_reads: &reads, - is_reconfig: transactions_to_keep.is_reconfig(), - }; + // let chunk = ChunkToCommit { + // first_version, + // transactions: &transactions_to_keep.transactions, + // persisted_auxiliary_infos: &transactions_to_keep.persisted_auxiliary_infos, + // transaction_outputs: &transactions_to_keep.transaction_outputs, + // transaction_infos: &transaction_infos, + // state: &new_state, + // state_summary: &new_state_summary, + // state_update_refs: transactions_to_keep.state_update_refs(), + 
// state_reads: &reads, + // is_reconfig: transactions_to_keep.is_reconfig(), + // }; - self.save_transactions(chunk, ledger_info_with_sigs, sync_commit) + // self.save_transactions(chunk, ledger_info_with_sigs, sync_commit) } fn disassemble_txns_to_commit( diff --git a/storage/aptosdb/src/db/test_helper.rs b/storage/aptosdb/src/db/test_helper.rs index 40cbf73ef8de5..45e42fa9ab4c4 100644 --- a/storage/aptosdb/src/db/test_helper.rs +++ b/storage/aptosdb/src/db/test_helper.rs @@ -432,7 +432,7 @@ fn verify_snapshots( ); for (state_key, state_value) in &updates { let (state_value_in_db, proof) = db - .get_state_value_with_proof_by_version(state_key, snapshot_version) + .get_state_value_with_proof_by_version(state_key, snapshot_version, false) .unwrap(); assert_eq!(state_value_in_db.as_ref(), state_value.as_ref()); proof diff --git a/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/mod.rs b/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/mod.rs new file mode 100644 index 0000000000000..308da6d87f063 --- /dev/null +++ b/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/mod.rs @@ -0,0 +1,61 @@ +// Copyright (c) Aptos Foundation +// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE + +//! This module defines physical storage schema for nodes in the hot state Jellyfish Merkle Tree. +//! Node is identified by [NodeKey](jellyfish-merkle::node_type::NodeKey). +//! ```text +//! |<----key--->|<-----value----->| +//! | node_key | serialized_node | +//! 
``` + +use crate::schema::JELLYFISH_MERKLE_NODE_CF_NAME; +use anyhow::Result; +use aptos_jellyfish_merkle::node_type::NodeKey; +use aptos_schemadb::{ + define_schema, + schema::{KeyCodec, SeekKeyCodec, ValueCodec}, +}; +use aptos_types::{state_store::state_key::StateKey, transaction::Version}; +use byteorder::{BigEndian, WriteBytesExt}; +use std::mem::size_of; + +type Node = aptos_jellyfish_merkle::node_type::Node; + +define_schema!( + HotJellyfishMerkleNodeSchema, + NodeKey, + Node, + JELLYFISH_MERKLE_NODE_CF_NAME +); + +impl KeyCodec for NodeKey { + fn encode_key(&self) -> Result> { + self.encode() + } + + fn decode_key(data: &[u8]) -> Result { + Self::decode(data) + } +} + +impl ValueCodec for Node { + fn encode_value(&self) -> Result> { + self.encode() + } + + fn decode_value(data: &[u8]) -> Result { + Self::decode(data) + } +} + +impl SeekKeyCodec for (Version, u8) { + fn encode_seek_key(&self) -> Result> { + let mut out = Vec::with_capacity(size_of::() + size_of::()); + out.write_u64::(self.0)?; + out.write_u8(self.1)?; + Ok(out) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/test.rs b/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/test.rs new file mode 100644 index 0000000000000..0d1036159a474 --- /dev/null +++ b/storage/aptosdb/src/schema/hot_jellyfish_merkle_node/test.rs @@ -0,0 +1,26 @@ +// Copyright (c) Aptos Foundation +// Licensed pursuant to the Innovation-Enabling Source Code License, available at https://github.com/aptos-labs/aptos-core/blob/main/LICENSE + +use super::*; +use aptos_crypto::HashValue; +use aptos_jellyfish_merkle::node_type::Node; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! 
{ + #[test] + fn test_hot_jellyfish_merkle_node_schema( + node_key in any::(), + account_key in any::(), + value_hash in any::(), + state_key in any::(), + version in any::() + ) { + assert_encode_decode::( + &node_key, + &Node::new_leaf(account_key, value_hash, (state_key, version)), + ); + } +} + +test_no_panic_decoding!(HotJellyfishMerkleNodeSchema); diff --git a/storage/aptosdb/src/schema/mod.rs b/storage/aptosdb/src/schema/mod.rs index b770496ee632f..e04abd178f036 100644 --- a/storage/aptosdb/src/schema/mod.rs +++ b/storage/aptosdb/src/schema/mod.rs @@ -12,6 +12,7 @@ pub(crate) mod db_metadata; pub(crate) mod epoch_by_version; pub(crate) mod event; pub(crate) mod event_accumulator; +pub(crate) mod hot_jellyfish_merkle_node; pub(crate) mod hot_state_value_by_key_hash; pub(crate) mod jellyfish_merkle_node; pub(crate) mod ledger_info; @@ -43,6 +44,7 @@ pub const EVENT_ACCUMULATOR_CF_NAME: ColumnFamilyName = "event_accumulator"; pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key"; pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version"; pub const EVENT_CF_NAME: ColumnFamilyName = "event"; +// pub const HOT_JELLYFISH_MERKLE_NODE_CF_NAME: ColumnFamilyName = "hot_jellyfish_merkle_node"; pub const HOT_STATE_VALUE_BY_KEY_HASH_CF_NAME: ColumnFamilyName = "hot_state_value_by_key_hash"; pub const JELLYFISH_MERKLE_NODE_CF_NAME: ColumnFamilyName = "jellyfish_merkle_node"; pub const LEDGER_INFO_CF_NAME: ColumnFamilyName = "ledger_info"; diff --git a/storage/aptosdb/src/state_merkle_db.rs b/storage/aptosdb/src/state_merkle_db.rs index c39ac88128d57..b39b51687b2d6 100644 --- a/storage/aptosdb/src/state_merkle_db.rs +++ b/storage/aptosdb/src/state_merkle_db.rs @@ -7,6 +7,7 @@ use crate::{ metrics::{NODE_CACHE_SECONDS, OTHER_TIMERS_SECONDS}, schema::{ db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, + hot_jellyfish_merkle_node::HotJellyfishMerkleNodeSchema, jellyfish_merkle_node::JellyfishMerkleNodeSchema, 
stale_node_index::StaleNodeIndexSchema, stale_node_index_cross_epoch::StaleNodeIndexCrossEpochSchema, @@ -49,6 +50,7 @@ use std::{ pub const STATE_MERKLE_DB_FOLDER_NAME: &str = "state_merkle_db"; pub const STATE_MERKLE_DB_NAME: &str = "state_merkle_db"; pub const STATE_MERKLE_METADATA_DB_NAME: &str = "state_merkle_metadata_db"; +pub const HOT_STATE_MERKLE_METADATA_DB_NAME: &str = "hot_state_merkle_metadata_db"; pub(crate) type LeafNode = aptos_jellyfish_merkle::node_type::LeafNode; pub(crate) type Node = aptos_jellyfish_merkle::node_type::Node; @@ -58,8 +60,10 @@ type NodeBatch = aptos_jellyfish_merkle::NodeBatch; pub struct StateMerkleDb { // Stores metadata and top levels (non-sharded part) of tree nodes. state_merkle_metadata_db: Arc, + hot_state_merkle_metadata_db: Arc, // Stores sharded part of tree nodes. state_merkle_db_shards: [Arc; NUM_STATE_SHARDS], + hot_state_merkle_db_shards: [Arc; NUM_STATE_SHARDS], enable_sharding: bool, // shard_id -> cache. version_caches: HashMap, VersionedNodeCache>, @@ -67,6 +71,11 @@ pub struct StateMerkleDb { lru_cache: Option, } +struct HotDBWrapper { + metadata_db: Arc, + shards: [Arc; NUM_STATE_SHARDS], +} + impl StateMerkleDb { pub(crate) fn new( db_paths: &StorageDirPaths, @@ -76,8 +85,11 @@ impl StateMerkleDb { readonly: bool, // TODO(grao): Currently when this value is set to 0 we disable both caches. This is // hacky, need to revisit. - max_nodes_per_lru_cache_shard: usize, + mut max_nodes_per_lru_cache_shard: usize, ) -> Result { + // Disable caches for now. 
+ max_nodes_per_lru_cache_shard = 0; + let sharding = rocksdb_configs.enable_storage_sharding; let state_merkle_db_config = rocksdb_configs.state_merkle_db_config; @@ -89,23 +101,7 @@ impl StateMerkleDb { let lru_cache = NonZeroUsize::new(max_nodes_per_lru_cache_shard).map(LruNodeCache::new); if !sharding { - info!("Sharded state merkle DB is not enabled!"); - let state_merkle_db_path = db_paths.default_root_path().join(STATE_MERKLE_DB_NAME); - let db = Arc::new(Self::open_db( - state_merkle_db_path, - STATE_MERKLE_DB_NAME, - &state_merkle_db_config, - env, - block_cache, - readonly, - )?); - return Ok(Self { - state_merkle_metadata_db: Arc::clone(&db), - state_merkle_db_shards: arr![Arc::clone(&db); 16], - enable_sharding: false, - version_caches, - lru_cache, - }); + panic!("Sharded state merkle DB is not enabled!"); } Self::open( @@ -124,6 +120,7 @@ impl StateMerkleDb { version: Version, top_levels_batch: impl IntoRawBatch, batches_for_shards: Vec, + is_hot: bool, ) -> Result<()> { ensure!( batches_for_shards.len() == NUM_STATE_SHARDS, @@ -134,15 +131,18 @@ impl StateMerkleDb { .into_par_iter() .enumerate() .for_each(|(shard_id, batch)| { - self.db_shard(shard_id) - .write_schemas(batch) - .unwrap_or_else(|err| { - panic!("Failed to commit state merkle shard {shard_id}: {err}") - }); + let db = if is_hot { + self.hot_db_shard(shard_id) + } else { + self.db_shard(shard_id) + }; + db.write_schemas(batch).unwrap_or_else(|err| { + panic!("Failed to commit state merkle shard {shard_id}: {err}") + }); }) }); - self.commit_top_levels(version, top_levels_batch) + self.commit_top_levels(version, top_levels_batch, is_hot) } /// Only used by fast sync / restore. 
@@ -191,15 +191,32 @@ impl StateMerkleDb { std::fs::create_dir_all(&cp_state_merkle_db_path).unwrap_or(()); } + let mut metadata_db_path = Self::metadata_db_path(cp_root_path.as_ref(), sharding); state_merkle_db .metadata_db() - .create_checkpoint(Self::metadata_db_path(cp_root_path.as_ref(), sharding))?; + .create_checkpoint(metadata_db_path.clone())?; + + metadata_db_path.set_file_name("hot_metadata"); + state_merkle_db + .hot_metadata_db() + .create_checkpoint(metadata_db_path)?; if sharding { for shard_id in 0..NUM_STATE_SHARDS { state_merkle_db .db_shard(shard_id) - .create_checkpoint(Self::db_shard_path(cp_root_path.as_ref(), shard_id))?; + .create_checkpoint(Self::db_shard_path( + cp_root_path.as_ref(), + shard_id, + /* is_hot = */ false, + ))?; + state_merkle_db + .hot_db_shard(shard_id) + .create_checkpoint(Self::db_shard_path( + cp_root_path.as_ref(), + shard_id, + /* is_hot = */ true, + ))?; } } @@ -210,6 +227,10 @@ impl StateMerkleDb { &self.state_merkle_metadata_db } + fn hot_metadata_db(&self) -> &DB { + &self.hot_state_merkle_metadata_db + } + pub(crate) fn metadata_db_arc(&self) -> Arc { Arc::clone(&self.state_merkle_metadata_db) } @@ -218,6 +239,10 @@ impl StateMerkleDb { &self.state_merkle_db_shards[shard_id] } + pub(crate) fn hot_db_shard(&self, shard_id: usize) -> &DB { + &self.hot_state_merkle_db_shards[shard_id] + } + pub(crate) fn db_shard_arc(&self, shard_id: usize) -> Arc { Arc::clone(&self.state_merkle_db_shards[shard_id]) } @@ -234,9 +259,14 @@ impl StateMerkleDb { &self, version: Version, batch: impl IntoRawBatch, + is_hot: bool, ) -> Result<()> { info!(version = version, "Committing StateMerkleDb."); - self.state_merkle_metadata_db.write_schemas(batch) + if is_hot { + self.hot_state_merkle_metadata_db.write_schemas(batch) + } else { + self.state_merkle_metadata_db.write_schemas(batch) + } } pub fn get_with_proof_ext( @@ -244,11 +274,20 @@ impl StateMerkleDb { key: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> 
Result<( Option<(HashValue, (StateKey, Version))>, SparseMerkleProofExt, )> { - JellyfishMerkleTree::new(self).get_with_proof_ext(key, version, root_depth) + if is_hot { + let wrapper = HotDBWrapper { + metadata_db: self.hot_state_merkle_metadata_db.clone(), + shards: self.hot_state_merkle_db_shards.clone(), + }; + JellyfishMerkleTree::new(&wrapper).get_with_proof_ext(key, version, root_depth) + } else { + JellyfishMerkleTree::new(self).get_with_proof_ext(key, version, root_depth) + } } pub fn get_range_proof( @@ -263,6 +302,14 @@ impl StateMerkleDb { JellyfishMerkleTree::new(self).get_root_hash(version) } + pub fn get_hot_root_hash(&self, version: Version) -> Result { + let wrapper = HotDBWrapper { + metadata_db: self.hot_state_merkle_metadata_db.clone(), + shards: self.hot_state_merkle_db_shards.clone(), + }; + JellyfishMerkleTree::new(&wrapper).get_root_hash(version) + } + pub fn get_leaf_count(&self, version: Version) -> Result { JellyfishMerkleTree::new(self).get_leaf_count(version) } @@ -284,6 +331,26 @@ impl StateMerkleDb { ) } + pub fn batch_put_hot_value_set_for_shard( + &self, + shard_id: usize, + value_set: Vec<(HashValue, Option<&(HashValue, StateKey)>)>, + persisted_version: Option, + version: Version, + ) -> Result<(Node, TreeUpdateBatch)> { + let wrapper = HotDBWrapper { + metadata_db: self.hot_state_merkle_metadata_db.clone(), + shards: self.hot_state_merkle_db_shards.clone(), + }; + JellyfishMerkleTree::new(&wrapper).batch_put_value_set_for_shard( + shard_id as u8, + value_set, + /* node_hashes = */ None, + persisted_version, + version, + ) + } + pub fn get_state_snapshot_version_before( &self, next_version: Version, @@ -427,6 +494,7 @@ impl StateMerkleDb { base_version: Option, shard_persisted_version: Option, previous_epoch_ending_version: Option, + is_hot: bool, ) -> Result<(Node, RawBatch)> { if let Some(shard_persisted_version) = shard_persisted_version { assert!(shard_persisted_version <= base_version.expect("Must have base version.")); @@ 
-435,16 +503,25 @@ impl StateMerkleDb { let (shard_root_node, tree_update_batch) = { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["jmt_update"]); - self.batch_put_value_set_for_shard( - shard_id, - value_set, - node_hashes, - shard_persisted_version, - version, - ) + if !is_hot { + self.batch_put_value_set_for_shard( + shard_id, + value_set, + node_hashes, + shard_persisted_version, + version, + ) + } else { + self.batch_put_hot_value_set_for_shard( + shard_id, + value_set, + shard_persisted_version, + version, + ) + } }?; - if self.cache_enabled() { + if self.cache_enabled() && !is_hot { self.version_caches .get(&Some(shard_id)) .unwrap() @@ -513,6 +590,17 @@ impl StateMerkleDb { JellyfishMerkleTree::new(self).get_shard_persisted_versions(root_persisted_version) } + pub(crate) fn get_hot_shard_persisted_versions( + &self, + root_persisted_version: Option, + ) -> Result<[Option; NUM_STATE_SHARDS]> { + let wrapper = HotDBWrapper { + metadata_db: self.hot_state_merkle_metadata_db.clone(), + shards: self.hot_state_merkle_db_shards.clone(), + }; + JellyfishMerkleTree::new(&wrapper).get_shard_persisted_versions(root_persisted_version) + } + pub(crate) fn sharding_enabled(&self) -> bool { self.enable_sharding } @@ -581,6 +669,17 @@ impl StateMerkleDb { readonly, )?); + let mut hot_state_merkle_metadata_db_path = state_merkle_metadata_db_path.clone(); + hot_state_merkle_metadata_db_path.set_file_name("hot_metadata"); + let hot_state_merkle_metadata_db = Arc::new(Self::open_db( + hot_state_merkle_metadata_db_path, + HOT_STATE_MERKLE_METADATA_DB_NAME, + &state_merkle_db_config, + env, + block_cache, + readonly, + )?); + info!( state_merkle_metadata_db_path = state_merkle_metadata_db_path, "Opened state merkle metadata db!" 
@@ -597,6 +696,29 @@ impl StateMerkleDb { env, block_cache, readonly, + /* is_hot = */ false, + ) + .unwrap_or_else(|e| { + panic!("Failed to open state merkle db shard {shard_id}: {e:?}.") + }); + Arc::new(db) + }) + .collect::>() + .try_into() + .unwrap(); + + let hot_state_merkle_db_shards = (0..NUM_STATE_SHARDS) + .into_par_iter() + .map(|shard_id| { + let shard_root_path = db_paths.state_merkle_db_shard_root_path(shard_id); + let db = Self::open_shard( + shard_root_path, + shard_id, + &state_merkle_db_config, + env, + block_cache, + readonly, + /* is_hot = */ true, ) .unwrap_or_else(|e| { panic!("Failed to open state merkle db shard {shard_id}: {e:?}.") @@ -609,7 +731,9 @@ impl StateMerkleDb { let state_merkle_db = Self { state_merkle_metadata_db, + hot_state_merkle_metadata_db, state_merkle_db_shards, + hot_state_merkle_db_shards, enable_sharding: true, version_caches, lru_cache, @@ -636,10 +760,15 @@ impl StateMerkleDb { env: Option<&Env>, block_cache: Option<&Cache>, readonly: bool, + is_hot: bool, ) -> Result { - let db_name = format!("state_merkle_db_shard_{}", shard_id); + let db_name = if is_hot { + format!("hot_state_merkle_db_shard_{}", shard_id) + } else { + format!("state_merkle_db_shard_{}", shard_id) + }; Self::open_db( - Self::db_shard_path(db_root_path, shard_id), + Self::db_shard_path(db_root_path, shard_id, is_hot), &db_name, state_merkle_db_config, env, @@ -673,8 +802,12 @@ impl StateMerkleDb { }) } - fn db_shard_path>(db_root_path: P, shard_id: usize) -> PathBuf { - let shard_sub_path = format!("shard_{}", shard_id); + fn db_shard_path>(db_root_path: P, shard_id: usize, is_hot: bool) -> PathBuf { + let shard_sub_path = format!( + "{}_{}", + if is_hot { "hot_shard" } else { "shard" }, + shard_id + ); db_root_path .as_ref() .join(STATE_MERKLE_DB_FOLDER_NAME) @@ -787,6 +920,22 @@ impl StateMerkleDb { } } +impl TreeReader for HotDBWrapper { + fn get_node_option(&self, node_key: &NodeKey, _tag: &str) -> Result> { + let db = if let Some(shard_id) 
= node_key.get_shard_id() { + &self.shards[shard_id] + } else { + &self.metadata_db + }; + let node_opt = db.get::(node_key)?; + Ok(node_opt) + } + + fn get_rightmost_leaf(&self, _version: Version) -> Result> { + unimplemented!() + } +} + impl TreeReader for StateMerkleDb { fn get_node_option(&self, node_key: &NodeKey, tag: &str) -> Result> { let start_time = Instant::now(); diff --git a/storage/aptosdb/src/state_store/buffered_state.rs b/storage/aptosdb/src/state_store/buffered_state.rs index 0821fad312a82..11e1698702ab6 100644 --- a/storage/aptosdb/src/state_store/buffered_state.rs +++ b/storage/aptosdb/src/state_store/buffered_state.rs @@ -104,6 +104,7 @@ impl BufferedState { || self.buffered_versions() >= TARGET_SNAPSHOT_INTERVAL_IN_VERSION) { self.enqueue_commit(checkpoint); + // self.drain_commits(); } } diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index d11c796248503..69c878e6aa10b 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -180,10 +180,11 @@ impl DbReader for StateDb { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result { let (_, proof) = self .state_merkle_db - .get_with_proof_ext(key_hash, version, root_depth)?; + .get_with_proof_ext(key_hash, version, root_depth, is_hot)?; Ok(proof) } @@ -193,10 +194,11 @@ impl DbReader for StateDb { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result<(Option, SparseMerkleProofExt)> { let (leaf_data, proof) = self .state_merkle_db - .get_with_proof_ext(key_hash, version, root_depth)?; + .get_with_proof_ext(key_hash, version, root_depth, is_hot)?; Ok(( match leaf_data { Some((_val_hash, (key, ver))) => Some(self.expect_value_by_version(&key, ver)?), @@ -264,9 +266,10 @@ impl DbReader for StateStore { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result { self.deref() - .get_state_proof_by_version_ext(key_hash, 
version, root_depth) + .get_state_proof_by_version_ext(key_hash, version, root_depth, is_hot) } /// Get the state value with proof extension given the state key and version @@ -275,9 +278,10 @@ impl DbReader for StateStore { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result<(Option, SparseMerkleProofExt)> { self.deref() - .get_state_value_with_proof_by_version_ext(key_hash, version, root_depth) + .get_state_value_with_proof_by_version_ext(key_hash, version, root_depth, is_hot) } } @@ -531,18 +535,32 @@ impl StateStore { latest_snapshot_version = latest_snapshot_version, "Initializing BufferedState." ); - let latest_snapshot_root_hash = if let Some(version) = latest_snapshot_version { - state_db - .state_merkle_db - .get_root_hash(version) - .expect("Failed to query latest checkpoint root hash on initialization.") - } else { - *SPARSE_MERKLE_PLACEHOLDER_HASH - }; + let (latest_snapshot_root_hash, latest_snapshot_hot_root_hash) = + if let Some(version) = latest_snapshot_version { + info!("version: {}", version); + let root_hash = state_db + .state_merkle_db + .get_root_hash(version) + .expect("Failed to query latest checkpoint root hash on initialization."); + let hot_root_hash = state_db + .state_merkle_db + .get_hot_root_hash(version) + .expect("Failed"); + (root_hash, hot_root_hash) + } else { + ( + *SPARSE_MERKLE_PLACEHOLDER_HASH, + *SPARSE_MERKLE_PLACEHOLDER_HASH, + ) + }; + info!( + "Got root hash from DB: {}, hot root hash: {}", + latest_snapshot_root_hash, latest_snapshot_hot_root_hash + ); let usage = state_db.get_state_storage_usage(latest_snapshot_version)?; let state = StateWithSummary::new_at_version( latest_snapshot_version, - *SPARSE_MERKLE_PLACEHOLDER_HASH, // TODO(HotState): for now hot state always starts from empty upon restart. 
+ latest_snapshot_hot_root_hash, latest_snapshot_root_hash, usage, HotStateConfig::default(), @@ -574,56 +592,57 @@ impl StateStore { // Replaying the committed write sets after the latest snapshot. if snapshot_next_version < num_transactions { - if check_max_versions_after_snapshot { - ensure!( - num_transactions - snapshot_next_version <= MAX_WRITE_SETS_AFTER_SNAPSHOT, - "Too many versions after state snapshot. snapshot_next_version: {}, num_transactions: {}", - snapshot_next_version, - num_transactions, - ); - } - let write_sets = state_db - .ledger_db - .write_set_db() - .get_write_sets(snapshot_next_version, num_transactions)?; - let txn_info_iter = state_db - .ledger_db - .transaction_info_db() - .get_transaction_info_iter(snapshot_next_version, write_sets.len())?; - let all_checkpoint_indices = txn_info_iter - .into_iter() - .collect::>>()? - .into_iter() - .positions(|txn_info| txn_info.has_state_checkpoint_hash()) - .collect(); - - let state_update_refs = StateUpdateRefs::index_write_sets( - state.next_version(), - &write_sets, - write_sets.len(), - all_checkpoint_indices, - ); - let current_state = out_current_state.lock().clone(); - let (hot_state, state) = out_persisted_state.get_state(); - let (new_state, _state_reads) = current_state.ledger_state().update_with_db_reader( - &state, - hot_state, - &state_update_refs, - state_db.clone(), - )?; - let state_summary = out_persisted_state.get_state_summary(); - let new_state_summary = current_state.ledger_state_summary().update( - &ProvableStateSummary::new(state_summary, state_db.as_ref()), - &state_update_refs, - )?; - let updated = - LedgerStateWithSummary::from_state_and_summary(new_state, new_state_summary); - - // synchronously commit the snapshot at the last checkpoint here if not committed to disk yet. 
- buffered_state.update( - updated, 0, /* estimated_items, doesn't matter since we sync-commit */ - true, /* sync_commit */ - )?; + panic!("not implemented right now"); + // if check_max_versions_after_snapshot { + // ensure!( + // num_transactions - snapshot_next_version <= MAX_WRITE_SETS_AFTER_SNAPSHOT, + // "Too many versions after state snapshot. snapshot_next_version: {}, num_transactions: {}", + // snapshot_next_version, + // num_transactions, + // ); + // } + // let write_sets = state_db + // .ledger_db + // .write_set_db() + // .get_write_sets(snapshot_next_version, num_transactions)?; + // let txn_info_iter = state_db + // .ledger_db + // .transaction_info_db() + // .get_transaction_info_iter(snapshot_next_version, write_sets.len())?; + // let all_checkpoint_indices = txn_info_iter + // .into_iter() + // .collect::>>()? + // .into_iter() + // .positions(|txn_info| txn_info.has_state_checkpoint_hash()) + // .collect(); + + // let state_update_refs = StateUpdateRefs::index_write_sets( + // state.next_version(), + // &write_sets, + // write_sets.len(), + // all_checkpoint_indices, + // ); + // let current_state = out_current_state.lock().clone(); + // let (hot_state, state) = out_persisted_state.get_state(); + // let (new_state, _state_reads) = current_state.ledger_state().update_with_db_reader( + // &state, + // hot_state, + // &state_update_refs, + // state_db.clone(), + // )?; + // let state_summary = out_persisted_state.get_state_summary(); + // let new_state_summary = current_state.ledger_state_summary().update( + // &ProvableStateSummary::new(state_summary, state_db.as_ref()), + // &state_update_refs, + // )?; + // let updated = + // LedgerStateWithSummary::from_state_and_summary(new_state, new_state_summary); + + // // synchronously commit the snapshot at the last checkpoint here if not committed to disk yet. 
+ // buffered_state.update( + // updated, 0, /* estimated_items, doesn't matter since we sync-commit */ + // true, /* sync_commit */ + // )?; } let current_state = out_current_state.lock().clone(); diff --git a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs index 5211b92d2f05b..8fd6a8f104fef 100644 --- a/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs +++ b/storage/aptosdb/src/state_store/state_merkle_batch_committer.rs @@ -25,14 +25,14 @@ pub struct StateMerkleBatch { pub(crate) struct StateMerkleBatchCommitter { state_db: Arc, - state_merkle_batch_receiver: Receiver>, + state_merkle_batch_receiver: Receiver>, persisted_state: PersistedState, } impl StateMerkleBatchCommitter { pub fn new( state_db: Arc, - state_merkle_batch_receiver: Receiver>, + state_merkle_batch_receiver: Receiver>, persisted_state: PersistedState, ) -> Self { Self { @@ -46,12 +46,17 @@ impl StateMerkleBatchCommitter { while let Ok(msg) = self.state_merkle_batch_receiver.recv() { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["batch_committer_work"]); match msg { - CommitMessage::Data(state_merkle_batch) => { + CommitMessage::Data((hot_state_merkle_batch, cold_state_merkle_batch)) => { let StateMerkleBatch { - top_levels_batch, - batches_for_shards, + top_levels_batch: hot_top_levels_batch, + batches_for_shards: hot_batches_for_shards, snapshot, - } = state_merkle_batch; + } = hot_state_merkle_batch; + let StateMerkleBatch { + top_levels_batch: cold_top_levels_batch, + batches_for_shards: cold_batches_for_shards, + .. 
+ } = cold_state_merkle_batch; let base_version = self.persisted_state.get_state_summary().version(); let current_version = snapshot @@ -63,8 +68,23 @@ impl StateMerkleBatchCommitter { OTHER_TIMERS_SECONDS.timer_with(&["commit_jellyfish_merkle_nodes"]); self.state_db .state_merkle_db - .commit(current_version, top_levels_batch, batches_for_shards) - .expect("State merkle nodes commit failed."); + .commit( + current_version, + hot_top_levels_batch, + hot_batches_for_shards, + /* is_hot = */ true, + ) + .expect("State merkle hot nodes commit failed."); + self.state_db + .state_merkle_db + .commit( + current_version, + cold_top_levels_batch, + cold_batches_for_shards, + /* is_hot = */ false, + ) + .expect("State merkle cold nodes commit failed."); + if let Some(lru_cache) = self.state_db.state_merkle_db.lru_cache() { self.state_db .state_merkle_db @@ -87,7 +107,7 @@ impl StateMerkleBatchCommitter { .epoch_snapshot_pruner .maybe_set_pruner_target_db_version(current_version); - self.check_usage_consistency(&snapshot).unwrap(); + // self.check_usage_consistency(&snapshot).unwrap(); snapshot .summary() diff --git a/storage/aptosdb/src/state_store/state_snapshot_committer.rs b/storage/aptosdb/src/state_store/state_snapshot_committer.rs index 48e908cce1702..f814b0a813463 100644 --- a/storage/aptosdb/src/state_store/state_snapshot_committer.rs +++ b/storage/aptosdb/src/state_store/state_snapshot_committer.rs @@ -13,8 +13,9 @@ use crate::{ }, versioned_node_cache::VersionedNodeCache, }; +use aptos_crypto::hash::CryptoHash; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; -use aptos_logger::trace; +use aptos_logger::{info, trace}; use aptos_metrics_core::TimerHelper; use aptos_storage_interface::{ jmt_update_refs, state_store::state_with_summary::StateWithSummary, Result, @@ -23,9 +24,9 @@ use itertools::Itertools; use rayon::prelude::*; use static_assertions::const_assert; use std::{ + collections::HashMap, sync::{ - mpsc, - mpsc::{Receiver, SyncSender}, + 
mpsc::{self, Receiver, SyncSender}, Arc, }, thread::JoinHandle, @@ -36,7 +37,8 @@ pub(crate) struct StateSnapshotCommitter { /// Last snapshot merklized and sent for persistence, not guaranteed to have committed already. last_snapshot: StateWithSummary, state_snapshot_commit_receiver: Receiver>, - state_merkle_batch_commit_sender: SyncSender>, + state_merkle_batch_commit_sender: + SyncSender>, join_handle: Option>, } @@ -83,6 +85,10 @@ impl StateSnapshotCommitter { CommitMessage::Data(snapshot) => { let version = snapshot.version().expect("Cannot be empty"); let base_version = self.last_snapshot.version(); + info!( + "StateSnapshotCommitter: base_version: {:?}, version: {:?}", + base_version, version + ); let previous_epoch_ending_version = self .state_db .ledger_db @@ -91,17 +97,26 @@ impl StateSnapshotCommitter { .unwrap() .map(|(v, _e)| v); - let (shard_root_nodes, batches_for_shards) = { + let ( + (hot_shard_root_nodes, hot_batches_for_shards), + (cold_shard_root_nodes, cold_batches_for_shards), + ) = { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["calculate_batches_for_shards"]); - let shard_persisted_versions = self + let hot_shard_persisted_versions = self + .state_db + .state_merkle_db + .get_hot_shard_persisted_versions(base_version) + .unwrap(); + let cold_shard_persisted_versions = self .state_db .state_merkle_db .get_shard_persisted_versions(base_version) .unwrap(); let min_version = self.last_snapshot.next_version(); + // info!("min_version: {min_version}"); THREAD_MANAGER.get_non_exe_cpu_pool().install(|| { snapshot @@ -110,35 +125,96 @@ impl StateSnapshotCommitter { .par_iter() .enumerate() .map(|(shard_id, updates)| { - let node_hashes = snapshot - .summary() - .global_state_summary - .new_node_hashes_since( - &self.last_snapshot.summary().global_state_summary, - shard_id as u8, - ); - // TODO(aldenhu): iterator of refs - let updates = { - let _timer = - OTHER_TIMERS_SECONDS.timer_with(&["hash_jmt_updates"]); - - updates - .iter() - 
.filter_map(|(key, slot)| { + // TODO: use node_hashes for both hot and cold + // let node_hashes = snapshot + // .summary() + // .global_state_summary + // .new_node_hashes_since( + // &self.last_snapshot.summary().global_state_summary, + // shard_id as u8, + // ); + + // Now we need to figure out the updates to hot and the updates + // to cold. + let hot_updates = updates + .iter() + .filter_map(|(key, slot)| { + if slot.is_hot() { slot.maybe_update_jmt(key, min_version) - }) - .collect_vec() - }; - - self.state_db.state_merkle_db.merklize_value_set_for_shard( - shard_id, - jmt_update_refs(&updates), - Some(&node_hashes), - version, - base_version, - shard_persisted_versions[shard_id], - previous_epoch_ending_version, - ) + } else { + Some((CryptoHash::hash(&key), None)) + } + }) + .collect_vec(); + + // FIXME: probably not right. + // Example: + // - before: K is cold and its value is v0 + // - block N: write to key K (v1) => K exists in hot state and K's old value (v0) exists in cold state + // - block N+1: evict key K => insert K's new value (v1) into cold state + // - block N+2: read key K => promote it into hot state, but leave the value (v1) in cold as well + // - If we batch N to N+2, we won't touch it in cold state, but we SHOULD update it from v0 to v1 + + let inner_maps = updates.inner_maps(); + info!( + "base layer: {}. top layer: {}. 
#inner layers: {}", + updates.base_layer(), + updates.top_layer(), + inner_maps.len() + ); + assert_eq!( + inner_maps.first().unwrap().base_layer(), + updates.base_layer() + ); + assert_eq!( + inner_maps.last().unwrap().top_layer(), + updates.top_layer() + ); + + let mut all_cold_updates = HashMap::new(); + for block_update in inner_maps { + all_cold_updates.extend( + block_update.iter().filter(|(_, slot)| slot.is_cold()), + ); + } + let cold_updates = all_cold_updates + .into_iter() + .filter_map(|(key, slot)| slot.maybe_update_jmt(key, 0)) + .collect_vec(); + // info!( + // "shard {}: #hot updates: {}, #cold updates: {}", + // shard_id, + // hot_updates.len(), + // cold_updates.len() + // ); + + let hot_stuff = self + .state_db + .state_merkle_db + .merklize_value_set_for_shard( + shard_id, + jmt_update_refs(&hot_updates), + None, + version, + base_version, + hot_shard_persisted_versions[shard_id], + previous_epoch_ending_version, + /* is_hot = */ true, + )?; + let cold_stuff = self + .state_db + .state_merkle_db + .merklize_value_set_for_shard( + shard_id, + jmt_update_refs(&cold_updates), + None, + version, + base_version, + cold_shard_persisted_versions[shard_id], + previous_epoch_ending_version, + /* is_hot = */ false, + )?; + Ok((hot_stuff, cold_stuff)) }) .collect::>>() .expect("Error calculating StateMerkleBatch for shards.") @@ -147,47 +223,81 @@ impl StateSnapshotCommitter { }) }; - let (root_hash, leaf_count, top_levels_batch) = { + let (hot_root_hash, hot_leaf_count, hot_top_levels_batch) = { + let _timer = + OTHER_TIMERS_SECONDS.timer_with(&["calculate_top_levels_batch"]); + self.state_db + .state_merkle_db + .calculate_top_levels( + hot_shard_root_nodes, + version, + base_version, + previous_epoch_ending_version, + ) + .expect("Error calculating StateMerkleBatch for top levels.") + }; + let (cold_root_hash, cold_leaf_count, cold_top_levels_batch) = { let _timer = OTHER_TIMERS_SECONDS.timer_with(&["calculate_top_levels_batch"]); self.state_db 
.state_merkle_db .calculate_top_levels( - shard_root_nodes, + cold_shard_root_nodes, version, base_version, previous_epoch_ending_version, ) .expect("Error calculating StateMerkleBatch for top levels.") }; + + assert_eq!( + hot_root_hash, + snapshot.summary().hot_root_hash(), + "hot root hash mismatch: jmt: {}, smt: {}", + hot_root_hash, + snapshot.summary().hot_root_hash(), + ); assert_eq!( - root_hash, + cold_root_hash, snapshot.summary().root_hash(), - "root hash mismatch: jmt: {}, smt: {}", - root_hash, + "cold root hash mismatch: jmt: {}, smt: {}", + cold_root_hash, snapshot.summary().root_hash(), ); - let usage = snapshot.state().usage(); - if !usage.is_untracked() { - assert_eq!( - leaf_count, - usage.items(), - "Num of state items mismatch: jmt: {}, state: {}", - leaf_count, - usage.items(), - ); - } + // let usage = snapshot.state().usage(); + // if !usage.is_untracked() { + // assert_eq!( + // leaf_count, + // usage.items(), + // "Num of state items mismatch: jmt: {}, state: {}", + // leaf_count, + // usage.items(), + // ); + // } self.last_snapshot = snapshot.clone(); self.state_merkle_batch_commit_sender - .send(CommitMessage::Data(StateMerkleBatch { - top_levels_batch, - batches_for_shards, - snapshot, - })) + .send(CommitMessage::Data(( + StateMerkleBatch { + top_levels_batch: hot_top_levels_batch, + batches_for_shards: hot_batches_for_shards, + snapshot: snapshot.clone(), + }, + StateMerkleBatch { + top_levels_batch: cold_top_levels_batch, + batches_for_shards: cold_batches_for_shards, + snapshot, + }, + ))) + .unwrap(); + + let (my_tx, my_rx) = mpsc::channel(); + self.state_merkle_batch_commit_sender + .send(CommitMessage::Sync(my_tx)) .unwrap(); + my_rx.recv().unwrap(); }, CommitMessage::Sync(finish_sender) => { self.state_merkle_batch_commit_sender diff --git a/storage/aptosdb/src/utils/truncation_helper.rs b/storage/aptosdb/src/utils/truncation_helper.rs index 4e87d92c75415..3ad97e2cfcd75 100644 --- 
a/storage/aptosdb/src/utils/truncation_helper.rs +++ b/storage/aptosdb/src/utils/truncation_helper.rs @@ -156,6 +156,7 @@ pub(crate) fn truncate_state_merkle_db( break; } + unreachable!("SHOULD NOT TRUNCATE"); let version_before = find_closest_node_version_at_or_before( state_merkle_db.metadata_db(), current_version - 1, @@ -171,7 +172,11 @@ pub(crate) fn truncate_state_merkle_db( &mut top_levels_batch, )?; - state_merkle_db.commit_top_levels(version_before, top_levels_batch)?; + state_merkle_db.commit_top_levels( + version_before, + top_levels_batch, + /* is_hot = */ false, + )?; truncate_state_merkle_db_shards(state_merkle_db, version_before)?; } diff --git a/storage/jellyfish-merkle/src/lib.rs b/storage/jellyfish-merkle/src/lib.rs index 4d6895fbef582..be81406f21da7 100644 --- a/storage/jellyfish-merkle/src/lib.rs +++ b/storage/jellyfish-merkle/src/lib.rs @@ -85,6 +85,7 @@ pub mod test_helper; use crate::metrics::{APTOS_JELLYFISH_LEAF_COUNT, APTOS_JELLYFISH_LEAF_DELETION_COUNT, COUNTER}; use aptos_crypto::{hash::CryptoHash, HashValue}; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; +use aptos_logger::info; use aptos_metrics_core::{IntCounterHelper, IntCounterVecHelper}; use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result}; use aptos_types::{ @@ -302,7 +303,7 @@ pub struct JellyfishMerkleTree<'a, R, K> { impl<'a, R, K> JellyfishMerkleTree<'a, R, K> where R: 'a + TreeReader + Sync, - K: Key, + K: Key + std::fmt::Debug, { /// Creates a `JellyfishMerkleTree` backed by the given [`TreeReader`](trait.TreeReader.html). 
pub fn new(reader: &'a R) -> Self { @@ -466,6 +467,7 @@ where if let Some(root_persisted_version) = root_persisted_version { let root_node_key = NodeKey::new_empty_path(root_persisted_version); let root_node = self.reader.get_node_with_tag(&root_node_key, "commit")?; + info!("root_node: {:?}", root_node); match root_node { Node::Internal(root_node) => { for shard_id in 0..16 { @@ -475,6 +477,7 @@ where } } }, + Node::Null => (), _ => { unreachable!("Assume the db doesn't have exactly 1 state.") }, diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 2a1f311726a3a..27a92ac5eef18 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -390,6 +390,7 @@ pub trait DbReader: Send + Sync { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result; /// Gets a state value by state key along with the proof, out of the ledger state indicated by the state @@ -405,6 +406,7 @@ pub trait DbReader: Send + Sync { key_hash: &HashValue, version: Version, root_depth: usize, + is_hot: bool, ) -> Result<(Option, SparseMerkleProofExt)>; /// Gets the latest LedgerView no matter if db has been bootstrapped. 
@@ -545,9 +547,15 @@ pub trait DbReader: Send + Sync { &self, state_key: &StateKey, version: Version, + is_hot: bool, ) -> Result<(Option, SparseMerkleProof)> { - self.get_state_value_with_proof_by_version_ext(state_key.crypto_hash_ref(), version, 0) - .map(|(value, proof_ext)| (value, proof_ext.into())) + self.get_state_value_with_proof_by_version_ext( + state_key.crypto_hash_ref(), + version, + 0, + is_hot, + ) + .map(|(value, proof_ext)| (value, proof_ext.into())) } fn ensure_synced_version(&self) -> Result { diff --git a/storage/storage-interface/src/mock.rs b/storage/storage-interface/src/mock.rs index a20e251ef6e54..cc40f7248d8a1 100644 --- a/storage/storage-interface/src/mock.rs +++ b/storage/storage-interface/src/mock.rs @@ -28,6 +28,7 @@ impl DbReader for MockDbReaderWriter { _key_hash: &HashValue, _version: Version, _root_depth: usize, + _is_hot: bool, ) -> Result { Ok(SparseMerkleProofExt::new(None, vec![])) } diff --git a/storage/storage-interface/src/state_store/state.rs b/storage/storage-interface/src/state_store/state.rs index 3f7f6e13e83bc..7f303c0485343 100644 --- a/storage/storage-interface/src/state_store/state.rs +++ b/storage/storage-interface/src/state_store/state.rs @@ -19,11 +19,13 @@ use crate::{ }; use anyhow::Result; use aptos_experimental_layered_map::{LayeredMap, MapLayer}; +use aptos_logger::info; use aptos_metrics_core::TimerHelper; use aptos_types::{ state_store::{ hot_state::HotStateConfig, state_key::StateKey, state_slot::StateSlot, - state_storage_usage::StateStorageUsage, StateViewId, NUM_STATE_SHARDS, + state_storage_usage::StateStorageUsage, state_value::StateValue, StateViewId, + NUM_STATE_SHARDS, }, transaction::Version, }; @@ -155,8 +157,13 @@ impl State { per_version_updates: &PerVersionStateUpdateRefs, all_checkpoint_versions: &[Version], state_cache: &ShardedStateCache, - ) -> Self { + ) -> ( + Self, + [HashMap>; NUM_STATE_SHARDS], + [HashMap>; NUM_STATE_SHARDS], + ) { let _timer = TIMER.timer_with(&["state__update"]); + 
let last_version = batched_updates.last_version(); // 1. The update batch must begin at self.next_version(). assert_eq!(self.next_version(), batched_updates.first_version); @@ -175,7 +182,10 @@ impl State { assert!(self.next_version() >= state_cache.next_version()); let overlay = self.make_delta(persisted); - let ((shards, new_metadata), usage_delta_per_shard): ((Vec<_>, Vec<_>), Vec<_>) = ( + let ((((shards, new_metadata), usage_delta_per_shard), all_inserted), all_evicted): ( + (((Vec<_>, Vec<_>), Vec<_>), Vec<_>), + Vec<_>, + ) = ( state_cache.shards.as_slice(), overlay.shards.as_slice(), self.hot_state_metadata.as_slice(), @@ -183,8 +193,9 @@ impl State { per_version_updates.shards.as_slice(), ) .into_par_iter() + .enumerate() .map( - |(cache, overlay, hot_metadata, batched_updates, per_version)| { + |(shard_id, (cache, overlay, hot_metadata, batched_updates, per_version))| { let mut lru = HotStateLRU::new( NonZeroUsize::new(self.hot_state_config.max_items_per_shard).unwrap(), Arc::clone(&persisted_hot_state), @@ -194,21 +205,46 @@ impl State { hot_metadata.num_items, ); let mut all_updates = per_version.iter(); + let mut updated = HashMap::new(); + let mut evicted = HashMap::new(); for ckpt_version in all_checkpoint_versions { for (key, update) in all_updates.take_while_ref(|(_k, u)| u.version <= *ckpt_version) { - Self::apply_one_update(&mut lru, overlay, cache, key, update); + let inserted_value = + Self::apply_one_update(&mut lru, overlay, cache, key, update); + updated.insert((*key).clone(), inserted_value); } - // Only evict at the checkpoints. 
- lru.maybe_evict(); + let actually_evicted = lru.maybe_evict(); + evicted = actually_evicted + .into_iter() + .map(|(k, slot)| { + assert!(slot.is_hot()); + (k, slot.into_state_value_opt()) + }) + .collect(); } for (key, update) in all_updates { + panic!("not state-sync right now"); Self::apply_one_update(&mut lru, overlay, cache, key, update); } + for key in evicted.keys() { + updated.remove(key); + } + // info!("shard {}. updated: {:?}", shard_id, updated.keys().collect_vec()); + // info!("shard {}. evicted: {:?}", shard_id, evicted.keys().collect_vec()); let (new_items, new_head, new_tail, new_num_items) = lru.into_updates(); let new_items = new_items.into_iter().collect_vec(); + // info!( + // "shard {}: #new items for version {:?}: {}. # updates: {}. # evictions: {}. Last version: {:?}", + // shard_id, + // last_version, + // new_items.len(), + // updated.len(), + // evicted.len(), + // last_version, + // ); // TODO(aldenhu): change interface to take iter of ref let new_layer = overlay.new_layer(&new_items); @@ -218,7 +254,7 @@ impl State { num_items: new_num_items, }; let new_usage = Self::usage_delta_for_shard(cache, overlay, batched_updates); - ((new_layer, new_metadata), new_usage) + ((((new_layer, new_metadata), new_usage), updated), evicted) }, ) .unzip(); @@ -226,13 +262,20 @@ impl State { let new_metadata = new_metadata.try_into().expect("Known to be 16 shards."); let usage = self.update_usage(usage_delta_per_shard); + assert_eq!(all_inserted.len(), 16); + assert_eq!(all_evicted.len(), 16); + // TODO(HotState): extract and pass new hot state onchain config if needed. 
- State::new_with_updates( - batched_updates.last_version(), - shards, - new_metadata, - usage, - self.hot_state_config, + ( + State::new_with_updates( + batched_updates.last_version(), + shards, + new_metadata, + usage, + self.hot_state_config, + ), + all_inserted.try_into().unwrap(), + all_evicted.try_into().unwrap(), ) } @@ -242,26 +285,28 @@ impl State { read_cache: &StateCacheShard, key: &StateKey, update: &StateUpdateRef, - ) { + ) -> Option { if update.state_op.is_value_write_op() { lru.insert((*key).clone(), update.to_result_slot().unwrap()); - return; + return update.state_op.as_state_value_opt().unwrap().cloned(); } if let Some(mut slot) = lru.get_slot(key) { - lru.insert( - (*key).clone(), - if slot.is_hot() { - slot.refresh(update.version); - slot - } else { - slot.to_hot(update.version) - }, - ); + let slot_to_insert = if slot.is_hot() { + slot.refresh(update.version); + slot + } else { + slot.to_hot(update.version) + }; + let ret = slot_to_insert.as_state_value_opt().cloned(); + lru.insert((*key).clone(), slot_to_insert); + ret } else { let slot = Self::expect_old_slot(overlay, read_cache, key); assert!(slot.is_cold()); + let ret = slot.as_state_value_opt().cloned(); lru.insert((*key).clone(), slot.to_hot(update.version)); + ret } } @@ -368,47 +413,31 @@ impl LedgerState { persisted_snapshot: &State, updates: &StateUpdateRefs, reads: &ShardedStateCache, - ) -> LedgerState { + ) -> ( + LedgerState, + [HashMap>; NUM_STATE_SHARDS], + [HashMap>; NUM_STATE_SHARDS], + ) { let _timer = TIMER.timer_with(&["ledger_state__update"]); - let last_checkpoint = if let Some(batched) = updates.for_last_checkpoint_batched() { - let per_version = updates - .for_last_checkpoint_per_version() - .expect("Both per-version and batched updates should exist."); - self.latest().update( - Arc::clone(&persisted_hot_view), - persisted_snapshot, - batched, - per_version, - updates.all_checkpoint_versions(), - reads, - ) - } else { - self.last_checkpoint.clone() - }; - - let 
base_of_latest = if updates.for_last_checkpoint_batched().is_none() { - self.latest() - } else { - &last_checkpoint - }; - let latest = if let Some(batched) = updates.for_latest_batched() { - let per_version = updates - .for_latest_per_version() - .expect("Both per-version and batched updates should exist."); - base_of_latest.update( - persisted_hot_view, - persisted_snapshot, - batched, - per_version, - &[], - reads, - ) - } else { - base_of_latest.clone() - }; + let batched = updates.for_last_checkpoint_batched().unwrap(); + let per_version = updates + .for_last_checkpoint_per_version() + .expect("Both per-version and batched updates should exist."); + let (new_ckpt, hot_inserted, hot_evicted) = self.latest().update( + Arc::clone(&persisted_hot_view), + persisted_snapshot, + batched, + per_version, + updates.all_checkpoint_versions(), + reads, + ); - LedgerState::new(latest, last_checkpoint) + ( + LedgerState::new(new_ckpt.clone(), new_ckpt), + hot_inserted, + hot_evicted, + ) } /// Old values of the updated keys are read from the DbReader at the version of the @@ -429,7 +458,7 @@ impl LedgerState { ); state_view.prime_cache(updates, PrimingPolicy::All)?; - let updated = self.update_with_memorized_reads( + let (updated, _, _) = self.update_with_memorized_reads( hot_state, persisted_snapshot, updates, diff --git a/storage/storage-interface/src/state_store/state_summary.rs b/storage/storage-interface/src/state_store/state_summary.rs index fae80acd226f4..b18f3b07a30e3 100644 --- a/storage/storage-interface/src/state_store/state_summary.rs +++ b/storage/storage-interface/src/state_store/state_summary.rs @@ -14,12 +14,18 @@ use aptos_crypto::{ hash::{CryptoHash, CORRUPTION_SENTINEL}, HashValue, }; +use aptos_logger::info; use aptos_metrics_core::TimerHelper; use aptos_scratchpad::{ProofRead, SparseMerkleTree}; -use aptos_types::{proof::SparseMerkleProofExt, transaction::Version}; +use aptos_types::{ + proof::SparseMerkleProofExt, + state_store::{state_key::StateKey, 
state_value::StateValue}, + transaction::Version, +}; use derive_more::Deref; use itertools::Itertools; use rayon::prelude::*; +use std::collections::HashMap; /// The data structure through which the entire state at a given /// version can be summarized to a concise digest (the root hash). @@ -56,6 +62,10 @@ impl StateSummary { self.global_state_summary.root_hash() } + pub fn hot_root_hash(&self) -> HashValue { + self.hot_state_summary.root_hash() + } + pub fn next_version(&self) -> Version { self.next_version } @@ -65,38 +75,66 @@ impl StateSummary { } pub fn is_descendant_of(&self, other: &Self) -> bool { - self.global_state_summary - .is_descendant_of(&other.global_state_summary) + self.hot_state_summary + .is_descendant_of(&other.hot_state_summary) + && self + .global_state_summary + .is_descendant_of(&other.global_state_summary) } pub fn update( &self, - persisted: &ProvableStateSummary, - updates: &BatchedStateUpdateRefs, + hot_persisted: &ProvableStateSummary, + cold_persisted: &ProvableStateSummary, + hot_inserted: &[HashMap>; 16], + hot_evicted: &[HashMap>; 16], + new_next_version: Version, ) -> Result { let _timer = TIMER.timer_with(&["state_summary__update"]); + assert_ne!(self.hot_state_summary.root_hash(), *CORRUPTION_SENTINEL); assert_ne!(self.global_state_summary.root_hash(), *CORRUPTION_SENTINEL); // Persisted must be before or at my version. - assert!(persisted.next_version() <= self.next_version()); + assert!(hot_persisted.next_version() <= self.next_version()); + assert!(cold_persisted.next_version() <= self.next_version()); // Updates must start at exactly my version. - assert_eq!(updates.first_version(), self.next_version()); + // assert_eq!(updates.first_version(), self.next_version()); - let smt_updates = updates - .shards + // info!("start compute hot smt updates"); + let hot_smt_updates = hot_inserted .par_iter() // clone hashes and sort items in parallel + .zip(hot_evicted.par_iter()) // TODO(aldenhu): smt per shard? 
- .flat_map(|shard| { - shard + .flat_map(|(inserted, evicted)| { + inserted .iter() - .filter_map(|(k, u)| { - // Filter out `MakeHot` ops. - u.state_op - .as_state_value_opt() - .map(|value_opt| (k, value_opt)) - }) - .map(|(k, value_opt)| (*k, value_opt.map(|v| v.hash()))) + .map(|(k, value_opt)| (k, value_opt.as_ref().map(|v| v.hash()))) + .chain(evicted.keys().map(|k| (k, None))) + // The keys in the shard are already unique, and shards are ordered by the + // first nibble of the key hash. `batch_update_sorted_uniq` can be + // called if within each shard items are sorted by key hash. + .sorted_by_key(|(k, _v)| k.crypto_hash_ref()) + .collect_vec() // TODO: Remove collect_vec?? Same below. + }) + .collect::>(); + // info!("done compute hot smt updates"); + + let hot_smt = self + .hot_state_summary + .freeze(&hot_persisted.hot_state_summary) + .batch_update_sorted_uniq(&hot_smt_updates, hot_persisted)? + .unfreeze(); + // info!("done computing hot smt"); + + // info!("start compute cold smt updates"); + let cold_smt_updates = hot_evicted + .par_iter() // clone hashes and sort items in parallel + // TODO(aldenhu): smt per shard? + .flat_map(|evicted| { + evicted + .iter() + .map(|(k, value_opt)| (k, value_opt.as_ref().map(|v| v.hash()))) // The keys in the shard are already unique, and shards are ordered by the // first nibble of the key hash. `batch_update_sorted_uniq` can be // called if within each shard items are sorted by key hash. @@ -104,19 +142,21 @@ impl StateSummary { .collect_vec() }) .collect::>(); + // info!("done compute cold smt updates"); - let smt = self + let cold_smt = self .global_state_summary - .freeze(&persisted.global_state_summary) - .batch_update_sorted_uniq(&smt_updates, persisted)? + .freeze(&cold_persisted.global_state_summary) + .batch_update_sorted_uniq(&cold_smt_updates, cold_persisted)? .unfreeze(); + // info!("done computing cold smt"); // TODO(HotState): compute new hot state from the `self.hot_state_summary` and // `updates`. 
Ok(Self { - next_version: updates.next_version(), - hot_state_summary: SparseMerkleTree::new_empty(), - global_state_summary: smt, + next_version: new_next_version, + hot_state_summary: hot_smt, + global_state_summary: cold_smt, }) } } @@ -166,29 +206,26 @@ impl LedgerStateSummary { pub fn update( &self, - persisted: &ProvableStateSummary, - updates: &StateUpdateRefs, + hot_persisted: &ProvableStateSummary, + cold_persisted: &ProvableStateSummary, + hot_inserted: &[HashMap>; 16], + hot_evicted: &[HashMap>; 16], + new_next_version: Version, ) -> Result { let _timer = TIMER.timer_with(&["ledger_state_summary__update"]); + assert_eq!( + self.latest.next_version(), + self.last_checkpoint.next_version() + ); - let last_checkpoint = if let Some(updates) = updates.for_last_checkpoint_batched() { - self.latest.update(persisted, updates)? - } else { - self.last_checkpoint.clone() - }; - - let base_of_latest = if updates.for_last_checkpoint_batched().is_none() { - self.latest() - } else { - &last_checkpoint - }; - let latest = if let Some(updates) = updates.for_latest_batched() { - base_of_latest.update(persisted, updates)? 
- } else { - base_of_latest.clone() - }; - - Ok(Self::new(last_checkpoint, latest)) + let new_ckpt = self.latest.update( + hot_persisted, + cold_persisted, + hot_inserted, + hot_evicted, + new_next_version, + )?; + Ok(Self::new(new_ckpt.clone(), new_ckpt)) } } @@ -197,15 +234,20 @@ pub struct ProvableStateSummary<'db> { #[deref] state_summary: StateSummary, db: &'db (dyn DbReader + Sync), + is_hot: bool, } impl<'db> ProvableStateSummary<'db> { - pub fn new_persisted(db: &'db (dyn DbReader + Sync)) -> Result { - Ok(Self::new(db.get_persisted_state_summary()?, db)) + pub fn new_persisted(db: &'db (dyn DbReader + Sync), is_hot: bool) -> Result { + Ok(Self::new(db.get_persisted_state_summary()?, db, is_hot)) } - pub fn new(state_summary: StateSummary, db: &'db (dyn DbReader + Sync)) -> Self { - Self { state_summary, db } + pub fn new(state_summary: StateSummary, db: &'db (dyn DbReader + Sync), is_hot: bool) -> Self { + Self { + state_summary, + db, + is_hot, + } } fn get_proof( @@ -219,9 +261,13 @@ impl<'db> ProvableStateSummary<'db> { let (val_opt, proof) = self .db // check the full proof - .get_state_value_with_proof_by_version_ext(key, version, 0)?; + .get_state_value_with_proof_by_version_ext(key, version, 0, self.is_hot)?; proof.verify( - self.state_summary.global_state_summary.root_hash(), + if self.is_hot { + self.state_summary.hot_state_summary.root_hash() + } else { + self.state_summary.global_state_summary.root_hash() + }, *key, val_opt.as_ref(), )?; @@ -229,7 +275,7 @@ impl<'db> ProvableStateSummary<'db> { } else { Ok(self .db - .get_state_proof_by_version_ext(key, version, root_depth)?) + .get_state_proof_by_version_ext(key, version, root_depth, self.is_hot)?) 
} } } diff --git a/storage/storage-interface/src/state_store/state_view/db_state_view.rs b/storage/storage-interface/src/state_store/state_view/db_state_view.rs index ce52e04ea383b..d7fef2b780339 100644 --- a/storage/storage-interface/src/state_store/state_view/db_state_view.rs +++ b/storage/storage-interface/src/state_store/state_view/db_state_view.rs @@ -31,8 +31,9 @@ impl DbStateView { // DB doesn't support returning proofs for buffered state, so only optionally // verify proof. // TODO: support returning state proof for buffered state. - if let Ok((value, proof)) = - self.db.get_state_value_with_proof_by_version(key, version) + if let Ok((value, proof)) = self + .db + .get_state_value_with_proof_by_version(key, version, /* is_hot = */ false) { proof.verify(root_hash, *key.crypto_hash_ref(), value.as_ref())?; } diff --git a/types/src/state_store/hot_state.rs b/types/src/state_store/hot_state.rs index 50b330c45d302..e2c5aa89974aa 100644 --- a/types/src/state_store/hot_state.rs +++ b/types/src/state_store/hot_state.rs @@ -4,7 +4,7 @@ // 256 MiB per shard pub const HOT_STATE_MAX_BYTES_PER_SHARD: usize = 256 * 1024 * 1024; // 250k items per shard -pub const HOT_STATE_MAX_ITEMS_PER_SHARD: usize = 250_000; +pub const HOT_STATE_MAX_ITEMS_PER_SHARD: usize = 60_000; // 10KB, worst case the hot state still caches about 400K items (all shards) pub const HOT_STATE_MAX_SINGLE_VALUE_BYTES: usize = 10 * 1024;