Skip to content

Commit 9c89895

Browse files
committed
feat: implement atomic requests
Signed-off-by: ekexium <[email protected]>
1 parent 12f2aa8 commit 9c89895

File tree

7 files changed

+202
-37
lines changed

7 files changed

+202
-37
lines changed

src/raw/client.rs

Lines changed: 117 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -173,16 +173,15 @@ impl Client {
173173
/// # });
174174
/// ```
175175
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
176-
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone());
177-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
178-
.single_region()
179-
.await?
180-
.resolve_lock(OPTIMISTIC_BACKOFF)
181-
.retry_region(DEFAULT_REGION_BACKOFF)
182-
.extract_error()
183-
.plan();
184-
plan.execute().await?;
185-
Ok(())
176+
self.put_inner(key, value, false).await
177+
}
178+
179+
/// Create a new *atomic* 'put' request.
180+
/// Atomic operations can block each other on the same key.
181+
///
182+
/// Once resolved this request will result in the setting of the value associated with the given key.
183+
pub async fn atomic_put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
184+
self.put_inner(key, value, true).await
186185
}
187186

188187
/// Create a new 'batch put' request.
@@ -206,15 +205,19 @@ impl Client {
206205
&self,
207206
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
208207
) -> Result<()> {
209-
let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
210-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
211-
.resolve_lock(OPTIMISTIC_BACKOFF)
212-
.multi_region()
213-
.retry_region(DEFAULT_REGION_BACKOFF)
214-
.extract_error()
215-
.plan();
216-
plan.execute().await?;
217-
Ok(())
208+
self.batch_put_inner(pairs, false).await
209+
}
210+
211+
/// Create a new *atomic* 'batch put' request.
212+
/// Atomic operations can block each other on the same key.
213+
///
214+
/// Once resolved this request will result in the setting of the values
215+
/// associated with the given keys.
216+
pub async fn atomic_batch_put(
217+
&self,
218+
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
219+
) -> Result<()> {
220+
self.batch_put_inner(pairs, true).await
218221
}
219222

220223
/// Create a new 'delete' request.
@@ -235,16 +238,29 @@ impl Client {
235238
/// # });
236239
/// ```
237240
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
238-
let request = new_raw_delete_request(key.into(), self.cf.clone());
239-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
240-
.single_region()
241-
.await?
242-
.resolve_lock(OPTIMISTIC_BACKOFF)
243-
.retry_region(DEFAULT_REGION_BACKOFF)
244-
.extract_error()
245-
.plan();
246-
plan.execute().await?;
247-
Ok(())
241+
self.delete_inner(key, false).await
242+
}
243+
244+
/// Create a new *atomic* 'delete' request.
245+
/// Atomic operations can block each other on the same key.
246+
///
247+
/// Once resolved this request will result in the deletion of the given key.
248+
///
249+
/// It does not return an error if the key does not exist in TiKV.
250+
///
251+
/// # Examples
252+
/// ```rust,no_run
253+
/// # use tikv_client::{Key, Config, RawClient};
254+
/// # use futures::prelude::*;
255+
/// # futures::executor::block_on(async {
256+
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
257+
/// let key = "TiKV".to_owned();
258+
/// let req = client.delete(key);
259+
/// let result: () = req.await.unwrap();
260+
/// # });
261+
/// ```
262+
pub async fn atomic_delete(&self, key: impl Into<Key>) -> Result<()> {
263+
self.delete_inner(key, true).await
248264
}
249265

250266
/// Create a new 'batch delete' request.
@@ -422,6 +438,30 @@ impl Client {
422438
.collect())
423439
}
424440

