Skip to content

Commit 3fad149

Browse files
authored
Merge pull request #232 from nrc/plan
Refactor the command abstraction
2 parents 56c9933 + 9a5fd1d commit 3fad149

38 files changed

+2147
-1507
lines changed

.travis.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ rust:
1212
env:
1313
global:
1414
- RUST_BACKTRACE=1
15-
- RUSTFLAGS="-D warnings"
16-
- RUSTDOCFLAGS="-D warnings"
15+
# - RUSTFLAGS="-D warnings"
16+
# - RUSTDOCFLAGS="-D warnings"
1717

1818
addons:
1919
apt:
@@ -43,7 +43,7 @@ script:
4343
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi
4444
- cargo build --all
4545
- cargo build --examples
46-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --exclude tikv-client-proto --document-private-items; fi
46+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps; fi
4747
- if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi
4848
# For now we only run full integration tests on Linux. Here's why:
4949
# * Docker on OS X is not supported by Travis.
@@ -53,7 +53,7 @@ script:
5353
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
5454
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
5555
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi
56-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 2; fi
56+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 60; fi
5757
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
5858
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi
5959
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async-trait = "0.1"
2222
derive-new = "0.5"
2323
futures = { version = "0.3.5", features = ["async-await", "thread-pool"] }
2424
futures-timer = "3.0"
25-
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false }
25+
grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
2626
lazy_static = "1"
2727
log = "0.4"
2828
prometheus = { version = "0.11", features = [ "push", "process" ], default-features = false }

examples/pessimistic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
mod common;
44

55
use crate::common::parse_args;
6-
use tikv_client::{Config, Key, TransactionClient as Client, Value};
6+
use tikv_client::{Config, Key, TransactionClient as Client, TransactionOptions, Value};
77

88
#[tokio::main]
99
async fn main() {
@@ -50,7 +50,7 @@ async fn main() {
5050
{
5151
// another txn cannot write to the locked key
5252
let mut txn2 = client
53-
.begin_optimistic()
53+
.begin_with_options(TransactionOptions::new_optimistic().no_resolve_locks())
5454
.await
5555
.expect("Could not begin a transaction");
5656
let value2: Value = b"value2".to_vec();

mock-tikv/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ edition = "2018"
66
[dependencies]
77
derive-new = "0.5.8"
88
futures = "0.3.5"
9-
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false }
9+
grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
1010
log = "0.4"
1111
tikv-client-proto = { path = "../tikv-client-proto"}

rustfmt.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
merge_imports = true
1+
imports_granularity="Crate"
22
format_code_in_doc_comments = true

src/backoff.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
use rand::{thread_rng, Rng};
66
use std::time::Duration;
77

8+
pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
9+
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
10+
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_backoff();
11+
812
/// When a request is retried, we can backoff for some time to avoid saturating the network.
913
///
1014
/// `Backoff` is an object which determines how long to wait for.

src/compat.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::pin::Pin;
1818
pub struct LoopFn<A, F> {
1919
future: A,
2020
func: F,
21+
errored: bool,
2122
}
2223

2324
pub fn stream_fn<S, T, A, F, E>(initial_state: S, mut func: F) -> LoopFn<A, F>
@@ -28,6 +29,7 @@ where
2829
LoopFn {
2930
future: func(initial_state),
3031
func,
32+
errored: false,
3133
}
3234
}
3335

@@ -38,10 +40,16 @@ where
3840
{
3941
type Item = Result<T, E>;
4042
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43+
if self.errored {
44+
return Poll::Ready(None);
45+
}
4146
unsafe {
4247
let this = Pin::get_unchecked_mut(self);
4348
match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) {
44-
Err(e) => Poll::Ready(Some(Err(e))),
49+
Err(e) => {
50+
this.errored = true;
51+
Poll::Ready(Some(Err(e)))
52+
}
4553
Ok(None) => Poll::Ready(None),
4654
Ok(Some((s, t))) => {
4755
this.future = (this.func)(s);

src/kv/bound_range.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ impl BoundRange {
7575
BoundRange { from, to }
7676
}
7777

78+
/// Create a new BoundRange bounded below by `from` and unbounded above.
79+
pub fn range_from(from: Key) -> BoundRange {
80+
BoundRange {
81+
from: Bound::Included(from),
82+
to: Bound::Unbounded,
83+
}
84+
}
85+
7886
/// Ranges used in scanning TiKV have a particularity to them.
7987
///
8088
/// The **start** of a scan is inclusive, unless appended with an '\0', then it is exclusive.

src/kv/key.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ impl<'a> From<&'a Key> for &'a [u8] {
152152
}
153153
}
154154

155+
impl<'a> From<&'a Vec<u8>> for &'a Key {
156+
fn from(key: &'a Vec<u8>) -> Self {
157+
unsafe { &*(key as *const Vec<u8> as *const Key) }
158+
}
159+
}
155160
impl AsRef<Key> for Key {
156161
fn as_ref(&self) -> &Key {
157162
self

src/kv/kvpair.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ impl From<KvPair> for (Key, Value) {
9696
}
9797
}
9898

99+
impl From<KvPair> for Key {
100+
fn from(pair: KvPair) -> Self {
101+
pair.0
102+
}
103+
}
104+
99105
impl From<kvrpcpb::KvPair> for KvPair {
100106
fn from(mut pair: kvrpcpb::KvPair) -> Self {
101107
KvPair(Key::from(pair.take_key()), pair.take_value())

0 commit comments

Comments
 (0)