Skip to content

Commit ddfad81

Browse files
sunxiaoguangHoverbear
authored andcommitted
Public API Request for Comments (#3)
Implement draft of tikv/rfcs#7 as of 80457b2df1c03cbca7cb32afec95e3c20a99b8d2.
1 parent d3f036c commit ddfad81

File tree

7 files changed

+1103
-0
lines changed

7 files changed

+1103
-0
lines changed

Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "tikv-client"
3+
version = "0.0.0"
4+
keywords = ["TiKV", "KV", "distributed-systems"]
5+
license = "Apache-2.0"
6+
authors = ["The TiKV Project Developers"]
7+
repository = "https://github.com/tikv/client-rust"
8+
description = "The rust language implementation of TiKV client."
9+
10+
[lib]
11+
name = "tikv_client"
12+
13+
[dependencies]
14+
futures = "0.1"
15+
serde = "1.0"
16+
serde_derive = "1.0"
17+
quick-error = "1.2"
18+
grpcio = { version = "0.4", features = [ "secure" ] }

examples/raw.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
extern crate futures;
2+
extern crate tikv_client;
3+
4+
use std::path::PathBuf;
5+
6+
use futures::future::Future;
7+
use tikv_client::*;
8+
9+
fn main() {
10+
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
11+
PathBuf::from("/path/to/ca.pem"),
12+
PathBuf::from("/path/to/client.pem"),
13+
PathBuf::from("/path/to/client-key.pem"),
14+
);
15+
let raw = raw::Client::new(&config)
16+
.wait()
17+
.expect("Could not connect to tikv");
18+
19+
let key: Key = b"Company".to_vec().into();
20+
let value: Value = b"PingCAP".to_vec().into();
21+
22+
raw.put(key.clone(), value.clone())
23+
.cf("test_cf")
24+
.wait()
25+
.expect("Could not put kv pair to tikv");
26+
println!("Successfully put {:?}:{:?} to tikv", key, value);
27+
28+
let value = raw
29+
.get(&key)
30+
.cf("test_cf")
31+
.wait()
32+
.expect("Could not get value");
33+
println!("Found val: {:?} for key: {:?}", value, key);
34+
35+
raw.delete(&key)
36+
.cf("test_cf")
37+
.wait()
38+
.expect("Could not delete value");
39+
println!("Key: {:?} deleted", key);
40+
41+
raw.get(&key)
42+
.cf("test_cf")
43+
.wait()
44+
.expect_err("Get returned value for not existing key");
45+
46+
let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()];
47+
48+
let values = raw
49+
.batch_get(&keys)
50+
.cf("test_cf")
51+
.wait()
52+
.expect("Could not get values");
53+
println!("Found values: {:?} for keys: {:?}", values, keys);
54+
55+
let start: Key = b"k1".to_vec().into();
56+
let end: Key = b"k2".to_vec().into();
57+
raw.scan(&start..&end, 10)
58+
.cf("test_cf")
59+
.key_only()
60+
.wait()
61+
.expect("Could not scan");
62+
63+
let ranges = [&start..&end, &start..&end];
64+
raw.batch_scan(&ranges, 10)
65+
.cf("test_cf")
66+
.key_only()
67+
.wait()
68+
.expect("Could not batch scan");
69+
}

examples/transaction.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
extern crate futures;
2+
extern crate tikv_client;
3+
4+
use std::ops::RangeBounds;
5+
use std::path::PathBuf;
6+
7+
use futures::{future, Future, Stream};
8+
use tikv_client::transaction::{Client, IsolationLevel};
9+
use tikv_client::*;
10+
11+
fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
12+
let mut txn = client.begin();
13+
let _: Vec<()> = future::join_all(
14+
pairs
15+
.into_iter()
16+
.map(Into::into)
17+
.map(|p| txn.set(p.key().clone(), p.value().clone())),
18+
).wait()
19+
.expect("Could not set key value pairs");
20+
txn.commit().wait().expect("Could not commit transaction");
21+
}
22+
23+
fn get(client: &Client, key: &Key) -> Value {
24+
let txn = client.begin();
25+
txn.get(key).wait().expect("Could not get value")
26+
}
27+
28+
fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
29+
client
30+
.begin()
31+
.scan(range)
32+
.take_while(move |_| {
33+
Ok(if limit == 0 {
34+
false
35+
} else {
36+
limit -= 1;
37+
true
38+
})
39+
}).for_each(|pair| {
40+
println!("{:?}", pair);
41+
Ok(())
42+
}).wait()
43+
.expect("Could not scan keys");
44+
}
45+
46+
fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
47+
let mut txn = client.begin();
48+
txn.set_isolation_level(IsolationLevel::ReadCommitted);
49+
let _: Vec<()> = keys
50+
.into_iter()
51+
.map(|p| {
52+
txn.delete(p).wait().expect("Could not delete key");
53+
}).collect();
54+
txn.commit().wait().expect("Could not commit transaction");
55+
}
56+
57+
fn main() {
58+
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
59+
PathBuf::from("/path/to/ca.pem"),
60+
PathBuf::from("/path/to/client.pem"),
61+
PathBuf::from("/path/to/client-key.pem"),
62+
);
63+
let txn = Client::new(&config)
64+
.wait()
65+
.expect("Could not connect to tikv");
66+
67+
// set
68+
let key1: Key = b"key1".to_vec().into();
69+
let value1: Value = b"value1".to_vec().into();
70+
let key2: Key = b"key2".to_vec().into();
71+
let value2: Value = b"value2".to_vec().into();
72+
puts(&txn, vec![(key1, value1), (key2, value2)]);
73+
74+
// get
75+
let key1: Key = b"key1".to_vec().into();
76+
let value1 = get(&txn, &key1);
77+
println!("{:?}", (key1, value1));
78+
79+
// scan
80+
let key1: Key = b"key1".to_vec().into();
81+
scan(&txn, key1.., 10);
82+
83+
// delete
84+
let key1: Key = b"key1".to_vec().into();
85+
let key2: Key = b"key2".to_vec().into();
86+
dels(&txn, vec![key1, key2]);
87+
}

