Skip to content

Commit 3d149d6

Browse files
authored
Fix large sync sessions by reusing the same session (#2036)
Currently, we have a bug where each `push` request creates a new sync session in the server. Each push request can send up to 128 frames. However, if a transaction spans more than 128 frames, then it needs to reuse the same sync session. Otherwise, the first push will succeed, but the rest of the pushes will fail due to the write lock held by the previous session. This patch reuses the sync session. The way we do this is by using batons. If the server sends a baton, then we pass it back. Reusing the same baton ensures that we are using the same sync session.
2 parents 034aead + 15c924d commit 3d149d6

File tree

2 files changed

+81
-27
lines changed

2 files changed

+81
-27
lines changed

libsql/src/sync.rs

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub struct PushResult {
7878
status: PushStatus,
7979
generation: u32,
8080
max_frame_no: u32,
81+
baton: Option<String>,
8182
}
8283

8384
pub enum PushStatus {
@@ -97,6 +98,12 @@ struct InfoResult {
9798
current_generation: u32,
9899
}
99100

101+
#[derive(Debug)]
102+
struct PushFramesResult {
103+
max_frame_no: u32,
104+
baton: Option<String>,
105+
}
106+
100107
pub struct SyncContext {
101108
db_path: String,
102109
client: hyper::Client<ConnectorService, Body>,
@@ -180,15 +187,30 @@ impl SyncContext {
180187
generation: u32,
181188
frame_no: u32,
182189
frames_count: u32,
183-
) -> Result<u32> {
184-
let uri = format!(
185-
"{}/sync/{}/{}/{}",
186-
self.sync_url,
187-
generation,
190+
baton: Option<String>,
191+
) -> Result<PushFramesResult> {
192+
let uri = {
193+
let mut uri = format!(
194+
"{}/sync/{}/{}/{}",
195+
self.sync_url,
196+
generation,
197+
frame_no,
198+
frame_no + frames_count
199+
);
200+
if let Some(ref baton) = baton {
201+
uri.push_str(&format!("/{}", baton));
202+
}
203+
uri
204+
};
205+
206+
tracing::debug!(
207+
"pushing frame(frame_no={} (to={}), count={}, generation={}, baton={:?})",
188208
frame_no,
189-
frame_no + frames_count
209+
frame_no + frames_count,
210+
frames_count,
211+
generation,
212+
baton
190213
);
191-
tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation);
192214

193215
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
194216

@@ -200,6 +222,7 @@ impl SyncContext {
200222
}
201223
let generation = result.generation;
202224
let durable_frame_num = result.max_frame_no;
225+
let baton = result.baton;
203226

204227
if durable_frame_num > frame_no + frames_count - 1 {
205228
tracing::error!(
@@ -232,7 +255,10 @@ impl SyncContext {
232255
self.durable_generation = generation;
233256
self.durable_frame_num = durable_frame_num;
234257

235-
Ok(durable_frame_num)
258+
Ok(PushFramesResult {
259+
max_frame_no: durable_frame_num,
260+
baton,
261+
})
236262
}
237263

238264
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
@@ -289,14 +315,32 @@ impl SyncContext {
289315
.as_u64()
290316
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
291317

318+
let baton = resp
319+
.get("baton")
320+
.and_then(|v| v.as_str())
321+
.map(|s| s.to_string());
322+
323+
tracing::trace!(
324+
?baton,
325+
?generation,
326+
?max_frame_no,
327+
?status,
328+
"pushed frame to server"
329+
);
330+
292331
let status = match status {
293332
"ok" => PushStatus::Ok,
294333
"conflict" => PushStatus::Conflict,
295334
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
296335
};
297-
let generation = generation as u32;
336+
let generation = generation as u32;
298337
let max_frame_no = max_frame_no as u32;
299-
return Ok(PushResult { status, generation, max_frame_no });
338+
return Ok(PushResult {
339+
status,
340+
generation,
341+
max_frame_no,
342+
baton,
343+
});
300344
}
301345

302346
if res.status().is_redirection() {
@@ -778,6 +822,7 @@ async fn try_push(
778822
let generation = sync_ctx.durable_generation();
779823
let start_frame_no = sync_ctx.durable_frame_num() + 1;
780824
let end_frame_no = max_frame_no;
825+
let mut baton = None;
781826

782827
let mut frame_no = start_frame_no;
783828
while frame_no <= end_frame_no {
@@ -794,9 +839,12 @@ async fn try_push(
794839
// The server returns its maximum frame number. To avoid resending
795840
// frames the server already knows about, we need to update the
796841
// frame number to the one returned by the server.
797-
let max_frame_no = sync_ctx
798-
.push_frames(frames.freeze(), generation, frame_no, batch_size)
842+
let result = sync_ctx
843+
.push_frames(frames.freeze(), generation, frame_no, batch_size, baton)
799844
.await?;
845+
// if the server sent us a baton, then we will reuse it for the next request
846+
baton = result.baton;
847+
let max_frame_no = result.max_frame_no;
800848

801849
if max_frame_no > frame_no {
802850
frame_no = max_frame_no + 1;

libsql/src/sync/test.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ async fn test_sync_context_push_frame() {
2828
let mut sync_ctx = sync_ctx;
2929

3030
// Push a frame and verify the response
31-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
31+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
3232
sync_ctx.write_metadata().await.unwrap();
33-
assert_eq!(durable_frame, 0); // First frame should return max_frame_no = 0
33+
assert_eq!(durable_frame.max_frame_no, 0); // First frame should return max_frame_no = 0
3434

3535
// Verify internal state was updated
3636
assert_eq!(sync_ctx.durable_frame_num(), 0);
@@ -56,9 +56,9 @@ async fn test_sync_context_with_auth() {
5656
let frame = Bytes::from("test frame with auth");
5757
let mut sync_ctx = sync_ctx;
5858

59-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
59+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
6060
sync_ctx.write_metadata().await.unwrap();
61-
assert_eq!(durable_frame, 0);
61+
assert_eq!(durable_frame.max_frame_no, 0);
6262
assert_eq!(server.frame_count(), 1);
6363
}
6464

@@ -82,9 +82,9 @@ async fn test_sync_context_multiple_frames() {
8282
// Push multiple frames and verify incrementing frame numbers
8383
for i in 0..3 {
8484
let frame = Bytes::from(format!("frame data {}", i));
85-
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1).await.unwrap();
85+
let durable_frame = sync_ctx.push_frames(frame, 1, i, 1, None).await.unwrap();
8686
sync_ctx.write_metadata().await.unwrap();
87-
assert_eq!(durable_frame, i);
87+
assert_eq!(durable_frame.max_frame_no, i);
8888
assert_eq!(sync_ctx.durable_frame_num(), i);
8989
assert_eq!(server.frame_count(), i + 1);
9090
}
@@ -108,9 +108,9 @@ async fn test_sync_context_corrupted_metadata() {
108108

109109
let mut sync_ctx = sync_ctx;
110110
let frame = Bytes::from("test frame data");
111-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
111+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
112112
sync_ctx.write_metadata().await.unwrap();
113-
assert_eq!(durable_frame, 0);
113+
assert_eq!(durable_frame.max_frame_no, 0);
114114
assert_eq!(server.frame_count(), 1);
115115

116116
// Update metadata path to use -info instead of .meta
@@ -152,9 +152,12 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
152152

153153
let mut sync_ctx = sync_ctx;
154154
let frame = Bytes::from("test frame data");
155-
let durable_frame = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await.unwrap();
155+
let durable_frame = sync_ctx
156+
.push_frames(frame.clone(), 1, 0, 1, None)
157+
.await
158+
.unwrap();
156159
sync_ctx.write_metadata().await.unwrap();
157-
assert_eq!(durable_frame, 0);
160+
assert_eq!(durable_frame.max_frame_no, 0);
158161
assert_eq!(server.frame_count(), 1);
159162

160163
// Bump the durable frame num so that the next time we call the
@@ -180,14 +183,17 @@ async fn test_sync_restarts_with_lower_max_frame_no() {
180183
// This push should fail because we are ahead of the server and thus should get an invalid
181184
// frame no error.
182185
sync_ctx
183-
.push_frames(frame.clone(), 1, frame_no, 1)
186+
.push_frames(frame.clone(), 1, frame_no, 1, None)
184187
.await
185188
.unwrap_err();
186189

187190
let frame_no = sync_ctx.durable_frame_num() + 1;
188191
// This then should work because when the last one failed it updated our state of the server
189192
// durable_frame_num and we should then start writing from there.
190-
sync_ctx.push_frames(frame, 1, frame_no, 1).await.unwrap();
193+
sync_ctx
194+
.push_frames(frame, 1, frame_no, 1, None)
195+
.await
196+
.unwrap();
191197
}
192198

193199
#[tokio::test]
@@ -215,7 +221,7 @@ async fn test_sync_context_retry_on_error() {
215221
server.return_error.store(true, Ordering::SeqCst);
216222

217223
// First attempt should fail but retry
218-
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1).await;
224+
let result = sync_ctx.push_frames(frame.clone(), 1, 0, 1, None).await;
219225
assert!(result.is_err());
220226

221227
// Advance time to trigger retries faster
@@ -228,9 +234,9 @@ async fn test_sync_context_retry_on_error() {
228234
server.return_error.store(false, Ordering::SeqCst);
229235

230236
// Next attempt should succeed
231-
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1).await.unwrap();
237+
let durable_frame = sync_ctx.push_frames(frame, 1, 0, 1, None).await.unwrap();
232238
sync_ctx.write_metadata().await.unwrap();
233-
assert_eq!(durable_frame, 0);
239+
assert_eq!(durable_frame.max_frame_no, 0);
234240
assert_eq!(server.frame_count(), 1);
235241
}
236242

0 commit comments

Comments
 (0)