Skip to content

Commit 1ca6d10

Browse files
committed
fix: reap after persisting
1 parent b95c6b0 commit 1ca6d10

File tree

1 file changed

+129
-17
lines changed

1 file changed

+129
-17
lines changed

firewood/src/persist_worker.rs

Lines changed: 129 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,10 @@ impl PersistChannel {
293293
state.permits_available == state.max_permits.get()
294294
}
295295

296-
/// Enqueues `nodestore` for reaping.
296+
/// Enqueues `nodestore` for reaping on the next persist cycle.
297+
///
298+
/// The reap is not processed immediately; it is buffered until the next
299+
/// call to [`pop`](Self::pop) drains it alongside a persist.
297300
///
298301
/// ## Errors
299302
///
@@ -304,7 +307,6 @@ impl PersistChannel {
304307
return Err(PersistError::ChannelDisconnected);
305308
}
306309
state.pending_reaps.push(nodestore);
307-
self.persist_ready.notify_one();
308310

309311
Ok(())
310312
}
@@ -335,8 +337,9 @@ impl PersistChannel {
335337
Ok(())
336338
}
337339

338-
/// Blocks until there is work to do, then returns a [`PersistDataGuard`]
339-
/// containing the pending reaps and/or the latest committed revision.
340+
/// Blocks until the persist threshold is reached, then returns a
341+
/// [`PersistDataGuard`] containing the latest committed revision and any
342+
/// pending reaps that have accumulated since the last cycle.
340343
///
341344
/// ## Errors
342345
///
@@ -362,11 +365,6 @@ impl PersistChannel {
362365
state.latest_committed.take(),
363366
);
364367
}
365-
// Unblock even if we haven't met the threshold if there are pending reaps.
366-
// Permits to release is set to 0, and committed revision is not taken.
367-
if !state.pending_reaps.is_empty() {
368-
break (0, std::mem::take(&mut state.pending_reaps), None);
369-
}
370368
// Block until it is woken up by the committer thread.
371369
self.persist_ready.wait(&mut state);
372370
}
@@ -426,7 +424,7 @@ struct PersistChannelState {
426424
}
427425

428426
/// RAII guard returned by [`PersistChannel::pop`] that carries the data for
429-
/// one persistence cycle (a committed revision and/or pending reaps).
427+
/// one persistence cycle (a committed revision and any accumulated reaps).
430428
///
431429
/// On drop, it returns consumed permits back to the channel and notifies
432430
/// blocked committers via [`commit_not_full`](PersistChannel::commit_not_full),
@@ -439,8 +437,7 @@ struct PersistDataGuard<'a> {
439437
permits_to_release: u64,
440438
/// Expired node stores whose deleted nodes should be returned to free lists.
441439
pending_reaps: Vec<NodeStore<Committed, FileBacked>>,
442-
/// The latest committed revision to persist, if the threshold was reached.
443-
/// `None` when this cycle was triggered solely by pending reaps.
440+
/// The latest committed revision to persist.
444441
latest_committed: Option<CommittedRevision>,
445442
}
446443

@@ -509,20 +506,25 @@ impl PersistLoop {
509506
}
510507

