Skip to content

Commit 3dfe831

Browse files
committed
impl w3d4 Watermark and Garbage Collection
1 parent 8eac085 commit 3dfe831

File tree

5 files changed

+223
-12
lines changed

5 files changed

+223
-12
lines changed

mini-lsm-starter/src/compact.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,12 @@ impl LsmStorageInner {
118118
&self,
119119
mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
120120
) -> Result<Vec<Arc<SsTable>>> {
121-
// if lots of the same keys exist, simply retain the latest one.
122-
// if the latest version is a delete marker, we do not need to keep it in the produced SST files.
121+
// If a version of a key is above watermark, keep it.
122+
// For all versions of a key below or equal to the watermark, keep the latest version.
123123
let target_sst_size = self.options.target_sst_size;
124124
let block_size = self.options.block_size;
125+
let min_ts = self.mvcc().watermark();
126+
125127
let mut sstables = Vec::new();
126128
let mut sst_builder = None;
127129
let mut prev_key = Vec::<u8>::new();
@@ -132,6 +134,33 @@ impl LsmStorageInner {
132134

133135
let is_same_as_prevkey = prev_key == iter.key().key_ref();
134136
let builder_inner = sst_builder.as_mut().unwrap();
137+
138+
if is_same_as_prevkey && iter.key().ts() < min_ts {
139+
println!(
140+
"prev_key: {:?}, iter.key(): {:?}, ts: {:?}, min_ts: {:?}",
141+
prev_key,
142+
iter.key(),
143+
iter.key().ts(),
144+
min_ts
145+
);
146+
iter.next()?;
147+
continue;
148+
}
149+
150+
if !is_same_as_prevkey && iter.key().ts() <= min_ts && iter.value().is_empty() {
151+
println!(
152+
"delete key: {:?}, value: {:?}, ts: {:?}, min_ts: {:?}",
153+
iter.key(),
154+
iter.value(),
155+
iter.key().ts(),
156+
min_ts
157+
);
158+
prev_key.clear();
159+
prev_key.extend(iter.key().key_ref());
160+
iter.next()?;
161+
continue;
162+
}
163+
135164
builder_inner.add(iter.key(), iter.value());
136165

137166
if builder_inner.estimated_size() >= target_sst_size && !is_same_as_prevkey {
@@ -146,12 +175,11 @@ impl LsmStorageInner {
146175
)?));
147176
}
148177

149-
iter.next()?;
150-
151-
if !is_same_as_prevkey && iter.is_valid() {
178+
if !is_same_as_prevkey {
152179
prev_key.clear();
153180
prev_key.extend(iter.key().key_ref());
154181
}
182+
iter.next()?;
155183
}
156184

