Skip to content

Commit d3e5ffb

Browse files
committed
feat: make get_for_update work for optimistic txns
Signed-off-by: ekexium <[email protected]>
1 parent 4870985 commit d3e5ffb

File tree

2 files changed

+46
-21
lines changed

2 files changed

+46
-21
lines changed

src/transaction/transaction.rs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,17 @@ impl<PdC: PdClient> Transaction<PdC> {
116116
}
117117

118118
/// Create a `get for udpate` request.
119+
/// It has different behaviors in optimistic and pessimistic transactions.
120+
///
121+
/// # Optimistic transaction
122+
/// Once resolved this request will retrieve the value just like a normal `get` request,
123+
/// and "locks" the key. This lock will not affect other (concurrent) transactions, but will
124+
/// prevent the current transaction from successfully committing if there is another write
125+
/// containing the "locked" key which is committed between the start and commit of the current transaction.
126+
///
127+
/// The value is read from the `start timestamp`, thus it is cached in the local buffer.
128+
///
129+
/// # Pessimistic transaction
119130
/// Once resolved this request will pessimistically lock and fetch the latest
120131
/// value associated with the given key at **current timestamp**.
121132
///
@@ -127,8 +138,6 @@ impl<PdC: PdClient> Transaction<PdC> {
127138
/// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
128139
///
129140
///
130-
/// It can only be used in pessimistic mode.
131-
///
132141
/// # Examples
133142
/// ```rust,no_run
134143
/// # use tikv_client::{Value, Config, TransactionClient};
@@ -146,7 +155,9 @@ impl<PdC: PdClient> Transaction<PdC> {
146155
pub async fn get_for_update(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
147156
self.check_allow_operation().await?;
148157
if !self.is_pessimistic() {
149-
Err(Error::InvalidTransactionType)
158+
let key = key.into();
159+
self.lock_keys(iter::once(key.clone())).await?;
160+
self.get(key).await
150161
} else {
151162
let mut pairs = self.pessimistic_lock(iter::once(key.into()), true).await?;
152163
debug_assert!(pairs.len() <= 1);
@@ -227,7 +238,17 @@ impl<PdC: PdClient> Transaction<PdC> {
227238
}
228239

229240
/// Create a new 'batch get for update' request.
241+
/// It has different behaviors in optimistic and pessimistic transactions.
230242
///
243+
/// # Optimistic transaction
244+
/// Once resolved this request will retrieve the values just like a normal `batch_get` request,
245+
/// and "locks" the keys. The locks will not affect other (concurrent) transactions, but will
246+
/// prevent the current transaction from successfully committing if there is any other write
247+
/// containing a "locked" key which is committed between the start and commit of the current transaction.
248+
///
249+
/// The values are read from the `start timestamp`, thus they are cached in the local buffer.
250+
///
251+
/// # Pessimistic transaction
231252
/// Once resolved this request will pessimistically lock the keys and
232253
/// fetch the values associated with the given keys.
233254
///
@@ -238,23 +259,20 @@ impl<PdC: PdClient> Transaction<PdC> {
238259
///
239260
/// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
240261
///
241-
/// It can only be used in pessimistic mode.
242262
///
243263
/// # Examples
244264
/// ```rust,no_run
245-
/// # use tikv_client::{Key, Value, Config, TransactionClient};
265+
/// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair};
246266
/// # use futures::prelude::*;
247267
/// # use std::collections::HashMap;
248268
/// # futures::executor::block_on(async {
249269
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
250270
/// let mut txn = client.begin_pessimistic().await.unwrap();
251271
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
252-
/// let result: HashMap<Key, Value> = txn
272+
/// let result: Vec<KvPair> = txn
253273
/// .batch_get_for_update(keys)
254274
/// .await
255-
/// .unwrap()
256-
/// .map(|pair| (pair.0, pair.1))
257-
/// .collect();
275+
/// .unwrap();
258276
/// // now "TiKV" and "TiDB" are both locked
259277
/// // Finish the transaction...
260278
/// txn.commit().await.unwrap();
@@ -263,13 +281,15 @@ impl<PdC: PdClient> Transaction<PdC> {
263281
pub async fn batch_get_for_update(
264282
&mut self,
265283
keys: impl IntoIterator<Item = impl Into<Key>>,
266-
) -> Result<impl Iterator<Item = KvPair>> {
284+
) -> Result<Vec<KvPair>> {
267285
self.check_allow_operation().await?;
286+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
268287
if !self.is_pessimistic() {
269-
return Err(Error::InvalidTransactionType);
288+
self.lock_keys(keys.clone()).await?;
289+
Ok(self.batch_get(keys).await?.collect())
290+
} else {
291+
self.pessimistic_lock(keys, true).await
270292
}
271-
let keys: Vec<Key> = keys.into_iter().map(|it| it.into()).collect();
272-
Ok(self.pessimistic_lock(keys, true).await?.into_iter())
273293
}
274294

275295
/// Create a new 'scan' request.
@@ -470,8 +490,8 @@ impl<PdC: PdClient> Transaction<PdC> {
470490
}
471491
}
472492
TransactionKind::Pessimistic(_) => {
473-
self.pessimistic_lock(keys.into_iter().map(|k| k.into()), false)
474-
.await?;
493+
let keys: Vec<Key> = keys.into_iter().map(|k| k.into()).collect();
494+
self.pessimistic_lock(keys.into_iter(), false).await?;
475495
}
476496
}
477497
Ok(())

tests/integration_tests.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,7 @@ async fn txn_write_million() -> Result<()> {
305305
let res = txn.batch_get(keys.clone()).await?.collect::<Vec<_>>();
306306
assert_eq!(res.len(), keys.len());
307307

308-
let res = txn
309-
.batch_get_for_update(keys.clone())
310-
.await?
311-
.collect::<Vec<_>>();
308+
let res = txn.batch_get_for_update(keys.clone()).await?;
312309
assert_eq!(res.len(), keys.len());
313310

314311
txn.commit().await?;
@@ -643,7 +640,8 @@ async fn get_for_update() -> Result<()> {
643640

644641
let mut t1 = client.begin_pessimistic().await?;
645642
let mut t2 = client.begin_pessimistic().await?;
646-
643+
let mut t3 = client.begin_optimistic().await?;
644+
let mut t4 = client.begin_optimistic().await?;
647645
let mut t0 = client.begin_pessimistic().await?;
648646
t0.put(key1.clone(), value1).await?;
649647
t0.put(key2.clone(), value2).await?;
@@ -657,12 +655,19 @@ async fn get_for_update() -> Result<()> {
657655
let res: HashMap<_, _> = t2
658656
.batch_get_for_update(keys.clone())
659657
.await?
658+
.into_iter()
660659
.map(From::from)
661660
.collect();
662661
t2.commit().await?;
663-
assert!(res.get(&key1.into()).unwrap() == &value1);
662+
assert!(res.get(&key1.clone().into()).unwrap() == &value1);
664663
assert!(res.get(&key2.into()).unwrap() == &value2);
665664

665+
assert!(t3.get_for_update(key1).await?.is_none());
666+
assert!(t3.commit().await.is_err());
667+
668+
assert!(t4.batch_get_for_update(keys).await?.len() == 0);
669+
assert!(t4.commit().await.is_err());
670+
666671
Ok(())
667672
}
668673

0 commit comments

Comments
 (0)