Skip to content

Commit 18bef24

Browse files
authored
Merge pull request #134 from skel84/feat/extract-wal-file
refactor: extract shared wal file substrate
2 parents 079f2fa + f3d15d2 commit 18bef24

File tree

18 files changed

+294
-242
lines changed

18 files changed

+294
-242
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[workspace]
22
members = [
33
"crates/allocdb-retire-queue",
4+
"crates/allocdb-wal-file",
45
"crates/allocdb-wal-frame",
56
"crates/allocdb-bench",
67
"crates/allocdb-core",

crates/allocdb-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ rust-version.workspace = true
66

77
[dependencies]
88
allocdb-retire-queue = { path = "../allocdb-retire-queue" }
9+
allocdb-wal-file = { path = "../allocdb-wal-file" }
910
allocdb-wal-frame = { path = "../allocdb-wal-frame" }
1011
log = "0.4.28"
1112

crates/allocdb-core/src/recovery.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl<E> From<ReplayError> for RecoveryObserverError<E> {
149149
pub fn recover_allocdb(
150150
config: Config,
151151
snapshot_file: &SnapshotFile,
152-
wal_file: &WalFile,
152+
wal_file: &mut WalFile,
153153
) -> Result<RecoveryResult, RecoveryError> {
154154
recover_allocdb_with_observer(
155155
config,
@@ -173,7 +173,7 @@ pub fn recover_allocdb(
173173
pub fn recover_allocdb_with_observer<E, F>(
174174
config: Config,
175175
snapshot_file: &SnapshotFile,
176-
wal_file: &WalFile,
176+
wal_file: &mut WalFile,
177177
mut observer: F,
178178
) -> Result<RecoveryResult, RecoveryObserverError<E>>
179179
where
@@ -203,7 +203,7 @@ where
203203
fn recover_allocdb_impl<E, F>(
204204
config: Config,
205205
snapshot_file: &SnapshotFile,
206-
wal_file: &WalFile,
206+
wal_file: &mut WalFile,
207207
observer: &mut F,
208208
) -> Result<RecoveryResult, RecoveryObserverError<E>>
209209
where

crates/allocdb-core/src/recovery_issue_30_tests.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ fn recover_allocdb_rejects_non_monotonic_lsn() {
7979
.unwrap();
8080
wal.sync().unwrap();
8181

82-
let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
82+
let error =
83+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
8384
assert!(matches!(
8485
error,
8586
RecoveryError::Replay(ReplayError::NonMonotonicLsn {
@@ -126,7 +127,8 @@ fn recover_allocdb_rejects_rewound_request_slot() {
126127
.unwrap();
127128
wal.sync().unwrap();
128129

129-
let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
130+
let error =
131+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
130132
assert!(matches!(
131133
error,
132134
RecoveryError::Replay(ReplayError::RewoundRequestSlot {
@@ -143,7 +145,7 @@ fn recover_allocdb_rejects_rewound_request_slot() {
143145
fn recover_allocdb_rejects_semantically_invalid_snapshot() {
144146
let wal_path = test_path("recover-invalid-snapshot", "wal");
145147
let snapshot_path = test_path("recover-invalid-snapshot", "snapshot");
146-
let wal = WalFile::open(&wal_path, 512).unwrap();
148+
let mut wal = WalFile::open(&wal_path, 512).unwrap();
147149
let snapshot_file = SnapshotFile::new(&snapshot_path);
148150

149151
snapshot_file
@@ -172,7 +174,7 @@ fn recover_allocdb_rejects_semantically_invalid_snapshot() {
172174
})
173175
.unwrap();
174176

175-
let error = recover_allocdb(config(), &snapshot_file, &wal).unwrap_err();
177+
let error = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap_err();
176178
assert!(matches!(
177179
error,
178180
RecoveryError::Snapshot(SnapshotError::DuplicateResourceId(ResourceId(11)))

crates/allocdb-core/src/recovery_issue_31_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ fn recover_allocdb_rejects_client_slot_overflow_in_replayed_wal() {
6767
.unwrap();
6868
wal.sync().unwrap();
6969

70-
let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
70+
let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
7171
assert!(matches!(
7272
error,
7373
RecoveryError::Replay(ReplayError::SlotOverflow {

crates/allocdb-core/src/recovery_revoke_tests.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ fn recover_allocdb_preserves_revoking_state() {
8989
wal.append_frame(&client_frame(4, 2, &revoke)).unwrap();
9090
wal.sync().unwrap();
9191

92-
let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
92+
let recovered =
93+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
9394
let reservation = recovered.db.reservation(ReservationId(2), Slot(2)).unwrap();
9495
let resource = recovered.db.resource(ResourceId(11)).unwrap();
9596

@@ -156,7 +157,8 @@ fn recover_allocdb_preserves_revoked_state() {
156157
wal.append_frame(&client_frame(5, 3, &reclaim)).unwrap();
157158
wal.sync().unwrap();
158159

159-
let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
160+
let recovered =
161+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
160162
let reservation = recovered.db.reservation(ReservationId(2), Slot(5)).unwrap();
161163
let resource = recovered.db.resource(ResourceId(11)).unwrap();
162164

crates/allocdb-core/src/recovery_tests.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ fn recover_allocdb_replays_wal_without_snapshot() {
9292
wal.sync().unwrap();
9393

9494
let snapshot_file = SnapshotFile::new(&snapshot_path);
95-
let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
95+
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();
9696

9797
assert!(!recovered.loaded_snapshot);
9898
assert_eq!(recovered.loaded_snapshot_lsn, None);
@@ -148,7 +148,7 @@ fn recover_allocdb_skips_frames_covered_by_snapshot() {
148148
wal.append_frame(&client_frame(3, 2, &confirm)).unwrap();
149149
wal.sync().unwrap();
150150

151-
let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
151+
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();
152152

153153
assert!(recovered.loaded_snapshot);
154154
assert_eq!(recovered.loaded_snapshot_lsn, Some(Lsn(1)));
@@ -193,7 +193,8 @@ fn recover_allocdb_truncates_torn_tail() {
193193
.write_all(&torn[..torn.len() - 2])
194194
.unwrap();
195195

196-
let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
196+
let recovered =
197+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
197198

198199
assert!(!recovered.loaded_snapshot);
199200
assert_eq!(recovered.recovered_wal.scan_result.frames.len(), 1);
@@ -212,13 +213,13 @@ fn recover_allocdb_truncates_torn_tail() {
212213
fn recover_allocdb_marks_empty_snapshot_as_loaded() {
213214
let wal_path = test_path("recover-empty-snapshot", "wal");
214215
let snapshot_path = test_path("recover-empty-snapshot", "snapshot");
215-
let wal = WalFile::open(&wal_path, 512).unwrap();
216+
let mut wal = WalFile::open(&wal_path, 512).unwrap();
216217
let snapshot_file = SnapshotFile::new(&snapshot_path);
217218
snapshot_file
218219
.write_snapshot(&AllocDb::new(config()).unwrap().snapshot())
219220
.unwrap();
220221

221-
let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
222+
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();
222223

223224
assert!(recovered.loaded_snapshot);
224225
assert_eq!(recovered.loaded_snapshot_lsn, None);
@@ -262,7 +263,8 @@ fn recover_allocdb_replays_internal_commands() {
262263
wal.append_frame(&internal_frame(3, 5, &expire)).unwrap();
263264
wal.sync().unwrap();
264265

265-
let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
266+
let recovered =
267+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
266268

267269
assert_eq!(recovered.replayed_wal_frame_count, 3);
268270
assert_eq!(recovered.replayed_wal_last_lsn, Some(Lsn(3)));
@@ -308,7 +310,8 @@ fn recover_allocdb_fails_closed_on_mid_log_corruption() {
308310
bytes[last_index] ^= 0xff;
309311
fs::write(&wal_path, bytes).unwrap();
310312

311-
let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
313+
let error =
314+
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
312315

313316
assert!(matches!(
314317
error,

crates/allocdb-core/src/wal_file.rs

Lines changed: 17 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
use std::fs::{self, File, OpenOptions};
2-
use std::io::{Read, Seek, SeekFrom, Write};
3-
use std::path::{Path, PathBuf};
1+
use std::path::Path;
42

53
use crate::wal::{DecodeError, Frame, ScanResult, ScanStopReason, scan_frames};
4+
use allocdb_wal_file::AppendWalFile;
65

76
#[derive(Debug)]
87
pub struct WalFile {
9-
path: PathBuf,
10-
file: File,
8+
file: AppendWalFile,
119
max_payload_bytes: usize,
1210
}
1311

@@ -43,19 +41,17 @@ impl WalFile {
4341
///
4442
/// Returns [`WalFileError`] if the file cannot be opened or created.
4543
pub fn open(path: impl AsRef<Path>, max_payload_bytes: usize) -> Result<Self, WalFileError> {
46-
let path = path.as_ref().to_path_buf();
47-
let file = open_append_file(&path)?;
44+
let file = AppendWalFile::open(path)?;
4845

4946
Ok(Self {
50-
path,
5147
file,
5248
max_payload_bytes,
5349
})
5450
}
5551

5652
#[must_use]
5753
pub fn path(&self) -> &Path {
58-
&self.path
54+
self.file.path()
5955
}
6056

6157
/// Appends one encoded frame to the WAL file.
@@ -66,9 +62,7 @@ impl WalFile {
6662
/// or [`WalFileError::Io`] if the append fails.
6763
pub fn append_frame(&mut self, frame: &Frame) -> Result<(), WalFileError> {
6864
self.validate_payload_len(frame)?;
69-
70-
let encoded = frame.encode();
71-
self.file.write_all(&encoded)?;
65+
self.file.append_bytes(&frame.encode())?;
7266
Ok(())
7367
}
7468

@@ -78,7 +72,7 @@ impl WalFile {
7872
///
7973
/// Returns [`WalFileError::Io`] if the sync fails.
8074
pub fn sync(&self) -> Result<(), WalFileError> {
81-
self.file.sync_data()?;
75+
self.file.sync()?;
8276
Ok(())
8377
}
8478

@@ -88,7 +82,7 @@ impl WalFile {
8882
///
8983
/// Returns [`WalFileError::Io`] if the file cannot be read.
9084
pub fn recover(&self) -> Result<RecoveredWal, WalFileError> {
91-
recover_path(&self.path)
85+
recover_file(&self.file)
9286
}
9387

9488
/// Truncates the file to the last valid frame boundary discovered by recovery scanning.
@@ -100,19 +94,16 @@ impl WalFile {
10094
/// # Panics
10195
///
10296
/// Panics only if the discovered valid prefix cannot fit into `u64`.
103-
pub fn truncate_to_valid_prefix(&self) -> Result<RecoveredWal, WalFileError> {
104-
let recovered = recover_path(&self.path)?;
97+
pub fn truncate_to_valid_prefix(&mut self) -> Result<RecoveredWal, WalFileError> {
98+
let recovered = recover_file(&self.file)?;
10599
let valid_prefix =
106100
u64::try_from(recovered.scan_result.valid_up_to).expect("valid WAL prefix must fit");
107101

108102
match recovered.scan_result.stop_reason {
109103
ScanStopReason::CleanEof => {}
110104
ScanStopReason::TornTail { .. } => {
111105
if recovered.file_len > valid_prefix {
112-
let mut file = OpenOptions::new().write(true).open(&self.path)?;
113-
file.set_len(valid_prefix)?;
114-
file.seek(SeekFrom::Start(valid_prefix))?;
115-
file.sync_data()?;
106+
self.file.truncate_to(valid_prefix)?;
116107
}
117108
}
118109
ScanStopReason::Corruption { offset, error } => {
@@ -134,22 +125,11 @@ impl WalFile {
134125
self.validate_payload_len(frame)?;
135126
}
136127

137-
if let Some(parent) = self.path.parent() {
138-
fs::create_dir_all(parent)?;
139-
}
140-
141-
let temp_path = self.temp_path();
142-
{
143-
let mut temp_file = File::create(&temp_path)?;
144-
for frame in frames {
145-
temp_file.write_all(&frame.encode())?;
146-
}
147-
temp_file.sync_data()?;
128+
let mut bytes = Vec::new();
129+
for frame in frames {
130+
bytes.extend_from_slice(&frame.encode());
148131
}
149-
150-
fs::rename(&temp_path, &self.path)?;
151-
sync_parent_dir(&self.path)?;
152-
self.file = open_append_file(&self.path)?;
132+
self.file.replace_with_bytes(&bytes)?;
153133
Ok(())
154134
}
155135

@@ -163,50 +143,17 @@ impl WalFile {
163143

164144
Ok(())
165145
}
166-
167-
fn temp_path(&self) -> PathBuf {
168-
let mut temp_path = self.path.clone();
169-
let extension = temp_path
170-
.extension()
171-
.and_then(|value| value.to_str())
172-
.map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp"));
173-
temp_path.set_extension(extension);
174-
temp_path
175-
}
176146
}
177147

178-
fn recover_path(path: &Path) -> Result<RecoveredWal, WalFileError> {
179-
let mut file = File::open(path)?;
180-
let mut bytes = Vec::new();
181-
file.read_to_end(&mut bytes)?;
148+
fn recover_file(file: &AppendWalFile) -> Result<RecoveredWal, WalFileError> {
149+
let bytes = file.read_all()?;
182150
let scan_result = scan_frames(&bytes);
183151
Ok(RecoveredWal {
184152
scan_result,
185153
file_len: u64::try_from(bytes.len()).expect("file length must fit into u64"),
186154
})
187155
}
188156

189-
fn open_append_file(path: &Path) -> Result<File, std::io::Error> {
190-
OpenOptions::new()
191-
.create(true)
192-
.read(true)
193-
.append(true)
194-
.open(path)
195-
}
196-
197-
#[cfg(unix)]
198-
fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> {
199-
if let Some(parent) = path.parent() {
200-
OpenOptions::new().read(true).open(parent)?.sync_all()?;
201-
}
202-
Ok(())
203-
}
204-
205-
#[cfg(not(unix))]
206-
fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> {
207-
Ok(())
208-
}
209-
210157
#[cfg(test)]
211158
mod tests {
212159
use std::fs;

0 commit comments

Comments
 (0)