Skip to content

Commit 92a10a1

Browse files
committed
fmt
1 parent 4b2ac5c commit 92a10a1

File tree

6 files changed

+59
-47
lines changed

6 files changed

+59
-47
lines changed

libsql-wal/src/io/mod.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ pub trait Io: Send + Sync + 'static {
3333
fn uuid(&self) -> Uuid;
3434
fn hard_link(&self, src: &Path, dst: &Path) -> io::Result<()>;
3535
fn with_rng<F, R>(&self, f: F) -> R
36-
where F: FnOnce(&mut Self::Rng) -> R;
36+
where
37+
F: FnOnce(&mut Self::Rng) -> R;
3738
}
3839

3940
#[derive(Default, Debug, Clone, Copy)]
@@ -79,9 +80,10 @@ impl Io for StdIO {
7980
}
8081

8182
fn with_rng<F, R>(&self, f: F) -> R
82-
where F: FnOnce(&mut Self::Rng) -> R,
83-
{
84-
f(&mut thread_rng())
83+
where
84+
F: FnOnce(&mut Self::Rng) -> R,
85+
{
86+
f(&mut thread_rng())
8587
}
8688
}
8789

@@ -121,8 +123,10 @@ impl<T: Io> Io for Arc<T> {
121123
}
122124

123125
fn with_rng<F, R>(&self, f: F) -> R
124-
where F: FnOnce(&mut Self::Rng) -> R {
125-
self.as_ref().with_rng(f)
126+
where
127+
F: FnOnce(&mut Self::Rng) -> R,
128+
{
129+
self.as_ref().with_rng(f)
126130
}
127131
}
128132

@@ -145,13 +149,13 @@ impl<W, F> io::Write for Inspect<W, F>
145149
where
146150
W: io::Write,
147151
for<'a> F: FnMut(&'a [u8]),
148-
{
149-
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
150-
(self.f)(buf);
151-
self.inner.write(buf)
152-
}
152+
{
153+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
154+
(self.f)(buf);
155+
self.inner.write(buf)
156+
}
153157

154-
fn flush(&mut self) -> io::Result<()> {
155-
self.inner.flush()
156-
}
158+
fn flush(&mut self) -> io::Result<()> {
159+
self.inner.flush()
157160
}
161+
}

libsql-wal/src/lib.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ const LIBSQL_MAGIC: u64 = u64::from_be_bytes(*b"LIBSQL\0\0");
1616
#[cfg(any(debug_assertions, test))]
1717
pub mod test {
1818
use std::fs::OpenOptions;
19-
use std::path::PathBuf;
2019
use std::path::Path;
20+
use std::path::PathBuf;
2121
use std::sync::Arc;
2222

23-
use libsql_sys::rusqlite::OpenFlags;
2423
use libsql_sys::name::NamespaceName;
24+
use libsql_sys::rusqlite::OpenFlags;
2525
use tempfile::{tempdir, TempDir};
2626
use tokio::sync::mpsc;
2727

@@ -77,10 +77,12 @@ pub mod test {
7777
)
7878
.unwrap(),
7979
);
80+
8081
if store {
8182
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 5);
8283
tokio::spawn(checkpointer.run());
8384
}
85+
8486
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
8587

8688
Self { tmp, registry, wal }
@@ -97,10 +99,7 @@ pub mod test {
9799
self.tmp.path().join(namespace)
98100
}
99101

