Skip to content

Commit 7f731b8

Browse files
authored
sealed segment replicator (#1518)
* segment list stream frames * current segment replicator refactor * impl Replicator::stream_frames * test snapshots * fmt
1 parent b17b6ee commit 7f731b8

12 files changed

+721
-132
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsql-wal/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ libsql-sys = { path = "../libsql-sys", features = ["rusqlite"] }
2020
nix = { version = "0.28.0", features = ["uio", "fs"] }
2121
parking_lot = { version = "0.12.3", features = ["arc_lock"] }
2222
priority-queue = "2.0.2"
23+
roaring = "0.10.5"
2324
tempfile = "3.10.1"
2425
thiserror = "1.0.58"
2526
tokio = { version = "1", features = ["full"] }

libsql-wal/src/lib.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,69 @@ pub mod segment;
99
pub mod shared_wal;
1010
pub mod transaction;
1111
pub mod wal;
12+
13+
#[cfg(test)]
14+
pub(crate) mod test {
15+
use std::{path::Path, sync::Arc};
16+
17+
use libsql_sys::{name::NamespaceName, rusqlite::OpenFlags};
18+
use tempfile::{tempdir, TempDir};
19+
20+
use crate::{
21+
io::StdIO,
22+
registry::WalRegistry,
23+
shared_wal::SharedWal,
24+
wal::{LibsqlWal, LibsqlWalManager},
25+
};
26+
27+
pub struct TestEnv {
28+
pub tmp: TempDir,
29+
pub registry: Arc<WalRegistry<StdIO>>,
30+
pub wal: LibsqlWalManager<StdIO>,
31+
}
32+
33+
impl TestEnv {
34+
pub fn new() -> Self {
35+
let tmp = tempdir().unwrap();
36+
let resolver = |path: &Path| {
37+
let name = path.file_name().unwrap().to_str().unwrap();
38+
NamespaceName::from_string(name.to_string())
39+
};
40+
41+
let registry =
42+
Arc::new(WalRegistry::new(tmp.path().join("test/wals"), resolver, ()).unwrap());
43+
let wal = LibsqlWalManager::new(registry.clone());
44+
45+
Self { tmp, registry, wal }
46+
}
47+
48+
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<StdIO>> {
49+
let path = self.tmp.path().join(namespace).join("data");
50+
self.registry.clone().open(path.as_ref()).unwrap()
51+
}
52+
53+
pub fn open_conn(&self, namespace: &str) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
54+
let path = self.tmp.path().join(namespace);
55+
std::fs::create_dir_all(&path).unwrap();
56+
libsql_sys::Connection::open(
57+
path.join("data"),
58+
OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
59+
self.wal.clone(),
60+
100000,
61+
None,
62+
)
63+
.unwrap()
64+
}
65+
}
66+
67+
pub fn seal_current_segment(shared: &SharedWal<StdIO>) {
68+
let mut tx = shared.begin_read(99999).into();
69+
shared.upgrade(&mut tx).unwrap();
70+
{
71+
let mut guard = tx.as_write_mut().unwrap().lock();
72+
guard.commit();
73+
shared.swap_current(&mut guard).unwrap();
74+
}
75+
tx.end();
76+
}
77+
}

libsql-wal/src/registry.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ enum Slot<FS: Io> {
3737
}
3838

3939
/// Wal Registry maintains a set of shared Wal, and their respective set of files.
40-
pub struct WalRegistry<FS: Io> {
41-
fs: FS,
40+
pub struct WalRegistry<IO: Io> {
41+
fs: IO,
4242
path: PathBuf,
4343
shutdown: AtomicBool,
44-
opened: RwLock<HashMap<NamespaceName, Slot<FS>>>,
44+
opened: RwLock<HashMap<NamespaceName, Slot<IO>>>,
4545
resolver: Box<dyn NamespaceResolver + Send + Sync + 'static>,
46-
swap_handler: Box<dyn SegmentSwapHandler<FS::File>>,
46+
swap_handler: Box<dyn SegmentSwapHandler<IO::File>>,
4747
}
4848

4949
impl WalRegistry<StdIO> {

0 commit comments

Comments
 (0)