src/errors.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2016 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
use std::error;
15+
use std::result;
16+
17+
quick_error!{
18+
#[derive(Debug)]
19+
pub enum Error {
20+
Io(err: ::std::io::Error) {
21+
from()
22+
cause(err)
23+
description(err.description())
24+
}
25+
Grpc(err: ::grpc::Error) {
26+
from()
27+
cause(err)
28+
description(err.description())
29+
}
30+
Canceled(err: ::futures::sync::oneshot::Canceled) {
31+
from()
32+
cause(err)
33+
description(err.description())
34+
}
35+
Other(err: Box<error::Error + Sync + Send>) {
36+
from()
37+
cause(err.as_ref())
38+
description(err.description())
39+
display("unknown error {:?}", err)
40+
}
41+
RegionForKeyNotFound(key: Vec<u8>) {
42+
description("region is not found")
43+
display("region is not found for key {:?}", key)
44+
}
45+
RegionNotFound(id: u64) {
46+
description("region is not found")
47+
display("region {:?} is not found", id)
48+
}
49+
NotLeader(region_id: u64) {
50+
description("peer is not leader")
51+
display("peer is not leader for region {:?}.", region_id)
52+
}
53+
StoreNotMatch {
54+
description("store not match")
55+
display("store not match")
56+
}
57+
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
58+
description("region is not found")
59+
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
60+
}
61+
StaleEpoch {
62+
description("stale epoch")
63+
display("stale epoch")
64+
}
65+
ServerIsBusy(reason: String) {
66+
description("server is busy")
67+
display("server is busy: {:?}", reason)
68+
}
69+
RaftEntryTooLarge(region_id: u64, entry_size: u64) {
70+
description("raft entry too large")
71+
display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id)
72+
}
73+
}
74+
}
75+
76+
pub type Result<T> = result::Result<T, Error>;

src/lib.rs

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
extern crate futures;
2+
extern crate serde;
3+
#[macro_use]
4+
extern crate serde_derive;
5+
#[macro_use]
6+
extern crate quick_error;
7+
extern crate grpcio as grpc;
8+
9+
pub mod errors;
10+
pub mod raw;
11+
pub mod transaction;
12+
13+
use std::ops::Deref;
14+
use std::path::PathBuf;
15+
16+
pub use errors::Error;
17+
pub use errors::Result;
18+
19+
#[derive(Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)]
20+
pub struct Key(Vec<u8>);
21+
#[derive(Default, Clone, Eq, PartialEq, Hash, Debug)]
22+
pub struct Value(Vec<u8>);
23+
#[derive(Default, Clone, Eq, PartialEq, Debug)]
24+
pub struct KvPair(Key, Value);
25+
26+
impl Into<Key> for Vec<u8> {
27+
fn into(self) -> Key {
28+
Key(self)
29+
}
30+
}
31+
32+
impl AsRef<Key> for Key {
33+
fn as_ref(&self) -> &Self {
34+
self
35+
}
36+
}
37+
38+
impl Deref for Key {
39+
type Target = Vec<u8>;
40+
41+
fn deref(&self) -> &Self::Target {
42+
&self.0
43+
}
44+
}
45+
46+
impl Into<Value> for Vec<u8> {
47+
fn into(self) -> Value {
48+
Value(self)
49+
}
50+
}
51+
52+
impl Deref for Value {
53+
type Target = Vec<u8>;
54+
55+
fn deref(&self) -> &Self::Target {
56+
&self.0
57+
}
58+
}
59+
60+
impl KvPair {
61+
pub fn new(key: Key, value: Value) -> Self {
62+
KvPair(key, value)
63+
}
64+
65+
pub fn key(&self) -> &Key {
66+
&self.0
67+
}
68+
69+
pub fn value(&self) -> &Value {
70+
&self.1
71+
}
72+
}
73+
74+
impl Into<KvPair> for (Key, Value) {
75+
fn into(self) -> KvPair {
76+
KvPair(self.0, self.1)
77+
}
78+
}
79+
80+
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
81+
#[serde(default)]
82+
#[serde(rename_all = "kebab-case")]
83+
pub struct Config {
84+
pub pd_endpoints: Vec<String>,
85+
pub ca_path: Option<PathBuf>,
86+
pub cert_path: Option<PathBuf>,
87+
pub key_path: Option<PathBuf>,
88+
}
89+
90+
impl Config {
91+
pub fn new(pd_endpoints: impl IntoIterator<Item = impl Into<String>>) -> Self {
92+
Config {
93+
pd_endpoints: pd_endpoints.into_iter().map(Into::into).collect(),
94+
ca_path: None,
95+
cert_path: None,
96+
key_path: None,
97+
}
98+
}
99+
100+
pub fn with_security(
101+
mut self,
102+
ca_path: impl Into<PathBuf>,
103+
cert_path: impl Into<PathBuf>,
104+
key_path: impl Into<PathBuf>,
105+
) -> Self {
106+
self.ca_path = Some(ca_path.into());
107+
self.cert_path = Some(cert_path.into());
108+
self.key_path = Some(key_path.into());
109+
self
110+
}
111+
}

0 commit comments

Comments
 (0)