511508
/// Processes pending work until shutdown or an error occurs.
509+
///
510+
/// Reaps are always processed *after* a persist so that the on-disk header
511+
/// has been updated past the nodes being freed. This prevents issue #1737
512+
/// where a crash between a reap and a persist would leave the header
513+
/// pointing to freed nodes.
512514
fn event_loop(&self) -> Result<(), PersistError> {
513515
loop {
514516
let Ok(mut persist_data) = self.shared.channel.pop() else {
515-
break; // Channel failed due to a shutdown. Break to handle shutdown gracefully.
517+
break; // Channel closed due to shutdown. Break to handle shutdown gracefully.
516518
};
517519

518-
for nodestore in std::mem::take(&mut persist_data.pending_reaps) {
519-
self.reap(nodestore)?;
520-
}
521-
522520
if let Some(revision) = persist_data.latest_committed.take() {
523521
self.persist_to_disk(&revision)
524522
.and_then(|()| self.maybe_save_to_root_store(&revision))?;
525523
}
524+
525+
for nodestore in std::mem::take(&mut persist_data.pending_reaps) {
526+
self.reap(nodestore)?;
527+
}
526528
}
527529

528530
// Persist the last unpersisted revision on shutdown
@@ -585,3 +587,113 @@ impl SharedState {
585587
self.channel.wait_all_released();
586588
}
587589
}
590+
591+
#[cfg(test)]
592+
#[allow(clippy::unwrap_used)]
593+
mod tests {
594+
use super::*;
595+
596+
use firewood_storage::{
597+
CacheReadStrategy, FileBacked, ImmutableProposal, LeafNode, Node, NodeHashAlgorithm,
598+
NodeStore, NodeStoreHeader, Path,
599+
};
600+
use nonzero_ext::nonzero;
601+
602+
/// Creates a committed revision with a single leaf node from the given parent.
603+
fn commit_leaf(
604+
parent: &NodeStore<impl firewood_storage::Parentable, FileBacked>,
605+
value: &[u8],
606+
) -> NodeStore<firewood_storage::Committed, FileBacked> {
607+
let mut proposal = NodeStore::new(parent).unwrap();
608+
*proposal.root_mut() = Some(Node::Leaf(LeafNode {
609+
partial_path: Path::from(&[1, 2, 3]),
610+
value: value.to_vec().into_boxed_slice(),
611+
}));
612+
let immutable = NodeStore::<Arc<ImmutableProposal>, _>::try_from(proposal).unwrap();
613+
immutable.as_committed()
614+
}
615+
616+
/// Regression test for issue #1737.
617+
///
618+
/// Reaps free deleted nodes on disk. If a reap frees nodes belonging to the
619+
/// last persisted revision without a subsequent persist updating the header,
620+
/// the on-disk header is left pointing to freed space. On reopen, reading
621+
/// the root node fails with "attempt to read freed area".
622+
///
623+
/// The fix ensures reaps are only processed *after* a persist that moves
624+
/// the header forward, so nodes referenced by the header are never freed
625+
/// prematurely.
626+
#[test]
627+
fn test_reopen_succeeds_after_reap_without_persist() {
628+
let tmpdir = tempfile::tempdir().unwrap();
629+
let dbfile = tmpdir.path().join("test.db");
630+
631+
let storage = Arc::new(
632+
FileBacked::new(
633+
dbfile.clone(),
634+
nonzero!(10usize),
635+
nonzero!(10usize),
636+
false,
637+
true,
638+
CacheReadStrategy::WritesOnly,
639+
NodeHashAlgorithm::compile_option(),
640+
)
641+
.unwrap(),
642+
);
643+
644+
let header = NodeStoreHeader::new(NodeHashAlgorithm::compile_option());
645+
let empty = NodeStore::new_empty_committed(storage.clone());
646+
647+
let rev1: CommittedRevision = commit_leaf(&empty, b"value1").into();
648+
649+
// rev2.deleted = [rev1's root] because NodeStore::new adds the
650+
// parent's root to the proposal's deleted list.
651+
let rev2 = commit_leaf(&rev1, b"value2");
652+
653+
// With commit_count=1, persist_interval=1: every Commit triggers an
654+
// immediate persist and resets permits to max.
655+
let worker = PersistWorker::new(nonzero!(1u64), header, None);
656+
657+
// Persist rev1. The bg thread persists immediately (threshold reached),
658+
// so the on-disk header now points to rev1's root.
659+
worker.persist(rev1.clone()).unwrap();
660+
worker.wait_persisted();
661+
662+
// Send rev2 for reaping. The reap is buffered (not processed) because
663+
// no persist follows to make it safe.
664+
worker.reap(rev2).unwrap();
665+
666+
// Close the worker. No unpersisted commits remain, so close() does NOT
667+
// trigger a shutdown persist. The buffered reap is never processed and
668+
// rev1's root remains intact on disk.
669+
let rev_dummy: CommittedRevision = NodeStore::new_empty_committed(storage.clone()).into();
670+
worker.close(rev_dummy).unwrap();
671+
672+
// Record rev1's root address before dropping storage.
673+
let root_addr = rev1.root_address().expect("rev1 has root");
674+
drop(rev1);
675+
drop(storage);
676+
677+
// Reopen: the header still points to rev1's root, which was NOT freed.
678+
let storage = Arc::new(
679+
FileBacked::new(
680+
dbfile,
681+
nonzero!(10usize),
682+
nonzero!(10usize),
683+
false,
684+
false,
685+
CacheReadStrategy::WritesOnly,
686+
NodeHashAlgorithm::compile_option(),
687+
)
688+
.unwrap(),
689+
);
690+
let header = NodeStoreHeader::read_from_storage(storage.as_ref()).unwrap();
691+
let nodestore = NodeStore::open(&header, storage).expect("open succeeds (hash cached)");
692+
693+
// Reading the root from disk should succeed because the deferred reap
694+
// was never processed (no persist followed to make it safe).
695+
nodestore
696+
.read_node_from_disk(root_addr, "test")
697+
.expect("root should be readable after reopen");
698+
}
699+
}

0 commit comments

Comments
 (0)