Skip to content

Commit 72849a4

Browse files
committed
add fork logic for PITR and latest
1 parent bdc7d96 commit 72849a4

File tree

4 files changed

+131
-0
lines changed

4 files changed

+131
-0
lines changed

libsql-server/src/error.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ impl IntoResponse for &ForkError {
325325
| ForkError::BackupServiceNotConfigured
326326
| ForkError::CreateNamespace(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
327327
ForkError::ForkReplica => self.format_err(StatusCode::BAD_REQUEST),
328+
ForkError::ForkNoStorage => self.format_err(StatusCode::BAD_REQUEST),
328329
}
329330
}
330331
}

libsql-server/src/namespace/configurator/fork.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ pub enum ForkError {
9191
ForkReplica,
9292
#[error("backup service not configured")]
9393
BackupServiceNotConfigured,
94+
#[error("cannot fork a namespace without storage")]
95+
ForkNoStorage,
9496
}
9597

9698
impl From<tokio::task::JoinError> for ForkError {
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use std::path::Path;
2+
use std::pin::Pin;
3+
use std::sync::Arc;
4+
5+
use chrono::{DateTime, Utc};
6+
use futures::Stream;
7+
use libsql_sys::wal::either::Either;
8+
use libsql_wal::io::StdIO;
9+
use libsql_wal::registry::WalRegistry;
10+
use libsql_wal::replication::injector::Injector;
11+
use libsql_wal::replication::storage::StorageReplicator;
12+
use libsql_wal::replication::{replicator::Replicator, storage::ReplicateFromStorage as _};
13+
use libsql_wal::segment::Frame;
14+
use libsql_wal::shared_wal::SharedWal;
15+
use libsql_wal::storage::backend::Backend as _;
16+
use tempfile::tempdir;
17+
use tokio_stream::StreamExt as _;
18+
19+
use crate::namespace::configurator::fork::ForkError;
20+
use crate::namespace::RestoreOption;
21+
use crate::{
22+
namespace::{meta_store::MetaStoreHandle, Namespace, NamespaceName, NamespaceStore},
23+
SqldStorage,
24+
};
25+
26+
pub(crate) async fn libsql_wal_fork(
27+
registry: Arc<WalRegistry<StdIO, SqldStorage>>,
28+
base_path: &Path,
29+
from_ns: &Namespace,
30+
to_ns: NamespaceName,
31+
to_config: MetaStoreHandle,
32+
timestamp: Option<DateTime<Utc>>,
33+
store: NamespaceStore,
34+
) -> crate::Result<Namespace> {
35+
let mut seen = Default::default();
36+
let storage = registry.storage();
37+
match &*storage {
38+
Either::A(s) => {
39+
let from_ns_name: libsql_sys::name::NamespaceName = from_ns.name().clone().into();
40+
let to_ns_name: libsql_sys::name::NamespaceName = to_ns.clone().into();
41+
42+
let mut stream = match timestamp {
43+
Some(ts) => {
44+
let key = s
45+
.backend()
46+
.find_segment(
47+
&s.backend().default_config(),
48+
&from_ns_name,
49+
libsql_wal::storage::backend::FindSegmentReq::Timestamp(ts),
50+
)
51+
.await
52+
.unwrap();
53+
let restore_until = key.end_frame_no;
54+
let replicator = StorageReplicator::new(storage.clone(), from_ns_name.clone());
55+
replicator.stream(&mut seen, restore_until, 1)
56+
}
57+
// find the most recent frame_no
58+
None => {
59+
let from_shared = tokio::task::spawn_blocking({
60+
let registry = registry.clone();
61+
let from_ns_name = from_ns_name.clone();
62+
let path = from_ns.path.join("data");
63+
move || registry.open(&path, &from_ns_name)
64+
})
65+
.await
66+
.unwrap()?;
67+
68+
let replicator = Replicator::new(from_shared, 1, false);
69+
Box::pin(replicator.into_frame_stream())
70+
}
71+
};
72+
73+
let tmp = tempdir()?;
74+
let to_shared = tokio::task::spawn_blocking({
75+
let registry = registry.clone();
76+
let path = tmp.path().join("data");
77+
let to_ns_name = to_ns_name.clone();
78+
move || registry.open(&path, &to_ns_name)
79+
})
80+
.await
81+
.unwrap()?;
82+
83+
// make sure that nobody can use that namespace
84+
registry.tombstone(&to_ns_name).await;
85+
let ret = try_inject(to_shared, &mut stream).await;
86+
registry.remove(&to_ns_name).await;
87+
ret?;
88+
89+
tokio::fs::rename(tmp.path(), base_path.join("dbs").join(to_ns.as_str())).await?;
90+
91+
Ok(store
92+
.make_namespace(&to_ns, to_config, RestoreOption::Latest)
93+
.await?)
94+
}
95+
Either::B(_) => Err(crate::Error::Fork(super::fork::ForkError::ForkNoStorage)),
96+
}
97+
}
98+
99+
async fn try_inject(
100+
to_shared: Arc<SharedWal<StdIO>>,
101+
stream: &mut Pin<
102+
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
103+
>,
104+
) -> crate::Result<()> {
105+
let stream = stream.peekable();
106+
tokio::pin!(stream);
107+
let mut injector = Injector::new(to_shared.clone(), 16)?;
108+
let mut count = 0;
109+
while let Some(f) = stream.next().await {
110+
let mut frame = f.map_err(|e| ForkError::Internal(anyhow::anyhow!(e)))?;
111+
count += 1;
112+
if stream.peek().await.is_none() {
113+
frame.header_mut().set_size_after(count);
114+
}
115+
116+
injector.insert_frame(frame).await?;
117+
}
118+
119+
tokio::task::spawn_blocking({
120+
let shared = to_shared.clone();
121+
move || shared.seal_current()
122+
})
123+
.await
124+
.unwrap()?;
125+
126+
Ok(())
127+
}

libsql-server/src/namespace/configurator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use super::{
2020
};
2121

2222
pub mod fork;
23+
mod libsql_fork;
2324
mod helpers;
2425
mod libsql_primary;
2526
mod libsql_replica;

0 commit comments

Comments
 (0)