Skip to content

Commit 87001c0

Browse files
authored
Merge pull request #1769 from tursodatabase/libsql-wal-replication-fixes
libsql-wal replication fixed
2 parents 1c0da12 + db9d6e1 commit 87001c0

File tree

11 files changed

+311
-118
lines changed

11 files changed

+311
-118
lines changed

libsql-server/src/connection/connection_core.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use std::time::{Duration, Instant};
77
use libsql_sys::wal::{Wal, WalManager};
88
use metrics::histogram;
99
use parking_lot::Mutex;
10-
use tokio::sync::watch;
1110

1211
use crate::connection::legacy::open_conn_active_checkpoint;
1312
use crate::error::Error;
@@ -24,13 +23,15 @@ use crate::{Result, BLOCKING_RT};
2423
use super::config::DatabaseConfig;
2524
use super::program::{DescribeCol, DescribeParam, DescribeResponse, Program, Vm};
2625

26+
pub type GetCurrentFrameNo = Arc<dyn Fn() -> Option<FrameNo> + Send + Sync + 'static>;
27+
2728
/// The base connection type, shared between legacy and libsql-wal implementations
2829
pub(super) struct CoreConnection<W> {
2930
conn: libsql_sys::Connection<W>,
3031
stats: Arc<Stats>,
3132
config_store: MetaStoreHandle,
3233
builder_config: QueryBuilderConfig,
33-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
34+
get_current_frame_no: GetCurrentFrameNo,
3435
block_writes: Arc<AtomicBool>,
3536
resolve_attach_path: ResolveNamespacePathFn,
3637
forced_rollback: bool,
@@ -65,7 +66,7 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
6566
broadcaster: BroadcasterHandle,
6667
config_store: MetaStoreHandle,
6768
builder_config: QueryBuilderConfig,
68-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
69+
get_current_frame_no: GetCurrentFrameNo,
6970
block_writes: Arc<AtomicBool>,
7071
resolve_attach_path: ResolveNamespacePathFn,
7172
) -> Result<Self> {
@@ -118,13 +119,13 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
118119
stats,
119120
config_store,
120121
builder_config,
121-
current_frame_no_receiver,
122122
block_writes,
123123
resolve_attach_path,
124124
forced_rollback: false,
125125
broadcaster,
126126
hooked: false,
127127
canceled,
128+
get_current_frame_no,
128129
};
129130

130131
for ext in extensions.iter() {
@@ -265,9 +266,9 @@ impl<W: Wal + Send + 'static> CoreConnection<W> {
265266
}
266267

267268
{
268-
let mut lock = this.lock();
269+
let lock = this.lock();
269270
let is_autocommit = lock.conn.is_autocommit();
270-
let current_fno = *lock.current_frame_no_receiver.borrow_and_update();
271+
let current_fno = (lock.get_current_frame_no)();
271272
vm.builder().finish(current_fno, is_autocommit)?;
272273
}
273274

@@ -424,13 +425,13 @@ mod test {
424425
stats: Arc::new(Stats::default()),
425426
config_store: MetaStoreHandle::new_test(),
426427
builder_config: QueryBuilderConfig::default(),
427-
current_frame_no_receiver: watch::channel(None).1,
428428
block_writes: Default::default(),
429429
resolve_attach_path: Arc::new(|_| unreachable!()),
430430
forced_rollback: false,
431431
broadcaster: Default::default(),
432432
hooked: false,
433433
canceled: Arc::new(false.into()),
434+
get_current_frame_no: Arc::new(|| None),
434435
};
435436

