Skip to content

Commit ba86754

Browse files
sunxiaoguangHoverbear
authored andcommitted
The initial version of Raw KV implementation (#14)
* Initial version of Raw Kv client Note: raw::Client::batch_scan is not implemented yet. Signed-off-by: Xiaoguang Sun <[email protected]> * Document most public code. Signed-off-by: Ana Hobden <[email protected]> * Reduce pub surface Signed-off-by: Ana Hobden <[email protected]> * fmt/lint Signed-off-by: Ana Hobden <[email protected]> * Add cf to concrete builder types Signed-off-by: Xiaoguang Sun <[email protected]> * Fixed some comments and confusing name Signed-off-by: Xiaoguang Sun <[email protected]> * Change Request from struct to enum Signed-off-by: Xiaoguang Sun <[email protected]> * Change tso_tx/rx channel to bounded Signed-off-by: Xiaoguang Sun <[email protected]> * Fix format issues and improve implementations Signed-off-by: Xiaoguang Sun <[email protected]> * Change to dyn trait syntax Signed-off-by: Xiaoguang Sun <[email protected]> * inline some functions Signed-off-by: Ana Hobden <[email protected]> * Better note on KvPair Signed-off-by: Ana Hobden <[email protected]> * Use 3 PDs in raw example Signed-off-by: Ana Hobden <[email protected]> * Clarify documentation Signed-off-by: Ana Hobden <[email protected]> * Get CI green Signed-off-by: Ana Hobden <[email protected]> * Remove not useful PrivateKey type Signed-off-by: Xiaoguang Sun <[email protected]> * Change CUSTOM_CF to "default" in examples/raw.rs Signed-off-by: Xiaoguang Sun <[email protected]>
1 parent 5ed3327 commit ba86754

23 files changed

+4776
-443
lines changed

.travis.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ env:
1616
cache: cargo
1717

1818
rust:
19+
os:
20+
- linux
21+
- windows
22+
- osx
1923

2024
matrix:
2125
include:
@@ -33,6 +37,8 @@ matrix:
3337

3438

3539
script:
40+
- docker run -d --net=host --name pd --rm pingcap/pd --name "pd" --data-dir "pd" --client-urls "http://127.0.0.1:2379" --advertise-client-urls "http://127.0.0.1:2379"
41+
- docker run -d --net=host --name kv --rm pingcap/tikv --pd-endpoints "127.0.0.1:2379" --addr "127.0.0.1:2378" --data-dir "kv"
3642
- cargo test --all -- --nocapture
3743
# Validate benches still work.
3844
- cargo bench --all -- --test

Cargo.toml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,20 @@ serde = "1.0"
1717
serde_derive = "1.0"
1818
quick-error = "1.2"
1919
grpcio = { version = "0.4", features = [ "secure" ] }
20+
protobuf = "~2.0"
21+
tokio-core = "0.1"
22+
tokio-timer = "0.2"
23+
fxhash = "0.2"
24+
lazy_static = "0.2.1"
25+
log = "0.3.9"
26+
27+
[dependencies.kvproto]
28+
git = "https://github.com/pingcap/kvproto.git"
29+
30+
[dependencies.prometheus]
31+
version = "0.4.2"
32+
default-features = false
33+
features = ["push", "process"]
34+
35+
[dev-dependencies]
36+
tempdir = "0.3"

examples/raw.rs

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,66 +13,104 @@
1313

1414
use futures::future::Future;
1515
use std::path::PathBuf;
16-
use tikv_client::*;
16+
use tikv_client::{raw::Client, Config, Key, KvPair, Result, Value};
1717

18-
fn main() {
19-
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
18+
const KEY: &str = "TiKV";
19+
const VALUE: &str = "Rust";
20+
const CUSTOM_CF: &str = "default";
21+
22+
fn main() -> Result<()> {
23+
// Create a configuration to use for the example.
24+
// Optionally encrypt the traffic.
25+
let config = Config::new(vec![
26+
"192.168.0.100:3379", // Avoid a single point of failure,
27+
"192.168.0.101:3379", // use more than one PD endpoint.
28+
"192.168.0.102:3379",
29+
])
30+
.with_security(
2031
PathBuf::from("/path/to/ca.pem"),
2132
PathBuf::from("/path/to/client.pem"),
2233
PathBuf::from("/path/to/client-key.pem"),
2334
);
24-
let raw = raw::Client::new(&config)
25-
.wait()
26-
.expect("Could not connect to tikv");
2735

28-
let key: Key = b"Company".to_vec().into();
29-
let value: Value = b"PingCAP".to_vec().into();
36+
// When we first create a client we recieve a `Connect` structure which must be resolved before
37+
// the client is actually connected and usable.
38+
let unconnnected_client = Client::new(&config);
39+
let client = unconnnected_client.wait()?;
3040

31-
raw.put(key.clone(), value.clone())
32-
.cf("test_cf")
33-
.wait()
34-
.expect("Could not put kv pair to tikv");
35-
println!("Successfully put {:?}:{:?} to tikv", key, value);
41+
// Requests are created from the connected client. These calls return structures which
42+
// implement `Future`. This means the `Future` must be resolved before the action ever takes
43+
// place.
44+
//
45+
// Here we set the key `TiKV` to have the value `Rust` associated with it.
46+
let put_request = client.put(KEY, VALUE);
47+
put_request.wait()?; // Returns a `tikv_client::Error` on failure.
48+
println!("Put key \"{}\", value \"{}\".", KEY, VALUE);
3649

37-
let value = raw
38-
.get(&key)
39-
.cf("test_cf")
40-
.wait()
41-
.expect("Could not get value");
42-
println!("Found val: {:?} for key: {:?}", value, key);
50+
//
51+
// Unlike a standard Rust HashMap all calls take owned values. This is because under the hood
52+
// protobufs must take ownership of the data. If we only took a borrow we'd need to internally
53+
// clone it. This is against Rust API guidelines, so you must manage this yourself.
54+
//
55+
// Above, you saw we can use a `&'static str`, this is primarily for making examples short.
56+
// This type is practical to use for real things, and usage forces an internal copy.
57+
//
58+
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
59+
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
60+
let key: String = String::from(KEY);
61+
let value: Value = client.get(key.clone()).wait()?;
62+
assert_eq!(value.as_ref(), VALUE.as_bytes());
63+
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);
4364

44-
raw.delete(&key)
45-
.cf("test_cf")
65+
// You can also set the `ColumnFamily` used by the request.
66+
// This is *advanced usage* and should have some special considerations.
67+
client
68+
.delete(key.clone())
69+
.cf(CUSTOM_CF)
4670
.wait()
4771
.expect("Could not delete value");
4872
println!("Key: {:?} deleted", key);
4973

50-
raw.get(&key)
51-
.cf("test_cf")
74+
client
75+
.get(key)
76+
.cf(CUSTOM_CF)
5277
.wait()
5378
.expect_err("Get returned value for not existing key");
5479

55-
let keys = vec![b"k1".to_vec().into(), b"k2".to_vec().into()];
80+
let pairs: Vec<KvPair> = (1..3)
81+
.map(|i| KvPair::from((Key::from(format!("k{}", i)), Value::from(format!("v{}", i)))))
82+
.collect();
83+
client
84+
.batch_put(pairs.clone())
85+
.wait()
86+
.expect("Could not put pairs");
87+
88+
let keys = vec![Key::from(b"k1".to_vec()), Key::from(b"k2".to_vec())];
5689

57-
let values = raw
58-
.batch_get(&keys)
59-
.cf("test_cf")
90+
let values = client
91+
.batch_get(keys.clone())
92+
.cf(CUSTOM_CF)
6093
.wait()
6194
.expect("Could not get values");
6295
println!("Found values: {:?} for keys: {:?}", values, keys);
6396

6497
let start: Key = b"k1".to_vec().into();
6598
let end: Key = b"k2".to_vec().into();
66-
raw.scan(&start..&end, 10)
67-
.cf("test_cf")
99+
client
100+
.scan(start.clone()..end.clone(), 10)
101+
.cf(CUSTOM_CF)
68102
.key_only()
69103
.wait()
70104
.expect("Could not scan");
71105

72-
let ranges = [&start..&end, &start..&end];
73-
raw.batch_scan(&ranges, 10)
74-
.cf("test_cf")
106+
let ranges = vec![start.clone()..end.clone(), start.clone()..end.clone()];
107+
client
108+
.batch_scan(ranges, 10)
109+
.cf(CUSTOM_CF)
75110
.key_only()
76111
.wait()
77112
.expect("Could not batch scan");
113+
114+
// Cleanly exit.
115+
Ok(())
78116
}

examples/transaction.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
3232
txn.commit().wait().expect("Could not commit transaction");
3333
}
3434

