Skip to content

Commit 19a91a6

Browse files
authored
Merge pull request #1592 from tursodatabase/lucio/improved-embedded-replica-return
libsql: provide more return info for sync
2 parents d6a380c + 8c7a9bf commit 19a91a6

File tree

6 files changed

+258
-36
lines changed

6 files changed

+258
-36
lines changed

libsql-replication/src/replicator.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ pub struct Replicator<C> {
141141
client: C,
142142
injector: Arc<Mutex<Injector>>,
143143
state: ReplicatorState,
144+
frames_synced: usize,
144145
}
145146

146147
const INJECTOR_BUFFER_CAPACITY: usize = 10;
@@ -178,6 +179,7 @@ impl<C: ReplicatorClient> Replicator<C> {
178179
client,
179180
injector: Arc::new(Mutex::new(injector)),
180181
state: ReplicatorState::NeedHandshake,
182+
frames_synced: 0,
181183
})
182184
}
183185

@@ -311,6 +313,8 @@ impl<C: ReplicatorClient> Replicator<C> {
311313
}
312314

313315
async fn inject_frame(&mut self, frame: Frame) -> Result<(), Error> {
316+
self.frames_synced += 1;
317+
314318
let injector = self.injector.clone();
315319
match spawn_blocking(move || injector.lock().inject_frame(frame)).await? {
316320
Ok(Some(commit_fno)) => {
@@ -335,6 +339,10 @@ impl<C: ReplicatorClient> Replicator<C> {
335339

336340
Ok(())
337341
}
342+
343+
pub fn frames_synced(&self) -> usize {
344+
self.frames_synced
345+
}
338346
}
339347

340348
/// Helper function to convert rpc frames results to replicator frames

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 145 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,15 @@ fn embedded_replica() {
9898
)
9999
.await?;
100100

101-
let n = db.sync().await?;
101+
let n = db.sync().await?.frame_no();
102102
assert_eq!(n, None);
103103

104104
let conn = db.connect()?;
105105

106106
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
107107
.await?;
108108

109-
let n = db.sync().await?;
109+
let n = db.sync().await?.frame_no();
110110
assert_eq!(n, Some(1));
111111

112112
let err = conn
@@ -171,15 +171,15 @@ fn execute_batch() {
171171
)
172172
.await?;
173173

174-
let n = db.sync().await?;
174+
let n = db.sync().await?.frame_no();
175175
assert_eq!(n, None);
176176

177177
let conn = db.connect()?;
178178

179179
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
180180
.await?;
181181

182-
let n = db.sync().await?;
182+
let n = db.sync().await?.frame_no();
183183
assert_eq!(n, Some(1));
184184

185185
conn.execute_batch(
@@ -224,15 +224,15 @@ fn stream() {
224224
)
225225
.await?;
226226

227-
let n = db.sync().await?;
227+
let n = db.sync().await?.frame_no();
228228
assert_eq!(n, None);
229229

230230
let conn = db.connect()?;
231231

232232
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
233233
.await?;
234234

235-
let n = db.sync().await?;
235+
let n = db.sync().await?.frame_no();
236236
assert_eq!(n, Some(1));
237237

238238
conn.execute_batch(
@@ -299,15 +299,15 @@ fn embedded_replica_with_encryption() {
299299
)
300300
.await?;
301301

302-
let n = db.sync().await?;
302+
let n = db.sync().await?.frame_no();
303303
assert_eq!(n, None);
304304

305305
let conn = db.connect()?;
306306

307307
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
308308
.await?;
309309

310-
let n = db.sync().await?;
310+
let n = db.sync().await?.frame_no();
311311
assert_eq!(n, Some(1));
312312

313313
let err = conn
@@ -461,7 +461,7 @@ fn replica_primary_reset() {
461461
)
462462
.await
463463
.unwrap();
464-
let replica_index = replica.sync().await.unwrap().unwrap();
464+
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
465465
let primary_index = Client::new()
466466
.get("http://primary:9090/v1/namespaces/default/stats")
467467
.await
@@ -520,7 +520,7 @@ fn replica_primary_reset() {
520520
)
521521
.await
522522
.unwrap();
523-
let replica_index = replica.sync().await.unwrap().unwrap();
523+
let replica_index = replica.sync().await.unwrap().frame_no().unwrap();
524524
let primary_index = Client::new()
525525
.get("http://primary:9090/v1/namespaces/default/stats")
526526
.await
@@ -625,7 +625,7 @@ fn replica_no_resync_on_restart() {
625625
)
626626
.await
627627
.unwrap();
628-
db.sync().await.unwrap().unwrap()
628+
db.sync().await.unwrap().frame_no().unwrap()
629629
};
630630
let first_sync = before.elapsed();
631631

@@ -641,7 +641,7 @@ fn replica_no_resync_on_restart() {
641641
)
642642
.await
643643
.unwrap();
644-
db.sync().await.unwrap().unwrap()
644+
db.sync().await.unwrap().frame_no().unwrap()
645645
};
646646
let second_sync = before.elapsed();
647647

@@ -725,7 +725,8 @@ fn replicate_with_snapshots() {
725725
.await
726726
.unwrap();
727727

728-
db.sync().await.unwrap();
728+
let rep = db.sync().await.unwrap();
729+
assert_eq!(rep.frames_synced(), 427);
729730

730731
let conn = db.connect().unwrap();
731732

@@ -757,7 +758,8 @@ fn replicate_with_snapshots() {
757758

758759
assert_eq!(stat, 427);
759760

760-
db.sync().await.unwrap();
761+
let rep = db.sync().await.unwrap();
762+
assert_eq!(rep.frames_synced(), 0);
761763

762764
let conn = db.connect().unwrap();
763765

@@ -1226,3 +1228,132 @@ fn txn_bug_issue_1283() {
12261228

12271229
sim.run().unwrap();
12281230
}
1231+
1232+
#[test]
1233+
fn replicated_return() {
1234+
let tmp_embedded = tempdir().unwrap();
1235+
let tmp_embedded_path = tmp_embedded.path().to_owned();
1236+
1237+
let mut sim = Builder::new()
1238+
.simulation_duration(Duration::from_secs(1000))
1239+
.build();
1240+
let tmp = tempdir().unwrap();
1241+
1242+
let notify = Arc::new(Notify::new());
1243+
let notify_clone = notify.clone();
1244+
1245+
init_tracing();
1246+
sim.host("primary", move || {
1247+
let notify = notify_clone.clone();
1248+
let path = tmp.path().to_path_buf();
1249+
async move {
1250+
let make_server = || async {
1251+
TestServer {
1252+
path: path.clone().into(),
1253+
user_api_config: UserApiConfig {
1254+
..Default::default()
1255+
},
1256+
admin_api_config: Some(AdminApiConfig {
1257+
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
1258+
connector: TurmoilConnector,
1259+
disable_metrics: true,
1260+
}),
1261+
rpc_server_config: Some(RpcServerConfig {
1262+
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
1263+
tls_config: None,
1264+
}),
1265+
..Default::default()
1266+
}
1267+
};
1268+
let server = make_server().await;
1269+
let shutdown = server.shutdown.clone();
1270+
1271+
let fut = async move { server.start_sim(8080).await };
1272+
1273+
tokio::pin!(fut);
1274+
1275+
loop {
1276+
tokio::select! {
1277+
res = &mut fut => {
1278+
res.unwrap();
1279+
break
1280+
}
1281+
_ = notify.notified() => {
1282+
shutdown.notify_waiters();
1283+
},
1284+
}
1285+
}
1286+
1287+
drop(fut);
1288+
1289+
tokio::fs::File::create(path.join("dbs").join("default").join(".sentinel"))
1290+
.await
1291+
.unwrap();
1292+
1293+
notify.notify_waiters();
1294+
let server = make_server().await;
1295+
server.start_sim(8080).await.unwrap();
1296+
1297+
Ok(())
1298+
}
1299+
});
1300+
1301+
sim.client("client", async move {
1302+
let path = tmp_embedded_path.join("embedded");
1303+
let db = Database::open_with_remote_sync_connector(
1304+
path.to_str().unwrap(),
1305+
"http://primary:8080",
1306+
"",
1307+
TurmoilConnector,
1308+
false,
1309+
None,
1310+
)
1311+
.await?;
1312+
1313+
let rep = db.sync().await.unwrap();
1314+
assert_eq!(rep.frame_no(), None);
1315+
assert_eq!(rep.frames_synced(), 0);
1316+
1317+
let conn = db.connect()?;
1318+
1319+
conn.execute("CREATE TABLE user (id INTEGER)", ())
1320+
.await
1321+
.unwrap();
1322+
1323+
let rep = db.sync().await.unwrap();
1324+
assert_eq!(rep.frame_no(), Some(1));
1325+
assert_eq!(rep.frames_synced(), 2);
1326+
1327+
conn.execute_batch(
1328+
"
1329+
INSERT into user(id) values (randomblob(4096));
1330+
INSERT into user(id) values (randomblob(4096));
1331+
INSERT into user(id) values (randomblob(4096));
1332+
",
1333+
)
1334+
.await
1335+
.unwrap();
1336+
1337+
let rep = db.sync().await.unwrap();
1338+
assert_eq!(rep.frame_no(), Some(10));
1339+
assert_eq!(rep.frames_synced(), 9);
1340+
1341+
// Regenerate log
1342+
notify.notify_waiters();
1343+
notify.notified().await;
1344+
1345+
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1346+
1347+
let rep = db.sync().await.unwrap();
1348+
assert_eq!(rep.frame_no(), Some(4));
1349+
assert_eq!(rep.frames_synced(), 3);
1350+
1351+
let mut row = conn.query("select count(*) from user", ()).await.unwrap();
1352+
let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap();
1353+
assert_eq!(count, 3);
1354+
1355+
Ok(())
1356+
});
1357+
1358+
sim.run().unwrap();
1359+
}

libsql/src/database.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ cfg_replication! {
323323

324324
/// Sync database from remote, and returns the committed frame_no after syncing, if
325325
/// applicable.
326-
pub async fn sync(&self) -> Result<Option<FrameNo>> {
326+
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
327327
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
328328
db.sync().await
329329
} else {

libsql/src/local/database.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ impl Database {
8383
encryption_config: Option<EncryptionConfig>,
8484
sync_interval: Option<std::time::Duration>,
8585
http_request_callback: Option<crate::util::HttpRequestCallback>,
86-
namespace: Option<String>
86+
namespace: Option<String>,
8787
) -> Result<Database> {
8888
use std::path::PathBuf;
8989

@@ -260,8 +260,8 @@ impl Database {
260260
#[cfg(feature = "replication")]
261261
/// Perform a sync step, returning the new replication index, or None, if the nothing was
262262
/// replicated yet
263-
pub async fn sync_oneshot(&self) -> Result<Option<FrameNo>> {
264-
if let Some(ref ctx) = self.replication_ctx {
263+
pub async fn sync_oneshot(&self) -> Result<crate::replication::Replicated> {
264+
if let Some(ctx) = &self.replication_ctx {
265265
ctx.replicator.sync_oneshot().await
266266
} else {
267267
Err(crate::errors::Error::Misuse(
@@ -273,7 +273,7 @@ impl Database {
273273

274274
#[cfg(feature = "replication")]
275275
/// Sync with primary
276-
pub async fn sync(&self) -> Result<Option<FrameNo>> {
276+
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
277277
Ok(self.sync_oneshot().await?)
278278
}
279279

0 commit comments

Comments
 (0)