Skip to content

Commit 8cb8dd6

Browse files
committed
Refactor the command-abstraction into a composable plan abstraction
Signed-off-by: Nick Cameron <[email protected]>
1 parent 2baddc6 commit 8cb8dd6

26 files changed

+1630
-1302
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ os:
66
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
77
- osx
88
rust:
9-
- stable
9+
# - stable
1010
- nightly
1111

1212
env:

examples/pessimistic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
mod common;
44

55
use crate::common::parse_args;
6-
use tikv_client::{Config, Key, TransactionClient as Client, Value};
6+
use tikv_client::{Config, Key, TransactionClient as Client, TransactionOptions, Value};
77

88
#[tokio::main]
99
async fn main() {
@@ -50,7 +50,7 @@ async fn main() {
5050
{
5151
// another txn cannot write to the locked key
5252
let mut txn2 = client
53-
.begin_optimistic()
53+
.begin_with_options(TransactionOptions::new_optimistic().no_resolve_locks())
5454
.await
5555
.expect("Could not begin a transaction");
5656
let value2: Value = b"value2".to_vec();

src/backoff.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
use rand::{thread_rng, Rng};
66
use std::time::Duration;
77

8+
pub const DEFAULT_REGION_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
9+
pub const OPTIMISTIC_BACKOFF: Backoff = Backoff::no_jitter_backoff(2, 500, 10);
10+
pub const PESSIMISTIC_BACKOFF: Backoff = Backoff::no_backoff();
11+
812
/// When a request is retried, we can backoff for some time to avoid saturating the network.
913
///
1014
/// `Backoff` is an object which determines how long to wait for.

src/kv/bound_range.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ impl BoundRange {
7575
BoundRange { from, to }
7676
}
7777

78+
/// Create a new BoundRange bounded below by `from` and unbounded above.
79+
pub fn range_from(from: Key) -> BoundRange {
80+
BoundRange {
81+
from: Bound::Included(from),
82+
to: Bound::Unbounded,
83+
}
84+
}
85+
7886
/// Ranges used in scanning TiKV have a particularity to them.
7987
///
8088
/// The **start** of a scan is inclusive, unless appended with an '\0', then it is exclusive.

src/kv/key.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,11 @@ impl<'a> From<&'a Key> for &'a [u8] {
152152
}
153153
}
154154

155+
impl<'a> From<&'a Vec<u8>> for &'a Key {
156+
fn from(key: &'a Vec<u8>) -> Self {
157+
unsafe { &*(key as *const Vec<u8> as *const Key) }
158+
}
159+
}
155160
impl AsRef<Key> for Key {
156161
fn as_ref(&self) -> &Key {
157162
self

src/kv/kvpair.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ impl From<KvPair> for (Key, Value) {
9696
}
9797
}
9898

99+
impl From<KvPair> for Key {
100+
fn from(pair: KvPair) -> Self {
101+
pair.0
102+
}
103+
}
104+
99105
impl From<kvrpcpb::KvPair> for KvPair {
100106
fn from(mut pair: kvrpcpb::KvPair) -> Self {
101107
KvPair(Key::from(pair.take_key()), pair.take_value())

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#![allow(clippy::type_complexity)]
77
#![allow(incomplete_features)]
88
#![cfg_attr(test, feature(specialization))]
9+
#![feature(associated_type_bounds)]
10+
#![feature(decl_macro)]
911

1012
//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
1113
//! distributed transactional Key-Value database written in Rust.
@@ -89,6 +91,7 @@ mod request;
8991
mod stats;
9092
mod store;
9193
mod timestamp;
94+
mod util;
9295

9396
#[cfg(test)]
9497
mod mock;

src/mock.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl PdClient for MockPdClient {
124124
type KvClient = MockKvClient;
125125

126126
async fn map_region_to_store(self: Arc<Self>, region: Region) -> Result<Store> {
127-
Ok(Store::new(region, Box::new(self.client.clone())))
127+
Ok(Store::new(region, Arc::new(self.client.clone())))
128128
}
129129

130130
async fn region_for_key(&self, key: &Key) -> Result<Region> {

src/pd/client.rs

Lines changed: 80 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::{
1313
thread,
1414
};
1515
use tikv_client_pd::Cluster;
16+
use tikv_client_proto::kvrpcpb;
1617
use tikv_client_store::{KvClient, KvConnect, TikvConnect};
1718

1819
const CQ_COUNT: usize = 1;
@@ -53,8 +54,8 @@ pub trait PdClient: Send + Sync + 'static {
5354
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
5455

5556
/// In transactional API, `key` is in raw format
56-
async fn store_for_key(self: Arc<Self>, key: Key) -> Result<Store> {
57-
let region = self.region_for_key(&key).await?;
57+
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<Store> {
58+
let region = self.region_for_key(key).await?;
5859
self.map_region_to_store(region).await
5960
}
6061

@@ -63,23 +64,27 @@ pub trait PdClient: Send + Sync + 'static {
6364
self.map_region_to_store(region).await
6465
}
6566

66-
fn group_keys_by_region<K: AsRef<Key> + Send + Sync + 'static>(
67+
fn group_keys_by_region<K, K2>(
6768
self: Arc<Self>,
6869
keys: impl Iterator<Item = K> + Send + Sync + 'static,
69-
) -> BoxStream<'static, Result<(RegionId, Vec<K>)>> {
70+
) -> BoxStream<'static, Result<(RegionId, Vec<K2>)>>
71+
where
72+
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
73+
K2: Send + Sync + 'static,
74+
{
7075
let keys = keys.peekable();
7176
stream_fn(keys, move |mut keys| {
7277
let this = self.clone();
7378
async move {
7479
if let Some(key) = keys.next() {
7580
let region = this.region_for_key(key.as_ref()).await?;
7681
let id = region.id();
77-
let mut grouped = vec![key];
82+
let mut grouped = vec![key.into()];
7883
while let Some(key) = keys.peek() {
7984
if !region.contains(key.as_ref()) {
8085
break;
8186
}
82-
grouped.push(keys.next().unwrap());
87+
grouped.push(keys.next().unwrap().into());
8388
}
8489
Ok(Some((keys, (id, grouped))))
8590
} else {
@@ -121,8 +126,8 @@ pub trait PdClient: Send + Sync + 'static {
121126
/// Returns a Stream which iterates over the contexts for ranges in the same region.
122127
fn group_ranges_by_region(
123128
self: Arc<Self>,
124-
mut ranges: Vec<BoundRange>,
125-
) -> BoxStream<'static, Result<(RegionId, Vec<BoundRange>)>> {
129+
mut ranges: Vec<kvrpcpb::KeyRange>,
130+
) -> BoxStream<'static, Result<(RegionId, Vec<kvrpcpb::KeyRange>)>> {
126131
ranges.reverse();
127132
stream_fn(Some(ranges), move |ranges| {
128133
let this = self.clone();
@@ -133,38 +138,42 @@ pub trait PdClient: Send + Sync + 'static {
133138
};
134139

135140
if let Some(range) = ranges.pop() {
136-
let (start_key, end_key) = range.clone().into_keys();
141+
let start_key: Key = range.start_key.clone().into();
142+
let end_key: Key = range.end_key.clone().into();
137143
let region = this.region_for_key(&start_key).await?;
138144
let id = region.id();
139145
let region_start = region.start_key();
140146
let region_end = region.end_key();
141147
let mut grouped = vec![];
142-
if !region_end.is_empty()
143-
&& end_key
144-
.clone()
145-
.map(|x| x > region_end || x.is_empty())
146-
.unwrap_or(true)
147-
{
148-
grouped.push((start_key, region_end.clone()).into());
149-
ranges.push((region_end, end_key).into());
148+
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
149+
grouped.push(kvrpcpb::KeyRange {
150+
start_key: start_key.into(),
151+
end_key: region_end.clone().into(),
152+
});
153+
ranges.push(kvrpcpb::KeyRange {
154+
start_key: region_end.into(),
155+
end_key: end_key.into(),
156+
});
150157
return Ok(Some((Some(ranges), (id, grouped))));
151158
}
152159
grouped.push(range);
153160

154161
while let Some(range) = ranges.pop() {
155-
let (start_key, end_key) = range.clone().into_keys();
162+
let start_key: Key = range.start_key.clone().into();
163+
let end_key: Key = range.end_key.clone().into();
156164
if start_key < region_start {
157165
ranges.push(range);
158166
break;
159167
}
160-
if !region_end.is_empty()
161-
&& end_key
162-
.clone()
163-
.map(|x| x > region_end || x.is_empty())
164-
.unwrap_or(true)
165-
{
166-
grouped.push((start_key, region_end.clone()).into());
167-
ranges.push((region_end, end_key).into());
168+
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
169+
grouped.push(kvrpcpb::KeyRange {
170+
start_key: start_key.into(),
171+
end_key: region_end.clone().into(),
172+
});
173+
ranges.push(kvrpcpb::KeyRange {
174+
start_key: region_end.into(),
175+
end_key: end_key.into(),
176+
});
168177
return Ok(Some((Some(ranges), (id, grouped))));
169178
}
170179
grouped.push(range);
@@ -204,7 +213,7 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
204213
let store_id = region.get_store_id()?;
205214
let store = self.pd.clone().get_store(store_id).await?;
206215
let kv_client = self.kv_client(store.get_address())?;
207-
Ok(Store::new(region, Box::new(kv_client)))
216+
Ok(Store::new(region, Arc::new(kv_client)))
208217
}
209218

210219
async fn region_for_key(&self, key: &Key) -> Result<Region> {
@@ -351,8 +360,9 @@ pub mod test {
351360
let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
352361
let mut stream = executor::block_on_stream(stream);
353362

363+
let result: Vec<Key> = stream.next().unwrap().unwrap().1;
354364
assert_eq!(
355-
stream.next().unwrap().unwrap().1,
365+
result,
356366
vec![
357367
vec![1].into(),
358368
vec![2].into(),
@@ -388,15 +398,24 @@ pub mod test {
388398
#[test]
389399
fn test_group_ranges_by_region() {
390400
let client = Arc::new(MockPdClient::default());
391-
let k1: Key = vec![1].into();
392-
let k2: Key = vec![5, 2].into();
393-
let k3: Key = vec![11, 4].into();
394-
let k4: Key = vec![16, 4].into();
395-
let k_split: Key = vec![10].into();
396-
let range1 = (k1.clone(), k2.clone()).into();
397-
let range2 = (k1.clone(), k3.clone()).into();
398-
let range3 = (k2.clone(), k4.clone()).into();
399-
let ranges: Vec<BoundRange> = vec![range1, range2, range3];
401+
let k1 = vec![1];
402+
let k2 = vec![5, 2];
403+
let k3 = vec![11, 4];
404+
let k4 = vec![16, 4];
405+
let k_split = vec![10];
406+
let range1 = kvrpcpb::KeyRange {
407+
start_key: k1.clone(),
408+
end_key: k2.clone(),
409+
};
410+
let range2 = kvrpcpb::KeyRange {
411+
start_key: k1.clone(),
412+
end_key: k3.clone(),
413+
};
414+
let range3 = kvrpcpb::KeyRange {
415+
start_key: k2.clone(),
416+
end_key: k4.clone(),
417+
};
418+
let ranges = vec![range1, range2, range3];
400419

401420
let mut stream = executor::block_on_stream(client.group_ranges_by_region(ranges));
402421
let ranges1 = stream.next().unwrap().unwrap();
@@ -408,22 +427,40 @@ pub mod test {
408427
assert_eq!(
409428
ranges1.1,
410429
vec![
411-
(k1.clone(), k2.clone()).into(),
412-
(k1, k_split.clone()).into()
413-
] as Vec<BoundRange>
430+
kvrpcpb::KeyRange {
431+
start_key: k1.clone(),
432+
end_key: k2.clone()
433+
},
434+
kvrpcpb::KeyRange {
435+
start_key: k1,
436+
end_key: k_split.clone()
437+
}
438+
]
414439
);
415440
assert_eq!(ranges2.0, 2);
416441
assert_eq!(
417442
ranges2.1,
418-
vec![(k_split.clone(), k3).into()] as Vec<BoundRange>
443+
vec![kvrpcpb::KeyRange {
444+
start_key: k_split.clone(),
445+
end_key: k3
446+
}]
419447
);
420448
assert_eq!(ranges3.0, 1);
421449
assert_eq!(
422450
ranges3.1,
423-
vec![(k2, k_split.clone()).into()] as Vec<BoundRange>
451+
vec![kvrpcpb::KeyRange {
452+
start_key: k2,
453+
end_key: k_split.clone()
454+
}]
424455
);
425456
assert_eq!(ranges4.0, 2);
426-
assert_eq!(ranges4.1, vec![(k_split, k4).into()] as Vec<BoundRange>);
457+
assert_eq!(
458+
ranges4.1,
459+
vec![kvrpcpb::KeyRange {
460+
start_key: k_split,
461+
end_key: k4
462+
}]
463+
);
427464
assert!(stream.next().is_none());
428465
}
429466
}

0 commit comments

Comments
 (0)