Skip to content

Commit 6e02f61

Browse files
committed
Merge remote-tracking branch 'upstream/master' into atomic
Signed-off-by: ekexium <[email protected]>
2 parents ba7a5f2 + bdff7e3 commit 6e02f61

File tree

7 files changed

+82
-73
lines changed

7 files changed

+82
-73
lines changed

rust-toolchain

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/raw/client.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use tikv_client_common::Error;
44

55
use crate::{
6-
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
6+
backoff::DEFAULT_REGION_BACKOFF,
77
config::Config,
88
pd::PdRpcClient,
99
raw::lowering::*,
@@ -115,7 +115,6 @@ impl Client {
115115
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
116116
.single_region()
117117
.await?
118-
.resolve_lock(OPTIMISTIC_BACKOFF)
119118
.retry_region(DEFAULT_REGION_BACKOFF)
120119
.post_process_default()
121120
.plan();
@@ -146,7 +145,6 @@ impl Client {
146145
) -> Result<Vec<KvPair>> {
147146
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
148147
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
149-
.resolve_lock(OPTIMISTIC_BACKOFF)
150148
.multi_region()
151149
.retry_region(DEFAULT_REGION_BACKOFF)
152150
.merge(Collect)
@@ -284,7 +282,6 @@ impl Client {
284282
let request =
285283
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
286284
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
287-
.resolve_lock(OPTIMISTIC_BACKOFF)
288285
.multi_region()
289286
.retry_region(DEFAULT_REGION_BACKOFF)
290287
.extract_error()
@@ -311,7 +308,6 @@ impl Client {
311308
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
312309
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
313310
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
314-
.resolve_lock(OPTIMISTIC_BACKOFF)
315311
.multi_region()
316312
.retry_region(DEFAULT_REGION_BACKOFF)
317313
.extract_error()
@@ -482,7 +478,6 @@ impl Client {
482478

483479
let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
484480
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
485-
.resolve_lock(OPTIMISTIC_BACKOFF)
486481
.multi_region()
487482
.retry_region(DEFAULT_REGION_BACKOFF)
488483
.merge(Collect)
@@ -514,7 +509,6 @@ impl Client {
514509
self.cf.clone(),
515510
);
516511
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
517-
.resolve_lock(OPTIMISTIC_BACKOFF)
518512
.multi_region()
519513
.retry_region(DEFAULT_REGION_BACKOFF)
520514
.merge(Collect)
@@ -532,7 +526,6 @@ impl Client {
532526
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
533527
.single_region()
534528
.await?
535-
.resolve_lock(OPTIMISTIC_BACKOFF)
536529
.retry_region(DEFAULT_REGION_BACKOFF)
537530
.extract_error()
538531
.plan();
@@ -548,7 +541,6 @@ impl Client {
548541
let request =
549542
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
550543
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
551-
.resolve_lock(OPTIMISTIC_BACKOFF)
552544
.multi_region()
553545
.retry_region(DEFAULT_REGION_BACKOFF)
554546
.extract_error()
@@ -562,7 +554,6 @@ impl Client {
562554
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
563555
.single_region()
564556
.await?
565-
.resolve_lock(OPTIMISTIC_BACKOFF)
566557
.retry_region(DEFAULT_REGION_BACKOFF)
567558
.extract_error()
568559
.plan();

src/transaction/buffer.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,38 @@ struct InnerBuffer {
2121
impl InnerBuffer {
2222
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
2323
let key = key.into();
24-
if !matches!(entry, BufferEntry::Cached(_)) {
24+
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
2525
self.primary_key.get_or_insert_with(|| key.clone());
2626
}
2727
self.entry_map.insert(key, entry);
2828
}
2929

30-
pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
31-
self.primary_key.get_or_insert(key.clone())
30+
/// Set the primary key if it is not set
31+
pub fn primary_key_or(&mut self, key: &Key) {
32+
self.primary_key.get_or_insert(key.clone());
3233
}
3334
}
3435

3536
/// A caching layer which buffers reads and writes in a transaction.
3637
pub struct Buffer {
37-
mutations: Mutex<InnerBuffer>,
38+
inner: Mutex<InnerBuffer>,
3839
}
3940

4041
impl Buffer {
4142
pub fn new(is_pessimistic: bool) -> Buffer {
4243
Buffer {
43-
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
44+
inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
4445
}
4546
}
4647

4748
/// Get the primary key of the buffer.
4849
pub async fn get_primary_key(&self) -> Option<Key> {
49-
self.mutations.lock().await.primary_key.clone()
50+
self.inner.lock().await.primary_key.clone()
5051
}
5152

52-
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
53-
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
54-
self.mutations.lock().await.get_primary_key_or(key).clone()
53+
/// Set the primary key if it is not set
54+
pub async fn primary_key_or(&self, key: &Key) {
55+
self.inner.lock().await.primary_key_or(key);
5556
}
5657

5758
/// Get a value from the buffer.
@@ -74,7 +75,7 @@ impl Buffer {
7475
MutationValue::Determined(value) => Ok(value),
7576
MutationValue::Undetermined => {
7677
let value = f(key.clone()).await?;
77-
let mut mutations = self.mutations.lock().await;
78+
let mut mutations = self.inner.lock().await;
7879
Self::update_cache(&mut mutations, key, value.clone());
7980
Ok(value)
8081
}
@@ -95,7 +96,7 @@ impl Buffer {
9596
Fut: Future<Output = Result<Vec<KvPair>>>,
9697
{
9798
let (cached_results, undetermined_keys) = {
98-
let mutations = self.mutations.lock().await;
99+
let mutations = self.inner.lock().await;
99100
// Partition the keys into those we have buffered and those we have to
100101
// get from the store.
101102
let (undetermined_keys, cached_results): (
@@ -121,7 +122,7 @@ impl Buffer {
121122
};
122123

123124
let fetched_results = f(Box::new(undetermined_keys)).await?;
124-
let mut mutations = self.mutations.lock().await;
125+
let mut mutations = self.inner.lock().await;
125126
for kvpair in &fetched_results {
126127
let key = kvpair.0.clone();
127128
let value = Some(kvpair.1.clone());
@@ -144,7 +145,7 @@ impl Buffer {
144145
Fut: Future<Output = Result<Vec<KvPair>>>,
145146
{
146147
// read from local buffer
147-
let mut mutations = self.mutations.lock().await;
148+
let mut mutations = self.inner.lock().await;
148149
let mutation_range = mutations.entry_map.range(range.clone());
149150

150151
// fetch from TiKV
@@ -190,8 +191,8 @@ impl Buffer {
190191

191192
/// Lock the given key if necessary.
192193
pub async fn lock(&self, key: Key) {
193-
let mutations = &mut self.mutations.lock().await;
194-
mutations.primary_key.get_or_insert(key.clone());
194+
let mutations = &mut self.inner.lock().await;
195+
mutations.primary_key.get_or_insert_with(|| key.clone());
195196
let value = mutations
196197
.entry_map
197198
.entry(key)
@@ -205,15 +206,12 @@ impl Buffer {
205206

206207
/// Insert a value into the buffer (does not write through).
207208
pub async fn put(&self, key: Key, value: Value) {
208-
self.mutations
209-
.lock()
210-
.await
211-
.insert(key, BufferEntry::Put(value));
209+
self.inner.lock().await.insert(key, BufferEntry::Put(value));
212210
}
213211

214212
/// Mark a value as Insert mutation into the buffer (does not write through).
215213
pub async fn insert(&self, key: Key, value: Value) {
216-
let mut mutations = self.mutations.lock().await;
214+
let mut mutations = self.inner.lock().await;
217215
let mut entry = mutations.entry_map.entry(key.clone());
218216
match entry {
219217
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -225,7 +223,7 @@ impl Buffer {
225223

226224
/// Mark a value as deleted.
227225
pub async fn delete(&self, key: Key) {
228-
let mut mutations = self.mutations.lock().await;
226+
let mut mutations = self.inner.lock().await;
229227
let is_pessimistic = mutations.is_pessimistic;
230228
let mut entry = mutations.entry_map.entry(key.clone());
231229

@@ -241,7 +239,7 @@ impl Buffer {
241239

242240
/// Converts the buffered mutations to the proto buffer version
243241
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
244-
self.mutations
242+
self.inner
245243
.lock()
246244
.await
247245
.entry_map
@@ -251,7 +249,7 @@ impl Buffer {
251249
}
252250

253251
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
254-
self.mutations
252+
self.inner
255253
.lock()
256254
.await
257255
.entry_map

src/transaction/transaction.rs

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
116116
}
117117

118118
/// Create a `get for udpate` request.
119-
/// Once resolved this request will pessimistically lock and fetch the latest
120-
/// value associated with the given key at **current timestamp**.
121119
///
122-
/// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
120+
/// The request reads and "locks" a key. It is similar to `SELECT ... FOR
121+
/// UPDATE` in TiDB, and has different behavior in optimistic and
122+
/// pessimistic transactions.
123123
///
124-
/// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
125-
/// which is similar to that in MySQL. It reads the latest value (using current timestamp),
126-
/// and the value is not cached in the local buffer.
127-
/// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
124+
/// # Optimistic transaction
128125
///
126+
/// It reads at the "start timestamp" and caches the value, just like normal
127+
/// get requests. The lock is written in prewrite and commit, so it cannot
128+
/// prevent concurrent transactions from writing the same key, but can only
129+
/// prevent itself from committing.
129130
///
130-
/// It can only be used in pessimistic mode.
131+
/// # Pessimistic transaction
132+
///
133+
/// It reads at the "current timestamp" and thus does not cache the value.
134+
/// So following read requests won't be affected by the `get_for_udpate`.
135+
/// A lock will be acquired immediately with this request, which prevents
136+
/// concurrent transactions from mutating the keys.
137+
///
138+
/// The "current timestamp" (also called `for_update_ts` of the request) is
139+
/// fetched immediately from the timestamp oracle.
140+
///
141+
/// Note: The behavior of the request under pessimistic transaction does not
142+
/// follow snapshot isolation.
131143
///
132144
/// # Examples
133145
/// ```rust,no_run
@@ -146,7 +158,9 @@ impl<PdC: PdClient> Transaction<PdC> {
146158
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
147159
self.check_allow_operation().await?;
148160
if !self.is_pessimistic() {
149-
Err(Error::InvalidTransactionType)
161+
let key = key.into();
162+
self.lock_keys(iter::once(key.clone())).await?;
163+
self.get(key).await
150164
} else {
151165
let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
152166
debug_assert!(pairs.len() <= 1);
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
228242

229243
/// Create a new 'batch get for update' request.
230244
///
231-
/// Once resolved this request will pessimistically lock the keys and
232-
/// fetch the values associated with the given keys.
233-
///
234-
/// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
235-
/// which is similar to that in MySQL. It reads the latest value (using current timestamp),
236-
/// and the value is not cached in the local buffer.
237-
/// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
238-
///
239-
/// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
245+
/// Similar [`get_for_update`](Transaction::get_for_update), but it works
246+
/// for a batch of keys.
240247
///
241-
/// It can only be used in pessimistic mode.
248+
/// Non-existent entries will not appear in the result. The order of the
249+
/// keys is not retained in the result.
242250
///
243251
/// # Examples
244252
/// ```rust,no_run
245-
/// # use tikv_client::{Key, Value, Config, TransactionClient};
253+
/// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
246254
/// # use futures::prelude::*;
247255
/// # use std::collections::HashMap;
248256
/// # futures::executor::block_on(async {
249257
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
250258
/// let mut txn = client.begin_pessimistic().await.unwrap();
251259
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
252-
/// let result: HashMap<Key, Value> = txn
260+
/// let result: Vec<KvPair> = txn
253261
/// .batch_get_for_update(keys)
254262
/// .await
255-
/// .unwrap()
256-
/// .map(|pair| (pair.0, pair.1))
257-
/// .collect();
263+
/// .unwrap();
258264
/// // now "TiKV" and "TiDB" are both locked
259265
/// // Finish the transaction...
260266
/// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl<PdC: PdClient> Transaction<PdC> {
263269
pub async fn batch_get_for_update(
264270
&mut self,
265271
keys: impl IntoIterator<Item = impl Into<Key>>,
266-
) -> Result<impl Iterator<Item = KvPair>> {
272+
) -> Result<Vec<KvPair>> {
267273
self.check_allow_operation().await?;
274+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
268275
if !self.is_pessimistic() {
269-
return Err(Error::InvalidTransactionType);
276+
self.lock_keys(keys.clone()).await?;
277+
Ok(self.batch_get(keys).await?.collect())
278+
} else {
279+
self.pessimistic_lock(keys, true).await
270280
}
271-
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
272-
Ok(self.pessimistic_lock(keys, true).await?.into_iter())
273281
}
274282

275283
/// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
473481
}
474482
}
475483
TransactionKind::Pessimistic(_) => {
476-
self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
477-
.await?;
484+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
485+
self.pessimistic_lock(keys.into_iter(), false).await?;
478486
}
479487
}
480488
Ok(())
@@ -649,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
649657
}
650658

651659
let first_key = keys[0].clone().key();
652-
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
660+
// we do not set the primary key here, because pessimistic lock request
661+
// can fail, in which case the keys may not be part of the transaction.
662+
let primary_lock = self
663+
.buffer
664+
.get_primary_key()
665+
.await
666+
.unwrap_or_else(|| first_key.clone());
653667
let for_update_ts = self.rpc.clone().get_timestamp().await?;
654668
self.options.push_for_update_ts(for_update_ts.clone());
655669
let request = new_pessimistic_lock_request(
@@ -669,6 +683,9 @@ impl<PdC: PdClient> Transaction<PdC> {
669683
.plan();
670684
let pairs = plan.execute().await;
671685

686+
// primary key will be set here if needed
687+
self.buffer.primary_key_or(&first_key).await;
688+
672689
self.start_auto_heartbeat().await;
673690

674691
for key in keys {

0 commit comments

Comments
 (0)