44use std:: {
55 collections:: HashSet ,
66 ops:: Bound ,
7- sync:: { atomic:: AtomicBool , Arc } ,
7+ sync:: { atomic:: AtomicBool , atomic :: Ordering , Arc } ,
88} ;
99
1010use anyhow:: Result ;
@@ -16,7 +16,8 @@ use parking_lot::Mutex;
1616use crate :: {
1717 iterators:: { two_merge_iterator:: TwoMergeIterator , StorageIterator } ,
1818 lsm_iterator:: { FusedIterator , LsmIterator } ,
19- lsm_storage:: LsmStorageInner , mem_table:: map_bound,
19+ lsm_storage:: { LsmStorageInner , WriteBatchRecord } ,
20+ mem_table:: map_bound,
2021} ;
2122
2223pub struct Transaction {
@@ -42,7 +43,8 @@ impl Transaction {
4243 map : self . local_storage . clone ( ) ,
4344 iter_builder : |map| map. range ( ( map_bound ( lower) , map_bound ( upper) ) ) ,
4445 item : ( Bytes :: new ( ) , Bytes :: new ( ) ) ,
45- } . build ( ) ;
46+ }
47+ . build ( ) ;
4648 let _ = local_iter. next ( ) ;
4749
4850 let storage_iter = self . inner . scan_with_ts ( lower, upper, self . read_ts ) ?;
@@ -55,15 +57,33 @@ impl Transaction {
5557 }
5658
5759 pub fn put ( & self , key : & [ u8 ] , value : & [ u8 ] ) {
58- self . local_storage . insert ( Bytes :: copy_from_slice ( key) , Bytes :: copy_from_slice ( value) ) ;
60+ self . local_storage
61+ . insert ( Bytes :: copy_from_slice ( key) , Bytes :: copy_from_slice ( value) ) ;
5962 }
6063
6164 pub fn delete ( & self , key : & [ u8 ] ) {
62- self . local_storage . insert ( Bytes :: copy_from_slice ( key) , Bytes :: new ( ) ) ;
65+ self . local_storage
66+ . insert ( Bytes :: copy_from_slice ( key) , Bytes :: new ( ) ) ;
6367 }
6468
6569 pub fn commit ( & self ) -> Result < ( ) > {
66- unimplemented ! ( )
70+ self . committed
71+ . compare_exchange ( false , true , Ordering :: SeqCst , Ordering :: SeqCst )
72+ . expect ( "cannot operate on committed txn!" ) ;
73+
74+ // collect all key-value pairs from the local storage and submit a write batch to the storage engine
75+ let batch = self
76+ . local_storage
77+ . iter ( )
78+ . map ( |entry| {
79+ if entry. value ( ) . is_empty ( ) {
80+ WriteBatchRecord :: Del ( entry. key ( ) . clone ( ) )
81+ } else {
82+ WriteBatchRecord :: Put ( entry. key ( ) . clone ( ) , entry. value ( ) . clone ( ) )
83+ }
84+ } )
85+ . collect :: < Vec < _ > > ( ) ;
86+ Ok ( ( ) )
6787 }
6888}
6989
0 commit comments