Skip to content

Commit a5f326f

Browse files
committed
impl KvRequest for txn requests
Signed-off-by: iosmanthus <[email protected]>
1 parent 71af50d commit a5f326f

File tree

5 files changed

+315
-140
lines changed

5 files changed

+315
-140
lines changed

src/raw/requests.rs

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -122,16 +122,10 @@ pub fn new_raw_batch_put_request(
122122
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchPutRequest {
123123
type Response = kvrpcpb::RawBatchPutResponse;
124124

125-
fn encode_request(&self, codec: &C) -> Cow<Self> {
126-
if codec.is_plain() {
127-
return Cow::Borrowed(self);
128-
}
129-
130-
let mut req = self.clone();
131-
132-
*req.mut_pairs() = codec.encode_pairs(req.take_pairs());
125+
fn encode_request(mut self, codec: &C) -> Self {
126+
*self.mut_pairs() = codec.encode_pairs(self.take_pairs());
133127

134-
Cow::Owned(req)
128+
self
135129
}
136130
}
137131

@@ -193,16 +187,10 @@ pub fn new_raw_batch_delete_request(
193187
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchDeleteRequest {
194188
type Response = kvrpcpb::RawBatchDeleteResponse;
195189

196-
fn encode_request(&self, codec: &C) -> Cow<Self> {
197-
if codec.is_plain() {
198-
return Cow::Borrowed(self);
199-
}
200-
201-
let mut req = self.clone();
202-
203-
*req.mut_keys() = codec.encode_keys(req.take_keys());
190+
fn encode_request(mut self, codec: &C) -> Self {
191+
*self.mut_keys() = codec.encode_keys(self.take_keys());
204192

205-
Cow::Owned(req)
193+
self
206194
}
207195
}
208196

@@ -224,18 +212,13 @@ pub fn new_raw_delete_range_request(
224212
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawDeleteRangeRequest {
225213
type Response = kvrpcpb::RawDeleteRangeResponse;
226214

227-
fn encode_request(&self, codec: &C) -> Cow<Self> {
228-
if codec.is_plain() {
229-
return Cow::Borrowed(self);
230-
}
231-
232-
let mut req = self.clone();
215+
fn encode_request(mut self, codec: &C) -> Self {
216+
let (start, end) = (self.take_start_key(), self.take_end_key());
233217

234-
let (start, end) = (req.take_start_key(), req.take_end_key());
235-
*req.mut_start_key() = codec.encode_key(start);
236-
*req.mut_end_key() = codec.encode_key(end);
218+
self.set_start_key(codec.encode_key(start));
219+
self.set_end_key(codec.encode_key(end));
237220

238-
Cow::Owned(req)
221+
self
239222
}
240223
}
241224

@@ -290,16 +273,10 @@ pub fn new_raw_batch_scan_request(
290273
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchScanRequest {
291274
type Response = kvrpcpb::RawBatchScanResponse;
292275

293-
fn encode_request(&self, codec: &C) -> Cow<Self> {
294-
if codec.is_plain() {
295-
return Cow::Borrowed(self);
296-
}
297-
298-
let mut req = self.clone();
299-
300-
*req.mut_ranges() = codec.encode_ranges(req.take_ranges());
276+
fn encode_request(mut self, codec: &C) -> Self {
277+
*self.mut_ranges() = codec.encode_ranges(self.take_ranges());
301278

302-
Cow::Owned(req)
279+
self
303280
}
304281
}
305282

src/request/codec.rs

Lines changed: 36 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,59 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
2121
key
2222
}
2323

24+
fn encode_mutations(&self, mutations: Vec<kvrpcpb::Mutation>) -> Vec<kvrpcpb::Mutation> {
25+
mutations
26+
.into_iter()
27+
.map(|mut m| {
28+
let key = m.take_key();
29+
m.set_key(self.encode_key(key));
30+
m
31+
})
32+
.collect()
33+
}
34+
2435
fn encode_keys(&self, keys: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
25-
keys
36+
keys.into_iter().map(|key| self.encode_key(key)).collect()
2637
}
2738

28-
fn encode_pairs(&self, pairs: Vec<kvrpcpb::KvPair>) -> Vec<kvrpcpb::KvPair> {
39+
fn encode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Vec<kvrpcpb::KvPair> {
40+
for pair in pairs.iter_mut() {
41+
*pair.mut_key() = self.encode_key(pair.take_key());
42+
}
43+
2944
pairs
3045
}
3146

3247
fn decode_key(&self, key: Vec<u8>) -> Result<Vec<u8>> {
3348
Ok(key)
3449
}
3550

36-
fn decode_pairs(&self, pairs: Vec<kvrpcpb::KvPair>) -> Result<Vec<kvrpcpb::KvPair>> {
51+
fn decode_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<Vec<u8>>> {
52+
Ok(keys
53+
.into_iter()
54+
.map(|key| self.decode_key(key))
55+
.collect::<Result<Vec<Vec<u8>>>>()?)
56+
}
57+
58+
fn decode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Result<Vec<kvrpcpb::KvPair>> {
59+
for pair in pairs.iter_mut() {
60+
*pair.mut_key() = self.decode_key(pair.take_key())?;
61+
}
62+
3763
Ok(pairs)
3864
}
3965

4066
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
4167
(start, end)
4268
}
4369

44-
fn encode_ranges(&self, ranges: Vec<kvrpcpb::KeyRange>) -> Vec<kvrpcpb::KeyRange> {
70+
fn encode_ranges(&self, mut ranges: Vec<kvrpcpb::KeyRange>) -> Vec<kvrpcpb::KeyRange> {
71+
for range in ranges.iter_mut() {
72+
let (start, end) = self.encode_range(range.take_start_key(), range.take_end_key());
73+
*range.mut_start_key() = start;
74+
*range.mut_end_key() = end;
75+
}
76+
4577
ranges
4678
}
4779

@@ -143,18 +175,6 @@ impl RequestCodec for KeySpaceCodec {
143175
encoded
144176
}
145177

146-
fn encode_keys(&self, mut keys: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
147-
keys.into_iter().map(|key| self.encode_key(key)).collect()
148-
}
149-
150-
fn encode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Vec<kvrpcpb::KvPair> {
151-
for pair in pairs.iter_mut() {
152-
*pair.mut_key() = self.encode_key(pair.take_key());
153-
}
154-
155-
pairs
156-
}
157-
158178
fn decode_key(&self, mut key: Vec<u8>) -> Result<Vec<u8>> {
159179
let prefix: Prefix = self.clone().into();
160180

@@ -169,14 +189,6 @@ impl RequestCodec for KeySpaceCodec {
169189
Ok(key.split_off(KEYSPACE_PREFIX_LEN))
170190
}
171191

172-
fn decode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Result<Vec<kvrpcpb::KvPair>> {
173-
for pair in pairs.iter_mut() {
174-
*pair.mut_key() = self.decode_key(pair.take_key())?;
175-
}
176-
177-
Ok(pairs)
178-
}
179-
180192
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
181193
if self.id == MAX_KEYSPACE_ID {
182194
(self.encode_key(start), self.mode.max_key())
@@ -185,16 +197,6 @@ impl RequestCodec for KeySpaceCodec {
185197
}
186198
}
187199

188-
fn encode_ranges(&self, mut ranges: Vec<kvrpcpb::KeyRange>) -> Vec<kvrpcpb::KeyRange> {
189-
for range in ranges.iter_mut() {
190-
let (start, end) = self.encode_range(range.take_start_key(), range.take_end_key());
191-
*range.mut_start_key() = start;
192-
*range.mut_end_key() = end;
193-
}
194-
195-
ranges
196-
}
197-
198200
fn encode_pd_query(&self, key: Vec<u8>) -> Vec<u8> {
199201
Key::from(self.encode_key(key)).to_encoded().into()
200202
}

src/request/mod.rs

Lines changed: 18 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ pub mod codec;
3333
pub trait KvRequest<C>: Request + Sized + Clone + Sync + Send + 'static {
3434
/// The expected response to the request.
3535
type Response: HasKeyErrors + HasLocks + HasRegionError + Clone + Send + 'static;
36-
fn encode_request(&self, _codec: &C) -> Cow<Self> {
37-
Cow::Borrowed(self)
36+
37+
fn encode_request(self, _codec: &C) -> Self {
38+
self.clone()
3839
}
40+
3941
fn decode_response(&self, _codec: &C, resp: Self::Response) -> crate::Result<Self::Response> {
4042
Ok(resp)
4143
}
@@ -50,15 +52,10 @@ macro_rules! impl_kv_request_for_single_key_op {
5052
{
5153
type Response = $resp;
5254

53-
fn encode_request(&self, codec: &C) -> Cow<Self> {
54-
if codec.is_plain() {
55-
return Cow::Borrowed(self);
56-
}
57-
let mut req = self.clone();
58-
59-
*req.mut_key() = codec.encode_key(req.take_key());
55+
fn encode_request(mut self, codec: &C) -> Self {
56+
*self.mut_key() = codec.encode_key(self.take_key());
6057

61-
Cow::Owned(req)
58+
self
6259
}
6360
}
6461
};
@@ -73,26 +70,17 @@ macro_rules! impl_kv_request_for_batch_get {
7370
{
7471
type Response = $resp;
7572

76-
fn encode_request(&self, codec: &C) -> Cow<Self> {
77-
if codec.is_plain() {
78-
return Cow::Borrowed(self);
79-
}
80-
81-
let mut req = self.clone();
82-
*req.mut_keys() = codec.encode_keys(req.take_keys());
73+
fn encode_request(mut self, codec: &C) -> Self {
74+
*self.mut_keys() = codec.encode_keys(self.take_keys());
8375

84-
Cow::Owned(req)
76+
self
8577
}
8678

8779
fn decode_response(
8880
&self,
8981
codec: &C,
9082
mut resp: Self::Response,
9183
) -> crate::Result<Self::Response> {
92-
if codec.is_plain() {
93-
return Ok(resp);
94-
}
95-
9684
*resp.mut_pairs() = codec.decode_pairs(resp.take_pairs())?;
9785

9886
Ok(resp)
@@ -110,28 +98,21 @@ macro_rules! impl_kv_request_for_scan_op {
11098
{
11199
type Response = $resp;
112100

113-
fn encode_request(&self, codec: &C) -> Cow<Self> {
114-
if codec.is_plain() {
115-
return Cow::Borrowed(self);
116-
}
117-
let mut req = self.clone();
101+
fn encode_request(mut self, codec: &C) -> Self {
118102
let (start, end) =
119-
codec.encode_range(req.take_start_key().into(), req.take_end_key().into());
103+
codec.encode_range(self.take_start_key().into(), self.take_end_key().into());
104+
105+
self.set_start_key(start);
106+
self.set_end_key(end);
120107

121-
*req.mut_start_key() = start.into();
122-
*req.mut_end_key() = end.into();
123-
Cow::Owned(req)
108+
self
124109
}
125110

126111
fn decode_response(
127112
&self,
128113
codec: &C,
129114
mut resp: Self::Response,
130115
) -> crate::Result<Self::Response> {
131-
if codec.is_plain() {
132-
return Ok(resp);
133-
}
134-
135116
paste::paste! {
136117
let pairs = resp.[<take_ $pairs>]();
137118

@@ -245,8 +226,8 @@ mod test {
245226
impl<C: RequestCodec> KvRequest<C> for MockKvRequest {
246227
type Response = MockRpcResponse;
247228

248-
fn encode_request(&self, _codec: &C) -> Cow<Self> {
249-
Cow::Borrowed(self)
229+
fn encode_request(self, _codec: &C) -> Self {
230+
self
250231
}
251232

252233
fn decode_response(&self, _codec: &C, resp: Self::Response) -> Result<Self::Response> {

src/request/plan.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
22

3-
use std::{marker::PhantomData, sync::Arc};
3+
use std::{borrow::Cow, marker::PhantomData, sync::Arc};
44

55
use async_recursion::async_recursion;
66
use async_trait::async_trait;
@@ -55,7 +55,12 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
5555
type Result = Req::Response;
5656

5757
async fn execute(&self) -> Result<Self::Result> {
58-
let req = self.request.encode_request(&self.codec);
58+
let req = if self.codec.is_plain() {
59+
Cow::Borrowed(&self.request)
60+
} else {
61+
Cow::Owned(self.request.clone().encode_request(&self.codec))
62+
};
63+
5964
let stats = tikv_stats(self.request.label());
6065
let result = self
6166
.kv_client
@@ -64,12 +69,17 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
6469
.dispatch(req.as_ref())
6570
.await;
6671
let result = stats.done(result);
72+
6773
result.and_then(|r| {
68-
req.decode_response(
69-
&self.codec,
70-
*r.downcast()
71-
.expect("Downcast failed: request and response type mismatch"),
72-
)
74+
let resp = *r
75+
.downcast()
76+
.expect("Downcast failed: request and response type mismatch");
77+
78+
if self.codec.is_plain() {
79+
Ok(resp)
80+
} else {
81+
req.decode_response(&self.codec, resp)
82+
}
7383
})
7484
}
7585
}

0 commit comments

Comments
 (0)