Skip to content

Commit 2baddc6

Browse files
committed
Add request ctor functions taking high-level types
Signed-off-by: Nick Cameron <[email protected]>
1 parent 71dd80e commit 2baddc6

File tree

12 files changed

+388
-163
lines changed

12 files changed

+388
-163
lines changed

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,14 @@ pub use crate::backoff::Backoff;
103103
#[doc(inline)]
104104
pub use crate::kv::{BoundRange, IntoOwnedRange, Key, KvPair, Value};
105105
#[doc(inline)]
106-
pub use crate::raw::{Client as RawClient, ColumnFamily};
106+
pub use crate::raw::{lowering::*, Client as RawClient, ColumnFamily};
107107
#[doc(inline)]
108108
pub use crate::request::RetryOptions;
109109
#[doc(inline)]
110110
pub use crate::timestamp::{Timestamp, TimestampExt};
111111
#[doc(inline)]
112112
pub use crate::transaction::{
113-
CheckLevel, Client as TransactionClient, Snapshot, Transaction, TransactionOptions,
113+
lowering::*, CheckLevel, Client as TransactionClient, Snapshot, Transaction, TransactionOptions,
114114
};
115115
#[doc(inline)]
116116
pub use config::Config;

src/raw/client.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22

33
use tikv_client_common::Error;
44

5-
use super::requests;
65
use crate::{
76
config::Config,
87
pd::PdRpcClient,
8+
raw::lowering::*,
99
request::{KvRequest, RetryOptions},
1010
BoundRange, ColumnFamily, Key, KvPair, Result, Value,
1111
};
@@ -110,7 +110,7 @@ impl Client {
110110
/// # });
111111
/// ```
112112
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
113-
requests::new_raw_get_request(key, self.cf.clone())
113+
new_raw_get_request(key.into(), self.cf.clone())
114114
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
115115
.await
116116
}
@@ -137,7 +137,7 @@ impl Client {
137137
&self,
138138
keys: impl IntoIterator<Item = impl Into<Key>>,
139139
) -> Result<Vec<KvPair>> {
140-
requests::new_raw_batch_get_request(keys, self.cf.clone())
140+
new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone())
141141
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
142142
.await
143143
}
@@ -159,7 +159,7 @@ impl Client {
159159
/// # });
160160
/// ```
161161
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
162-
requests::new_raw_put_request(key, value, self.cf.clone())
162+
new_raw_put_request(key.into(), value.into(), self.cf.clone())
163163
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
164164
.await
165165
}
@@ -185,7 +185,7 @@ impl Client {
185185
&self,
186186
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
187187
) -> Result<()> {
188-
requests::new_raw_batch_put_request(pairs, self.cf.clone())
188+
new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone())
189189
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
190190
.await
191191
}
@@ -208,7 +208,7 @@ impl Client {
208208
/// # });
209209
/// ```
210210
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
211-
requests::new_raw_delete_request(key, self.cf.clone())
211+
new_raw_delete_request(key.into(), self.cf.clone())
212212
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
213213
.await
214214
}
@@ -231,7 +231,7 @@ impl Client {
231231
/// # });
232232
/// ```
233233
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
234-
requests::new_raw_batch_delete_request(keys, self.cf.clone())
234+
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone())
235235
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
236236
.await
237237
}
@@ -252,7 +252,7 @@ impl Client {
252252
/// # });
253253
/// ```
254254
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
255-
requests::new_raw_delete_range_request(range, self.cf.clone())
255+
new_raw_delete_range_request(range.into(), self.cf.clone())
256256
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
257257
.await
258258
}
@@ -277,7 +277,7 @@ impl Client {
277277
/// # });
278278
/// ```
279279
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
280-
self.scan_inner(range, limit, false).await
280+
self.scan_inner(range.into(), limit, false).await
281281
}
282282

283283
/// Create a new 'scan' request that only returns the keys.
@@ -388,7 +388,7 @@ impl Client {
388388
});
389389
}
390390

