Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/src/config/storage_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ pub struct PrunerConfig {
impl Default for LedgerPrunerConfig {
fn default() -> Self {
LedgerPrunerConfig {
enable: true,
enable: false,
prune_window: 90_000_000,
batch_size: 5_000,
user_pruning_window_offset: 200_000,
Expand All @@ -369,7 +369,7 @@ impl Default for LedgerPrunerConfig {
impl Default for StateMerklePrunerConfig {
fn default() -> Self {
StateMerklePrunerConfig {
enable: true,
enable: false,
// This allows a block / chunk being executed to have access to a non-latest state tree.
// It needs to be greater than the number of versions the state committing thread is
// able to commit during the execution of the block / chunk. If the bad case indeed
Expand All @@ -386,7 +386,7 @@ impl Default for StateMerklePrunerConfig {
impl Default for EpochSnapshotPrunerConfig {
fn default() -> Self {
Self {
enable: true,
enable: false,
// This is based on ~5K TPS * 2h/epoch * 2 epochs. -- epoch ending snapshots are used
// by state sync in fast sync mode.
// The setting is in versions, not epochs, because this makes it behave more like other
Expand Down
18 changes: 16 additions & 2 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ use aptos_storage_interface::state_store::{
use aptos_types::{
contract_event::ContractEvent,
epoch_state::EpochState,
state_store::hot_state::HotStateConfig,
state_store::{
hot_state::HotStateConfig, state_key::StateKey, state_value::StateValue, NUM_STATE_SHARDS,
},
transaction::{
block_epilogue::BlockEndInfo, ExecutionStatus, Transaction, TransactionStatus, Version,
},
};
use derive_more::Deref;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

#[derive(Clone, Debug, Deref)]
pub struct ExecutionOutput {
Expand All @@ -41,6 +43,8 @@ impl ExecutionOutput {
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
subscribable_events: Planned<Vec<ContractEvent>>,
hot_inserted: [HashMap<StateKey, Option<StateValue>>; NUM_STATE_SHARDS],
hot_evicted: [HashMap<StateKey, Option<StateValue>>; NUM_STATE_SHARDS],
) -> Self {
let next_version = first_version + to_commit.len() as Version;
assert_eq!(next_version, result_state.latest().next_version());
Expand All @@ -65,6 +69,8 @@ impl ExecutionOutput {
block_end_info,
next_epoch_state,
subscribable_events,
hot_inserted,
hot_evicted,
})
}

Expand All @@ -81,6 +87,8 @@ impl ExecutionOutput {
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
hot_inserted: std::array::from_fn(|_| HashMap::new()),
hot_evicted: std::array::from_fn(|_| HashMap::new()),
})
}

Expand All @@ -99,6 +107,8 @@ impl ExecutionOutput {
block_end_info: None,
next_epoch_state: None,
subscribable_events: Planned::ready(vec![]),
hot_inserted: std::array::from_fn(|_| HashMap::new()),
hot_evicted: std::array::from_fn(|_| HashMap::new()),
})
}

Expand All @@ -119,6 +129,8 @@ impl ExecutionOutput {
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: Planned::ready(vec![]),
hot_inserted: std::array::from_fn(|_| HashMap::new()),
hot_evicted: std::array::from_fn(|_| HashMap::new()),
})
}

Expand Down Expand Up @@ -166,6 +178,8 @@ pub struct Inner {
/// state cache.
pub next_epoch_state: Option<EpochState>,
pub subscribable_events: Planned<Vec<ContractEvent>>,
pub hot_inserted: [HashMap<StateKey, Option<StateValue>>; NUM_STATE_SHARDS],
pub hot_evicted: [HashMap<StateKey, Option<StateValue>>; NUM_STATE_SHARDS],
}

impl Inner {
Expand Down
9 changes: 8 additions & 1 deletion execution/executor/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,14 @@ where
output.set_state_checkpoint_output(DoStateCheckpoint::run(
&output.execution_output,
parent_block.output.ensure_result_state_summary()?,
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
&ProvableStateSummary::new_persisted(
self.db.reader.as_ref(),
/* is_hot = */ true,
)?,
&ProvableStateSummary::new_persisted(
self.db.reader.as_ref(),
/* is_hot = */ false,
)?,
None,
)?);
output.set_ledger_update_output(DoLedgerUpdate::run(
Expand Down
3 changes: 2 additions & 1 deletion execution/executor/src/chunk_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ impl<V: VMBlockExecutor> ChunkExecutorInner<V> {
let state_checkpoint_output = DoStateCheckpoint::run(
&output.execution_output,
&parent_state_summary,
&ProvableStateSummary::new_persisted(self.db.reader.as_ref())?,
&ProvableStateSummary::new_persisted(self.db.reader.as_ref(), true)?,
&ProvableStateSummary::new_persisted(self.db.reader.as_ref(), false)?,
Some(
chunk_verifier
.transaction_infos()
Expand Down
18 changes: 14 additions & 4 deletions execution/executor/src/workflow/do_get_execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use aptos_types::{
};
use aptos_vm::VMBlockExecutor;
use itertools::Itertools;
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

pub struct DoGetExecutionOutput;

Expand Down Expand Up @@ -171,7 +171,7 @@ impl DoGetExecutionOutput {
}
}

Parser::parse(
let parsed = Parser::parse(
state_view.next_version(),
transactions,
transaction_outputs,
Expand All @@ -182,7 +182,9 @@ impl DoGetExecutionOutput {
transaction_slice_metadata
.append_state_checkpoint_to_block()
.is_some(),
)
)?;
info!("Finished executing transactions");
Ok(parsed)
}

pub fn by_transaction_execution_sharded<V: VMBlockExecutor>(
Expand Down Expand Up @@ -417,12 +419,18 @@ impl Parser {
},
)?;

let result_state = parent_state.update_with_memorized_reads(
let (result_state, hot_inserted, hot_evicted) = parent_state.update_with_memorized_reads(
base_state_view.persisted_hot_state(),
base_state_view.persisted_state(),
to_commit.state_update_refs(),
base_state_view.memorized_reads(),
);
// for (i, shard) in hot_inserted.iter().enumerate() {
// info!("shard {}: # inserted: {}", i, shard.len());
// }
// for (i, shard) in hot_evicted.iter().enumerate() {
// info!("shard {}: # evicted: {}", i, shard.len());
// }
let state_reads = base_state_view.into_memorized_reads();

let out = ExecutionOutput::new(
Expand All @@ -437,6 +445,8 @@ impl Parser {
block_end_info,
next_epoch_state,
Planned::place_holder(),
hot_inserted,
hot_evicted,
);
let ret = out.clone();
ret.subscribable_events
Expand Down
14 changes: 11 additions & 3 deletions execution/executor/src/workflow/do_state_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aptos_crypto::HashValue;
use aptos_executor_types::{
execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput,
};
use aptos_logger::info;
use aptos_metrics_core::TimerHelper;
use aptos_storage_interface::state_store::state_summary::{
LedgerStateSummary, ProvableStateSummary,
Expand All @@ -18,14 +19,19 @@ impl DoStateCheckpoint {
pub fn run(
execution_output: &ExecutionOutput,
parent_state_summary: &LedgerStateSummary,
persisted_state_summary: &ProvableStateSummary,
hot_persisted_state_summary: &ProvableStateSummary,
cold_persisted_state_summary: &ProvableStateSummary,
known_state_checkpoints: Option<Vec<Option<HashValue>>>,
) -> Result<StateCheckpointOutput> {
let _timer = OTHER_TIMERS.timer_with(&["do_state_checkpoint"]);
// info!("Start getting state checkpoint");

let state_summary = parent_state_summary.update(
persisted_state_summary,
execution_output.to_commit.state_update_refs(),
hot_persisted_state_summary,
cold_persisted_state_summary,
&execution_output.hot_inserted,
&execution_output.hot_evicted,
execution_output.next_version(),
)?;

let state_checkpoint_hashes = Self::get_state_checkpoint_hashes(
Expand All @@ -34,6 +40,8 @@ impl DoStateCheckpoint {
&state_summary,
)?;

// info!("Finished getting state checkpoint");

Ok(StateCheckpointOutput::new(
state_summary,
state_checkpoint_hashes,
Expand Down
3 changes: 2 additions & 1 deletion execution/executor/src/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ impl ApplyExecutionOutput {
let state_checkpoint_output = DoStateCheckpoint::run(
&execution_output,
&base_view.state_summary,
&ProvableStateSummary::new_persisted(reader)?,
&ProvableStateSummary::new_persisted(reader, true)?,
&ProvableStateSummary::new_persisted(reader, false)?,
None,
)?;
let ledger_update_output = DoLedgerUpdate::run(
Expand Down
10 changes: 9 additions & 1 deletion experimental/storage/layered-map/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ use aptos_crypto::HashValue;
use aptos_drop_helper::ArcAsyncDrop;
use aptos_infallible::Mutex;
use aptos_metrics_core::IntGaugeVecHelper;
use std::{marker::PhantomData, sync::Arc};
use std::{marker::PhantomData, sync::{Arc, Weak}};

#[derive(Debug)]
struct LayerInner<K: ArcAsyncDrop, V: ArcAsyncDrop> {
peak: FlattenPerfectTree<K, V>,
parent: Weak<LayerInner<K, V>>,
children: Mutex<Vec<Arc<LayerInner<K, V>>>>,
use_case: &'static str,
family: HashValue,
Expand Down Expand Up @@ -50,6 +51,7 @@ impl<K: ArcAsyncDrop, V: ArcAsyncDrop> LayerInner<K, V> {
let family = HashValue::random();
Arc::new(Self {
peak: FlattenPerfectTree::new_with_empty_nodes(1),
parent: Weak::new(),
children: Mutex::new(Vec::new()),
use_case,
family,
Expand All @@ -61,6 +63,7 @@ impl<K: ArcAsyncDrop, V: ArcAsyncDrop> LayerInner<K, V> {
fn spawn(self: &Arc<Self>, child_peak: FlattenPerfectTree<K, V>, base_layer: u64) -> Arc<Self> {
let child = Arc::new(Self {
peak: child_peak,
parent: Arc::downgrade(self),
children: Mutex::new(Vec::new()),
use_case: self.use_case,
family: self.family,
Expand Down Expand Up @@ -108,6 +111,11 @@ impl<K: ArcAsyncDrop, V: ArcAsyncDrop> MapLayer<K, V> {
}
}

/// Returns the layer this one was spawned from, if it is still alive.
///
/// The back-pointer is held as a `Weak`, so this yields `None` once the
/// parent layer has been dropped (or for a root layer, whose parent field
/// was created with `Weak::new()`).
pub(crate) fn parent(&self) -> Option<Self> {
    self.inner.parent.upgrade().map(Self::new)
}

pub fn into_layers_view_after(self, base_layer: MapLayer<K, V>) -> LayeredMap<K, V> {
assert!(base_layer.is_family(&self));
assert!(base_layer.inner.layer >= self.inner.base_layer);
Expand Down
23 changes: 22 additions & 1 deletion experimental/storage/layered-map/src/map/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ where
(base_layer, top_layer)
}

pub(crate) fn base_layer(&self) -> u64 {
pub fn base_layer(&self) -> u64 {
self.base_layer.layer()
}

/// Returns the layer number of this view's top (newest) layer.
pub fn top_layer(&self) -> u64 {
    self.top_layer.layer()
}
}

impl<K, V, S> LayeredMap<K, V, S>
Expand Down Expand Up @@ -114,4 +118,21 @@ where
.into_feet_iter()
.flat_map(|node| DescendantIterator::new(node, self.base_layer()))
}

/// Decomposes this view into a stack of single-layer maps.
///
/// Starting from the top layer, each step follows the `parent()`
/// back-pointer one layer down and records a map spanning exactly that
/// one layer, stopping once the base layer is reached. The collected
/// maps are then reversed so the result is ordered bottom-up: the map
/// just above the base layer first, the top-most map last.
pub fn inner_maps(&self) -> Vec<Self> {
    let bottom_layer_num = self.base_layer.layer();
    let mut single_layer_maps = Vec::new();

    let mut current = self.top_layer.clone();
    while current.layer() > bottom_layer_num {
        // NOTE(review): assumes every layer above the base still has a live
        // parent back-pointer — upheld as long as the base layer is alive.
        let parent = current.parent().expect("the next one must exist.");
        single_layer_maps.push(Self::new(parent.clone(), current));
        current = parent;
    }

    single_layer_maps.reverse();
    single_layer_maps
}
}
6 changes: 4 additions & 2 deletions storage/aptosdb/src/db/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,12 +660,13 @@ impl DbReader for AptosDB {
key_hash: &HashValue,
version: Version,
root_depth: usize,
is_hot: bool,
) -> Result<SparseMerkleProofExt> {
gauged_api("get_state_proof_by_version_ext", || {
self.error_if_state_merkle_pruned("State merkle", version)?;

self.state_store
.get_state_proof_by_version_ext(key_hash, version, root_depth)
.get_state_proof_by_version_ext(key_hash, version, root_depth, is_hot)
})
}

Expand All @@ -674,12 +675,13 @@ impl DbReader for AptosDB {
key_hash: &HashValue,
version: Version,
root_depth: usize,
is_hot: bool,
) -> Result<(Option<StateValue>, SparseMerkleProofExt)> {
gauged_api("get_state_value_with_proof_by_version_ext", || {
self.error_if_state_merkle_pruned("State merkle", version)?;

self.state_store
.get_state_value_with_proof_by_version_ext(key_hash, version, root_depth)
.get_state_value_with_proof_by_version_ext(key_hash, version, root_depth, is_hot)
})
}

Expand Down
Loading
Loading