Skip to content

Commit dc161b5

Browse files
committed
[Hot State] Compute root hash for hot state
1 parent c64f4f6 commit dc161b5

File tree

13 files changed

+474
-186
lines changed

13 files changed

+474
-186
lines changed

execution/executor-types/src/execution_output.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ use crate::{
1010
use aptos_config::config::HotStateConfig;
1111
use aptos_drop_helper::DropHelper;
1212
use aptos_storage_interface::state_store::{
13-
state::LedgerState, state_view::cached_state_view::ShardedStateCache,
13+
state::LedgerState, state_view::cached_state_view::ShardedStateCache, HotStateShardUpdates,
1414
};
1515
use aptos_types::{
1616
contract_event::ContractEvent,
1717
epoch_state::EpochState,
18+
state_store::NUM_STATE_SHARDS,
1819
transaction::{
1920
block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version,
2021
},
@@ -41,6 +42,7 @@ impl ExecutionOutput {
4142
block_end_info: Option<BlockEndInfo>,
4243
next_epoch_state: Option<EpochState>,
4344
subscribable_events: Planned<Vec<ContractEvent>>,
45+
hot_state_updates: [Vec<HotStateShardUpdates>; NUM_STATE_SHARDS],
4446
) -> Self {
4547
let next_version = first_version + to_commit.len() as Version;
4648
assert_eq!(next_version, result_state.latest().next_version());
@@ -65,6 +67,7 @@ impl ExecutionOutput {
6567
block_end_info,
6668
next_epoch_state,
6769
subscribable_events,
70+
hot_state_updates,
6871
})
6972
}
7073

@@ -81,6 +84,7 @@ impl ExecutionOutput {
8184
block_end_info: None,
8285
next_epoch_state: None,
8386
subscribable_events: Planned::ready(vec![]),
87+
hot_state_updates: std::array::from_fn(|_| vec![]),
8488
})
8589
}
8690

@@ -99,6 +103,7 @@ impl ExecutionOutput {
99103
block_end_info: None,
100104
next_epoch_state: None,
101105
subscribable_events: Planned::ready(vec![]),
106+
hot_state_updates: std::array::from_fn(|_| vec![]),
102107
})
103108
}
104109

@@ -119,6 +124,7 @@ impl ExecutionOutput {
119124
block_end_info: None,
120125
next_epoch_state: self.next_epoch_state.clone(),
121126
subscribable_events: Planned::ready(vec![]),
127+
hot_state_updates: std::array::from_fn(|_| vec![]),
122128
})
123129
}
124130

@@ -166,6 +172,8 @@ pub struct Inner {
166172
/// state cache.
167173
pub next_epoch_state: Option<EpochState>,
168174
pub subscribable_events: Planned<Vec<ContractEvent>>,
175+
/// Per checkpoint and optionally a partial block.
176+
pub hot_state_updates: [Vec<HotStateShardUpdates>; NUM_STATE_SHARDS],
169177
}
170178

