Skip to content

Commit 0048de9

Browse files
authored
Merge pull request #1613 from tursodatabase/current-segment-recovery
current segment recovery
2 parents 733e736 + 406f0fc commit 0048de9

File tree

18 files changed

+694
-75
lines changed

18 files changed

+694
-75
lines changed

libsql-wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ inquire = { version = "0.7.5", optional = true }
4242
tracing-subscriber = { version = "0.3.18", optional = true }
4343
aws-credential-types = { version = "1.2.0", optional = true }
4444
dashmap = "6.0.1"
45+
rand = "0.8.5"
4546

4647
[dev-dependencies]
4748
criterion = "0.5.1"

libsql-wal/src/checkpointer.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ pub struct Checkpointer<P> {
8989
join_set: JoinSet<(NamespaceName, crate::error::Result<()>)>,
9090
processing: Vec<NamespaceName>,
9191
errors: usize,
92+
/// previous iteration of the loop resulted in no work being enqueued
93+
no_work: bool,
9294
}
9395

9496
#[allow(private_bounds)]
@@ -111,6 +113,7 @@ where
111113
join_set: JoinSet::new(),
112114
processing: Vec::new(),
113115
errors: 0,
116+
no_work: false,
114117
}
115118
}
116119

@@ -164,17 +167,19 @@ where
164167
}
165168
// don't wait if there is stuff to enqueue
166169
_ = std::future::ready(()), if !self.scheduled.is_empty()
167-
&& self.join_set.len() < self.max_checkpointing_conccurency => (),
170+
&& self.join_set.len() < self.max_checkpointing_conccurency && !self.no_work => (),
168171
}
169172

