Skip to content

Commit 263fbad

Browse files
author
Piotr Jastrzebski
committed
Turn push_one_frame to push_frames
Now it can push multiple frames at a time but we still push just one at each call. Signed-off-by: Piotr Jastrzebski <[email protected]>
1 parent 7aad3e6 commit 263fbad

File tree

2 files changed

+20
-19
lines changed

2 files changed

+20
-19
lines changed

libsql/src/sync.rs

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,25 +134,26 @@ impl SyncContext {
134134
self.pull_with_retry(uri, self.max_retries).await
135135
}
136136

137-
#[tracing::instrument(skip(self, frame))]
138-
pub(crate) async fn push_one_frame(
137+
#[tracing::instrument(skip(self, frames))]
138+
pub(crate) async fn push_frames(
139139
&mut self,
140-
frame: Bytes,
140+
frames: Bytes,
141141
generation: u32,
142142
frame_no: u32,
143+
frames_count: u32,
143144
) -> Result<u32> {
144145
let uri = format!(
145146
"{}/sync/{}/{}/{}",
146147
self.sync_url,
147148
generation,
148149
frame_no,
149-
frame_no + 1
150+
frame_no + frames_count
150151
);
151152
tracing::debug!("pushing frame");
152153

153-
let (generation, durable_frame_num) = self.push_with_retry(uri, frame, self.max_retries).await?;
154+
let (generation, durable_frame_num) = self.push_with_retry(uri, frames, self.max_retries).await?;
154155

155-
if durable_frame_num > frame_no {
156+
if durable_frame_num > frame_no + frames_count - 1 {
156157
tracing::error!(
157158
"server returned durable_frame_num larger than what we sent: sent={}, got={}",
158159
frame_no,
@@ -162,7 +163,7 @@ impl SyncContext {
162163
return Err(SyncError::InvalidPushFrameNoHigh(frame_no, durable_frame_num).into());
163164
}
164165

165-
if durable_frame_num < frame_no {
166+
if durable_frame_num < frame_no + frames_count - 1 {
166167
// Update our knowledge of where the server is at frame wise.
167168
self.durable_frame_num = durable_frame_num;
168169

@@ -186,7 +187,7 @@ impl SyncContext {
186187
Ok(durable_frame_num)
187188
}
188189

189-
async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<(u32, u32)> {
190+
async fn push_with_retry(&self, uri: String, body: Bytes, max_retries: usize) -> Result<(u32, u32)> {
190191
let mut nr_retries = 0;
191192
loop {
192193
let mut req = http::Request::post(uri.clone());
@@ -200,7 +201,7 @@ impl SyncContext {
200201
None => {}
201202
}
202203

203-
let req = req.body(frame.clone().into()).expect("valid body");
204+
let req = req.body(body.clone().into()).expect("valid body");
204205

205206
let res = self
206207
.client
@@ -543,7 +544,7 @@ async fn try_push(
543544
// frames the server already knows about, we need to update the
544545
// frame number to the one returned by the server.
545546
let max_frame_no = sync_ctx
546-
.push_one_frame(frame.freeze(), generation, frame_no)
547+
.push_frames(frame.freeze(), generation, frame_no, 1)
547548
.await?;
548549

549550
if max_frame_no > frame_no {

libsql/src/sync/test.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async fn test_sync_context_push_frame() {
2828
let mut sync_ctx = sync_ctx;
2929

3030
// Push a frame and verify the response
31-
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
31+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
3232
sync_ctx.write_metadata().await.unwrap();
3333
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
3434

@@ -56,7 +56,7 @@ async fn test_sync_context_with_auth() {
5656
let frame = Bytes::from("test frame with auth");
5757
let mut sync_ctx = sync_ctx;
5858

59-
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
59+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
6060
sync_ctx.write_metadata().await.unwrap();
6161
assert_eq!(durable_frame, 0);
6262
assert_eq!(server.frame_count(), 1);
@@ -82,7 +82,7 @@ async fn test_sync_context_multiple_frames() {
8282
// Push multiple frames and verify incrementing frame numbers
8383
for i in 0..3 {
8484
let frame = Bytes::from(format!("frame data {}", i));
85-
let durable_frame = sync_ctx.push_one_frame(frame, 1, i).await.unwrap();
85+
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap();
8686
sync_ctx.write_metadata().await.unwrap();
8787
assert_eq!(durable_frame, i);
8888
assert_eq!(sync_ctx.durable_frame_num(), i);
@@ -108,7 +108,7 @@ async fn test_sync_context_corrupted_metadata() {
108108

109109
let mut sync_ctx = sync_ctx;
110110
let frame = Bytes::from("test frame data");
111-
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
111+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
112112
sync_ctx.write_metadata().await.unwrap();
113113
assert_eq!(durable_frame, 0);
114114
assert_eq!(server.frame_count(), 1);
@@ -152,7 +152,7 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
152152

153153
let mut sync_ctx = sync_ctx;
154154
let frame = Bytes::from("test frame data");
155-
let durable_frame = sync_ctx.push_one_frame(frame.clone(), 1, 0).await.unwrap();
155+
let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap();
156156
sync_ctx.write_metadata().await.unwrap();
157157
assert_eq!(durable_frame, 0);
158158
assert_eq!(server.frame_count(), 1);
@@ -180,14 +180,14 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
180180
// This push should fail because we are ahead of the server and thus should get an invalid
181181
// frame no error.
182182
sync_ctx
183-
.push_one_frame(frame.clone(), 1, frame_no)
183+
.push_frames(frame.clone(), 1, frame_no, 1)
184184
.await
185185
.unwrap_err();
186186

187187
let frame_no = sync_ctx.durable_frame_num() + 1;
188188
// This then should work because when the last one failed it updated our state of the server
189189
// durable_frame_num and we should then start writing from there.
190-
sync_ctx.push_one_frame(frame, 1, frame_no).await.unwrap();
190+
sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap();
191191
}
192192

193193
#[tokio::test]
@@ -215,7 +215,7 @@ async fn test_sync_context_retry_on_error() {
215215
server.return_error.store(true, Ordering::SeqCst);
216216

217217
// First attempt should fail but retry
218-
let result = sync_ctx.push_one_frame(frame.clone(), 1, 0).await;
218+
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await;
219219
assert!(result.is_err());
220220

221221
// Advance time to trigger retries faster
@@ -228,7 +228,7 @@ async fn test_sync_context_retry_on_error() {
228228
server.return_error.store(false, Ordering::SeqCst);
229229

230230
// Next attempt should succeed
231-
let durable_frame = sync_ctx.push_one_frame(frame, 1, 0).await.unwrap();
231+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
232232
sync_ctx.write_metadata().await.unwrap();
233233
assert_eq!(durable_frame, 0);
234234
assert_eq!(server.frame_count(), 1);

0 commit comments

Comments
 (0)