Skip to content

Commit db9d6e1

Browse files
committed
libsql-wal replication fixed
1 parent 0916848 commit db9d6e1

File tree

11 files changed

+311
-118
lines changed

11 files changed

+311
-118
lines changed

libsql-server/src/connection/connection_core.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::time::{Duration, Instant};
77
use libsql_sys::wal::{Wal, WalManager};
88
use metrics::histogram;
99
use parking_lot::Mutex;
10-
use tokio::sync::watch;
1110

1211
use crate::connection::legacy::open_conn_active_checkpoint;
1312
use crate::error::Error;
@@ -24,13 +23,15 @@ use crate::{Result, BLOCKING_RT};
2423
use super::config::DatabaseConfig;
2524
use super::program::{DescribeCol, DescribeParam, DescribeResponse, Program, Vm};
2625

26+
pub type GetCurrentFrameNo = Arc<dyn Fn() -> Option<FrameNo> + Send + Sync + 'static>;
27+
2728
/// The base connection type, shared between legacy and libsql-wal implementations
2829
pub(super) struct CoreConnection<W> {
2930
conn: libsql_sys::Connection<W>,
3031
stats: Arc<Stats>,
3132
config_store: MetaStoreHandle,
3233
builder_config: QueryBuilderConfig,
33-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
34+
get_current_frame_no: GetCurrentFrameNo,
3435
block_writes: Arc<AtomicBool>,
3536
resolve_attach_path: ResolveNamespacePathFn,
3637
forced_rollback: bool,
@@ -65,7 +66,7 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
6566
broadcaster: BroadcasterHandle,
6667
config_store: MetaStoreHandle,
6768
builder_config: QueryBuilderConfig,
68-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
69+
get_current_frame_no: GetCurrentFrameNo,
6970
block_writes: Arc<AtomicBool>,
7071
resolve_attach_path: ResolveNamespacePathFn,
7172
) -> Result<Self> {
@@ -118,13 +119,13 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
118119
stats,
119120
config_store,
120121
builder_config,
121-
current_frame_no_receiver,
122122
block_writes,
123123
resolve_attach_path,
124124
forced_rollback: false,
125125
broadcaster,
126126
hooked: false,
127127
canceled,
128+
get_current_frame_no,
128129
};
129130

130131
for ext in extensions.iter() {
@@ -265,9 +266,9 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
265266
}
266267

267268
{
268-
let mut lock = this.lock();
269+
let lock = this.lock();
269270
let is_autocommit = lock.conn.is_autocommit();
270-
let current_fno = *lock.current_frame_no_receiver.borrow_and_update();
271+
let current_fno = (lock.get_current_frame_no)();
271272
vm.builder().finish(current_fno, is_autocommit)?;
272273
}
273274

@@ -424,13 +425,13 @@ mod test {
424425
stats: Arc::new(Stats::default()),
425426
config_store: MetaStoreHandle::new_test(),
426427
builder_config: QueryBuilderConfig::default(),
427-
current_frame_no_receiver: watch::channel(None).1,
428428
block_writes: Default::default(),
429429
resolve_attach_path: Arc::new(|_| unreachable!()),
430430
forced_rollback: false,
431431
broadcaster: Default::default(),
432432
hooked: false,
433433
canceled: Arc::new(false.into()),
434+
get_current_frame_no: Arc::new(|| None),
434435
};
435436

