Skip to content

Commit b924a5a

Browse files
committed
fmt
1 parent 6e2cd97 commit b924a5a

File tree

14 files changed

+164
-111
lines changed

14 files changed

+164
-111
lines changed

libsql-server/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,10 @@ where
708708
let (sender, receiver) = tokio::sync::mpsc::channel(64);
709709
let registry = Arc::new(WalRegistry::new(wal_path, NoStorage, sender)?);
710710
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
711-
join_set.spawn(async move { checkpointer.run().await; Ok(()) });
711+
join_set.spawn(async move {
712+
checkpointer.run().await;
713+
Ok(())
714+
});
712715

713716
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(namespace_resolver));
714717
let shutdown_notify = self.shutdown.clone();

libsql-wal/src/bins/shell/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,11 @@ where
147147
println!("max durable frame: {durable}");
148148
}
149149

150-
async fn run_shell<S>(registry: Arc<WalRegistry<StdIO, S>>, db_path: &Path, namespace: NamespaceName)
151-
where
150+
async fn run_shell<S>(
151+
registry: Arc<WalRegistry<StdIO, S>>,
152+
db_path: &Path,
153+
namespace: NamespaceName,
154+
) where
152155
S: Storage<Segment = SealedSegment<std::fs::File>>,
153156
{
154157
let db_path = db_path.join("dbs").join(namespace.as_str());

libsql-wal/src/checkpointer.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ mod test {
214214
}
215215

216216
let (sender, receiver) = mpsc::channel(8);
217-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
217+
let mut checkpointer =
218+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
218219
let ns = NamespaceName::from("test");
219220

220221
sender.send(ns.clone().into()).await.unwrap();
@@ -246,7 +247,8 @@ mod test {
246247
}
247248

248249
let (sender, receiver) = mpsc::channel(8);
249-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
250+
let mut checkpointer =
251+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
250252
let ns = NamespaceName::from("test");
251253

252254
sender.send(ns.clone().into()).await.unwrap();
@@ -277,7 +279,8 @@ mod test {
277279
}
278280

279281
let (sender, receiver) = mpsc::channel(8);
280-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
282+
let mut checkpointer =
283+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
281284

282285
drop(sender);
283286

@@ -303,7 +306,8 @@ mod test {
303306
}
304307

305308
let (sender, receiver) = mpsc::channel(8);
306-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
309+
let mut checkpointer =
310+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
307311

308312
drop(sender);
309313

@@ -336,7 +340,8 @@ mod test {
336340
}
337341

338342
let (sender, receiver) = mpsc::channel(8);
339-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
343+
let mut checkpointer =
344+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
340345

341346
let ns: NamespaceName = "test".into();
342347

@@ -368,7 +373,8 @@ mod test {
368373
}
369374

370375
let (sender, receiver) = mpsc::channel(8);
371-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
376+
let mut checkpointer =
377+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
372378

373379
let ns1: NamespaceName = "test1".into();
374380
let ns2: NamespaceName = "test2".into();
@@ -403,7 +409,8 @@ mod test {
403409
}
404410

405411
let (sender, receiver) = mpsc::channel(8);
406-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 2);
412+
let mut checkpointer =
413+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 2);
407414

408415
let ns1: NamespaceName = "test1".into();
409416
let ns2: NamespaceName = "test2".into();

libsql-wal/src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ pub mod test {
5656

5757
let (sender, receiver) = mpsc::channel(128);
5858
let registry = Arc::new(
59-
WalRegistry::new(tmp.path().join("test/wals"), TestStorage::new_io(store, StdIO(())), sender).unwrap(),
59+
WalRegistry::new(
60+
tmp.path().join("test/wals"),
61+
TestStorage::new_io(store, StdIO(())),
62+
sender,
63+
)
64+
.unwrap(),
6065
);
6166
if store {
6267
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 5);

libsql-wal/src/registry.rs

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,13 @@ where
121121
let notifier = self.checkpoint_notifier.clone();
122122
let namespace = shared.namespace().clone();
123123
let durable_frame_no = shared.durable_frame_no.clone();
124-
let cb: OnStoreCallback = Box::new(move |fno| Box::pin(async move {
125-
update_durable(fno, notifier, durable_frame_no, namespace).await;
126-
}));
127-
self.storage.store(&shared.namespace, sealed.clone(), None, cb);
124+
let cb: OnStoreCallback = Box::new(move |fno| {
125+
Box::pin(async move {
126+
update_durable(fno, notifier, durable_frame_no, namespace).await;
127+
})
128+
});
129+
self.storage
130+
.store(&shared.namespace, sealed.clone(), None, cb);
128131
new.tail().push(sealed);
129132
}
130133

