Skip to content

Commit 059808f

Browse files
committed
SmtAncestor: move backpressure to the getting side
1 parent 8bd674a commit 059808f

File tree

5 files changed

+23
-78
lines changed

5 files changed

+23
-78
lines changed

crates/aptos-drop-helper/src/async_concurrent_dropper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl AsyncConcurrentDropper {
2828
Self {
2929
name,
3030
num_tasks_tracker: Arc::new(NumTasksTracker::new(max_tasks)),
31-
thread_pool: ThreadPool::new(num_threads),
31+
thread_pool: ThreadPool::with_name(format!("{}_conc_dropper", name), num_threads),
3232
}
3333
}
3434

storage/aptosdb/src/state_store/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ impl StateDb {
227227

228228
impl DbReader for StateStore {
229229
fn get_buffered_state_base(&self) -> Result<SparseMerkleTree<StateValue>> {
230-
Ok(self.smt_ancestors.lock().get_youngest()?)
230+
Ok(self.smt_ancestors.lock().get_youngest())
231231
}
232232

233233
/// Returns the latest state snapshot strictly before `next_version` if any.

storage/scratchpad/src/sparse_merkle/ancestors.rs

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,96 +3,44 @@
33

44
#![forbid(unsafe_code)]
55

6-
use crate::{sparse_merkle::metrics::TIMER, SparseMerkleTree};
6+
use crate::{
7+
sparse_merkle::{dropper::SUBTREE_DROPPER, metrics::TIMER},
8+
SparseMerkleTree,
9+
};
710
use aptos_crypto::hash::CryptoHash;
8-
use aptos_drop_helper::async_drop_queue::AsyncDropQueue;
911
use aptos_infallible::Mutex;
10-
use aptos_logger::prelude::*;
1112
use aptos_metrics_core::TimerHelper;
12-
use std::{
13-
collections::VecDeque,
14-
sync::{Arc, MutexGuard},
15-
time::Duration,
16-
};
13+
use std::sync::Arc;
1714

18-
type Result<V> = std::result::Result<V, Error>;
19-
20-
/// Keep the oldest SMTs in a central place so that:
21-
/// 1. elsewhere, dropping the SMT will be fast since this indirectly holds a ref to every tree in
22-
/// the entire forest.
23-
/// 2. this must be invoked somewhere in the critical path so that it can provide some
24-
/// back pressure to slow things down to prevent from leaking memory
15+
/// A container to track the ancestor of the SMTs that represent a committed state (older state is
16+
/// guaranteed to be found in persisted storage.
17+
/// When being queried, back pressure (a slow down) is provided in order to make sure not too many
18+
/// SMTs are being kept in memory.
2519
#[derive(Clone, Debug)]
2620
pub struct SmtAncestors<V: Clone + Send + Sync + 'static> {
27-
/// Keep a queue of ancestors, in hope that the when the oldest is being evicted, it's the
28-
/// the last ref of it, which means the drop latency doesn't impact other code paths.
29-
ancestors: Arc<Mutex<VecDeque<SparseMerkleTree<V>>>>,
30-
/// Drop the oldest ancestor asynchronously in good cases, with limited backlog, providing
31-
/// back pressure when the drops are slow in order to avoid memory leak.
32-
drop_queue: Arc<AsyncDropQueue>,
21+
youngest: Arc<Mutex<SparseMerkleTree<V>>>,
3322
}
3423

3524
impl<V: CryptoHash + Clone + Send + Sync + 'static> SmtAncestors<V> {
36-
const MAX_PENDING_DROPS: usize = 4;
37-
const NUM_ANCESTORS: usize = 8;
25+
const MAX_PENDING_DROPS: usize = 8;
3826

3927
pub fn new(ancestor: SparseMerkleTree<V>) -> Self {
4028
Self {
41-
ancestors: Arc::new(Mutex::new(VecDeque::from(vec![ancestor]))),
42-
drop_queue: Arc::new(AsyncDropQueue::new(
43-
"smt_ancestors",
44-
Self::MAX_PENDING_DROPS,
45-
)),
29+
youngest: Arc::new(Mutex::new(ancestor)),
4630
}
4731
}
4832

49-
fn ancestors(&self) -> MutexGuard<VecDeque<SparseMerkleTree<V>>> {
50-
self.ancestors.lock()
51-
}
33+
pub fn get_youngest(&self) -> SparseMerkleTree<V> {
34+
let _timer = TIMER.timer_with(&["get_youngest_ancestor"]);
5235

53-
pub fn get_youngest(&self) -> Result<SparseMerkleTree<V>> {
54-
self.ancestors()
55-
.back()
56-
.map(SparseMerkleTree::clone)
57-
.ok_or(Error::NotFound)
58-
}
59-
60-
pub fn add(&self, youngest: SparseMerkleTree<V>) {
61-
let _timer = TIMER.timer_with(&["add_smt_ancestor"]);
36+
// The back pressure is on the getting side (which is the execution side) so that it's less
37+
// likely for a lot of blocks locking the same old base SMT.
38+
SUBTREE_DROPPER.wait_for_backlog_drop(Self::MAX_PENDING_DROPS);
6239

63-
let mut ancestors = self.ancestors();
64-
ancestors.push_back(youngest);
65-
66-
if ancestors.len() > Self::NUM_ANCESTORS {
67-
let oldest = ancestors.pop_front().unwrap();
68-
oldest.log_generation("evict_ancestor");
69-
if !oldest.is_the_only_ref() {
70-
sample!(
71-
SampleRate::Duration(Duration::from_secs(1)),
72-
error!(
73-
"Oldest SMT being tracked by SmtAncestors is still referenced elsewhere. Potential memory leak.",
74-
)
75-
);
76-
} else {
77-
self.drop_queue.enqueue_drop(oldest);
78-
}
79-
}
40+
self.youngest.lock().clone()
8041
}
8142

82-
pub fn replace_with(&self, other: SmtAncestors<V>) {
83-
let Self {
84-
ancestors,
85-
drop_queue: _drop_queue,
86-
} = other;
87-
88-
*self.ancestors.lock() = Arc::into_inner(ancestors)
89-
.expect("Not the only ref.")
90-
.into_inner();
43+
pub fn add(&self, youngest: SparseMerkleTree<V>) {
44+
*self.youngest.lock() = youngest;
9145
}
9246
}
93-
94-
#[derive(Debug, thiserror::Error, Eq, PartialEq)]
95-
pub enum Error {
96-
#[error("Ancestor SMT not found.")]
97-
NotFound,
98-
}

storage/scratchpad/src/sparse_merkle/mod.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -598,10 +598,6 @@ where
598598
None
599599
}
600600
}
601-
602-
pub(crate) fn is_the_only_ref(&self) -> bool {
603-
Arc::strong_count(&self.inner) == 1
604-
}
605601
}
606602

607603
/// In tests and benchmark, reference to ancestors are manually managed

storage/scratchpad/src/sparse_merkle/sparse_merkle_test.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ fn test_update() {
400400

401401
// Now prune smt1.
402402
drop(smt1);
403+
SUBTREE_DROPPER.wait_for_backlog_drop(0);
403404

404405
// Verify oldest ancestor
405406
assert_eq_pointee(&smt2.get_oldest_ancestor(), &smt2);

0 commit comments

Comments
 (0)