436437
let conn = Arc::new(Mutex::new(conn));
@@ -465,7 +466,7 @@ mod test {
465466
100000000,
466467
100000000,
467468
DEFAULT_AUTO_CHECKPOINT,
468-
watch::channel(None).1,
469+
Arc::new(|| None),
469470
None,
470471
Default::default(),
471472
Arc::new(|_| unreachable!()),
@@ -511,7 +512,7 @@ mod test {
511512
100000000,
512513
100000000,
513514
DEFAULT_AUTO_CHECKPOINT,
514-
watch::channel(None).1,
515+
Arc::new(|| None),
515516
None,
516517
Default::default(),
517518
Arc::new(|_| unreachable!()),
@@ -562,7 +563,7 @@ mod test {
562563
100000000,
563564
100000000,
564565
DEFAULT_AUTO_CHECKPOINT,
565-
watch::channel(None).1,
566+
Arc::new(|| None),
566567
None,
567568
Default::default(),
568569
Arc::new(|_| unreachable!()),
@@ -645,7 +646,7 @@ mod test {
645646
100000000,
646647
100000000,
647648
DEFAULT_AUTO_CHECKPOINT,
648-
watch::channel(None).1,
649+
Arc::new(|| None),
649650
None,
650651
Default::default(),
651652
Arc::new(|_| unreachable!()),
@@ -738,7 +739,7 @@ mod test {
738739
100000000,
739740
100000000,
740741
DEFAULT_AUTO_CHECKPOINT,
741-
watch::channel(None).1,
742+
Arc::new(|| None),
742743
None,
743744
Default::default(),
744745
Arc::new(|_| unreachable!()),

libsql-server/src/connection/legacy.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use libsql_sys::EncryptionConfig;
99
use parking_lot::Mutex;
1010
use rusqlite::ffi::SQLITE_BUSY;
1111
use rusqlite::{ErrorCode, OpenFlags};
12-
use tokio::sync::watch;
1312
use tokio::time::Duration;
1413

1514
use crate::error::Error;
@@ -22,7 +21,7 @@ use crate::replication::FrameNo;
2221
use crate::stats::Stats;
2322
use crate::{record_time, Result};
2423

25-
use super::connection_core::CoreConnection;
24+
use super::connection_core::{CoreConnection, GetCurrentFrameNo};
2625

2726
use super::connection_manager::{
2827
ConnectionManager, InnerWalManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
@@ -40,7 +39,7 @@ pub struct MakeLegacyConnection<W> {
4039
max_response_size: u64,
4140
max_total_response_size: u64,
4241
auto_checkpoint: u32,
43-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
42+
get_current_frame_no: GetCurrentFrameNo,
4443
connection_manager: ConnectionManager,
4544
/// return sqlite busy. To mitigate that, we hold on to one connection
4645
_db: Option<LegacyConnection<W>>,
@@ -65,7 +64,7 @@ where
6564
max_response_size: u64,
6665
max_total_response_size: u64,
6766
auto_checkpoint: u32,
68-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
67+
current_frame_no: GetCurrentFrameNo,
6968
encryption_config: Option<EncryptionConfig>,
7069
block_writes: Arc<AtomicBool>,
7170
resolve_attach_path: ResolveNamespacePathFn,
@@ -82,7 +81,7 @@ where
8281
max_response_size,
8382
max_total_response_size,
8483
auto_checkpoint,
85-
current_frame_no_receiver,
84+
get_current_frame_no: current_frame_no,
8685
_db: None,
8786
wal_wrapper,
8887
encryption_config,
@@ -142,7 +141,7 @@ where
142141
auto_checkpoint: self.auto_checkpoint,
143142
encryption_config: self.encryption_config.clone(),
144143
},
145-
self.current_frame_no_receiver.clone(),
144+
self.get_current_frame_no.clone(),
146145
self.block_writes.clone(),
147146
self.resolve_attach_path.clone(),
148147
self.connection_manager.clone(),
@@ -185,7 +184,7 @@ impl LegacyConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
185184
Default::default(),
186185
MetaStoreHandle::new_test(),
187186
QueryBuilderConfig::default(),
188-
tokio::sync::watch::channel(None).1,
187+
Arc::new(|| None),
189188
Default::default(),
190189
Arc::new(|_| unreachable!()),
191190
ConnectionManager::new(TXN_TIMEOUT),
@@ -321,7 +320,7 @@ where
321320
broadcaster: BroadcasterHandle,
322321
config_store: MetaStoreHandle,
323322
builder_config: QueryBuilderConfig,
324-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
323+
current_frame_no_receiver: GetCurrentFrameNo,
325324
block_writes: Arc<AtomicBool>,
326325
resolve_attach_path: ResolveNamespacePathFn,
327326
connection_manager: ConnectionManager,

libsql-server/src/connection/libsql.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use libsql_sys::EncryptionConfig;
66
use libsql_wal::io::StdIO;
77
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
88
use parking_lot::Mutex;
9-
use tokio::sync::watch;
109

1110
use crate::connection::program::check_program_auth;
1211
use crate::metrics::DESCRIBE_COUNT;
@@ -19,7 +18,7 @@ use crate::stats::Stats;
1918
use crate::Result;
2019
use crate::{record_time, SqldStorage, BLOCKING_RT};
2120

22-
use super::connection_core::CoreConnection;
21+
use super::connection_core::{CoreConnection, GetCurrentFrameNo};
2322
use super::program::{check_describe_auth, DescribeResponse, Program};
2423
use super::{MakeConnection, RequestContext};
2524

@@ -36,7 +35,7 @@ pub struct MakeLibsqlConnectionInner {
3635
pub(crate) max_response_size: u64,
3736
pub(crate) max_total_response_size: u64,
3837
pub(crate) auto_checkpoint: u32,
39-
pub(crate) current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
38+
pub(crate) get_current_frame_no: GetCurrentFrameNo,
4039
pub(crate) encryption_config: Option<EncryptionConfig>,
4140
pub(crate) block_writes: Arc<AtomicBool>,
4241
pub(crate) resolve_attach_path: ResolveNamespacePathFn,
@@ -67,7 +66,7 @@ impl MakeConnection for MakeLibsqlConnection {
6766
inner.broadcaster.clone(),
6867
inner.config_store.clone(),
6968
builder_config,
70-
inner.current_frame_no_receiver.clone(),
69+
inner.get_current_frame_no.clone(),
7170
inner.block_writes.clone(),
7271
inner.resolve_attach_path.clone(),
7372
)

libsql-server/src/connection/write_proxy.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
use std::pin::Pin;
12
use std::sync::Arc;
23

4+
use futures::Future;
35
use futures_core::future::BoxFuture;
46
use futures_core::Stream;
57
use libsql_replication::rpc::proxy::proxy_client::ProxyClient;
@@ -8,7 +10,7 @@ use libsql_replication::rpc::proxy::{
810
};
911
use libsql_sys::EncryptionConfig;
1012
use parking_lot::Mutex as PMutex;
11-
use tokio::sync::{mpsc, watch, Mutex};
13+
use tokio::sync::{mpsc, Mutex};
1214
use tokio_stream::StreamExt;
1315
use tonic::transport::Channel;
1416
use tonic::{Request, Streaming};
@@ -22,16 +24,21 @@ use crate::replication::FrameNo;
2224
use crate::stats::Stats;
2325
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};
2426

27+
use super::connection_core::GetCurrentFrameNo;
2528
use super::program::DescribeResponse;
2629
use super::{Connection, RequestContext};
2730
use super::{MakeConnection, Program};
2831

2932
pub type RpcStream = Streaming<ExecResp>;
33+
pub type WaitForFrameNo = Arc<
34+
dyn Fn(FrameNo) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
35+
>;
3036

3137
pub struct MakeWriteProxyConn<M> {
3238
client: ProxyClient<Channel>,
3339
stats: Arc<Stats>,
34-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
40+
wait_for_frame_no: WaitForFrameNo,
41+
get_current_frame_no: GetCurrentFrameNo,
3542
max_response_size: u64,
3643
max_total_response_size: u64,
3744
primary_replication_index: Option<FrameNo>,
@@ -46,23 +53,25 @@ impl<M> MakeWriteProxyConn<M> {
4653
channel: Channel,
4754
uri: tonic::transport::Uri,
4855
stats: Arc<Stats>,
49-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
56+
wait_for_frame_no: WaitForFrameNo,
5057
max_response_size: u64,
5158
max_total_response_size: u64,
5259
primary_replication_index: Option<FrameNo>,
5360
encryption_config: Option<EncryptionConfig>,
5461
make_read_only_conn: M,
62+
get_current_frame_no: GetCurrentFrameNo,
5563
) -> Self {
5664
let client = ProxyClient::with_origin(channel, uri);
5765
Self {
5866
client,
5967
stats,
60-
applied_frame_no_receiver,
68+
wait_for_frame_no,
6169
max_response_size,
6270
max_total_response_size,
6371
make_read_only_conn,
6472
primary_replication_index,
6573
encryption_config,
74+
get_current_frame_no,
6675
}
6776
}
6877
}
@@ -77,14 +86,15 @@ where
7786
Ok(WriteProxyConnection::new(
7887
self.client.clone(),
7988
self.stats.clone(),
80-
self.applied_frame_no_receiver.clone(),
89+
self.wait_for_frame_no.clone(),
8190
QueryBuilderConfig {
8291
max_size: Some(self.max_response_size),
8392
max_total_size: Some(self.max_total_response_size),
8493
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
8594
encryption_config: self.encryption_config.clone(),
8695
},
8796
self.primary_replication_index,
97+
self.get_current_frame_no.clone(),
8898
self.make_read_only_conn.create().await?,
8999
)?)
90100
}
@@ -99,8 +109,9 @@ pub struct WriteProxyConnection<R, C> {
99109
/// any subsequent read on this connection must wait for the replicator to catch up with this
100110
/// frame_no
101111
last_write_frame_no: PMutex<Option<FrameNo>>,
102-
/// Notifier from the repliator of the currently applied frameno
103-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
112+
/// Notifier from the replicator of the currently applied frame_no
113+
wait_for_frame_no: WaitForFrameNo,
114+
get_current_frame_no: GetCurrentFrameNo,
104115
builder_config: QueryBuilderConfig,
105116
stats: Arc<Stats>,
106117

@@ -114,21 +125,23 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
114125
fn new(
115126
write_proxy: ProxyClient<Channel>,
116127
stats: Arc<Stats>,
117-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
128+
wait_for_frame_no: WaitForFrameNo,
118129
builder_config: QueryBuilderConfig,
119130
primary_replication_index: Option<u64>,
131+
get_current_frame_no: GetCurrentFrameNo,
120132
read_conn: C,
121133
) -> Result<Self> {
122134
Ok(Self {
123135
read_conn,
124136
write_proxy,
125137
state: Mutex::new(TxnStatus::Init),
126138
last_write_frame_no: Default::default(),
127-
applied_frame_no_receiver,
139+
wait_for_frame_no,
128140
builder_config,
129141
stats,
130142
remote_conn: Default::default(),
131143
primary_replication_index,
144+
get_current_frame_no,
132145
})
133146
}
134147

@@ -199,15 +212,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
199212
let current_fno = replication_index.or_else(|| *self.last_write_frame_no.lock());
200213
match current_fno {
201214
Some(current_frame_no) => {
202-
let mut receiver = self.applied_frame_no_receiver.clone();
203-
receiver
204-
.wait_for(|last_applied| match last_applied {
205-
Some(x) => *x >= current_frame_no,
206-
None => true,
207-
})
208-
.await
209-
.map_err(|_| Error::ReplicatorExited)?;
210-
215+
(self.wait_for_frame_no)(current_frame_no).await;
211216
Ok(())
212217
}
213218
None => Ok(()),
@@ -219,7 +224,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
219224
fn should_proxy(&self) -> bool {
220225
// There primary has data
221226
if let Some(primary_index) = self.primary_replication_index {
222-
let last_applied = *self.applied_frame_no_receiver.borrow();
227+
let last_applied = (self.get_current_frame_no)();
223228
// if we either don't have data while the primary has, or the data we have is
224229
// anterior to that of the primary when we loaded the namespace, then proxy the
225230
// request to the primary

libsql-server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1008,12 +1008,13 @@ where
10081008
let make_replication_svc = Box::new({
10091009
let registry = registry.clone();
10101010
let disable_namespaces = self.disable_namespaces;
1011-
move |store, user_auth, _, _, _| -> BoxReplicationService {
1011+
move |store, user_auth, _, _, service_internal| -> BoxReplicationService {
10121012
Box::new(LibsqlReplicationService::new(
10131013
registry.clone(),
10141014
store,
10151015
user_auth,
10161016
disable_namespaces,
1017+
service_internal,
10171018
))
10181019
}
10191020
});

0 commit comments

Comments
 (0)