170173
let n_available = self.max_checkpointing_conccurency - self.join_set.len();
171174
if n_available > 0 {
175+
self.no_work = true;
172176
for namespace in self
173177
.scheduled
174178
.difference(&self.checkpointing)
175179
.take(n_available)
176180
.cloned()
177181
{
182+
self.no_work = false;
178183
self.processing.push(namespace.clone());
179184
let perform_checkpoint = self.perform_checkpoint.clone();
180185
self.join_set.spawn(async move {

libsql-wal/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ pub enum Error {
1111
BusySnapshot,
1212
#[error("invalid segment header checksum")]
1313
InvalidHeaderChecksum,
14+
#[error("invalid segment header magic")]
15+
InvalidHeaderMagic,
16+
#[error("invalid segment header version")]
17+
InvalidHeaderVersion,
1418
}
1519

1620
impl Into<libsql_sys::ffi::Error> for Error {

libsql-wal/src/io/file.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,10 @@ impl<W> BufCopy<W> {
214214
let Self { w, buf } = self;
215215
(w, buf)
216216
}
217+
218+
pub fn get_ref(&self) -> &W {
219+
&self.w
220+
}
217221
}
218222

219223
impl<W: Write> Write for BufCopy<W> {

libsql-wal/src/io/mod.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::path::Path;
33
use std::sync::Arc;
44

55
use chrono::{DateTime, Utc};
6+
use rand::{rngs::ThreadRng, thread_rng, Rng};
67
use uuid::Uuid;
78

89
pub use self::file::FileExt;
@@ -14,6 +15,7 @@ pub mod file;
1415
pub trait Io: Send + Sync + 'static {
1516
type File: FileExt;
1617
type TempFile: FileExt;
18+
type Rng: Rng;
1719

1820
fn create_dir_all(&self, path: &Path) -> io::Result<()>;
1921
/// TODO: when adding an async variant make sure all places where async is needed are replaced
@@ -30,6 +32,9 @@ pub trait Io: Send + Sync + 'static {
3032
fn now(&self) -> DateTime<Utc>;
3133
fn uuid(&self) -> Uuid;
3234
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()>;
35+
fn with_rng<F, R>(&self, f: F) -> R
36+
where
37+
F: FnOnce(&mut Self::Rng) -> R;
3338
}
3439

3540
#[derive(Default, Debug, Clone, Copy)]
@@ -38,6 +43,7 @@ pub struct StdIO(pub(crate) ());
3843
impl Io for StdIO {
3944
type File = std::fs::File;
4045
type TempFile = std::fs::File;
46+
type Rng = ThreadRng;
4147

4248
fn create_dir_all(&self, path: &Path) -> io::Result<()> {
4349
std::fs::create_dir_all(path)
@@ -72,11 +78,19 @@ impl Io for StdIO {
7278
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()> {
7379
std::fs::hard_link(src, dst)
7480
}
81+
82+
fn with_rng<F, R>(&self, f: F) -> R
83+
where
84+
F: FnOnce(&mut Self::Rng) -> R,
85+
{
86+
f(&mut thread_rng())
87+
}
7588
}
7689

7790
impl<T: Io> Io for Arc<T> {
7891
type File = T::File;
7992
type TempFile = T::TempFile;
93+
type Rng = T::Rng;
8094

8195
fn create_dir_all(&self, path: &Path) -> io::Result<()> {
8296
self.as_ref().create_dir_all(path)
@@ -107,4 +121,41 @@ impl<T: Io> Io for Arc<T> {
107121
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()> {
108122
self.as_ref().hard_link(src, dst)
109123
}
124+
125+
fn with_rng<F, R>(&self, f: F) -> R
126+
where
127+
F: FnOnce(&mut Self::Rng) -> R,
128+
{
129+
self.as_ref().with_rng(f)
130+
}
131+
}
132+
133+
pub struct Inspect<W, F> {
134+
inner: W,
135+
f: F,
136+
}
137+
138+
impl<W, F> Inspect<W, F> {
139+
pub fn new(inner: W, f: F) -> Self {
140+
Self { inner, f }
141+
}
142+
143+
pub(crate) fn into_inner(self) -> W {
144+
self.inner
145+
}
146+
}
147+
148+
impl<W, F> io::Write for Inspect<W, F>
149+
where
150+
W: io::Write,
151+
for<'a> F: FnMut(&'a [u8]),
152+
{
153+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
154+
(self.f)(buf);
155+
self.inner.write(buf)
156+
}
157+
158+
fn flush(&mut self) -> io::Result<()> {
159+
self.inner.flush()
160+
}
110161
}

libsql-wal/src/lib.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,27 @@ const LIBSQL_MAGIC: u64 = u64::from_be_bytes(*b"LIBSQL\0\0");
1616
#[cfg(any(debug_assertions, test))]
1717
pub mod test {
1818
use std::fs::OpenOptions;
19+
use std::path::Path;
1920
use std::path::PathBuf;
20-
use std::{path::Path, sync::Arc};
21+
use std::sync::Arc;
2122

22-
use libsql_sys::{name::NamespaceName, rusqlite::OpenFlags};
23+
use libsql_sys::name::NamespaceName;
24+
use libsql_sys::rusqlite::OpenFlags;
2325
use tempfile::{tempdir, TempDir};
2426
use tokio::sync::mpsc;
2527

2628
use crate::checkpointer::LibsqlCheckpointer;
29+
use crate::io::Io;
2730
use crate::io::StdIO;
2831
use crate::registry::WalRegistry;
2932
use crate::shared_wal::SharedWal;
3033
use crate::storage::TestStorage;
3134
use crate::wal::{LibsqlWal, LibsqlWalManager};
3235

33-
pub struct TestEnv {
34-
pub tmp: TempDir,
35-
pub registry: Arc<WalRegistry<StdIO, TestStorage>>,
36-
pub wal: LibsqlWalManager<StdIO, TestStorage>,
36+
pub struct TestEnv<IO: Io = StdIO> {
37+
pub tmp: Arc<TempDir>,
38+
pub registry: Arc<WalRegistry<IO, TestStorage<IO>>>,
39+
pub wal: LibsqlWalManager<IO, TestStorage<IO>>,
3740
}
3841

3942
impl TestEnv {
@@ -42,7 +45,17 @@ pub mod test {
4245
}
4346

4447
pub fn new_store(store: bool) -> Self {
48+
TestEnv::new_io(StdIO(()), store)
49+
}
50+
}
51+
52+
impl<IO: Io + Clone> TestEnv<IO> {
53+
pub fn new_io(io: IO, store: bool) -> Self {
4554
let tmp = tempdir().unwrap();
55+
Self::new_io_and_tmp(io, tmp.into(), store)
56+
}
57+
58+
pub fn new_io_and_tmp(io: IO, tmp: Arc<TempDir>, store: bool) -> Self {
4659
let resolver = |path: &Path| {
4760
let name = path
4861
.parent()
@@ -56,23 +69,26 @@ pub mod test {
5669

5770
let (sender, receiver) = mpsc::channel(128);
5871
let registry = Arc::new(
59-
WalRegistry::new(
72+
WalRegistry::new_with_io(
73+
io.clone(),
6074
tmp.path().join("test/wals"),
61-
TestStorage::new_io(store, StdIO(())),
75+
TestStorage::new_io(store, io),
6276
sender,
6377
)
6478
.unwrap(),
6579
);
80+
6681
if store {
6782
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 5);
6883
tokio::spawn(checkpointer.run());
6984
}
85+
7086
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
7187

7288
Self { tmp, registry, wal }
7389
}
7490

75-
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<StdIO>> {
91+
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO>> {
7692
let path = self.tmp.path().join(namespace).join("data");
7793
let registry = self.registry.clone();
7894
let namespace = NamespaceName::from_string(namespace.into());
@@ -83,10 +99,7 @@ pub mod test {
8399
self.tmp.path().join(namespace)
84100
}
85101

86-
pub fn open_conn(
87-
&self,
88-
namespace: &'static str,
89-
) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
102+
pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
90103
let path = self.db_path(namespace);
91104
let wal = self.wal.clone();
92105
std::fs::create_dir_all(&path).unwrap();
@@ -110,7 +123,7 @@ pub mod test {
110123
}
111124
}
112125

113-
pub fn seal_current_segment(shared: &SharedWal<StdIO>) {
126+
pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO>) {
114127
let mut tx = shared.begin_read(99999).into();
115128
shared.upgrade(&mut tx).unwrap();
116129
{

libsql-wal/src/registry.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use dashmap::DashMap;
77
use libsql_sys::ffi::Sqlite3DbHeader;
88
use parking_lot::{Condvar, Mutex};
9+
use rand::Rng;
910
use tokio::sync::{mpsc, Notify, Semaphore};
1011
use tokio::task::JoinSet;
1112
use zerocopy::{AsBytes, FromZeroes};
@@ -32,7 +33,7 @@ enum Slot<IO: Io> {
3233

3334
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
3435
pub struct WalRegistry<IO: Io, S> {
35-
fs: IO,
36+
io: IO,
3637
path: PathBuf,
3738
shutdown: AtomicBool,
3839
opened: DashMap<NamespaceName, Slot<IO>>,
@@ -59,7 +60,7 @@ impl<IO: Io, S> WalRegistry<IO, S> {
5960
) -> Result<Self> {
6061
io.create_dir_all(&path)?;
6162
let registry = Self {
62-
fs: io,
63+
io,
6364
path,
6465
opened: Default::default(),
6566
shutdown: Default::default(),
@@ -105,14 +106,15 @@ where
105106
.join(shared.namespace().as_str())
106107
.join(format!("{}:{start_frame_no:020}.seg", shared.namespace()));
107108

108-
let segment_file = self.fs.open(true, true, true, &path)?;
109-
109+
let segment_file = self.io.open(true, true, true, &path)?;
110+
let salt = self.io.with_rng(|rng| rng.gen());
110111
let new = CurrentSegment::create(
111112
segment_file,
112113
path,
113114
start_frame_no,
114115
current.db_size(),
115116
current.tail().clone(),
117+
salt,
116118
)?;
117119
// sealing must the last fallible operation, because we don't want to end up in a situation
118120
// where the current log is sealed and it wasn't swapped.
@@ -126,9 +128,8 @@ where
126128
update_durable(fno, notifier, durable_frame_no, namespace).await;
127129
})
128130
});
129-
self.storage
130-
.store(&shared.namespace, sealed.clone(), None, cb);
131-
new.tail().push(sealed);
131+
new.tail().push(sealed.clone());
132+
self.storage.store(&shared.namespace, sealed, None, cb);
132133
}
133134

134135
shared.current.swap(Arc::new(new));
@@ -226,7 +227,7 @@ where
226227
db_path: &Path,
227228
) -> Result<Arc<SharedWal<IO>>> {
228229
let path = self.path.join(namespace.as_str());
229-
self.fs.create_dir_all(&path)?;
230+
self.io.create_dir_all(&path)?;
230231
// TODO: handle that with abstract io
231232
let dir = walkdir::WalkDir::new(&path).sort_by_file_name().into_iter();
232233

@@ -246,7 +247,7 @@ where
246247
continue;
247248
}
248249

249-
let file = self.fs.open(false, true, true, entry.path())?;
250+
let file = self.io.open(false, true, true, entry.path())?;
250251

251252
if let Some(sealed) =
252253
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())?
@@ -265,7 +266,7 @@ where
265266
}
266267
}
267268

268-
let db_file = self.fs.open(false, true, true, db_path)?;
269+
let db_file = self.io.open(false, true, true, db_path)?;
269270

270271
let mut header: Sqlite3DbHeader = Sqlite3DbHeader::new_zeroed();
271272
db_file.read_exact_at(header.as_bytes_mut(), 0)?;
@@ -283,14 +284,16 @@ where
283284

284285
let current_path = path.join(format!("{namespace}:{next_frame_no:020}.seg"));
285286

286-
let segment_file = self.fs.open(true, true, true, &current_path)?;
287+
let segment_file = self.io.open(true, true, true, &current_path)?;
288+
let salt = self.io.with_rng(|rng| rng.gen());
287289

288290
let current = arc_swap::ArcSwap::new(Arc::new(CurrentSegment::create(
289291
segment_file,
290292
current_path,
291293
next_frame_no,
292294
db_size,
293295
tail.into(),
296+
salt,
294297
)?));
295298

296299
let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);
@@ -309,6 +312,7 @@ where
309312
namespace.clone(),
310313
)),
311314
shutdown: false.into(),
315+
checkpoint_notifier: self.checkpoint_notifier.clone(),
312316
});
313317

314318
self.opened

libsql-wal/src/replication/injector.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl<'a, IO: Io> Injector<'a, IO> {
5454
self.max_tx_frame_no = 0;
5555
}
5656
let buffer = current
57-
.insert_frames(buffer, commit_data, &mut self.tx)
57+
.inject_frames(buffer, commit_data, &mut self.tx)
5858
.await?;
5959
self.buffer = buffer;
6060

libsql-wal/src/replication/replicator.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,6 @@ mod test {
299299
let mut replica_content = vec![0u8; db_content.len()];
300300
while let Some(f) = stream.next().await {
301301
let frame = f.unwrap();
302-
dbg!(frame.header().page_no());
303302
let offset = (frame.header().page_no() as usize - 1) * 4096;
304303
tmp.as_file()
305304
.write_all_at(frame.data(), offset as u64)

0 commit comments

Comments
 (0)