Skip to content

Commit 070ae88

Browse files
refactor(rootstore): own nodestore logic (#1495)
As mentioned in #1470, there exists the following circular dependency: ```mermaid stateDiagram-v2 s1: Revision Manager s2: RootStore s1 --> s2: Wants historical revision X s2 --> s1: Wants latest revision ``` This PR eliminates the circular dependency above by giving `RootStore` a reference to the file backend, which allows it to construct revisions without having to rely on the storage of another `NodeStore` (previously, we needed the latest `NodeStore` whose storage we would clone). This PR does the following: - `NodeStore` construction with a specified root hash + address now takes in a storage reference rather than another `NodeStore`. - Revision construction is now handled by `RootStore` instead of the RevisionManager - Moves revision caching from the RevisionManager to `RootStore` - Adds tests for `RootStore` caching logic Note: there exists an earlier version of this PR (#1480). However, #1480 (comment) pointed out that having to pass in a `NodeStore` to `NodeStore::with_root()` seemed odd. After some thought, I agree with this point and found that passing in a storage reference instead removed the need to have to share revisions between the `RevisionManager` and `RootStore`. Hence, I went with creating a new PR rather than clobbering the existing one.
1 parent baa5686 commit 070ae88

File tree

4 files changed

+168
-62
lines changed

4 files changed

+168
-62
lines changed

firewood/src/db.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1047,8 +1047,11 @@ mod test {
10471047
let latest_value = latest_revision.val(key).unwrap().unwrap();
10481048
assert_eq!(new_value, latest_value.as_ref());
10491049

1050-
let node_store =
1051-
NodeStore::with_root(root_hash.into_hash_type(), root_address, latest_revision);
1050+
let node_store = NodeStore::with_root(
1051+
root_hash.into_hash_type(),
1052+
root_address,
1053+
latest_revision.get_storage(),
1054+
);
10521055

10531056
let retrieved_value = node_store.val(key).unwrap().unwrap();
10541057
assert_eq!(value, retrieved_value.as_ref());

firewood/src/manager.rs

Lines changed: 23 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,21 @@ use parking_lot::{Mutex, RwLock};
1414
use std::collections::{HashMap, VecDeque};
1515
use std::num::NonZero;
1616
use std::path::PathBuf;
17-
use std::sync::{Arc, OnceLock, Weak};
17+
use std::sync::{Arc, OnceLock};
1818

1919
use firewood_storage::logger::{trace, warn};
2020
use metrics::gauge;
2121
use rayon::{ThreadPool, ThreadPoolBuilder};
2222
use typed_builder::TypedBuilder;
23-
use weak_table::WeakValueHashMap;
2423

2524
use crate::merkle::Merkle;
2625
use crate::root_store::RootStore;
2726
use crate::v2::api::{ArcDynDbView, HashKey, OptionalHashKeyExt};
2827

2928
pub use firewood_storage::CacheReadStrategy;
3029
use firewood_storage::{
31-
BranchNode, Committed, FileBacked, FileIoError, HashedNodeReader, ImmutableProposal,
32-
IntoHashType, NodeStore, TrieHash,
30+
BranchNode, Committed, FileBacked, FileIoError, HashedNodeReader, ImmutableProposal, NodeStore,
31+
TrieHash,
3332
};
3433

3534
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, TypedBuilder)]
@@ -69,7 +68,7 @@ pub struct ConfigManager {
6968
pub manager: RevisionManagerConfig,
7069
}
7170

72-
type CommittedRevision = Arc<NodeStore<Committed, FileBacked>>;
71+
pub type CommittedRevision = Arc<NodeStore<Committed, FileBacked>>;
7372
type ProposedRevision = Arc<NodeStore<Arc<ImmutableProposal>, FileBacked>>;
7473

