Skip to content

Commit e9234fe

Browse files
committed
fix: reap after persisting
1 parent 9f29c24 commit e9234fe

File tree

1 file changed

+120
-16
lines changed

1 file changed

+120
-16
lines changed

firewood/src/persist_worker.rs

Lines changed: 120 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,10 @@ impl PersistChannel {
284284
state.permits_available == state.max_permits.get()
285285
}
286286

287-
/// Enqueues `nodestore` for reaping.
287+
/// Enqueues `nodestore` for reaping on the next persist cycle.
288+
///
289+
/// The reap is not processed immediately; it is buffered until the next
290+
/// call to [`pop`](Self::pop) drains it alongside a persist.
288291
///
289292
/// ## Errors
290293
///
@@ -295,7 +298,6 @@ impl PersistChannel {
295298
return Err(PersistError::Shutdown);
296299
}
297300
state.pending_reaps.push(nodestore);
298-
self.persist_ready.notify_one();
299301

300302
Ok(())
301303
}
@@ -326,8 +328,9 @@ impl PersistChannel {
326328
Ok(())
327329
}
328330

329-
/// Blocks until there is work to do, then returns a [`PersistDataGuard`]
330-
/// containing the pending reaps and/or the latest committed revision.
331+
/// Blocks until the persist threshold is reached, then returns a
332+
/// [`PersistDataGuard`] containing the latest committed revision and any
333+
/// pending reaps that have accumulated since the last cycle.
331334
///
332335
/// ## Errors
333336
///
@@ -353,11 +356,6 @@ impl PersistChannel {
353356
state.latest_committed.take(),
354357
);
355358
}
356-
// Unblock even if we haven't met the threshold if there are pending reaps.
357-
// Permits to release is set to 0, and committed revision is not taken.
358-
if !state.pending_reaps.is_empty() {
359-
break (0, std::mem::take(&mut state.pending_reaps), None);
360-
}
361359
// Block until it is woken up by the committer thread.
362360
self.persist_ready.wait(&mut state);
363361
}
@@ -417,7 +415,7 @@ struct PersistChannelState {
417415
}
418416

419417
/// RAII guard returned by [`PersistChannel::pop`] that carries the data for
420-
/// one persistence cycle (a committed revision and/or pending reaps).
418+
/// one persistence cycle (a committed revision and any accumulated reaps).
421419
///
422420
/// On drop, it returns consumed permits back to the channel and notifies
423421
/// blocked committers via [`commit_not_full`](PersistChannel::commit_not_full),
@@ -430,8 +428,7 @@ struct PersistDataGuard<'a> {
430428
permits_to_release: u64,
431429
/// Expired node stores whose deleted nodes should be returned to free lists.
432430
pending_reaps: Vec<NodeStore<Committed, FileBacked>>,
433-
/// The latest committed revision to persist, if the threshold was reached.
434-
/// `None` when this cycle was triggered solely by pending reaps.
431+
/// The latest committed revision to persist.
435432
latest_committed: Option<CommittedRevision>,
436433
}
437434

@@ -497,16 +494,21 @@ impl PersistLoop {
497494
}
498495

