Skip to content

Commit 7be3b5a

Browse files
committed
feat: initial pass on remote writes for offline databases
1 parent 09715e4 commit 7be3b5a

File tree

13 files changed

+467
-164
lines changed

13 files changed

+467
-164
lines changed

libsql/src/database.rs

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ pub use builder::Builder;
88
pub use libsql_sys::{Cipher, EncryptionConfig};
99

1010
use crate::{Connection, Result};
11-
use std::fmt;
12-
use std::sync::atomic::AtomicU64;
13-
use std::sync::Arc;
11+
use std::{
12+
fmt,
13+
sync::{atomic::AtomicU64, Arc},
14+
};
15+
use tokio::sync::Mutex;
1416

1517
cfg_core! {
1618
bitflags::bitflags! {
@@ -82,7 +84,14 @@ enum DbType {
8284
encryption_config: Option<EncryptionConfig>,
8385
},
8486
#[cfg(feature = "sync")]
85-
Offline { db: crate::local::Database },
87+
Offline {
88+
db: crate::local::Database,
89+
remote_writes: bool,
90+
read_your_writes: bool,
91+
url: String,
92+
auth_token: String,
93+
connector: crate::util::ConnectorService,
94+
},
8695
#[cfg(feature = "remote")]
8796
Remote {
8897
url: String,
@@ -375,7 +384,7 @@ cfg_replication! {
375384
#[cfg(feature = "replication")]
376385
DbType::Sync { db, encryption_config: _ } => db.sync().await,
377386
#[cfg(feature = "sync")]
378-
DbType::Offline { db } => db.sync_offline().await,
387+
DbType::Offline { db, .. } => db.sync_offline().await,
379388
_ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
380389
}
381390
}
@@ -542,7 +551,7 @@ impl Database {
542551

543552
let conn = db.connect()?;
544553

545-
let conn = std::sync::Arc::new(LibsqlConnection { conn });
554+
let conn = Arc::new(LibsqlConnection { conn });
546555

547556
Ok(Connection { conn })
548557
}
@@ -590,7 +599,7 @@ impl Database {
590599
}
591600
}
592601

593-
let conn = std::sync::Arc::new(LibsqlConnection { conn });
602+
let conn = Arc::new(LibsqlConnection { conn });
594603

595604
Ok(Connection { conn })
596605
}
@@ -636,19 +645,47 @@ impl Database {
636645
writer,
637646
self.max_write_replication_index.clone(),
638647
);
639-
let conn = std::sync::Arc::new(remote);
648+
let conn = Arc::new(remote);
640649

641650
Ok(Connection { conn })
642651
}
643652

644653
#[cfg(feature = "sync")]
645-
DbType::Offline { db } => {
646-
use crate::local::impls::LibsqlConnection;
647-
648-
let conn = db.connect()?;
654+
DbType::Offline {
655+
db,
656+
remote_writes,
657+
read_your_writes,
658+
url,
659+
auth_token,
660+
connector,
661+
} => {
662+
use crate::{
663+
hrana::{connection::HttpConnection, hyper::HttpSender},
664+
local::impls::LibsqlConnection,
665+
replication::connection::State,
666+
sync::connection::SyncedConnection,
667+
};
649668

650-
let conn = std::sync::Arc::new(LibsqlConnection { conn });
669+
let local = db.connect()?;
670+
671+
if *remote_writes {
672+
let synced = SyncedConnection {
673+
local,
674+
remote: HttpConnection::new(
675+
url.clone(),
676+
auth_token.clone(),
677+
HttpSender::new(connector.clone(), None),
678+
),
679+
read_your_writes: *read_your_writes,
680+
context: db.sync_ctx.clone().unwrap(),
681+
state: Arc::new(Mutex::new(State::Init)),
682+
};
683+
684+
let conn = Arc::new(synced);
685+
return Ok(Connection { conn });
686+
}
651687

688+
let conn = Arc::new(LibsqlConnection { conn: local });
652689
Ok(Connection { conn })
653690
}
654691

@@ -659,7 +696,7 @@ impl Database {
659696
connector,
660697
version,
661698
} => {
662-
let conn = std::sync::Arc::new(
699+
let conn = Arc::new(
663700
crate::hrana::connection::HttpConnection::new_with_connector(
664701
url,
665702
auth_token,

libsql/src/database/builder.rs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ impl Builder<()> {
102102
connector: None,
103103
version: None,
104104
},
105-
connector:None,
105+
connector: None,
106+
read_your_writes: true,
107+
remote_writes: false,
106108
},
107109
}
108110
}
@@ -463,6 +465,8 @@ cfg_sync! {
463465
flags: crate::OpenFlags,
464466
remote: Remote,
465467
connector: Option<crate::util::ConnectorService>,
468+
remote_writes: bool,
469+
read_your_writes: bool,
466470
}
467471

468472
impl Builder<SyncedDatabase> {
@@ -472,6 +476,16 @@ cfg_sync! {
472476
self
473477
}
474478

479+
pub fn read_your_writes(mut self, v: bool) -> Builder<SyncedDatabase> {
480+
self.inner.read_your_writes = v;
481+
self
482+
}
483+
484+
pub fn remote_writes(mut self, v: bool) -> Builder<SyncedDatabase> {
485+
self.inner.remote_writes = v;
486+
self
487+
}
488+
475489
/// Provide a custom http connector that will be used to create http connections.
476490
pub fn connector<C>(mut self, connector: C) -> Builder<SyncedDatabase>
477491
where
@@ -497,6 +511,8 @@ cfg_sync! {
497511
version: _,
498512
},
499513
connector,
514+
remote_writes,
515+
read_your_writes,
500516
} = self.inner;
501517

502518
let path = path.to_str().ok_or(crate::Error::InvalidUTF8Path)?.to_owned();
@@ -515,16 +531,23 @@ cfg_sync! {
515531
let connector = crate::util::ConnectorService::new(svc);
516532

517533
let db = crate::local::Database::open_local_with_offline_writes(
518-
connector,
534+
connector.clone(),
519535
path,
520536
flags,
521-
url,
522-
auth_token,
537+
url.clone(),
538+
auth_token.clone(),
523539
)
524540
.await?;
525541

526542
Ok(Database {
527-
db_type: DbType::Offline { db },
543+
db_type: DbType::Offline {
544+
db,
545+
remote_writes,
546+
read_your_writes,
547+
url,
548+
auth_token,
549+
connector,
550+
},
528551
max_write_replication_index: Default::default(),
529552
})
530553
}

libsql/src/hrana/hyper.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,14 +305,17 @@ impl Conn for HranaStream<HttpSender> {
305305
let parse = crate::parser::Statement::parse(sql);
306306
for s in parse {
307307
let s = s?;
308-
if s.kind == crate::parser::StmtKind::TxnBegin
309-
|| s.kind == crate::parser::StmtKind::TxnBeginReadOnly
310-
|| s.kind == crate::parser::StmtKind::TxnEnd
311-
{
308+
309+
use crate::parser::StmtKind;
310+
if matches!(
311+
s.kind,
312+
StmtKind::TxnBegin | StmtKind::TxnBeginReadOnly | StmtKind::TxnEnd
313+
) {
312314
return Err(Error::TransactionalBatchError(
313315
"Transactions forbidden inside transactional batch".to_string(),
314316
));
315317
}
318+
316319
stmts.push(Stmt::new(s.stmt, false));
317320
}
318321
let res = self

libsql/src/hrana/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
pub mod connection;
44

55
cfg_remote! {
6-
mod hyper;
6+
pub mod hyper;
77
}
88

99
mod cursor;

0 commit comments

Comments
 (0)