436437
let conn = Arc::new(Mutex::new(conn));
@@ -465,7 +466,7 @@ mod test {
465466
100000000,
466467
100000000,
467468
DEFAULT_AUTO_CHECKPOINT,
468-
watch::channel(None).1,
469+
Arc::new(|| None),
469470
None,
470471
Default::default(),
471472
Arc::new(|_| unreachable!()),
@@ -511,7 +512,7 @@ mod test {
511512
100000000,
512513
100000000,
513514
DEFAULT_AUTO_CHECKPOINT,
514-
watch::channel(None).1,
515+
Arc::new(|| None),
515516
None,
516517
Default::default(),
517518
Arc::new(|_| unreachable!()),
@@ -562,7 +563,7 @@ mod test {
562563
100000000,
563564
100000000,
564565
DEFAULT_AUTO_CHECKPOINT,
565-
watch::channel(None).1,
566+
Arc::new(|| None),
566567
None,
567568
Default::default(),
568569
Arc::new(|_| unreachable!()),
@@ -645,7 +646,7 @@ mod test {
645646
100000000,
646647
100000000,
647648
DEFAULT_AUTO_CHECKPOINT,
648-
watch::channel(None).1,
649+
Arc::new(|| None),
649650
None,
650651
Default::default(),
651652
Arc::new(|_| unreachable!()),
@@ -738,7 +739,7 @@ mod test {
738739
100000000,
739740
100000000,
740741
DEFAULT_AUTO_CHECKPOINT,
741-
watch::channel(None).1,
742+
Arc::new(|| None),
742743
None,
743744
Default::default(),
744745
Arc::new(|_| unreachable!()),

libsql-server/src/connection/legacy.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use libsql_sys::EncryptionConfig;
99
use parking_lot::Mutex;
1010
use rusqlite::ffi::SQLITE_BUSY;
1111
use rusqlite::{ErrorCode, OpenFlags};
12-
use tokio::sync::watch;
1312
use tokio::time::Duration;
1413

1514
use crate::error::Error;
@@ -22,7 +21,7 @@ use crate::replication::FrameNo;
2221
use crate::stats::Stats;
2322
use crate::{record_time, Result};
2423

25-
use super::connection_core::CoreConnection;
24+
use super::connection_core::{CoreConnection, GetCurrentFrameNo};
2625

2726
use super::connection_manager::{
2827
ConnectionManager, InnerWalManager, ManagedConnectionWal, ManagedConnectionWalWrapper,
@@ -40,7 +39,7 @@ pub struct MakeLegacyConnection<W> {
4039
max_response_size: u64,
4140
max_total_response_size: u64,
4241
auto_checkpoint: u32,
43-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
42+
get_current_frame_no: GetCurrentFrameNo,
4443
connection_manager: ConnectionManager,
4544
/// return sqlite busy. To mitigate that, we hold on to one connection
4645
_db: Option<LegacyConnection<W>>,
@@ -65,7 +64,7 @@ where
6564
max_response_size: u64,
6665
max_total_response_size: u64,
6766
auto_checkpoint: u32,
68-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
67+
current_frame_no: GetCurrentFrameNo,
6968
encryption_config: Option<EncryptionConfig>,
7069
block_writes: Arc<AtomicBool>,
7170
resolve_attach_path: ResolveNamespacePathFn,
@@ -82,7 +81,7 @@ where
8281
max_response_size,
8382
max_total_response_size,
8483
auto_checkpoint,
85-
current_frame_no_receiver,
84+
get_current_frame_no: current_frame_no,
8685
_db: None,
8786
wal_wrapper,
8887
encryption_config,
@@ -142,7 +141,7 @@ where
142141
auto_checkpoint: self.auto_checkpoint,
143142
encryption_config: self.encryption_config.clone(),
144143
},
145-
self.current_frame_no_receiver.clone(),
144+
self.get_current_frame_no.clone(),
146145
self.block_writes.clone(),
147146
self.resolve_attach_path.clone(),
148147
self.connection_manager.clone(),
@@ -185,7 +184,7 @@ impl LegacyConnection<libsql_sys::wal::wrapper::PassthroughWalWrapper> {
185184
Default::default(),
186185
MetaStoreHandle::new_test(),
187186
QueryBuilderConfig::default(),
188-
tokio::sync::watch::channel(None).1,
187+
Arc::new(|| None),
189188
Default::default(),
190189
Arc::new(|_| unreachable!()),
191190
ConnectionManager::new(TXN_TIMEOUT),
@@ -321,7 +320,7 @@ where
321320
broadcaster: BroadcasterHandle,
322321
config_store: MetaStoreHandle,
323322
builder_config: QueryBuilderConfig,
324-
current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
323+
current_frame_no_receiver: GetCurrentFrameNo,
325324
block_writes: Arc<AtomicBool>,
326325
resolve_attach_path: ResolveNamespacePathFn,
327326
connection_manager: ConnectionManager,

libsql-server/src/connection/libsql.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use libsql_sys::EncryptionConfig;
66
use libsql_wal::io::StdIO;
77
use libsql_wal::wal::{LibsqlWal, LibsqlWalManager};
88
use parking_lot::Mutex;
9-
use tokio::sync::watch;
109

1110
use crate::connection::program::check_program_auth;
1211
use crate::metrics::DESCRIBE_COUNT;
@@ -19,7 +18,7 @@ use crate::stats::Stats;
1918
use crate::Result;
2019
use crate::{record_time, SqldStorage, BLOCKING_RT};
2120

22-
use super::connection_core::CoreConnection;
21+
use super::connection_core::{CoreConnection, GetCurrentFrameNo};
2322
use super::program::{check_describe_auth, DescribeResponse, Program};
2423
use super::{MakeConnection, RequestContext};
2524

@@ -36,7 +35,7 @@ pub struct MakeLibsqlConnectionInner {
3635
pub(crate) max_response_size: u64,
3736
pub(crate) max_total_response_size: u64,
3837
pub(crate) auto_checkpoint: u32,
39-
pub(crate) current_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
38+
pub(crate) get_current_frame_no: GetCurrentFrameNo,
4039
pub(crate) encryption_config: Option<EncryptionConfig>,
4140
pub(crate) block_writes: Arc<AtomicBool>,
4241
pub(crate) resolve_attach_path: ResolveNamespacePathFn,
@@ -67,7 +66,7 @@ impl MakeConnection for MakeLibsqlConnection {
6766
inner.broadcaster.clone(),
6867
inner.config_store.clone(),
6968
builder_config,
70-
inner.current_frame_no_receiver.clone(),
69+
inner.get_current_frame_no.clone(),
7170
inner.block_writes.clone(),
7271
inner.resolve_attach_path.clone(),
7372
)

libsql-server/src/connection/write_proxy.rs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1+
use std::pin::Pin;
12
use std::sync::Arc;
23
use std::time::Duration;
34

5+
use futures::Future;
46
use futures_core::future::BoxFuture;
57
use futures_core::Stream;
68
use libsql_replication::rpc::proxy::proxy_client::ProxyClient;
@@ -9,7 +11,7 @@ use libsql_replication::rpc::proxy::{
911
};
1012
use libsql_sys::EncryptionConfig;
1113
use parking_lot::Mutex as PMutex;
12-
use tokio::sync::{mpsc, watch, Mutex};
14+
use tokio::sync::{mpsc, Mutex};
1315
use tokio_stream::StreamExt;
1416
use tonic::transport::Channel;
1517
use tonic::{Code, Request, Streaming};
@@ -23,16 +25,21 @@ use crate::replication::FrameNo;
2325
use crate::stats::Stats;
2426
use crate::{Result, DEFAULT_AUTO_CHECKPOINT};
2527

28+
use super::connection_core::GetCurrentFrameNo;
2629
use super::program::DescribeResponse;
2730
use super::{Connection, RequestContext};
2831
use super::{MakeConnection, Program};
2932

3033
pub type RpcStream = Streaming<ExecResp>;
34+
pub type WaitForFrameNo = Arc<
35+
dyn Fn(FrameNo) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>> + Send + 'static + Sync,
36+
>;
3137

3238
pub struct MakeWriteProxyConn<M> {
3339
client: ProxyClient<Channel>,
3440
stats: Arc<Stats>,
35-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
41+
wait_for_frame_no: WaitForFrameNo,
42+
get_current_frame_no: GetCurrentFrameNo,
3643
max_response_size: u64,
3744
max_total_response_size: u64,
3845
primary_replication_index: Option<FrameNo>,
@@ -47,23 +54,25 @@ impl<M> MakeWriteProxyConn<M> {
4754
channel: Channel,
4855
uri: tonic::transport::Uri,
4956
stats: Arc<Stats>,
50-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
57+
wait_for_frame_no: WaitForFrameNo,
5158
max_response_size: u64,
5259
max_total_response_size: u64,
5360
primary_replication_index: Option<FrameNo>,
5461
encryption_config: Option<EncryptionConfig>,
5562
make_read_only_conn: M,
63+
get_current_frame_no: GetCurrentFrameNo,
5664
) -> Self {
5765
let client = ProxyClient::with_origin(channel, uri);
5866
Self {
5967
client,
6068
stats,
61-
applied_frame_no_receiver,
69+
wait_for_frame_no,
6270
max_response_size,
6371
max_total_response_size,
6472
make_read_only_conn,
6573
primary_replication_index,
6674
encryption_config,
75+
get_current_frame_no,
6776
}
6877
}
6978
}
@@ -78,14 +87,15 @@ where
7887
Ok(WriteProxyConnection::new(
7988
self.client.clone(),
8089
self.stats.clone(),
81-
self.applied_frame_no_receiver.clone(),
90+
self.wait_for_frame_no.clone(),
8291
QueryBuilderConfig {
8392
max_size: Some(self.max_response_size),
8493
max_total_size: Some(self.max_total_response_size),
8594
auto_checkpoint: DEFAULT_AUTO_CHECKPOINT,
8695
encryption_config: self.encryption_config.clone(),
8796
},
8897
self.primary_replication_index,
98+
self.get_current_frame_no.clone(),
8999
self.make_read_only_conn.create().await?,
90100
)?)
91101
}
@@ -100,8 +110,9 @@ pub struct WriteProxyConnection<R, C> {
100110
/// any subsequent read on this connection must wait for the replicator to catch up with this
101111
/// frame_no
102112
last_write_frame_no: PMutex<Option<FrameNo>>,
103-
/// Notifier from the repliator of the currently applied frameno
104-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
113+
/// Notifier from the replicator of the currently applied frame_no
114+
wait_for_frame_no: WaitForFrameNo,
115+
get_current_frame_no: GetCurrentFrameNo,
105116
builder_config: QueryBuilderConfig,
106117
stats: Arc<Stats>,
107118

@@ -115,21 +126,23 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
115126
fn new(
116127
write_proxy: ProxyClient<Channel>,
117128
stats: Arc<Stats>,
118-
applied_frame_no_receiver: watch::Receiver<Option<FrameNo>>,
129+
wait_for_frame_no: WaitForFrameNo,
119130
builder_config: QueryBuilderConfig,
120131
primary_replication_index: Option<u64>,
132+
get_current_frame_no: GetCurrentFrameNo,
121133
read_conn: C,
122134
) -> Result<Self> {
123135
Ok(Self {
124136
read_conn,
125137
write_proxy,
126138
state: Mutex::new(TxnStatus::Init),
127139
last_write_frame_no: Default::default(),
128-
applied_frame_no_receiver,
140+
wait_for_frame_no,
129141
builder_config,
130142
stats,
131143
remote_conn: Default::default(),
132144
primary_replication_index,
145+
get_current_frame_no,
133146
})
134147
}
135148

@@ -200,15 +213,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
200213
let current_fno = replication_index.or_else(|| *self.last_write_frame_no.lock());
201214
match current_fno {
202215
Some(current_frame_no) => {
203-
let mut receiver = self.applied_frame_no_receiver.clone();
204-
receiver
205-
.wait_for(|last_applied| match last_applied {
206-
Some(x) => *x >= current_frame_no,
207-
None => true,
208-
})
209-
.await
210-
.map_err(|_| Error::ReplicatorExited)?;
211-
216+
(self.wait_for_frame_no)(current_frame_no).await;
212217
Ok(())
213218
}
214219
None => Ok(()),
@@ -220,7 +225,7 @@ impl<C: Connection> WriteProxyConnection<RpcStream, C> {
220225
fn should_proxy(&self) -> bool {
221226
// The primary has data
222227
if let Some(primary_index) = self.primary_replication_index {
223-
let last_applied = *self.applied_frame_no_receiver.borrow();
228+
let last_applied = (self.get_current_frame_no)();
224229
// if we either don't have data while the primary has, or the data we have is
225230
// anterior to that of the primary when we loaded the namespace, then proxy the
226231
// request to the primary

libsql-server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1008,12 +1008,13 @@ where
10081008
let make_replication_svc = Box::new({
10091009
let registry = registry.clone();
10101010
let disable_namespaces = self.disable_namespaces;
1011-
move |store, user_auth, _, _, _| -> BoxReplicationService {
1011+
move |store, user_auth, _, _, service_internal| -> BoxReplicationService {
10121012
Box::new(LibsqlReplicationService::new(
10131013
registry.clone(),
10141014
store,
10151015
user_auth,
10161016
disable_namespaces,
1017+
service_internal,
10171018
))
10181019
}
10191020
});

0 commit comments

Comments
 (0)