Skip to content

Commit f3ce6d7

Browse files
authored
Merge pull request #256 from ekexium/buffer-mutex
Remove the Mutex in Buffer
2 parents bdff7e3 + 540af4d commit f3ce6d7

File tree

4 files changed

+63
-92
lines changed

4 files changed

+63
-92
lines changed

src/transaction/buffer.rs

Lines changed: 46 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,36 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{BoundRange, Key, KvPair, Result, Value};
4-
use derive_new::new;
54
use std::{
65
collections::{btree_map::Entry, BTreeMap, HashMap},
76
future::Future,
87
};
98
use tikv_client_proto::kvrpcpb;
10-
use tokio::sync::{Mutex, MutexGuard};
119

12-
#[derive(new)]
13-
struct InnerBuffer {
14-
#[new(default)]
10+
/// A caching layer which buffers reads and writes in a transaction.
11+
pub struct Buffer {
1512
primary_key: Option<Key>,
16-
#[new(default)]
1713
entry_map: BTreeMap<Key, BufferEntry>,
1814
is_pessimistic: bool,
1915
}
2016

21-
impl InnerBuffer {
22-
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
23-
let key = key.into();
24-
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
25-
self.primary_key.get_or_insert_with(|| key.clone());
26-
}
27-
self.entry_map.insert(key, entry);
28-
}
29-
30-
/// Set the primary key if it is not set
31-
pub fn primary_key_or(&mut self, key: &Key) {
32-
self.primary_key.get_or_insert(key.clone());
33-
}
34-
}
35-
36-
/// A caching layer which buffers reads and writes in a transaction.
37-
pub struct Buffer {
38-
inner: Mutex<InnerBuffer>,
39-
}
40-
4117
impl Buffer {
4218
pub fn new(is_pessimistic: bool) -> Buffer {
4319
Buffer {
44-
inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
20+
primary_key: None,
21+
entry_map: BTreeMap::new(),
22+
is_pessimistic,
4523
}
4624
}
4725

4826
/// Get the primary key of the buffer.
4927
pub async fn get_primary_key(&self) -> Option<Key> {
50-
self.inner.lock().await.primary_key.clone()
28+
self.primary_key.clone()
5129
}
5230

5331
/// Set the primary key if it is not set
54-
pub async fn primary_key_or(&self, key: &Key) {
55-
self.inner.lock().await.primary_key_or(key);
32+
pub async fn primary_key_or(&mut self, key: &Key) {
33+
self.primary_key.get_or_insert_with(|| key.clone());
5634
}
5735

5836
/// Get a value from the buffer.
@@ -66,7 +44,7 @@ impl Buffer {
6644

6745
/// Get a value from the buffer. If the value is not present, run `f` to get
6846
/// the value.
69-
pub async fn get_or_else<F, Fut>(&self, key: Key, f: F) -> Result<Option<Value>>
47+
pub async fn get_or_else<F, Fut>(&mut self, key: Key, f: F) -> Result<Option<Value>>
7048
where
7149
F: FnOnce(Key) -> Fut,
7250
Fut: Future<Output = Result<Option<Value>>>,
@@ -75,8 +53,7 @@ impl Buffer {
7553
MutationValue::Determined(value) => Ok(value),
7654
MutationValue::Undetermined => {
7755
let value = f(key.clone()).await?;
78-
let mut mutations = self.inner.lock().await;
79-
Self::update_cache(&mut mutations, key, value.clone());
56+
self.update_cache(key, value.clone());
8057
Ok(value)
8158
}
8259
}
@@ -87,7 +64,7 @@ impl Buffer {
8764
///
8865
/// only used for snapshot read (i.e. not for `batch_get_for_update`)
8966
pub async fn batch_get_or_else<F, Fut>(
90-
&self,
67+
&mut self,
9168
keys: impl Iterator<Item = Key>,
9269
f: F,
9370
) -> Result<impl Iterator<Item = KvPair>>
@@ -96,15 +73,14 @@ impl Buffer {
9673
Fut: Future<Output = Result<Vec<KvPair>>>,
9774
{
9875
let (cached_results, undetermined_keys) = {
99-
let mutations = self.inner.lock().await;
10076
// Partition the keys into those we have buffered and those we have to
10177
// get from the store.
10278
let (undetermined_keys, cached_results): (
10379
Vec<(Key, MutationValue)>,
10480
Vec<(Key, MutationValue)>,
10581
) = keys
10682
.map(|key| {
107-
let value = mutations
83+
let value = self
10884
.entry_map
10985
.get(&key)
11086
.map(BufferEntry::get_value)
@@ -122,11 +98,10 @@ impl Buffer {
12298
};
12399

124100
let fetched_results = f(Box::new(undetermined_keys)).await?;
125-
let mut mutations = self.inner.lock().await;
126101
for kvpair in &fetched_results {
127102
let key = kvpair.0.clone();
128103
let value = Some(kvpair.1.clone());
129-
Self::update_cache(&mut mutations, key, value);
104+
self.update_cache(key, value);
130105
}
131106

132107
let results = cached_results.chain(fetched_results.into_iter());
@@ -135,7 +110,7 @@ impl Buffer {
135110

136111
/// Run `f` to fetch entries in `range` from TiKV. Combine them with mutations in local buffer. Returns the results.
137112
pub async fn scan_and_fetch<F, Fut>(
138-
&self,
113+
&mut self,
139114
range: BoundRange,
140115
limit: u32,
141116
f: F,
@@ -145,8 +120,7 @@ impl Buffer {
145120
Fut: Future<Output = Result<Vec<KvPair>>>,
146121
{
147122
// read from local buffer
148-
let mut mutations = self.inner.lock().await;
149-
let mutation_range = mutations.entry_map.range(range.clone());
123+
let mutation_range = self.entry_map.range(range.clone());
150124

151125
// fetch from TiKV
152126
// fetch more entries because some of them may be deleted.
@@ -177,7 +151,7 @@ impl Buffer {
177151

178152
// update local buffer
179153
for (k, v) in &results {
180-
Self::update_cache(&mut mutations, k.clone(), Some(v.clone()));
154+
self.update_cache(k.clone(), Some(v.clone()));
181155
}
182156

183157
let mut res = results
@@ -190,10 +164,9 @@ impl Buffer {
190164
}
191165

192166
/// Lock the given key if necessary.
193-
pub async fn lock(&self, key: Key) {
194-
let mutations = &mut self.inner.lock().await;
195-
mutations.primary_key.get_or_insert_with(|| key.clone());
196-
let value = mutations
167+
pub async fn lock(&mut self, key: Key) {
168+
self.primary_key.get_or_insert_with(|| key.clone());
169+
let value = self
197170
.entry_map
198171
.entry(key)
199172
// Mutated keys don't need a lock.
@@ -205,68 +178,58 @@ impl Buffer {
205178
}
206179

207180
/// Insert a value into the buffer (does not write through).
208-
pub async fn put(&self, key: Key, value: Value) {
209-
self.inner.lock().await.insert(key, BufferEntry::Put(value));
181+
pub async fn put(&mut self, key: Key, value: Value) {
182+
self.insert_entry(key, BufferEntry::Put(value));
210183
}
211184

212185
/// Mark a value as Insert mutation into the buffer (does not write through).
213-
pub async fn insert(&self, key: Key, value: Value) {
214-
let mut mutations = self.inner.lock().await;
215-
let mut entry = mutations.entry_map.entry(key.clone());
186+
pub async fn insert(&mut self, key: Key, value: Value) {
187+
let mut entry = self.entry_map.entry(key.clone());
216188
match entry {
217189
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
218190
o.insert(BufferEntry::Put(value));
219191
}
220-
_ => mutations.insert(key, BufferEntry::Insert(value)),
192+
_ => self.insert_entry(key, BufferEntry::Insert(value)),
221193
}
222194
}
223195

224196
/// Mark a value as deleted.
225-
pub async fn delete(&self, key: Key) {
226-
let mut mutations = self.inner.lock().await;
227-
let is_pessimistic = mutations.is_pessimistic;
228-
let mut entry = mutations.entry_map.entry(key.clone());
197+
pub async fn delete(&mut self, key: Key) {
198+
let is_pessimistic = self.is_pessimistic;
199+
let mut entry = self.entry_map.entry(key.clone());
229200

230201
match entry {
231202
Entry::Occupied(ref mut o)
232203
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
233204
{
234205
o.insert(BufferEntry::CheckNotExist);
235206
}
236-
_ => mutations.insert(key, BufferEntry::Del),
207+
_ => self.insert_entry(key, BufferEntry::Del),
237208
}
238209
}
239210

240211
/// Converts the buffered mutations to the proto buffer version
241212
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
242-
self.inner
243-
.lock()
244-
.await
245-
.entry_map
213+
self.entry_map
246214
.iter()
247215
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
248216
.collect()
249217
}
250218

251219
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
252-
self.inner
253-
.lock()
254-
.await
255-
.entry_map
220+
self.entry_map
256221
.get(&key)
257222
.map(BufferEntry::get_value)
258223
.unwrap_or(MutationValue::Undetermined)
259224
}
260225

261-
fn update_cache(buffer: &mut MutexGuard<InnerBuffer>, key: Key, value: Option<Value>) {
262-
match buffer.entry_map.get(&key) {
226+
fn update_cache(&mut self, key: Key, value: Option<Value>) {
227+
match self.entry_map.get(&key) {
263228
Some(BufferEntry::Locked(None)) => {
264-
buffer
265-
.entry_map
266-
.insert(key, BufferEntry::Locked(Some(value)));
229+
self.entry_map.insert(key, BufferEntry::Locked(Some(value)));
267230
}
268231
None => {
269-
buffer.entry_map.insert(key, BufferEntry::Cached(value));
232+
self.entry_map.insert(key, BufferEntry::Cached(value));
270233
}
271234
Some(BufferEntry::Cached(v)) | Some(BufferEntry::Locked(Some(v))) => {
272235
assert!(&value == v);
@@ -285,6 +248,14 @@ impl Buffer {
285248
}
286249
}
287250
}
251+
252+
fn insert_entry(&mut self, key: impl Into<Key>, entry: BufferEntry) {
253+
let key = key.into();
254+
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
255+
self.primary_key.get_or_insert_with(|| key.clone());
256+
}
257+
self.entry_map.insert(key, entry);
258+
}
288259
}
289260

290261
// The state of a key-value pair in the buffer.
@@ -388,7 +359,7 @@ mod tests {
388359
#[tokio::test]
389360
#[allow(unreachable_code)]
390361
async fn set_and_get_from_buffer() {
391-
let buffer = Buffer::new(false);
362+
let mut buffer = Buffer::new(false);
392363
buffer
393364
.put(b"key1".to_vec().into(), b"value1".to_vec())
394365
.await;
@@ -421,7 +392,7 @@ mod tests {
421392
#[tokio::test]
422393
#[allow(unreachable_code)]
423394
async fn insert_and_get_from_buffer() {
424-
let buffer = Buffer::new(false);
395+
let mut buffer = Buffer::new(false);
425396
buffer
426397
.insert(b"key1".to_vec().into(), b"value1".to_vec())
427398
.await;
@@ -463,13 +434,13 @@ mod tests {
463434
let v2: Value = b"value2".to_vec();
464435
let v2_ = v2.clone();
465436

466-
let buffer = Buffer::new(false);
437+
let mut buffer = Buffer::new(false);
467438
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
468439
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
469440
assert_eq!(r1.unwrap().unwrap(), v1);
470441
assert_eq!(r2.unwrap().unwrap(), v1);
471442

472-
let buffer = Buffer::new(false);
443+
let mut buffer = Buffer::new(false);
473444
let r1 = block_on(
474445
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
475446
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))

src/transaction/snapshot.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@ pub struct Snapshot {
1919

2020
impl Snapshot {
2121
/// Get the value associated with the given key.
22-
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
22+
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
2323
self.transaction.get(key).await
2424
}
2525

2626
/// Check whether the key exists.
27-
pub async fn key_exists(&self, key: impl Into<Key>) -> Result<bool> {
27+
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
2828
self.transaction.key_exists(key).await
2929
}
3030

3131
/// Get the values associated with the given keys.
3232
pub async fn batch_get(
33-
&self,
33+
&mut self,
3434
keys: impl IntoIterator<Item = impl Into<Key>>,
3535
) -> Result<impl Iterator<Item = KvPair>> {
3636
self.transaction.batch_get(keys).await
3737
}
3838

3939
/// Scan a range, return at most `limit` key-value pairs that lying in the range.
4040
pub async fn scan(
41-
&self,
41+
&mut self,
4242
range: impl Into<BoundRange>,
4343
limit: u32,
4444
) -> Result<impl Iterator<Item = KvPair>> {
@@ -47,7 +47,7 @@ impl Snapshot {
4747

4848
/// Scan a range, return at most `limit` keys that lying in the range.
4949
pub async fn scan_keys(
50-
&self,
50+
&mut self,
5151
range: impl Into<BoundRange>,
5252
limit: u32,
5353
) -> Result<impl Iterator<Item = Key>> {
@@ -56,7 +56,7 @@ impl Snapshot {
5656

5757
/// Unimplemented. Similar to scan, but in the reverse direction.
5858
#[allow(dead_code)]
59-
fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
59+
fn scan_reverse(&mut self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
6060
self.transaction.scan_reverse(range)
6161
}
6262
}

0 commit comments

Comments
 (0)