Skip to content

Commit e8dafb2

Browse files
committed
fix shutdown
1 parent 882fee6 commit e8dafb2

File tree

7 files changed

+155
-81
lines changed

7 files changed

+155
-81
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-server/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -714,9 +714,7 @@ where
714714
let shutdown_notify = self.shutdown.clone();
715715
let shutdown_fut = Box::pin(async move {
716716
shutdown_notify.notified().await;
717-
tokio::task::spawn_blocking(move || registry.shutdown())
718-
.await
719-
.unwrap()?;
717+
registry.shutdown().await?;
720718
Ok(())
721719
});
722720

libsql-wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ clap = { version = "4.5.9", optional = true, features = ["derive"] }
4141
inquire = { version = "0.7.5", optional = true }
4242
tracing-subscriber = { version = "0.3.18", optional = true }
4343
aws-credential-types = { version = "1.2.0", optional = true }
44+
dashmap = "6.0.1"
4445

4546
[dev-dependencies]
4647
criterion = "0.5.1"

libsql-wal/src/bins/shell/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ where
219219

220220
drop(conn);
221221

222-
registry.shutdown().unwrap();
222+
registry.shutdown().await.unwrap();
223223
}
224224

225225
async fn handle_builtin<S>(

libsql-wal/src/checkpointer.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,19 @@ use crate::registry::WalRegistry;
1111

1212
pub(crate) type NotifyCheckpointer = mpsc::Sender<NamespaceName>;
1313

14+
pub enum CheckpointMessage {
15+
/// notify that a namespace may be checkpointable
16+
Namespace(NamespaceName),
17+
/// shutdown initiated
18+
Shutdown,
19+
}
20+
21+
impl From<NamespaceName> for CheckpointMessage {
22+
fn from(value: NamespaceName) -> Self {
23+
Self::Namespace(value)
24+
}
25+
}
26+
1427
pub type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
1528

1629
impl<IO, S> LibsqlCheckpointer<IO, S>
@@ -20,7 +33,7 @@ where
2033
{
2134
pub fn new(
2235
registry: Arc<WalRegistry<IO, S>>,
23-
notifier: mpsc::Receiver<NamespaceName>,
36+
notifier: mpsc::Receiver<CheckpointMessage>,
2437
max_checkpointing_conccurency: usize,
2538
) -> Self {
2639
Self::new_with_performer(registry, notifier, max_checkpointing_conccurency)
@@ -70,7 +83,7 @@ pub struct Checkpointer<P> {
7083
checkpointing: HashSet<NamespaceName>,
7184
/// the checkpointer is notifier whenever there is a change to a namespage that could trigger a
7285
/// checkpoint
73-
recv: mpsc::Receiver<NamespaceName>,
86+
recv: mpsc::Receiver<CheckpointMessage>,
7487
max_checkpointing_conccurency: usize,
7588
shutting_down: bool,
7689
join_set: JoinSet<(NamespaceName, crate::error::Result<()>)>,
@@ -85,7 +98,7 @@ where
8598
{
8699
fn new_with_performer(
87100
perform_checkpoint: Arc<P>,
88-
notifier: mpsc::Receiver<NamespaceName>,
101+
notifier: mpsc::Receiver<CheckpointMessage>,
89102
max_checkpointing_conccurency: usize,
90103
) -> Self {
91104
Self {
@@ -141,10 +154,10 @@ where
141154
}
142155
notified = self.recv.recv(), if !self.shutting_down => {
143156
match notified {
144-
Some(namespace) => {
157+
Some(CheckpointMessage::Namespace(namespace)) => {
145158
self.scheduled.insert(namespace);
146159
}
147-
None => {
160+
None | Some(CheckpointMessage::Shutdown) => {
148161
self.shutting_down = true;
149162
}
150163
}
@@ -204,7 +217,7 @@ mod test {
204217
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
205218
let ns = NamespaceName::from("test");
206219

207-
sender.send(ns.clone()).await.unwrap();
220+
sender.send(ns.clone().into()).await.unwrap();
208221

209222
checkpointer.step().await;
210223

@@ -236,7 +249,7 @@ mod test {
236249
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
237250
let ns = NamespaceName::from("test");
238251

239-
sender.send(ns.clone()).await.unwrap();
252+
sender.send(ns.clone().into()).await.unwrap();
240253

241254
checkpointer.step().await;
242255
assert_eq!(checkpointer.errors, 0);
@@ -327,8 +340,8 @@ mod test {
327340

328341
let ns: NamespaceName = "test".into();
329342

330-
sender.send(ns.clone()).await.unwrap();
331-
sender.send(ns.clone()).await.unwrap();
343+
sender.send(ns.clone().into()).await.unwrap();
344+
sender.send(ns.clone().into()).await.unwrap();
332345

333346
checkpointer.step().await;
334347

@@ -360,8 +373,8 @@ mod test {
360373
let ns1: NamespaceName = "test1".into();
361374
let ns2: NamespaceName = "test2".into();
362375

363-
sender.send(ns1.clone()).await.unwrap();
364-
sender.send(ns2.clone()).await.unwrap();
376+
sender.send(ns1.clone().into()).await.unwrap();
377+
sender.send(ns2.clone().into()).await.unwrap();
365378

366379
checkpointer.step().await;
367380

@@ -396,9 +409,9 @@ mod test {
396409
let ns2: NamespaceName = "test2".into();
397410
let ns3: NamespaceName = "test3".into();
398411

399-
sender.send(ns1.clone()).await.unwrap();
400-
sender.send(ns2.clone()).await.unwrap();
401-
sender.send(ns3.clone()).await.unwrap();
412+
sender.send(ns1.clone().into()).await.unwrap();
413+
sender.send(ns2.clone().into()).await.unwrap();
414+
sender.send(ns3.clone().into()).await.unwrap();
402415

403416
checkpointer.step().await;
404417
checkpointer.step().await;

0 commit comments

Comments
 (0)