Skip to content

Commit 89c7f7d

Browse files
committed
pass callback instead of spawning task for durable frameno notify
1 parent a539ad0 commit 89c7f7d

File tree

5 files changed

+94
-70
lines changed

5 files changed

+94
-70
lines changed

libsql-wal/src/registry.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use std::future::Future;
21
use std::num::NonZeroU64;
32
use std::path::{Path, PathBuf};
43
use std::sync::atomic::{AtomicBool, Ordering};
@@ -19,7 +18,7 @@ use crate::replication::storage::StorageReplicator;
1918
use crate::segment::list::SegmentList;
2019
use crate::segment::{current::CurrentSegment, sealed::SealedSegment};
2120
use crate::shared_wal::{SharedWal, SwapLog};
22-
use crate::storage::Storage;
21+
use crate::storage::{OnStoreCallback, Storage};
2322
use crate::transaction::TxGuard;
2423
use libsql_sys::name::NamespaceName;
2524

@@ -119,11 +118,13 @@ where
119118
// where the current log is sealed and it wasn't swapped.
120119
if let Some(sealed) = current.seal()? {
121120
// todo: pass config override here
122-
let notify = self.storage.store(&shared.namespace, sealed.clone(), None);
123121
let notifier = self.checkpoint_notifier.clone();
124122
let namespace = shared.namespace().clone();
125123
let durable_frame_no = shared.durable_frame_no.clone();
126-
tokio::spawn(update_durable(notify, notifier, durable_frame_no, namespace));
124+
let cb: OnStoreCallback = Box::new(move |fno| Box::pin(async move {
125+
update_durable(fno, notifier, durable_frame_no, namespace).await;
126+
}));
127+
self.storage.store(&shared.namespace, sealed.clone(), None, cb);
127128
new.tail().push(sealed);
128129
}
129130

@@ -135,14 +136,13 @@ where
135136
}
136137

137138
async fn update_durable(
138-
notify: impl Future<Output = u64>,
139+
new_durable: u64,
139140
notifier: mpsc::Sender<CheckpointMessage>,
140-
durable_frame_no: Arc<Mutex<u64>>,
141+
durable_frame_no_slot: Arc<Mutex<u64>>,
141142
namespace: NamespaceName,
142143
) {
143-
let new_durable = notify.await;
144144
{
145-
let mut g = durable_frame_no.lock();
145+
let mut g = durable_frame_no_slot.lock();
146146
if *g < new_durable {
147147
*g = new_durable;
148148
}
@@ -253,12 +253,14 @@ where
253253
if let Some(sealed) =
254254
SealedSegment::open(file.into(), entry.path().to_path_buf(), Default::default())?
255255
{
256-
// TODO: pass config override here
257-
let notify = self.storage.store(&namespace, sealed.clone(), None);
258256
let notifier = self.checkpoint_notifier.clone();
259-
let namespace = namespace.clone();
257+
let ns = namespace.clone();
260258
let durable_frame_no = durable_frame_no.clone();
261-
tokio::spawn(update_durable(notify, notifier, durable_frame_no, namespace));
259+
let cb: OnStoreCallback = Box::new(move |fno| Box::pin(async move {
260+
update_durable(fno, notifier, durable_frame_no, ns).await;
261+
}));
262+
// TODO: pass config override here
263+
self.storage.store(&namespace, sealed.clone(), None, cb);
262264
tail.push(sealed);
263265
}
264266
}

libsql-wal/src/storage/async_storage.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
//! `AsyncStorage` is a `Storage` implementation that defer storage to a background thread. The
22
//! durable frame_no is notified asynchronously.
33
4-
use std::future::Future;
54
use std::sync::Arc;
65

76
use chrono::Utc;
@@ -15,7 +14,7 @@ use crate::segment::Segment;
1514

1615
use super::backend::Backend;
1716
use super::scheduler::Scheduler;
18-
use super::{RestoreOptions, Storage, StoreSegmentRequest};
17+
use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};
1918

