Skip to content

Commit c871764

Browse files
committed
test(file_store): last_write_is_short
This test simulates a situation where the last write to the db is short. Aggregating the changeset after reopening the file should return an error (which includes a partially-aggregated changeset) containing an aggregation of changesets that were fully written. At this point, the test re-writes the final changeset (and this time it successfully writes in full). The file should be recoverable with all changesets, including the last one.
1 parent a3aa8b6 commit c871764

File tree

1 file changed

+73
-3
lines changed

1 file changed

+73
-3
lines changed

crates/file_store/src/store.rs

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ mod test {
219219

220220
use bincode::DefaultOptions;
221221
use std::{
222+
collections::BTreeSet,
222223
io::{Read, Write},
223224
vec::Vec,
224225
};
@@ -228,7 +229,7 @@ mod test {
228229
const TEST_MAGIC_BYTES: [u8; TEST_MAGIC_BYTES_LEN] =
229230
[98, 100, 107, 102, 115, 49, 49, 49, 49, 49, 49, 49];
230231

231-
type TestChangeSet = Vec<String>;
232+
type TestChangeSet = BTreeSet<String>;
232233

233234
#[derive(Debug)]
234235
struct TestTracker;
@@ -253,7 +254,7 @@ mod test {
253254
fn open_or_create_new() {
254255
let temp_dir = tempfile::tempdir().unwrap();
255256
let file_path = temp_dir.path().join("db_file");
256-
let changeset = vec!["hello".to_string(), "world".to_string()];
257+
let changeset = BTreeSet::from(["hello".to_string(), "world".to_string()]);
257258

258259
{
259260
let mut db = Store::<TestChangeSet>::open_or_create_new(&TEST_MAGIC_BYTES, &file_path)
@@ -304,7 +305,7 @@ mod test {
304305
let mut data = [255_u8; 2000];
305306
data[..TEST_MAGIC_BYTES_LEN].copy_from_slice(&TEST_MAGIC_BYTES);
306307

307-
let changeset = vec!["one".into(), "two".into(), "three!".into()];
308+
let changeset = TestChangeSet::from(["one".into(), "two".into(), "three!".into()]);
308309

309310
let mut file = NamedTempFile::new().unwrap();
310311
file.write_all(&data).expect("should write");
@@ -340,4 +341,73 @@ mod test {
340341

341342
assert_eq!(got_bytes, expected_bytes);
342343
}
344+
345+
#[test]
346+
fn last_write_is_short() {
347+
let temp_dir = tempfile::tempdir().unwrap();
348+
349+
let changesets = [
350+
TestChangeSet::from(["1".into()]),
351+
TestChangeSet::from(["2".into(), "3".into()]),
352+
TestChangeSet::from(["4".into(), "5".into(), "6".into()]),
353+
];
354+
let last_changeset = TestChangeSet::from(["7".into(), "8".into(), "9".into()]);
355+
let last_changeset_bytes = bincode_options().serialize(&last_changeset).unwrap();
356+
357+
for short_write_len in 1..last_changeset_bytes.len() - 1 {
358+
let file_path = temp_dir.path().join(format!("{}.dat", short_write_len));
359+
println!("Test file: {:?}", file_path);
360+
361+
// simulate creating a file, writing data where the last write is incomplete
362+
{
363+
let mut db =
364+
Store::<TestChangeSet>::create_new(&TEST_MAGIC_BYTES, &file_path).unwrap();
365+
for changeset in &changesets {
366+
db.append_changeset(changeset).unwrap();
367+
}
368+
// this is the incomplete write
369+
db.db_file
370+
.write_all(&last_changeset_bytes[..short_write_len])
371+
.unwrap();
372+
}
373+
374+
// load file again and aggregate changesets
375+
// write the last changeset again (this time it succeeds)
376+
{
377+
let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
378+
let err = db
379+
.aggregate_changesets()
380+
.expect_err("should return error as last read is short");
381+
assert_eq!(
382+
err.changeset,
383+
changesets.iter().cloned().reduce(|mut acc, cs| {
384+
Append::append(&mut acc, cs);
385+
acc
386+
}),
387+
"should recover all changesets that are written in full",
388+
);
389+
db.db_file.write_all(&last_changeset_bytes).unwrap();
390+
}
391+
392+
// load file again - this time we should successfully aggregate all changesets
393+
{
394+
let mut db = Store::<TestChangeSet>::open(&TEST_MAGIC_BYTES, &file_path).unwrap();
395+
let aggregated_changesets = db
396+
.aggregate_changesets()
397+
.expect("aggregating all changesets should succeed");
398+
assert_eq!(
399+
aggregated_changesets,
400+
changesets
401+
.iter()
402+
.cloned()
403+
.chain(core::iter::once(last_changeset.clone()))
404+
.reduce(|mut acc, cs| {
405+
Append::append(&mut acc, cs);
406+
acc
407+
}),
408+
"should recover all changesets",
409+
);
410+
}
411+
}
412+
}
343413
}

0 commit comments

Comments
 (0)