Skip to content

Commit bdff7e3

Browse files
authored
Merge pull request #258 from ekexium/unify-locking
Make get_for_update work for optimistic txns
2 parents 87d5cbb + 1824288 commit bdff7e3

File tree

2 files changed

+49
-36
lines changed

2 files changed

+49
-36
lines changed

src/transaction/transaction.rs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
116116
}
117117

118118
/// Create a `get for udpate` request.
119-
/// Once resolved this request will pessimistically lock and fetch the latest
120-
/// value associated with the given key at **current timestamp**.
121119
///
122-
/// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
120+
/// The request reads and "locks" a key. It is similar to `SELECT ... FOR
121+
/// UPDATE` in TiDB, and has different behavior in optimistic and
122+
/// pessimistic transactions.
123123
///
124-
/// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
125-
/// which is similar to that in MySQL. It reads the latest value (using current timestamp),
126-
/// and the value is not cached in the local buffer.
127-
/// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
124+
/// # Optimistic transaction
128125
///
126+
/// It reads at the "start timestamp" and caches the value, just like normal
127+
/// get requests. The lock is written in prewrite and commit, so it cannot
128+
/// prevent concurrent transactions from writing the same key, but can only
129+
/// prevent itself from committing.
129130
///
130-
/// It can only be used in pessimistic mode.
131+
/// # Pessimistic transaction
132+
///
133+
/// It reads at the "current timestamp" and thus does not cache the value.
134+
/// So following read requests won't be affected by the `get_for_udpate`.
135+
/// A lock will be acquired immediately with this request, which prevents
136+
/// concurrent transactions from mutating the keys.
137+
///
138+
/// The "current timestamp" (also called `for_update_ts` of the request) is
139+
/// fetched immediately from the timestamp oracle.
140+
///
141+
/// Note: The behavior of the request under pessimistic transaction does not
142+
/// follow snapshot isolation.
131143
///
132144
/// # Examples
133145
/// ```rust,no_run
@@ -146,7 +158,9 @@ impl<PdC: PdClient> Transaction<PdC> {
146158
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
147159
self.check_allow_operation().await?;
148160
if !self.is_pessimistic() {
149-
Err(Error::InvalidTransactionType)
161+
let key = key.into();
162+
self.lock_keys(iter::once(key.clone())).await?;
163+
self.get(key).await
150164
} else {
151165
let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
152166
debug_assert!(pairs.len() <= 1);
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
228242

229243
/// Create a new 'batch get for update' request.
230244
///
231-
/// Once resolved this request will pessimistically lock the keys and
232-
/// fetch the values associated with the given keys.
233-
///
234-
/// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
235-
/// which is similar to that in MySQL. It reads the latest value (using current timestamp),
236-
/// and the value is not cached in the local buffer.
237-
/// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
245+
/// Similar [`get_for_update`](Transaction::get_for_update), but it works
246+
/// for a batch of keys.
238247
///
239-
/// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
240-
///
241-
/// It can only be used in pessimistic mode.
248+
/// Non-existent entries will not appear in the result. The order of the
249+
/// keys is not retained in the result.
242250
///
243251
/// # Examples
244252
/// ```rust,no_run
245-
/// # use tikv_client::{Key, Value, Config, TransactionClient};
253+
/// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
246254
/// # use futures::prelude::*;
247255
/// # use std::collections::HashMap;
248256
/// # futures::executor::block_on(async {
249257
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
250258
/// let mut txn = client.begin_pessimistic().await.unwrap();
251259
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
252-
/// let result: HashMap<Key, Value> = txn
260+
/// let result: Vec<KvPair> = txn
253261
/// .batch_get_for_update(keys)
254262
/// .await
255-
/// .unwrap()
256-
/// .map(|pair| (pair.0, pair.1))
257-
/// .collect();
263+
/// .unwrap();
258264
/// // now "TiKV" and "TiDB" are both locked
259265
/// // Finish the transaction...
260266
/// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl<PdC: PdClient> Transaction<PdC> {
263269
pub async fn batch_get_for_update(
264270
&mut self,
265271
keys: impl IntoIterator<Item = impl Into<Key>>,
266-
) -> Result<impl Iterator<Item = KvPair>> {
272+
) -> Result<Vec<KvPair>> {
267273
self.check_allow_operation().await?;
274+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
268275
if !self.is_pessimistic() {
269-
return Err(Error::InvalidTransactionType);
276+
self.lock_keys(keys.clone()).await?;
277+
Ok(self.batch_get(keys).await?.collect())
278+
} else {
279+
self.pessimistic_lock(keys, true).await
270280
}
271-
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
272-
Ok(self.pessimistic_lock(keys, true).await?.into_iter())
273281
}
274282

275283
/// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
473481
}
474482
}
475483
TransactionKind::Pessimistic(_) => {
476-
self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
477-
.await?;
484+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
485+
self.pessimistic_lock(keys.into_iter(), false).await?;
478486
}
479487
}
480488
Ok(())

tests/integration_tests.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> {
305305
let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
306306
assert_eq!(res.len(), keys.len());
307307

308-
let res = txn
309-
.batch_get_for_update(keys.clone())
310-
.await?
311-
.collect::<Vec<_>>();
308+
let res = txn.batch_get_for_update(keys.clone()).await?;
312309
assert_eq!(res.len(), keys.len());
313310

314311
txn.commit().await?;
@@ -686,7 +683,8 @@ async fn get_for_update() -> Result<()> {
686683

687684
let mut t1 = client.begin_pessimistic().await?;
688685
let mut t2 = client.begin_pessimistic().await?;
689-
686+
let mut t3 = client.begin_optimistic().await?;
687+
let mut t4 = client.begin_optimistic().await?;
690688
let mut t0 = client.begin_pessimistic().await?;
691689
t0.put(key1.clone(), value1).await?;
692690
t0.put(key2.clone(), value2).await?;
@@ -700,12 +698,19 @@ async fn get_for_update() -> Result<()> {
700698
let res: HashMap<_, _> = t2
701699
.batch_get_for_update(keys.clone())
702700
.await?
701+
.into_iter()
703702
.map(From::from)
704703
.collect();
705704
t2.commit().await?;
706-
assert!(res.get(&key1.into()).unwrap() == &value1);
705+
assert!(res.get(&key1.clone().into()).unwrap() == &value1);
707706
assert!(res.get(&key2.into()).unwrap() == &value2);
708707

708+
assert!(t3.get_for_update(key1).await?.is_none());
709+
assert!(t3.commit().await.is_err());
710+
711+
assert!(t4.batch_get_for_update(keys).await?.len() == 0);
712+
assert!(t4.commit().await.is_err());
713+
709714
Ok(())
710715
}
711716

0 commit comments

Comments
 (0)