1
1
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
2
3
3
use crate :: { BoundRange , Key , KvPair , Result , Value } ;
4
- use derive_new:: new;
5
4
use std:: {
6
5
collections:: { btree_map:: Entry , BTreeMap , HashMap } ,
7
6
future:: Future ,
8
7
} ;
9
8
use tikv_client_proto:: kvrpcpb;
10
- use tokio:: sync:: { Mutex , MutexGuard } ;
11
9
12
- #[ derive( new) ]
13
- struct InnerBuffer {
14
- #[ new( default ) ]
10
+ /// A caching layer which buffers reads and writes in a transaction.
11
+ pub struct Buffer {
15
12
primary_key : Option < Key > ,
16
- #[ new( default ) ]
17
13
entry_map : BTreeMap < Key , BufferEntry > ,
18
14
is_pessimistic : bool ,
19
15
}
20
16
21
- impl InnerBuffer {
22
- fn insert ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
23
- let key = key. into ( ) ;
24
- if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
25
- self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
26
- }
27
- self . entry_map . insert ( key, entry) ;
28
- }
29
-
30
- /// Set the primary key if it is not set
31
- pub fn primary_key_or ( & mut self , key : & Key ) {
32
- self . primary_key . get_or_insert ( key. clone ( ) ) ;
33
- }
34
- }
35
-
36
- /// A caching layer which buffers reads and writes in a transaction.
37
- pub struct Buffer {
38
- inner : Mutex < InnerBuffer > ,
39
- }
40
-
41
17
impl Buffer {
42
18
pub fn new ( is_pessimistic : bool ) -> Buffer {
43
19
Buffer {
44
- inner : Mutex :: new ( InnerBuffer :: new ( is_pessimistic) ) ,
20
+ primary_key : None ,
21
+ entry_map : BTreeMap :: new ( ) ,
22
+ is_pessimistic,
45
23
}
46
24
}
47
25
48
26
/// Get the primary key of the buffer.
49
27
pub async fn get_primary_key ( & self ) -> Option < Key > {
50
- self . inner . lock ( ) . await . primary_key . clone ( )
28
+ self . primary_key . clone ( )
51
29
}
52
30
53
31
/// Set the primary key if it is not set
54
- pub async fn primary_key_or ( & self , key : & Key ) {
55
- self . inner . lock ( ) . await . primary_key_or ( key ) ;
32
+ pub async fn primary_key_or ( & mut self , key : & Key ) {
33
+ self . primary_key . get_or_insert_with ( || key . clone ( ) ) ;
56
34
}
57
35
58
36
/// Get a value from the buffer.
@@ -66,7 +44,7 @@ impl Buffer {
66
44
67
45
/// Get a value from the buffer. If the value is not present, run `f` to get
68
46
/// the value.
69
- pub async fn get_or_else < F , Fut > ( & self , key : Key , f : F ) -> Result < Option < Value > >
47
+ pub async fn get_or_else < F , Fut > ( & mut self , key : Key , f : F ) -> Result < Option < Value > >
70
48
where
71
49
F : FnOnce ( Key ) -> Fut ,
72
50
Fut : Future < Output = Result < Option < Value > > > ,
@@ -75,8 +53,7 @@ impl Buffer {
75
53
MutationValue :: Determined ( value) => Ok ( value) ,
76
54
MutationValue :: Undetermined => {
77
55
let value = f ( key. clone ( ) ) . await ?;
78
- let mut mutations = self . inner . lock ( ) . await ;
79
- Self :: update_cache ( & mut mutations, key, value. clone ( ) ) ;
56
+ self . update_cache ( key, value. clone ( ) ) ;
80
57
Ok ( value)
81
58
}
82
59
}
@@ -87,7 +64,7 @@ impl Buffer {
87
64
///
88
65
/// only used for snapshot read (i.e. not for `batch_get_for_update`)
89
66
pub async fn batch_get_or_else < F , Fut > (
90
- & self ,
67
+ & mut self ,
91
68
keys : impl Iterator < Item = Key > ,
92
69
f : F ,
93
70
) -> Result < impl Iterator < Item = KvPair > >
@@ -96,15 +73,14 @@ impl Buffer {
96
73
Fut : Future < Output = Result < Vec < KvPair > > > ,
97
74
{
98
75
let ( cached_results, undetermined_keys) = {
99
- let mutations = self . inner . lock ( ) . await ;
100
76
// Partition the keys into those we have buffered and those we have to
101
77
// get from the store.
102
78
let ( undetermined_keys, cached_results) : (
103
79
Vec < ( Key , MutationValue ) > ,
104
80
Vec < ( Key , MutationValue ) > ,
105
81
) = keys
106
82
. map ( |key| {
107
- let value = mutations
83
+ let value = self
108
84
. entry_map
109
85
. get ( & key)
110
86
. map ( BufferEntry :: get_value)
@@ -122,11 +98,10 @@ impl Buffer {
122
98
} ;
123
99
124
100
let fetched_results = f ( Box :: new ( undetermined_keys) ) . await ?;
125
- let mut mutations = self . inner . lock ( ) . await ;
126
101
for kvpair in & fetched_results {
127
102
let key = kvpair. 0 . clone ( ) ;
128
103
let value = Some ( kvpair. 1 . clone ( ) ) ;
129
- Self :: update_cache ( & mut mutations , key, value) ;
104
+ self . update_cache ( key, value) ;
130
105
}
131
106
132
107
let results = cached_results. chain ( fetched_results. into_iter ( ) ) ;
@@ -135,7 +110,7 @@ impl Buffer {
135
110
136
111
/// Run `f` to fetch entries in `range` from TiKV. Combine them with mutations in local buffer. Returns the results.
137
112
pub async fn scan_and_fetch < F , Fut > (
138
- & self ,
113
+ & mut self ,
139
114
range : BoundRange ,
140
115
limit : u32 ,
141
116
f : F ,
@@ -145,8 +120,7 @@ impl Buffer {
145
120
Fut : Future < Output = Result < Vec < KvPair > > > ,
146
121
{
147
122
// read from local buffer
148
- let mut mutations = self . inner . lock ( ) . await ;
149
- let mutation_range = mutations. entry_map . range ( range. clone ( ) ) ;
123
+ let mutation_range = self . entry_map . range ( range. clone ( ) ) ;
150
124
151
125
// fetch from TiKV
152
126
// fetch more entries because some of them may be deleted.
@@ -177,7 +151,7 @@ impl Buffer {
177
151
178
152
// update local buffer
179
153
for ( k, v) in & results {
180
- Self :: update_cache ( & mut mutations , k. clone ( ) , Some ( v. clone ( ) ) ) ;
154
+ self . update_cache ( k. clone ( ) , Some ( v. clone ( ) ) ) ;
181
155
}
182
156
183
157
let mut res = results
@@ -190,10 +164,9 @@ impl Buffer {
190
164
}
191
165
192
166
/// Lock the given key if necessary.
193
- pub async fn lock ( & self , key : Key ) {
194
- let mutations = & mut self . inner . lock ( ) . await ;
195
- mutations. primary_key . get_or_insert_with ( || key. clone ( ) ) ;
196
- let value = mutations
167
+ pub async fn lock ( & mut self , key : Key ) {
168
+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
169
+ let value = self
197
170
. entry_map
198
171
. entry ( key)
199
172
// Mutated keys don't need a lock.
@@ -205,68 +178,58 @@ impl Buffer {
205
178
}
206
179
207
180
/// Insert a value into the buffer (does not write through).
208
- pub async fn put ( & self , key : Key , value : Value ) {
209
- self . inner . lock ( ) . await . insert ( key, BufferEntry :: Put ( value) ) ;
181
+ pub async fn put ( & mut self , key : Key , value : Value ) {
182
+ self . insert_entry ( key, BufferEntry :: Put ( value) ) ;
210
183
}
211
184
212
185
/// Mark a value as Insert mutation into the buffer (does not write through).
213
- pub async fn insert ( & self , key : Key , value : Value ) {
214
- let mut mutations = self . inner . lock ( ) . await ;
215
- let mut entry = mutations. entry_map . entry ( key. clone ( ) ) ;
186
+ pub async fn insert ( & mut self , key : Key , value : Value ) {
187
+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
216
188
match entry {
217
189
Entry :: Occupied ( ref mut o) if matches ! ( o. get( ) , BufferEntry :: Del ) => {
218
190
o. insert ( BufferEntry :: Put ( value) ) ;
219
191
}
220
- _ => mutations . insert ( key, BufferEntry :: Insert ( value) ) ,
192
+ _ => self . insert_entry ( key, BufferEntry :: Insert ( value) ) ,
221
193
}
222
194
}
223
195
224
196
/// Mark a value as deleted.
225
- pub async fn delete ( & self , key : Key ) {
226
- let mut mutations = self . inner . lock ( ) . await ;
227
- let is_pessimistic = mutations. is_pessimistic ;
228
- let mut entry = mutations. entry_map . entry ( key. clone ( ) ) ;
197
+ pub async fn delete ( & mut self , key : Key ) {
198
+ let is_pessimistic = self . is_pessimistic ;
199
+ let mut entry = self . entry_map . entry ( key. clone ( ) ) ;
229
200
230
201
match entry {
231
202
Entry :: Occupied ( ref mut o)
232
203
if matches ! ( o. get( ) , BufferEntry :: Insert ( _) ) && !is_pessimistic =>
233
204
{
234
205
o. insert ( BufferEntry :: CheckNotExist ) ;
235
206
}
236
- _ => mutations . insert ( key, BufferEntry :: Del ) ,
207
+ _ => self . insert_entry ( key, BufferEntry :: Del ) ,
237
208
}
238
209
}
239
210
240
211
/// Converts the buffered mutations to the proto buffer version
241
212
pub async fn to_proto_mutations ( & self ) -> Vec < kvrpcpb:: Mutation > {
242
- self . inner
243
- . lock ( )
244
- . await
245
- . entry_map
213
+ self . entry_map
246
214
. iter ( )
247
215
. filter_map ( |( key, mutation) | mutation. to_proto_with_key ( key) )
248
216
. collect ( )
249
217
}
250
218
251
219
async fn get_from_mutations ( & self , key : & Key ) -> MutationValue {
252
- self . inner
253
- . lock ( )
254
- . await
255
- . entry_map
220
+ self . entry_map
256
221
. get ( & key)
257
222
. map ( BufferEntry :: get_value)
258
223
. unwrap_or ( MutationValue :: Undetermined )
259
224
}
260
225
261
- fn update_cache ( buffer : & mut MutexGuard < InnerBuffer > , key : Key , value : Option < Value > ) {
262
- match buffer . entry_map . get ( & key) {
226
+ fn update_cache ( & mut self , key : Key , value : Option < Value > ) {
227
+ match self . entry_map . get ( & key) {
263
228
Some ( BufferEntry :: Locked ( None ) ) => {
264
- buffer
265
- . entry_map
266
- . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
229
+ self . entry_map . insert ( key, BufferEntry :: Locked ( Some ( value) ) ) ;
267
230
}
268
231
None => {
269
- buffer . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
232
+ self . entry_map . insert ( key, BufferEntry :: Cached ( value) ) ;
270
233
}
271
234
Some ( BufferEntry :: Cached ( v) ) | Some ( BufferEntry :: Locked ( Some ( v) ) ) => {
272
235
assert ! ( & value == v) ;
@@ -285,6 +248,14 @@ impl Buffer {
285
248
}
286
249
}
287
250
}
251
+
252
+ fn insert_entry ( & mut self , key : impl Into < Key > , entry : BufferEntry ) {
253
+ let key = key. into ( ) ;
254
+ if !matches ! ( entry, BufferEntry :: Cached ( _) | BufferEntry :: CheckNotExist ) {
255
+ self . primary_key . get_or_insert_with ( || key. clone ( ) ) ;
256
+ }
257
+ self . entry_map . insert ( key, entry) ;
258
+ }
288
259
}
289
260
290
261
// The state of a key-value pair in the buffer.
@@ -388,7 +359,7 @@ mod tests {
388
359
#[ tokio:: test]
389
360
#[ allow( unreachable_code) ]
390
361
async fn set_and_get_from_buffer ( ) {
391
- let buffer = Buffer :: new ( false ) ;
362
+ let mut buffer = Buffer :: new ( false ) ;
392
363
buffer
393
364
. put ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
394
365
. await ;
@@ -421,7 +392,7 @@ mod tests {
421
392
#[ tokio:: test]
422
393
#[ allow( unreachable_code) ]
423
394
async fn insert_and_get_from_buffer ( ) {
424
- let buffer = Buffer :: new ( false ) ;
395
+ let mut buffer = Buffer :: new ( false ) ;
425
396
buffer
426
397
. insert ( b"key1" . to_vec ( ) . into ( ) , b"value1" . to_vec ( ) )
427
398
. await ;
@@ -463,13 +434,13 @@ mod tests {
463
434
let v2: Value = b"value2" . to_vec ( ) ;
464
435
let v2_ = v2. clone ( ) ;
465
436
466
- let buffer = Buffer :: new ( false ) ;
437
+ let mut buffer = Buffer :: new ( false ) ;
467
438
let r1 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( Ok ( Some ( v1_) ) ) ) ) ;
468
439
let r2 = block_on ( buffer. get_or_else ( k1. clone ( ) , move |_| ready ( panic ! ( ) ) ) ) ;
469
440
assert_eq ! ( r1. unwrap( ) . unwrap( ) , v1) ;
470
441
assert_eq ! ( r2. unwrap( ) . unwrap( ) , v1) ;
471
442
472
- let buffer = Buffer :: new ( false ) ;
443
+ let mut buffer = Buffer :: new ( false ) ;
473
444
let r1 = block_on (
474
445
buffer. batch_get_or_else ( vec ! [ k1. clone( ) , k2. clone( ) ] . into_iter ( ) , move |_| {
475
446
ready ( Ok ( vec ! [ ( k1_, v1__) . into( ) , ( k2_, v2_) . into( ) ] ) )
0 commit comments