441+
/// Create a new *atomic* 'compare and set' request.
442+
///
443+
/// Once resolved this request will result in an atomic `compare and set' operation for the given key.
444+
///
445+
/// If the value retrived is equal to `current_value`, `new_value` is written.
446+
///
447+
/// # Return Value
448+
/// A tuple is returned if successful: the previous value and whether the value is swapped
449+
pub async fn atomic_compare_and_swap(
450+
&self,
451+
key: impl Into<Key>,
452+
previous_value: Option<Value>,
453+
new_value: Value,
454+
) -> Result<(Option<Value>, bool)> {
455+
let req = new_cas_request(key.into(), new_value, previous_value, self.cf.clone());
456+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
457+
.single_region()
458+
.await?
459+
.retry_region(DEFAULT_REGION_BACKOFF)
460+
.post_process_default()
461+
.plan();
462+
plan.execute().await
463+
}
464+
425465
async fn scan_inner(
426466
&self,
427467
range: impl Into<BoundRange>,
@@ -476,4 +516,52 @@ impl Client {
476516
.plan();
477517
plan.execute().await
478518
}
519+
520+
async fn put_inner(
521+
&self,
522+
key: impl Into<Key>,
523+
value: impl Into<Value>,
524+
atomic: bool,
525+
) -> Result<()> {
526+
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), atomic);
527+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
528+
.single_region()
529+
.await?
530+
.resolve_lock(OPTIMISTIC_BACKOFF)
531+
.retry_region(DEFAULT_REGION_BACKOFF)
532+
.extract_error()
533+
.plan();
534+
plan.execute().await?;
535+
Ok(())
536+
}
537+
538+
async fn batch_put_inner(
539+
&self,
540+
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
541+
atomic: bool,
542+
) -> Result<()> {
543+
let request =
544+
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
545+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
546+
.resolve_lock(OPTIMISTIC_BACKOFF)
547+
.multi_region()
548+
.retry_region(DEFAULT_REGION_BACKOFF)
549+
.extract_error()
550+
.plan();
551+
plan.execute().await?;
552+
Ok(())
553+
}
554+
555+
async fn delete_inner(&self, key: impl Into<Key>, atomic: bool) -> Result<()> {
556+
let request = new_raw_delete_request(key.into(), self.cf.clone(), atomic);
557+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
558+
.single_region()
559+
.await?
560+
.resolve_lock(OPTIMISTIC_BACKOFF)
561+
.retry_region(DEFAULT_REGION_BACKOFF)
562+
.extract_error()
563+
.plan();
564+
plan.execute().await?;
565+
Ok(())
566+
}
479567
}

src/raw/lowering.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,25 @@ pub fn new_raw_put_request(
2222
key: Key,
2323
value: Value,
2424
cf: Option<ColumnFamily>,
25+
atomic: bool,
2526
) -> kvrpcpb::RawPutRequest {
26-
requests::new_raw_put_request(key.into(), value, cf)
27+
requests::new_raw_put_request(key.into(), value, cf, atomic)
2728
}
2829

2930
pub fn new_raw_batch_put_request(
3031
pairs: impl Iterator<Item = KvPair>,
3132
cf: Option<ColumnFamily>,
33+
atomic: bool,
3234
) -> kvrpcpb::RawBatchPutRequest {
33-
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
35+
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
3436
}
3537

36-
pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
37-
requests::new_raw_delete_request(key.into(), cf)
38+
pub fn new_raw_delete_request(
39+
key: Key,
40+
cf: Option<ColumnFamily>,
41+
atomic: bool,
42+
) -> kvrpcpb::RawDeleteRequest {
43+
requests::new_raw_delete_request(key.into(), cf, atomic)
3844
}
3945

4046
pub fn new_raw_batch_delete_request(
@@ -76,3 +82,12 @@ pub fn new_raw_batch_scan_request(
7682
) -> kvrpcpb::RawBatchScanRequest {
7783
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
7884
}
85+
86+
pub fn new_cas_request(
87+
key: Key,
88+
value: Value,
89+
previous_value: Option<Value>,
90+
cf: Option<ColumnFamily>,
91+
) -> kvrpcpb::RawCasRequest {
92+
requests::new_cas_request(key.into(), value, previous_value, cf)
93+
}

src/raw/requests.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@ pub fn new_raw_put_request(
7676
key: Vec<u8>,
7777
value: Vec<u8>,
7878
cf: Option<ColumnFamily>,
79+
atomic: bool,
7980
) -> kvrpcpb::RawPutRequest {
8081
let mut req = kvrpcpb::RawPutRequest::default();
8182
req.set_key(key);
8283
req.set_value(value);
8384
req.maybe_set_cf(cf);
85+
req.set_for_cas(atomic);
8486

8587
req
8688
}
@@ -98,10 +100,12 @@ impl SingleKey for kvrpcpb::RawPutRequest {
98100
pub fn new_raw_batch_put_request(
99101
pairs: Vec<kvrpcpb::KvPair>,
100102
cf: Option<ColumnFamily>,
103+
atomic: bool,
101104
) -> kvrpcpb::RawBatchPutRequest {
102105
let mut req = kvrpcpb::RawBatchPutRequest::default();
103106
req.set_pairs(pairs);
104107
req.maybe_set_cf(cf);
108+
req.set_for_cas(atomic);
105109

106110
req
107111
}
@@ -132,10 +136,15 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
132136
}
133137
}
134138

135-
pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
139+
pub fn new_raw_delete_request(
140+
key: Vec<u8>,
141+
cf: Option<ColumnFamily>,
142+
atomic: bool,
143+
) -> kvrpcpb::RawDeleteRequest {
136144
let mut req = kvrpcpb::RawDeleteRequest::default();
137145
req.set_key(key);
138146
req.maybe_set_cf(cf);
147+
req.set_for_cas(atomic);
139148

140149
req
141150
}
@@ -267,6 +276,46 @@ impl Merge<kvrpcpb::RawBatchScanResponse> for Collect {
267276
}
268277
}
269278

