Skip to content

Commit dbd7543

Browse files
committed
[Hot State] Include HotVacant slots in root hash
In #18390, we simply set the format of the hot state Merkle tree to `Map<StateKey, StateValue>`. This means that `HotVacant` entries (for instance, entries that were read recently and determined not to exist in storage) are not hashed into the tree. (And if they existed before, we remove them from the Merkle tree.) As a result, the root hash does not accurately reflect the contents of the hot state. This commit introduces the `HotStateValue` struct and changes the format to `Map<StateKey, HotStateValue>`. This way, both `HotOccupied` and `HotVacant` entries are summarized into the root hash. In addition, we now check the refresh interval and avoid refreshing read-only keys on every block.
1 parent 7e7f2f0 commit dbd7543

File tree

8 files changed

+241
-90
lines changed

8 files changed

+241
-90
lines changed

aptos-move/block-executor/src/hot_state_op_accumulator.rs

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,6 @@ pub struct BlockHotStateOpAccumulator<Key> {
1818
writes: hashbrown::HashSet<Key>,
1919
/// To prevent the block epilogue from being too heavy.
2020
max_promotions_per_block: usize,
21-
/// Every now and then refresh `hot_since_version` for hot items to prevent them from being
22-
/// evicted.
23-
_refresh_interval_versions: usize,
2421
}
2522

