Skip to content

Commit b38ba7a

Browse files
committed
wip
Signed-off-by: Ping Yu <[email protected]>
1 parent c72a079 commit b38ba7a

File tree

18 files changed

+134
-135
lines changed

18 files changed

+134
-135
lines changed

Cargo.toml

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,22 @@ default = [ "protobuf-codec" ]
1313
# Enable integration tests with a running TiKV and PD instance.
1414
# Use $PD_ADDRS, comma separated, to set the addresses the tests use.
1515
integration-tests = []
16-
protobuf-codec = ["tikv-client-proto/protobuf-codec", "grpcio/protobuf-codec"]
17-
prost-codec = ["tikv-client-proto/prost-codec", "grpcio/prost-codec"]
16+
protobuf-codec = [
17+
"grpcio/protobuf-codec",
18+
"tikv-client-proto/protobuf-codec",
19+
"tikv-client-common/protobuf-codec",
20+
"tikv-client-pd/protobuf-codec",
21+
"tikv-client-store/protobuf-codec",
22+
"mock-tikv/protobuf-codec",
23+
]
24+
prost-codec = [
25+
"grpcio/prost-codec",
26+
"tikv-client-proto/prost-codec",
27+
"tikv-client-common/prost-codec",
28+
"tikv-client-pd/prost-codec",
29+
"tikv-client-store/prost-codec",
30+
"mock-tikv/prost-codec",
31+
]
1832

1933
[lib]
2034
name = "tikv_client"
@@ -26,7 +40,7 @@ either = "1.6"
2640
fail = "0.4"
2741
futures = { version = "0.3", features = ["async-await", "thread-pool"] }
2842
futures-timer = "3.0"
29-
grpcio = { version = "0.10", features = [ "openssl-vendored" ], default-features = false }
43+
grpcio = { version = "0.10.4", features = [ "openssl-vendored" ], default-features = false }
3044
lazy_static = "1"
3145
log = "0.4"
3246
prometheus = { version = "0.12", features = [ "push", "process" ], default-features = false }

mock-tikv/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ name = "mock-tikv"
33
version = "0.0.0"
44
edition = "2018"
55

6+
[features]
7+
protobuf-codec = ["grpcio/protobuf-codec"]
8+
prost-codec = ["grpcio/prost-codec"]
9+
610
[dependencies]
711
derive-new = "0.5"
812
futures = "0.3"
9-
grpcio = { version = "0.10", features = [ "prost-codec" ], default-features = false }
13+
grpcio = { version = "0.10.4", default-features = false }
1014
log = "0.4"
1115
tikv-client-proto = { path = "../tikv-client-proto"}

mock-tikv/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
// To support both prost & rust-protobuf.
4+
#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))]
5+
26
mod pd;
37
mod server;
48
mod store;

