Skip to content

Commit 570b9f9

Browse files
committed
refactor: introduce an atomic mode for raw client
Signed-off-by: ekexium <[email protected]>
1 parent 6e02f61 commit 570b9f9

File tree

3 files changed

+101
-97
lines changed

3 files changed

+101
-97
lines changed

src/raw/client.rs

Lines changed: 72 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
2424
pub struct Client {
2525
rpc: Arc<PdRpcClient>,
2626
cf: Option<ColumnFamily>,
27+
/// Whether to use the [`atomic mode`](Client::with_atomic).
28+
atomic: bool,
2729
}
2830

2931
impl Client {
@@ -63,7 +65,11 @@ impl Client {
6365
) -> Result<Client> {
6466
let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
6567
let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, &config, false).await?);
66-
Ok(Client { rpc, cf: None })
68+
Ok(Client {
69+
rpc,
70+
cf: None,
71+
atomic: false,
72+
})
6773
}
6874

6975
/// Set the column family of requests.
@@ -89,6 +95,22 @@ impl Client {
8995
Client {
9096
rpc: self.rpc.clone(),
9197
cf: Some(cf),
98+
atomic: self.atomic,
99+
}
100+
}
101+
102+
/// Set to use the atomic mode.
103+
///
104+
/// The only reason of using atomic mode is the
105+
/// [`compare_and_swap`](Client::compare_and_swap) operation. To guarantee
106+
/// the atomicity of CAS, write operations like [`put`](Client::put) or
107+
/// [`delete`](Client::delete) in atomic mode are more expensive. Some
108+
/// operations are not supported in the mode.
109+
pub fn with_atomic(&self) -> Client {
110+
Client {
111+
rpc: self.rpc.clone(),
112+
cf: self.cf.clone(),
113+
atomic: true,
92114
}
93115
}
94116

@@ -171,15 +193,15 @@ impl Client {
171193
/// # });
172194
/// ```
173195
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
174-
self.put_inner(key, value, false).await
175-
}
176-
177-
/// Create a new *atomic* 'put' request.
178-
/// Atomic operations can block each other on the same key.
179-
///
180-
/// Once resolved this request will result in the setting of the value associated with the given key.
181-
pub async fn atomic_put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
182-
self.put_inner(key, value, true).await
196+
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
197+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
198+
.single_region()
199+
.await?
200+
.retry_region(DEFAULT_REGION_BACKOFF)
201+
.extract_error()
202+
.plan();
203+
plan.execute().await?;
204+
Ok(())
183205
}
184206

185207
/// Create a new 'batch put' request.
@@ -203,19 +225,18 @@ impl Client {
203225
&self,
204226
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
205227
) -> Result<()> {
206-
self.batch_put_inner(pairs, false).await
207-
}
208-
209-
/// Create a new *atomic* 'batch put' request.
210-
/// Atomic operations can block each other on the same key.
211-
///
212-
/// Once resolved this request will result in the setting of the values
213-
/// associated with the given keys.
214-
pub async fn atomic_batch_put(
215-
&self,
216-
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
217-
) -> Result<()> {
218-
self.batch_put_inner(pairs, true).await
228+
let request = new_raw_batch_put_request(
229+
pairs.into_iter().map(Into::into),
230+
self.cf.clone(),
231+
self.atomic,
232+
);
233+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
234+
.multi_region()
235+
.retry_region(DEFAULT_REGION_BACKOFF)
236+
.extract_error()
237+
.plan();
238+
plan.execute().await?;
239+
Ok(())
219240
}
220241

221242
/// Create a new 'delete' request.
@@ -236,29 +257,15 @@ impl Client {
236257
/// # });
237258
/// ```
238259
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
239-
self.delete_inner(key, false).await
240-
}
241-
242-
/// Create a new *atomic* 'delete' request.
243-
/// Atomic operations can block each other on the same key.
244-
///
245-
/// Once resolved this request will result in the deletion of the given key.
246-
///
247-
/// It does not return an error if the key does not exist in TiKV.
248-
///
249-
/// # Examples
250-
/// ```rust,no_run
251-
/// # use tikv_client::{Key, Config, RawClient};
252-
/// # use futures::prelude::*;
253-
/// # futures::executor::block_on(async {
254-
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
255-
/// let key = "TiKV".to_owned();
256-
/// let req = client.delete(key);
257-
/// let result: () = req.await.unwrap();
258-
/// # });
259-
/// ```
260-
pub async fn atomic_delete(&self, key: impl Into<Key>) -> Result<()> {
261-
self.delete_inner(key, true).await
260+
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
261+
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
262+
.single_region()
263+
.await?
264+
.retry_region(DEFAULT_REGION_BACKOFF)
265+
.extract_error()
266+
.plan();
267+
plan.execute().await?;
268+
Ok(())
262269
}
263270