499496
/// Processes pending work until shutdown or an error occurs.
497+
///
498+
/// Reaps are always processed *after* a persist so that the on-disk header
499+
/// has been updated past the nodes being freed. This prevents the bug in
500+
/// <https://github.com/ava-labs/firewood/issues/1737>, where a crash between a
501+
/// reap and a persist would leave the header pointing to freed nodes.
500502
fn event_loop(&self) -> Result<(), PersistError> {
501503
while let Ok(mut persist_data) = self.shared.channel.pop() {
502-
for nodestore in std::mem::take(&mut persist_data.pending_reaps) {
503-
self.reap(nodestore)?;
504-
}
505-
506504
if let Some(revision) = persist_data.latest_committed.take() {
507505
self.persist_to_disk(&revision)
508506
.and_then(|()| self.maybe_save_to_root_store(&revision))?;
509507
}
508+
509+
for nodestore in std::mem::take(&mut persist_data.pending_reaps) {
510+
self.reap(nodestore)?;
511+
}
510512
}
511513

512514
// Persist the last unpersisted revision on shutdown
@@ -569,3 +571,105 @@ impl SharedState {
569571
self.channel.wait_all_released();
570572
}
571573
}
574+
575+
#[cfg(test)]
576+
#[allow(clippy::unwrap_used)]
577+
mod tests {
578+
use super::*;
579+
580+
use firewood_storage::{
581+
CacheReadStrategy, Committed, FileBacked, ImmutableProposal, LeafNode, Node,
582+
NodeHashAlgorithm, NodeStore, NodeStoreHeader, Parentable, Path,
583+
};
584+
use nonzero_ext::nonzero;
585+
586+
/// Test helper: builds a proposal on top of `parent`, replaces its root with a
/// single leaf node, and returns the result as a committed node store.
///
/// The leaf always uses the fixed partial path `[1, 2, 3]`; its payload is a
/// copy of `value`.
///
/// Panics (via `unwrap`) if creating the proposal or converting it into an
/// immutable proposal fails.
587+
fn commit_leaf(
588+
parent: &NodeStore<impl Parentable, FileBacked>,
589+
value: &[u8],
590+
) -> NodeStore<Committed, FileBacked> {
591+
// Start a new proposal derived from `parent`.
let mut proposal = NodeStore::new(parent).unwrap();
592+
// Overwrite the proposal's root with a lone leaf carrying `value`.
*proposal.root_mut() = Some(Node::Leaf(LeafNode {
593+
partial_path: Path::from(&[1, 2, 3]),
594+
value: value.to_vec().into_boxed_slice(),
595+
}));
596+
// Convert the proposal to its immutable form, then to a committed store.
let immutable = NodeStore::<Arc<ImmutableProposal>, _>::try_from(proposal).unwrap();
597+
immutable.as_committed()
598+
}
599+
600+
/// Regression test for <https://github.com/ava-labs/firewood/issues/1737>.
601+
///
602+
/// Verifies that a reap enqueued with no subsequent persist is left
603+
/// unprocessed, so the root that was persisted earlier can still be read
/// from disk after the worker is closed and the database file is reopened.
604+
#[test]
605+
fn test_reopen_succeeds_after_reap_without_persist() {
606+
let tmpdir = tempfile::tempdir().unwrap();
607+
let dbfile = tmpdir.path().join("test.db");
608+
609+
// NOTE(review): the two boolean arguments are (false, true) here and
// (false, false) on the reopen below — presumably the second one asks for
// the file to be created/truncated; confirm against `FileBacked::new`.
let storage = Arc::new(
610+
FileBacked::new(
611+
dbfile.clone(),
612+
nonzero!(10usize),
613+
nonzero!(10usize),
614+
false,
615+
true,
616+
CacheReadStrategy::WritesOnly,
617+
NodeHashAlgorithm::compile_option(),
618+
)
619+
.unwrap(),
620+
);
621+
622+
let header = NodeStoreHeader::new(NodeHashAlgorithm::compile_option());
623+
let empty = NodeStore::new_empty_committed(storage.clone());
624+
625+
// rev1 is built on the empty store and holds a single leaf.
let rev1: CommittedRevision = commit_leaf(&empty, b"value1").into();
626+
627+
// rev2.deleted = [rev1's root] because NodeStore::new adds the
628+
// parent's root to the proposal's deleted list.
629+
let rev2 = commit_leaf(&rev1, b"value2");
630+
631+
// With commit_count=1, persist_interval=1: every Commit triggers an
632+
// immediate persist and resets permits to max.
633+
let worker = PersistWorker::new(nonzero!(1u64), header, None);
634+
635+
// Persist rev1. The bg thread persists immediately (threshold reached),
636+
// so the on-disk header now points to rev1's root.
637+
worker.persist(rev1.clone()).unwrap();
638+
worker.wait_persisted();
639+
640+
// Send rev2 for reaping. The reap is only buffered, not processed:
641+
// no persist follows it, so freeing rev1's root would not yet be safe.
642+
worker.reap(rev2).unwrap();
643+
644+
// Close the worker. No unpersisted commits remain, so close() does NOT
645+
// trigger a shutdown persist. The buffered reap is never processed and
646+
// rev1's root remains intact on disk.
647+
let empty_revision: CommittedRevision =
648+
NodeStore::new_empty_committed(storage.clone()).into();
649+
worker.close(empty_revision).unwrap();
650+
651+
// Record rev1's root address before dropping storage.
652+
let root_addr = rev1.root_address().expect("rev1 has root");
653+
drop(storage);
654+
655+
// Reopen: the header still points to rev1's root, which was NOT freed.
656+
let storage = Arc::new(
657+
FileBacked::new(
658+
dbfile,
659+
nonzero!(10usize),
660+
nonzero!(10usize),
661+
false,
662+
false,
663+
CacheReadStrategy::WritesOnly,
664+
NodeHashAlgorithm::compile_option(),
665+
)
666+
.unwrap(),
667+
);
668+
// Load the header that the worker wrote to disk and open a store from it.
let header = NodeStoreHeader::read_from_storage(storage.as_ref()).unwrap();
669+
let nodestore = NodeStore::open(&header, storage).expect("open succeeds (hash cached)");
670+
671+
// Reading the root from disk should succeed because the deferred reap
672+
// was never processed (no persist followed to make it safe).
673+
assert!(nodestore.read_node_from_disk(root_addr, "test").is_ok());
674+
}
675+
}

0 commit comments

Comments
 (0)