Skip to content

Commit cd9d365

Browse files
authored
Merge pull request #1744 from tursodatabase/libsql-wal-fork
Libsql-wal: implement forking
2 parents 85c3709 + 1df0acb commit cd9d365

File tree

23 files changed

+283
-190
lines changed

23 files changed

+283
-190
lines changed

libsql-server/src/bottomless_migrate.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ pub async fn bottomless_migrate(
5050
let tmp = TempDir::new()?;
5151

5252
tokio::fs::create_dir_all(tmp.path().join("dbs")).await?;
53-
tokio::fs::create_dir_all(tmp.path().join("wals")).await?;
5453

5554
let configs_stream = meta_store.namespaces();
5655
tokio::pin!(configs_stream);
@@ -67,11 +66,7 @@ pub async fn bottomless_migrate(
6766
}
6867
});
6968

70-
let tmp_registry = Arc::new(WalRegistry::new(
71-
tmp.path().join("wals"),
72-
NoStorage.into(),
73-
sender,
74-
)?);
69+
let tmp_registry = Arc::new(WalRegistry::new(NoStorage.into(), sender)?);
7570

7671
let mut configurators = NamespaceConfigurators::default();
7772

@@ -110,7 +105,6 @@ pub async fn bottomless_migrate(
110105
// doesn't exist, then we restore it, otherwise, we delete it.
111106
tokio::fs::rename(&base_dbs_dir, &base_dbs_dir_tmp).await?;
112107
tokio::fs::rename(tmp.path().join("dbs"), base_dbs_dir).await?;
113-
tokio::fs::rename(tmp.path().join("wals"), base_config.base_path.join("wals")).await?;
114108
tokio::fs::remove_dir_all(base_config.base_path.join("_dbs")).await?;
115109

116110
Ok(())

libsql-server/src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ impl IntoResponse for &ForkError {
325325
| ForkError::BackupServiceNotConfigured
326326
| ForkError::CreateNamespace(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
327327
ForkError::ForkReplica => self.format_err(StatusCode::BAD_REQUEST),
328+
ForkError::ForkNoStorage => self.format_err(StatusCode::BAD_REQUEST),
328329
}
329330
}
330331
}

libsql-server/src/lib.rs

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -835,20 +835,6 @@ where
835835
scripted_backup: Option<ScriptBackupManager>,
836836
meta_store: MetaStore,
837837
) -> anyhow::Result<(NamespaceConfigurators, MakeReplicationSvc)> {
838-
let wal_path = base_config.base_path.join("wals");
839-
let enable_libsql_wal_test = {
840-
let is_primary = self.rpc_server_config.is_some();
841-
let is_libsql_wal_test = std::env::var("LIBSQL_WAL_TEST").is_ok();
842-
is_primary && is_libsql_wal_test
843-
};
844-
let use_libsql_wal =
845-
self.use_custom_wal == Some(CustomWAL::LibsqlWal) || enable_libsql_wal_test;
846-
if !use_libsql_wal {
847-
if wal_path.try_exists()? {
848-
anyhow::bail!("database was previously setup to use libsql-wal");
849-
}
850-
}
851-
852838
#[cfg(feature = "durable-wal")]
853839
if let Some(CustomWAL::DurableWal) = self.use_custom_wal {
854840
if self.db_config.bottomless_replication.is_some() {
@@ -864,7 +850,6 @@ where
864850
task_manager,
865851
migration_scheduler_handle,
866852
scripted_backup,
867-
wal_path,
868853
meta_store,
869854
)
870855
.await
@@ -895,7 +880,6 @@ where
895880
task_manager: &mut TaskManager,
896881
migration_scheduler_handle: SchedulerHandle,
897882
scripted_backup: Option<ScriptBackupManager>,
898-
wal_path: PathBuf,
899883
meta_store: MetaStore,
900884
) -> anyhow::Result<(NamespaceConfigurators, MakeReplicationSvc)> {
901885
tracing::info!("using libsql wal");
@@ -964,7 +948,7 @@ where
964948
anyhow::bail!("replication without bottomless not supported yet");
965949
}
966950

967-
let registry = Arc::new(WalRegistry::new(wal_path, storage, sender)?);
951+
let registry = Arc::new(WalRegistry::new(storage, sender)?);
968952
let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 8);
969953
task_manager.spawn_with_shutdown_notify(|_| async move {
970954
checkpointer.run().await;
@@ -1288,15 +1272,16 @@ where
12881272
}
12891273

12901274
fn check_previous_migration_success(&self) -> anyhow::Result<bool> {
1291-
let wals_path = self.path.join("wals");
1292-
if !wals_path.try_exists()? {
1293-
return Ok(false);
1294-
}
1295-
1296-
let dir = std::fs::read_dir(&wals_path)?;
1297-
1298-
// wals dir exist and is not empty
1299-
Ok(dir.count() != 0)
1275+
todo!("not usings wal directory anymore");
1276+
// let wals_path = self.path.join("wals");
1277+
// if !wals_path.try_exists()? {
1278+
// return Ok(false);
1279+
// }
1280+
//
1281+
// let dir = std::fs::read_dir(&wals_path)?;
1282+
//
1283+
// // wals dir exist and is not empty
1284+
// Ok(dir.count() != 0)
13001285
}
13011286
}
13021287

