Skip to content

Commit e320efa

Browse files
authored
Merge pull request #254 from ekexium/pessimistic-lock-primary-key
Don't set primary key in a pessimistic lock request until it succeeds
2 parents 5688e08 + 2247c5d commit e320efa

File tree

2 files changed

+31
-24
lines changed

2 files changed

+31
-24
lines changed

src/transaction/buffer.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,38 @@ struct InnerBuffer {
2121
impl InnerBuffer {
2222
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
2323
let key = key.into();
24-
if !matches!(entry, BufferEntry::Cached(_)) {
24+
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
2525
self.primary_key.get_or_insert_with(|| key.clone());
2626
}
2727
self.entry_map.insert(key, entry);
2828
}
2929

30-
pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
31-
self.primary_key.get_or_insert(key.clone())
30+
/// Set the primary key if it is not set
31+
pub fn primary_key_or(&mut self, key: &Key) {
32+
self.primary_key.get_or_insert(key.clone());
3233
}
3334
}
3435

3536
/// A caching layer which buffers reads and writes in a transaction.
3637
pub struct Buffer {
37-
mutations: Mutex<InnerBuffer>,
38+
inner: Mutex<InnerBuffer>,
3839
}
3940

4041
impl Buffer {
4142
pub fn new(is_pessimistic: bool) -> Buffer {
4243
Buffer {
43-
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
44+
inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
4445
}
4546
}
4647

4748
/// Get the primary key of the buffer.
4849
pub async fn get_primary_key(&self) -> Option<Key> {
49-
self.mutations.lock().await.primary_key.clone()
50+
self.inner.lock().await.primary_key.clone()
5051
}
5152

52-
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
53-
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
54-
self.mutations.lock().await.get_primary_key_or(key).clone()
53+
/// Set the primary key if it is not set
54+
pub async fn primary_key_or(&self, key: &Key) {
55+
self.inner.lock().await.primary_key_or(key);
5556
}
5657

5758
/// Get a value from the buffer.
@@ -74,7 +75,7 @@ impl Buffer {
7475
MutationValue::Determined(value) => Ok(value),
7576
MutationValue::Undetermined => {
7677
let value = f(key.clone()).await?;
77-
let mut mutations = self.mutations.lock().await;
78+
let mut mutations = self.inner.lock().await;
7879
Self::update_cache(&mut mutations, key, value.clone());
7980
Ok(value)
8081
}
@@ -95,7 +96,7 @@ impl Buffer {
9596
Fut: Future<Output = Result<Vec<KvPair>>>,
9697
{
9798
let (cached_results, undetermined_keys) = {
98-
let mutations = self.mutations.lock().await;
99+
let mutations = self.inner.lock().await;
99100
// Partition the keys into those we have buffered and those we have to
100101
// get from the store.
101102
let (undetermined_keys, cached_results): (
@@ -121,7 +122,7 @@ impl Buffer {
121122
};
122123

123124
let fetched_results = f(Box::new(undetermined_keys)).await?;
124-
let mut mutations = self.mutations.lock().await;
125+
let mut mutations = self.inner.lock().await;
125126
for kvpair in &fetched_results {
126127
let key = kvpair.0.clone();
127128
let value = Some(kvpair.1.clone());
@@ -144,7 +145,7 @@ impl Buffer {
144145
Fut: Future<Output = Result<Vec<KvPair>>>,
145146
{
146147
// read from local buffer
147-
let mut mutations = self.mutations.lock().await;
148+
let mut mutations = self.inner.lock().await;
148149
let mutation_range = mutations.entry_map.range(range.clone());
149150

150151
// fetch from TiKV
@@ -190,8 +191,8 @@ impl Buffer {
190191

191192
/// Lock the given key if necessary.
192193
pub async fn lock(&self, key: Key) {
193-
let mutations = &mut self.mutations.lock().await;
194-
mutations.primary_key.get_or_insert(key.clone());
194+
let mutations = &mut self.inner.lock().await;
195+
mutations.primary_key.get_or_insert_with(|| key.clone());
195196
let value = mutations
196197
.entry_map
197198
.entry(key)
@@ -205,15 +206,12 @@ impl Buffer {
205206

206207
/// Insert a value into the buffer (does not write through).
207208
pub async fn put(&self, key: Key, value: Value) {
208-
self.mutations
209-
.lock()
210-
.await
211-
.insert(key, BufferEntry::Put(value));
209+
self.inner.lock().await.insert(key, BufferEntry::Put(value));
212210
}
213211

214212
/// Mark a value as Insert mutation into the buffer (does not write through).
215213
pub async fn insert(&self, key: Key, value: Value) {
216-
let mut mutations = self.mutations.lock().await;
214+
let mut mutations = self.inner.lock().await;
217215
let mut entry = mutations.entry_map.entry(key.clone());
218216
match entry {
219217
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -225,7 +223,7 @@ impl Buffer {
225223

226224
/// Mark a value as deleted.
227225
pub async fn delete(&self, key: Key) {
228-
let mut mutations = self.mutations.lock().await;
226+
let mut mutations = self.inner.lock().await;
229227
let is_pessimistic = mutations.is_pessimistic;
230228
let mut entry = mutations.entry_map.entry(key.clone());
231229

@@ -241,7 +239,7 @@ impl Buffer {
241239

242240
/// Converts the buffered mutations to the proto buffer version
243241
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
244-
self.mutations
242+
self.inner
245243
.lock()
246244
.await
247245
.entry_map
@@ -251,7 +249,7 @@ impl Buffer {
251249
}
252250

253251
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
254-
self.mutations
252+
self.inner
255253
.lock()
256254
.await
257255
.entry_map

src/transaction/transaction.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,13 @@ impl<PdC: PdClient> Transaction<PdC> {
649649
}
650650

651651
let first_key = keys[0].clone().key();
652-
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
652+
// we do not set the primary key here, because pessimistic lock request
653+
// can fail, in which case the keys may not be part of the transaction.
654+
let primary_lock = self
655+
.buffer
656+
.get_primary_key()
657+
.await
658+
.unwrap_or_else(|| first_key.clone());
653659
let for_update_ts = self.rpc.clone().get_timestamp().await?;
654660
self.options.push_for_update_ts(for_update_ts.clone());
655661
let request = new_pessimistic_lock_request(
@@ -669,6 +675,9 @@ impl<PdC: PdClient> Transaction<PdC> {
669675
.plan();
670676
let pairs = plan.execute().await;
671677

678+
// primary key will be set here if needed
679+
self.buffer.primary_key_or(&first_key).await;
680+
672681
self.start_auto_heartbeat().await;
673682

674683
for key in keys {

0 commit comments

Comments
 (0)