Skip to content

Commit 2a5e204

Browse files
committed
move open logic to SharedWal
1 parent 28398ef commit 2a5e204

File tree

15 files changed

+393
-368
lines changed

15 files changed

+393
-368
lines changed

libsql-replication/src/injector/libsql_injector.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
use std::mem::size_of;
22

3-
use libsql_wal::io::StdIO;
43
use libsql_wal::replication::injector::Injector;
4+
use libsql_wal::segment::sealed::SealedSegment;
55
use libsql_wal::segment::Frame as WalFrame;
6+
use libsql_wal::{io::StdIO, storage::Storage};
67
use zerocopy::{AsBytes, FromZeroes};
78

89
use crate::frame::FrameNo;
910
use crate::rpc::replication::Frame as RpcFrame;
1011

1112
use super::error::{Error, Result};
1213

13-
pub struct LibsqlInjector {
14-
injector: Injector<StdIO>,
14+
pub struct LibsqlInjector<S> {
15+
injector: Injector<StdIO, S>,
1516
}
1617

17-
impl LibsqlInjector {
18-
pub fn new(injector: Injector<StdIO>) -> Self {
18+
impl<S> LibsqlInjector<S> {
19+
pub fn new(injector: Injector<StdIO, S>) -> Self {
1920
Self { injector }
2021
}
2122
}
2223

23-
impl super::Injector for LibsqlInjector {
24+
impl<S: Storage<Segment = SealedSegment<std::fs::File>>> super::Injector for LibsqlInjector<S> {
2425
async fn inject_frame(&mut self, frame: RpcFrame) -> Result<Option<FrameNo>> {
2526
// this is a bit annoying be we want to read the frame, and it has to be aligned, so we
2627
// must copy it...

libsql-server/src/connection/connection_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ pub type ConnId = u64;
3131
pub type InnerWalManager =
3232
Either3<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>, DurableWalManager>;
3333
#[cfg(feature = "durable-wal")]
34-
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO>, DurableWal>;
34+
pub type InnerWal = Either3<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>, DurableWal>;
3535

3636
#[cfg(not(feature = "durable-wal"))]
3737
pub type InnerWalManager = Either<Sqlite3WalManager, LibsqlWalManager<StdIO, SqldStorage>>;
3838

3939
#[cfg(not(feature = "durable-wal"))]
40-
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO>>;
40+
pub type InnerWal = Either<Sqlite3Wal, LibsqlWal<StdIO, SqldStorage>>;
4141
pub type ManagedConnectionWal = WrappedWal<ManagedConnectionWalWrapper, InnerWal>;
4242

4343
#[derive(Copy, Clone, Debug)]

libsql-server/src/connection/libsql.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ impl MakeConnection for MakeLibsqlConnection {
8282

8383
#[derive(Clone)]
8484
pub struct LibsqlConnection {
85-
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO>>>>,
85+
inner: Arc<Mutex<CoreConnection<LibsqlWal<StdIO, SqldStorage>>>>,
8686
}
8787

8888
impl LibsqlConnection {

libsql-server/src/namespace/configurator/libsql_fork.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub(crate) async fn libsql_wal_fork(
9797
}
9898

9999
async fn try_inject(
100-
to_shared: Arc<SharedWal<StdIO>>,
100+
to_shared: Arc<SharedWal<StdIO, SqldStorage>>,
101101
stream: &mut Pin<
102102
Box<dyn Stream<Item = Result<Box<Frame>, libsql_wal::replication::Error>> + Send + '_>,
103103
>,

libsql-server/src/replication/replicator_client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ use crate::metrics::{
2929
use crate::namespace::meta_store::MetaStoreHandle;
3030
use crate::namespace::{NamespaceName, NamespaceStore};
3131
use crate::replication::FrameNo;
32+
use crate::SqldStorage;
3233

3334
pub enum WalImpl {
3435
LibsqlWal {
35-
shared: Arc<SharedWal<StdIO>>,
36+
shared: Arc<SharedWal<StdIO, SqldStorage>>,
3637
},
3738
SqliteWal {
3839
meta: WalIndexMeta,
@@ -52,7 +53,7 @@ impl WalImpl {
5253
})
5354
}
5455

55-
pub fn new_libsql(shared: Arc<SharedWal<StdIO>>) -> Self {
56+
pub fn new_libsql(shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
5657
Self::LibsqlWal { shared }
5758
}
5859

libsql-server/src/rpc/replication/libsql_replicator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ pin_project_lite::pin_project! {
8282
#[pin]
8383
inner: S,
8484
flavor: WalFlavor,
85-
shared: Arc<SharedWal<StdIO>>,
85+
shared: Arc<SharedWal<StdIO, SqldStorage>>,
8686
}
8787
}
8888

8989
impl<S> FrameStreamAdapter<S> {
90-
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO>>) -> Self {
90+
fn new(inner: S, flavor: WalFlavor, shared: Arc<SharedWal<StdIO, SqldStorage>>) -> Self {
9191
Self {
9292
inner,
9393
flavor,

libsql-wal/benches/benchmarks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ fn prepare_for_random_reads<W: Wal>(conn: &mut Connection<W>) {
5555
}
5656
}
5757

58-
fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO>>)) {
58+
fn with_libsql_conn(f: impl FnOnce(&mut Connection<LibsqlWal<StdIO, NoStorage>>)) {
5959
let tmp = tempdir().unwrap();
6060
let resolver = |_: &Path| NamespaceName::from_string("test".into());
6161

libsql-wal/src/checkpointer.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use tokio::task::JoinSet;
88

99
use crate::io::Io;
1010
use crate::registry::WalRegistry;
11+
use crate::segment::sealed::SealedSegment;
12+
use crate::storage::Storage;
1113

1214
pub(crate) type NotifyCheckpointer = mpsc::Sender<NamespaceName>;
1315

@@ -29,7 +31,7 @@ pub type LibsqlCheckpointer<IO, S> = Checkpointer<WalRegistry<IO, S>>;
2931
impl<IO, S> LibsqlCheckpointer<IO, S>
3032
where
3133
IO: Io,
32-
S: Sync + Send + 'static,
34+
S: Storage<Segment = SealedSegment<IO::File>>,
3335
{
3436
pub fn new(
3537
registry: Arc<WalRegistry<IO, S>>,
@@ -51,6 +53,7 @@ impl<IO, S> PerformCheckpoint for WalRegistry<IO, S>
5153
where
5254
IO: Io,
5355
S: Sync + Send + 'static,
56+
S: Storage<Segment = SealedSegment<IO::File>>,
5457
{
5558
#[tracing::instrument(skip(self))]
5659
fn checkpoint(

libsql-wal/src/lib.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ pub mod test {
124124
Self { tmp, registry, wal }
125125
}
126126

127-
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO>> {
127+
pub fn shared(&self, namespace: &str) -> Arc<SharedWal<IO, TestStorage<IO>>> {
128128
let path = self.tmp.path().join(namespace).join("data");
129129
let registry = self.registry.clone();
130130
let namespace = NamespaceName::from_string(namespace.into());
@@ -135,7 +135,10 @@ pub mod test {
135135
self.tmp.path().join(namespace)
136136
}
137137

138-
pub fn open_conn(&self, namespace: &'static str) -> libsql_sys::Connection<LibsqlWal<IO>> {
138+
pub fn open_conn(
139+
&self,
140+
namespace: &'static str,
141+
) -> libsql_sys::Connection<LibsqlWal<IO, TestStorage<IO>>> {
139142
let path = self.db_path(namespace);
140143
let wal = self.wal.clone();
141144
std::fs::create_dir_all(&path).unwrap();
@@ -159,7 +162,7 @@ pub mod test {
159162
}
160163
}
161164

162-
pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO>) {
165+
pub fn seal_current_segment<IO: Io>(shared: &SharedWal<IO, TestStorage<IO>>) {
163166
let mut tx = shared.begin_read(99999).into();
164167
shared.upgrade(&mut tx).unwrap();
165168
{
@@ -170,7 +173,7 @@ pub mod test {
170173
tx.end();
171174
}
172175

173-
pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO>) {
176+
pub async fn wait_current_durable<IO: Io>(shared: &SharedWal<IO, TestStorage>) {
174177
let current = shared.current.load().next_frame_no().get() - 1;
175178
loop {
176179
{

0 commit comments

Comments
 (0)