264271
/// Create a new 'batch delete' request.
@@ -279,6 +286,7 @@ impl Client {
279286
/// # });
280287
/// ```
281288
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
289+
self.assert_non_atomic()?;
282290
let request =
283291
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
284292
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
@@ -306,6 +314,7 @@ impl Client {
306314
/// # });
307315
/// ```
308316
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
317+
self.assert_non_atomic()?;
309318
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
310319
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
311320
.multi_region()
@@ -436,18 +445,23 @@ impl Client {
436445

437446
/// Create a new *atomic* 'compare and set' request.
438447
///
439-
/// Once resolved this request will result in an atomic `compare and set' operation for the given key.
448+
/// Once resolved this request will result in an atomic `compare and set'
449+
/// operation for the given key.
440450
///
441-
/// If the value retrived is equal to `current_value`, `new_value` is written.
451+
/// If the value retrived is equal to `current_value`, `new_value` is
452+
/// written.
442453
///
443454
/// # Return Value
444-
/// A tuple is returned if successful: the previous value and whether the value is swapped
445-
pub async fn atomic_compare_and_swap(
455+
///
456+
/// A tuple is returned if successful: the previous value and whether the
457+
/// value is swapped
458+
pub async fn compare_and_swap(
446459
&self,
447460
key: impl Into<Key>,
448461
previous_value: impl Into<Option<Value>>,
449462
new_value: impl Into<Value>,
450463
) -> Result<(Option<Value>, bool)> {
464+
self.assert_atomic()?;
451465
let req = new_cas_request(
452466
key.into(),
453467
new_value.into(),
@@ -516,48 +530,15 @@ impl Client {
516530
plan.execute().await
517531
}
518532

519-
async fn put_inner(
520-
&self,
521-
key: impl Into<Key>,
522-
value: impl Into<Value>,
523-
atomic: bool,
524-
) -> Result<()> {
525-
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), atomic);
526-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
527-
.single_region()
528-
.await?
529-
.retry_region(DEFAULT_REGION_BACKOFF)
530-
.extract_error()
531-
.plan();
532-
plan.execute().await?;
533-
Ok(())
534-
}
535-
536-
async fn batch_put_inner(
537-
&self,
538-
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
539-
atomic: bool,
540-
) -> Result<()> {
541-
let request =
542-
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone(), atomic);
543-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
544-
.multi_region()
545-
.retry_region(DEFAULT_REGION_BACKOFF)
546-
.extract_error()
547-
.plan();
548-
plan.execute().await?;
549-
Ok(())
533+
fn assert_non_atomic(&self) -> Result<()> {
534+
(!self.atomic)
535+
.then(|| ())
536+
.ok_or(Error::UnsupportedInAtomicMode)
550537
}
551538

552-
async fn delete_inner(&self, key: impl Into<Key>, atomic: bool) -> Result<()> {
553-
let request = new_raw_delete_request(key.into(), self.cf.clone(), atomic);
554-
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
555-
.single_region()
556-
.await?
557-
.retry_region(DEFAULT_REGION_BACKOFF)
558-
.extract_error()
559-
.plan();
560-
plan.execute().await?;
561-
Ok(())
539+
fn assert_atomic(&self) -> Result<()> {
540+
self.atomic
541+
.then(|| ())
542+
.ok_or(Error::UnsupportedInNonAtomicMode)
562543
}
563544
}

tests/integration_tests.rs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use std::{
1111
iter,
1212
};
1313
use tikv_client::{
14-
Key, KvPair, RawClient, Result, Transaction, TransactionClient, TransactionOptions, Value,
14+
Error, Key, KvPair, RawClient, Result, Transaction, TransactionClient, TransactionOptions,
15+
Value,
1516
};
1617

1718
// Parameters used in test
@@ -759,19 +760,19 @@ async fn pessimistic_heartbeat() -> Result<()> {
759760
#[serial]
760761
async fn raw_cas() -> Result<()> {
761762
clear_tikv().await;
762-
let client = RawClient::new(pd_addrs()).await?;
763+
let client = RawClient::new(pd_addrs()).await?.with_atomic();
763764
let key = "key".to_owned();
764765
let value = "value".to_owned();
765766
let new_value = "new value".to_owned();
766767

767-
client.atomic_put(key.clone(), value.clone()).await?;
768+
client.put(key.clone(), value.clone()).await?;
768769
assert_eq!(
769770
client.get(key.clone()).await?.unwrap(),
770771
value.clone().as_bytes()
771772
);
772773

773774
client
774-
.atomic_compare_and_swap(
775+
.compare_and_swap(
775776
key.clone(),
776777
Some("another_value".to_owned()).map(|v| v.into()),
777778
new_value.clone(),
@@ -783,7 +784,7 @@ async fn raw_cas() -> Result<()> {
783784
);
784785

785786
client
786-
.atomic_compare_and_swap(
787+
.compare_and_swap(
787788
key.clone(),
788789
Some(value.to_owned()).map(|v| v.into()),
789790
new_value.clone(),
@@ -794,8 +795,24 @@ async fn raw_cas() -> Result<()> {
794795
new_value.clone().as_bytes()
795796
);
796797

797-
client.atomic_delete(key.clone()).await?;
798+
client.delete(key.clone()).await?;
798799
assert!(client.get(key.clone()).await?.is_none());
800+
801+
// check unsupported operations
802+
assert!(matches!(
803+
client.batch_delete(vec![key.clone()]).await.err().unwrap(),
804+
Error::UnsupportedInAtomicMode
805+
));
806+
let client = RawClient::new(pd_addrs()).await?;
807+
assert!(matches!(
808+
client
809+
.compare_and_swap(key.clone(), None, vec![])
810+
.await
811+
.err()
812+
.unwrap(),
813+
Error::UnsupportedInNonAtomicMode
814+
));
815+
799816
Ok(())
800817
}
801818

tikv-client-common/src/errors.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ pub enum Error {
2828
/// An operation requires a primary key, but the transaction was empty.
2929
#[error("transaction has no primary key")]
3030
NoPrimaryKey,
31+
#[error(
32+
"The operation does is not supported in raw-atomic mode, please consider using an atomic client"
33+
)]
34+
UnsupportedInAtomicMode,
35+
#[error("The operation is only supported in raw-atomic mode, please consider using a non-atomic raw client")]
36+
UnsupportedInNonAtomicMode,
3137
/// Wraps a `std::io::Error`.
3238
#[error("IO error: {0}")]
3339
Io(#[from] std::io::Error),

0 commit comments

Comments
 (0)