Skip to content

Commit 8ba16a0

Browse files
committed
fix: remove the Mutex in Buffer
Signed-off-by: ekexium <[email protected]>
1 parent 4870985 commit 8ba16a0

File tree

4 files changed

+49
-63
lines changed

4 files changed

+49
-63
lines changed

src/transaction/buffer.rs

Lines changed: 32 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use std::{
66
future::Future,
77
};
88
use tikv_client_proto::kvrpcpb;
9-
use tokio::sync::{Mutex, MutexGuard};
109

1110
#[derive(Default)]
1211
struct InnerBuffer {
@@ -31,17 +30,17 @@ impl InnerBuffer {
3130
/// A caching layer which buffers reads and writes in a transaction.
3231
#[derive(Default)]
3332
pub struct Buffer {
34-
mutations: Mutex<InnerBuffer>,
33+
inner: InnerBuffer,
3534
}
3635

3736
impl Buffer {
3837
/// Get the primary key of the buffer.
3938
pub async fn get_primary_key(&self) -> Option<Key> {
40-
self.mutations.lock().await.primary_key.clone()
39+
self.inner.primary_key.clone()
4140
}
4241
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
43-
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
44-
self.mutations.lock().await.get_primary_key_or(key).clone()
42+
pub async fn get_primary_key_or(&mut self, key: &Key) -> Key {
43+
self.inner.get_primary_key_or(key).clone()
4544
}
4645

4746
/// Get a value from the buffer.
@@ -55,7 +54,7 @@ impl Buffer {
5554

5655
/// Get a value from the buffer. If the value is not present, run `f` to get
5756
/// the value.
58-
pub async fn get_or_else<F, Fut>(&self, key: Key, f: F) -> Result<Option<Value>>
57+
pub async fn get_or_else<F, Fut>(&mut self, key: Key, f: F) -> Result<Option<Value>>
5958
where
6059
F: FnOnce(Key) -> Fut,
6160
Fut: Future<Output = Result<Option<Value>>>,
@@ -64,8 +63,7 @@ impl Buffer {
6463
MutationValue::Determined(value) => Ok(value),
6564
MutationValue::Undetermined => {
6665
let value = f(key.clone()).await?;
67-
let mut mutations = self.mutations.lock().await;
68-
Self::update_cache(&mut mutations, key, value.clone());
66+
Self::update_cache(&mut self.inner, key, value.clone());
6967
Ok(value)
7068
}
7169
}
@@ -76,7 +74,7 @@ impl Buffer {
7674
///
7775
/// only used for snapshot read (i.e. not for `batch_get_for_update`)
7876
pub async fn batch_get_or_else<F, Fut>(
79-
&self,
77+
&mut self,
8078
keys: impl Iterator<Item = Key>,
8179
f: F,
8280
) -> Result<impl Iterator<Item = KvPair>>
@@ -85,15 +83,15 @@ impl Buffer {
8583
Fut: Future<Output = Result<Vec<KvPair>>>,
8684
{
8785
let (cached_results, undetermined_keys) = {
88-
let mutations = self.mutations.lock().await;
8986
// Partition the keys into those we have buffered and those we have to
9087
// get from the store.
9188
let (undetermined_keys, cached_results): (
9289
Vec<(Key, MutationValue)>,
9390
Vec<(Key, MutationValue)>,
9491
) = keys
9592
.map(|key| {
96-
let value = mutations
93+
let value = self
94+
.inner
9795
.entry_map
9896
.get(&key)
9997
.map(BufferEntry::get_value)
@@ -111,11 +109,10 @@ impl Buffer {
111109
};
112110

113111
let fetched_results = f(Box::new(undetermined_keys)).await?;
114-
let mut mutations = self.mutations.lock().await;
115112
for kvpair in &fetched_results {
116113
let key = kvpair.0.clone();
117114
let value = Some(kvpair.1.clone());
118-
Self::update_cache(&mut mutations, key, value);
115+
Self::update_cache(&mut self.inner, key, value);
119116
}
120117

121118
let results = cached_results.chain(fetched_results.into_iter());
@@ -124,7 +121,7 @@ impl Buffer {
124121

125122
/// Run `f` to fetch entries in `range` from TiKV. Combine them with mutations in local buffer. Returns the results.
126123
pub async fn scan_and_fetch<F, Fut>(
127-
&self,
124+
&mut self,
128125
range: BoundRange,
129126
limit: u32,
130127
f: F,
@@ -134,8 +131,7 @@ impl Buffer {
134131
Fut: Future<Output = Result<Vec<KvPair>>>,
135132
{
136133
// read from local buffer
137-
let mut mutations = self.mutations.lock().await;
138-
let mutation_range = mutations.entry_map.range(range.clone());
134+
let mutation_range = self.inner.entry_map.range(range.clone());
139135

140136
// fetch from TiKV
141137
// fetch more entries because some of them may be deleted.
@@ -166,7 +162,7 @@ impl Buffer {
166162

167163
// update local buffer
168164
for (k, v) in &results {
169-
Self::update_cache(&mut mutations, k.clone(), Some(v.clone()));
165+
Self::update_cache(&mut self.inner, k.clone(), Some(v.clone()));
170166
}
171167

172168
let mut res = results
@@ -179,10 +175,10 @@ impl Buffer {
179175
}
180176

181177
/// Lock the given key if necessary.
182-
pub async fn lock(&self, key: Key) {
183-
let mutations = &mut self.mutations.lock().await;
184-
mutations.primary_key.get_or_insert(key.clone());
185-
let value = mutations
178+
pub async fn lock(&mut self, key: Key) {
179+
self.inner.primary_key.get_or_insert(key.clone());
180+
let value = self
181+
.inner
186182
.entry_map
187183
.entry(key)
188184
// Mutated keys don't need a lock.
@@ -194,25 +190,19 @@ impl Buffer {
194190
}
195191

196192
/// Insert a value into the buffer (does not write through).
197-
pub async fn put(&self, key: Key, value: Value) {
198-
self.mutations
199-
.lock()
200-
.await
201-
.insert(key, BufferEntry::Put(value));
193+
pub async fn put(&mut self, key: Key, value: Value) {
194+
self.inner.insert(key, BufferEntry::Put(value));
202195
}
203196

204197
/// Mark a value as Insert mutation into the buffer (does not write through).
205-
pub async fn insert(&self, key: Key, value: Value) {
206-
self.mutations
207-
.lock()
208-
.await
209-
.insert(key, BufferEntry::Insert(value));
198+
pub async fn insert(&mut self, key: Key, value: Value) {
199+
self.inner.insert(key, BufferEntry::Insert(value));
210200
}
211201

212202
/// Mark a value as deleted.
213-
pub async fn delete(&self, key: Key) {
214-
let mut mutations = self.mutations.lock().await;
215-
let value = mutations
203+
pub async fn delete(&mut self, key: Key) {
204+
let value = self
205+
.inner
216206
.entry_map
217207
.entry(key.clone())
218208
.or_insert(BufferEntry::Del);
@@ -224,31 +214,27 @@ impl Buffer {
224214
new_value = BufferEntry::Del
225215
}
226216

227-
mutations.insert(key, new_value);
217+
self.inner.insert(key, new_value);
228218
}
229219

230220
/// Converts the buffered mutations to the proto buffer version
231221
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
232-
self.mutations
233-
.lock()
234-
.await
222+
self.inner
235223
.entry_map
236224
.iter()
237225
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
238226
.collect()
239227
}
240228

241229
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
242-
self.mutations
243-
.lock()
244-
.await
230+
self.inner
245231
.entry_map
246232
.get(&key)
247233
.map(BufferEntry::get_value)
248234
.unwrap_or(MutationValue::Undetermined)
249235
}
250236

251-
fn update_cache(buffer: &mut MutexGuard<InnerBuffer>, key: Key, value: Option<Value>) {
237+
fn update_cache(buffer: &mut InnerBuffer, key: Key, value: Option<Value>) {
252238
match buffer.entry_map.get(&key) {
253239
Some(BufferEntry::Locked(None)) => {
254240
buffer
@@ -378,7 +364,7 @@ mod tests {
378364
#[tokio::test]
379365
#[allow(unreachable_code)]
380366
async fn set_and_get_from_buffer() {
381-
let buffer = Buffer::default();
367+
let mut buffer = Buffer::default();
382368
buffer
383369
.put(b"key1".to_vec().into(), b"value1".to_vec())
384370
.await;
@@ -411,7 +397,7 @@ mod tests {
411397
#[tokio::test]
412398
#[allow(unreachable_code)]
413399
async fn insert_and_get_from_buffer() {
414-
let buffer = Buffer::default();
400+
let mut buffer = Buffer::default();
415401
buffer
416402
.insert(b"key1".to_vec().into(), b"value1".to_vec())
417403
.await;
@@ -453,13 +439,13 @@ mod tests {
453439
let v2: Value = b"value2".to_vec();
454440
let v2_ = v2.clone();
455441

456-
let buffer = Buffer::default();
442+
let mut buffer = Buffer::default();
457443
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
458444
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
459445
assert_eq!(r1.unwrap().unwrap(), v1);
460446
assert_eq!(r2.unwrap().unwrap(), v1);
461447

462-
let buffer = Buffer::default();
448+
let mut buffer = Buffer::default();
463449
let r1 = block_on(
464450
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
465451
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))

src/transaction/snapshot.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,26 @@ pub struct Snapshot {
1919

2020
impl Snapshot {
2121
/// Get the value associated with the given key.
22-
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
22+
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
2323
self.transaction.get(key).await
2424
}
2525

2626
/// Check whether the key exists.
27-
pub async fn key_exists(&self, key: impl Into<Key>) -> Result<bool> {
27+
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
2828
self.transaction.key_exists(key).await
2929
}
3030

3131
/// Get the values associated with the given keys.
3232
pub async fn batch_get(
33-
&self,
33+
&mut self,
3434
keys: impl IntoIterator<Item = impl Into<Key>>,
3535
) -> Result<impl Iterator<Item = KvPair>> {
3636
self.transaction.batch_get(keys).await
3737
}
3838

3939
/// Scan a range, return at most `limit` key-value pairs that lying in the range.
4040
pub async fn scan(
41-
&self,
41+
&mut self,
4242
range: impl Into<BoundRange>,
4343
limit: u32,
4444
) -> Result<impl Iterator<Item = KvPair>> {
@@ -47,7 +47,7 @@ impl Snapshot {
4747

4848
/// Scan a range, return at most `limit` keys that lying in the range.
4949
pub async fn scan_keys(
50-
&self,
50+
&mut self,
5151
range: impl Into<BoundRange>,
5252
limit: u32,
5353
) -> Result<impl Iterator<Item = Key>> {
@@ -56,7 +56,7 @@ impl Snapshot {
5656

5757
/// Unimplemented. Similar to scan, but in the reverse direction.
5858
#[allow(dead_code)]
59-
fn scan_reverse(&self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
59+
fn scan_reverse(&mut self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
6060
self.transaction.scan_reverse(range)
6161
}
6262
}

src/transaction/transaction.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl<PdC: PdClient> Transaction<PdC> {
9393
/// txn.commit().await.unwrap();
9494
/// # });
9595
/// ```
96-
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
96+
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
9797
self.check_allow_operation().await?;
9898
let timestamp = self.timestamp.clone();
9999
let rpc = self.rpc.clone();
@@ -170,7 +170,7 @@ impl<PdC: PdClient> Transaction<PdC> {
170170
/// txn.commit().await.unwrap();
171171
/// # });
172172
/// ```
173-
pub async fn key_exists(&self, key: impl Into<Key>) -> Result<bool> {
173+
pub async fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
174174
let key = key.into();
175175
Ok(self.scan_keys(key.clone()..=key, 1).await?.next().is_some())
176176
}
@@ -202,7 +202,7 @@ impl<PdC: PdClient> Transaction<PdC> {
202202
/// # });
203203
/// ```
204204
pub async fn batch_get(
205-
&self,
205+
&mut self,
206206
keys: impl IntoIterator<Item = impl Into<Key>>,
207207
) -> Result<impl Iterator<Item = KvPair>> {
208208
self.check_allow_operation().await?;
@@ -299,7 +299,7 @@ impl<PdC: PdClient> Transaction<PdC> {
299299
/// # });
300300
/// ```
301301
pub async fn scan(
302-
&self,
302+
&mut self,
303303
range: impl Into<BoundRange>,
304304
limit: u32,
305305
) -> Result<impl Iterator<Item = KvPair>> {
@@ -333,7 +333,7 @@ impl<PdC: PdClient> Transaction<PdC> {
333333
/// # });
334334
/// ```
335335
pub async fn scan_keys(
336-
&self,
336+
&mut self,
337337
range: impl Into<BoundRange>,
338338
limit: u32,
339339
) -> Result<impl Iterator<Item = Key>> {
@@ -589,7 +589,7 @@ impl<PdC: PdClient> Transaction<PdC> {
589589
}
590590

591591
async fn scan_inner(
592-
&self,
592+
&mut self,
593593
range: impl Into<BoundRange>,
594594
limit: u32,
595595
key_only: bool,

tests/integration_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ async fn crud() -> Result<()> {
102102
txn.commit().await?;
103103

104104
// Read again from TiKV
105-
let snapshot = client.snapshot(
105+
let mut snapshot = client.snapshot(
106106
client.current_timestamp().await?,
107107
// TODO needed because pessimistic does not check locks (#235)
108108
TransactionOptions::new_optimistic(),
@@ -341,9 +341,9 @@ async fn txn_bank_transfer() -> Result<()> {
341341
.await?;
342342
let chosen_people = people.iter().choose_multiple(&mut rng, 2);
343343
let alice = chosen_people[0];
344-
let mut alice_balance = get_txn_u32(&txn, alice.clone()).await?;
344+
let mut alice_balance = get_txn_u32(&mut txn, alice.clone()).await?;
345345
let bob = chosen_people[1];
346-
let mut bob_balance = get_txn_u32(&txn, bob.clone()).await?;
346+
let mut bob_balance = get_txn_u32(&mut txn, bob.clone()).await?;
347347
if alice_balance == 0 {
348348
txn.rollback().await?;
349349
continue;
@@ -362,7 +362,7 @@ async fn txn_bank_transfer() -> Result<()> {
362362
let mut new_sum = 0;
363363
let mut txn = client.begin_optimistic().await?;
364364
for person in people.iter() {
365-
new_sum += get_txn_u32(&txn, person.clone()).await?;
365+
new_sum += get_txn_u32(&mut txn, person.clone()).await?;
366366
}
367367
assert_eq!(sum, new_sum);
368368
txn.commit().await?;
@@ -716,7 +716,7 @@ async fn get_u32(client: &RawClient, key: Vec<u8>) -> Result<u32> {
716716
}
717717

718718
// helper function
719-
async fn get_txn_u32(txn: &Transaction, key: Vec<u8>) -> Result<u32> {
719+
async fn get_txn_u32(txn: &mut Transaction, key: Vec<u8>) -> Result<u32> {
720720
let x = txn.get(key).await?.unwrap();
721721
let boxed_slice = x.into_boxed_slice();
722722
let array: Box<[u8; 4]> = boxed_slice

0 commit comments

Comments
 (0)