@@ -140,7 +143,7 @@ async fn update_durable(
140143
notifier: mpsc::Sender<CheckpointMessage>,
141144
durable_frame_no_slot: Arc<Mutex<u64>>,
142145
namespace: NamespaceName,
143-
) {
146+
) {
144147
{
145148
let mut g = durable_frame_no_slot.lock();
146149
if *g < new_durable {
@@ -180,23 +183,18 @@ where
180183
}
181184

182185
let action = match self.opened.entry(namespace.clone()) {
183-
dashmap::Entry::Occupied(e) => {
184-
match e.get() {
185-
Slot::Wal(shared) => return Ok(shared.clone()),
186-
Slot::Building(wait, _) => {
187-
Err(wait.clone())
188-
},
189-
}
186+
dashmap::Entry::Occupied(e) => match e.get() {
187+
Slot::Wal(shared) => return Ok(shared.clone()),
188+
Slot::Building(wait, _) => Err(wait.clone()),
190189
},
191190
dashmap::Entry::Vacant(e) => {
192191
let notifier = Arc::new((Condvar::new(), Mutex::new(false)));
193192
let async_notifier = Arc::new(Notify::new());
194193
e.insert(Slot::Building(notifier.clone(), async_notifier.clone()));
195194
Ok((notifier, async_notifier))
196-
},
195+
}
197196
};
198197

199-
200198
match action {
201199
Ok((notifier, async_notifier)) => {
202200
// if try_open succedded, then the slot was updated and contains the shared wal, if it
@@ -216,8 +214,8 @@ where
216214
cond.0
217215
.wait_while(&mut cond.1.lock(), |ready: &mut bool| !*ready);
218216
// the slot was updated: try again
219-
continue
220-
},
217+
continue;
218+
}
221219
}
222220
}
223221
}
@@ -254,11 +252,13 @@ where
254252
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())?
255253
{
256254
let notifier = self.checkpoint_notifier.clone();
257-
let ns = namespace.clone();
255+
let ns = namespace.clone();
258256
let durable_frame_no = durable_frame_no.clone();
259-
let cb: OnStoreCallback = Box::new(move |fno| Box::pin(async move {
260-
update_durable(fno, notifier, durable_frame_no, ns).await;
261-
}));
257+
let cb: OnStoreCallback = Box::new(move |fno| {
258+
Box::pin(async move {
259+
update_durable(fno, notifier, durable_frame_no, ns).await;
260+
})
261+
});
262262
// TODO: pass config override here
263263
self.storage.store(&namespace, sealed.clone(), None, cb);
264264
tail.push(sealed);
@@ -311,7 +311,8 @@ where
311311
shutdown: false.into(),
312312
});
313313

314-
self.opened.insert(namespace.clone(), Slot::Wal(shared.clone()));
314+
self.opened
315+
.insert(namespace.clone(), Slot::Wal(shared.clone()));
315316

316317
return Ok(shared);
317318
}
@@ -321,7 +322,6 @@ where
321322
pub async fn shutdown(self: Arc<Self>) -> Result<()> {
322323
self.shutdown.store(true, Ordering::SeqCst);
323324

324-
325325
let mut join_set = JoinSet::<Result<()>>::new();
326326
let semaphore = Arc::new(Semaphore::new(8));
327327
for item in self.opened.iter() {
@@ -347,20 +347,23 @@ where
347347

348348
Ok(())
349349
});
350-
break
351-
},
350+
break;
351+
}
352352
Slot::Building(_, notify) => {
353353
// wait for shared to finish building
354354
notify.notified().await;
355-
},
355+
}
356356
}
357357
}
358358
}
359359

360360
while join_set.join_next().await.is_some() {}
361361

362362
// wait for checkpointer to exit
363-
let _ = self.checkpoint_notifier.send(CheckpointMessage::Shutdown).await;
363+
let _ = self
364+
.checkpoint_notifier
365+
.send(CheckpointMessage::Shutdown)
366+
.await;
364367
self.checkpoint_notifier.closed().await;
365368

366369
Ok(())

libsql-wal/src/replication/replicator.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,9 @@ mod test {
269269
tokio::task::spawn_blocking({
270270
let shared = shared.clone();
271271
move || seal_current_segment(&shared)
272-
}).await.unwrap();
272+
})
273+
.await
274+
.unwrap();
273275

274276
conn.execute("create table test2 (x)", ()).unwrap();
275277
conn.execute("insert into test2 values (randomblob(128))", ())
@@ -278,7 +280,9 @@ mod test {
278280
tokio::task::spawn_blocking({
279281
let shared = shared.clone();
280282
move || seal_current_segment(&shared)
281-
}).await.unwrap();
283+
})
284+
.await
285+
.unwrap();
282286

