Skip to content

Commit f1dac58

Browse files
committed
[Hot State] Compute root hash for hot state
1 parent c64f4f6 commit f1dac58

File tree

16 files changed

+537
-122
lines changed

16 files changed

+537
-122
lines changed

execution/executor-types/src/execution_output.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use crate::{
1010
use aptos_config::config::HotStateConfig;
1111
use aptos_drop_helper::DropHelper;
1212
use aptos_storage_interface::state_store::{
13-
state::LedgerState, state_view::cached_state_view::ShardedStateCache,
13+
state::LedgerState, state_view::cached_state_view::ShardedStateCache, HotStateShardUpdates,
1414
};
1515
use aptos_types::{
1616
contract_event::ContractEvent,
1717
epoch_state::EpochState,
18+
state_store::NUM_STATE_SHARDS,
1819
transaction::{
1920
block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version,
2021
},
@@ -41,6 +42,7 @@ impl ExecutionOutput {
4142
block_end_info: Option<BlockEndInfo>,
4243
next_epoch_state: Option<EpochState>,
4344
subscribable_events: Planned<Vec<ContractEvent>>,
45+
hot_state_updates: [Vec<HotStateShardUpdates>; NUM_STATE_SHARDS],
4446
) -> Self {
4547
let next_version = first_version + to_commit.len() as Version;
4648
assert_eq!(next_version, result_state.latest().next_version());
@@ -65,6 +67,7 @@ impl ExecutionOutput {
6567
block_end_info,
6668
next_epoch_state,
6769
subscribable_events,
70+
hot_state_updates,
6871
})
6972
}
7073

@@ -81,6 +84,7 @@ impl ExecutionOutput {
8184
block_end_info: None,
8285
next_epoch_state: None,
8386
subscribable_events: Planned::ready(vec![]),
87+
hot_state_updates: std::array::from_fn(|_| vec![]),
8488
})
8589
}
8690

@@ -99,6 +103,7 @@ impl ExecutionOutput {
99103
block_end_info: None,
100104
next_epoch_state: None,
101105
subscribable_events: Planned::ready(vec![]),
106+
hot_state_updates: std::array::from_fn(|_| vec![]),
102107
})
103108
}
104109

@@ -119,6 +124,7 @@ impl ExecutionOutput {
119124
block_end_info: None,
120125
next_epoch_state: self.next_epoch_state.clone(),
121126
subscribable_events: Planned::ready(vec![]),
127+
hot_state_updates: std::array::from_fn(|_| vec![]),
122128
})
123129
}
124130

@@ -166,6 +172,8 @@ pub struct Inner {
166172
/// state cache.
167173
pub next_epoch_state: Option<EpochState>,
168174
pub subscribable_events: Planned<Vec<ContractEvent>>,
175+
/// Per checkpoint and optionally a partial block.
176+
pub hot_state_updates: [Vec<HotStateShardUpdates>; NUM_STATE_SHARDS],
169177
}
170178

