Skip to content

Commit 99c7308

Browse files
authored
Optimise sync_pull to pull frames in batches (#2089)
Previously, we pulled frames one by one. This patch changes it pull frames in batches. Currently, the batch size is set to 128 (the maximum supported by the server)
2 parents 4ef5e81 + b7dfab4 commit 99c7308

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

libsql/src/replication/remote_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue;
1818
use tonic::{Response, Status};
1919
use zerocopy::FromBytes;
2020

21-
async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
21+
pub(crate) async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2222
let before = Instant::now();
2323
let out = fut.await;
2424
(out, before.elapsed())

libsql/src/sync.rs

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;
2020

2121
const DEFAULT_MAX_RETRIES: usize = 5;
2222
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
23+
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;
2324

2425
#[derive(thiserror::Error, Debug)]
2526
#[non_exhaustive]
@@ -66,6 +67,8 @@ pub enum SyncError {
6667
InvalidLocalGeneration(u32, u32),
6768
#[error("invalid local state: {0}")]
6869
InvalidLocalState(String),
70+
#[error("server returned invalid length of frames: {0}")]
71+
InvalidPullFrameBytes(usize),
6972
}
7073

7174
impl SyncError {
@@ -98,8 +101,8 @@ pub enum PushStatus {
98101
}
99102

100103
pub enum PullResult {
101-
/// A frame was successfully pulled.
102-
Frame(Bytes),
104+
/// Frames were successfully pulled.
105+
Frames(Bytes),
103106
/// We've reached the end of the generation.
104107
EndOfGeneration { max_generation: u32 },
105108
}
@@ -122,6 +125,7 @@ pub struct SyncContext {
122125
auth_token: Option<HeaderValue>,
123126
max_retries: usize,
124127
push_batch_size: u32,
128+
pull_batch_size: u32,
125129
/// The current durable generation.
126130
durable_generation: u32,
127131
/// Represents the max_frame_no from the server.
@@ -154,6 +158,7 @@ impl SyncContext {
154158
auth_token,
155159
max_retries: DEFAULT_MAX_RETRIES,
156160
push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
161+
pull_batch_size: DEFAULT_PULL_BATCH_SIZE,
157162
client,
158163
durable_generation: 0,
159164
durable_frame_num: 0,
@@ -175,7 +180,7 @@ impl SyncContext {
175180
}
176181

177182
#[tracing::instrument(skip(self))]
178-
pub(crate) async fn pull_one_frame(
183+
pub(crate) async fn pull_frames(
179184
&mut self,
180185
generation: u32,
181186
frame_no: u32,
@@ -185,9 +190,10 @@ impl SyncContext {
185190
self.sync_url,
186191
generation,
187192
frame_no,
188-
frame_no + 1
193+
// the server expects the range of [start, end) frames, i.e. end is exclusive
194+
frame_no + self.pull_batch_size
189195
);
190-
tracing::debug!("pulling frame");
196+
tracing::debug!("pulling frame (uri={})", uri);
191197
self.pull_with_retry(uri, self.max_retries).await
192198
}
193199

@@ -417,20 +423,39 @@ impl SyncContext {
417423
.map_err(SyncError::HttpDispatch)?;
418424

419425
if res.status().is_success() {
420-
let frame = hyper::body::to_bytes(res.into_body())
426+
let frames = hyper::body::to_bytes(res.into_body())
421427
.await
422428
.map_err(SyncError::HttpBody)?;
423-
return Ok(PullResult::Frame(frame));
429+
// a success result should always return some frames
430+
if frames.is_empty() {
431+
tracing::error!("server returned empty frames in pull response");
432+
return Err(SyncError::InvalidPullFrameBytes(0).into());
433+
}
434+
// the minimum payload size cannot be less than a single frame
435+
if frames.len() < FRAME_SIZE {
436+
tracing::error!(
437+
"server returned frames with invalid length: {} < {}",
438+
frames.len(),
439+
FRAME_SIZE
440+
);
441+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
442+
}
443+
return Ok(PullResult::Frames(frames));
424444
}
425445
// BUG ALERT: The server returns a 500 error if the remote database is empty.
426446
// This is a bug and should be fixed.
427447
if res.status() == StatusCode::BAD_REQUEST
428448
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
429449
{
450+
let status = res.status();
430451
let res_body = hyper::body::to_bytes(res.into_body())
431452
.await
432453
.map_err(SyncError::HttpBody)?;
433-
454+
tracing::trace!(
455+
"server returned: {} body: {}",
456+
status,
457+
String::from_utf8_lossy(&res_body[..])
458+
);
434459
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
435460
.map_err(SyncError::JsonDecode)?;
436461

@@ -650,22 +675,34 @@ impl SyncContext {
650675

651676
let req = req.body(Body::empty()).expect("valid request");
652677

653-
let res = self
654-
.client
655-
.request(req)
656-
.await
657-
.map_err(SyncError::HttpDispatch)?;
678+
let (res, http_duration) =
679+
crate::replication::remote_client::time(self.client.request(req)).await;
680+
let res = res.map_err(SyncError::HttpDispatch)?;
658681

659682
if !res.status().is_success() {
660683
let status = res.status();
661684
let body = hyper::body::to_bytes(res.into_body())
662685
.await
663686
.map_err(SyncError::HttpBody)?;
687+
tracing::error!(
688+
"failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}",
689+
status,
690+
String::from_utf8_lossy(&body),
691+
uri,
692+
http_duration
693+
);
664694
return Err(
665695
SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(),
666696
);
667697
}
668698

699+
tracing::debug!(
700+
"pulled db file from remote server, status={}, url={}, duration={:?}",
701+
res.status(),
702+
uri,
703+
http_duration
704+
);
705+
669706
// todo: do streaming write to the disk
670707
let bytes = hyper::body::to_bytes(res.into_body())
671708
.await
@@ -887,6 +924,11 @@ async fn try_push(
887924
})
888925
}
889926

927+
/// PAGE_SIZE used by the sync / diskless server
928+
const PAGE_SIZE: usize = 4096;
929+
const FRAME_HEADER_SIZE: usize = 24;
930+
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
931+
890932
pub async fn try_pull(
891933
sync_ctx: &mut SyncContext,
892934
conn: &Connection,
@@ -898,10 +940,32 @@ pub async fn try_pull(
898940
loop {
899941
let generation = sync_ctx.durable_generation();
900942
let frame_no = sync_ctx.durable_frame_num() + 1;
901-
match sync_ctx.pull_one_frame(generation, frame_no).await {
902-
Ok(PullResult::Frame(frame)) => {
903-
insert_handle.insert(&frame)?;
904-
sync_ctx.durable_frame_num = frame_no;
943+
match sync_ctx.pull_frames(generation, frame_no).await {
944+
Ok(PullResult::Frames(frames)) => {
945+
tracing::debug!(
946+
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
947+
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
948+
);
949+
if frames.len() % FRAME_SIZE != 0 {
950+
tracing::error!(
951+
"frame size {} is not a multiple of the expected size {}",
952+
frames.len(),
953+
FRAME_SIZE,
954+
);
955+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
956+
}
957+
for chunk in frames.chunks(FRAME_SIZE) {
958+
let r = insert_handle.insert(&chunk);
959+
if let Err(e) = r {
960+
tracing::error!(
961+
"insert error (frame= {}) : {:?}",
962+
sync_ctx.durable_frame_num + 1,
963+
e
964+
);
965+
return Err(e);
966+
}
967+
sync_ctx.durable_frame_num += 1;
968+
}
905969
}
906970
Ok(PullResult::EndOfGeneration { max_generation }) => {
907971
// If there are no more generations to pull, we're done.
@@ -920,7 +984,7 @@ pub async fn try_pull(
920984
insert_handle.begin()?;
921985
}
922986
Err(e) => {
923-
tracing::debug!("pull_one_frame error: {:?}", e);
987+
tracing::debug!("pull_frames error: {:?}", e);
924988
err.replace(e);
925989
break;
926990
}

0 commit comments

Comments
 (0)