Commit a49ab95

Performance: optimise offline sync
Currently, the sync code pulls all the generations and frames one by one. This is very inefficient: for databases with many generations the initial sync can take hours. This patch adds an optimisation that pulls the latest generation by calling the `export` endpoint and bootstraps the db file in one shot. In my testing, for a db with 30 generations and about 100 frames per generation, this brought the initial sync down from roughly 1 hour to 1 minute.
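In short, the client first asks the server which generation it is on and, if the local copy is behind, downloads the whole database file in one request instead of replaying every generation frame by frame. The sketch below illustrates that flow in isolation; it is a hypothetical, simplified version of what the patch does. The `/info` and `/export/{generation}` endpoints come from the diff below, while the plain hyper 0.14 client, the helper name `bootstrap_latest_generation`, and the omission of auth-token handling are assumptions made for the example.

// Hypothetical, self-contained sketch of the bootstrap flow described above.
// Assumes hyper 0.14 with client features, serde/serde_json, and a tokio
// runtime; the real patch goes through libsql's ConnectorService and also
// sends an Authorization header, both omitted here.
use hyper::{body::to_bytes, Body, Client, Request};

#[derive(serde::Deserialize)]
struct InfoResult {
    current_generation: u32,
}

async fn bootstrap_latest_generation(
    sync_url: &str,
    local_generation: u32,
) -> Result<Option<(u32, Vec<u8>)>, Box<dyn std::error::Error>> {
    let client = Client::new();

    // 1. Ask the server for its current generation: GET {sync_url}/info
    let req = Request::get(format!("{}/info", sync_url)).body(Body::empty())?;
    let body = to_bytes(client.request(req).await?.into_body()).await?;
    let info: InfoResult = serde_json::from_slice(&body)?;

    // 2. If the local copy is already on that generation, fall back to the
    //    normal frame-by-frame pull.
    if info.current_generation <= local_generation {
        return Ok(None);
    }

    // 3. Otherwise download the whole db file for the latest generation:
    //    GET {sync_url}/export/{generation}
    let req = Request::get(format!("{}/export/{}", sync_url, info.current_generation))
        .body(Body::empty())?;
    let db_file = to_bytes(client.request(req).await?.into_body()).await?;

    Ok(Some((info.current_generation, db_file.to_vec())))
}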
1 parent a05ad5d commit a49ab95

File tree: 1 file changed (+130, -2 lines)


libsql/src/sync.rs

Lines changed: 130 additions & 2 deletions
@@ -60,6 +60,10 @@ pub enum SyncError {
     RedirectHeader(http::header::ToStrError),
     #[error("redirect response with no location header")]
     NoRedirectLocationHeader,
+    #[error("failed to pull db export: status={0}, error={1}")]
+    PullDb(StatusCode, String),
+    #[error("server returned a lower generation than local: local={0}, remote={1}")]
+    InvalidLocalGeneration(u32, u32),
 }
 
 impl SyncError {
@@ -86,6 +90,11 @@ pub enum PullResult {
     EndOfGeneration { max_generation: u32 },
 }
 
+#[derive(serde::Deserialize)]
+struct InfoResult {
+    current_generation: u32,
+}
+
 pub struct SyncContext {
     db_path: String,
     client: hyper::Client<ConnectorService, Body>,
@@ -97,6 +106,9 @@ pub struct SyncContext {
     durable_generation: u32,
     /// Represents the max_frame_no from the server.
    durable_frame_num: u32,
+    /// whenever sync is called very first time, we will call the remote server
+    /// to get the generation information and sync the db file if needed
+    initial_server_sync: bool,
 }
 
 impl SyncContext {
@@ -123,8 +135,9 @@ impl SyncContext {
             max_retries: DEFAULT_MAX_RETRIES,
             push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
             client,
-            durable_generation: 1,
+            durable_generation: 0,
             durable_frame_num: 0,
+            initial_server_sync: false,
         };
 
         if let Err(e) = me.read_metadata().await {
@@ -173,7 +186,7 @@ impl SyncContext {
             frame_no,
             frame_no + frames_count
         );
-        tracing::debug!("pushing frame");
+        tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation);
 
         let result = self.push_with_retry(uri, frames, self.max_retries).await?;
 
@@ -458,6 +471,105 @@ impl SyncContext {
 
         Ok(())
     }
+
+    /// get_remote_info calls the remote server to get the current generation information.
+    async fn get_remote_info(&self) -> Result<InfoResult> {
+        let uri = format!("{}/info", self.sync_url);
+        let mut req = http::Request::builder().method("GET").uri(&uri);
+
+        if let Some(auth_token) = &self.auth_token {
+            req = req.header("Authorization", auth_token);
+        }
+
+        let req = req.body(Body::empty()).expect("valid request");
+
+        let res = self
+            .client
+            .request(req)
+            .await
+            .map_err(SyncError::HttpDispatch)?;
+
+        if !res.status().is_success() {
+            let status = res.status();
+            let body = hyper::body::to_bytes(res.into_body())
+                .await
+                .map_err(SyncError::HttpBody)?;
+            return Err(
+                SyncError::PullDb(status, String::from_utf8_lossy(&body).to_string()).into(),
+            );
+        }
+
+        let body = hyper::body::to_bytes(res.into_body())
+            .await
+            .map_err(SyncError::HttpBody)?;
+
+        let info = serde_json::from_slice(&body).map_err(SyncError::JsonDecode)?;
+
+        Ok(info)
+    }
+
+    async fn sync_db_if_needed(&mut self, generation: u32) -> Result<()> {
+        // we will get the export file only if the remote generation is different from the one we have
+        if generation == self.durable_generation {
+            return Ok(());
+        }
+        // somehow we are ahead of the remote in generations. following should not happen because
+        // we checkpoint only if the remote server tells us to do so.
+        if self.durable_generation > generation {
+            tracing::error!(
+                "server returned a lower generation than what we have: sent={}, got={}",
+                self.durable_generation,
+                generation
+            );
+            return Err(
+                SyncError::InvalidLocalGeneration(self.durable_generation, generation).into(),
+            );
+        }
+        tracing::debug!(
+            "syncing db file from remote server, generation={}",
+            generation
+        );
+        self.sync_db(generation).await
+    }
+
+    /// sync_db will download the db file from the remote server and replace the local file.
+    async fn sync_db(&mut self, generation: u32) -> Result<()> {
+        let uri = format!("{}/export/{}", self.sync_url, generation);
+        let mut req = http::Request::builder().method("GET").uri(&uri);
+
+        if let Some(auth_token) = &self.auth_token {
+            req = req.header("Authorization", auth_token);
+        }
+
+        let req = req.body(Body::empty()).expect("valid request");
+
+        let res = self
+            .client
+            .request(req)
+            .await
+            .map_err(SyncError::HttpDispatch)?;
+
+        if !res.status().is_success() {
+            let status = res.status();
+            let body = hyper::body::to_bytes(res.into_body())
+                .await
+                .map_err(SyncError::HttpBody)?;
+            return Err(
+                SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(),
+            );
+        }
+
+        // todo: do streaming write to the disk
+        let bytes = hyper::body::to_bytes(res.into_body())
+            .await
+            .map_err(SyncError::HttpBody)?;
+
+        atomic_write(&self.db_path, &bytes).await?;
+        self.durable_generation = generation;
+        self.durable_frame_num = 0;
+        self.write_metadata().await?;
+        Ok(())
+    }
 }
 
 #[derive(serde::Serialize, serde::Deserialize, Debug)]
@@ -555,6 +667,22 @@ pub async fn sync_offline(
             Err(e) => Err(e),
         }
     } else {
+        // todo: we are checking with the remote server only during initialisation. ideally,
+        // we should check everytime we try to sync with the remote server. However, we need to close
+        // all the ongoing connections since we replace `.db` file and remove the `.db-wal` file
+        if !sync_ctx.initial_server_sync {
+            // sync is being called first time. so we will call remote, get the generation information
+            // if we are lagging behind, then we will call the export API and get to the latest
+            // generation directly.
+            let info = sync_ctx.get_remote_info().await?;
+            sync_ctx
+                .sync_db_if_needed(info.current_generation)
+                .await?;
+            // when sync_ctx is initialised, we set durable_generation to 0. however, once
+            // sync_db is called, it should be > 0.
+            assert!(sync_ctx.durable_generation > 0, "generation should be > 0");
+            sync_ctx.initial_server_sync = true;
+        }
         try_pull(sync_ctx, conn).await
     }
     .or_else(|err| {
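The new `sync_db` above replaces the local database file via an `atomic_write` helper that already exists elsewhere in sync.rs and is not shown in this diff. For readers unfamiliar with the pattern, here is a minimal sketch of what such a helper typically does, assuming tokio's async filesystem API (the actual implementation may differ): write the bytes to a temporary sibling file, then rename it over the target so a crash never leaves a half-written db file.

use std::path::Path;

// Hypothetical sketch only; the real atomic_write in sync.rs may differ.
async fn atomic_write(path: impl AsRef<Path>, data: &[u8]) -> std::io::Result<()> {
    let path = path.as_ref();
    // write to a temporary file next to the target...
    let tmp = path.with_extension("db.tmp");
    tokio::fs::write(&tmp, data).await?;
    // ...then atomically swap it into place
    tokio::fs::rename(&tmp, path).await?;
    Ok(())
}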