7574
#[derive(Debug)]
@@ -83,7 +82,6 @@ pub(crate) struct RevisionManager {
8382
proposals: Mutex<Vec<ProposedRevision>>,
8483
// committing_proposals: VecDeque<Arc<ProposedImmutable>>,
8584
by_hash: RwLock<HashMap<TrieHash, CommittedRevision>>,
86-
by_rootstore: Mutex<WeakValueHashMap<TrieHash, Weak<NodeStore<Committed, FileBacked>>>>,
8785
threadpool: OnceLock<ThreadPool>,
8886
root_store: Option<RootStore>,
8987
}
@@ -122,20 +120,20 @@ impl RevisionManager {
122120
// from opening the same database simultaneously
123121
fb.lock()?;
124122

125-
let root_store: Option<RootStore> = match config.root_store_dir {
126-
Some(path) => Some(RootStore::new(path).map_err(RevisionManagerError::RootStoreError)?),
127-
None => None,
128-
};
129-
130123
let storage = Arc::new(fb);
131124
let nodestore = Arc::new(NodeStore::open(storage.clone())?);
125+
let root_store = config
126+
.root_store_dir
127+
.map(|path| RootStore::new(path, storage.clone()))
128+
.transpose()
129+
.map_err(RevisionManagerError::RootStoreError)?;
130+
132131
let manager = Self {
133132
max_revisions: config.manager.max_revisions,
134133
historical: RwLock::new(VecDeque::from([nodestore.clone()])),
135134
by_hash: RwLock::new(Default::default()),
136135
proposals: Mutex::new(Default::default()),
137136
// committing_proposals: Default::default(),
138-
by_rootstore: Mutex::new(WeakValueHashMap::new()),
139137
threadpool: OnceLock::new(),
140138
root_store,
141139
};
@@ -322,47 +320,28 @@ impl RevisionManager {
322320
/// Retrieve a committed revision by its root hash.
323321
/// To retrieve a revision involves a few steps:
324322
/// 1. Check the in-memory revision manager.
325-
/// 2. Check the in-memory `RootStore` cache.
326-
/// 3. Check the persistent `RootStore`.
323+
/// 2. Check `RootStore` (if it exists).
327324
pub fn revision(&self, root_hash: HashKey) -> Result<CommittedRevision, RevisionManagerError> {
328325
// 1. Check the in-memory revision manager.
329326
if let Some(revision) = self.by_hash.read().get(&root_hash).cloned() {
330327
return Ok(revision);
331328
}
332329

333-
let mut cache_guard = self.by_rootstore.lock();
334-
335-
// 2. Check the in-memory `RootStore` cache.
336-
if let Some(nodestore) = cache_guard.get(&root_hash) {
337-
return Ok(nodestore);
338-
}
339-
340-
// 3. Check the persistent `RootStore`.
341-
// If the revision exists, get its root address and construct a NodeStore for it.
342-
let root_address = match &self.root_store {
343-
Some(store) => store
344-
.get(&root_hash)
345-
.map_err(RevisionManagerError::RootStoreError)?
330+
// 2. Check `RootStore` (if it exists).
331+
let root_store =
332+
self.root_store
333+
.as_ref()
346334
.ok_or(RevisionManagerError::RevisionNotFound {
347335
provided: root_hash.clone(),
348-
})?,
349-
None => {
350-
return Err(RevisionManagerError::RevisionNotFound {
351-
provided: root_hash.clone(),
352-
});
353-
}
354-
};
355-
356-
let nodestore = Arc::new(NodeStore::with_root(
357-
root_hash.clone().into_hash_type(),
358-
root_address,
359-
self.current_revision(),
360-
));
361-
362-
// Cache the nodestore (stored as a weak reference).
363-
cache_guard.insert(root_hash, nodestore.clone());
336+
})?;
337+
let revision = root_store
338+
.get(&root_hash)
339+
.map_err(RevisionManagerError::RootStoreError)?
340+
.ok_or(RevisionManagerError::RevisionNotFound {
341+
provided: root_hash.clone(),
342+
})?;
364343

365-
Ok(nodestore)
344+
Ok(revision)
366345
}
367346

