Commit 1fe3475

fix deadlock
Not actually related to this change, but it started triggering consistently after I rebased. Three problems: there was a race between a segment being sealed, stored, and registered for checkpointing (the checkpointer could be notified before the segment was added to the tail); the checkpointer was not notified when the last reader left a segment, so we would hang; and we could enter a busy loop when a notification arrived for a namespace that still had an in-flight job, blocking the runtime.
1 parent: 13c0dc0

9 files changed, +50 −18 lines changed

libsql-wal/src/checkpointer.rs

Lines changed: 6 additions & 1 deletion
@@ -89,6 +89,8 @@ pub struct Checkpointer<P> {
     join_set: JoinSet<(NamespaceName, crate::error::Result<()>)>,
     processing: Vec<NamespaceName>,
     errors: usize,
+    /// previous iteration of the loop resulted in no work being enqueued
+    no_work: bool,
 }
 
 #[allow(private_bounds)]
@@ -111,6 +113,7 @@ where
             join_set: JoinSet::new(),
             processing: Vec::new(),
             errors: 0,
+            no_work: false,
         }
     }
 
@@ -164,17 +167,19 @@ where
             }
             // don't wait if there is stuff to enqueue
             _ = std::future::ready(()), if !self.scheduled.is_empty()
-                && self.join_set.len() < self.max_checkpointing_conccurency => (),
+                && self.join_set.len() < self.max_checkpointing_conccurency && !self.no_work => (),
         }
 
         let n_available = self.max_checkpointing_conccurency - self.join_set.len();
         if n_available > 0 {
+            self.no_work = true;
             for namespace in self
                 .scheduled
                 .difference(&self.checkpointing)
                 .take(n_available)
                 .cloned()
             {
+                self.no_work = false;
                 self.processing.push(namespace.clone());
                 let perform_checkpoint = self.perform_checkpoint.clone();
                 self.join_set.spawn(async move {
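
The `no_work` flag is what breaks the busy loop described in the commit message: the `std::future::ready(())` arm completes immediately whenever `scheduled` is non-empty, so if every scheduled namespace already has an in-flight job (and is therefore filtered out by `difference(&self.checkpointing)`), the loop would spin without making progress, starving the runtime. A minimal sketch of the pattern, assuming a simplified scheduler (`Scheduler`, `tick`, and the channel wiring here are illustrative, not the crate's actual API):

    use std::collections::HashSet;

    use tokio::sync::mpsc;

    struct Scheduler {
        scheduled: HashSet<String>, // namespaces waiting for a checkpoint
        in_flight: HashSet<String>, // namespaces with a running job
        recv: mpsc::Receiver<String>,
        // true when the previous iteration enqueued nothing: every scheduled
        // namespace already had an in-flight job
        no_work: bool,
    }

    impl Scheduler {
        async fn tick(&mut self) {
            tokio::select! {
                Some(ns) = self.recv.recv() => {
                    self.scheduled.insert(ns);
                }
                // Without `!self.no_work` this arm completes immediately on
                // every iteration while `scheduled` is non-empty, even when
                // nothing below can be enqueued: a busy loop blocking the runtime.
                _ = std::future::ready(()), if !self.scheduled.is_empty() && !self.no_work => (),
            }

            self.no_work = true;
            let ready: Vec<String> = self.scheduled.difference(&self.in_flight).cloned().collect();
            for ns in ready {
                self.no_work = false; // something was enqueued, keep polling eagerly
                self.in_flight.insert(ns.clone());
                tokio::spawn(async move {
                    // the checkpoint for `ns` would run here
                    let _ = ns;
                });
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel(8);
        let mut scheduler = Scheduler {
            scheduled: HashSet::new(),
            in_flight: HashSet::new(),
            recv: rx,
            no_work: false,
        };
        tx.send("ns-1".to_string()).await.unwrap();
        scheduler.tick().await; // receives the notification and spawns the job
        scheduler.tick().await; // finds nothing new to enqueue, sets `no_work`
        // a third tick would now await a notification instead of spinning
    }

Setting `no_work = true` before the enqueue pass and clearing it only when something is actually spawned disables the ready arm exactly until the next external notification arrives.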

libsql-wal/src/registry.rs

Lines changed: 4 additions & 4 deletions
@@ -6,9 +6,9 @@ use std::sync::Arc;
 use dashmap::DashMap;
 use libsql_sys::ffi::Sqlite3DbHeader;
 use parking_lot::{Condvar, Mutex};
+use rand::Rng;
 use tokio::sync::{mpsc, Notify, Semaphore};
 use tokio::task::JoinSet;
-use rand::Rng;
 use zerocopy::{AsBytes, FromZeroes};
 
 use crate::checkpointer::CheckpointMessage;
@@ -128,9 +128,8 @@ where
                     update_durable(fno, notifier, durable_frame_no, namespace).await;
                 })
             });
-            self.storage
-                .store(&shared.namespace, sealed.clone(), None, cb);
-            new.tail().push(sealed);
+            new.tail().push(sealed.clone());
+            self.storage.store(&shared.namespace, sealed, None, cb);
         }
 
         shared.current.swap(Arc::new(new));
@@ -313,6 +312,7 @@ where
                 namespace.clone(),
             )),
             shutdown: false.into(),
+            checkpoint_notifier: self.checkpoint_notifier.clone(),
         });
 
         self.opened
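
The swap of `store` and `tail().push` is the sealing race from the commit message: `cb` can fire (and notify the checkpointer) as soon as the segment is stored, so the segment must already be reachable from the tail by then; otherwise the checkpointer wakes up, finds nothing to do, and is never woken again. A reduced illustration of why publish-then-notify is the safe order, using hypothetical stand-ins (`Tail`, `store_then_notify`) rather than the real types:

    use std::sync::{Arc, Mutex};

    use tokio::sync::Notify;

    // stand-in for the registry's segment list
    #[derive(Default)]
    struct Tail(Mutex<Vec<u64>>);

    async fn store_then_notify(segment: u64, tail: Arc<Tail>, notify: Arc<Notify>) {
        // publish first: the segment must be reachable from the tail...
        tail.0.lock().unwrap().push(segment);
        // ...before the notification fires. With the order swapped, the
        // checkpointer could run between the two steps, find an empty tail,
        // and go back to sleep with no further wakeup scheduled.
        notify.notify_one();
    }

    #[tokio::main]
    async fn main() {
        let tail = Arc::new(Tail::default());
        let notify = Arc::new(Notify::new());

        let checkpointer = {
            let (tail, notify) = (tail.clone(), notify.clone());
            tokio::spawn(async move {
                notify.notified().await;
                // with publish-then-notify, the segment is always visible here
                assert_eq!(tail.0.lock().unwrap().pop(), Some(42));
            })
        };

        store_then_notify(42, tail, notify).await;
        checkpointer.await.unwrap();
    }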

libsql-wal/src/replication/replicator.rs

Lines changed: 0 additions & 1 deletion
@@ -299,7 +299,6 @@ mod test {
         let mut replica_content = vec![0u8; db_content.len()];
         while let Some(f) = stream.next().await {
             let frame = f.unwrap();
-            dbg!(frame.header().page_no());
             let offset = (frame.header().page_no() as usize - 1) * 4096;
             tmp.as_file()
                 .write_all_at(frame.data(), offset as u64)

libsql-wal/src/segment/current.rs

Lines changed: 6 additions & 5 deletions
@@ -39,7 +39,7 @@ pub struct CurrentSegment<F> {
     /// lock
     read_locks: Arc<AtomicU64>,
     sealed: AtomicBool,
-    /// current runnign checksum
+    /// current running checksum
     current_checksum: AtomicU32,
     tail: Arc<SegmentList<SealedSegment<F>>>,
 }
@@ -458,8 +458,9 @@ impl<F> CurrentSegment<F> {
         self.read_locks().fetch_add(1, Ordering::SeqCst);
     }
 
-    pub fn dec_reader_count(&self) {
-        self.read_locks().fetch_sub(1, Ordering::SeqCst);
+    /// return true if the reader count is 0
+    pub fn dec_reader_count(&self) -> bool {
+        self.read_locks().fetch_sub(1, Ordering::SeqCst) - 1 == 0
     }
 
     pub fn read_locks(&self) -> &AtomicU64 {
@@ -1017,7 +1018,7 @@ mod test {
 
         let tmp = Arc::new(tempdir().unwrap());
         {
-            let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone());
+            let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone(), false);
             let conn = env.open_conn("test");
             let shared = env.shared("test");
 
@@ -1038,7 +1039,7 @@ mod test {
         }
 
         {
-            let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone());
+            let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone(), false);
             let conn = env.open_conn("test");
             // the db was recovered: we lost some rows, but it still works
             conn.query_row("select count(*) from test", (), |row| {
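
`dec_reader_count` now tells the caller whether it was the last reader out: `fetch_sub` returns the value *before* the subtraction, hence the `- 1 == 0` comparison. A quick standalone check of that semantic:

    use std::sync::atomic::{AtomicU64, Ordering};

    fn main() {
        let read_locks = AtomicU64::new(1); // one reader still holds the segment
        // fetch_sub returns the value *before* the subtraction,
        // so `previous - 1` is the count after this reader leaves
        let was_last_reader = read_locks.fetch_sub(1, Ordering::SeqCst) - 1 == 0;
        assert!(was_last_reader);
        assert_eq!(read_locks.load(Ordering::SeqCst), 0);
    }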

libsql-wal/src/segment/list.rs

Lines changed: 10 additions & 2 deletions
@@ -81,13 +81,23 @@
     where
         F: FileExt,
     {
+        struct Guard<'a>(&'a AtomicBool);
+        impl<'a> Drop for Guard<'a> {
+            fn drop(&mut self) {
+                self.0.store(false, Ordering::SeqCst);
+            }
+        }
+
         if self
             .checkpointing
             .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
             .is_err()
         {
             return Ok(None);
         }
+
+        let _g = Guard(&self.checkpointing);
+
         let mut segs = Vec::new();
         let mut current = self.head.load();
         // find the longest chain of segments that can be checkpointed, iow, segments that do not have
@@ -172,8 +182,6 @@
 
         db_file.set_len(size_after as u64 * 4096)?;
 
-        self.checkpointing.store(false, Ordering::SeqCst);
-
         Ok(Some(last_replication_index))
     }
 
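Moving the flag reset from an explicit `store(false, ...)` at the end of the function into a drop guard means `checkpointing` is cleared on every exit path, including the `?` early returns between the `compare_exchange` and the final `Ok`; previously an I/O error would have left the flag stuck at `true`, making every later checkpoint attempt bail out with `Ok(None)`. The same RAII shape in isolation:

    use std::sync::atomic::{AtomicBool, Ordering};

    struct Guard<'a>(&'a AtomicBool);

    impl<'a> Drop for Guard<'a> {
        fn drop(&mut self) {
            // runs on every exit path: normal return, `?`, or panic unwind
            self.0.store(false, Ordering::SeqCst);
        }
    }

    fn checkpoint(checkpointing: &AtomicBool, fail: bool) -> Result<(), ()> {
        // claim the flag; bail out if a checkpoint is already running
        if checkpointing
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return Ok(());
        }
        let _g = Guard(checkpointing);

        if fail {
            return Err(()); // early return: the guard still clears the flag
        }
        Ok(())
    }

    fn main() {
        let flag = AtomicBool::new(false);
        let _ = checkpoint(&flag, true);
        // without the guard, the error path would leave `flag` stuck at
        // `true`, and every later call would take the early-bail branch
        assert!(!flag.load(Ordering::SeqCst));
    }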

libsql-wal/src/segment/sealed.rs

Lines changed: 3 additions & 2 deletions
@@ -15,11 +15,12 @@ use zerocopy::{AsBytes, FromZeroes};
 use crate::error::Result;
 use crate::io::buf::{IoBufMut, ZeroCopyBuf};
 use crate::io::file::{BufCopy, FileExt};
-use crate::LIBSQL_MAGIC;
 use crate::io::Inspect;
+use crate::segment::{checked_frame_offset, CheckedFrame};
+use crate::LIBSQL_MAGIC;
 
 use super::compacted::{CompactedSegmentDataFooter, CompactedSegmentDataHeader};
-use super::{frame_offset, page_offset, Frame, FrameHeader, Segment, SegmentHeader, SegmentFlags};
+use super::{frame_offset, page_offset, Frame, Segment, SegmentFlags, SegmentHeader};
 
 /// an immutable, wal segment
 #[derive(Debug)]

libsql-wal/src/shared_wal.rs

Lines changed: 5 additions & 0 deletions
@@ -7,7 +7,9 @@ use arc_swap::ArcSwap;
 use crossbeam::deque::Injector;
 use crossbeam::sync::Unparker;
 use parking_lot::{Mutex, MutexGuard};
+use tokio::sync::mpsc;
 
+use crate::checkpointer::CheckpointMessage;
 use crate::error::{Error, Result};
 use crate::io::file::FileExt;
 use crate::io::Io;
@@ -48,6 +50,7 @@ pub struct SharedWal<IO: Io> {
     pub(crate) new_frame_notifier: tokio::sync::watch::Sender<u64>,
     pub(crate) stored_segments: Box<dyn ReplicateFromStorage>,
     pub(crate) shutdown: AtomicBool,
+    pub(crate) checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
 }
 
 impl<IO: Io> SharedWal<IO> {
@@ -87,6 +90,8 @@ impl<IO: Io> SharedWal<IO> {
             created_at: Instant::now(),
             conn_id,
             pages_read: 0,
+            namespace: self.namespace.clone(),
+            checkpoint_notifier: self.checkpoint_notifier.clone(),
         }
     }
 

libsql-wal/src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -339,7 +339,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
                 .entry(namespace.clone())
                 .or_default()
                 .insert(key, (out_path, index));
-            tokio::runtime::Handle::current().block_on(on_store(end_frame_no))
+            tokio::runtime::Handle::current().block_on(on_store(end_frame_no));
         }
     }
 
libsql-wal/src/transaction.rs

Lines changed: 15 additions & 2 deletions
@@ -3,8 +3,11 @@ use std::ops::{Deref, DerefMut};
 use std::sync::Arc;
 use std::time::Instant;
 
+use libsql_sys::name::NamespaceName;
 use parking_lot::{ArcMutexGuard, RawMutex};
+use tokio::sync::mpsc;
 
+use crate::checkpointer::CheckpointMessage;
 use crate::segment::current::{CurrentSegment, SegmentIndex};
 use crate::shared_wal::WalLock;
 
@@ -77,8 +80,11 @@ pub struct ReadTransaction<F> {
     /// number of pages read by this transaction. This is used to determine whether a write lock
     /// will be re-acquired.
     pub pages_read: usize,
+    pub namespace: NamespaceName,
+    pub checkpoint_notifier: mpsc::Sender<CheckpointMessage>,
 }
 
+// fixme: clone should probably not be implemented for this type, figure a way to do it
 impl<F> Clone for ReadTransaction<F> {
     fn clone(&self) -> Self {
         self.current.inc_reader_count();
@@ -90,14 +96,21 @@ impl<F> Clone for ReadTransaction<F> {
             created_at: self.created_at,
             conn_id: self.conn_id,
             pages_read: self.pages_read,
+            namespace: self.namespace.clone(),
+            checkpoint_notifier: self.checkpoint_notifier.clone(),
         }
     }
 }
 
 impl<F> Drop for ReadTransaction<F> {
     fn drop(&mut self) {
-        // FIXME: if the count drops to 0, register for compaction.
-        self.current.dec_reader_count();
+        // FIXME: it would be more appropriate to wait till the segment is stored before notifying,
+        // because we are not waiting for read to be released before that
+        if self.current.dec_reader_count() && self.current.is_sealed() {
+            let _: Result<_, _> = self
+                .checkpoint_notifier
+                .try_send(self.namespace.clone().into());
+        }
    }
 }
 
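This `Drop` impl is the hang fix from the commit message: the checkpointer is now notified when the last reader leaves a sealed segment, and `Drop` is the only place that knows that happened. `try_send` is used rather than `send().await` because `Drop` is synchronous; a full channel or a vanished checkpointer simply loses the best-effort notification, which is why the result is deliberately ignored. A minimal sketch of the pattern, with a hypothetical `Message` type standing in for `CheckpointMessage`:

    use tokio::sync::mpsc;

    // hypothetical stand-in for CheckpointMessage
    struct Message(String);

    struct Reader {
        namespace: String,
        checkpoint_notifier: mpsc::Sender<Message>,
    }

    impl Drop for Reader {
        fn drop(&mut self) {
            // Drop cannot await, so use the non-blocking try_send; a full
            // channel or a closed receiver just drops the notification,
            // which is why the result is discarded, as in the real code.
            let _ = self
                .checkpoint_notifier
                .try_send(Message(self.namespace.clone()));
        }
    }

    fn main() {
        let (tx, mut rx) = mpsc::channel(8);
        drop(Reader {
            namespace: "ns-1".to_string(),
            checkpoint_notifier: tx,
        });
        assert_eq!(rx.try_recv().unwrap().0, "ns-1");
    }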