2623
impl<Key> BlockHotStateOpAccumulator<Key>
@@ -29,25 +26,16 @@ where
2926
{
3027
/// TODO(HotState): make on-chain config
3128
const MAX_PROMOTIONS_PER_BLOCK: usize = 1024 * 10;
32-
/// TODO(HotState): make on-chain config
33-
const REFRESH_INTERVAL_VERSIONS: usize = 1_000_000;
3429

3530
pub fn new() -> Self {
36-
Self::new_with_config(
37-
Self::MAX_PROMOTIONS_PER_BLOCK,
38-
Self::REFRESH_INTERVAL_VERSIONS,
39-
)
31+
Self::new_with_config(Self::MAX_PROMOTIONS_PER_BLOCK)
4032
}
4133

42-
pub fn new_with_config(
43-
max_promotions_per_block: usize,
44-
refresh_interval_versions: usize,
45-
) -> Self {
34+
pub fn new_with_config(max_promotions_per_block: usize) -> Self {
4635
Self {
4736
to_make_hot: BTreeSet::new(),
4837
writes: hashbrown::HashSet::new(),
4938
max_promotions_per_block,
50-
_refresh_interval_versions: refresh_interval_versions,
5139
}
5240
}
5341

config/src/config/storage_config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,9 @@ impl Default for RocksdbConfigs {
243243
pub struct HotStateConfig {
244244
/// Max number of items in each shard.
245245
pub max_items_per_shard: usize,
246+
/// Every now and then refresh `hot_since_version` for hot items to prevent them from being
247+
/// evicted.
248+
pub refresh_interval_versions: u64,
246249
/// Whether to delete persisted data on disk on restart. Used during development.
247250
pub delete_on_restart: bool,
248251
/// Whether we compute root hashes for hot state in executor and commit the resulting JMT to
@@ -254,6 +257,7 @@ impl Default for HotStateConfig {
254257
fn default() -> Self {
255258
Self {
256259
max_items_per_shard: 250_000,
260+
refresh_interval_versions: 100_000,
257261
delete_on_restart: true,
258262
compute_root_hash: true,
259263
}

storage/aptosdb/src/state_store/state_snapshot_committer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use aptos_storage_interface::{
2525
jmt_update_refs, state_store::state_with_summary::StateWithSummary, Result,
2626
};
2727
use aptos_types::{
28-
state_store::{state_key::StateKey, NUM_STATE_SHARDS},
28+
state_store::{hot_state::HotStateValueRef, state_key::StateKey, NUM_STATE_SHARDS},
2929
transaction::Version,
3030
};
3131
use rayon::prelude::*;
@@ -99,6 +99,7 @@ impl StateSnapshotCommitter {
9999
.map(|(v, _e)| v);
100100
let min_version = self.last_snapshot.next_version();
101101

102+
// Element format: (key_hash, Option<(value_hash, key)>)
102103
let (hot_updates, all_updates): (Vec<_>, Vec<_>) = snapshot
103104
.make_delta(&self.last_snapshot)
104105
.shards
@@ -111,8 +112,10 @@ impl StateSnapshotCommitter {
111112
if slot.is_hot() {
112113
hot_updates.push((
113114
CryptoHash::hash(&key),
114-
slot.as_state_value_opt()
115-
.map(|value| (CryptoHash::hash(value), key.clone())),
115+
Some((
116+
HotStateValueRef::from_slot(&slot).hash(),
117+
key.clone(),
118+
)),
116119
));
117120
} else {
118121
hot_updates.push((CryptoHash::hash(&key), None));

storage/aptosdb/src/state_store/tests/speculative_state_workflow.rs

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,12 @@ use aptos_storage_interface::{
2121
use aptos_types::{
2222
proof::SparseMerkleProofExt,
2323
state_store::{
24-
hot_state::LRUEntry, state_key::StateKey, state_slot::StateSlot,
25-
state_storage_usage::StateStorageUsage, state_value::StateValue, StateViewId,
26-
StateViewResult, TStateView, NUM_STATE_SHARDS,
24+
hot_state::{HotStateValueRef, LRUEntry},
25+
state_key::StateKey,
26+
state_slot::StateSlot,
27+
state_storage_usage::StateStorageUsage,
28+
state_value::StateValue,
29+
StateViewId, StateViewResult, TStateView, NUM_STATE_SHARDS,
2730
},
2831
transaction::Version,
2932
write_set::{BaseStateOp, HotStateOp, WriteOp},
@@ -52,10 +55,11 @@ const HOT_STATE_MAX_ITEMS_PER_SHARD: usize = NUM_KEYS / NUM_STATE_SHARDS / 2;
5255

5356
// TODO(HotState): these are not used much at the moment.
5457
const MAX_PROMOTIONS_PER_BLOCK: usize = NUM_KEYS;
55-
const REFRESH_INTERVAL_VERSIONS: usize = 50;
58+
const REFRESH_INTERVAL_VERSIONS: Version = 50;
5659

5760
const TEST_CONFIG: HotStateConfig = HotStateConfig {
5861
max_items_per_shard: HOT_STATE_MAX_ITEMS_PER_SHARD,
62+
refresh_interval_versions: REFRESH_INTERVAL_VERSIONS,
5963
delete_on_restart: false,
6064
compute_root_hash: true,
6165
};
@@ -209,10 +213,11 @@ impl VersionState {
209213
hot_since_version: version,
210214
lru_info: LRUEntry::uninitialized(),
211215
};
216+
hot_smt_updates
217+
.push((k.hash(), Some(HotStateValueRef::from_slot(&slot).hash())));
212218
hot_state[shard_id].put(k.clone(), slot);
213-
state.remove(k);
214-
hot_smt_updates.push((k.hash(), None));
215219
smt_updates.push((k.hash(), None));
220+
state.remove(k);
216221
},
217222
Some(v) => {
218223
let slot = StateSlot::HotOccupied {
@@ -221,35 +226,47 @@ impl VersionState {
221226
hot_since_version: version,
222227
lru_info: LRUEntry::uninitialized(),
223228
};
229+
hot_smt_updates
230+
.push((k.hash(), Some(HotStateValueRef::from_slot(&slot).hash())));
224231
hot_state[shard_id].put(k.clone(), slot);
225-
state.insert(k.clone(), (version, v.clone()));
226-
hot_smt_updates.push((k.hash(), Some(v.hash())));
227232
smt_updates.push((k.hash(), Some(v.hash())));
233+
state.insert(k.clone(), (version, v.clone()));
228234
},
229235
}
230236
}
231237

232238
for k in promotions {
233239
assert!(is_checkpoint, "No promotions unless in checkpoints");
234240
let shard_id = k.get_shard_id();
235-
if let Some(slot) = hot_state[shard_id].get_mut(k) {
236-
slot.refresh(version);
237-
continue;
241+
242+
let hot_value_hash;
243+
if let Some(slot) = hot_state[shard_id].peek_mut(k) {
244+
if slot.expect_hot_since_version() + REFRESH_INTERVAL_VERSIONS <= version {
245+
slot.refresh(version);
246+
hot_value_hash = Some(HotStateValueRef::from_slot(slot).hash());
247+
} else {
248+
hot_value_hash = None;
249+
}
250+
} else {
251+
let slot = match state.get(k) {
252+
Some((value_version, value)) => StateSlot::HotOccupied {
253+
value_version: *value_version,
254+
value: value.clone(),
255+
hot_since_version: version,
256+
lru_info: LRUEntry::uninitialized(),
257+
},
258+
None => StateSlot::HotVacant {
259+
hot_since_version: version,
260+
lru_info: LRUEntry::uninitialized(),
261+
},
262+
};
263+
hot_value_hash = Some(HotStateValueRef::from_slot(&slot).hash());
264+
hot_state[shard_id].put(k.clone(), slot);
265+
}
266+
if hot_value_hash.is_some() {
267+
assert!(hot_state[shard_id].promote(k));
268+
hot_smt_updates.push((k.hash(), hot_value_hash));
238269
}
239-
let slot = match state.get(k) {
240-
Some((value_version, value)) => StateSlot::HotOccupied {
241-
value_version: *value_version,
242-
value: value.clone(),
243-
hot_since_version: version,
244-
lru_info: LRUEntry::uninitialized(),
245-
},
246-
None => StateSlot::HotVacant {
247-
hot_since_version: version,
248-
lru_info: LRUEntry::uninitialized(),
249-
},
250-
};
251-
hot_smt_updates.push((k.hash(), slot.as_state_value_opt().map(|v| v.hash())));
252-
hot_state[shard_id].put(k.clone(), slot);
253270
}
254271

255272
if is_checkpoint {
@@ -268,6 +285,11 @@ impl VersionState {
268285
let bytes = state.iter().map(|(k, v)| k.size() + v.1.size()).sum();
269286
let usage = StateStorageUsage::new(items, bytes);
270287

288+
assert_eq!(
289+
hot_state.iter().map(|x| x.len()).sum::<usize>(),
290+
hot_summary.leaves.len()
291+
);
292+
271293
Self {
272294
usage,
273295
hot_state,
@@ -660,10 +682,8 @@ fn naive_run_blocks(blocks: Vec<(Vec<UserTxn>, bool)>) -> (Vec<Txn>, StateByVers
660682
let mut all_txns = vec![];
661683
let mut state_by_version = StateByVersion::new_empty();
662684
for (block_txns, append_epilogue) in blocks {
663-
let mut op_accu = BlockHotStateOpAccumulator::<StateKey>::new_with_config(
664-
MAX_PROMOTIONS_PER_BLOCK,
665-
REFRESH_INTERVAL_VERSIONS,
666-
);
685+
let mut op_accu =
686+
BlockHotStateOpAccumulator::<StateKey>::new_with_config(MAX_PROMOTIONS_PER_BLOCK);
667687
let num_txns = block_txns.len();
668688
for (idx, txn) in block_txns.into_iter().enumerate() {
669689
// No promotions except for block epilogue. Also note that in case of reconfig, there's

storage/storage-interface/src/state_store/mod.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,18 @@ pub mod state_view;
1010
pub mod state_with_summary;
1111
pub mod versioned_state_value;
1212

13-
use aptos_types::state_store::{state_key::StateKey, state_value::StateValue, NUM_STATE_SHARDS};
14-
use std::collections::HashMap;
13+
use aptos_types::state_store::{hot_state::HotStateValue, state_key::StateKey, NUM_STATE_SHARDS};
14+
use std::collections::{HashMap, HashSet};
1515

16-
// TODO(HotState): this isn't the final form yet. For instance, `HotVacant` entries probably should
17-
// be included, but right now they are `None` in these and will be removed from the Merkle trees.
1816
#[derive(Debug)]
1917
pub(crate) struct HotStateShardUpdates {
20-
insertions: HashMap<StateKey, Option<StateValue>>,
21-
evictions: HashMap<StateKey, Option<StateValue>>,
18+
insertions: HashMap<StateKey, HotStateValue>,
19+
// TODO(HotState): only keys are needed for now, since evictions do not affect cold state.
20+
evictions: HashSet<StateKey>,
2221
}
2322

2423
impl HotStateShardUpdates {
25-
pub fn new(
26-
insertions: HashMap<StateKey, Option<StateValue>>,
27-
evictions: HashMap<StateKey, Option<StateValue>>,
28-
) -> Self {
24+
pub fn new(insertions: HashMap<StateKey, HotStateValue>, evictions: HashSet<StateKey>) -> Self {
2925
Self {
3026
insertions,
3127
evictions,

storage/storage-interface/src/state_store/state.rs

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,20 @@ use aptos_experimental_layered_map::{LayeredMap, MapLayer};
2424
use aptos_metrics_core::TimerHelper;
2525
use aptos_types::{
2626
state_store::{
27-
state_key::StateKey, state_slot::StateSlot, state_storage_usage::StateStorageUsage,
28-
state_value::StateValue, StateViewId, NUM_STATE_SHARDS,
27+
hot_state::HotStateValue, state_key::StateKey, state_slot::StateSlot,
28+
state_storage_usage::StateStorageUsage, StateViewId, NUM_STATE_SHARDS,
2929
},
3030
transaction::Version,
3131
};
3232
use arr_macro::arr;
3333
use derive_more::Deref;
3434
use itertools::Itertools;
3535
use rayon::prelude::*;
36-
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc};
36+
use std::{
37+
collections::{HashMap, HashSet},
38+
num::NonZeroUsize,
39+
sync::Arc,
40+
};
3741

3842
#[derive(Clone, Debug)]
3943
pub struct HotStateMetadata {
@@ -200,30 +204,42 @@ impl State {
200204
);
201205
let mut all_updates = per_version.iter();
202206
let mut insertions = HashMap::new();
203-
let mut evictions = HashMap::new();
207+
let mut evictions = HashSet::new();
204208
for ckpt_version in all_checkpoint_versions {
205209
for (key, update) in
206210
all_updates.take_while_ref(|(_k, u)| u.version <= *ckpt_version)
207211
{
208212
evictions.remove(*key);
209-
insertions.insert(
210-
(*key).clone(),
211-
Self::apply_one_update(&mut lru, overlay, cache, key, update),
212-
);
213+
if let Some(hv) = Self::apply_one_update(
214+
&mut lru,
215+
overlay,
216+
cache,
217+
key,
218+
update,
219+
self.hot_state_config.refresh_interval_versions,
220+
) {
221+
insertions.insert((*key).clone(), hv);
222+
}
213223
}
214224
// Only evict at the checkpoints.
215225
evictions.extend(lru.maybe_evict().into_iter().map(|(key, slot)| {
216226
insertions.remove(&key);
217227
assert!(slot.is_hot());
218-
(key, slot.into_state_value_opt())
228+
key
219229
}));
220230
}
221231
for (key, update) in all_updates {
222232
evictions.remove(*key);
223-
insertions.insert(
224-
(*key).clone(),
225-
Self::apply_one_update(&mut lru, overlay, cache, key, update),
226-
);
233+
if let Some(hv) = Self::apply_one_update(
234+
&mut lru,
235+
overlay,
236+
cache,
237+
key,
238+
update,
239+
self.hot_state_config.refresh_interval_versions,
240+
) {
241+
insertions.insert((*key).clone(), hv);
242+
}
227243
}
228244

229245
let (new_items, new_head, new_tail, new_num_items) = lru.into_updates();
@@ -264,34 +280,48 @@ impl State {
264280
)
265281
}
266282

283+
/// Applies the update and returns the `HotStateValue` that will later go into the hot state
284+
/// Merkle tree. `None` if the op is `MakeHot` and it's determined that refresh is not
285+
/// necessary.
267286
fn apply_one_update(
268287
lru: &mut HotStateLRU,
269288
overlay: &LayeredMap<StateKey, StateSlot>,
270289
read_cache: &StateCacheShard,
271290
key: &StateKey,
272291
update: &StateUpdateRef,
273-
) -> Option<StateValue> {
292+
refresh_interval: Version,
293+
) -> Option<HotStateValue> {
274294
if let Some(state_value_opt) = update.state_op.as_state_value_opt() {
275295
lru.insert((*key).clone(), update.to_result_slot().unwrap());
276-
return state_value_opt.cloned();
296+
return Some(HotStateValue::new(state_value_opt.cloned(), update.version));
277297
}
278298

279299
if let Some(mut slot) = lru.get_slot(key) {
300+
let mut refreshed = true;
280301
let slot_to_insert = if slot.is_hot() {
281-
slot.refresh(update.version);
302+
if slot.expect_hot_since_version() + refresh_interval <= update.version {
303+
slot.refresh(update.version);
304+
} else {
305+
refreshed = false;
306+
}
282307
slot
283308
} else {
284309
slot.to_hot(update.version)
285310
};
286-
let ret = slot_to_insert.as_state_value_opt().cloned();
287-
lru.insert((*key).clone(), slot_to_insert);
288-
ret
311+
if refreshed {
312+
let ret = HotStateValue::clone_from_slot(&slot_to_insert);
313+
lru.insert((*key).clone(), slot_to_insert);
314+
Some(ret)
315+
} else {
316+
None
317+
}
289318
} else {
290319
let slot = Self::expect_old_slot(overlay, read_cache, key);
291320
assert!(slot.is_cold());
292-
let ret = slot.as_state_value_opt().cloned();
293-
lru.insert((*key).clone(), slot.to_hot(update.version));
294-
ret
321+
let slot = slot.to_hot(update.version);
322+
let ret = HotStateValue::clone_from_slot(&slot);
323+
lru.insert((*key).clone(), slot);
324+
Some(ret)
295325
}
296326
}
297327

0 commit comments

Comments
 (0)