Skip to content

Commit ad85885

Browse files
committed
impl w3d5 Transaction and Optimistic Concurrency Control
1 parent cc9c720 commit ad85885

File tree

4 files changed

+86
-3
lines changed

4 files changed

+86
-3
lines changed

mini-lsm-starter/src/lsm_iterator.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ impl LsmIterator {
6767
while self.inner.is_valid() && self.prev_key == self.inner.key().key_ref() {
6868
self.inner_next()?;
6969
}
70-
7170
if !self.inner.is_valid() {
7271
break;
7372
}
@@ -82,12 +81,16 @@ impl LsmIterator {
8281
{
8382
self.inner_next()?;
8483
}
85-
8684
// after call of self.inner_next(), self.inner may be invalid, so we should check once call self.inner_next()
8785
if !self.inner.is_valid() {
8886
break;
8987
}
9088

89+
// if seek to next different key, repeat the process
90+
if self.prev_key != self.inner.key().key_ref() {
91+
continue;
92+
}
93+
9194
if !self.inner.value().is_empty() {
9295
break;
9396
}

mini-lsm-starter/src/mvcc/txn.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ impl Transaction {
3333
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
3434
// first probe the local_sotrage
3535
if let Some(entry) = self.local_storage.get(key) {
36+
if entry.value().is_empty() {
37+
return Ok(None);
38+
}
3639
return Ok(Some(entry.value().clone()));
3740
}
3841
self.inner.get_with_ts(key, self.read_ts)
@@ -83,7 +86,8 @@ impl Transaction {
8386
}
8487
})
8588
.collect::<Vec<_>>();
86-
Ok(())
89+
90+
self.inner.write_batch(&batch)
8791
}
8892
}
8993

mini-lsm-starter/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ mod week3_day1;
1919
mod week3_day2;
2020
mod week3_day3;
2121
mod week3_day4;
22+
mod week3_day5;
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use std::ops::Bound;
2+
3+
use bytes::Bytes;
4+
use tempfile::tempdir;
5+
6+
use crate::{
7+
compact::CompactionOptions,
8+
lsm_storage::{LsmStorageOptions, MiniLsm},
9+
tests::harness::check_lsm_iter_result_by_key,
10+
};
11+
12+
#[test]
13+
fn test_txn_integration() {
14+
let dir = tempdir().unwrap();
15+
let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction);
16+
let storage = MiniLsm::open(&dir, options.clone()).unwrap();
17+
let txn1 = storage.new_txn().unwrap();
18+
let txn2 = storage.new_txn().unwrap();
19+
txn1.put(b"test1", b"233");
20+
txn2.put(b"test2", b"233");
21+
check_lsm_iter_result_by_key(
22+
&mut txn1.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
23+
vec![(Bytes::from("test1"), Bytes::from("233"))],
24+
);
25+
check_lsm_iter_result_by_key(
26+
&mut txn2.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
27+
vec![(Bytes::from("test2"), Bytes::from("233"))],
28+
);
29+
let txn3 = storage.new_txn().unwrap();
30+
check_lsm_iter_result_by_key(
31+
&mut txn3.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
32+
vec![],
33+
);
34+
txn1.commit().unwrap();
35+
txn2.commit().unwrap();
36+
check_lsm_iter_result_by_key(
37+
&mut txn3.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
38+
vec![],
39+
);
40+
drop(txn3);
41+
check_lsm_iter_result_by_key(
42+
&mut storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
43+
vec![
44+
(Bytes::from("test1"), Bytes::from("233")),
45+
(Bytes::from("test2"), Bytes::from("233")),
46+
],
47+
);
48+
let txn4 = storage.new_txn().unwrap();
49+
assert_eq!(txn4.get(b"test1").unwrap(), Some(Bytes::from("233")));
50+
assert_eq!(txn4.get(b"test2").unwrap(), Some(Bytes::from("233")));
51+
check_lsm_iter_result_by_key(
52+
&mut txn4.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
53+
vec![
54+
(Bytes::from("test1"), Bytes::from("233")),
55+
(Bytes::from("test2"), Bytes::from("233")),
56+
],
57+
);
58+
txn4.put(b"test2", b"2333");
59+
assert_eq!(txn4.get(b"test1").unwrap(), Some(Bytes::from("233")));
60+
assert_eq!(txn4.get(b"test2").unwrap(), Some(Bytes::from("2333")));
61+
check_lsm_iter_result_by_key(
62+
&mut txn4.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
63+
vec![
64+
(Bytes::from("test1"), Bytes::from("233")),
65+
(Bytes::from("test2"), Bytes::from("2333")),
66+
],
67+
);
68+
txn4.delete(b"test2");
69+
assert_eq!(txn4.get(b"test1").unwrap(), Some(Bytes::from("233")));
70+
assert_eq!(txn4.get(b"test2").unwrap(), None);
71+
check_lsm_iter_result_by_key(
72+
&mut txn4.scan(Bound::Unbounded, Bound::Unbounded).unwrap(),
73+
vec![(Bytes::from("test1"), Bytes::from("233"))],
74+
);
75+
}

0 commit comments

Comments
 (0)