@@ -116,18 +116,30 @@ impl<PdC: PdClient> Transaction<PdC> {
116
116
}
117
117
118
118
/// Create a `get for udpate` request.
119
- /// Once resolved this request will pessimistically lock and fetch the latest
120
- /// value associated with the given key at **current timestamp**.
121
119
///
122
- /// The "current timestamp" (also called `for_update_ts` of the request) is fetched immediately from PD.
120
+ /// The request reads and "locks" a key. It is similar to `SELECT ... FOR
121
+ /// UPDATE` in TiDB, and has different behavior in optimistic and
122
+ /// pessimistic transactions.
123
123
///
124
- /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
125
- /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
126
- /// and the value is not cached in the local buffer.
127
- /// So normal `get`-like commands after `get_for_update` will not be influenced, they still read values at `start_ts`.
124
+ /// # Optimistic transaction
128
125
///
126
+ /// It reads at the "start timestamp" and caches the value, just like normal
127
+ /// get requests. The lock is written in prewrite and commit, so it cannot
128
+ /// prevent concurrent transactions from writing the same key, but can only
129
+ /// prevent itself from committing.
129
130
///
130
- /// It can only be used in pessimistic mode.
131
+ /// # Pessimistic transaction
132
+ ///
133
+ /// It reads at the "current timestamp" and thus does not cache the value.
134
+ /// So following read requests won't be affected by the `get_for_udpate`.
135
+ /// A lock will be acquired immediately with this request, which prevents
136
+ /// concurrent transactions from mutating the keys.
137
+ ///
138
+ /// The "current timestamp" (also called `for_update_ts` of the request) is
139
+ /// fetched immediately from the timestamp oracle.
140
+ ///
141
+ /// Note: The behavior of the request under pessimistic transaction does not
142
+ /// follow snapshot isolation.
131
143
///
132
144
/// # Examples
133
145
/// ```rust,no_run
@@ -146,7 +158,9 @@ impl<PdC: PdClient> Transaction<PdC> {
146
158
pub async fn get_for_update ( & mut self , key : impl Into < Key > ) -> Result < Option < Value > > {
147
159
self . check_allow_operation ( ) . await ?;
148
160
if !self . is_pessimistic ( ) {
149
- Err ( Error :: InvalidTransactionType )
161
+ let key = key. into ( ) ;
162
+ self . lock_keys ( iter:: once ( key. clone ( ) ) ) . await ?;
163
+ self . get ( key) . await
150
164
} else {
151
165
let mut pairs = self . pessimistic_lock ( iter:: once ( key. into ( ) ) , true ) . await ?;
152
166
debug_assert ! ( pairs. len( ) <= 1 ) ;
@@ -228,33 +242,25 @@ impl<PdC: PdClient> Transaction<PdC> {
228
242
229
243
/// Create a new 'batch get for update' request.
230
244
///
231
- /// Once resolved this request will pessimistically lock the keys and
232
- /// fetch the values associated with the given keys.
233
- ///
234
- /// Note: The behavior of this command does not follow snapshot isolation. It is similar to `select for update` in TiDB,
235
- /// which is similar to that in MySQL. It reads the latest value (using current timestamp),
236
- /// and the value is not cached in the local buffer.
237
- /// So normal `get`-like commands after `batch_get_for_update` will not be influenced, they still read values at `start_ts`.
238
- ///
239
- /// Non-existent entries will not appear in the result. The order of the keys is not retained in the result.
245
+ /// Similar [`get_for_update`](Transaction::get_for_update), but it works
246
+ /// for a batch of keys.
240
247
///
241
- /// It can only be used in pessimistic mode.
248
+ /// Non-existent entries will not appear in the result. The order of the
249
+ /// keys is not retained in the result.
242
250
///
243
251
/// # Examples
244
252
/// ```rust,no_run
245
- /// # use tikv_client::{Key, Value, Config, TransactionClient};
253
+ /// # use tikv_client::{Key, Value, Config, TransactionClient, KvPair };
246
254
/// # use futures::prelude::*;
247
255
/// # use std::collections::HashMap;
248
256
/// # futures::executor::block_on(async {
249
257
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
250
258
/// let mut txn = client.begin_pessimistic().await.unwrap();
251
259
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
252
- /// let result: HashMap<Key, Value > = txn
260
+ /// let result: Vec<KvPair > = txn
253
261
/// .batch_get_for_update(keys)
254
262
/// .await
255
- /// .unwrap()
256
- /// .map(|pair| (pair.0, pair.1))
257
- /// .collect();
263
+ /// .unwrap();
258
264
/// // now "TiKV" and "TiDB" are both locked
259
265
/// // Finish the transaction...
260
266
/// txn.commit().await.unwrap();
@@ -263,13 +269,15 @@ impl<PdC: PdClient> Transaction<PdC> {
263
269
pub async fn batch_get_for_update (
264
270
& mut self ,
265
271
keys : impl IntoIterator < Item = impl Into < Key > > ,
266
- ) -> Result < impl Iterator < Item = KvPair > > {
272
+ ) -> Result < Vec < KvPair > > {
267
273
self . check_allow_operation ( ) . await ?;
274
+ let keys: Vec < Key > = keys. into_iter ( ) . map ( |k| k. into ( ) ) . collect ( ) ;
268
275
if !self . is_pessimistic ( ) {
269
- return Err ( Error :: InvalidTransactionType ) ;
276
+ self . lock_keys ( keys. clone ( ) ) . await ?;
277
+ Ok ( self . batch_get ( keys) . await ?. collect ( ) )
278
+ } else {
279
+ self . pessimistic_lock ( keys, true ) . await
270
280
}
271
- let keys: Vec < Key > = keys. into_iter ( ) . map ( |it| it. into ( ) ) . collect ( ) ;
272
- Ok ( self . pessimistic_lock ( keys, true ) . await ?. into_iter ( ) )
273
281
}
274
282
275
283
/// Create a new 'scan' request.
@@ -473,8 +481,8 @@ impl<PdC: PdClient> Transaction<PdC> {
473
481
}
474
482
}
475
483
TransactionKind :: Pessimistic ( _) => {
476
- self . pessimistic_lock ( keys. into_iter ( ) . map ( |k| k. into ( ) ) , false )
477
- . await ?;
484
+ let keys: Vec < Key > = keys . into_iter ( ) . map ( |k| k. into ( ) ) . collect ( ) ;
485
+ self . pessimistic_lock ( keys . into_iter ( ) , false ) . await ?;
478
486
}
479
487
}
480
488
Ok ( ( ) )
@@ -649,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
649
657
}
650
658
651
659
let first_key = keys[ 0 ] . clone ( ) . key ( ) ;
652
- let primary_lock = self . buffer . get_primary_key_or ( & first_key) . await ;
660
+ // we do not set the primary key here, because pessimistic lock request
661
+ // can fail, in which case the keys may not be part of the transaction.
662
+ let primary_lock = self
663
+ . buffer
664
+ . get_primary_key ( )
665
+ . await
666
+ . unwrap_or_else ( || first_key. clone ( ) ) ;
653
667
let for_update_ts = self . rpc . clone ( ) . get_timestamp ( ) . await ?;
654
668
self . options . push_for_update_ts ( for_update_ts. clone ( ) ) ;
655
669
let request = new_pessimistic_lock_request (
@@ -669,6 +683,9 @@ impl<PdC: PdClient> Transaction<PdC> {
669
683
. plan ( ) ;
670
684
let pairs = plan. execute ( ) . await ;
671
685
686
+ // primary key will be set here if needed
687
+ self . buffer . primary_key_or ( & first_key) . await ;
688
+
672
689
self . start_auto_heartbeat ( ) . await ;
673
690
674
691
for key in keys {
0 commit comments