100-
pub fn open_conn(
101-
&self,
102-
namespace: &'static str,
103-
) -> libsql_sys::Connection<LibsqlWal<IO>> {
102+
pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
104103
let path = self.db_path(namespace);
105104
let wal = self.wal.clone();
106105
std::fs::create_dir_all(&path).unwrap();

libsql-wal/src/segment/current.rs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -233,10 +233,11 @@ impl<F> CurrentSegment<F> {
233233
tracing::trace!(page_no, "recycling frame");
234234
self.file.write_all_at(page, page_offset(*offset))?;
235235
// we overwrote a frame, record that for later rewrite
236-
tx.recompute_checksum = Some(tx
237-
.recompute_checksum
238-
.map(|old| old.min(*offset))
239-
.unwrap_or(*offset));
236+
tx.recompute_checksum = Some(
237+
tx.recompute_checksum
238+
.map(|old| old.min(*offset))
239+
.unwrap_or(*offset),
240+
);
240241
continue;
241242
}
242243

@@ -272,7 +273,8 @@ impl<F> CurrentSegment<F> {
272273
self.header.lock().start_frame_no.get() + offset as u64,
273274
frame_no
274275
);
275-
self.file.write_at_vectored(slices, checked_frame_offset(offset))?;
276+
self.file
277+
.write_at_vectored(slices, checked_frame_offset(offset))?;
276278
assert!(
277279
current_savepoint.index.insert(page_no, offset).is_none(),
278280
"existing frames should be recycled"
@@ -290,7 +292,7 @@ impl<F> CurrentSegment<F> {
290292
} else {
291293
tx.current_checksum()
292294
};
293-
295+
294296
#[cfg(debug_assertions)]
295297
{
296298
// ensure that file checksum for that transaction is valid
@@ -319,8 +321,7 @@ impl<F> CurrentSegment<F> {
319321
// set the header last, so that a transaction does not witness a write before
320322
// it's actually committed.
321323
*self.header.lock() = header;
322-
self.current_checksum
323-
.store(new_checksum, Ordering::Relaxed);
324+
self.current_checksum.store(new_checksum, Ordering::Relaxed);
324325

325326
tx.is_commited = true;
326327

@@ -521,13 +522,14 @@ impl<F> CurrentSegment<F> {
521522
}
522523

523524
fn recompute_checksum(&self, start_offset: u32, until_offset: u32) -> Result<u32>
524-
where F: FileExt
525+
where
526+
F: FileExt,
525527
{
526528
let mut current_checksum = if start_offset == 0 {
527529
self.header.lock().salt.get()
528530
} else {
529531
// we get the checksum from the frame just before the the start offset
530-
let frame_offset = checked_frame_offset((start_offset - 1));
532+
let frame_offset = checked_frame_offset(start_offset - 1);
531533
let mut out = U32::new(0);
532534
self.file.read_exact_at(out.as_bytes_mut(), frame_offset)?;
533535
out.get()
@@ -536,9 +538,11 @@ impl<F> CurrentSegment<F> {
536538
let mut checked_frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
537539
for offset in start_offset..=until_offset {
538540
let frame_offset = checked_frame_offset(offset);
539-
self.file.read_exact_at(checked_frame.as_bytes_mut(), frame_offset)?;
541+
self.file
542+
.read_exact_at(checked_frame.as_bytes_mut(), frame_offset)?;
540543
current_checksum = checked_frame.frame.checksum(current_checksum);
541-
self.file.write_all_at(&current_checksum.to_le_bytes(), frame_offset)?;
544+
self.file
545+
.write_all_at(&current_checksum.to_le_bytes(), frame_offset)?;
542546
}
543547

544548
Ok(current_checksum)
@@ -548,7 +552,8 @@ impl<F> CurrentSegment<F> {
548552
#[cfg(debug_assertions)]
549553
#[track_caller]
550554
fn assert_valid_checksum(&self, from: u32, until: u32) -> Result<()>
551-
where F: FileExt
555+
where
556+
F: FileExt,
552557
{
553558
let mut frame: Box<CheckedFrame> = CheckedFrame::new_box_zeroed();
554559
let mut current_checksum = if from != 0 {
@@ -563,7 +568,11 @@ impl<F> CurrentSegment<F> {
563568
let offset = checked_frame_offset(i);
564569
self.file.read_exact_at(frame.as_bytes_mut(), offset)?;
565570
current_checksum = frame.frame.checksum(current_checksum);
566-
assert_eq!(current_checksum, frame.checksum.get(), "invalid checksum at offset {i}");
571+
assert_eq!(
572+
current_checksum,
573+
frame.checksum.get(),
574+
"invalid checksum at offset {i}"
575+
);
567576
}
568577

569578
Ok(())
@@ -902,8 +911,9 @@ mod test {
902911
// simulate a flush that only flushes half the pages and then fail
903912
let inner = self.inner();
904913
let inner = inner.lock();
905-
let npages = inner.len() / 4096;
906-
std::fs::write(&self.path, &inner[..4096 * (npages / 2)])?;
914+
// just keep 5 pages from the log. The log will be incomplete and frames will be
915+
// broken.
916+
std::fs::write(&self.path, &inner[..4096 * 5])?;
907917
Err(io::Error::new(io::ErrorKind::BrokenPipe, ""))
908918
}
909919

@@ -1007,12 +1017,9 @@ mod test {
10071017
let shared = env.shared("test");
10081018

10091019
conn.execute("create table test (x)", ()).unwrap();
1010-
conn.execute("insert into test values (1234)", ()).unwrap();
1011-
conn.execute("insert into test values (1234)", ()).unwrap();
1012-
conn.execute("insert into test values (1234)", ()).unwrap();
1013-
conn.execute("insert into test values (1234)", ()).unwrap();
1014-
conn.execute("insert into test values (1234)", ()).unwrap();
1015-
conn.execute("insert into test values (1234)", ()).unwrap();
1020+
for _ in 0..6 {
1021+
conn.execute("insert into test values (1234)", ()).unwrap();
1022+
}
10161023

10171024
// trigger a flush, that will fail. When we reopen the db, the log should need recovery
10181025
// this simulates a crash before flush
@@ -1028,8 +1035,9 @@ mod test {
10281035
{
10291036
let env = TestEnv::new_io_and_tmp(SyncFailBufferIo::default(), tmp.clone());
10301037
let conn = env.open_conn("test");
1038+
// the db was recovered: we lost some rows, but it still works
10311039
conn.query_row("select count(*) from test", (), |row| {
1032-
dbg!(row);
1040+
assert_eq!(row.get::<_, u32>(0).unwrap(), 2);
10331041
Ok(())
10341042
})
10351043
.unwrap();

libsql-wal/src/segment/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ impl CheckedFrame {
255255
pub(crate) const fn offset_of_frame() -> usize {
256256
offset_of!(Self, frame)
257257
}
258-
259258
}
260259

261260
#[repr(C)]

libsql-wal/tests/flaky_fs.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,10 @@ impl Io for FlakyIo {
153153
}
154154

155155
fn with_rng<F, R>(&self, f: F) -> R
156-
where F: FnOnce(&mut Self::Rng) -> R {
157-
f(&mut self.rng.lock())
156+
where
157+
F: FnOnce(&mut Self::Rng) -> R,
158+
{
159+
f(&mut self.rng.lock())
158160
}
159161
}
160162

rust-toolchain.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[toolchain]
22
profile = "default"
3-
channel = "1.80.0"
3+
channel = "1.78.0"

0 commit comments

Comments
 (0)