171179
impl Inner {

execution/executor/src/block_executor/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,14 @@ where
315315
output.set_state_checkpoint_output(DoStateCheckpoint::run(
316316
&output.execution_output,
317317
parent_block.output.ensure_result_state_summary()?,
318-
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
318+
&ProvableStateSummary::new_persisted(
319+
self.db.reader.as_ref(),
320+
/* is_hot = */ true,
321+
)?,
322+
&ProvableStateSummary::new_persisted(
323+
self.db.reader.as_ref(),
324+
/* is_hot = */ false,
325+
)?,
319326
None,
320327
)?);
321328
output.set_ledger_update_output(DoLedgerUpdate::run(

execution/executor/src/chunk_executor/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,14 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
346346
let state_checkpoint_output = DoStateCheckpoint::run(
347347
&output.execution_output,
348348
&parent_state_summary,
349-
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
349+
&ProvableStateSummary::new_persisted(
350+
self.db.reader.as_ref(),
351+
/* is_hot = */ true,
352+
)?,
353+
&ProvableStateSummary::new_persisted(
354+
self.db.reader.as_ref(),
355+
/* is_hot = */ false,
356+
)?,
350357
Some(
351358
chunk_verifier
352359
.transaction_infos()

execution/executor/src/workflow/do_get_execution_output.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ impl Parser {
417417
},
418418
)?;
419419

420-
let result_state = parent_state.update_with_memorized_reads(
420+
let (result_state, hot_state_updates) = parent_state.update_with_memorized_reads(
421421
base_state_view.persisted_hot_state(),
422422
base_state_view.persisted_state(),
423423
to_commit.state_update_refs(),
@@ -437,6 +437,7 @@ impl Parser {
437437
block_end_info,
438438
next_epoch_state,
439439
Planned::place_holder(),
440+
hot_state_updates,
440441
);
441442
let ret = out.clone();
442443
ret.subscribable_events

execution/executor/src/workflow/do_state_checkpoint.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ impl DoStateCheckpoint {
1818
pub fn run(
1919
execution_output: &ExecutionOutput,
2020
parent_state_summary: &LedgerStateSummary,
21-
persisted_state_summary: &ProvableStateSummary,
21+
hot_persisted_state_summary: &ProvableStateSummary,
22+
cold_persisted_state_summary: &ProvableStateSummary,
2223
known_state_checkpoints: Option<Vec<Option<HashValue>>>,
2324
) -> Result<StateCheckpointOutput> {
2425
let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]);
2526

2627
let state_summary = parent_state_summary.update(
27-
persisted_state_summary,
28+
hot_persisted_state_summary,
29+
&execution_output.hot_state_updates,
30+
cold_persisted_state_summary,
2831
execution_output.to_commit.state_update_refs(),
2932
)?;
3033

execution/executor/src/workflow/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ impl ApplyExecutionOutput {
2727
let state_checkpoint_output = DoStateCheckpoint::run(
2828
&execution_output,
2929
&base_view.state_summary,
30-
&ProvableStateSummary::new_persisted(reader)?,
30+
&ProvableStateSummary::new_persisted(reader, /* is_hot = */ true)?,
31+
&ProvableStateSummary::new_persisted(reader, /* is_hot = */ false)?,
3132
None,
3233
)?;
3334
let ledger_update_output = DoLedgerUpdate::run(

storage/aptosdb/src/db/aptosdb_testonly.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,15 +166,17 @@ impl AptosDB {
166166

167167
let current = self.state_store.current_state_locked().clone();
168168
let (hot_state, persisted_state) = self.state_store.get_persisted_state()?;
169-
let (new_state, reads) = current.ledger_state().update_with_db_reader(
169+
let (new_state, reads, hot_state_updates) = current.ledger_state().update_with_db_reader(
170170
&persisted_state,
171171
hot_state,
172172
transactions_to_keep.state_update_refs(),
173173
self.state_store.clone(),
174174
)?;
175175
let persisted_summary = self.state_store.get_persisted_state_summary()?;
176176
let new_state_summary = current.ledger_state_summary().update(
177-
&ProvableStateSummary::new(persisted_summary, self),
177+
&ProvableStateSummary::new(persisted_summary.clone(), self, /* is_hot = */ true),
178+
&hot_state_updates,
179+
&ProvableStateSummary::new(persisted_summary, self, /* is_hot = */ false),
178180
transactions_to_keep.state_update_refs(),
179181
)?;
180182

storage/aptosdb/src/state_store/mod.rs

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ use aptos_db_indexer_schemas::{
4141
schema::indexer_metadata::InternalIndexerMetadataSchema,
4242
};
4343
use aptos_infallible::Mutex;
44-
use aptos_jellyfish_merkle::iterator::JellyfishMerkleIterator;
44+
use aptos_jellyfish_merkle::{
45+
iterator::JellyfishMerkleIterator,
46+
node_type::{Node, NodeKey},
47+
TreeWriter,
48+
};
4549
use aptos_logger::info;
4650
use aptos_metrics_core::TimerHelper;
4751
use aptos_schemadb::batch::{NativeBatch, SchemaBatch, WriteBatch};
@@ -58,6 +62,7 @@ use aptos_storage_interface::{
5862
},
5963
state_with_summary::{LedgerStateWithSummary, StateWithSummary},
6064
versioned_state_value::StateUpdateRef,
65+
HotStateShardUpdates,
6166
},
6267
AptosDbError, DbReader, Result, StateSnapshotReceiver,
6368
};
@@ -79,6 +84,7 @@ use claims::{assert_ge, assert_le};
7984
use itertools::Itertools;
8085
use rayon::prelude::*;
8186
use std::{
87+
collections::HashMap,
8288
ops::Deref,
8389
sync::{Arc, MutexGuard},
8490
};
@@ -554,6 +560,7 @@ impl StateStore {
554560
latest_snapshot_version = latest_snapshot_version,
555561
"Initializing BufferedState."
556562
);
563+
// TODO(HotState): read hot root hash from DB.
557564
let latest_snapshot_root_hash = if let Some(version) = latest_snapshot_version {
558565
state_db
559566
.state_merkle_db
@@ -595,6 +602,18 @@ impl StateStore {
595602
);
596603
}
597604

605+
if snapshot_next_version > 0 {
606+
let prev_version = snapshot_next_version - 1;
607+
let node_key = NodeKey::new_empty_path(prev_version);
608+
let node = Node::Null;
609+
let mut node_batch = HashMap::new();
610+
node_batch.insert(node_key, node);
611+
if let Some(db) = &state_db.hot_state_merkle_db {
612+
db.write_node_batch(&node_batch)?;
613+
}
614+
info!("Wrote null node for hot state at version {prev_version}");
615+
}
616+
598617
// Replaying the committed write sets after the latest snapshot.
599618
if snapshot_next_version < num_transactions {
600619
if check_max_versions_after_snapshot {
@@ -605,6 +624,8 @@ impl StateStore {
605624
num_transactions,
606625
);
607626
}
627+
info!("Replaying writesets from {snapshot_next_version} to {num_transactions} to let state Merkle DB catch up.");
628+
608629
let write_sets = state_db
609630
.ledger_db
610631
.write_set_db()
@@ -628,17 +649,28 @@ impl StateStore {
628649
);
629650
let current_state = out_current_state.lock().clone();
630651
let (hot_state, state) = out_persisted_state.get_state();
631-
let (new_state, _state_reads) = current_state.ledger_state().update_with_db_reader(
632-
&state,
633-
hot_state,
634-
&state_update_refs,
635-
state_db.clone(),
636-
)?;
652+
let (new_state, _state_reads, hot_state_updates) = current_state
653+
.ledger_state()
654+
.update_with_db_reader(&state, hot_state, &state_update_refs, state_db.clone())?;
637655
let state_summary = out_persisted_state.get_state_summary();
638656
let new_state_summary = current_state.ledger_state_summary().update(
639-
&ProvableStateSummary::new(state_summary, state_db.as_ref()),
657+
&ProvableStateSummary::new(
658+
state_summary.clone(),
659+
state_db.as_ref(),
660+
/* is_hot = */ true,
661+
),
662+
&hot_state_updates,
663+
&ProvableStateSummary::new(
664+
state_summary,
665+
state_db.as_ref(),
666+
/* is_hot = */ false,
667+
),
640668
&state_update_refs,
641669
)?;
670+
info!(
671+
"new hot state summary root hash: {}",
672+
new_state_summary.hot_root_hash()
673+
);
642674
let updated =
643675
LedgerStateWithSummary::from_state_and_summary(new_state, new_state_summary);
644676

@@ -720,7 +752,7 @@ impl StateStore {
720752
) -> Result<LedgerState> {
721753
let current = self.current_state_locked().ledger_state();
722754
let (hot_state, persisted) = self.get_persisted_state()?;
723-
let (new_state, reads) = current.update_with_db_reader(
755+
let (new_state, reads, _hot_state_updates) = current.update_with_db_reader(
724756
&persisted,
725757
hot_state,
726758
state_update_refs,
@@ -1403,27 +1435,29 @@ mod test_only {
14031435
let current = self.current_state_locked().ledger_state_summary();
14041436
let persisted = self.persisted_state.get_state_summary();
14051437

1406-
let new_state_summary = current
1407-
.update(
1408-
&ProvableStateSummary::new(persisted, self.state_db.as_ref()),
1409-
&state_update_refs,
1410-
)
1411-
.unwrap();
1412-
let root_hash = new_state_summary.root_hash();
1413-
1414-
self.buffered_state
1415-
.lock()
1416-
.update(
1417-
LedgerStateWithSummary::from_state_and_summary(
1418-
new_ledger_state,
1419-
new_state_summary,
1420-
),
1421-
0, /* estimated_items, doesn't matter since we sync-commit */
1422-
true, /* sync_commit */
1423-
)
1424-
.unwrap();
1425-
1426-
root_hash
1438+
unimplemented!();
1439+
1440+
// let new_state_summary = current
1441+
// .update(
1442+
// &ProvableStateSummary::new(persisted, self.state_db.as_ref()),
1443+
// &state_update_refs,
1444+
// )
1445+
// .unwrap();
1446+
// let root_hash = new_state_summary.root_hash();
1447+
1448+
// self.buffered_state
1449+
// .lock()
1450+
// .update(
1451+
// LedgerStateWithSummary::from_state_and_summary(
1452+
// new_ledger_state,
1453+
// new_state_summary,
1454+
// ),
1455+
// 0, /* estimated_items, doesn't matter since we sync-commit */
1456+
// true, /* sync_commit */
1457+
// )
1458+
// .unwrap();
1459+
1460+
// root_hash
14271461
}
14281462
}
14291463
}

storage/aptosdb/src/state_store/state_merkle_batch_committer.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ pub struct StateMerkleBatch {
2525

2626
pub(crate) struct StateMerkleBatchCommitter {
2727
state_db: Arc<StateDb>,
28-
state_merkle_batch_receiver: Receiver<CommitMessage<StateMerkleBatch>>,
28+
state_merkle_batch_receiver: Receiver<CommitMessage<(StateMerkleBatch, StateMerkleBatch)>>,
2929
persisted_state: PersistedState,
3030
}
3131

3232
impl StateMerkleBatchCommitter {
3333
pub fn new(
3434
state_db: Arc<StateDb>,
35-
state_merkle_batch_receiver: Receiver<CommitMessage<StateMerkleBatch>>,
35+
state_merkle_batch_receiver: Receiver<CommitMessage<(StateMerkleBatch, StateMerkleBatch)>>,
3636
persisted_state: PersistedState,
3737
) -> Self {
3838
Self {
@@ -46,7 +46,12 @@ impl StateMerkleBatchCommitter {
4646
while let Ok(msg) = self.state_merkle_batch_receiver.recv() {
4747
let _timer = OTHER_TIMERS_SECONDS.timer_with(&["batch_committer_work"]);
4848
match msg {
49-
CommitMessage::Data(state_merkle_batch) => {
49+
CommitMessage::Data((hot_state_merkle_batch, state_merkle_batch)) => {
50+
let StateMerkleBatch {
51+
top_levels_batch: hot_top_levels_batch,
52+
batches_for_shards: hot_batches_for_shards,
53+
snapshot,
54+
} = hot_state_merkle_batch;
5055
let StateMerkleBatch {
5156
top_levels_batch,
5257
batches_for_shards,
@@ -61,6 +66,31 @@ impl StateMerkleBatchCommitter {
6166
// commit jellyfish merkle nodes
6267
let _timer =
6368
OTHER_TIMERS_SECONDS.timer_with(&["commit_jellyfish_merkle_nodes"]);
69+
self.state_db
70+
.hot_state_merkle_db
71+
.as_ref()
72+
.unwrap()
73+
.commit(
74+
current_version,
75+
hot_top_levels_batch,
76+
hot_batches_for_shards,
77+
)
78+
.expect("Hot state merkle nodes commit failed.");
79+
if let Some(lru_cache) = self
80+
.state_db
81+
.hot_state_merkle_db
82+
.as_ref()
83+
.unwrap()
84+
.lru_cache()
85+
{
86+
self.state_db
87+
.hot_state_merkle_db
88+
.as_ref()
89+
.unwrap()
90+
.version_caches()
91+
.iter()
92+
.for_each(|(_, cache)| cache.maybe_evict_version(lru_cache));
93+
}
6494
self.state_db
6595
.state_merkle_db
6696
.commit(current_version, top_levels_batch, batches_for_shards)

0 commit comments

Comments
 (0)