Skip to content

Commit 9a5fd1d

Browse files
committed
Address review comments
Signed-off-by: Nick Cameron <[email protected]>
1 parent 8cb8dd6 commit 9a5fd1d

File tree

22 files changed

+238
-147
lines changed

22 files changed

+238
-147
lines changed

.travis.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ os:
66
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
77
- osx
88
rust:
9-
# - stable
9+
- stable
1010
- nightly
1111

1212
env:
1313
global:
1414
- RUST_BACKTRACE=1
15-
- RUSTFLAGS="-D warnings"
16-
- RUSTDOCFLAGS="-D warnings"
15+
# - RUSTFLAGS="-D warnings"
16+
# - RUSTDOCFLAGS="-D warnings"
1717

1818
addons:
1919
apt:
@@ -43,7 +43,7 @@ script:
4343
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "stable" ]]; then cargo clippy -- -D clippy::all; fi
4444
- cargo build --all
4545
- cargo build --examples
46-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --exclude tikv-client-proto --document-private-items; fi
46+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps; fi
4747
- if [[ $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo test --all -- --nocapture; fi
4848
# For now we only run full integration tests on Linux. Here's why:
4949
# * Docker on OS X is not supported by Travis.
@@ -53,7 +53,7 @@ script:
5353
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker ps; fi
5454
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs pd; fi
5555
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then docker logs kv; fi
56-
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 2; fi
56+
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then sleep 60; fi
5757
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then PD_ADDRS="127.0.0.1:2379" cargo test --all --features integration-tests -- --nocapture; fi
5858
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example raw -- --pd="127.0.0.1:2379"; fi
5959
- if [[ $TRAVIS_OS_NAME == "linux" && $TRAVIS_RUST_VERSION == "nightly" ]]; then cargo run --example transaction -- --pd="127.0.0.1:2379"; fi

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ async-trait = "0.1"
2222
derive-new = "0.5"
2323
futures = { version = "0.3.5", features = ["async-await", "thread-pool"] }
2424
futures-timer = "3.0"
25-
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false }
25+
grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
2626
lazy_static = "1"
2727
log = "0.4"
2828
prometheus = { version = "0.11", features = [ "push", "process" ], default-features = false }

mock-tikv/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ edition = "2018"
66
[dependencies]
77
derive-new = "0.5.8"
88
futures = "0.3.5"
9-
grpcio = { version = "0.7", features = [ "secure", "prost-codec" ], default-features = false }
9+
grpcio = { version = "0.8", features = [ "secure", "prost-codec", "use-bindgen" ], default-features = false }
1010
log = "0.4"
1111
tikv-client-proto = { path = "../tikv-client-proto"}

rustfmt.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
merge_imports = true
1+
imports_granularity="Crate"
22
format_code_in_doc_comments = true

src/compat.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::pin::Pin;
1818
pub struct LoopFn<A, F> {
1919
future: A,
2020
func: F,
21+
errored: bool,
2122
}
2223

2324
pub fn stream_fn<S, T, A, F, E>(initial_state: S, mut func: F) -> LoopFn<A, F>
@@ -28,6 +29,7 @@ where
2829
LoopFn {
2930
future: func(initial_state),
3031
func,
32+
errored: false,
3133
}
3234
}
3335

