Skip to content

Commit 7ecd8c8

Browse files
committed
Reuse the same sync session
Currently, we have a bug where each `push` request creates a new sync session in the server. Each push request can send up to 128 frames. However, if a transaction spans more than 128 frames, then it needs to reuse the same sync session. Otherwise, the first push will succeed, but the rest of the pushes will fail due to the write lock held by the previous session. This patch reuses the sync session. The way we do this is by using batons. If the server sends a baton, then we pass it back. Reusing the same baton ensures that we are using the same sync session.
1 parent bbeabc9 commit 7ecd8c8

File tree

1 file changed

+59
-12
lines changed

1 file changed

+59
-12
lines changed

libsql/src/sync.rs

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub struct PushResult {
7878
status: PushStatus,
7979
generation: u32,
8080
max_frame_no: u32,
81+
baton: Option<String>,
8182
}
8283

8384
pub enum PushStatus {
@@ -97,6 +98,11 @@ struct InfoResult {
9798
current_generation: u32,
9899
}
99100

101+
struct PushFramesResult {
102+
max_frame_no: u32,
103+
baton: Option<String>,
104+
}
105+
100106
pub struct SyncContext {
101107
db_path: String,
102108
client: hyper::Client<ConnectorService, Body>,
@@ -180,15 +186,30 @@ impl SyncContext {
180186
generation: u32,
181187
frame_no: u32,
182188
frames_count: u32,
183-
) -> Result<u32> {
184-
let uri = format!(
185-
"{}/sync/{}/{}/{}",
186-
self.sync_url,
187-
generation,
189+
baton: Option<String>,
190+
) -> Result<PushFramesResult> {
191+
let uri = {
192+
let mut uri = format!(
193+
"{}/sync/{}/{}/{}",
194+
self.sync_url,
195+
generation,
196+
frame_no,
197+
frame_no + frames_count
198+
);
199+
if let Some(ref baton) = baton {
200+
uri.push_str(&format!("/{}", baton));
201+
}
202+
uri
203+
};
204+
205+
tracing::debug!(
206+
"pushing frame(frame_no={} (to={}), count={}, generation={}, baton={:?})",
188207
frame_no,
189-
frame_no + frames_count
208+
frame_no + frames_count,
209+
frames_count,
210+
generation,
211+
baton
190212
);
191-
tracing::debug!("pushing frame(frame_no={}, count={}, generation={})", frame_no, frames_count, generation);
192213

193214
let result = self.push_with_retry(uri, frames, self.max_retries).await?;
194215

@@ -200,6 +221,7 @@ impl SyncContext {
200221
}
201222
let generation = result.generation;
202223
let durable_frame_num = result.max_frame_no;
224+
let baton = result.baton;
203225

204226
if durable_frame_num > frame_no + frames_count - 1 {
205227
tracing::error!(
@@ -232,7 +254,10 @@ impl SyncContext {
232254
self.durable_generation = generation;
233255
self.durable_frame_num = durable_frame_num;
234256

235-
Ok(durable_frame_num)
257+
Ok(PushFramesResult {
258+
max_frame_no: durable_frame_num,
259+
baton,
260+
})
236261
}
237262

238263
async fn push_with_retry(&self, mut uri: String, body: Bytes, max_retries: usize) -> Result<PushResult> {
@@ -289,14 +314,32 @@ impl SyncContext {
289314
.as_u64()
290315
.ok_or_else(|| SyncError::JsonValue(max_frame_no.clone()))?;
291316

317+
let baton = resp
318+
.get("baton")
319+
.and_then(|v| v.as_str())
320+
.map(|s| s.to_string());
321+
322+
tracing::trace!(
323+
?baton,
324+
?generation,
325+
?max_frame_no,
326+
?status,
327+
"pushed frame to server"
328+
);
329+
292330
let status = match status {
293331
"ok" => PushStatus::Ok,
294332
"conflict" => PushStatus::Conflict,
295333
_ => return Err(SyncError::JsonValue(resp.clone()).into()),
296334
};
297-
let generation = generation as u32;
335+
let generation = generation as u32;
298336
let max_frame_no = max_frame_no as u32;
299-
return Ok(PushResult { status, generation, max_frame_no });
337+
return Ok(PushResult {
338+
status,
339+
generation,
340+
max_frame_no,
341+
baton,
342+
});
300343
}
301344

302345
if res.status().is_redirection() {
@@ -778,6 +821,7 @@ async fn try_push(
778821
let generation = sync_ctx.durable_generation();
779822
let start_frame_no = sync_ctx.durable_frame_num() + 1;
780823
let end_frame_no = max_frame_no;
824+
let mut baton = None;
781825

782826
let mut frame_no = start_frame_no;
783827
while frame_no <= end_frame_no {
@@ -794,9 +838,12 @@ async fn try_push(
794838
// The server returns its maximum frame number. To avoid resending
795839
// frames the server already knows about, we need to update the
796840
// frame number to the one returned by the server.
797-
let max_frame_no = sync_ctx
798-
.push_frames(frames.freeze(), generation, frame_no, batch_size)
841+
let result = sync_ctx
842+
.push_frames(frames.freeze(), generation, frame_no, batch_size, baton)
799843
.await?;
844+
// if the server sent us a baton, then we will reuse it for the next request
845+
baton = result.baton;
846+
let max_frame_no = result.max_frame_no;
800847

801848
if max_frame_no > frame_no {
802849
frame_no = max_frame_no + 1;

0 commit comments

Comments
 (0)