Skip to content

Commit 57043b6

Browse files
committed
make pull logic more correct
1 parent e3c73cf commit 57043b6

File tree

2 files changed

+61
-42
lines changed

2 files changed

+61
-42
lines changed

libsql/src/local/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,10 @@ impl WalInsertHandle<'_> {
675675
self.conn.wal_insert_frame(frame_no, frame)
676676
}
677677

678+
pub fn in_session(&self) -> bool {
679+
*self.in_session.borrow()
680+
}
681+
678682
pub fn begin(&self) -> Result<()> {
679683
assert!(!*self.in_session.borrow());
680684
self.conn.wal_insert_begin()?;

libsql/src/sync.rs

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use hyper::Body;
88
use std::path::Path;
99
use tokio::io::AsyncWriteExt as _;
1010
use uuid::Uuid;
11+
use zerocopy::big_endian;
1112

1213
#[cfg(test)]
1314
mod test;
@@ -953,18 +954,24 @@ pub async fn try_pull(
953954
sync_ctx: &mut SyncContext,
954955
conn: &Connection,
955956
) -> Result<crate::database::Replicated> {
956-
let insert_handle = conn.wal_insert_handle()?;
957+
// note, that updates of durable_frame_num are valid only after SQLite commited the WAL
958+
// (because if WAL has uncommited suffix - it will be omitted by any other SQLite connection - for example after restart)
959+
// so, try_pull maintains local next_frame_no during the pull operation and update durable_frame_num when it's appropriate
960+
let mut next_frame_no = sync_ctx.durable_frame_num + 1;
957961

958-
let mut err = None;
962+
// libsql maintain consistent state about WAL sync session locally in the insert_handle
963+
// note, that insert_handle will always close the session on drop - so we never keep active WAL session after we exit from the method
964+
let insert_handle = conn.wal_insert_handle()?;
959965

960966
loop {
967+
// get current generation (it may be updated multiple times during execution)
961968
let generation = sync_ctx.durable_generation();
962-
let frame_no = sync_ctx.durable_frame_num() + 1;
963-
match sync_ctx.pull_frames(generation, frame_no).await {
969+
970+
match sync_ctx.pull_frames(generation, next_frame_no).await {
964971
Ok(PullResult::Frames(frames)) => {
965972
tracing::debug!(
966973
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
967-
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
974+
generation, next_frame_no, next_frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
968975
);
969976
if frames.len() % FRAME_SIZE != 0 {
970977
tracing::error!(
@@ -974,63 +981,71 @@ pub async fn try_pull(
974981
);
975982
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
976983
}
977-
for (i, chunk) in frames.chunks(FRAME_SIZE).enumerate() {
978-
let r = insert_handle.insert_at(frame_no + i as u32, &chunk);
979-
if let Err(e) = r {
980-
tracing::error!(
981-
"insert error (frame= {}) : {:?}",
982-
sync_ctx.durable_frame_num + 1,
983-
e
984+
for chunk in frames.chunks(FRAME_SIZE) {
985+
let mut size_after_buf = [0u8; 4];
986+
size_after_buf.copy_from_slice(&chunk[4..8]);
987+
let size_after = big_endian::U32::from_bytes(size_after_buf);
988+
// start WAL sync session if it was closed
989+
// (this can happen if on previous iteration client received commit frame)
990+
if !insert_handle.in_session() {
991+
tracing::debug!(
992+
"pull_frames: generation={}, frame={}, start wal transaction session",
993+
generation,
994+
next_frame_no
984995
);
996+
insert_handle.begin()?;
997+
}
998+
let result = insert_handle.insert_at(next_frame_no, &chunk);
999+
if let Err(e) = result {
1000+
tracing::error!("insert error (frame={}) : {:?}", next_frame_no, e);
9851001
return Err(e);
9861002
}
987-
sync_ctx.durable_frame_num += 1;
1003+
// if this is commit frame - we can close WAL sync session and update durable_frame_num
1004+
if size_after.get() > 0 {
1005+
tracing::debug!(
1006+
"pull_frames: generation={}, frame={}, finish wal transaction session, size_after={}",
1007+
generation,
1008+
next_frame_no,
1009+
size_after.get()
1010+
);
1011+
insert_handle.end()?;
1012+
sync_ctx.durable_frame_num = next_frame_no;
1013+
sync_ctx.write_metadata().await?;
1014+
}
1015+
1016+
next_frame_no += 1;
9881017
}
9891018
}
9901019
Ok(PullResult::EndOfGeneration { max_generation }) => {
9911020
// If there are no more generations to pull, we're done.
9921021
if generation >= max_generation {
9931022
break;
9941023
}
995-
insert_handle.end()?;
996-
sync_ctx.write_metadata().await?;
1024+
assert!(
1025+
!insert_handle.in_session(),
1026+
"WAL transaction must be finished"
1027+
);
9971028

1029+
tracing::debug!(
1030+
"pull_frames: generation={}, frame={}, checkpoint in order to move to next generation",
1031+
generation,
1032+
next_frame_no
1033+
);
9981034
// TODO: Make this crash-proof.
9991035
conn.wal_checkpoint(true)?;
10001036

10011037
sync_ctx.next_generation();
10021038
sync_ctx.write_metadata().await?;
1003-
1004-
insert_handle.begin()?;
1005-
}
1006-
Err(e) => {
1007-
tracing::debug!("pull_frames error: {:?}", e);
1008-
err.replace(e);
1009-
break;
1039+
next_frame_no = 1;
10101040
}
1041+
Err(e) => return Err(e),
10111042
}
10121043
}
1013-
// This is crash-proof because we:
1014-
//
1015-
// 1. Write WAL frame first
1016-
// 2. Write new max frame to temporary metadata
1017-
// 3. Atomically rename the temporary metadata to the real metadata
1018-
//
1019-
// If we crash before metadata rename completes, the old metadata still
1020-
// points to last successful frame, allowing safe retry from that point.
1021-
// If we happen to have the frame already in the WAL, it's fine to re-pull
1022-
// because append locally is idempotent.
1023-
insert_handle.end()?;
1024-
sync_ctx.write_metadata().await?;
10251044

1026-
if let Some(err) = err {
1027-
Err(err)
1028-
} else {
1029-
Ok(crate::database::Replicated {
1030-
frame_no: None,
1031-
frames_synced: 1,
1032-
})
1033-
}
1045+
Ok(crate::database::Replicated {
1046+
frame_no: None,
1047+
frames_synced: 1,
1048+
})
10341049
}
10351050

10361051
fn check_if_file_exists(path: &str) -> core::result::Result<bool, SyncError> {

0 commit comments

Comments
 (0)