Skip to content

Commit c47b92b

Browse files
sunxiaoguangHoverbear
authored andcommitted
Change raw::Client::get to return Option<Value> (#24)
Signed-off-by: Xiaoguang Sun <[email protected]>
1 parent c33bc13 commit c47b92b

File tree

6 files changed

+73
-84
lines changed

6 files changed

+73
-84
lines changed

examples/raw.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ fn main() -> Result<()> {
5858
// It is best to pass a `Vec<u8>` in terms of explictness and speed. `String`s and a few other
5959
// types are supported as well, but it all ends up as `Vec<u8>` in the end.
6060
let key: String = String::from(KEY);
61-
let value: Value = client.get(key.clone()).wait()?;
61+
let value: Value = client.get(key.clone()).wait()?.expect("value must exist");
6262
assert_eq!(value.as_ref(), VALUE.as_bytes());
6363
println!("Get key \"{:?}\" returned value \"{:?}\".", value, KEY);
6464

src/raw.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl Client {
6666
/// # let connected_client = connecting_client.wait().unwrap();
6767
/// let key = "TiKV";
6868
/// let req = connected_client.get(key);
69-
/// let result: Value = req.wait().unwrap();
69+
/// let result: Option<Value> = req.wait().unwrap();
7070
/// ```
7171
pub fn get(&self, key: impl Into<Key>) -> Get {
7272
Get::new(self.rpc(), GetInner::new(key.into()))
@@ -388,7 +388,7 @@ impl Get {
388388
}
389389

390390
impl Future for Get {
391-
type Item = Value;
391+
type Item = Option<Value>;
392392
type Error = Error;
393393

394394
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
@@ -407,7 +407,7 @@ impl GetInner {
407407
}
408408

409409
impl RequestInner for GetInner {
410-
type Resp = Value;
410+
type Resp = Option<Value>;
411411

412412
fn execute(self, client: Arc<RpcClient>, cf: Option<ColumnFamily>) -> KvFuture<Self::Resp> {
413413
Box::new(client.raw_get(self.key, cf))

src/rpc/client.rs

Lines changed: 60 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,11 @@ impl RpcClient {
231231
key: &Key,
232232
cf: Option<ColumnFamily>,
233233
) -> impl Future<Item = RawContext, Error = Error> {
234-
Self::region_context(inner, key)
235-
.map(move |(region, client)| RawContext::new(region, client, cf))
234+
Self::region_context(inner, key).map(|(region, client)| RawContext::new(region, client, cf))
236235
}
237236

238237
fn txn(inner: Arc<RpcClientInner>, key: &Key) -> impl Future<Item = TxnContext, Error = Error> {
239-
Self::region_context(inner, key).map(move |(region, _client)| TxnContext::new(region))
238+
Self::region_context(inner, key).map(|(region, _client)| TxnContext::new(region))
240239
}
241240

242241
#[inline]
@@ -248,16 +247,10 @@ impl RpcClient {
248247
&self,
249248
key: Key,
250249
cf: Option<ColumnFamily>,
251-
) -> impl Future<Item = Value, Error = Error> {
250+
) -> impl Future<Item = Option<Value>, Error = Error> {
252251
Self::raw(self.inner(), &key, cf)
253-
.and_then(move |context| context.client().raw_get(context, key))
254-
.and_then(move |value| {
255-
if value.is_empty() {
256-
Err(Error::NoSuchKey)
257-
} else {
258-
Ok(value)
259-
}
260-
})
252+
.and_then(|context| context.client().raw_get(context, key))
253+
.map(|value| if value.is_empty() { None } else { Some(value) })
261254
}
262255

263256
pub fn raw_batch_get(
@@ -274,8 +267,8 @@ impl RpcClient {
274267
let inner = Arc::clone(&inner);
275268
let cf = cf.clone();
276269
let task = Self::region_context_by_id(inner, region.id)
277-
.map(move |(region, client)| RawContext::new(region, client, cf))
278-
.and_then(move |context| {
270+
.map(|(region, client)| RawContext::new(region, client, cf))
271+
.and_then(|context| {
279272
context.client().raw_batch_get(context, keys.into_iter())
280273
});
281274
tasks.push(task);
@@ -296,7 +289,7 @@ impl RpcClient {
296289
} else {
297290
Either::B(
298291
Self::raw(self.inner(), &key, cf)
299-
.and_then(move |context| context.client().raw_put(context, key, value)),
292+
.and_then(|context| context.client().raw_put(context, key, value)),
300293
)
301294
}
302295
}
@@ -319,10 +312,8 @@ impl RpcClient {
319312
let inner = Arc::clone(&inner);
320313
let cf = cf.clone();
321314
let task = Self::region_context_by_id(inner, region.id)
322-
.map(move |(region, client)| RawContext::new(region, client, cf))
323-
.and_then(move |context| {
324-
context.client().raw_batch_put(context, pairs)
325-
});
315+
.map(|(region, client)| RawContext::new(region, client, cf))
316+
.and_then(|context| context.client().raw_batch_put(context, pairs));
326317
tasks.push(task);
327318
}
328319
future::join_all(tasks)
@@ -338,7 +329,7 @@ impl RpcClient {
338329
cf: Option<ColumnFamily>,
339330
) -> impl Future<Item = (), Error = Error> {
340331
Self::raw(self.inner(), &key, cf)
341-
.and_then(move |context| context.client().raw_delete(context, key))
332+
.and_then(|context| context.client().raw_delete(context, key))
342333
}
343334

344335
pub fn raw_batch_delete(
@@ -355,8 +346,8 @@ impl RpcClient {
355346
let inner = Arc::clone(&inner);
356347
let cf = cf.clone();
357348
let task = Self::region_context_by_id(inner, region.id)
358-
.map(move |(region, client)| RawContext::new(region, client, cf))
359-
.and_then(move |context| context.client().raw_batch_delete(context, keys));
349+
.map(|(region, client)| RawContext::new(region, client, cf))
350+
.and_then(|context| context.client().raw_batch_delete(context, keys));
360351
tasks.push(task);
361352
}
362353
future::join_all(tasks)
@@ -386,35 +377,33 @@ impl RpcClient {
386377
);
387378
let inner = Arc::clone(&self.inner);
388379
loop_fn((inner, scan), |(inner, scan)| {
389-
inner
390-
.locate_key(scan.start_key())
391-
.and_then(move |location| {
392-
let region = location.into_inner();
393-
let cf = scan.cf.clone();
394-
Self::region_context_by_id(Arc::clone(&inner), region.id)
395-
.map(move |(region, client)| {
396-
(scan, region.range(), RawContext::new(region, client, cf))
397-
})
398-
.and_then(move |(mut scan, region_range, context)| {
399-
let (start_key, end_key) = scan.range();
400-
context
401-
.client()
402-
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
403-
.map(move |pairs| (scan, region_range, pairs))
404-
})
405-
.map(move |(mut scan, region_range, mut pairs)| {
406-
let limit = scan.limit;
407-
scan.result_mut().append(&mut pairs);
408-
if scan.result().len() as u32 >= limit {
409-
Loop::Break(scan.into_inner())
410-
} else {
411-
match scan.next(region_range) {
412-
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
413-
ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
414-
}
380+
inner.locate_key(scan.start_key()).and_then(|location| {
381+
let region = location.into_inner();
382+
let cf = scan.cf.clone();
383+
Self::region_context_by_id(Arc::clone(&inner), region.id)
384+
.map(|(region, client)| {
385+
(scan, region.range(), RawContext::new(region, client, cf))
386+
})
387+
.and_then(|(mut scan, region_range, context)| {
388+
let (start_key, end_key) = scan.range();
389+
context
390+
.client()
391+
.raw_scan(context, start_key, end_key, scan.limit, scan.key_only)
392+
.map(|pairs| (scan, region_range, pairs))
393+
})
394+
.map(|(mut scan, region_range, mut pairs)| {
395+
let limit = scan.limit;
396+
scan.result_mut().append(&mut pairs);
397+
if scan.result().len() as u32 >= limit {
398+
Loop::Break(scan.into_inner())
399+
} else {
400+
match scan.next(region_range) {
401+
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
402+
ScanRegionsStatus::Break => Loop::Break(scan.into_inner()),
415403
}
416-
})
417-
})
404+
}
405+
})
406+
})
418407
})
419408
}
420409

@@ -438,31 +427,27 @@ impl RpcClient {
438427
let scan: ScanRegionsContext<(), Option<ColumnFamily>> = ScanRegionsContext::new(range, cf);
439428
let inner = Arc::clone(&self.inner);
440429
loop_fn((inner, scan), |(inner, scan)| {
441-
inner
442-
.locate_key(scan.start_key())
443-
.and_then(move |location| {
444-
let region = location.into_inner();
445-
let cf = scan.clone();
446-
Self::region_context_by_id(Arc::clone(&inner), region.id)
447-
.map(move |(region, client)| {
448-
(scan, region.range(), RawContext::new(region, client, cf))
449-
})
450-
.and_then(move |(mut scan, region_range, context)| {
451-
let (start_key, end_key) = scan.range();
452-
let start_key = start_key.expect("start key must be specified");
453-
let end_key = end_key.expect("end key must be specified");
454-
context
455-
.client()
456-
.raw_delete_range(context, start_key, end_key)
457-
.map(move |_| (scan, region_range))
458-
})
459-
.map(
460-
move |(mut scan, region_range)| match scan.next(region_range) {
461-
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
462-
ScanRegionsStatus::Break => Loop::Break(()),
463-
},
464-
)
465-
})
430+
inner.locate_key(scan.start_key()).and_then(|location| {
431+
let region = location.into_inner();
432+
let cf = scan.clone();
433+
Self::region_context_by_id(Arc::clone(&inner), region.id)
434+
.map(|(region, client)| {
435+
(scan, region.range(), RawContext::new(region, client, cf))
436+
})
437+
.and_then(|(mut scan, region_range, context)| {
438+
let (start_key, end_key) = scan.range();
439+
let start_key = start_key.expect("start key must be specified");
440+
let end_key = end_key.expect("end key must be specified");
441+
context
442+
.client()
443+
.raw_delete_range(context, start_key, end_key)
444+
.map(|_| (scan, region_range))
445+
})
446+
.map(|(mut scan, region_range)| match scan.next(region_range) {
447+
ScanRegionsStatus::Continue => Loop::Continue((inner, scan)),
448+
ScanRegionsStatus::Break => Loop::Break(()),
449+
})
450+
})
466451
})
467452
}
468453
}

src/rpc/pd/leader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ pub fn try_connect_leader(
401401
let mut resp = None;
402402
// Try to connect to other members, then the previous leader.
403403
'outer: for m in members
404-
.into_iter()
404+
.iter()
405405
.filter(|m| *m != previous_leader)
406406
.chain(&[previous_leader.clone()])
407407
{

src/rpc/security.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ mod tests {
115115
let example_cert = temp.path().join("cert");
116116
let example_pem = temp.path().join("key");
117117
for (id, f) in (&[&example_ca, &example_cert, &example_pem])
118-
.into_iter()
118+
.iter()
119119
.enumerate()
120120
{
121121
File::create(f).unwrap().write_all(&[id as u8]).unwrap();

tests/raw.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,18 @@ fn test_existence(client: &Client, existing_pairs: &[KvPair], not_existing_keys:
5959
for pair in existing_pairs.iter().map(Clone::clone) {
6060
let (key, value) = pair.into_inner();
6161
assert_eq!(
62-
client.get(key).wait().expect("Could not get value"),
62+
client
63+
.get(key)
64+
.wait()
65+
.expect("Could not get value")
66+
.expect("key doesn't exist"),
6367
value.clone(),
6468
);
6569
}
6670

6771
for key in not_existing_keys.clone().into_iter() {
68-
let r = client.get(key).wait();
69-
assert!(r.is_err());
72+
let r = client.get(key).wait().expect("Cound not get value");
73+
assert!(r.is_none());
7074
}
7175

7276
let mut existing_keys = Vec::with_capacity(existing_pairs.len());

0 commit comments

Comments
 (0)