279+
pub fn new_cas_request(
280+
key: Vec<u8>,
281+
value: Vec<u8>,
282+
previous_value: Option<Vec<u8>>,
283+
cf: Option<ColumnFamily>,
284+
) -> kvrpcpb::RawCasRequest {
285+
let mut req = kvrpcpb::RawCasRequest::default();
286+
req.set_key(key);
287+
req.set_value(value);
288+
match previous_value {
289+
Some(v) => req.set_previous_value(v),
290+
None => req.set_previous_not_exist(true),
291+
}
292+
req.maybe_set_cf(cf);
293+
req
294+
}
295+
296+
impl KvRequest for kvrpcpb::RawCasRequest {
297+
type Response = kvrpcpb::RawCasResponse;
298+
}
299+
300+
impl SingleKey for kvrpcpb::RawCasRequest {
301+
fn key(&self) -> &Vec<u8> {
302+
&self.key
303+
}
304+
}
305+
306+
impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
307+
type Out = (Option<Value>, bool); // (previous_value, swapped)
308+
309+
fn process(&self, input: Result<kvrpcpb::RawCasResponse>) -> Result<Self::Out> {
310+
let input = input?;
311+
if input.previous_not_exist {
312+
Ok((None, input.succeed))
313+
} else {
314+
Ok((Some(input.previous_value), input.succeed))
315+
}
316+
}
317+
}
318+
270319
macro_rules! impl_raw_rpc_request {
271320
($name: ident) => {
272321
impl RawRpcRequest for kvrpcpb::$name {
@@ -286,6 +335,7 @@ impl_raw_rpc_request!(RawBatchDeleteRequest);
286335
impl_raw_rpc_request!(RawScanRequest);
287336
impl_raw_rpc_request!(RawBatchScanRequest);
288337
impl_raw_rpc_request!(RawDeleteRangeRequest);
338+
impl_raw_rpc_request!(RawCasRequest);
289339

290340
impl HasLocks for kvrpcpb::RawGetResponse {}
291341
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
@@ -296,6 +346,7 @@ impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
296346
impl HasLocks for kvrpcpb::RawScanResponse {}
297347
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
298348
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
349+
impl HasLocks for kvrpcpb::RawCasResponse {}
299350

300351
#[cfg(test)]
301352
mod test {

src/request/plan.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,8 +276,11 @@ impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
276276
}
277277
}
278278

279-
/// When executed, the plan extracts errors from its inner plan, and
280-
/// returns an `Err` wrapping the error.
279+
/// When executed, the plan extracts errors from its inner plan, and returns an
280+
/// `Err` wrapping the error.
281+
///
282+
/// We usually need to apply this plan if (and only if) the output of the inner
283+
/// plan is of a response type.
281284
///
282285
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
283286
/// where `response` contains unresolved errors (`error` and `region_error`).

src/timestamp.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl TimestampExt for Timestamp {
3535
Self {
3636
physical: version >> PHYSICAL_SHIFT_BITS,
3737
logical: version & LOGICAL_MASK,
38-
// We only support global transactions
38+
// Now we only support global transactions
3939
suffix_bits: 0,
4040
}
4141
}

tikv-client-store/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ has_region_error!(kvrpcpb::RawBatchDeleteResponse);
5252
has_region_error!(kvrpcpb::RawDeleteRangeResponse);
5353
has_region_error!(kvrpcpb::RawScanResponse);
5454
has_region_error!(kvrpcpb::RawBatchScanResponse);
55+
has_region_error!(kvrpcpb::RawCasResponse);
5556

5657
macro_rules! has_key_error {
5758
($type:ty) => {
@@ -100,6 +101,7 @@ has_str_error!(kvrpcpb::RawBatchPutResponse);
100101
has_str_error!(kvrpcpb::RawDeleteResponse);
101102
has_str_error!(kvrpcpb::RawBatchDeleteResponse);
102103
has_str_error!(kvrpcpb::RawDeleteRangeResponse);
104+
has_str_error!(kvrpcpb::RawCasResponse);
103105
has_str_error!(kvrpcpb::ImportResponse);
104106
has_str_error!(kvrpcpb::DeleteRangeResponse);
105107

tikv-client-store/src/request.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ impl_request!(
6666
raw_delete_range_async_opt,
6767
"raw_delete_range"
6868
);
69+
impl_request!(
70+
RawCasRequest,
71+
raw_compare_and_swap_async_opt,
72+
"raw_compare_and_swap"
73+
);
74+
6975
impl_request!(GetRequest, kv_get_async_opt, "kv_get");
7076
impl_request!(ScanRequest, kv_scan_async_opt, "kv_scan");
7177
impl_request!(PrewriteRequest, kv_prewrite_async_opt, "kv_prewrite");

0 commit comments

Comments
 (0)