Skip to content

Commit 159de5c

Browse files
committed
Merge branch 'master' of github.com:tikv/client-rust into api-v2
Signed-off-by: iosmanthus <[email protected]>
2 parents 051a157 + 3f8ea11 commit 159de5c

File tree

6 files changed

+114
-17
lines changed

6 files changed

+114
-17
lines changed

src/transaction/buffer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ impl Buffer {
111111
range: BoundRange,
112112
limit: u32,
113113
update_cache: bool,
114+
reverse: bool,
114115
f: F,
115116
) -> Result<impl Iterator<Item = KvPair>>
116117
where
@@ -158,7 +159,13 @@ impl Buffer {
158159
.into_iter()
159160
.map(|(k, v)| KvPair::new(k, v))
160161
.collect::<Vec<_>>();
161-
res.sort_by_cached_key(|x| x.key().clone());
162+
163+
// TODO: use `BTreeMap` instead of `HashMap` to avoid sorting.
164+
if reverse {
165+
res.sort_unstable_by(|a, b| b.key().cmp(a.key()));
166+
} else {
167+
res.sort_unstable_by(|a, b| a.key().cmp(b.key()));
168+
}
162169

163170
Ok(res.into_iter().take(limit as usize))
164171
}

src/transaction/lowering.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub fn new_scan_request(
2323
timestamp: Timestamp,
2424
limit: u32,
2525
key_only: bool,
26+
reverse: bool,
2627
) -> kvrpcpb::ScanRequest {
2728
let (start_key, end_key) = range.into_keys();
2829
requests::new_scan_request(
@@ -31,6 +32,7 @@ pub fn new_scan_request(
3132
timestamp.version(),
3233
limit,
3334
key_only,
35+
reverse,
3436
)
3537
}
3638

src/transaction/requests.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,20 @@ pub fn new_scan_request(
124124
timestamp: u64,
125125
limit: u32,
126126
key_only: bool,
127+
reverse: bool,
127128
) -> kvrpcpb::ScanRequest {
128129
let mut req = kvrpcpb::ScanRequest::default();
129-
req.set_start_key(start_key);
130-
req.set_end_key(end_key);
130+
if !reverse {
131+
req.set_start_key(start_key);
132+
req.set_end_key(end_key);
133+
} else {
134+
req.set_start_key(end_key);
135+
req.set_end_key(start_key);
136+
}
131137
req.set_limit(limit);
132138
req.set_key_only(key_only);
133139
req.set_version(timestamp);
140+
req.set_reverse(reverse);
134141
req
135142
}
136143

src/transaction/snapshot.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@
22

33
use crate::{pd::PdClient, BoundRange, Key, KvPair, Result, Transaction, Value};
44
use derive_new::new;
5-
use futures::stream::BoxStream;
65
use slog::Logger;
7-
use std::ops::RangeBounds;
86

97
/// A read-only transaction which reads at the given timestamp.
108
///
@@ -61,10 +59,26 @@ impl<PdC: PdClient> Snapshot<PdC> {
6159
self.transaction.scan_keys(range, limit).await
6260
}
6361

64-
/// Unimplemented. Similar to scan, but in the reverse direction.
65-
#[allow(dead_code)]
66-
fn scan_reverse(&mut self, range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
62+
/// Similar to scan, but in the reverse direction.
63+
pub async fn scan_reverse(
64+
&mut self,
65+
range: impl Into<BoundRange>,
66+
limit: u32,
67+
) -> Result<impl Iterator<Item = KvPair>> {
6768
debug!(self.logger, "invoking scan_reverse request on snapshot");
68-
self.transaction.scan_reverse(range)
69+
self.transaction.scan_reverse(range, limit).await
70+
}
71+
72+
/// Similar to scan_keys, but in the reverse direction.
73+
pub async fn scan_keys_reverse(
74+
&mut self,
75+
range: impl Into<BoundRange>,
76+
limit: u32,
77+
) -> Result<impl Iterator<Item = Key>> {
78+
debug!(
79+
self.logger,
80+
"invoking scan_keys_reverse request on snapshot"
81+
);
82+
self.transaction.scan_keys_reverse(range, limit).await
6983
}
7084
}

src/transaction/transaction.rs

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{iter, ops::RangeBounds, sync::Arc, time::Instant};
3+
use std::{iter, sync::Arc, time::Instant};
44

55
use derive_new::new;
66
use fail::fail_point;
7-
use futures::{prelude::*, stream::BoxStream};
7+
use futures::prelude::*;
88
use slog::Logger;
99
use tokio::{sync::RwLock, time::Duration};
1010

@@ -354,7 +354,7 @@ impl<PdC: PdClient> Transaction<PdC> {
354354
limit: u32,
355355
) -> Result<impl Iterator<Item = KvPair>> {
356356
debug!(self.logger, "invoking transactional scan request");
357-
self.scan_inner(range, limit, false).await
357+
self.scan_inner(range, limit, false, false).await
358358
}
359359

360360
/// Create a new 'scan' request that only returns the keys.
@@ -392,17 +392,39 @@ impl<PdC: PdClient> Transaction<PdC> {
392392
) -> Result<impl Iterator<Item = Key>> {
393393
debug!(self.logger, "invoking transactional scan_keys request");
394394
Ok(self
395-
.scan_inner(range, limit, true)
395+
.scan_inner(range, limit, true, false)
396396
.await?
397397
.map(KvPair::into_key))
398398
}
399399

400400
/// Create a 'scan_reverse' request.
401401
///
402402
/// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
403-
pub(crate) fn scan_reverse(&self, _range: impl RangeBounds<Key>) -> BoxStream<Result<KvPair>> {
403+
pub async fn scan_reverse(
404+
&mut self,
405+
range: impl Into<BoundRange>,
406+
limit: u32,
407+
) -> Result<impl Iterator<Item = KvPair>> {
404408
debug!(self.logger, "invoking transactional scan_reverse request");
405-
unimplemented!()
409+
self.scan_inner(range, limit, false, true).await
410+
}
411+
412+
/// Create a 'scan_keys_reverse' request.
413+
///
414+
/// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
415+
pub async fn scan_keys_reverse(
416+
&mut self,
417+
range: impl Into<BoundRange>,
418+
limit: u32,
419+
) -> Result<impl Iterator<Item = Key>> {
420+
debug!(
421+
self.logger,
422+
"invoking transactional scan_keys_reverse request"
423+
);
424+
Ok(self
425+
.scan_inner(range, limit, true, true)
426+
.await?
427+
.map(KvPair::into_key))
406428
}
407429

408430
/// Sets the value associated with the given key.
@@ -697,6 +719,7 @@ impl<PdC: PdClient> Transaction<PdC> {
697719
range: impl Into<BoundRange>,
698720
limit: u32,
699721
key_only: bool,
722+
reverse: bool,
700723
) -> Result<impl Iterator<Item = KvPair>> {
701724
self.check_allow_operation().await?;
702725
let timestamp = self.timestamp.clone();
@@ -708,8 +731,10 @@ impl<PdC: PdClient> Transaction<PdC> {
708731
range.into(),
709732
limit,
710733
!key_only,
734+
reverse,
711735
move |new_range, new_limit| async move {
712-
let request = new_scan_request(new_range, timestamp, new_limit, key_only);
736+
let request =
737+
new_scan_request(new_range, timestamp, new_limit, key_only, reverse);
713738
let plan = PlanBuilder::new(rpc, request)
714739
.resolve_lock(retry_options.lock_backoff)
715740
.retry_multi_region(retry_options.region_backoff)

tests/integration_tests.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use tikv_client::{
2525
raw,
2626
request::codec::RawCodec,
2727
transaction::{self, HeartbeatOption},
28-
Error, Key, KvPair, PdClient, RawClient, Result, Transaction, TransactionClient,
28+
BoundRange, Error, Key, KvPair, PdClient, RawClient, Result, Transaction, TransactionClient,
2929
TransactionOptions, Value,
3030
};
3131

@@ -920,6 +920,48 @@ async fn txn_scan() -> Result<()> {
920920
Ok(())
921921
}
922922

923+
#[tokio::test]
924+
#[serial]
925+
async fn txn_scan_reverse() -> Result<()> {
926+
init().await?;
927+
let client = TransactionClient::new_with_config(
928+
pd_addrs(),
929+
Default::default(),
930+
transaction::ApiV1::default(),
931+
None,
932+
)
933+
.await?;
934+
935+
let k1 = b"a1".to_vec();
936+
let k2 = b"a2".to_vec();
937+
let v1 = b"b1".to_vec();
938+
let v2 = b"b2".to_vec();
939+
940+
let reverse_resp = vec![
941+
(Key::from(k2.clone()), v2.clone()),
942+
(Key::from(k1.clone()), v1.clone()),
943+
];
944+
945+
// pessimistic
946+
let option = TransactionOptions::new_pessimistic().drop_check(tikv_client::CheckLevel::Warn);
947+
let mut t = client.begin_with_options(option.clone()).await?;
948+
t.put(k1.clone(), v1).await?;
949+
t.put(k2.clone(), v2).await?;
950+
t.commit().await?;
951+
952+
let mut t2 = client.begin_with_options(option).await?;
953+
let bound_range: BoundRange = (k1..=k2).into();
954+
let resp = t2
955+
.scan_reverse(bound_range, 2)
956+
.await?
957+
.map(|kv| (kv.0, kv.1))
958+
.collect::<Vec<(Key, Vec<u8>)>>();
959+
assert_eq!(resp, reverse_resp);
960+
t2.commit().await?;
961+
962+
Ok(())
963+
}
964+
923965
// helper function
924966
async fn get_u32<C: RawCodec>(client: &RawClient<C>, key: Vec<u8>) -> Result<u32> {
925967
let x = client.get(key).await?.unwrap();

0 commit comments

Comments
 (0)