Skip to content

Commit 6e2cd97

Browse files
committed
remove durable_notifier
use callbacks instead
1 parent 89c7f7d commit 6e2cd97

File tree

2 files changed

+17
-29
lines changed

2 files changed

+17
-29
lines changed

libsql-wal/src/storage/async_storage.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,6 @@ enum StorageLoopMessage<C, S> {
144144
pub struct AsyncStorage<B: Backend, S> {
145145
/// send request to the main loop
146146
job_sender: mpsc::UnboundedSender<StorageLoopMessage<B::Config, S>>,
147-
/// receiver for the current max durable index
148-
durable_notifier: mpsc::Receiver<(NamespaceName, u64)>,
149147
force_shutdown: oneshot::Sender<()>,
150148
backend: Arc<B>,
151149
}
@@ -273,9 +271,8 @@ impl<B: Backend, S> AsyncStorage<B, S> {
273271
S: Segment,
274272
{
275273
let (job_snd, job_rcv) = tokio::sync::mpsc::unbounded_channel();
276-
let (durable_notifier_snd, durable_notifier_rcv) = tokio::sync::mpsc::channel(16);
277274
let (shutdown_snd, shutdown_rcv) = tokio::sync::oneshot::channel();
278-
let scheduler = Scheduler::new(durable_notifier_snd);
275+
let scheduler = Scheduler::new();
279276
let storage_loop = AsyncStorageLoop {
280277
receiver: job_rcv,
281278
scheduler,
@@ -287,7 +284,6 @@ impl<B: Backend, S> AsyncStorage<B, S> {
287284

288285
let this = Self {
289286
job_sender: job_snd,
290-
durable_notifier: durable_notifier_rcv,
291287
force_shutdown: shutdown_snd,
292288
backend: config.backend,
293289
};

libsql-wal/src/storage/scheduler.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
use std::cmp::Reverse;
22
use std::collections::{HashMap, VecDeque};
33

4-
use tokio::sync::mpsc;
5-
64
use super::job::{IndexedRequest, Job, JobResult};
75
use super::StoreSegmentRequest;
86
use libsql_sys::name::NamespaceName;
@@ -32,16 +30,14 @@ impl<C, F> Default for NamespaceRequests<C, F> {
3230
/// It is generic over C: the storage config type (for config overrides), and T, the segment type
3331
pub(crate) struct Scheduler<C, T> {
3432
/// notify new durability index for namespace
35-
durable_notifier: mpsc::Sender<(NamespaceName, u64)>,
3633
requests: HashMap<NamespaceName, NamespaceRequests<C, T>>,
3734
queue: priority_queue::PriorityQueue<NamespaceName, Reverse<u64>>,
3835
next_request_id: u64,
3936
}
4037

4138
impl<C, T> Scheduler<C, T> {
42-
pub fn new(durable_notifier: mpsc::Sender<(NamespaceName, u64)>) -> Self {
39+
pub fn new() -> Self {
4340
Self {
44-
durable_notifier,
4541
requests: Default::default(),
4642
queue: Default::default(),
4743
next_request_id: Default::default(),
@@ -108,14 +104,6 @@ impl<C, T> Scheduler<C, T> {
108104
Ok(durable_index) => {
109105
tracing::debug!("job success registered");
110106
(result.job.request.request.on_store_callback)(durable_index).await;
111-
if self
112-
.durable_notifier
113-
.send((name.clone(), durable_index))
114-
.await
115-
.is_err()
116-
{
117-
tracing::warn!("durability notifier was closed, proceeding anyway");
118-
}
119107
}
120108
Err(e) => {
121109
tracing::error!("error processing request, re-enqueuing: {e}");
@@ -154,6 +142,7 @@ mod test {
154142
use std::future::ready;
155143

156144
use chrono::Utc;
145+
use tokio::sync::oneshot;
157146

158147
use crate::storage::Error;
159148
use libsql_sys::name::NamespaceName;
@@ -162,29 +151,34 @@ mod test {
162151

163152
#[tokio::test]
164153
async fn schedule_simple() {
165-
let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
166-
let mut scheduler = Scheduler::<(), ()>::new(sender);
154+
let mut scheduler = Scheduler::<(), ()>::new();
167155

168156
let ns1 = NamespaceName::from("test1");
169157
let ns2 = NamespaceName::from("test2");
170158

159+
let (job_1_snd, job_1_rcv) = oneshot::channel();
171160
scheduler.register(
172161
StoreSegmentRequest {
173162
namespace: ns1.clone(),
174163
segment: (),
175164
created_at: Utc::now(),
176165
storage_config_override: None,
177-
on_store_callback: Box::new(|_| Box::pin(ready(()))),
166+
on_store_callback: Box::new(move |n| Box::pin( async move {
167+
let _ = job_1_snd.send(n);
168+
})),
178169
},
179170
);
180171

172+
let (job_2_snd, job_2_rcv) = oneshot::channel();
181173
scheduler.register(
182174
StoreSegmentRequest {
183175
namespace: ns2.clone(),
184176
segment: (),
185177
created_at: Utc::now(),
186178
storage_config_override: None,
187-
on_store_callback: Box::new(|_| Box::pin(ready(()))),
179+
on_store_callback: Box::new(move |n| Box::pin( async move {
180+
let _ = job_2_snd.send(n);
181+
})),
188182
},
189183
);
190184

@@ -194,7 +188,7 @@ mod test {
194188
segment: (),
195189
created_at: Utc::now(),
196190
storage_config_override: None,
197-
on_store_callback: Box::new(|_| Box::pin(ready(()))),
191+
on_store_callback: Box::new(move |_| Box::pin(ready(()))),
198192
},
199193
);
200194

@@ -215,15 +209,15 @@ mod test {
215209
.await;
216210

217211
assert!(scheduler.schedule().is_none());
218-
assert_eq!(receiver.recv().await.unwrap(), (ns2.clone(), 42));
212+
assert_eq!(job_2_rcv.await.unwrap(), 42);
219213

220214
scheduler
221215
.report(JobResult {
222216
job: job1,
223217
result: Ok(10),
224218
})
225219
.await;
226-
assert_eq!(receiver.recv().await.unwrap(), (ns1.clone(), 10));
220+
assert_eq!(job_1_rcv.await.unwrap(), 10);
227221

228222
let job1 = scheduler.schedule().unwrap();
229223
assert_eq!(job1.request.request.namespace, ns1);
@@ -232,8 +226,7 @@ mod test {
232226

233227
#[tokio::test]
234228
async fn job_error_reschedule() {
235-
let (sender, _) = tokio::sync::mpsc::channel(10);
236-
let mut scheduler = Scheduler::<(), ()>::new(sender);
229+
let mut scheduler = Scheduler::<(), ()>::new();
237230

238231
let ns1 = NamespaceName::from("test1");
239232
let ns2 = NamespaceName::from("test2");
@@ -275,8 +268,7 @@ mod test {
275268

276269
#[tokio::test]
277270
async fn schedule_while_in_flight() {
278-
let (sender, _) = tokio::sync::mpsc::channel(10);
279-
let mut scheduler = Scheduler::<(), ()>::new(sender);
271+
let mut scheduler = Scheduler::<(), ()>::new();
280272

281273
let ns1 = NamespaceName::from("test1");
282274

0 commit comments

Comments
 (0)