Skip to content

Commit b7dfab4

Browse files
committed
Optimise sync_pull to pull frames in batches
Previously, we pulled frames one by one. This patch changes it pull frames in batches. Currently, the batch size is set to 128 (the maximum supported by the server)
1 parent 4ef5e81 commit b7dfab4

File tree

2 files changed

+83
-19
lines changed

2 files changed

+83
-19
lines changed

libsql/src/replication/remote_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue;
1818
use tonic::{Response, Status};
1919
use zerocopy::FromBytes;
2020

21-
async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
21+
pub(crate) async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2222
let before = Instant::now();
2323
let out = fut.await;
2424
(out, before.elapsed())

libsql/src/sync.rs

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;
2020

2121
const DEFAULT_MAX_RETRIES: usize = 5;
2222
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
23+
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;
2324

2425
#[derive(thiserror::Error, Debug)]
2526
#[non_exhaustive]
@@ -66,6 +67,8 @@ pub enum SyncError {
6667
InvalidLocalGeneration(u32, u32),
6768
#[error("invalid local state: {0}")]
6869
InvalidLocalState(String),
70+
#[error("server returned invalid length of frames: {0}")]
71+
InvalidPullFrameBytes(usize),
6972
}
7073

7174
impl SyncError {
@@ -98,8 +101,8 @@ pub enum PushStatus {
98101
}
99102

100103
pub enum PullResult {
101-
/// A frame was successfully pulled.
102-
Frame(Bytes),
104+
/// Frames were successfully pulled.
105+
Frames(Bytes),
103106
/// We've reached the end of the generation.
104107
EndOfGeneration { max_generation: u32 },
105108
}
@@ -122,6 +125,7 @@ pub struct SyncContext {
122125
auth_token: Option<HeaderValue>,
123126
max_retries: usize,
124127
push_batch_size: u32,
128+
pull_batch_size: u32,
125129
/// The current durable generation.
126130
durable_generation: u32,
127131
/// Represents the max_frame_no from the server.
@@ -154,6 +158,7 @@ impl SyncContext {
154158
auth_token,
155159
max_retries: DEFAULT_MAX_RETRIES,
156160
push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
161+
pull_batch_size: DEFAULT_PULL_BATCH_SIZE,
157162
client,
158163
durable_generation: 0,
159164
durable_frame_num: 0,
@@ -175,7 +180,7 @@ impl SyncContext {
175180
}
176181

177182
#[tracing::instrument(skip(self))]
178-
pub(crate) async fn pull_one_frame(
183+
pub(crate) async fn pull_frames(
179184
&mut self,
180185
generation: u32,
181186
frame_no: u32,
@@ -185,9 +190,10 @@ impl SyncContext {
185190
self.sync_url,
186191
generation,
187192
frame_no,
188-
frame_no + 1
193+
// the server expects the range of [start, end) frames, i.e. end is exclusive
194+
frame_no + self.pull_batch_size
189195
);
190-
tracing::debug!("pulling frame");
196+
tracing::debug!("pulling frame (uri={})", uri);
191197
self.pull_with_retry(uri, self.max_retries).await
192198
}
193199

@@ -417,20 +423,39 @@ impl SyncContext {
417423
.map_err(SyncError::HttpDispatch)?;
418424

419425
if res.status().is_success() {
420-
let frame = hyper::body::to_bytes(res.into_body())
426+
let frames = hyper::body::to_bytes(res.into_body())
421427
.await
422428
.map_err(SyncError::HttpBody)?;
423-
return Ok(PullResult::Frame(frame));
429+
// a success result should always return some frames
430+
if frames.is_empty() {
431+
tracing::error!("server returned empty frames in pull response");
432+
return Err(SyncError::InvalidPullFrameBytes(0).into());
433+
}
434+
// the minimum payload size cannot be less than a single frame
435+
if frames.len() < FRAME_SIZE {
436+
tracing::error!(
437+
"server returned frames with invalid length: {} < {}",
438+
frames.len(),
439+
FRAME_SIZE
440+
);
441+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
442+
}
443+
return Ok(PullResult::Frames(frames));
424444
}
425445
// BUG ALERT: The server returns a 500 error if the remote database is empty.
426446
// This is a bug and should be fixed.
427447
if res.status() == StatusCode::BAD_REQUEST
428448
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
429449
{
450+
let status = res.status();
430451
let res_body = hyper::body::to_bytes(res.into_body())
431452
.await
432453
.map_err(SyncError::HttpBody)?;
433-
454+
tracing::trace!(
455+
"server returned: {} body: {}",
456+
status,
457+
String::from_utf8_lossy(&res_body[..])
458+
);
434459
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
435460
.map_err(SyncError::JsonDecode)?;
436461

@@ -650,22 +675,34 @@ impl SyncContext {
650675

651676
let req = req.body(Body::empty()).expect("valid request");
652677

653-
let res = self
654-
.client
655-
.request(req)
656-
.await
657-
.map_err(SyncError::HttpDispatch)?;
678+
let (res, http_duration) =
679+
crate::replication::remote_client::time(self.client.request(req)).await;
680+
let res = res.map_err(SyncError::HttpDispatch)?;
658681

659682
if !res.status().is_success() {
660683
let status = res.status();
661684
let body = hyper::body::to_bytes(res.into_body())
662685
.await
663686
.map_err(SyncError::HttpBody)?;
687+
tracing::error!(
688+
"failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}",
689+
status,
690+
String::from_utf8_lossy(&body),
691+
uri,
692+
http_duration
693+
);
664694
return Err(
665695
SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(),
666696
);
667697
}
668698

699+
tracing::debug!(
700+
"pulled db file from remote server, status={}, url={}, duration={:?}",
701+
res.status(),
702+
uri,
703+
http_duration
704+
);
705+
669706
// todo: do streaming write to the disk
670707
let bytes = hyper::body::to_bytes(res.into_body())
671708
.await
@@ -887,6 +924,11 @@ async fn try_push(
887924
})
888925
}
889926

927+
/// PAGE_SIZE used by the sync / diskless server
928+
const PAGE_SIZE: usize = 4096;
929+
const FRAME_HEADER_SIZE: usize = 24;
930+
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
931+
890932
pub async fn try_pull(
891933
sync_ctx: &mut SyncContext,
892934
conn: &Connection,
@@ -898,10 +940,32 @@ pub async fn try_pull(
898940
loop {
899941
let generation = sync_ctx.durable_generation();
900942
let frame_no = sync_ctx.durable_frame_num() + 1;
901-
match sync_ctx.pull_one_frame(generation, frame_no).await {
902-
Ok(PullResult::Frame(frame)) => {
903-
insert_handle.insert(&frame)?;
904-
sync_ctx.durable_frame_num = frame_no;
943+
match sync_ctx.pull_frames(generation, frame_no).await {
944+
Ok(PullResult::Frames(frames)) => {
945+
tracing::debug!(
946+
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
947+
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
948+
);
949+
if frames.len() % FRAME_SIZE != 0 {
950+
tracing::error!(
951+
"frame size {} is not a multiple of the expected size {}",
952+
frames.len(),
953+
FRAME_SIZE,
954+
);
955+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
956+
}
957+
for chunk in frames.chunks(FRAME_SIZE) {
958+
let r = insert_handle.insert(&chunk);
959+
if let Err(e) = r {
960+
tracing::error!(
961+
"insert error (frame= {}) : {:?}",
962+
sync_ctx.durable_frame_num + 1,
963+
e
964+
);
965+
return Err(e);
966+
}
967+
sync_ctx.durable_frame_num += 1;
968+
}
905969
}
906970
Ok(PullResult::EndOfGeneration { max_generation }) => {
907971
// If there are no more generations to pull, we're done.
@@ -920,7 +984,7 @@ pub async fn try_pull(
920984
insert_handle.begin()?;
921985
}
922986
Err(e) => {
923-
tracing::debug!("pull_one_frame error: {:?}", e);
987+
tracing::debug!("pull_frames error: {:?}", e);
924988
err.replace(e);
925989
break;
926990
}

0 commit comments

Comments
 (0)