Skip to content

Commit c8d928c

Browse files
committed
add w3d6 Task 3: Engine Interface and Serializable Validation
1 parent 588e85b commit c8d928c

File tree

4 files changed

+101
-9
lines changed

4 files changed

+101
-9
lines changed

mini-lsm-starter/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,15 @@ fn main() {
109109
```
110110
https://rustwiki.org/zh-CN/book/appendix-03-derivable-traits.html
111111
```
112+
113+
## week3 day6
114+
### Task 3: Engine Interface and Serializable Validation
115+
1. serializable validation
116+
Go through every committed txn whose commit timestamp falls in the open range (read_ts, expected_commit_ts).
117+
118+
If the current txn's read_set overlaps with the write_set of any of those txns, validation fails and the commit returns an error.
119+
120+
If validation succeeds, commit the write_batch, then insert this txn's write_set into `self.inner.mvcc().committed_txns`, keyed by its commit timestamp.
121+
122+
123+

mini-lsm-starter/src/lsm_storage.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ impl LsmStorageInner {
591591
}
592592

593593
/// Write a batch of data into the storage. Implement in week 2 day 7.
594-
pub fn write_batch<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<()> {
594+
pub fn write_batch_inner<T: AsRef<[u8]>>(&self, batch: &[WriteBatchRecord<T>]) -> Result<u64> {
595595
let _lk = self.mvcc().write_lock.lock();
596596
let ts = self.mvcc().latest_commit_ts() + 1;
597597
for record in batch {
@@ -625,17 +625,54 @@ impl LsmStorageInner {
625625
}
626626
}
627627
self.mvcc().update_commit_ts(ts);
628+
Ok(ts)
629+
}
630+
631+
/// Write a batch of data into the storage. Implement in week 2 day 7.
632+
pub fn write_batch<T: AsRef<[u8]>>(self: &Arc<Self>, batch: &[WriteBatchRecord<T>]) -> Result<()> {
633+
if !self.options.serializable {
634+
self.write_batch_inner(batch)?;
635+
}
636+
else {
637+
let new_txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
638+
for record in batch {
639+
match record {
640+
WriteBatchRecord::Put(key, value) => {
641+
new_txn.put(key.as_ref(), value.as_ref());
642+
}
643+
WriteBatchRecord::Del(key) => {
644+
new_txn.delete(key.as_ref());
645+
}
646+
}
647+
}
648+
}
628649
Ok(())
629650
}
630651

631652
/// Put a key-value pair into the storage by writing into the current memtable.
632-
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
633-
self.write_batch(&[WriteBatchRecord::Put(key, value)])
653+
pub fn put(self: &Arc<Self>, key: &[u8], value: &[u8]) -> Result<()> {
654+
if !self.options.serializable {
655+
self.write_batch_inner(&[WriteBatchRecord::Put(key, value)])?;
656+
}
657+
else {
658+
let new_txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
659+
new_txn.put(key, value);
660+
new_txn.commit()?;
661+
}
662+
Ok(())
634663
}
635664

636665
/// Remove a key from the storage by writing an empty value.
637-
pub fn delete(&self, key: &[u8]) -> Result<()> {
638-
self.write_batch(&[WriteBatchRecord::Del(key)])
666+
pub fn delete(self: &Arc<Self>, key: &[u8]) -> Result<()> {
667+
if !self.options.serializable {
668+
self.write_batch_inner(&[WriteBatchRecord::Del(key)])?;
669+
}
670+
else {
671+
let new_txn = self.mvcc().new_txn(self.clone(), self.options.serializable);
672+
new_txn.delete(key);
673+
new_txn.commit()?;
674+
}
675+
Ok(())
639676
}
640677

641678
pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {

mini-lsm-starter/src/mvcc.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,11 @@ impl LsmMvccInner {
6464
inner,
6565
local_storage: Arc::new(SkipMap::new()),
6666
committed: Arc::new(AtomicBool::new(false)),
67-
key_hashes: if serializable {Some(Mutex::new((HashSet::new(), HashSet::new())))} else {None},
67+
key_hashes: if serializable {
68+
Some(Mutex::new((HashSet::new(), HashSet::new())))
69+
} else {
70+
None
71+
},
6872
})
6973
}
7074
}

mini-lsm-starter/src/mvcc/txn.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
sync::{atomic::AtomicBool, atomic::Ordering, Arc},
88
};
99

10-
use anyhow::Result;
10+
use anyhow::{bail, Result};
1111
use bytes::Bytes;
1212
use crossbeam_skiplist::SkipMap;
1313
use ouroboros::self_referencing;
@@ -20,6 +20,8 @@ use crate::{
2020
mem_table::map_bound,
2121
};
2222

23+
use super::CommittedTxnData;
24+
2325
pub struct Transaction {
2426
pub(crate) read_ts: u64,
2527
pub(crate) inner: Arc<LsmStorageInner>,
@@ -37,7 +39,7 @@ impl Transaction {
3739

3840
if self.key_hashes.is_some() {
3941
let mut guard = self.key_hashes.as_ref().unwrap().lock();
40-
let (_, read_set) = & mut *guard;
42+
let (_, read_set) = &mut *guard;
4143
read_set.insert(crc32fast::hash(key));
4244
}
4345

@@ -90,6 +92,28 @@ impl Transaction {
9092
}
9193

9294
pub fn commit(&self) -> Result<()> {
95+
// serializable validation
96+
let mut serializable_check = false;
97+
let _commit_lock = self.inner.mvcc().commit_lock.lock();
98+
let expected_commit_ts = self.inner.mvcc().latest_commit_ts() + 1;
99+
if self.key_hashes.is_some() {
100+
let guard = self.key_hashes.as_ref().unwrap().lock();
101+
let (write_set, read_set) = &*guard;
102+
if !write_set.is_empty() {
103+
let committed_txns = self.inner.mvcc().committed_txns.lock();
104+
for (_ts, txn_data) in committed_txns.iter() {
105+
let ts_overlap = self.read_ts < txn_data.commit_ts
106+
&& txn_data.commit_ts < expected_commit_ts;
107+
// commit timestamp within range (read_ts, expected_commit_ts), and has jointset
108+
if ts_overlap && !read_set.is_disjoint(&txn_data.key_hashes) {
109+
bail!("Serializable Validation fail!");
110+
}
111+
}
112+
}
113+
serializable_check = true;
114+
}
115+
116+
// begin commit process
93117
self.committed
94118
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
95119
.expect("cannot operate on committed txn!");
@@ -107,7 +131,22 @@ impl Transaction {
107131
})
108132
.collect::<Vec<_>>();
109133

110-
self.inner.write_batch(&batch)
134+
let ts = self.inner.write_batch_inner(&batch)?;
135+
if serializable_check {
136+
let mut guard = self.key_hashes.as_ref().unwrap().lock();
137+
let (write_set, read_set) = &mut *guard;
138+
let mut committed_txns = self.inner.mvcc().committed_txns.lock();
139+
let old_tnx_data = committed_txns.insert(
140+
ts,
141+
CommittedTxnData {
142+
commit_ts: ts,
143+
key_hashes: std::mem::take(write_set),
144+
read_ts: self.read_ts,
145+
},
146+
);
147+
assert!(old_tnx_data.is_none());
148+
}
149+
Ok(())
111150
}
112151
}
113152

0 commit comments

Comments
 (0)