@@ -38,10 +40,16 @@ where
3840
{
3941
type Item = Result<T, E>;
4042
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43+
if self.errored {
44+
return Poll::Ready(None);
45+
}
4146
unsafe {
4247
let this = Pin::get_unchecked_mut(self);
4348
match ready!(Pin::new_unchecked(&mut this.future).poll(cx)) {
44-
Err(e) => Poll::Ready(Some(Err(e))),
49+
Err(e) => {
50+
this.errored = true;
51+
Poll::Ready(Some(Err(e)))
52+
}
4553
Ok(None) => Poll::Ready(None),
4654
Ok(Some((s, t))) => {
4755
this.future = (this.func)(s);

src/lib.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
#![allow(clippy::redundant_closure)]
66
#![allow(clippy::type_complexity)]
77
#![allow(incomplete_features)]
8-
#![cfg_attr(test, feature(specialization))]
9-
#![feature(associated_type_bounds)]
10-
#![feature(decl_macro)]
118

129
//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
1310
//! distributed transactional Key-Value database written in Rust.
@@ -77,6 +74,9 @@
7774
//!
7875
//! At this point, you should seek the documentation in the related API modules.
7976
77+
#[macro_use]
78+
mod request;
79+
8080
#[macro_use]
8181
mod transaction;
8282

@@ -87,7 +87,6 @@ mod kv;
8787
mod pd;
8888
mod raw;
8989
mod region;
90-
mod request;
9190
mod stats;
9291
mod store;
9392
mod timestamp;

src/mock.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,3 +154,10 @@ impl PdClient for MockPdClient {
154154
unimplemented!()
155155
}
156156
}
157+
158+
pub fn mock_store() -> Store {
159+
Store {
160+
region: Region::default(),
161+
client: Arc::new(MockKvClient::new("foo".to_owned(), None)),
162+
}
163+
}

src/raw/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ impl Client {
117117
.await?
118118
.resolve_lock(OPTIMISTIC_BACKOFF)
119119
.retry_region(DEFAULT_REGION_BACKOFF)
120-
.post_process()
120+
.post_process_default()
121121
.plan();
122122
plan.execute().await
123123
}
@@ -147,8 +147,8 @@ impl Client {
147147
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
148148
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
149149
.resolve_lock(OPTIMISTIC_BACKOFF)
150-
.retry_region(DEFAULT_REGION_BACKOFF)
151150
.multi_region()
151+
.retry_region(DEFAULT_REGION_BACKOFF)
152152
.merge(Collect)
153153
.plan();
154154
plan.execute()

src/raw/requests.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
use super::RawRpcRequest;
44
use crate::{
55
pd::PdClient,
6-
request::{
7-
shardable_keys, shardable_range, Collect, KvRequest, Merge, Process, Shardable, SingleKey,
8-
},
6+
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
97
store::{store_stream_for_keys, store_stream_for_ranges, Store},
108
transaction::HasLocks,
119
util::iter::FlatMapOkIterExt,
@@ -33,10 +31,10 @@ impl SingleKey for kvrpcpb::RawGetRequest {
3331
}
3432
}
3533

36-
impl Process for kvrpcpb::RawGetResponse {
34+
impl Process<kvrpcpb::RawGetResponse> for DefaultProcessor {
3735
type Out = Option<Value>;
3836

39-
fn process(input: Result<Self>) -> Result<Self::Out> {
37+
fn process(&self, input: Result<kvrpcpb::RawGetResponse>) -> Result<Self::Out> {
4038
let mut input = input?;
4139
Ok(if input.not_found {
4240
None

src/request/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,16 @@ use tikv_client_store::{HasError, Request};
1010

1111
pub use self::{
1212
plan::{
13-
Collect, CollectError, Dispatch, Merge, MergeResponse, MultiRegionPlan, Plan, Process,
14-
ProcessResponse, ResolveLockPlan, RetryRegionPlan,
13+
Collect, CollectError, DefaultProcessor, Dispatch, Merge, MergeResponse, MultiRegion, Plan,
14+
Process, ProcessResponse, ResolveLock, RetryRegion,
1515
},
1616
plan_builder::{PlanBuilder, SingleKey},
17-
shard::{shardable_keys, shardable_range, Shardable},
17+
shard::Shardable,
1818
};
1919

2020
mod plan;
2121
mod plan_builder;
22+
#[macro_use]
2223
mod shard;
2324

2425
/// Abstracts any request sent to a TiKV server.

0 commit comments

Comments
 (0)