libsql-server/src/namespace/configurator/fork.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub enum ForkError {
9191
ForkReplica,
9292
#[error("backup service not configured")]
9393
BackupServiceNotConfigured,
94+
#[error("cannot fork a namespace without storage")]
95+
ForkNoStorage,
9496
}
9597

9698
impl From<tokio::task::JoinError> for ForkError {

libsql-server/src/namespace/configurator/helpers.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -494,28 +494,6 @@ pub async fn cleanup_libsql(
494494
let _ = tokio::fs::remove_dir_all(ns_db_path).await;
495495
}
496496

497-
let ns_wals_path = base_path.join("wals").join(namespace.as_str());
498-
if ns_wals_path.try_exists()? {
499-
tracing::debug!("removing database directory: {}", ns_wals_path.display());
500-
if let Err(e) = tokio::fs::remove_dir_all(ns_wals_path).await {
501-
// what can go wrong?:
502-
match e.kind() {
503-
// alright, there's nothing to delete anyway
504-
std::io::ErrorKind::NotFound => (),
505-
_ => {
506-
// something unexpected happened, this namespaces is in a bad state.
507-
// The entry will not be removed from the registry to prevent another
508-
// namespace with the same name to be reuse the same wal files. a
509-
// manual intervention is necessary
510-
// FIXME: on namespace creation, we could ensure that this directory is
511-
// clean.
512-
tracing::error!("error deleting `{namespace}` wal directory, manual intervention may be necessary: {e}");
513-
return Err(e.into());
514-
}
515-
}
516-
}
517-
}
518-
519497
// when all is cleaned, leave place for next one
520498
registry.remove(&namespace).await;
521499

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::path::Path;
2+
use std::pin::Pin;
3+
use std::sync::Arc;
4+
5+
use chrono::{DateTime, Utc};
6+
use futures::Stream;
7+
use libsql_sys::wal::either::Either;
8+
use libsql_wal::io::StdIO;
9+
use libsql_wal::registry::WalRegistry;
10+
use libsql_wal::replication::injector::Injector;
11+
use libsql_wal::replication::storage::StorageReplicator;
12+
use libsql_wal::replication::{replicator::Replicator, storage::ReplicateFromStorage as _};
13+
use libsql_wal::segment::Frame;
14+
use libsql_wal::shared_wal::SharedWal;
15+
use libsql_wal::storage::backend::Backend as _;
16+
use tempfile::tempdir;
17+
use tokio_stream::StreamExt as _;
18+
19+
use crate::namespace::configurator::fork::ForkError;
20+
use crate::namespace::RestoreOption;
21+
use crate::{
22+
namespace::{meta_store::MetaStoreHandle, Namespace, NamespaceName, NamespaceStore},
23+
SqldStorage,
24+
};
25+
26+
pub(crate) async fn libsql_wal_fork(
27+
registry: Arc<WalRegistry<StdIO, SqldStorage>>,
28+
base_path: &Path,
29+
from_ns: &Namespace,
30+
to_ns: NamespaceName,
31+
to_config: MetaStoreHandle,
32+
timestamp: Option<DateTime<Utc>>,
33+
store: NamespaceStore,
34+
) -> crate::Result<Namespace> {
35+
let mut seen = Default::default();
36+
let storage = registry.storage();
37+
match &*storage {
38+
Either::A(s) => {
39+
let from_ns_name: libsql_sys::name::NamespaceName = from_ns.name().clone().into();
40+
let to_ns_name: libsql_sys::name::NamespaceName = to_ns.clone().into();
41+
42+
let mut stream = match timestamp {
43+
Some(ts) => {
44+
let key = s
45+
.backend()
46+
.find_segment(
47+
&s.backend().default_config(),
48+
&from_ns_name,
49+
libsql_wal::storage::backend::FindSegmentReq::Timestamp(ts),
50+
)
51+
.await
52+
.unwrap();
53+
let restore_until = key.end_frame_no;
54+
let replicator = StorageReplicator::new(storage.clone(), from_ns_name.clone());
55+
replicator.stream(&mut seen, restore_until, 1)
56+
}
57+
// find the most recent frame_no
58+
None => {
59+
let from_shared = tokio::task::spawn_blocking({
60+
let registry = registry.clone();
61+
let from_ns_name = from_ns_name.clone();
62+
let path = from_ns.path.join("data");
63+
move || registry.open(&path, &from_ns_name)
64+
})
65+
.await
66+
.unwrap()?;
67+
68+
let replicator = Replicator::new(from_shared, 1, false);
69+
Box::pin(replicator.into_frame_stream())
70+
}
71+
};
72+
73+
let tmp = tempdir()?;
74+
let to_shared = tokio::task::spawn_blocking({
75+
let registry = registry.clone();
76+
let path = tmp.path().join("data");
77+
let to_ns_name = to_ns_name.clone();
78+
move || registry.open(&path, &to_ns_name)
79+
})
80+
.await
81+
.unwrap()?;
82+
83+
// make sure that nobody can use that namespace
84+
registry.tombstone(&to_ns_name).await;
85+
let ret = try_inject(to_shared, &mut stream).await;
86+
registry.remove(&to_ns_name).await;
87+
ret?;
88+
89+
tokio::fs::rename(tmp.path(), base_path.join("dbs").join(to_ns.as_str())).await?;
90+
91+
Ok(store
92+
.make_namespace(&to_ns, to_config, RestoreOption::Latest)
93+
.await?)
94+
}
95+
Either::B(_) => Err(crate::Error::Fork(super::fork::ForkError::ForkNoStorage)),
96+
}
97+
}
98+
99+
async fn try_inject(
100+
to_shared: Arc<SharedWal<StdIO>>,
101+
stream: &mut Pin<
102+
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
103+
>,
104+
) -> crate::Result<()> {
105+
let stream = stream.peekable();
106+
tokio::pin!(stream);
107+
let mut injector = Injector::new(to_shared.clone(), 16)?;
108+
let mut count = 0;
109+
while let Some(f) = stream.next().await {
110+
let mut frame = f.map_err(|e| ForkError::Internal(anyhow::anyhow!(e)))?;
111+
count += 1;
112+
if stream.peek().await.is_none() {
113+
frame.header_mut().set_size_after(count);
114+
}
115+
116+
injector.insert_frame(frame).await?;
117+
}
118+
119+
tokio::task::spawn_blocking({
120+
let shared = to_shared.clone();
121+
move || shared.seal_current()
122+
})
123+
.await
124+
.unwrap()?;
125+
126+
Ok(())
127+
}

libsql-server/src/namespace/configurator/libsql_primary.rs

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ use std::pin::Pin;
33
use std::sync::atomic::{AtomicBool, Ordering};
44
use std::sync::Arc;
55

6+
use chrono::NaiveDateTime;
67
use futures::prelude::Future;
78
use libsql_sys::name::NamespaceResolver;
8-
use libsql_sys::wal::either::Either;
99
use libsql_wal::io::StdIO;
1010
use libsql_wal::registry::WalRegistry;
11-
use libsql_wal::storage::backend::Backend;
1211
use libsql_wal::wal::LibsqlWalManager;
1312
use tokio::task::JoinSet;
1413

@@ -267,38 +266,21 @@ impl ConfigureNamespace for LibsqlPrimaryConfigurator {
267266
&'a self,
268267
from_ns: &'a Namespace,
269268
_from_config: MetaStoreHandle,
270-
_to_ns: NamespaceName,
271-
_to_config: MetaStoreHandle,
272-
timestamp: Option<chrono::prelude::NaiveDateTime>,
273-
_store: NamespaceStore,
269+
to_ns: NamespaceName,
270+
to_config: MetaStoreHandle,
271+
timestamp: Option<NaiveDateTime>,
272+
store: NamespaceStore,
274273
) -> Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
275-
Box::pin(async move {
276-
match self.registry.storage() {
277-
Either::A(s) => {
278-
match timestamp {
279-
Some(ts) => {
280-
let ns: libsql_sys::name::NamespaceName = from_ns.name().clone().into();
281-
let _key = s
282-
.backend()
283-
.find_segment(
284-
&s.backend().default_config(),
285-
&ns,
286-
libsql_wal::storage::backend::FindSegmentReq::Timestamp(
287-
ts.and_utc(),
288-
),
289-
)
290-
.await
291-
.unwrap();
292-
todo!()
293-
}
294-
// find the most recent frame_no
295-
None => todo!("fork from most recent"),
296-
};
297-
}
298-
Either::B(_) => {
299-
todo!("cannot fork without storage");
300-
}
301-
}
302-
})
274+
let registry = self.registry.clone();
275+
let base_path = &self.base.base_path;
276+
Box::pin(super::libsql_fork::libsql_wal_fork(
277+
registry,
278+
base_path,
279+
from_ns,
280+
to_ns,
281+
to_config,
282+
timestamp.map(|ts| ts.and_utc()),
283+
store,
284+
))
303285
}
304286
}

