Skip to content

Commit 540af4d

Browse files
committed
refactor: remove inner buffer
Signed-off-by: ekexium <[email protected]>
1 parent 9ceac82 commit 540af4d

File tree

1 file changed

+32
-51
lines changed

1 file changed

+32
-51
lines changed

src/transaction/buffer.rs

Lines changed: 32 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,36 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{BoundRange, Key, KvPair, Result, Value};
4-
use derive_new::new;
54
use std::{
65
collections::{btree_map::Entry, BTreeMap, HashMap},
76
future::Future,
87
};
98
use tikv_client_proto::kvrpcpb;
109

11-
#[derive(new)]
12-
struct InnerBuffer {
13-
#[new(default)]
10+
/// A caching layer which buffers reads and writes in a transaction.
11+
pub struct Buffer {
1412
primary_key: Option<Key>,
15-
#[new(default)]
1613
entry_map: BTreeMap<Key, BufferEntry>,
1714
is_pessimistic: bool,
1815
}
1916

20-
impl InnerBuffer {
21-
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
22-
let key = key.into();
23-
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
24-
self.primary_key.get_or_insert_with(|| key.clone());
25-
}
26-
self.entry_map.insert(key, entry);
27-
}
28-
29-
/// Set the primary key if it is not set
30-
pub fn primary_key_or(&mut self, key: &Key) {
31-
self.primary_key.get_or_insert(key.clone());
32-
}
33-
}
34-
35-
/// A caching layer which buffers reads and writes in a transaction.
36-
pub struct Buffer {
37-
inner: InnerBuffer,
38-
}
39-
4017
impl Buffer {
4118
pub fn new(is_pessimistic: bool) -> Buffer {
4219
Buffer {
43-
inner: InnerBuffer::new(is_pessimistic),
20+
primary_key: None,
21+
entry_map: BTreeMap::new(),
22+
is_pessimistic,
4423
}
4524
}
4625

4726
/// Get the primary key of the buffer.
4827
pub async fn get_primary_key(&self) -> Option<Key> {
49-
self.inner.primary_key.clone()
28+
self.primary_key.clone()
5029
}
5130

5231
/// Set the primary key if it is not set
5332
pub async fn primary_key_or(&mut self, key: &Key) {
54-
self.inner.primary_key_or(key);
33+
self.primary_key.get_or_insert_with(|| key.clone());
5534
}
5635

5736
/// Get a value from the buffer.
@@ -74,7 +53,7 @@ impl Buffer {
7453
MutationValue::Determined(value) => Ok(value),
7554
MutationValue::Undetermined => {
7655
let value = f(key.clone()).await?;
77-
Self::update_cache(&mut self.inner, key, value.clone());
56+
self.update_cache(key, value.clone());
7857
Ok(value)
7958
}
8059
}
@@ -102,7 +81,6 @@ impl Buffer {
10281
) = keys
10382
.map(|key| {
10483
let value = self
105-
.inner
10684
.entry_map
10785
.get(&key)
10886
.map(BufferEntry::get_value)
@@ -123,7 +101,7 @@ impl Buffer {
123101
for kvpair in &fetched_results {
124102
let key = kvpair.0.clone();
125103
let value = Some(kvpair.1.clone());
126-
Self::update_cache(&mut self.inner, key, value);
104+
self.update_cache(key, value);
127105
}
128106

129107
let results = cached_results.chain(fetched_results.into_iter());
@@ -142,7 +120,7 @@ impl Buffer {
142120
Fut: Future<Output = Result<Vec<KvPair>>>,
143121
{
144122
// read from local buffer
145-
let mutation_range = self.inner.entry_map.range(range.clone());
123+
let mutation_range = self.entry_map.range(range.clone());
146124

147125
// fetch from TiKV
148126
// fetch more entries because some of them may be deleted.
@@ -173,7 +151,7 @@ impl Buffer {
173151

174152
// update local buffer
175153
for (k, v) in &results {
176-
Self::update_cache(&mut self.inner, k.clone(), Some(v.clone()));
154+
self.update_cache(k.clone(), Some(v.clone()));
177155
}
178156

179157
let mut res = results
@@ -187,9 +165,8 @@ impl Buffer {
187165

188166
/// Lock the given key if necessary.
189167
pub async fn lock(&mut self, key: Key) {
190-
self.inner.primary_key.get_or_insert_with(|| key.clone());
168+
self.primary_key.get_or_insert_with(|| key.clone());
191169
let value = self
192-
.inner
193170
.entry_map
194171
.entry(key)
195172
// Mutated keys don't need a lock.
@@ -202,61 +179,57 @@ impl Buffer {
202179

203180
/// Insert a value into the buffer (does not write through).
204181
pub async fn put(&mut self, key: Key, value: Value) {
205-
self.inner.insert(key, BufferEntry::Put(value));
182+
self.insert_entry(key, BufferEntry::Put(value));
206183
}
207184

208185
/// Mark a value as Insert mutation into the buffer (does not write through).
209186
pub async fn insert(&mut self, key: Key, value: Value) {
210-
let mut entry = self.inner.entry_map.entry(key.clone());
187+
let mut entry = self.entry_map.entry(key.clone());
211188
match entry {
212189
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
213190
o.insert(BufferEntry::Put(value));
214191
}
215-
_ => self.inner.insert(key, BufferEntry::Insert(value)),
192+
_ => self.insert_entry(key, BufferEntry::Insert(value)),
216193
}
217194
}
218195

219196
/// Mark a value as deleted.
220197
pub async fn delete(&mut self, key: Key) {
221-
let is_pessimistic = self.inner.is_pessimistic;
222-
let mut entry = self.inner.entry_map.entry(key.clone());
198+
let is_pessimistic = self.is_pessimistic;
199+
let mut entry = self.entry_map.entry(key.clone());
223200

224201
match entry {
225202
Entry::Occupied(ref mut o)
226203
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
227204
{
228205
o.insert(BufferEntry::CheckNotExist);
229206
}
230-
_ => self.inner.insert(key, BufferEntry::Del),
207+
_ => self.insert_entry(key, BufferEntry::Del),
231208
}
232209
}
233210

234211
/// Converts the buffered mutations to the proto buffer version
235212
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
236-
self.inner
237-
.entry_map
213+
self.entry_map
238214
.iter()
239215
.filter_map(|(key, mutation)| mutation.to_proto_with_key(key))
240216
.collect()
241217
}
242218

243219
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
244-
self.inner
245-
.entry_map
220+
self.entry_map
246221
.get(&key)
247222
.map(BufferEntry::get_value)
248223
.unwrap_or(MutationValue::Undetermined)
249224
}
250225

251-
fn update_cache(buffer: &mut InnerBuffer, key: Key, value: Option<Value>) {
252-
match buffer.entry_map.get(&key) {
226+
fn update_cache(&mut self, key: Key, value: Option<Value>) {
227+
match self.entry_map.get(&key) {
253228
Some(BufferEntry::Locked(None)) => {
254-
buffer
255-
.entry_map
256-
.insert(key, BufferEntry::Locked(Some(value)));
229+
self.entry_map.insert(key, BufferEntry::Locked(Some(value)));
257230
}
258231
None => {
259-
buffer.entry_map.insert(key, BufferEntry::Cached(value));
232+
self.entry_map.insert(key, BufferEntry::Cached(value));
260233
}
261234
Some(BufferEntry::Cached(v)) | Some(BufferEntry::Locked(Some(v))) => {
262235
assert!(&value == v);
@@ -275,6 +248,14 @@ impl Buffer {
275248
}
276249
}
277250
}
251+
252+
fn insert_entry(&mut self, key: impl Into<Key>, entry: BufferEntry) {
253+
let key = key.into();
254+
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
255+
self.primary_key.get_or_insert_with(|| key.clone());
256+
}
257+
self.entry_map.insert(key, entry);
258+
}
278259
}
279260

280261
// The state of a key-value pair in the buffer.

0 commit comments

Comments
 (0)