157185
if let Some(builder) = sst_builder {

mini-lsm-starter/src/mvcc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#![allow(dead_code)] // TODO(you): remove this lint after implementing this mod
33

44
pub mod txn;
5-
mod watermark;
5+
pub mod watermark;
66

77
use std::{
88
collections::{BTreeMap, HashSet},

mini-lsm-starter/src/mvcc/watermark.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@ impl Watermark {
2727
}
2828

2929
pub fn watermark(&self) -> Option<u64> {
30-
if let Some(min_ts) = self.readers.keys().next() {
31-
return Some(*min_ts);
32-
}
33-
else {
34-
return None;
35-
}
30+
self.readers.first_key_value().map(|ts| *ts.0)
31+
}
32+
33+
pub fn num_retained_snapshots(&self) -> usize {
34+
self.readers.len()
3635
}
3736
}

mini-lsm-starter/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ mod week2_day6;
1818
mod week3_day1;
1919
mod week3_day2;
2020
mod week3_day3;
21+
mod week3_day4;
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
use bytes::Bytes;
2+
use tempfile::tempdir;
3+
4+
use crate::{
5+
compact::CompactionOptions,
6+
lsm_storage::{LsmStorageOptions, MiniLsm, WriteBatchRecord},
7+
mvcc::watermark::Watermark,
8+
};
9+
10+
use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage};
11+
12+
#[test]
13+
fn test_task1_watermark() {
14+
let mut watermark = Watermark::new();
15+
watermark.add_reader(0);
16+
for i in 1..=1000 {
17+
watermark.add_reader(i);
18+
assert_eq!(watermark.watermark(), Some(0));
19+
assert_eq!(watermark.num_retained_snapshots(), i as usize + 1);
20+
}
21+
let mut cnt = 1001;
22+
for i in 0..500 {
23+
watermark.remove_reader(i);
24+
assert_eq!(watermark.watermark(), Some(i + 1));
25+
cnt -= 1;
26+
assert_eq!(watermark.num_retained_snapshots(), cnt);
27+
}
28+
for i in (501..=1000).rev() {
29+
watermark.remove_reader(i);
30+
assert_eq!(watermark.watermark(), Some(500));
31+
cnt -= 1;
32+
assert_eq!(watermark.num_retained_snapshots(), cnt);
33+
}
34+
watermark.remove_reader(500);
35+
assert_eq!(watermark.watermark(), None);
36+
assert_eq!(watermark.num_retained_snapshots(), 0);
37+
watermark.add_reader(2000);
38+
watermark.add_reader(2000);
39+
watermark.add_reader(2001);
40+
assert_eq!(watermark.num_retained_snapshots(), 2);
41+
assert_eq!(watermark.watermark(), Some(2000));
42+
watermark.remove_reader(2000);
43+
assert_eq!(watermark.num_retained_snapshots(), 2);
44+
assert_eq!(watermark.watermark(), Some(2000));
45+
watermark.remove_reader(2000);
46+
assert_eq!(watermark.num_retained_snapshots(), 1);
47+
assert_eq!(watermark.watermark(), Some(2001));
48+
}
49+
50+
#[test]
51+
fn test_task2_snapshot_watermark() {
52+
let dir = tempdir().unwrap();
53+
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
54+
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
55+
let txn1 = storage.new_txn().unwrap();
56+
let txn2 = storage.new_txn().unwrap();
57+
storage.put(b"233", b"23333").unwrap();
58+
let txn3 = storage.new_txn().unwrap();
59+
assert_eq!(storage.inner.mvcc().watermark(), txn1.read_ts);
60+
drop(txn1);
61+
assert_eq!(storage.inner.mvcc().watermark(), txn2.read_ts);
62+
drop(txn2);
63+
assert_eq!(storage.inner.mvcc().watermark(), txn3.read_ts);
64+
drop(txn3);
65+
assert_eq!(
66+
storage.inner.mvcc().watermark(),
67+
storage.inner.mvcc().latest_commit_ts()
68+
);
69+
}
70+
71+
#[test]
72+
fn test_task3_mvcc_compaction() {
73+
let dir = tempdir().unwrap();
74+
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
75+
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
76+
let snapshot0 = storage.new_txn().unwrap();
77+
storage
78+
.write_batch(&[
79+
WriteBatchRecord::Put(b"a", b"1"),
80+
WriteBatchRecord::Put(b"b", b"1"),
81+
])
82+
.unwrap();
83+
let snapshot1 = storage.new_txn().unwrap();
84+
storage
85+
.write_batch(&[
86+
WriteBatchRecord::Put(b"a", b"2"),
87+
WriteBatchRecord::Put(b"d", b"2"),
88+
])
89+
.unwrap();
90+
let snapshot2 = storage.new_txn().unwrap();
91+
storage
92+
.write_batch(&[
93+
WriteBatchRecord::Put(b"a", b"3"),
94+
WriteBatchRecord::Del(b"d"),
95+
])
96+
.unwrap();
97+
let snapshot3 = storage.new_txn().unwrap();
98+
storage
99+
.write_batch(&[
100+
WriteBatchRecord::Put(b"c", b"4"),
101+
WriteBatchRecord::Del(b"a"),
102+
])
103+
.unwrap();
104+
105+
storage.force_flush().unwrap();
106+
storage.force_full_compaction().unwrap();
107+
108+
let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
109+
check_iter_result_by_key(
110+
&mut iter,
111+
vec![
112+
(Bytes::from("a"), Bytes::new()),
113+
(Bytes::from("a"), Bytes::from("3")),
114+
(Bytes::from("a"), Bytes::from("2")),
115+
(Bytes::from("a"), Bytes::from("1")),
116+
(Bytes::from("b"), Bytes::from("1")),
117+
(Bytes::from("c"), Bytes::from("4")),
118+
(Bytes::from("d"), Bytes::new()),
119+
(Bytes::from("d"), Bytes::from("2")),
120+
],
121+
);
122+
123+
drop(snapshot0);
124+
storage.force_full_compaction().unwrap();
125+
126+
let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
127+
check_iter_result_by_key(
128+
&mut iter,
129+
vec![
130+
(Bytes::from("a"), Bytes::new()),
131+
(Bytes::from("a"), Bytes::from("3")),
132+
(Bytes::from("a"), Bytes::from("2")),
133+
(Bytes::from("a"), Bytes::from("1")),
134+
(Bytes::from("b"), Bytes::from("1")),
135+
(Bytes::from("c"), Bytes::from("4")),
136+
(Bytes::from("d"), Bytes::new()),
137+
(Bytes::from("d"), Bytes::from("2")),
138+
],
139+
);
140+
141+
drop(snapshot1);
142+
storage.force_full_compaction().unwrap();
143+
144+
let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
145+
check_iter_result_by_key(
146+
&mut iter,
147+
vec![
148+
(Bytes::from("a"), Bytes::new()),
149+
(Bytes::from("a"), Bytes::from("3")),
150+
(Bytes::from("a"), Bytes::from("2")),
151+
(Bytes::from("b"), Bytes::from("1")),
152+
(Bytes::from("c"), Bytes::from("4")),
153+
(Bytes::from("d"), Bytes::new()),
154+
(Bytes::from("d"), Bytes::from("2")),
155+
],
156+
);
157+
158+
drop(snapshot2);
159+
storage.force_full_compaction().unwrap();
160+
161+
let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
162+
check_iter_result_by_key(
163+
&mut iter,
164+
vec![
165+
(Bytes::from("a"), Bytes::new()),
166+
(Bytes::from("a"), Bytes::from("3")),
167+
(Bytes::from("b"), Bytes::from("1")),
168+
(Bytes::from("c"), Bytes::from("4")),
169+
],
170+
);
171+
172+
drop(snapshot3);
173+
storage.force_full_compaction().unwrap();
174+
175+
let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read());
176+
check_iter_result_by_key(
177+
&mut iter,
178+
vec![
179+
(Bytes::from("b"), Bytes::from("1")),
180+
(Bytes::from("c"), Bytes::from("4")),
181+
],
182+
);
183+
}

0 commit comments

Comments
 (0)