libsql-server/src/namespace/configurator/libsql_schema.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::path::Path;
22
use std::sync::Arc;
33

4+
use chrono::NaiveDateTime;
45
use futures::prelude::Future;
56
use libsql_sys::name::NamespaceResolver;
67
use libsql_wal::io::StdIO;
@@ -160,21 +161,22 @@ impl ConfigureNamespace for LibsqlSchemaConfigurator {
160161
fn fork<'a>(
161162
&'a self,
162163
from_ns: &'a Namespace,
163-
from_config: MetaStoreHandle,
164+
_from_config: MetaStoreHandle,
164165
to_ns: NamespaceName,
165166
to_config: MetaStoreHandle,
166-
timestamp: Option<chrono::prelude::NaiveDateTime>,
167+
timestamp: Option<NaiveDateTime>,
167168
store: NamespaceStore,
168169
) -> std::pin::Pin<Box<dyn Future<Output = crate::Result<Namespace>> + Send + 'a>> {
169-
Box::pin(super::fork::fork(
170+
let registry = self.registry.clone();
171+
let base_path = &self.base.base_path;
172+
Box::pin(super::libsql_fork::libsql_wal_fork(
173+
registry,
174+
base_path,
170175
from_ns,
171-
from_config,
172176
to_ns,
173177
to_config,
174-
timestamp,
178+
timestamp.map(|ts| ts.and_utc()),
175179
store,
176-
&self.primary_config,
177-
self.base.base_path.clone(),
178180
))
179181
}
180182
}

libsql-server/src/namespace/configurator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use super::{
2121

2222
pub mod fork;
2323
mod helpers;
24+
mod libsql_fork;
2425
mod libsql_primary;
2526
mod libsql_replica;
2627
mod libsql_schema;

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ impl ReplicationLog for LibsqlReplicationService {
157157
let replicator = libsql_wal::replication::replicator::Replicator::new(
158158
shared.clone(),
159159
req.next_offset.max(1),
160+
true,
160161
);
161162

162163
let flavor = req.wal_flavor();

0 commit comments

Comments
 (0)