2019
/// Background loop task state.
2120
///
@@ -86,8 +85,8 @@ where
8685
}
8786
msg = self.receiver.recv(), if !shutting_down => {
8887
match msg {
89-
Some(StorageLoopMessage::StoreReq(req, ret)) => {
90-
self.scheduler.register(req, ret);
88+
Some(StorageLoopMessage::StoreReq(req)) => {
89+
self.scheduler.register(req);
9190
}
9291
Some(StorageLoopMessage::DurableFrameNoReq { namespace, ret, config_override }) => {
9392
self.fetch_durable_frame_no_async(namespace, ret, config_override);
@@ -134,7 +133,7 @@ pub struct BottomlessConfig<C> {
134133
}
135134

136135
enum StorageLoopMessage<C, S> {
137-
StoreReq(StoreSegmentRequest<C, S>, oneshot::Sender<u64>),
136+
StoreReq(StoreSegmentRequest<C, S>),
138137
DurableFrameNoReq {
139138
namespace: NamespaceName,
140139
config_override: Option<Arc<C>>,
@@ -164,22 +163,19 @@ where
164163
namespace: &NamespaceName,
165164
segment: Self::Segment,
166165
config_override: Option<Arc<Self::Config>>,
167-
) -> impl Future<Output = u64> + Send + Sync + 'static{
166+
on_store_callback: OnStoreCallback,
167+
){
168168
let req = StoreSegmentRequest {
169169
namespace: namespace.clone(),
170170
segment,
171171
created_at: Utc::now(),
172172
storage_config_override: config_override,
173+
on_store_callback,
173174
};
174175

175-
let (sender, receiver) = oneshot::channel();
176176
self.job_sender
177-
.send(StorageLoopMessage::StoreReq(req, sender))
177+
.send(StorageLoopMessage::StoreReq(req))
178178
.expect("bottomless loop was closed before the handle was dropped");
179-
180-
async move {
181-
receiver.await.unwrap()
182-
}
183179
}
184180

185181
async fn durable_frame_no(

libsql-wal/src/storage/job.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
use std::ops::Deref;
22

3-
use tokio::sync::oneshot;
4-
53
use super::backend::Backend;
64
use super::backend::SegmentMeta;
75
use super::Result;
@@ -14,7 +12,6 @@ use crate::segment::Segment;
1412
pub(crate) struct IndexedRequest<C, T> {
1513
pub(crate) request: StoreSegmentRequest<C, T>,
1614
pub(crate) id: u64,
17-
pub(crate) ret: oneshot::Sender<u64>,
1815
}
1916

2017
impl<C, T> Deref for IndexedRequest<C, T> {
@@ -102,6 +99,7 @@ pub(crate) struct JobResult<C, S> {
10299

103100
#[cfg(test)]
104101
mod test {
102+
use std::future::ready;
105103
// use std::fs::File;
106104
// use std::io::Write;
107105
// use std::mem::size_of;
@@ -489,7 +487,7 @@ mod test {
489487
_config: &Self::Config,
490488
_namespace: &NamespaceName,
491489
_key: &SegmentKey,
492-
_file: &impl FileExt
490+
_file: &impl FileExt,
493491
) -> Result<()> {
494492
todo!()
495493
}
@@ -521,9 +519,9 @@ mod test {
521519
segment: TestSegment,
522520
created_at: Utc::now(),
523521
storage_config_override: None,
522+
on_store_callback: Box::new(|_| Box::pin(ready(()))),
524523
},
525524
id: 0,
526-
ret: oneshot::channel().0,
527525
},
528526
};
529527

libsql-wal/src/storage/mod.rs

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::BTreeMap;
22
use std::fmt;
33
use std::future::Future;
44
use std::path::PathBuf;
5+
use std::pin::Pin;
56
use std::str::FromStr;
67
use std::sync::Arc;
78

@@ -121,6 +122,14 @@ impl fmt::Display for SegmentKey {
121122
}
122123
}
123124

125+
/// takes the new durable frame_no and returns a future
126+
pub type OnStoreCallback = Box<
127+
dyn FnOnce(u64) -> Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>
128+
+ Send
129+
+ Sync
130+
+ 'static,
131+
>;
132+
124133
pub trait Storage: Send + Sync + 'static {
125134
type Segment: Segment;
126135
type Config;
@@ -133,7 +142,8 @@ pub trait Storage: Send + Sync + 'static {
133142
namespace: &NamespaceName,
134143
seg: Self::Segment,
135144
config_override: Option<Arc<Self::Config>>,
136-
) -> impl Future<Output = u64> + Send + Sync + 'static;
145+
on_store: OnStoreCallback,
146+
);
137147

138148
fn durable_frame_no_sync(
139149
&self,
@@ -190,8 +200,8 @@ impl Storage for NoStorage {
190200
_namespace: &NamespaceName,
191201
_seg: Self::Segment,
192202
_config: Option<Arc<Self::Config>>,
193-
) -> impl Future<Output = u64> + Send + Sync + 'static{
194-
std::future::ready(u64::MAX)
203+
_on_store: OnStoreCallback,
204+
) {
195205
}
196206

197207
async fn durable_frame_no(
@@ -286,12 +296,15 @@ impl<IO: Io> TestStorage<IO> {
286296
pub fn new_io(store: bool, io: IO) -> Self {
287297
let dir = tempdir().unwrap();
288298
Self {
289-
inner: Arc::new(TestStorageInner {
290-
dir,
291-
stored: Default::default(),
292-
io,
293-
store,
294-
}.into()),
299+
inner: Arc::new(
300+
TestStorageInner {
301+
dir,
302+
stored: Default::default(),
303+
io,
304+
store,
305+
}
306+
.into(),
307+
),
295308
}
296309
}
297310
}
@@ -305,14 +318,17 @@ impl<IO: Io> Storage for TestStorage<IO> {
305318
namespace: &NamespaceName,
306319
seg: Self::Segment,
307320
_config: Option<Arc<Self::Config>>,
308-
) -> impl Future<Output = u64> + Send + Sync + 'static{
321+
on_store: OnStoreCallback,
322+
) {
309323
let mut inner = self.inner.lock();
310324
if inner.store {
311325
let id = uuid::Uuid::new_v4();
312326
let out_path = inner.dir.path().join(id.to_string());
313327
let out_file = inner.io.open(true, true, true, &out_path).unwrap();
314-
let index = tokio::runtime::Handle::current().block_on(seg.compact(&out_file, id)).unwrap();
315-
let end_frame_no = seg.header().last_committed();
328+
let index = tokio::runtime::Handle::current()
329+
.block_on(seg.compact(&out_file, id))
330+
.unwrap();
331+
let end_frame_no = seg.header().last_committed();
316332
let key = SegmentKey {
317333
start_frame_no: seg.header().start_frame_no.get(),
318334
end_frame_no,
@@ -323,9 +339,7 @@ impl<IO: Io> Storage for TestStorage<IO> {
323339
.entry(namespace.clone())
324340
.or_default()
325341
.insert(key, (out_path, index));
326-
std::future::ready(end_frame_no)
327-
} else {
328-
std::future::ready(u64::MAX)
342+
tokio::runtime::Handle::current().block_on(on_store(end_frame_no))
329343
}
330344
}
331345

@@ -354,12 +368,10 @@ impl<IO: Io> Storage for TestStorage<IO> {
354368
) -> u64 {
355369
let inner = self.inner.lock();
356370
if inner.store {
357-
let Some(segs) = inner.stored.get(namespace) else { return 0 };
358-
segs
359-
.keys()
360-
.map(|k| k.end_frame_no)
361-
.max()
362-
.unwrap_or(0)
371+
let Some(segs) = inner.stored.get(namespace) else {
372+
return 0;
373+
};
374+
segs.keys().map(|k| k.end_frame_no).max().unwrap_or(0)
363375
} else {
364376
u64::MAX
365377
}
@@ -374,9 +386,10 @@ impl<IO: Io> Storage for TestStorage<IO> {
374386
let inner = self.inner.lock();
375387
if inner.store {
376388
if let Some(segs) = inner.stored.get(namespace) {
377-
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(frame_no))
378-
else { return Err(Error::FrameNotFound(frame_no)) };
379-
return Ok(*key)
389+
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(frame_no)) else {
390+
return Err(Error::FrameNotFound(frame_no));
391+
};
392+
return Ok(*key);
380393
} else {
381394
panic!("namespace not found");
382395
}
@@ -394,12 +407,9 @@ impl<IO: Io> Storage for TestStorage<IO> {
394407
let inner = self.inner.lock();
395408
if inner.store {
396409
match inner.stored.get(namespace) {
397-
Some(segs) => {
398-
Ok(segs.get(&key).unwrap().1.clone())
399-
}
410+
Some(segs) => Ok(segs.get(&key).unwrap().1.clone()),
400411
None => panic!("unknown namespace"),
401412
}
402-
403413
} else {
404414
panic!("not storing")
405415
}
@@ -421,14 +431,12 @@ impl<IO: Io> Storage for TestStorage<IO> {
421431
}
422432
None => panic!("unknown namespace"),
423433
}
424-
425434
} else {
426435
panic!("not storing")
427436
}
428437
}
429438
}
430439

431-
#[derive(Debug)]
432440
pub struct StoreSegmentRequest<C, S> {
433441
namespace: NamespaceName,
434442
/// Path to the segment. Read-only for bottomless
@@ -439,4 +447,21 @@ pub struct StoreSegmentRequest<C, S> {
439447
/// alternative configuration to use with the storage layer.
440448
/// e.g: S3 overrides
441449
storage_config_override: Option<Arc<C>>,
450+
/// Called after the segment was stored, with the new durable index
451+
on_store_callback: OnStoreCallback,
452+
}
453+
454+
impl<C, S> fmt::Debug for StoreSegmentRequest<C, S>
455+
where
456+
C: fmt::Debug,
457+
S: fmt::Debug,
458+
{
459+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
460+
f.debug_struct("StoreSegmentRequest")
461+
.field("namespace", &self.namespace)
462+
.field("segment", &self.segment)
463+
.field("created_at", &self.created_at)
464+
.field("storage_config_override", &self.storage_config_override)
465+
.finish()
466+
}
442467
}

0 commit comments

Comments
 (0)