368347
pub fn root_hash(&self) -> Result<Option<HashKey>, RevisionManagerError> {

firewood/src/root_store.rs

Lines changed: 129 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,17 @@
22
// See the file LICENSE.md for licensing terms.
33

44
use fjall::{Config, Keyspace, PartitionCreateOptions, PartitionHandle, PersistMode};
5-
use std::path::Path;
5+
use parking_lot::Mutex;
6+
use std::{
7+
path::Path,
8+
sync::{Arc, Weak},
9+
};
10+
use weak_table::WeakValueHashMap;
611

712
use derive_where::derive_where;
8-
use firewood_storage::{LinearAddress, TrieHash};
13+
use firewood_storage::{Committed, FileBacked, IntoHashType, LinearAddress, NodeStore, TrieHash};
14+
15+
use crate::manager::CommittedRevision;
916

1017
const FJALL_PARTITION_NAME: &str = "firewood";
1118

@@ -14,25 +21,36 @@ const FJALL_PARTITION_NAME: &str = "firewood";
1421
pub struct RootStore {
1522
keyspace: Keyspace,
1623
items: PartitionHandle,
24+
storage: Arc<FileBacked>,
25+
/// Cache of reconstructed revisions by hash.
26+
revision_cache: Mutex<WeakValueHashMap<TrieHash, Weak<NodeStore<Committed, FileBacked>>>>,
1727
}
1828

1929
impl RootStore {
2030
/// Creates or opens an instance of `RootStore`.
2131
///
2232
/// Args:
23-
/// - path: the directory where `RootStore` will write to.
33+
/// - `path`: the directory where `RootStore` will write to.
34+
/// - `storage`: the underlying store to create nodestores from.
2435
///
2536
/// # Errors
2637
///
2738
/// Will return an error if unable to create or open an instance of `RootStore`.
2839
pub fn new<P: AsRef<Path>>(
2940
path: P,
41+
storage: Arc<FileBacked>,
3042
) -> Result<RootStore, Box<dyn std::error::Error + Send + Sync>> {
3143
let keyspace = Config::new(path).open()?;
3244
let items =
3345
keyspace.open_partition(FJALL_PARTITION_NAME, PartitionCreateOptions::default())?;
46+
let revision_cache = Mutex::new(WeakValueHashMap::new());
3447

35-
Ok(Self { keyspace, items })
48+
Ok(Self {
49+
keyspace,
50+
items,
51+
storage,
52+
revision_cache,
53+
})
3654
}
3755

3856
/// `add_root` persists a revision's address to `RootStore`.
@@ -57,24 +75,127 @@ impl RootStore {
5775
Ok(())
5876
}
5977

60-
/// `get` returns the address of a revision.
78+
/// `get` retrieves a committed revision by its hash.
79+
///
80+
/// To retrieve a committed revision involves a few steps:
81+
/// 1. Check if the committed revision is cached.
82+
/// 2. If the committed revision is not cached, query the underlying
83+
/// datastore for the revision's root address.
84+
/// 3. Construct the committed revision.
6185
///
6286
/// Args:
6387
/// - hash: the hash of the revision
6488
///
6589
/// # Errors
6690
///
67-
/// Will return an error if unable to query the underlying datastore.
91+
/// Will return an error if unable to query the underlying datastore or if
92+
/// the stored address is invalid.
93+
///
94+
/// # Panics
95+
///
96+
/// Will panic if the latest revision does not exist.
6897
pub fn get(
6998
&self,
7099
hash: &TrieHash,
71-
) -> Result<Option<LinearAddress>, Box<dyn std::error::Error + Send + Sync>> {
100+
) -> Result<Option<CommittedRevision>, Box<dyn std::error::Error + Send + Sync>> {
101+
// Obtain the lock to prevent multiple threads from caching the same result.
102+
let mut revision_cache = self.revision_cache.lock();
103+
104+
// 1. Check if the committed revision is cached.
105+
if let Some(v) = revision_cache.get(hash) {
106+
return Ok(Some(v));
107+
}
108+
109+
// 2. If the committed revision is not cached, query the underlying
110+
// datastore for the revision's root address.
72111
let Some(v) = self.items.get(**hash)? else {
73112
return Ok(None);
74113
};
75114

76115
let array: [u8; 8] = v.as_ref().try_into()?;
116+
let addr = LinearAddress::new(u64::from_be_bytes(array))
117+
.ok_or("invalid address: empty address")?;
118+
119+
// 3. Construct the committed revision.
120+
let nodestore = Arc::new(NodeStore::with_root(
121+
hash.clone().into_hash_type(),
122+
addr,
123+
self.storage.clone(),
124+
));
125+
126+
// Cache for future lookups.
127+
revision_cache.insert(hash.clone(), nodestore.clone());
128+
129+
Ok(Some(nodestore))
130+
}
131+
}
132+
133+
#[cfg(test)]
134+
#[allow(clippy::unwrap_used)]
135+
mod tests {
136+
use super::*;
137+
use firewood_storage::{CacheReadStrategy, FileBacked, NodeStore};
138+
use std::num::NonZero;
139+
use std::sync::Arc;
140+
141+
#[test]
142+
fn test_cache_hit() {
143+
let tmpdir = tempfile::tempdir().unwrap();
144+
145+
let db_path = tmpdir.as_ref().join("testdb");
146+
let file_backed = Arc::new(
147+
FileBacked::new(
148+
db_path,
149+
NonZero::new(1024).unwrap(),
150+
NonZero::new(1024).unwrap(),
151+
false,
152+
true,
153+
CacheReadStrategy::WritesOnly,
154+
)
155+
.unwrap(),
156+
);
157+
158+
let root_store_dir = tmpdir.as_ref().join("root_store");
159+
let root_store = RootStore::new(root_store_dir, file_backed.clone()).unwrap();
160+
161+
// Create a revision to cache.
162+
let revision = Arc::new(NodeStore::new_empty_committed(file_backed.clone()));
163+
164+
let hash = TrieHash::from_bytes([1; 32]);
165+
root_store
166+
.revision_cache
167+
.lock()
168+
.insert(hash.clone(), revision.clone());
169+
170+
// Since the underlying datastore is empty, this should get the revision
171+
// from the cache.
172+
let retrieved_revision = root_store.get(&hash).unwrap().unwrap();
173+
174+
assert!(Arc::ptr_eq(&revision, &retrieved_revision));
175+
}
176+
177+
#[test]
178+
fn test_nonexistent_revision() {
179+
let tmpdir = tempfile::tempdir().unwrap();
180+
181+
let db_path = tmpdir.as_ref().join("testdb");
182+
let file_backed = Arc::new(
183+
FileBacked::new(
184+
db_path,
185+
NonZero::new(1024).unwrap(),
186+
NonZero::new(1024).unwrap(),
187+
false,
188+
true,
189+
CacheReadStrategy::WritesOnly,
190+
)
191+
.unwrap(),
192+
);
193+
194+
let root_store_dir = tmpdir.as_ref().join("root_store");
195+
let root_store = RootStore::new(root_store_dir, file_backed.clone()).unwrap();
77196

78-
Ok(LinearAddress::new(u64::from_be_bytes(array)))
197+
// Try to get a hash that doesn't exist in the cache nor in the underlying datastore.
198+
let nonexistent_hash = TrieHash::from_bytes([1; 32]);
199+
assert!(root_store.get(&nonexistent_hash).unwrap().is_none());
79200
}
80201
}

storage/src/nodestore/mod.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,21 +160,15 @@ impl<S: ReadableStorage> NodeStore<Committed, S> {
160160
///
161161
/// This constructor is used when you have an existing root node at a known
162162
/// address and hash, typically when reconstructing a [`NodeStore`] from
163-
/// a committed state. The `latest_nodestore` provides access to the underlying
164-
/// storage backend containing the persisted trie data.
163+
/// a committed state.
165164
///
166165
/// ## Panics
167166
///
168167
/// Panics in debug builds if the hash of the node at `root_address` does
169168
/// not equal `root_hash`.
170169
#[must_use]
171-
pub fn with_root(
172-
root_hash: HashType,
173-
root_address: LinearAddress,
174-
latest_nodestore: Arc<NodeStore<Committed, S>>,
175-
) -> Self {
170+
pub fn with_root(root_hash: HashType, root_address: LinearAddress, storage: Arc<S>) -> Self {
176171
let header = NodeStoreHeader::with_root(Some(root_address));
177-
let storage = latest_nodestore.storage.clone();
178172

179173
let nodestore = NodeStore {
180174
header,
@@ -201,6 +195,15 @@ impl<S: ReadableStorage> NodeStore<Committed, S> {
201195
}
202196
}
203197

198+
impl<S: ReadableStorage> NodeStore<Committed, S> {
199+
/// Get the underlying storage for a `NodeStore`.
200+
#[cfg(any(test, feature = "test_utils"))]
201+
#[must_use]
202+
pub fn get_storage(&self) -> Arc<S> {
203+
self.storage.clone()
204+
}
205+
}
206+
204207
/// Some nodestore kinds implement Parentable.
205208
///
206209
/// This means that the nodestore can have children.

0 commit comments

Comments
 (0)