Skip to content

Commit 94217e2

Browse files
authored
Merge pull request #1608 from tursodatabase/replicate-from-stored-segments
replicate from stored segments
2 parents a64a089 + 903c881 commit 94217e2

File tree

27 files changed

+1242
-386
lines changed

27 files changed

+1242
-386
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-server/src/lib.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use libsql_sys::wal::either::Either as EitherWAL;
4141
#[cfg(feature = "durable-wal")]
4242
use libsql_sys::wal::either::Either3 as EitherWAL;
4343
use libsql_sys::wal::Sqlite3WalManager;
44+
use libsql_wal::checkpointer::LibsqlCheckpointer;
4445
use libsql_wal::registry::WalRegistry;
4546
use libsql_wal::storage::NoStorage;
4647
use libsql_wal::wal::LibsqlWalManager;
@@ -456,7 +457,7 @@ where
456457
let (stats_sender, stats_receiver) = mpsc::channel(1024);
457458

458459
// chose the wal backend
459-
let (make_wal_manager, registry_shutdown) = self.configure_wal_manager()?;
460+
let (make_wal_manager, registry_shutdown) = self.configure_wal_manager(&mut join_set)?;
460461

461462
let ns_config = NamespaceConfig {
462463
db_kind,
@@ -660,6 +661,7 @@ where
660661

661662
fn configure_wal_manager(
662663
&self,
664+
join_set: &mut JoinSet<anyhow::Result<()>>,
663665
) -> anyhow::Result<(
664666
Arc<dyn Fn() -> InnerWalManager + Sync + Send + 'static>,
665667
Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync + 'static>>,
@@ -703,15 +705,19 @@ where
703705

704706
match self.use_custom_wal {
705707
Some(CustomWAL::LibsqlWal) => {
706-
let registry = Arc::new(WalRegistry::new(wal_path, NoStorage)?);
708+
let (sender, receiver) = tokio::sync::mpsc::channel(64);
709+
let registry = Arc::new(WalRegistry::new(wal_path, NoStorage, sender)?);
710+
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
711+
join_set.spawn(async move {
712+
checkpointer.run().await;
713+
Ok(())
714+
});
707715

708716
let wal = LibsqlWalManager::new(registry.clone(), Arc::new(namespace_resolver));
709717
let shutdown_notify = self.shutdown.clone();
710718
let shutdown_fut = Box::pin(async move {
711719
shutdown_notify.notified().await;
712-
tokio::task::spawn_blocking(move || registry.shutdown())
713-
.await
714-
.unwrap()?;
720+
registry.shutdown().await?;
715721
Ok(())
716722
});
717723

libsql-wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ clap = { version = "4.5.9", optional = true, features = ["derive"] }
4141
inquire = { version = "0.7.5", optional = true }
4242
tracing-subscriber = { version = "0.3.18", optional = true }
4343
aws-credential-types = { version = "1.2.0", optional = true }
44+
dashmap = "6.0.1"
4445

4546
[dev-dependencies]
4647
criterion = "0.5.1"

libsql-wal/benches/benchmarks.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
5959
let tmp = tempdir().unwrap();
6060
let resolver = |_: &Path| NamespaceName::from_string("test".into());
6161

62-
let registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), NoStorage).unwrap());
62+
let (sender, _) = tokio::sync::mpsc::channel(12);
63+
let registry = Arc::new(WalRegistry::new(tmp.path().join("wals"), NoStorage, sender).unwrap());
6364
let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
6465

6566
let mut conn = libsql_sys::Connection::open(

libsql-wal/src/bins/shell/main.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use aws_config::{BehaviorVersion, Region};
66
use aws_credential_types::Credentials;
77
use aws_sdk_s3::config::SharedCredentialsProvider;
88
use clap::{Parser, ValueEnum};
9+
use libsql_wal::checkpointer::LibsqlCheckpointer;
910
use tokio::task::{block_in_place, JoinSet};
1011

1112
use libsql_sys::name::NamespaceName;
@@ -83,21 +84,24 @@ async fn main() {
8384

8485
if cli.s3_args.enable_s3 {
8586
let storage = setup_s3_storage(&cli, &mut join_set).await;
86-
handle(&cli, storage).await;
87+
handle(&cli, storage, &mut join_set).await;
8788
} else {
8889
todo!()
8990
}
9091

9192
while join_set.join_next().await.is_some() {}
9293
}
9394

94-
async fn handle<S>(cli: &Cli, storage: S)
95+
async fn handle<S>(cli: &Cli, storage: S, join_set: &mut JoinSet<()>)
9596
where
9697
S: Storage<Segment = SealedSegment<std::fs::File>>,
9798
{
9899
match &cli.subcommand {
99100
Subcommand::Shell { db_path } => {
100-
let registry = WalRegistry::new(db_path.clone(), storage).unwrap();
101+
let (sender, receiver) = tokio::sync::mpsc::channel(64);
102+
let registry = Arc::new(WalRegistry::new(db_path.clone(), storage, sender).unwrap());
103+
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 64);
104+
join_set.spawn(checkpointer.run());
101105
run_shell(
102106
registry,
103107
&db_path,
@@ -143,13 +147,15 @@ where
143147
println!("max durable frame: {durable}");
144148
}
145149

146-
async fn run_shell<S>(registry: WalRegistry<StdIO, S>, db_path: &Path, namespace: NamespaceName)
147-
where
150+
async fn run_shell<S>(
151+
registry: Arc<WalRegistry<StdIO, S>>,
152+
db_path: &Path,
153+
namespace: NamespaceName,
154+
) where
148155
S: Storage<Segment = SealedSegment<std::fs::File>>,
149156
{
150157
let db_path = db_path.join("dbs").join(namespace.as_str());
151158
tokio::fs::create_dir_all(&db_path).await.unwrap();
152-
let registry = Arc::new(registry);
153159
let resolver = move |path: &Path| {
154160
NamespaceName::from_string(
155161
path.parent()
@@ -216,7 +222,7 @@ where
216222

217223
drop(conn);
218224

219-
registry.shutdown().unwrap();
225+
registry.shutdown().await.unwrap();
220226
}
221227

222228
async fn handle_builtin<S>(

libsql-wal/src/checkpointer.rs

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,29 @@ use crate::registry::WalRegistry;
1111

1212
pub(crate) type NotifyCheckpointer = mpsc::Sender<NamespaceName>;
1313

14-
type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
14+
pub enum CheckpointMessage {
15+
/// notify that a namespace may be checkpointable
16+
Namespace(NamespaceName),
17+
/// shutdown initiated
18+
Shutdown,
19+
}
20+
21+
impl From<NamespaceName> for CheckpointMessage {
22+
fn from(value: NamespaceName) -> Self {
23+
Self::Namespace(value)
24+
}
25+
}
26+
27+
pub type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
1528

1629
impl<IO, S> LibsqlCheckpointer<IO, S>
1730
where
1831
IO: Io,
1932
S: Sync + Send + 'static,
2033
{
2134
pub fn new(
22-
registry: WalRegistry<IO, S>,
23-
notifier: mpsc::Receiver<NamespaceName>,
35+
registry: Arc<WalRegistry<IO, S>>,
36+
notifier: mpsc::Receiver<CheckpointMessage>,
2437
max_checkpointing_conccurency: usize,
2538
) -> Self {
2639
Self::new_with_performer(registry, notifier, max_checkpointing_conccurency)
@@ -70,7 +83,7 @@ pub struct Checkpointer<P> {
7083
checkpointing: HashSet<NamespaceName>,
7184
/// the checkpointer is notifier whenever there is a change to a namespage that could trigger a
7285
/// checkpoint
73-
recv: mpsc::Receiver<NamespaceName>,
86+
recv: mpsc::Receiver<CheckpointMessage>,
7487
max_checkpointing_conccurency: usize,
7588
shutting_down: bool,
7689
join_set: JoinSet<(NamespaceName, crate::error::Result<()>)>,
@@ -84,12 +97,12 @@ where
8497
P: PerformCheckpoint + Send + Sync + 'static,
8598
{
8699
fn new_with_performer(
87-
perform_checkpoint: P,
88-
notifier: mpsc::Receiver<NamespaceName>,
100+
perform_checkpoint: Arc<P>,
101+
notifier: mpsc::Receiver<CheckpointMessage>,
89102
max_checkpointing_conccurency: usize,
90103
) -> Self {
91104
Self {
92-
perform_checkpoint: Arc::new(perform_checkpoint),
105+
perform_checkpoint,
93106
scheduled: Default::default(),
94107
checkpointing: Default::default(),
95108
recv: notifier,
@@ -141,10 +154,10 @@ where
141154
}
142155
notified = self.recv.recv(), if !self.shutting_down => {
143156
match notified {
144-
Some(namespace) => {
157+
Some(CheckpointMessage::Namespace(namespace)) => {
145158
self.scheduled.insert(namespace);
146159
}
147-
None => {
160+
None | Some(CheckpointMessage::Shutdown) => {
148161
self.shutting_down = true;
149162
}
150163
}
@@ -201,10 +214,11 @@ mod test {
201214
}
202215

203216
let (sender, receiver) = mpsc::channel(8);
204-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
217+
let mut checkpointer =
218+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
205219
let ns = NamespaceName::from("test");
206220

207-
sender.send(ns.clone()).await.unwrap();
221+
sender.send(ns.clone().into()).await.unwrap();
208222

209223
checkpointer.step().await;
210224

@@ -233,10 +247,11 @@ mod test {
233247
}
234248

235249
let (sender, receiver) = mpsc::channel(8);
236-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
250+
let mut checkpointer =
251+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
237252
let ns = NamespaceName::from("test");
238253

239-
sender.send(ns.clone()).await.unwrap();
254+
sender.send(ns.clone().into()).await.unwrap();
240255

241256
checkpointer.step().await;
242257
assert_eq!(checkpointer.errors, 0);
@@ -264,7 +279,8 @@ mod test {
264279
}
265280

266281
let (sender, receiver) = mpsc::channel(8);
267-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
282+
let mut checkpointer =
283+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
268284

269285
drop(sender);
270286

@@ -290,7 +306,8 @@ mod test {
290306
}
291307

292308
let (sender, receiver) = mpsc::channel(8);
293-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
309+
let mut checkpointer =
310+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
294311

295312
drop(sender);
296313

@@ -323,12 +340,13 @@ mod test {
323340
}
324341

325342
let (sender, receiver) = mpsc::channel(8);
326-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
343+
let mut checkpointer =
344+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
327345

328346
let ns: NamespaceName = "test".into();
329347

330-
sender.send(ns.clone()).await.unwrap();
331-
sender.send(ns.clone()).await.unwrap();
348+
sender.send(ns.clone().into()).await.unwrap();
349+
sender.send(ns.clone().into()).await.unwrap();
332350

333351
checkpointer.step().await;
334352

@@ -355,13 +373,14 @@ mod test {
355373
}
356374

357375
let (sender, receiver) = mpsc::channel(8);
358-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 5);
376+
let mut checkpointer =
377+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 5);
359378

360379
let ns1: NamespaceName = "test1".into();
361380
let ns2: NamespaceName = "test2".into();
362381

363-
sender.send(ns1.clone()).await.unwrap();
364-
sender.send(ns2.clone()).await.unwrap();
382+
sender.send(ns1.clone().into()).await.unwrap();
383+
sender.send(ns2.clone().into()).await.unwrap();
365384

366385
checkpointer.step().await;
367386

@@ -390,15 +409,16 @@ mod test {
390409
}
391410

392411
let (sender, receiver) = mpsc::channel(8);
393-
let mut checkpointer = Checkpointer::new_with_performer(TestPerformCheckoint, receiver, 2);
412+
let mut checkpointer =
413+
Checkpointer::new_with_performer(TestPerformCheckoint.into(), receiver, 2);
394414

395415
let ns1: NamespaceName = "test1".into();
396416
let ns2: NamespaceName = "test2".into();
397417
let ns3: NamespaceName = "test3".into();
398418

399-
sender.send(ns1.clone()).await.unwrap();
400-
sender.send(ns2.clone()).await.unwrap();
401-
sender.send(ns3.clone()).await.unwrap();
419+
sender.send(ns1.clone().into()).await.unwrap();
420+
sender.send(ns2.clone().into()).await.unwrap();
421+
sender.send(ns3.clone().into()).await.unwrap();
402422

403423
checkpointer.step().await;
404424
checkpointer.step().await;

0 commit comments

Comments
 (0)