283287
while !shared.current.load().tail().is_empty() {
284288
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -293,12 +297,14 @@ mod test {
293297

294298
let tmp = NamedTempFile::new().unwrap();
295299
let mut replica_content = vec![0u8; db_content.len()];
296-
while let Some(f) = stream.next().await {
300+
while let Some(f) = stream.next().await {
297301
let frame = f.unwrap();
298302
dbg!(frame.header().page_no());
299303
let offset = (frame.header().page_no() as usize - 1) * 4096;
300-
tmp.as_file().write_all_at(frame.data(), offset as u64).unwrap();
301-
replica_content[offset..offset+4096].copy_from_slice(frame.data());
304+
tmp.as_file()
305+
.write_all_at(frame.data(), offset as u64)
306+
.unwrap();
307+
replica_content[offset..offset + 4096].copy_from_slice(frame.data());
302308
}
303309

304310
assert_eq!(replica_content, db_content);

libsql-wal/src/replication/storage.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,7 @@ pub struct StorageReplicator<S> {
2727
}
2828

2929
impl<S> StorageReplicator<S> {
30-
pub fn new(
31-
storage: Arc<S>,
32-
namespace: NamespaceName,
33-
) -> Self {
30+
pub fn new(storage: Arc<S>, namespace: NamespaceName) -> Self {
3431
Self { storage, namespace }
3532
}
3633
}

libsql-wal/src/segment/compacted.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use std::mem::size_of;
21
use std::io;
2+
use std::mem::size_of;
33

4-
use zerocopy::little_endian::{U128 as lu128, U32 as lu32, U64 as lu64, U16 as lu16};
4+
use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64};
55
use zerocopy::{AsBytes, FromBytes, FromZeroes};
66

77
use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
@@ -39,10 +39,13 @@ impl<F: FileExt> CompactedSegment<F> {
3939
ret?;
4040
let header = buf.into_inner();
4141
Ok(Self { file, header })
42-
4342
}
4443

45-
pub(crate) async fn read_frame(&self, frame: Box<Frame>, offset: u32) -> (Box<Frame>, io::Result<()>) {
44+
pub(crate) async fn read_frame(
45+
&self,
46+
frame: Box<Frame>,
47+
offset: u32,
48+
) -> (Box<Frame>, io::Result<()>) {
4649
let offset = size_of::<CompactedSegmentDataHeader>() + size_of::<Frame>() * offset as usize;
4750
let buf = ZeroCopyBoxIoBuf::new(frame);
4851
let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;

libsql-wal/src/segment/list.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,19 @@ where
155155
// nothing to do
156156
} else {
157157
loop {
158-
let next = current.as_ref().unwrap().next.compare_and_swap(&segs[0], None);
158+
let next = current
159+
.as_ref()
160+
.unwrap()
161+
.next
162+
.compare_and_swap(&segs[0], None);
159163
if Arc::ptr_eq(&segs[0], next.as_ref().unwrap()) {
160-
break
164+
break;
161165
} else {
162166
current = next;
163167
}
164168
}
165169
}
166-
170+
167171
self.len.fetch_sub(segs.len(), Ordering::Relaxed);
168172

169173
db_file.set_len(size_after as u64 * 4096)?;
@@ -181,7 +185,10 @@ where
181185
current_fno: u64,
182186
until_fno: u64,
183187
seen: &'a mut RoaringBitmap,
184-
) -> (impl Stream<Item = crate::error::Result<Box<Frame>>> + 'a, u64) {
188+
) -> (
189+
impl Stream<Item = crate::error::Result<Box<Frame>>> + 'a,
190+
u64,
191+
) {
185192
// collect all the segments we need to read from to be up to date.
186193
// We keep a reference to them so that they are not discarded while we read them.
187194
let mut segments = Vec::new();
@@ -197,7 +204,10 @@ where
197204
}
198205

199206
if segments.is_empty() {
200-
return (tokio_util::either::Either::Left(tokio_stream::empty()), current_fno)
207+
return (
208+
tokio_util::either::Either::Left(tokio_stream::empty()),
209+
current_fno,
210+
);
201211
}
202212

203213
let new_current = segments
@@ -468,8 +478,9 @@ mod test {
468478
seal_current_segment(&shared);
469479

470480
let mut seen = RoaringBitmap::new();
471-
let (stream, replicated_until) =
472-
segment_list.stream_pages_from(0, last_offset, &mut seen).await;
481+
let (stream, replicated_until) = segment_list
482+
.stream_pages_from(0, last_offset, &mut seen)
483+
.await;
473484
tokio::pin!(stream);
474485

475486
assert_eq!(replicated_until, last_offset);

0 commit comments

Comments
 (0)