Skip to content

Commit dcc70b6

Browse files
authored
Fix durable frame tracking to follow transaction boundaries (#2129)
Pull logic used invalid property of WAL in its implementation and was written under assumption that any frames written to WAL are durable. This is not true in case of frames written for unfinished transaction, because such frames will be omitted by any SQLite connection except from the one which created them. So, in case of restart - we will "loose" these frames, but `durable_frame_num` will incorrectly state that we already written "missed" portion of WAL. This PR fix this issue and make pull to persist information about "durable_frame_num" only at transaction boundaries.
2 parents e3c73cf + 57043b6 commit dcc70b6

File tree

2 files changed

+61
-42
lines changed

2 files changed

+61
-42
lines changed

libsql/src/local/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,10 @@ impl WalInsertHandle<'_> {
675675
self.conn.wal_insert_frame(frame_no, frame)
676676
}
677677

678+
pub fn in_session(&self) -> bool {
679+
*self.in_session.borrow()
680+
}
681+
678682
pub fn begin(&self) -> Result<()> {
679683
assert!(!*self.in_session.borrow());
680684
self.conn.wal_insert_begin()?;

libsql/src/sync.rs

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use hyper::Body;
88
use std::path::Path;
99
use tokio::io::AsyncWriteExt as _;
1010
use uuid::Uuid;
11+
use zerocopy::big_endian;
1112

1213
#[cfg(test)]
1314
mod test;
@@ -953,18 +954,24 @@ pub async fn try_pull(
953954
sync_ctx: &mut SyncContext,
954955
conn: &Connection,
955956
) -> Result<crate::database::Replicated> {
956-
let insert_handle = conn.wal_insert_handle()?;
957+
// note, that updates of durable_frame_num are valid only after SQLite commited the WAL
958+
// (because if WAL has uncommited suffix - it will be omitted by any other SQLite connection - for example after restart)
959+
// so, try_pull maintains local next_frame_no during the pull operation and update durable_frame_num when it's appropriate
960+
let mut next_frame_no = sync_ctx.durable_frame_num + 1;
957961

958-
let mut err = None;
962+
// libsql maintain consistent state about WAL sync session locally in the insert_handle
963+
// note, that insert_handle will always close the session on drop - so we never keep active WAL session after we exit from the method
964+
let insert_handle = conn.wal_insert_handle()?;
959965

960966
loop {
967+
// get current generation (it may be updated multiple times during execution)
961968
let generation = sync_ctx.durable_generation();
962-
let frame_no = sync_ctx.durable_frame_num() + 1;
963-
match sync_ctx.pull_frames(generation, frame_no).await {
969+
970+
match sync_ctx.pull_frames(generation, next_frame_no).await {
964971
Ok(PullResult::Frames(frames)) => {
965972
tracing::debug!(
966973
"pull_frames: generation={}, start_frame={} (end_frame={}, batch_size={}), frames_size={}",
967-
generation, frame_no, frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
974+
generation, next_frame_no, next_frame_no + sync_ctx.pull_batch_size, sync_ctx.pull_batch_size, frames.len(),
968975
);
969976
if frames.len() % FRAME_SIZE != 0 {
970977
tracing::error!(
@@ -974,63 +981,71 @@ pub async fn try_pull(
974981
);
975982
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
976983
}
977-
for (i, chunk) in frames.chunks(FRAME_SIZE).enumerate() {
978-
let r = insert_handle.insert_at(frame_no + i as u32, &chunk);
979-
if let Err(e) = r {
980-
tracing::error!(
981-
"insert error (frame= {}) : {:?}",
982-
sync_ctx.durable_frame_num + 1,
983-
e
984+
for chunk in frames.chunks(FRAME_SIZE) {
985+
let mut size_after_buf = [0u8; 4];
986+
size_after_buf.copy_from_slice(&chunk[4..8]);
987+
let size_after = big_endian::U32::from_bytes(size_after_buf);
988+
// start WAL sync session if it was closed
989+
// (this can happen if on previous iteration client received commit frame)
990+
if !insert_handle.in_session() {
991+
tracing::debug!(
992+
"pull_frames: generation={}, frame={}, start wal transaction session",
993+
generation,
994+
next_frame_no
984995
);
996+
insert_handle.begin()?;
997+
}
998+
let result = insert_handle.insert_at(next_frame_no, &chunk);
999+
if let Err(e) = result {
1000+
tracing::error!("insert error (frame={}) : {:?}", next_frame_no, e);
9851001
return Err(e);
9861002
}
987-
sync_ctx.durable_frame_num += 1;
1003+
// if this is commit frame - we can close WAL sync session and update durable_frame_num
1004+
if size_after.get() > 0 {
1005+
tracing::debug!(
1006+
"pull_frames: generation={}, frame={}, finish wal transaction session, size_after={}",
1007+
generation,
1008+
next_frame_no,
1009+
size_after.get()
1010+
);
1011+
insert_handle.end()?;
1012+
sync_ctx.durable_frame_num = next_frame_no;
1013+
sync_ctx.write_metadata().await?;
1014+
}
1015+
1016+
next_frame_no += 1;
9881017
}
9891018
}
9901019
Ok(PullResult::EndOfGeneration { max_generation }) => {
9911020
// If there are no more generations to pull, we're done.
9921021
if generation >= max_generation {
9931022
break;
9941023
}
995-
insert_handle.end()?;
996-
sync_ctx.write_metadata().await?;
1024+
assert!(
1025+
!insert_handle.in_session(),
1026+
"WAL transaction must be finished"
1027+
);
9971028

1029+
tracing::debug!(
1030+
"pull_frames: generation={}, frame={}, checkpoint in order to move to next generation",
1031+
generation,
1032+
next_frame_no
1033+
);
9981034
// TODO: Make this crash-proof.
9991035
conn.wal_checkpoint(true)?;
10001036

10011037
sync_ctx.next_generation();
10021038
sync_ctx.write_metadata().await?;
1003-
1004-
insert_handle.begin()?;
1005-
}
1006-
Err(e) => {
1007-
tracing::debug!("pull_frames error: {:?}", e);
1008-
err.replace(e);
1009-
break;
1039+
next_frame_no = 1;
10101040
}
1041+
Err(e) => return Err(e),
10111042
}
10121043
}
1013-
// This is crash-proof because we:
1014-
//
1015-
// 1. Write WAL frame first
1016-
// 2. Write new max frame to temporary metadata
1017-
// 3. Atomically rename the temporary metadata to the real metadata
1018-
//
1019-
// If we crash before metadata rename completes, the old metadata still
1020-
// points to last successful frame, allowing safe retry from that point.
1021-
// If we happen to have the frame already in the WAL, it's fine to re-pull
1022-
// because append locally is idempotent.
1023-
insert_handle.end()?;
1024-
sync_ctx.write_metadata().await?;
10251044

1026-
if let Some(err) = err {
1027-
Err(err)
1028-
} else {
1029-
Ok(crate::database::Replicated {
1030-
frame_no: None,
1031-
frames_synced: 1,
1032-
})
1033-
}
1045+
Ok(crate::database::Replicated {
1046+
frame_no: None,
1047+
frames_synced: 1,
1048+
})
10341049
}
10351050

10361051
fn check_if_file_exists(path: &str) -> core::result::Result<bool, SyncError> {

0 commit comments

Comments
 (0)