mock-tikv/src/pd.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ impl MockPd {
2020
tikv_client_proto::metapb::Region {
2121
start_key: vec![],
2222
end_key: vec![],
23-
peers: vec![Self::leader()],
23+
peers: vec![Self::leader()].into(),
2424
..Default::default()
2525
}
2626
}
@@ -58,12 +58,12 @@ impl Pd for MockPd {
5858
) {
5959
let member = Member {
6060
name: "mock tikv".to_owned(),
61-
client_urls: vec![format!("localhost:{MOCK_PD_PORT}")],
61+
client_urls: vec![format!("localhost:{MOCK_PD_PORT}")].into(),
6262
..Default::default()
6363
};
6464
let resp = GetMembersResponse {
65-
members: vec![member.clone()],
66-
leader: Some(member),
65+
members: vec![member.clone()].into(),
66+
leader: Some(member).into(),
6767
..Default::default()
6868
};
6969
spawn_unary_success!(ctx, req, resp, sink);
@@ -120,7 +120,7 @@ impl Pd for MockPd {
120120
sink: ::grpcio::UnarySink<GetStoreResponse>,
121121
) {
122122
let resp = GetStoreResponse {
123-
store: Some(Self::store()),
123+
store: Some(Self::store()).into(),
124124
..Default::default()
125125
};
126126
spawn_unary_success!(ctx, req, resp, sink);
@@ -169,8 +169,8 @@ impl Pd for MockPd {
169169
sink: ::grpcio::UnarySink<GetRegionResponse>,
170170
) {
171171
let resp = GetRegionResponse {
172-
region: Some(Self::region()),
173-
leader: Some(Self::leader()),
172+
region: Some(Self::region()).into(),
173+
leader: Some(Self::leader()).into(),
174174
..Default::default()
175175
};
176176
spawn_unary_success!(ctx, req, resp, sink);
@@ -192,8 +192,8 @@ impl Pd for MockPd {
192192
sink: ::grpcio::UnarySink<GetRegionResponse>,
193193
) {
194194
let resp = GetRegionResponse {
195-
region: Some(Self::region()),
196-
leader: Some(Self::leader()),
195+
region: Some(Self::region()).into(),
196+
leader: Some(Self::leader()).into(),
197197
..Default::default()
198198
};
199199
spawn_unary_success!(ctx, req, resp, sink);

mock-tikv/src/server.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
22

3+
#![allow(clippy::useless_conversion)] // To support both prost & rust-protobuf.
4+
35
use crate::{spawn_unary_success, KvStore};
46
use derive_new::new;
57
use futures::{FutureExt, TryFutureExt};
@@ -192,7 +194,7 @@ impl Tikv for MockTikv {
192194
sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::RawBatchGetResponse>,
193195
) {
194196
let mut resp = tikv_client_proto::kvrpcpb::RawBatchGetResponse::default();
195-
resp.set_pairs(self.inner.raw_batch_get(req.take_keys()));
197+
resp.set_pairs(self.inner.raw_batch_get(req.take_keys().into()).into());
196198
spawn_unary_success!(ctx, req, resp, sink);
197199
}
198200

@@ -214,7 +216,7 @@ impl Tikv for MockTikv {
214216
mut req: tikv_client_proto::kvrpcpb::RawBatchPutRequest,
215217
sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::RawBatchPutResponse>,
216218
) {
217-
let pairs = req.take_pairs();
219+
let pairs = req.take_pairs().into();
218220
self.inner.raw_batch_put(pairs);
219221
let resp = RawBatchPutResponse::default();
220222
spawn_unary_success!(ctx, req, resp, sink);
@@ -238,7 +240,7 @@ impl Tikv for MockTikv {
238240
mut req: tikv_client_proto::kvrpcpb::RawBatchDeleteRequest,
239241
sink: grpcio::UnarySink<tikv_client_proto::kvrpcpb::RawBatchDeleteResponse>,
240242
) {
241-
let keys = req.take_keys();
243+
let keys = req.take_keys().into();
242244
self.inner.raw_batch_delete(keys);
243245
let resp = RawBatchDeleteResponse::default();
244246
spawn_unary_success!(ctx, req, resp, sink);

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@
9090
//! # })}
9191
//! ```
9292
93+
// To support both prost & rust-protobuf.
94+
#![cfg_attr(feature = "prost-codec", allow(clippy::useless_conversion))]
95+
9396
#[macro_use]
9497
pub mod request;
9598
#[macro_use]

src/pd/client.rs

Lines changed: 20 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,8 @@ pub trait PdClient: Send + Sync + 'static {
150150
let region_end = region.end_key();
151151
let mut grouped = vec![];
152152
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
153-
grouped.push(kvrpcpb::KeyRange {
154-
start_key: start_key.into(),
155-
end_key: region_end.clone().into(),
156-
});
157-
ranges.push(kvrpcpb::KeyRange {
158-
start_key: region_end.into(),
159-
end_key: end_key.into(),
160-
});
153+
grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
154+
ranges.push(make_key_range(region_end.into(), end_key.into()));
161155
return Ok(Some((Some(ranges), (region, grouped))));
162156
}
163157
grouped.push(range);
@@ -170,14 +164,9 @@ pub trait PdClient: Send + Sync + 'static {
170164
break;
171165
}
172166
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
173-
grouped.push(kvrpcpb::KeyRange {
174-
start_key: start_key.into(),
175-
end_key: region_end.clone().into(),
176-
});
177-
ranges.push(kvrpcpb::KeyRange {
178-
start_key: region_end.into(),
179-
end_key: end_key.into(),
180-
});
167+
grouped
168+
.push(make_key_range(start_key.into(), region_end.clone().into()));
169+
ranges.push(make_key_range(region_end.into(), end_key.into()));
181170
return Ok(Some((Some(ranges), (region, grouped))));
182171
}
183172
grouped.push(range);
@@ -348,6 +337,13 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
348337
}
349338
}
350339

340+
fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
341+
let mut key_range = kvrpcpb::KeyRange::default();
342+
key_range.set_start_key(start_key);
343+
key_range.set_end_key(end_key);
344+
key_range
345+
}
346+
351347
#[cfg(test)]
352348
pub mod test {
353349
use super::*;
@@ -430,18 +426,9 @@ pub mod test {
430426
let k3 = vec![11, 4];
431427
let k4 = vec![16, 4];
432428
let k_split = vec![10];
433-
let range1 = kvrpcpb::KeyRange {
434-
start_key: k1.clone(),
435-
end_key: k2.clone(),
436-
};
437-
let range2 = kvrpcpb::KeyRange {
438-
start_key: k1.clone(),
439-
end_key: k3.clone(),
440-
};
441-
let range3 = kvrpcpb::KeyRange {
442-
start_key: k2.clone(),
443-
end_key: k4.clone(),
444-
};
429+
let range1 = make_key_range(k1.clone(), k2.clone());
430+
let range2 = make_key_range(k1.clone(), k3.clone());
431+
let range3 = make_key_range(k2.clone(), k4.clone());
445432
let ranges = vec![range1, range2, range3];
446433

447434
let mut stream = executor::block_on_stream(client.group_ranges_by_region(ranges));
@@ -454,40 +441,16 @@ pub mod test {
454441
assert_eq!(
455442
ranges1.1,
456443
vec![
457-
kvrpcpb::KeyRange {
458-
start_key: k1.clone(),
459-
end_key: k2.clone()
460-
},
461-
kvrpcpb::KeyRange {
462-
start_key: k1,
463-
end_key: k_split.clone()
464-
}
444+
make_key_range(k1.clone(), k2.clone()),
445+
make_key_range(k1, k_split.clone()),
465446
]
466447
);
467448
assert_eq!(ranges2.0.id(), 2);
468-
assert_eq!(
469-
ranges2.1,
470-
vec![kvrpcpb::KeyRange {
471-
start_key: k_split.clone(),
472-
end_key: k3
473-
}]
474-
);
449+
assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3)]);
475450
assert_eq!(ranges3.0.id(), 1);
476-
assert_eq!(
477-
ranges3.1,
478-
vec![kvrpcpb::KeyRange {
479-
start_key: k2,
480-
end_key: k_split.clone()
481-
}]
482-
);
451+
assert_eq!(ranges3.1, vec![make_key_range(k2, k_split.clone())]);
483452
assert_eq!(ranges4.0.id(), 2);
484-
assert_eq!(
485-
ranges4.1,
486-
vec![kvrpcpb::KeyRange {
487-
start_key: k_split,
488-
end_key: k4
489-
}]
490-
);
453+
assert_eq!(ranges4.1, vec![make_key_range(k_split, k4)]);
491454
assert!(stream.next().is_none());
492455
}
493456
}