391-
let res = requests::new_raw_scan_request(range, limit, key_only, self.cf.clone())
391+
let res = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone())
392392
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
393393
.await;
394394
res.map(|mut s| {
@@ -410,8 +410,13 @@ impl Client {
410410
});
411411
}
412412

413-
requests::new_raw_batch_scan_request(ranges, each_limit, key_only, self.cf.clone())
414-
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
415-
.await
413+
new_raw_batch_scan_request(
414+
ranges.into_iter().map(Into::into),
415+
each_limit,
416+
key_only,
417+
self.cf.clone(),
418+
)
419+
.execute(self.rpc.clone(), RetryOptions::default_optimistic())
420+
.await
416421
}
417422
}

src/raw/lowering.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
2+
3+
/// This module provides constructor functions for requests which take arguments as high-level
4+
/// types (i.e., the types from the client crate) and converts these to the types used in the
5+
/// generated protobuf code, then calls the low-level ctor functions in the requests module.
6+
use crate::{raw::requests, BoundRange, ColumnFamily, Key, KvPair, Value};
7+
use std::iter::Iterator;
8+
use tikv_client_proto::kvrpcpb;
9+
10+
pub fn new_raw_get_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
11+
requests::new_raw_get_request(key.into(), cf)
12+
}
13+
14+
pub fn new_raw_batch_get_request(
15+
keys: impl Iterator<Item = Key>,
16+
cf: Option<ColumnFamily>,
17+
) -> kvrpcpb::RawBatchGetRequest {
18+
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
19+
}
20+
21+
pub fn new_raw_put_request(
22+
key: Key,
23+
value: Value,
24+
cf: Option<ColumnFamily>,
25+
) -> kvrpcpb::RawPutRequest {
26+
requests::new_raw_put_request(key.into(), value, cf)
27+
}
28+
29+
pub fn new_raw_batch_put_request(
30+
pairs: impl Iterator<Item = KvPair>,
31+
cf: Option<ColumnFamily>,
32+
) -> kvrpcpb::RawBatchPutRequest {
33+
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf)
34+
}
35+
36+
pub fn new_raw_delete_request(key: Key, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
37+
requests::new_raw_delete_request(key.into(), cf)
38+
}
39+
40+
pub fn new_raw_batch_delete_request(
41+
keys: impl Iterator<Item = Key>,
42+
cf: Option<ColumnFamily>,
43+
) -> kvrpcpb::RawBatchDeleteRequest {
44+
requests::new_raw_batch_delete_request(keys.map(Into::into).collect(), cf)
45+
}
46+
47+
pub fn new_raw_delete_range_request(
48+
range: BoundRange,
49+
cf: Option<ColumnFamily>,
50+
) -> kvrpcpb::RawDeleteRangeRequest {
51+
let (start_key, end_key) = range.into_keys();
52+
requests::new_raw_delete_range_request(start_key.into(), end_key.unwrap_or_default().into(), cf)
53+
}
54+
55+
pub fn new_raw_scan_request(
56+
range: BoundRange,
57+
limit: u32,
58+
key_only: bool,
59+
cf: Option<ColumnFamily>,
60+
) -> kvrpcpb::RawScanRequest {
61+
let (start_key, end_key) = range.into_keys();
62+
requests::new_raw_scan_request(
63+
start_key.into(),
64+
end_key.unwrap_or_default().into(),
65+
limit,
66+
key_only,
67+
cf,
68+
)
69+
}
70+
71+
pub fn new_raw_batch_scan_request(
72+
ranges: impl Iterator<Item = BoundRange>,
73+
each_limit: u32,
74+
key_only: bool,
75+
cf: Option<ColumnFamily>,
76+
) -> kvrpcpb::RawBatchScanRequest {
77+
requests::new_raw_batch_scan_request(ranges.map(Into::into).collect(), each_limit, key_only, cf)
78+
}

src/raw/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::Error;
1414
use std::{convert::TryFrom, fmt};
1515

