Skip to content

Commit d63cb45

Browse files
committed
feat: replace PersistSemaphore
1 parent 58126dc commit d63cb45

File tree

1 file changed

+108
-52
lines changed

1 file changed

+108
-52
lines changed

firewood/src/persist_worker.rs

Lines changed: 108 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,11 @@ impl PersistWorker {
109109
channel: PersistChannel::new(commit_count, persist_threshold),
110110
});
111111

112-
let persist_loop = PersistLoop {
113-
shared: shared.clone(),
114-
};
115-
116-
let handle = thread::spawn(move || persist_loop.run());
112+
let bg_shared = shared.clone();
113+
let handle = thread::spawn(move || {
114+
let _guard = PanicGuard(bg_shared.clone());
115+
PersistLoop { shared: bg_shared }.run()
116+
});
117117

118118
Self {
119119
handle: Mutex::new(Some(handle)),
@@ -124,7 +124,14 @@ impl PersistWorker {
124124
/// Sends `committed` to the background thread for persistence. This call
125125
/// blocks if the limit of unpersisted commits has been reached.
126126
pub(crate) fn persist(&self, committed: CommittedRevision) -> Result<(), PersistError> {
127-
self.shared.channel.push(committed)
127+
if let Err(e) = self.shared.channel.push(committed) {
128+
self.join_handle();
129+
self.check_error()?;
130+
131+
return Err(e);
132+
}
133+
134+
Ok(())
128135
}
129136

130137
/// Sends `nodestore` to the background thread for reaping if archival mode
@@ -133,9 +140,15 @@ impl PersistWorker {
133140
&self,
134141
nodestore: NodeStore<Committed, FileBacked>,
135142
) -> Result<(), PersistError> {
136-
if self.shared.root_store.is_none() {
137-
self.shared.channel.reap(nodestore)?;
143+
if self.shared.root_store.is_none()
144+
&& let Err(e) = self.shared.channel.reap(nodestore)
145+
{
146+
self.join_handle();
147+
self.check_error()?;
148+
149+
return Err(e);
138150
}
151+
139152
Ok(())
140153
}
141154

@@ -154,12 +167,13 @@ impl PersistWorker {
154167

155168
/// Close the persist worker and persist `latest_committed_revision`.
156169
pub(crate) fn close(
157-
mut self,
170+
self,
158171
latest_committed_revision: CommittedRevision,
159172
) -> Result<(), PersistError> {
160173
self.shared
161174
.persist_on_shutdown
162-
.set(latest_committed_revision);
175+
.set(latest_committed_revision)
176+
.expect("should be empty");
163177

164178
self.shared.channel.close();
165179
self.join_handle();
@@ -190,15 +204,15 @@ impl PersistWorker {
190204
}
191205

192206
#[derive(Debug)]
193-
struct PersistChannel<T> {
194-
state: Mutex<PersistChannelState<T>>,
207+
struct PersistChannel {
208+
state: Mutex<PersistChannelState>,
195209
/// Condition variable to wake blocked committers when space is available.
196210
commit_not_full: Condvar,
197211
/// Condition variable to wake the persister when persistence is needed.
198212
persist_ready: Condvar,
199213
}
200214

201-
impl<T> PersistChannel<T> {
215+
impl PersistChannel {
202216
const fn new(max_permits: NonZeroU64, persist_threshold: u64) -> Self {
203217
Self {
204218
state: Mutex::new(PersistChannelState {
@@ -229,9 +243,9 @@ impl<T> PersistChannel<T> {
229243
Ok(())
230244
}
231245

232-
fn push(&self, item: T) -> Result<(), PersistError> {
246+
fn push(&self, item: CommittedRevision) -> Result<(), PersistError> {
233247
let mut state = self.state.lock();
234-
while state.permits_available == 0 {
248+
while state.permits_available == 0 && !state.shutdown {
235249
self.commit_not_full.wait(&mut state);
236250
}
237251

@@ -249,37 +263,39 @@ impl<T> PersistChannel<T> {
249263
Ok(())
250264
}
251265

252-
fn pop(&self) -> Result<PersistDataWrapper<'_, T>, PersistError> {
253-
let mut state = self.state.lock();
254-
let (permits_to_release, pending_reaps, data) = loop {
255-
// Shutdown requested. Return error.
256-
if state.shutdown {
257-
return Err(PersistError::ChannelDisconnected);
266+
fn pop(&self) -> Result<PersistDataWrapper<'_>, PersistError> {
267+
let (permits_to_release, pending_reaps, data) = {
268+
let mut state = self.state.lock();
269+
loop {
270+
// Shutdown requested. Return error.
271+
if state.shutdown {
272+
return Err(PersistError::ChannelDisconnected);
273+
}
274+
// Unblock to persist when permits available <= threshold
275+
if state.permits_available <= state.persist_threshold && state.data.is_some() {
276+
break (
277+
state
278+
.max_permits
279+
.get()
280+
.saturating_sub(state.permits_available),
281+
std::mem::take(&mut state.pending_reaps),
282+
state.data.take(),
283+
);
284+
}
285+
// Unblock even if we haven't met the threshold if there are pending reaps.
286+
// Permits to release is set to 0, and committed revision is not taken.
287+
if !state.pending_reaps.is_empty() {
288+
break (0, std::mem::take(&mut state.pending_reaps), None);
289+
}
290+
// Block until it is woken up by the committer thread.
291+
self.persist_ready.wait(&mut state);
258292
}
259-
// Unblock to persist when permits available <= threshold
260-
if state.permits_available <= state.persist_threshold && state.data.is_some() {
261-
break (
262-
state
263-
.max_permits
264-
.get()
265-
.saturating_sub(state.permits_available),
266-
std::mem::take(&mut state.pending_reaps),
267-
state.data.take(),
268-
);
269-
}
270-
// Unblock even if we haven't met the threshold if there are pending reaps.
271-
// Permits to release is set to 0, and committed revision is not taken.
272-
if !state.pending_reaps.is_empty() {
273-
break (0, std::mem::take(&mut state.pending_reaps), None);
274-
}
275-
// Block until it is woken up by the committer thread.
276-
self.persist_ready.wait(&mut state);
277293
};
278294
Ok(PersistDataWrapper {
279295
channel: self,
280296
permits_to_release,
281297
pending_reaps,
282-
data,
298+
latest_committed: data,
283299
})
284300
}
285301

@@ -311,25 +327,37 @@ impl<T> PersistChannel<T> {
311327
}
312328

313329
#[derive(Debug)]
314-
struct PersistChannelState<T> {
330+
struct PersistChannelState {
315331
permits_available: u64,
316332
/// Maximum number of unpersisted commits allowed.
317333
max_permits: NonZeroU64,
318334
/// Persist when remaining permits are at or below this threshold.
319335
persist_threshold: u64,
320336
shutdown: bool,
321337
pending_reaps: Vec<NodeStore<Committed, FileBacked>>,
322-
data: Option<T>,
338+
data: Option<CommittedRevision>,
323339
}
324340

325-
struct PersistDataWrapper<'a, T> {
326-
channel: &'a PersistChannel<T>,
341+
/// RAII wrapper returned by [`PersistChannel::pop`] that carries the data for
342+
/// one persistence cycle (a committed revision and/or pending reaps).
343+
///
344+
/// On drop, it returns consumed permits back to the channel and notifies
345+
/// blocked committers via [`commit_not_full`](PersistChannel::commit_not_full),
346+
/// ensuring backpressure is released even if the persist loop exits early due
347+
/// to an error.
348+
struct PersistDataWrapper<'a> {
349+
channel: &'a PersistChannel,
350+
/// Number of permits consumed by commits since the last persist cycle.
351+
/// Released back to the channel on drop.
327352
permits_to_release: u64,
353+
/// Expired node stores whose deleted nodes should be returned to free lists.
328354
pending_reaps: Vec<NodeStore<Committed, FileBacked>>,
329-
data: Option<T>,
355+
/// The latest committed revision to persist, if the threshold was reached.
356+
/// `None` when this cycle was triggered solely by pending reaps.
357+
latest_committed: Option<CommittedRevision>,
330358
}
331359

332-
impl<T> Drop for PersistDataWrapper<'_, T> {
360+
impl Drop for PersistDataWrapper<'_> {
333361
/// Handle permit release on drop of the wrapper
334362
fn drop(&mut self) {
335363
if self.permits_to_release == 0 {
@@ -359,7 +387,21 @@ struct SharedState {
359387
/// Unpersisted revision to persist on shutdown.
360388
persist_on_shutdown: OnceLock<CommittedRevision>,
361389
/// Channel for coordinating persist and reap operations.
362-
channel: PersistChannel<CommittedRevision>,
390+
channel: PersistChannel,
391+
}
392+
393+
/// Closes the persist channel if the background thread panics.
394+
///
395+
/// This ensures blocked committers are woken up and see the shutdown
396+
/// state rather than blocking indefinitely.
397+
struct PanicGuard(Arc<SharedState>);
398+
399+
impl Drop for PanicGuard {
400+
fn drop(&mut self) {
401+
if std::thread::panicking() {
402+
self.0.channel.close();
403+
}
404+
}
363405
}
364406

365407
/// The background persistence loop that runs in a separate thread.
@@ -393,9 +435,9 @@ impl PersistLoop {
393435
self.reap(nodestore)?;
394436
}
395437

396-
if let Some(revision) = persist_data.data.take() {
438+
if let Some(revision) = persist_data.latest_committed.take() {
397439
self.persist_to_disk(&revision)
398-
.and_then(|()| self.save_to_root_store(&revision))?;
440+
.and_then(|()| self.maybe_save_to_root_store(&revision))?;
399441
}
400442
}
401443

@@ -404,7 +446,7 @@ impl PersistLoop {
404446
&& !self.shared.channel.empty()
405447
{
406448
self.persist_to_disk(&revision)
407-
.and_then(|()| self.save_to_root_store(&revision))?;
449+
.and_then(|()| self.maybe_save_to_root_store(&revision))?;
408450
}
409451

410452
Ok(())
@@ -423,7 +465,10 @@ impl PersistLoop {
423465
fn reap(&self, nodestore: NodeStore<Committed, FileBacked>) -> Result<(), PersistError> {
424466
nodestore
425467
.reap_deleted(&mut self.shared.header.lock())
426-
.map_err(|e| PersistError::FileIo(Arc::new(e)))
468+
.map_err(|e| {
469+
error!("Failed to reap deleted nodes: {e}");
470+
PersistError::FileIo(Arc::new(e))
471+
})
427472
}
428473

429474
/// Saves the revision's root address to `RootStore` if configured.
@@ -438,9 +483,20 @@ impl PersistLoop {
438483
}
439484
}
440485

486+
#[crate::metrics("persist.root_store", "persist revision address to root store")]
487+
fn save_to_root_store(
488+
store: &RootStore,
489+
hash: &TrieHash,
490+
addr: &LinearAddress,
491+
) -> Result<(), PersistError> {
492+
store.add_root(hash, addr).map_err(|e| {
493+
error!("Failed to persist revision address to RootStore: {e}");
494+
PersistError::RootStore(e.into())
495+
})
496+
}
497+
441498
impl SharedState {
442499
#[cfg(test)]
443-
#[allow(clippy::arithmetic_side_effects)]
444500
fn wait_all_released(&self) {
445501
self.channel.wait_all_released();
446502
}

0 commit comments

Comments
 (0)