Skip to content

Commit c953114

Browse files
committed
report durable frame_no
1 parent 44507d6 commit c953114

File tree

4 files changed

+75
-36
lines changed

4 files changed

+75
-36
lines changed

libsql-wal/src/storage/async_storage.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
//! `AsyncStorage` is a `Storage` implementation that defer storage to a background thread. The
22
//! durable frame_no is notified asynchronously.
33
4+
use std::future::Future;
45
use std::sync::Arc;
56

67
use chrono::Utc;
78
use libsql_sys::name::NamespaceName;
89
use tokio::sync::{mpsc, oneshot};
910
use tokio::task::JoinSet;
1011

11-
use crate::io::{Io, StdIO};
12+
use crate::io::{FileExt, Io, StdIO};
13+
use crate::segment::compacted::CompactedSegment;
1214
use crate::segment::Segment;
1315

1416
use super::backend::Backend;
@@ -84,8 +86,8 @@ where
8486
}
8587
msg = self.receiver.recv(), if !shutting_down => {
8688
match msg {
87-
Some(StorageLoopMessage::StoreReq(req)) => {
88-
self.scheduler.register(req);
89+
Some(StorageLoopMessage::StoreReq(req, ret)) => {
90+
self.scheduler.register(req, ret);
8991
}
9092
Some(StorageLoopMessage::DurableFrameNoReq { namespace, ret, config_override }) => {
9193
self.fetch_durable_frame_no_async(namespace, ret, config_override);
@@ -132,7 +134,7 @@ pub struct BottomlessConfig<C> {
132134
}
133135

134136
enum StorageLoopMessage<C, S> {
135-
StoreReq(StoreSegmentRequest<C, S>),
137+
StoreReq(StoreSegmentRequest<C, S>, oneshot::Sender<u64>),
136138
DurableFrameNoReq {
137139
namespace: NamespaceName,
138140
config_override: Option<Arc<C>>,
@@ -162,17 +164,22 @@ where
162164
namespace: &NamespaceName,
163165
segment: Self::Segment,
164166
config_override: Option<Arc<Self::Config>>,
165-
) {
167+
) -> impl Future<Output = u64> + Send + Sync + 'static{
166168
let req = StoreSegmentRequest {
167169
namespace: namespace.clone(),
168170
segment,
169171
created_at: Utc::now(),
170172
storage_config_override: config_override,
171173
};
172174

175+
let (sender, receiver) = oneshot::channel();
173176
self.job_sender
174-
.send(StorageLoopMessage::StoreReq(req))
177+
.send(StorageLoopMessage::StoreReq(req, sender))
175178
.expect("bottomless loop was closed before the handle was dropped");
179+
180+
async move {
181+
receiver.await.unwrap()
182+
}
176183
}
177184

178185
async fn durable_frame_no(

libsql-wal/src/storage/job.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::ops::Deref;
22

3+
use tokio::sync::oneshot;
4+
35
use super::backend::Backend;
46
use super::backend::SegmentMeta;
57
use super::Result;
@@ -12,6 +14,7 @@ use crate::segment::Segment;
1214
pub(crate) struct IndexedRequest<C, T> {
1315
pub(crate) request: StoreSegmentRequest<C, T>,
1416
pub(crate) id: u64,
17+
pub(crate) ret: oneshot::Sender<u64>,
1518
}
1619

1720
impl<C, T> Deref for IndexedRequest<C, T> {
@@ -484,6 +487,7 @@ mod test {
484487
storage_config_override: None,
485488
},
486489
id: 0,
490+
ret: oneshot::channel().0,
487491
},
488492
};
489493

libsql-wal/src/storage/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
1-
use std::marker::PhantomData;
1+
use std::collections::BTreeMap;
2+
use std::future::Future;
3+
use std::path::PathBuf;
24
use std::sync::Arc;
35

46
use chrono::{DateTime, Utc};
7+
use fst::Map;
8+
use hashbrown::HashMap;
59
use libsql_sys::name::NamespaceName;
10+
use parking_lot::Mutex;
11+
use tempfile::{tempdir, TempDir};
612

7-
use crate::io::FileExt;
13+
use crate::io::{FileExt, Io, StdIO};
14+
use crate::segment::compacted::CompactedSegment;
815
use crate::segment::{sealed::SealedSegment, Segment};
916

17+
use self::backend::s3::SegmentKey;
1018
pub use self::error::Error;
1119

12-
mod job;
1320
pub mod async_storage;
1421
pub mod backend;
1522
pub(crate) mod error;
23+
mod job;
1624
mod scheduler;
1725

1826
pub type Result<T, E = self::error::Error> = std::result::Result<T, E>;
@@ -27,12 +35,14 @@ pub trait Storage: Send + Sync + 'static {
2735
type Config;
2836
/// store the passed segment for `namespace`. This function is called in a context where
2937
/// blocking is acceptable.
38+
/// returns a future that resolves when the segment is stored
39+
/// The segment should be stored whether or not the future is polled.
3040
fn store(
3141
&self,
3242
namespace: &NamespaceName,
3343
seg: Self::Segment,
3444
config_override: Option<Arc<Self::Config>>,
35-
);
45+
) -> impl Future<Output = u64> + Send + Sync + 'static;
3646

3747
fn durable_frame_no_sync(
3848
&self,
@@ -89,7 +99,8 @@ impl Storage for NoStorage {
8999
_namespace: &NamespaceName,
90100
_seg: Self::Segment,
91101
_config: Option<Arc<Self::Config>>,
92-
) {
102+
) -> impl Future<Output = u64> + Send + Sync + 'static{
103+
std::future::ready(u64::MAX)
93104
}
94105

95106
async fn durable_frame_no(

libsql-wal/src/storage/scheduler.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::cmp::Reverse;
22
use std::collections::{HashMap, VecDeque};
33

4-
use tokio::sync::mpsc;
4+
use tokio::sync::{mpsc, oneshot};
55

66
use super::job::{IndexedRequest, Job, JobResult};
77
use super::StoreSegmentRequest;
@@ -50,14 +50,14 @@ impl<C, T> Scheduler<C, T> {
5050

5151
/// Register a new request with the scheduler
5252
#[tracing::instrument(skip_all)]
53-
pub fn register(&mut self, request: StoreSegmentRequest<C, T>) {
53+
pub fn register(&mut self, request: StoreSegmentRequest<C, T>, ret: oneshot::Sender<u64>) {
5454
// invariant: new segment comes immediately after the latest segment for that namespace. This means:
5555
// - immediately after the last registered segment, if there is any
5656
// - immediately after the last durable index
5757
let id = self.next_request_id;
5858
self.next_request_id += 1;
5959
let name = request.namespace.clone();
60-
let slot = IndexedRequest { request, id };
60+
let slot = IndexedRequest { request, id, ret };
6161
let requests = self.requests.entry(name.clone()).or_default();
6262
requests.requests.push_back(slot);
6363

@@ -107,6 +107,7 @@ impl<C, T> Scheduler<C, T> {
107107
match result.result {
108108
Ok(durable_index) => {
109109
tracing::debug!("job success registered");
110+
let _ = result.job.request.ret.send(durable_index);
110111
if self
111112
.durable_notifier
112113
.send((name.clone(), durable_index))
@@ -165,26 +166,35 @@ mod test {
165166
let ns1 = NamespaceName::from("test1");
166167
let ns2 = NamespaceName::from("test2");
167168

168-
scheduler.register(StoreSegmentRequest {
169-
namespace: ns1.clone(),
170-
segment: (),
171-
created_at: Utc::now(),
172-
storage_config_override: None,
173-
});
169+
scheduler.register(
170+
StoreSegmentRequest {
171+
namespace: ns1.clone(),
172+
segment: (),
173+
created_at: Utc::now(),
174+
storage_config_override: None,
175+
},
176+
oneshot::channel().0,
177+
);
174178

175-
scheduler.register(StoreSegmentRequest {
176-
namespace: ns2.clone(),
177-
segment: (),
178-
created_at: Utc::now(),
179-
storage_config_override: None,
180-
});
179+
scheduler.register(
180+
StoreSegmentRequest {
181+
namespace: ns2.clone(),
182+
segment: (),
183+
created_at: Utc::now(),
184+
storage_config_override: None,
185+
},
186+
oneshot::channel().0,
187+
);
181188

182-
scheduler.register(StoreSegmentRequest {
183-
namespace: ns1.clone(),
184-
segment: (),
185-
created_at: Utc::now(),
186-
storage_config_override: None,
187-
});
189+
scheduler.register(
190+
StoreSegmentRequest {
191+
namespace: ns1.clone(),
192+
segment: (),
193+
created_at: Utc::now(),
194+
storage_config_override: None,
195+
},
196+
oneshot::channel().0,
197+
);
188198

189199
let job1 = scheduler.schedule().unwrap();
190200
assert_eq!(job1.request.request.namespace, ns1);
@@ -231,14 +241,18 @@ mod test {
231241
segment: (),
232242
created_at: Utc::now(),
233243
storage_config_override: None,
234-
});
244+
},
245+
oneshot::channel().0,
246+
);
235247

236248
scheduler.register(StoreSegmentRequest {
237249
namespace: ns2.clone(),
238250
segment: (),
239251
created_at: Utc::now(),
240252
storage_config_override: None,
241-
});
253+
},
254+
oneshot::channel().0,
255+
);
242256

243257
let job1 = scheduler.schedule().unwrap();
244258
assert_eq!(job1.request.request.namespace, ns1);
@@ -269,7 +283,9 @@ mod test {
269283
segment: (),
270284
created_at: Utc::now(),
271285
storage_config_override: None,
272-
});
286+
},
287+
oneshot::channel().0,
288+
);
273289

274290
let job = scheduler.schedule().unwrap();
275291
assert_eq!(job.request.request.namespace, ns1);
@@ -280,7 +296,8 @@ mod test {
280296
segment: (),
281297
created_at: Utc::now(),
282298
storage_config_override: None,
283-
});
299+
}, oneshot::channel().0,
300+
);
284301

285302
assert!(scheduler.schedule().is_none());
286303

0 commit comments

Comments
 (0)