35-
fn get(client: &Client, key: &Key) -> Value {
35+
fn get(client: &Client, key: Key) -> Value {
3636
let txn = client.begin();
3737
txn.get(key).wait().expect("Could not get value")
3838
}
@@ -70,7 +70,7 @@ fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
7070
}
7171

7272
fn main() {
73-
let config = Config::new(vec!["127.0.0.1:3379"]).with_security(
73+
let config = Config::new(vec!["127.0.0.1:2379"]).with_security(
7474
PathBuf::from("/path/to/ca.pem"),
7575
PathBuf::from("/path/to/client.pem"),
7676
PathBuf::from("/path/to/client-key.pem"),
@@ -88,7 +88,7 @@ fn main() {
8888

8989
// get
9090
let key1: Key = b"key1".to_vec().into();
91-
let value1 = get(&txn, &key1);
91+
let value1 = get(&txn, key1.clone());
9292
println!("{:?}", (key1, value1));
9393

9494
// scan

rust-toolchain

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
stable
1+
stable

src/errors.rs

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,62 +16,121 @@ use quick_error::quick_error;
1616
use std::{error, result};
1717

1818
quick_error! {
19+
/// An error originating from the TiKV client or dependencies.
20+
///
21+
/// This client currently uses [`quick_error`](https://docs.rs/quick-error/1.2.2/quick_error/)
22+
/// for errors. *This may change in future versions.*
1923
#[derive(Debug)]
2024
pub enum Error {
25+
/// Wraps a `std::io::Error`.
2126
Io(err: ::std::io::Error) {
2227
from()
2328
cause(err)
2429
description(err.description())
2530
}
31+
/// Wraps a `grpcio::Error`.
2632
Grpc(err: grpcio::Error) {
2733
from()
2834
cause(err)
2935
description(err.description())
3036
}
37+
/// Represents that a futures oneshot channel was cancelled.
3138
Canceled(err: ::futures::sync::oneshot::Canceled) {
3239
from()
3340
cause(err)
3441
description(err.description())
3542
}
43+
/// An unknown error.
44+
///
45+
/// Generally, this is not an expected error. Please report it if encountered.
3646
Other(err: Box<error::Error + Sync + Send>) {
3747
from()
3848
cause(err.as_ref())
3949
description(err.description())
4050
display("unknown error {:?}", err)
4151
}
52+
/// A region was not found for the given key.
4253
RegionForKeyNotFound(key: Vec<u8>) {
4354
description("region is not found")
4455
display("region is not found for key {:?}", key)
4556
}
46-
RegionNotFound(id: u64) {
57+
/// A region was not found.
58+
RegionNotFound(region_id: u64, message: Option<String>) {
4759
description("region is not found")
48-
display("region {:?} is not found", id)
60+
display("region {:?} is not found. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
4961
}
50-
NotLeader(region_id: u64) {
62+
/// The peer is not a leader of the given region.
63+
NotLeader(region_id: u64, message: Option<String>) {
5164
description("peer is not leader")
52-
display("peer is not leader for region {:?}.", region_id)
65+
display("peer is not leader for region {}. {}", region_id, message.as_ref().unwrap_or(&"".to_owned()))
5366
}
54-
StoreNotMatch {
67+
/// The store does not match.
68+
StoreNotMatch(request_store_id: u64, actual_store_id: u64, message: String) {
5569
description("store not match")
56-
display("store not match")
70+
display("requesting store '{}' when actual store is '{}'. {}", request_store_id, actual_store_id, message)
5771
}
72+
/// The given key is not within the given region.
5873
KeyNotInRegion(key: Vec<u8>, region_id: u64, start_key: Vec<u8>, end_key: Vec<u8>) {
5974
description("region is not found")
6075
display("key {:?} is not in region {:?}: [{:?}, {:?})", key, region_id, start_key, end_key)
6176
}
62-
StaleEpoch {
77+
/// A stale epoch.
78+
StaleEpoch(message: Option<String>) {
6379
description("stale epoch")
64-
display("stale epoch")
80+
display("{}", message.as_ref().unwrap_or(&"".to_owned()))
6581
}
66-
ServerIsBusy(reason: String) {
82+
StaleCommand(message: String) {
83+
description("stale command")
84+
display("{}", message)
85+
}
86+
/// The server is too busy.
87+
ServerIsBusy(reason: String, backoff: u64) {
6788
description("server is busy")
68-
display("server is busy: {:?}", reason)
89+
display("server is busy: {:?}. Backoff {} ms", reason, backoff)
6990
}
70-
RaftEntryTooLarge(region_id: u64, entry_size: u64) {
91+
/// The given raft entry is too large for the region.
92+
RaftEntryTooLarge(region_id: u64, entry_size: u64, message: String) {
7193
description("raft entry too large")
72-
display("{:?} bytes raft entry of region {:?} is too large", entry_size, region_id)
94+
display("{:?} bytes raft entry of region {:?} is too large. {}", entry_size, region_id, message)
95+
}
96+
KeyError(message: String) {
97+
description("key error")
98+
display("{}", message)
99+
}
100+
KVError(message: String) {
101+
description("kv error")
102+
display("{}", message)
103+
}
104+
InternalError(message: String) {
105+
description("internal error")
106+
display("{}", message)
107+
}
108+
InvalidKeyRange {
109+
description("invalid key range")
110+
display("Only left closed intervals are supported")
111+
}
112+
Unimplemented {
113+
description("unimplemented feature")
114+
display("Unimplemented feature")
115+
}
116+
EmptyValue {
117+
description("can not set empty value")
118+
display("Can not set empty value")
119+
}
120+
NoSuchKey {
121+
description("key does not exist")
122+
display("Key doest not exist")
123+
}
124+
InvalidOverlappingRanges {
125+
description("ranges can not be overlapping")
126+
display("Ranges can not be overlapping")
127+
}
128+
MaxScanLimitExceeded(limit: u32, max_limit: u32) {
129+
description("limit exceeds max scan limit")
130+
display("Limit {} excceds max scan limit {}", limit, max_limit)
73131
}
74132
}
75133
}
76134

135+
/// A result holding an [`Error`](enum.Error.html).
77136
pub type Result<T> = result::Result<T, Error>;

0 commit comments

Comments
 (0)