1616
mod client;
17+
pub mod lowering;
1718
mod requests;
1819

1920
/// A [`ColumnFamily`](ColumnFamily) is an optional parameter for [`raw::Client`](Client) requests.

src/raw/requests.rs

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,9 @@ impl KvRequest for kvrpcpb::RawGetRequest {
5454
}
5555
}
5656

57-
pub fn new_raw_get_request(
58-
key: impl Into<Key>,
59-
cf: Option<ColumnFamily>,
60-
) -> kvrpcpb::RawGetRequest {
57+
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
6158
let mut req = kvrpcpb::RawGetRequest::default();
62-
req.set_key(key.into().into());
59+
req.set_key(key);
6360
req.maybe_set_cf(cf);
6461

6562
req
@@ -98,11 +95,11 @@ impl KvRequest for kvrpcpb::RawBatchGetRequest {
9895
}
9996

10097
pub fn new_raw_batch_get_request(
101-
keys: impl IntoIterator<Item = impl Into<Key>>,
98+
keys: Vec<Vec<u8>>,
10299
cf: Option<ColumnFamily>,
103100
) -> kvrpcpb::RawBatchGetRequest {
104101
let mut req = kvrpcpb::RawBatchGetRequest::default();
105-
req.set_keys(keys.into_iter().map(Into::into).map(Into::into).collect());
102+
req.set_keys(keys);
106103
req.maybe_set_cf(cf);
107104

108105
req
@@ -144,13 +141,13 @@ impl KvRequest for kvrpcpb::RawPutRequest {
144141
}
145142

146143
pub fn new_raw_put_request(
147-
key: impl Into<Key>,
148-
value: impl Into<Value>,
144+
key: Vec<u8>,
145+
value: Vec<u8>,
149146
cf: Option<ColumnFamily>,
150147
) -> kvrpcpb::RawPutRequest {
151148
let mut req = kvrpcpb::RawPutRequest::default();
152-
req.set_key(key.into().into());
153-
req.set_value(value.into());
149+
req.set_key(key);
150+
req.set_value(value);
154151
req.maybe_set_cf(cf);
155152

156153
req
@@ -187,11 +184,11 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
187184
}
188185

189186
pub fn new_raw_batch_put_request(
190-
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
187+
pairs: Vec<kvrpcpb::KvPair>,
191188
cf: Option<ColumnFamily>,
192189
) -> kvrpcpb::RawBatchPutRequest {
193190
let mut req = kvrpcpb::RawBatchPutRequest::default();
194-
req.set_pairs(pairs.into_iter().map(Into::into).map(Into::into).collect());
191+
req.set_pairs(pairs);
195192
req.maybe_set_cf(cf);
196193

197194
req
@@ -229,12 +226,9 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
229226
}
230227
}
231228

232-
pub fn new_raw_delete_request(
233-
key: impl Into<Key>,
234-
cf: Option<ColumnFamily>,
235-
) -> kvrpcpb::RawDeleteRequest {
229+
pub fn new_raw_delete_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawDeleteRequest {
236230
let mut req = kvrpcpb::RawDeleteRequest::default();
237-
req.set_key(key.into().into());
231+
req.set_key(key);
238232
req.maybe_set_cf(cf);
239233

240234
req
@@ -271,11 +265,11 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
271265
}
272266

273267
pub fn new_raw_batch_delete_request(
274-
keys: impl IntoIterator<Item = impl Into<Key>>,
268+
keys: Vec<Vec<u8>>,
275269
cf: Option<ColumnFamily>,
276270
) -> kvrpcpb::RawBatchDeleteRequest {
277271
let mut req = kvrpcpb::RawBatchDeleteRequest::default();
278-
req.set_keys(keys.into_iter().map(Into::into).map(Into::into).collect());
272+
req.set_keys(keys);
279273
req.maybe_set_cf(cf);
280274

281275
req
@@ -316,13 +310,13 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
316310
}
317311

318312
pub fn new_raw_delete_range_request(
319-
range: impl Into<BoundRange>,
313+
start_key: Vec<u8>,
314+
end_key: Vec<u8>,
320315
cf: Option<ColumnFamily>,
321316
) -> kvrpcpb::RawDeleteRangeRequest {
322-
let (start_key, end_key) = range.into().into_keys();
323317
let mut req = kvrpcpb::RawDeleteRangeRequest::default();
324-
req.set_start_key(start_key.into());
325-
req.set_end_key(end_key.unwrap_or_default().into());
318+
req.set_start_key(start_key);
319+
req.set_end_key(end_key);
326320
req.maybe_set_cf(cf);
327321

328322
req
@@ -365,15 +359,15 @@ impl KvRequest for kvrpcpb::RawScanRequest {
365359
}
366360

367361
pub fn new_raw_scan_request(
368-
range: impl Into<BoundRange>,
362+
start_key: Vec<u8>,
363+
end_key: Vec<u8>,
369364
limit: u32,
370365
key_only: bool,
371366
cf: Option<ColumnFamily>,
372367
) -> kvrpcpb::RawScanRequest {
373-
let (start_key, end_key) = range.into().into_keys();
374368
let mut req = kvrpcpb::RawScanRequest::default();
375-
req.set_start_key(start_key.into());
376-
req.set_end_key(end_key.unwrap_or_default().into());
369+
req.set_start_key(start_key);
370+
req.set_end_key(end_key);
377371
req.set_limit(limit);
378372
req.set_key_only(key_only);
379373
req.maybe_set_cf(cf);
@@ -418,13 +412,13 @@ impl KvRequest for kvrpcpb::RawBatchScanRequest {
418412
}
419413

420414
pub fn new_raw_batch_scan_request(
421-
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
415+
ranges: Vec<kvrpcpb::KeyRange>,
422416
each_limit: u32,
423417
key_only: bool,
424418
cf: Option<ColumnFamily>,
425419
) -> kvrpcpb::RawBatchScanRequest {
426420
let mut req = kvrpcpb::RawBatchScanRequest::default();
427-
req.set_ranges(ranges.into_iter().map(Into::into).map(Into::into).collect());
421+
req.set_ranges(ranges);
428422
req.set_each_limit(each_limit);
429423
req.set_key_only(key_only);
430424
req.maybe_set_cf(cf);

src/transaction/buffer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Buffer {
8181
f: F,
8282
) -> Result<impl Iterator<Item = KvPair>>
8383
where
84-
F: FnOnce(Vec<Key>) -> Fut,
84+
F: FnOnce(Box<dyn Iterator<Item = Key>>) -> Fut,
8585
Fut: Future<Output = Result<Vec<KvPair>>>,
8686
{
8787
let (cached_results, undetermined_keys) = {
@@ -106,11 +106,11 @@ impl Buffer {
106106
.into_iter()
107107
.filter_map(|(k, v)| v.unwrap().map(|v| KvPair(k, v)));
108108

109-
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k).collect();
109+
let undetermined_keys = undetermined_keys.into_iter().map(|(k, _)| k);
110110
(cached_results, undetermined_keys)
111111
};
112112

113-
let fetched_results = f(undetermined_keys).await?;
113+
let fetched_results = f(Box::new(undetermined_keys)).await?;
114114
let mut mutations = self.mutations.lock().await;
115115
for kvpair in &fetched_results {
116116
let key = kvpair.0.clone();

src/transaction/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ impl Client {
185185
let mut start_key = vec![];
186186
loop {
187187
let req = new_scan_lock_request(
188-
mem::take(&mut start_key).into(),
189-
safepoint.clone(),
188+
mem::take(&mut start_key),
189+
safepoint.version(),
190190
SCAN_LOCK_BATCH_SIZE,
191191
);
192192
let res: Vec<kvrpcpb::LockInfo> = req

0 commit comments

Comments
 (0)