Skip to content

Commit b83b029

Browse files
committed
Expose detailed sync stats through Database::get_sync_usage_stats
Signed-off-by: Piotr Jastrzebski <[email protected]>
1 parent 7889902 commit b83b029

File tree

3 files changed

+103
-1
lines changed

3 files changed

+103
-1
lines changed

libsql/src/local/database.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ cfg_replication!(
1010
use crate::replication::remote_client::RemoteClient;
1111
use crate::replication::EmbeddedReplicator;
1212
pub use crate::replication::Frames;
13+
pub use crate::replication::SyncUsageStats;
1314

1415
pub struct ReplicationContext {
1516
pub(crate) replicator: EmbeddedReplicator,
@@ -277,6 +278,20 @@ impl Database {
277278
Ok(self.sync_oneshot().await?)
278279
}
279280

281+
#[cfg(feature = "replication")]
282+
/// Return detailed logs about bytes synced with primary
283+
pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
284+
if let Some(ctx) = &self.replication_ctx {
285+
let sync_stats = ctx.replicator.get_sync_usage_stats().await?;
286+
Ok(sync_stats)
287+
} else {
288+
Err(crate::errors::Error::Misuse(
289+
"No replicator available. Use Database::with_replicator() to enable replication"
290+
.to_string(),
291+
))
292+
}
293+
}
294+
280295
#[cfg(feature = "replication")]
281296
/// Sync with primary at least to a given replication index
282297
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {

libsql/src/replication/mod.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,60 @@ pub enum Frames {
6565
Snapshot(SnapshotFile),
6666
}
6767

68+
/// Detailed logs about bytes synced with primary
69+
pub struct SyncUsageStats {
70+
prefetched_bytes: u64,
71+
prefetched_bytes_discarded_due_to_new_session: u64,
72+
prefetched_bytes_discarded_due_to_consecutive_handshake: u64,
73+
prefetched_bytes_discarded_due_to_invalid_frame_header: u64,
74+
synced_bytes_discarded_due_to_invalid_frame_header: u64,
75+
prefetched_bytes_used: u64,
76+
synced_bytes_used: u64,
77+
snapshot_bytes: u64,
78+
}
79+
80+
impl SyncUsageStats {
81+
/// Number of bytes prefetched while doing handshake
82+
pub fn prefetched_bytes(&self) -> u64 {
83+
self.prefetched_bytes
84+
}
85+
86+
/// Number of bytes prefetched and discarded due to the change of the client session
87+
pub fn prefetched_bytes_discarded_due_to_new_session(&self) -> u64 {
88+
self.prefetched_bytes_discarded_due_to_new_session
89+
}
90+
91+
/// Number of bytes prefetched and discarded due to consecutive handshake with new prefetch
92+
pub fn prefetched_bytes_discarded_due_to_consecutive_handshake(&self) -> u64 {
93+
self.prefetched_bytes_discarded_due_to_consecutive_handshake
94+
}
95+
96+
/// Number of bytes prefetched and discarded due to invalid frame header in received frames
97+
pub fn prefetched_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
98+
self.prefetched_bytes_discarded_due_to_invalid_frame_header
99+
}
100+
101+
/// Number of bytes synced and discarded due to invalid frame header in received frames
102+
pub fn synced_bytes_discarded_due_to_invalid_frame_header(&self) -> u64 {
103+
self.synced_bytes_discarded_due_to_invalid_frame_header
104+
}
105+
106+
/// Number of bytes prefetched and used
107+
pub fn prefetched_bytes_used(&self) -> u64 {
108+
self.prefetched_bytes_used
109+
}
110+
111+
/// Number of bytes synced and used
112+
pub fn synced_bytes_used(&self) -> u64 {
113+
self.synced_bytes_used
114+
}
115+
116+
/// Number of bytes downloaded as snapshots
117+
pub fn snapshot_bytes(&self) -> u64 {
118+
self.snapshot_bytes
119+
}
120+
}
121+
68122
#[derive(Clone)]
69123
pub(crate) struct Writer {
70124
pub(crate) client: client::Client,
@@ -210,6 +264,35 @@ impl EmbeddedReplicator {
210264
})
211265
}
212266

267+
pub async fn get_sync_usage_stats(&self) -> Result<SyncUsageStats> {
268+
let mut replicator = self.replicator.lock().await;
269+
match replicator.client_mut() {
270+
Either::Right(_) => {
271+
Err(crate::errors::Error::Misuse(
272+
"Trying to get sync usage stats, but this is a local replicator".into(),
273+
))
274+
}
275+
Either::Left(c) => {
276+
let stats = c.sync_stats();
277+
Ok(SyncUsageStats {
278+
prefetched_bytes: stats.prefetched_bytes.load(std::sync::atomic::Ordering::SeqCst),
279+
prefetched_bytes_discarded_due_to_new_session: stats
280+
.prefetched_bytes_discarded_due_to_new_session.load(std::sync::atomic::Ordering::SeqCst),
281+
prefetched_bytes_discarded_due_to_consecutive_handshake: stats
282+
.prefetched_bytes_discarded_due_to_consecutive_handshake.load(std::sync::atomic::Ordering::SeqCst),
283+
prefetched_bytes_discarded_due_to_invalid_frame_header: stats
284+
.prefetched_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
285+
synced_bytes_discarded_due_to_invalid_frame_header: stats
286+
.synced_bytes_discarded_due_to_invalid_frame_header.load(std::sync::atomic::Ordering::SeqCst),
287+
prefetched_bytes_used: stats.prefetched_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
288+
synced_bytes_used: stats.synced_bytes_used.load(std::sync::atomic::Ordering::SeqCst),
289+
snapshot_bytes: stats.snapshot_bytes.load(std::sync::atomic::Ordering::SeqCst),
290+
})
291+
}
292+
}
293+
294+
}
295+
213296
pub async fn sync_oneshot(&self) -> Result<Replicated> {
214297
use libsql_replication::replicator::ReplicatorClient;
215298

libsql/src/replication/remote_client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2424
(out, before.elapsed())
2525
}
2626

27-
struct SyncStats {
27+
pub(crate) struct SyncStats {
2828
pub prefetched_bytes: AtomicU64,
2929
pub prefetched_bytes_discarded_due_to_new_session: AtomicU64,
3030
pub prefetched_bytes_discarded_due_to_consecutive_handshake: AtomicU64,
@@ -121,6 +121,10 @@ impl RemoteClient {
121121
})
122122
}
123123

124+
pub(crate) fn sync_stats(&self) -> Arc<SyncStats> {
125+
self.sync_stats.clone()
126+
}
127+
124128
fn next_offset(&self) -> FrameNo {
125129
match self.last_received {
126130
Some(fno) => fno + 1,

0 commit comments

Comments
 (0)