Skip to content

Commit 1ddc35d

Browse files
authored
[Turbopack] fix race condition in database compaction (#73495)
### What? * sort new SST files * assign moved SST sequence number in order instead of concurrently * add test case
1 parent afeb404 commit 1ddc35d

File tree

3 files changed

+124
-6
lines changed

3 files changed

+124
-6
lines changed

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl TurboPersistence {
290290
}
291291

292292
sst_files.retain(|seq| !deleted_files.contains(seq));
293-
sst_files.sort();
293+
sst_files.sort_unstable();
294294
let sst_files = sst_files
295295
.into_iter()
296296
.map(|seq| self.open_sst(seq))
@@ -407,11 +407,13 @@ impl TurboPersistence {
407407
/// new files.
408408
fn commit(
409409
&self,
410-
new_sst_files: Vec<(u32, File)>,
410+
mut new_sst_files: Vec<(u32, File)>,
411411
new_blob_files: Vec<File>,
412412
mut indicies_to_delete: Vec<usize>,
413413
mut seq: u32,
414414
) -> Result<(), anyhow::Error> {
415+
new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
416+
415417
let mut new_sst_files = new_sst_files
416418
.into_iter()
417419
.map(|(seq, file)| {
@@ -433,7 +435,7 @@ impl TurboPersistence {
433435
{
434436
let mut inner = self.inner.write();
435437
inner.current_sequence_number = seq;
436-
indicies_to_delete.sort();
438+
indicies_to_delete.sort_unstable();
437439
removed_ssts = remove_indicies(&mut inner.static_sorted_files, &indicies_to_delete);
438440
inner.static_sorted_files.append(&mut new_sst_files);
439441
}
@@ -442,7 +444,7 @@ impl TurboPersistence {
442444
.into_iter()
443445
.map(|sst| sst.sequence_number())
444446
.collect::<Vec<_>>();
445-
removed_ssts.sort();
447+
removed_ssts.sort_unstable();
446448

447449
if !indicies_to_delete.is_empty() {
448450
// Write *.del file, marking the selected files as to delete
@@ -747,14 +749,21 @@ impl TurboPersistence {
747749
})
748750
.collect::<Result<Vec<_>>>()?;
749751

752+
let move_jobs = move_jobs
753+
.into_iter()
754+
.map(|index| {
755+
let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
756+
(index, seq)
757+
})
758+
.collect::<Vec<_>>();
759+
750760
// Move SST files
751761
let mut new_sst_files = move_jobs
752762
.into_par_iter()
753763
.with_min_len(1)
754-
.map(|index| {
764+
.map(|(index, seq)| {
755765
let index = ssts_with_ranges[index].index;
756766
let sst = &static_sorted_files[index];
757-
let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
758767
let src_path = self.path.join(format!("{:08}.sst", sst.sequence_number()));
759768
let dst_path = self.path.join(format!("{:08}.sst", seq));
760769
if fs::hard_link(&src_path, &dst_path).is_err() {

turbopack/crates/turbo-persistence/src/key.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ pub trait StoreKey: KeyBase + Ord {
135135
fn write_to(&self, buf: &mut Vec<u8>);
136136
}
137137

138+
impl<const N: usize> StoreKey for [u8; N] {
139+
fn write_to(&self, buf: &mut Vec<u8>) {
140+
buf.extend_from_slice(&self[..]);
141+
}
142+
}
143+
138144
impl StoreKey for Vec<u8> {
139145
fn write_to(&self, buf: &mut Vec<u8>) {
140146
buf.extend_from_slice(self);

turbopack/crates/turbo-persistence/src/tests.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,3 +345,106 @@ fn full_cycle() -> Result<()> {
345345
}
346346
Ok(())
347347
}
348+
349+
#[test]
350+
fn persist_changes() -> Result<()> {
351+
let tempdir = tempfile::tempdir()?;
352+
let path = tempdir.path();
353+
354+
fn put(b: &WriteBatch<(u8, [u8; 4]), 1>, key: u8, value: u8) -> Result<()> {
355+
for i in 0..2000000u32 {
356+
b.put(0, (key, i.to_be_bytes()), vec![value].into())?;
357+
}
358+
Ok(())
359+
}
360+
fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> {
361+
for i in 0..200000u32 {
362+
let i = i * 10;
363+
assert_eq!(
364+
db.get(0, &(key, i.to_be_bytes()))?.as_deref(),
365+
Some(&[value][..])
366+
);
367+
}
368+
Ok(())
369+
}
370+
371+
{
372+
let db = TurboPersistence::open(path.to_path_buf())?;
373+
let b = db.write_batch::<_, 1>()?;
374+
put(&b, 1, 11)?;
375+
put(&b, 2, 21)?;
376+
put(&b, 3, 31)?;
377+
db.commit_write_batch(b)?;
378+
379+
check(&db, 1, 11)?;
380+
check(&db, 2, 21)?;
381+
check(&db, 3, 31)?;
382+
383+
db.shutdown()?;
384+
}
385+
386+
println!("---");
387+
{
388+
let db = TurboPersistence::open(path.to_path_buf())?;
389+
let b = db.write_batch::<_, 1>()?;
390+
put(&b, 1, 12)?;
391+
put(&b, 2, 22)?;
392+
db.commit_write_batch(b)?;
393+
394+
check(&db, 1, 12)?;
395+
check(&db, 2, 22)?;
396+
check(&db, 3, 31)?;
397+
398+
db.shutdown()?;
399+
}
400+
401+
{
402+
let db = TurboPersistence::open(path.to_path_buf())?;
403+
let b = db.write_batch::<_, 1>()?;
404+
put(&b, 1, 13)?;
405+
db.commit_write_batch(b)?;
406+
407+
check(&db, 1, 13)?;
408+
check(&db, 2, 22)?;
409+
check(&db, 3, 31)?;
410+
411+
db.shutdown()?;
412+
}
413+
414+
println!("---");
415+
{
416+
let db = TurboPersistence::open(path.to_path_buf())?;
417+
418+
check(&db, 1, 13)?;
419+
check(&db, 2, 22)?;
420+
check(&db, 3, 31)?;
421+
422+
db.shutdown()?;
423+
}
424+
425+
println!("---");
426+
{
427+
let db = TurboPersistence::open(path.to_path_buf())?;
428+
429+
db.compact(1.0, 3)?;
430+
431+
check(&db, 1, 13)?;
432+
check(&db, 2, 22)?;
433+
check(&db, 3, 31)?;
434+
435+
db.shutdown()?;
436+
}
437+
438+
println!("---");
439+
{
440+
let db = TurboPersistence::open(path.to_path_buf())?;
441+
442+
check(&db, 1, 13)?;
443+
check(&db, 2, 22)?;
444+
check(&db, 3, 31)?;
445+
446+
db.shutdown()?;
447+
}
448+
449+
Ok(())
450+
}

0 commit comments

Comments
 (0)