171179
impl Inner {

execution/executor/src/block_executor/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,14 @@ where
315315
output.set_state_checkpoint_output(DoStateCheckpoint::run(
316316
&output.execution_output,
317317
parent_block.output.ensure_result_state_summary()?,
318-
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
318+
&ProvableStateSummary::new_persisted(
319+
self.db.reader.as_ref(),
320+
/* is_hot = */ true,
321+
)?,
322+
&ProvableStateSummary::new_persisted(
323+
self.db.reader.as_ref(),
324+
/* is_hot = */ false,
325+
)?,
319326
None,
320327
)?);
321328
output.set_ledger_update_output(DoLedgerUpdate::run(

execution/executor/src/chunk_executor/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,14 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
346346
let state_checkpoint_output = DoStateCheckpoint::run(
347347
&output.execution_output,
348348
&parent_state_summary,
349-
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
349+
&ProvableStateSummary::new_persisted(
350+
self.db.reader.as_ref(),
351+
/* is_hot = */ true,
352+
)?,
353+
&ProvableStateSummary::new_persisted(
354+
self.db.reader.as_ref(),
355+
/* is_hot = */ false,
356+
)?,
350357
Some(
351358
chunk_verifier
352359
.transaction_infos()

execution/executor/src/workflow/do_get_execution_output.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ impl Parser {
417417
},
418418
)?;
419419

420-
let result_state = parent_state.update_with_memorized_reads(
420+
let (result_state, hot_state_updates) = parent_state.update_with_memorized_reads(
421421
base_state_view.persisted_hot_state(),
422422
base_state_view.persisted_state(),
423423
to_commit.state_update_refs(),
@@ -437,6 +437,7 @@ impl Parser {
437437
block_end_info,
438438
next_epoch_state,
439439
Planned::place_holder(),
440+
hot_state_updates,
440441
);
441442
let ret = out.clone();
442443
ret.subscribable_events

execution/executor/src/workflow/do_state_checkpoint.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ impl DoStateCheckpoint {
1818
pub fn run(
1919
execution_output: &ExecutionOutput,
2020
parent_state_summary: &LedgerStateSummary,
21-
persisted_state_summary: &ProvableStateSummary,
21+
hot_persisted_state_summary: &ProvableStateSummary,
22+
cold_persisted_state_summary: &ProvableStateSummary,
2223
known_state_checkpoints: Option<Vec<Option<HashValue>>>,
2324
) -> Result<StateCheckpointOutput> {
2425
let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]);
2526

2627
let state_summary = parent_state_summary.update(
27-
persisted_state_summary,
28+
hot_persisted_state_summary,
29+
&execution_output.hot_state_updates,
30+
cold_persisted_state_summary,
2831
execution_output.to_commit.state_update_refs(),
2932
)?;
3033

execution/executor/src/workflow/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ impl ApplyExecutionOutput {
2727
let state_checkpoint_output = DoStateCheckpoint::run(
2828
&execution_output,
2929
&base_view.state_summary,
30-
&ProvableStateSummary::new_persisted(reader)?,
30+
&ProvableStateSummary::new_persisted(reader, /* is_hot = */ true)?,
31+
&ProvableStateSummary::new_persisted(reader, /* is_hot = */ false)?,
3132
None,
3233
)?;
3334
let ledger_update_output = DoLedgerUpdate::run(

storage/aptosdb/src/db/aptosdb_testonly.rs

Lines changed: 59 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -131,67 +131,68 @@ impl AptosDB {
131131
ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
132132
sync_commit: bool,
133133
) -> Result<()> {
134-
let (transactions, transaction_outputs, transaction_infos) =
135-
Self::disassemble_txns_to_commit(txns_to_commit);
136-
// Keep auxiliary info consistent with what was used to create TransactionInfo
137-
// Use block-relative indices to match what was used during TransactionInfo creation
138-
let persisted_auxiliary_infos = txns_to_commit
139-
.iter()
140-
.map(
141-
|txn_to_commit| match txn_to_commit.transaction_info.auxiliary_info_hash() {
142-
Some(hash) => {
143-
for i in 0..100 {
144-
if hash
145-
== CryptoHash::hash(&PersistedAuxiliaryInfo::V1 {
146-
transaction_index: i as u32,
147-
})
148-
{
149-
return PersistedAuxiliaryInfo::V1 {
150-
transaction_index: i as u32,
151-
};
152-
}
153-
}
154-
panic!("Hash not found");
155-
},
156-
None => PersistedAuxiliaryInfo::None,
157-
},
158-
)
159-
.collect();
160-
let transactions_to_keep = TransactionsToKeep::make(
161-
first_version,
162-
transactions,
163-
transaction_outputs,
164-
persisted_auxiliary_infos,
165-
);
134+
unimplemented!();
135+
// let (transactions, transaction_outputs, transaction_infos) =
136+
// Self::disassemble_txns_to_commit(txns_to_commit);
137+
// // Keep auxiliary info consistent with what was used to create TransactionInfo
138+
// // Use block-relative indices to match what was used during TransactionInfo creation
139+
// let persisted_auxiliary_infos = txns_to_commit
140+
// .iter()
141+
// .map(
142+
// |txn_to_commit| match txn_to_commit.transaction_info.auxiliary_info_hash() {
143+
// Some(hash) => {
144+
// for i in 0..100 {
145+
// if hash
146+
// == CryptoHash::hash(&PersistedAuxiliaryInfo::V1 {
147+
// transaction_index: i as u32,
148+
// })
149+
// {
150+
// return PersistedAuxiliaryInfo::V1 {
151+
// transaction_index: i as u32,
152+
// };
153+
// }
154+
// }
155+
// panic!("Hash not found");
156+
// },
157+
// None => PersistedAuxiliaryInfo::None,
158+
// },
159+
// )
160+
// .collect();
161+
// let transactions_to_keep = TransactionsToKeep::make(
162+
// first_version,
163+
// transactions,
164+
// transaction_outputs,
165+
// persisted_auxiliary_infos,
166+
// );
166167

167-
let current = self.state_store.current_state_locked().clone();
168-
let (hot_state, persisted_state) = self.state_store.get_persisted_state()?;
169-
let (new_state, reads) = current.ledger_state().update_with_db_reader(
170-
&persisted_state,
171-
hot_state,
172-
transactions_to_keep.state_update_refs(),
173-
self.state_store.clone(),
174-
)?;
175-
let persisted_summary = self.state_store.get_persisted_state_summary()?;
176-
let new_state_summary = current.ledger_state_summary().update(
177-
&ProvableStateSummary::new(persisted_summary, self),
178-
transactions_to_keep.state_update_refs(),
179-
)?;
168+
// let current = self.state_store.current_state_locked().clone();
169+
// let (hot_state, persisted_state) = self.state_store.get_persisted_state()?;
170+
// let (new_state, reads) = current.ledger_state().update_with_db_reader(
171+
// &persisted_state,
172+
// hot_state,
173+
// transactions_to_keep.state_update_refs(),
174+
// self.state_store.clone(),
175+
// )?;
176+
// let persisted_summary = self.state_store.get_persisted_state_summary()?;
177+
// let new_state_summary = current.ledger_state_summary().update(
178+
// &ProvableStateSummary::new(persisted_summary, self, /* is_hot = */ false),
179+
// transactions_to_keep.state_update_refs(),
180+
// )?;
180181

181-
let chunk = ChunkToCommit {
182-
first_version,
183-
transactions: &transactions_to_keep.transactions,
184-
persisted_auxiliary_infos: &transactions_to_keep.persisted_auxiliary_infos,
185-
transaction_outputs: &transactions_to_keep.transaction_outputs,
186-
transaction_infos: &transaction_infos,
187-
state: &new_state,
188-
state_summary: &new_state_summary,
189-
state_update_refs: transactions_to_keep.state_update_refs(),
190-
state_reads: &reads,
191-
is_reconfig: transactions_to_keep.is_reconfig(),
192-
};
182+
// let chunk = ChunkToCommit {
183+
// first_version,
184+
// transactions: &transactions_to_keep.transactions,
185+
// persisted_auxiliary_infos: &transactions_to_keep.persisted_auxiliary_infos,
186+
// transaction_outputs: &transactions_to_keep.transaction_outputs,
187+
// transaction_infos: &transaction_infos,
188+
// state: &new_state,
189+
// state_summary: &new_state_summary,
190+
// state_update_refs: transactions_to_keep.state_update_refs(),
191+
// state_reads: &reads,
192+
// is_reconfig: transactions_to_keep.is_reconfig(),
193+
// };
193194

194-
self.save_transactions(chunk, ledger_info_with_sigs, sync_commit)
195+
// self.save_transactions(chunk, ledger_info_with_sigs, sync_commit)
195196
}
196197

197198
fn disassemble_txns_to_commit(

storage/aptosdb/src/state_store/mod.rs

Lines changed: 51 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -597,56 +597,57 @@ impl StateStore {
597597

598598
// Replaying the committed write sets after the latest snapshot.
599599
if snapshot_next_version < num_transactions {
600-
if check_max_versions_after_snapshot {
601-
ensure!(
602-
num_transactions - snapshot_next_version <= MAX_WRITE_SETS_AFTER_SNAPSHOT,
603-
"Too many versions after state snapshot. snapshot_next_version: {}, num_transactions: {}",
604-
snapshot_next_version,
605-
num_transactions,
606-
);
607-
}
608-
let write_sets = state_db
609-
.ledger_db
610-
.write_set_db()
611-
.get_write_sets(snapshot_next_version, num_transactions)?;
612-
let txn_info_iter = state_db
613-
.ledger_db
614-
.transaction_info_db()
615-
.get_transaction_info_iter(snapshot_next_version, write_sets.len())?;
616-
let all_checkpoint_indices = txn_info_iter
617-
.into_iter()
618-
.collect::<Result<Vec<_>>>()?
619-
.into_iter()
620-
.positions(|txn_info| txn_info.has_state_checkpoint_hash())
621-
.collect();
622-
623-
let state_update_refs = StateUpdateRefs::index_write_sets(
624-
state.next_version(),
625-
&write_sets,
626-
write_sets.len(),
627-
all_checkpoint_indices,
628-
);
629-
let current_state = out_current_state.lock().clone();
630-
let (hot_state, state) = out_persisted_state.get_state();
631-
let (new_state, _state_reads) = current_state.ledger_state().update_with_db_reader(
632-
&state,
633-
hot_state,
634-
&state_update_refs,
635-
state_db.clone(),
636-
)?;
637-
let state_summary = out_persisted_state.get_state_summary();
638-
let new_state_summary = current_state.ledger_state_summary().update(
639-
&ProvableStateSummary::new(state_summary, state_db.as_ref()),
640-
&state_update_refs,
641-
)?;
642-
let updated =
643-
LedgerStateWithSummary::from_state_and_summary(new_state, new_state_summary);
644-
645-
// synchronously commit the snapshot at the last checkpoint here if not committed to disk yet.
646-
buffered_state.update(
647-
updated, 0, /* estimated_items, doesn't matter since we sync-commit */
648-
true, /* sync_commit */
649-
)?;
600+
unimplemented!();
601+
// if check_max_versions_after_snapshot {
602+
// ensure!(
603+
// num_transactions - snapshot_next_version <= MAX_WRITE_SETS_AFTER_SNAPSHOT,
604+
// "Too many versions after state snapshot. snapshot_next_version: {}, num_transactions: {}",
605+
// snapshot_next_version,
606+
// num_transactions,
607+
// );
608+
// }
609+
// let write_sets = state_db
610+
// .ledger_db
611+
// .write_set_db()
612+
// .get_write_sets(snapshot_next_version, num_transactions)?;
613+
// let txn_info_iter = state_db
614+
// .ledger_db
615+
// .transaction_info_db()
616+
// .get_transaction_info_iter(snapshot_next_version, write_sets.len())?;
617+
// let all_checkpoint_indices = txn_info_iter
618+
// .into_iter()
619+
// .collect::<Result<Vec<_>>>()?
620+
// .into_iter()
621+
// .positions(|txn_info| txn_info.has_state_checkpoint_hash())
622+
// .collect();
623+
624+
// let state_update_refs = StateUpdateRefs::index_write_sets(
625+
// state.next_version(),
626+
// &write_sets,
627+
// write_sets.len(),
628+
// all_checkpoint_indices,
629+
// );
630+
// let current_state = out_current_state.lock().clone();
631+
// let (hot_state, state) = out_persisted_state.get_state();
632+
// let (new_state, _state_reads) = current_state.ledger_state().update_with_db_reader(
633+
// &state,
634+
// hot_state,
635+
// &state_update_refs,
636+
// state_db.clone(),
637+
// )?;
638+
// let state_summary = out_persisted_state.get_state_summary();
639+
// let new_state_summary = current_state.ledger_state_summary().update(
640+
// &ProvableStateSummary::new(state_summary, state_db.as_ref()),
641+
// &state_update_refs,
642+
// )?;
643+
// let updated =
644+
// LedgerStateWithSummary::from_state_and_summary(new_state, new_state_summary);
645+
646+
// // synchronously commit the snapshot at the last checkpoint here if not committed to disk yet.
647+
// buffered_state.update(
648+
// updated, 0, /* estimated_items, doesn't matter since we sync-commit */
649+
// true, /* sync_commit */
650+
// )?;
650651
}
651652

652653
let current_state = out_current_state.lock().clone();

0 commit comments

Comments
 (0)