src/pd/retry.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,11 +193,11 @@ impl fmt::Debug for RetryClient {
193193
}
194194

195195
fn region_from_response(
196-
resp: pdpb::GetRegionResponse,
196+
mut resp: pdpb::GetRegionResponse,
197197
err: impl FnOnce() -> Error,
198198
) -> Result<RegionWithLeader> {
199-
let region = resp.region.ok_or_else(err)?;
200-
Ok(RegionWithLeader::new(region, resp.leader))
199+
let region = resp.region.take().ok_or_else(err)?;
200+
Ok(RegionWithLeader::new(region, resp.leader.take()))
201201
}
202202

203203
// A node-like thing that can be connected to.

src/raw/requests.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub fn new_raw_batch_get_request(
6161
cf: Option<ColumnFamily>,
6262
) -> kvrpcpb::RawBatchGetRequest {
6363
let mut req = kvrpcpb::RawBatchGetRequest::default();
64-
req.set_keys(keys);
64+
req.set_keys(keys.into());
6565
req.maybe_set_cf(cf);
6666

6767
req
@@ -117,7 +117,7 @@ pub fn new_raw_batch_put_request(
117117
atomic: bool,
118118
) -> kvrpcpb::RawBatchPutRequest {
119119
let mut req = kvrpcpb::RawBatchPutRequest::default();
120-
req.set_pairs(pairs);
120+
req.set_pairs(pairs.into());
121121
req.maybe_set_cf(cf);
122122
req.set_for_cas(atomic);
123123

@@ -145,7 +145,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
145145

146146
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
147147
self.set_context(store.region_with_leader.context()?);
148-
self.set_pairs(shard);
148+
self.set_pairs(shard.into());
149149
Ok(())
150150
}
151151
}
@@ -180,7 +180,7 @@ pub fn new_raw_batch_delete_request(
180180
cf: Option<ColumnFamily>,
181181
) -> kvrpcpb::RawBatchDeleteRequest {
182182
let mut req = kvrpcpb::RawBatchDeleteRequest::default();
183-
req.set_keys(keys);
183+
req.set_keys(keys.into());
184184
req.maybe_set_cf(cf);
185185

186186
req
@@ -252,7 +252,7 @@ pub fn new_raw_batch_scan_request(
252252
cf: Option<ColumnFamily>,
253253
) -> kvrpcpb::RawBatchScanRequest {
254254
let mut req = kvrpcpb::RawBatchScanRequest::default();
255-
req.set_ranges(ranges);
255+
req.set_ranges(ranges.into());
256256
req.set_each_limit(each_limit);
257257
req.set_key_only(key_only);
258258
req.maybe_set_cf(cf);
@@ -271,12 +271,12 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
271271
&self,
272272
pd_client: &Arc<impl PdClient>,
273273
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
274-
store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
274+
store_stream_for_ranges(self.ranges.clone().into(), pd_client.clone())
275275
}
276276

277277
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
278278
self.set_context(store.region_with_leader.context()?);
279-
self.set_ranges(shard);
279+
self.set_ranges(shard.into());
280280
Ok(())
281281
}
282282
}
@@ -346,7 +346,7 @@ pub fn new_raw_coprocessor_request(
346346
let mut inner = kvrpcpb::RawCoprocessorRequest::default();
347347
inner.set_copr_name(copr_name);
348348
inner.set_copr_version_req(copr_version_req);
349-
inner.set_ranges(ranges);
349+
inner.set_ranges(ranges.into());
350350
RawCoprocessorRequest {
351351
inner,
352352
data_builder,
@@ -389,12 +389,12 @@ impl Shardable for RawCoprocessorRequest {
389389
&self,
390390
pd_client: &Arc<impl PdClient>,
391391
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
392-
store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
392+
store_stream_for_ranges(self.inner.ranges.clone().into(), pd_client.clone())
393393
}
394394

395395
fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
396396
self.inner.set_context(store.region_with_leader.context()?);
397-
self.inner.set_ranges(shard.clone());
397+
self.inner.set_ranges(shard.clone().into());
398398
self.inner.set_data((self.data_builder)(
399399
store.region_with_leader.region.clone(),
400400
shard,

src/request/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,9 @@ mod test {
181181
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
182182
|_: &dyn Any| {
183183
Ok(Box::new(kvrpcpb::CommitResponse {
184-
region_error: None,
185-
error: Some(kvrpcpb::KeyError::default()),
184+
error: Some(kvrpcpb::KeyError::default()).into(),
186185
commit_version: 0,
186+
..Default::default()
187187
}) as Box<dyn Any>)
188188
},
189189
)));

0 commit comments

Comments
 (0)