Skip to content

Commit 1dd2cc9

Browse files
committed
moved to frames_synced
1 parent c77bbc7 commit 1dd2cc9

File tree

4 files changed

+76
-31
lines changed

4 files changed

+76
-31
lines changed

libsql-replication/src/replicator.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ pub struct Replicator<C> {
141141
client: C,
142142
injector: Arc<Mutex<Injector>>,
143143
state: ReplicatorState,
144+
frames_synced: usize,
144145
}
145146

146147
const INJECTOR_BUFFER_CAPACITY: usize = 10;
@@ -178,6 +179,7 @@ impl<C: ReplicatorClient> Replicator<C> {
178179
client,
179180
injector: Arc::new(Mutex::new(injector)),
180181
state: ReplicatorState::NeedHandshake,
182+
frames_synced: 0,
181183
})
182184
}
183185

@@ -311,6 +313,8 @@ impl<C: ReplicatorClient> Replicator<C> {
311313
}
312314

313315
async fn inject_frame(&mut self, frame: Frame) -> Result<(), Error> {
316+
self.frames_synced += 1;
317+
314318
let injector = self.injector.clone();
315319
match spawn_blocking(move || injector.lock().inject_frame(frame)).await? {
316320
Ok(Some(commit_fno)) => {
@@ -335,6 +339,10 @@ impl<C: ReplicatorClient> Replicator<C> {
335339

336340
Ok(())
337341
}
342+
343+
pub fn frames_synced(&self) -> usize {
344+
self.frames_synced
345+
}
338346
}
339347

340348
/// Helper function to convert rpc frames results to replicator frames

libsql-server/tests/embedded_replica/mod.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,8 @@ fn replicate_with_snapshots() {
725725
.await
726726
.unwrap();
727727

728-
db.sync().await.unwrap();
728+
let rep = db.sync().await.unwrap();
729+
assert_eq!(rep.frames_synced(), 427);
729730

730731
let conn = db.connect().unwrap();
731732

@@ -757,7 +758,8 @@ fn replicate_with_snapshots() {
757758

758759
assert_eq!(stat, 427);
759760

760-
db.sync().await.unwrap();
761+
let rep = db.sync().await.unwrap();
762+
assert_eq!(rep.frames_synced(), 427);
761763

762764
let conn = db.connect().unwrap();
763765

@@ -1230,15 +1232,7 @@ fn txn_bug_issue_1283() {
12301232
#[test]
12311233
fn replicated_return() {
12321234
let tmp_embedded = tempdir().unwrap();
1233-
let tmp_host = tempdir().unwrap();
12341235
let tmp_embedded_path = tmp_embedded.path().to_owned();
1235-
let tmp_host_path = tmp_host.path().to_owned();
1236-
1237-
let mut sim = Builder::new()
1238-
.simulation_duration(Duration::from_secs(1000))
1239-
.build();
1240-
1241-
// make_primary(&mut sim, tmp_host_path.clone());
12421236

12431237
let mut sim = Builder::new()
12441238
.simulation_duration(Duration::from_secs(1000))
@@ -1318,7 +1312,7 @@ fn replicated_return() {
13181312

13191313
let rep = db.sync().await.unwrap();
13201314
assert_eq!(rep.frame_no(), None);
1321-
assert_eq!(rep.start_frame_no(), None);
1315+
assert_eq!(rep.frames_synced(), 0);
13221316

13231317
let conn = db.connect()?;
13241318

@@ -1328,7 +1322,7 @@ fn replicated_return() {
13281322

13291323
let rep = db.sync().await.unwrap();
13301324
assert_eq!(rep.frame_no(), Some(1));
1331-
assert_eq!(rep.start_frame_no(), None);
1325+
assert_eq!(rep.frames_synced(), 2);
13321326

13331327
conn.execute_batch(
13341328
"
@@ -1342,16 +1336,17 @@ fn replicated_return() {
13421336

13431337
let rep = db.sync().await.unwrap();
13441338
assert_eq!(rep.frame_no(), Some(10));
1345-
assert_eq!(rep.start_frame_no(), Some(1));
1339+
assert_eq!(rep.frames_synced(), 11);
13461340

1341+
// Regenerate log
13471342
notify.notify_waiters();
13481343
notify.notified().await;
13491344

13501345
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
13511346

13521347
let rep = db.sync().await.unwrap();
13531348
assert_eq!(rep.frame_no(), Some(4));
1354-
assert_eq!(rep.start_frame_no(), Some(10));
1349+
assert_eq!(rep.frames_synced(), 16);
13551350

13561351
let mut row = conn.query("select count(*) from user", ()).await.unwrap();
13571352
let count = row.next().await.unwrap().unwrap().get::<u64>(0).unwrap();

libsql/src/replication/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ pub(crate) mod remote_client;
3535
#[derive(Debug)]
3636
pub struct Replicated {
3737
frame_no: Option<FrameNo>,
38-
start_frame_no: Option<FrameNo>,
38+
frames_synced: usize,
3939
}
4040

4141
impl Replicated {
4242
pub fn frame_no(&self) -> Option<FrameNo> {
4343
self.frame_no
4444
}
4545

46-
pub fn start_frame_no(&self) -> Option<FrameNo> {
47-
self.start_frame_no
46+
pub fn frames_synced(&self) -> usize {
47+
self.frames_synced
4848
}
4949
}
5050

@@ -209,8 +209,6 @@ impl EmbeddedReplicator {
209209
));
210210
}
211211

212-
let start_frame_no = replicator.client_mut().committed_frame_no();
213-
214212
// we force a handshake to get the most up to date replication index from the primary.
215213
replicator.force_handshake();
216214

@@ -235,7 +233,7 @@ impl EmbeddedReplicator {
235233
let Some(primary_index) = client.last_handshake_replication_index() else {
236234
return Ok(Replicated {
237235
frame_no: None,
238-
start_frame_no: None,
236+
frames_synced: 0,
239237
});
240238
};
241239
if let Some(replica_index) = replicator.client_mut().committed_frame_no() {
@@ -249,7 +247,7 @@ impl EmbeddedReplicator {
249247

250248
let replicated = Replicated {
251249
frame_no: replicator.client_mut().committed_frame_no(),
252-
start_frame_no,
250+
frames_synced: replicator.frames_synced(),
253251
};
254252

255253
Ok(replicated)

libsql/src/replication/remote_client.rs

Lines changed: 54 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use futures::StreamExt as _;
88
use libsql_replication::frame::{Frame, FrameHeader, FrameNo};
99
use libsql_replication::meta::WalIndexMeta;
1010
use libsql_replication::replicator::{map_frame_err, Error, ReplicatorClient};
11-
use libsql_replication::rpc::replication::{verify_session_token, Frames, HelloRequest, LogOffset, SESSION_TOKEN_KEY, HelloResponse};
11+
use libsql_replication::rpc::replication::{
12+
verify_session_token, Frames, HelloRequest, HelloResponse, LogOffset, SESSION_TOKEN_KEY,
13+
};
1214
use tokio_stream::Stream;
1315
use tonic::metadata::AsciiMetadataValue;
1416
use tonic::{Response, Status};
@@ -81,7 +83,10 @@ impl RemoteClient {
8183
self.last_handshake_replication_index
8284
}
8385

84-
async fn handle_handshake_response(&mut self, hello:Result<Response<HelloResponse>, Status>) -> Result<bool, Error> {
86+
async fn handle_handshake_response(
87+
&mut self,
88+
hello: Result<Response<HelloResponse>, Status>,
89+
) -> Result<bool, Error> {
8590
let hello = hello?.into_inner();
8691
verify_session_token(&hello.session_token).map_err(Error::Client)?;
8792
let new_session = self.session_token != Some(hello.session_token.clone());
@@ -130,7 +135,9 @@ impl RemoteClient {
130135
(hello_fut.await, None)
131136
};
132137
self.prefetched_batch_log_entries = if let Ok(true) = hello.0 {
133-
tracing::warn!("Frames prefetching failed because of new session token returned by handshake");
138+
tracing::warn!(
139+
"Frames prefetching failed because of new session token returned by handshake"
140+
);
134141
None
135142
} else {
136143
frames
@@ -139,7 +146,10 @@ impl RemoteClient {
139146
hello
140147
}
141148

142-
async fn handle_next_frames_response(&mut self, frames: Result<Response<Frames>, Status>) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
149+
async fn handle_next_frames_response(
150+
&mut self,
151+
frames: Result<Response<Frames>, Status>,
152+
) -> Result<<Self as ReplicatorClient>::FrameStream, Error> {
143153
let frames = frames?.into_inner().frames;
144154

145155
if let Some(f) = frames.last() {
@@ -157,7 +167,12 @@ impl RemoteClient {
157167
Ok(Box::pin(stream))
158168
}
159169

160-
async fn do_next_frames(&mut self) -> (Result<<Self as ReplicatorClient>::FrameStream, Error>, Duration) {
170+
async fn do_next_frames(
171+
&mut self,
172+
) -> (
173+
Result<<Self as ReplicatorClient>::FrameStream, Error>,
174+
Duration,
175+
) {
161176
let (frames, time) = match self.prefetched_batch_log_entries.take() {
162177
Some((result, time)) => (result, time),
163178
None => {
@@ -197,7 +212,13 @@ impl RemoteClient {
197212
}
198213
}
199214

200-
fn maybe_log<T>(time: Duration, sum: &mut Duration, count: &mut u128, result: &Result<T, Error>, op_name: &str) {
215+
fn maybe_log<T>(
216+
time: Duration,
217+
sum: &mut Duration,
218+
count: &mut u128,
219+
result: &Result<T, Error>,
220+
op_name: &str,
221+
) {
201222
if let Err(e) = &result {
202223
tracing::warn!("Failed {} in {} ms: {:?}", op_name, time.as_millis(), e);
203224
} else {
@@ -206,7 +227,12 @@ fn maybe_log<T>(time: Duration, sum: &mut Duration, count: &mut u128, result: &R
206227
let avg = (*sum).as_millis() / *count;
207228
let time = time.as_millis();
208229
if *count > 10 && time > 2 * avg {
209-
tracing::warn!("Unusually long {}. Took {} ms, average {} ms", op_name, time, avg);
230+
tracing::warn!(
231+
"Unusually long {}. Took {} ms, average {} ms",
232+
op_name,
233+
time,
234+
avg
235+
);
210236
}
211237
}
212238
}
@@ -218,22 +244,40 @@ impl ReplicatorClient for RemoteClient {
218244
/// Perform handshake with remote
219245
async fn handshake(&mut self) -> Result<(), Error> {
220246
let (result, time) = self.do_handshake_with_prefetch().await;
221-
maybe_log(time, &mut self.handshake_latency_sum, &mut self.handshake_latency_count, &result, "handshake");
247+
maybe_log(
248+
time,
249+
&mut self.handshake_latency_sum,
250+
&mut self.handshake_latency_count,
251+
&result,
252+
"handshake",
253+
);
222254
result.map(|_| ())
223255
}
224256

225257
/// Return a stream of frames to apply to the database
226258
async fn next_frames(&mut self) -> Result<Self::FrameStream, Error> {
227259
let (result, time) = self.do_next_frames().await;
228-
maybe_log(time, &mut self.frames_latency_sum, &mut self.frames_latency_count, &result, "frames fetch");
260+
maybe_log(
261+
time,
262+
&mut self.frames_latency_sum,
263+
&mut self.frames_latency_count,
264+
&result,
265+
"frames fetch",
266+
);
229267
result
230268
}
231269

232270
/// Return a snapshot for the current replication index. Called after next_frame has returned a
233271
/// NeedSnapshot error
234272
async fn snapshot(&mut self) -> Result<Self::FrameStream, Error> {
235273
let (snapshot, time) = time(self.do_snapshot()).await;
236-
maybe_log(time, &mut self.snapshot_latency_sum, &mut self.snapshot_latency_count, &snapshot, "snapshot fetch");
274+
maybe_log(
275+
time,
276+
&mut self.snapshot_latency_sum,
277+
&mut self.snapshot_latency_count,
278+
&snapshot,
279+
"snapshot fetch",
280+
);
237281
snapshot
238282
}
239283

0 commit comments

Comments
 (0)