Skip to content

Commit 624c68e

Browse files
committed
impl KvRequest for raw requests
Signed-off-by: iosmanthus <[email protected]>
1 parent 19c3123 commit 624c68e

File tree

6 files changed

+245
-304
lines changed

6 files changed

+245
-304
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ slog-term = { version = "2.4" }
3838
thiserror = "1"
3939
tokio = { version = "1", features = [ "sync", "rt-multi-thread", "macros" ] }
4040
async-recursion = "0.3"
41+
paste = "1.0"
4142

4243
tikv-client-common = { version = "0.1.0", path = "tikv-client-common" }
4344
tikv-client-pd = { version = "0.1.0", path = "tikv-client-pd" }

src/raw/requests.rs

Lines changed: 72 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,25 @@ use std::{any::Any, borrow::Cow, ops::Range, sync::Arc};
55
use async_trait::async_trait;
66
use futures::stream::BoxStream;
77
use grpcio::CallOption;
8+
89
use tikv_client_proto::{kvrpcpb, metapb, tikvpb::TikvClient};
910
use tikv_client_store::Request;
1011

11-
use super::RawRpcRequest;
1212
use crate::{
1313
collect_first,
14+
ColumnFamily,
15+
Key,
16+
KvPair,
1417
pd::PdClient,
1518
request::{
16-
codec::RequestCodec, plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor,
17-
KvRequest, Merge, Process, Shardable, SingleKey,
19+
codec::RequestCodec, Collect, CollectSingle, DefaultProcessor, KvRequest,
20+
Merge, plan::ResponseWithShard, Process, Shardable, SingleKey,
1821
},
19-
store::{store_stream_for_keys, store_stream_for_ranges, RegionStore},
20-
transaction::HasLocks,
21-
util::iter::FlatMapOkIterExt,
22-
ColumnFamily, Key, KvPair, Result, Value,
22+
Result, store::{RegionStore, store_stream_for_keys, store_stream_for_ranges}, transaction::HasLocks, util::iter::FlatMapOkIterExt, Value,
2323
};
2424

25+
use super::RawRpcRequest;
26+
2527
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
2628
let mut req = kvrpcpb::RawGetRequest::default();
2729
req.set_key(key);
@@ -30,7 +32,7 @@ pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::R
3032
req
3133
}
3234

33-
kv_request_with_key!(kvrpcpb::RawGetRequest, kvrpcpb::RawGetResponse);
35+
impl_kv_request_for_single_key_op!(kvrpcpb::RawGetRequest, kvrpcpb::RawGetResponse);
3436
shardable_key!(kvrpcpb::RawGetRequest);
3537
collect_first!(kvrpcpb::RawGetResponse);
3638

@@ -64,7 +66,8 @@ pub fn new_raw_batch_get_request(
6466
req
6567
}
6668

67-
kv_request_with_key!(kvrpcpb::RawBatchGetRequest, kvrpcpb::RawBatchGetResponse);
69+
impl_kv_request_for_batch_get!(kvrpcpb::RawBatchGetRequest, kvrpcpb::RawBatchGetResponse);
70+
6871
shardable_keys!(kvrpcpb::RawBatchGetRequest);
6972

7073
impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
@@ -93,7 +96,7 @@ pub fn new_raw_put_request(
9396
req
9497
}
9598

96-
kv_request_with_key!(kvrpcpb::RawPutRequest, kvrpcpb::RawPutResponse);
99+
impl_kv_request_for_single_key_op!(kvrpcpb::RawPutRequest, kvrpcpb::RawPutResponse);
97100
shardable_key!(kvrpcpb::RawPutRequest);
98101
collect_first!(kvrpcpb::RawPutResponse);
99102

@@ -120,13 +123,15 @@ impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchPutRequest {
120123
type Response = kvrpcpb::RawBatchPutResponse;
121124

122125
fn encode_request(&self, codec: &C) -> Cow<Self> {
123-
plain_request!(self, codec);
124-
todo!()
125-
}
126+
if codec.is_plain() {
127+
return Cow::Borrowed(self);
128+
}
129+
130+
let mut req = self.clone();
131+
132+
*req.mut_pairs() = codec.encode_pairs(req.take_pairs());
126133

127-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
128-
plain_response!(resp, codec);
129-
todo!()
134+
Cow::Owned(req)
130135
}
131136
}
132137

@@ -165,20 +170,7 @@ pub fn new_raw_delete_request(
165170
req
166171
}
167172

168-
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawDeleteRequest {
169-
type Response = kvrpcpb::RawDeleteResponse;
170-
171-
fn encode_request(&self, codec: &C) -> Cow<Self> {
172-
plain_request!(self, codec);
173-
todo!()
174-
}
175-
176-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
177-
plain_response!(resp, codec);
178-
todo!()
179-
}
180-
}
181-
173+
impl_kv_request_for_single_key_op!(kvrpcpb::RawDeleteRequest, kvrpcpb::RawDeleteResponse);
182174
shardable_key!(kvrpcpb::RawDeleteRequest);
183175
collect_first!(kvrpcpb::RawDeleteResponse);
184176
impl SingleKey for kvrpcpb::RawDeleteRequest {
@@ -202,13 +194,15 @@ impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchDeleteRequest {
202194
type Response = kvrpcpb::RawBatchDeleteResponse;
203195

204196
fn encode_request(&self, codec: &C) -> Cow<Self> {
205-
plain_request!(self, codec);
206-
todo!()
207-
}
197+
if codec.is_plain() {
198+
return Cow::Borrowed(self);
199+
}
200+
201+
let mut req = self.clone();
208202

209-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
210-
plain_response!(resp, codec);
211-
todo!()
203+
*req.mut_keys() = codec.encode_keys(req.take_keys());
204+
205+
Cow::Owned(req)
212206
}
213207
}
214208

@@ -231,13 +225,17 @@ impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawDeleteRangeRequest {
231225
type Response = kvrpcpb::RawDeleteRangeResponse;
232226

233227
fn encode_request(&self, codec: &C) -> Cow<Self> {
234-
plain_request!(self, codec);
235-
todo!()
236-
}
228+
if codec.is_plain() {
229+
return Cow::Borrowed(self);
230+
}
231+
232+
let mut req = self.clone();
237233

238-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
239-
plain_response!(resp, codec);
240-
todo!()
234+
let (start, end) = (req.take_start_key(), req.take_end_key());
235+
*req.mut_start_key() = codec.encode_key(start);
236+
*req.mut_end_key() = codec.encode_key(end);
237+
238+
Cow::Owned(req)
241239
}
242240
}
243241

@@ -260,20 +258,7 @@ pub fn new_raw_scan_request(
260258
req
261259
}
262260

263-
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawScanRequest {
264-
type Response = kvrpcpb::RawScanResponse;
265-
266-
fn encode_request(&self, codec: &C) -> Cow<Self> {
267-
plain_request!(self, codec);
268-
todo!()
269-
}
270-
271-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
272-
plain_response!(resp, codec);
273-
todo!()
274-
}
275-
}
276-
261+
impl_kv_request_for_scan_op!(kvrpcpb::RawScanRequest, kvrpcpb::RawScanResponse, kvs);
277262
shardable_range!(kvrpcpb::RawScanRequest);
278263

279264
impl Merge<kvrpcpb::RawScanResponse> for Collect {
@@ -306,13 +291,15 @@ impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawBatchScanRequest {
306291
type Response = kvrpcpb::RawBatchScanResponse;
307292

308293
fn encode_request(&self, codec: &C) -> Cow<Self> {
309-
plain_request!(self, codec);
310-
todo!()
311-
}
294+
if codec.is_plain() {
295+
return Cow::Borrowed(self);
296+
}
312297

313-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
314-
plain_response!(resp, codec);
315-
todo!()
298+
let mut req = self.clone();
299+
300+
*req.mut_ranges() = codec.encode_ranges(req.take_ranges());
301+
302+
Cow::Owned(req)
316303
}
317304
}
318305

@@ -361,20 +348,7 @@ pub fn new_cas_request(
361348
req
362349
}
363350

364-
impl<C: RequestCodec> KvRequest<C> for kvrpcpb::RawCasRequest {
365-
type Response = kvrpcpb::RawCasResponse;
366-
367-
fn encode_request(&self, codec: &C) -> Cow<Self> {
368-
plain_request!(self, codec);
369-
todo!()
370-
}
371-
372-
fn decode_response(&self, codec: &C, resp: Self::Response) -> Result<Self::Response> {
373-
plain_response!(resp, codec);
374-
todo!()
375-
}
376-
}
377-
351+
impl_kv_request_for_single_key_op!(kvrpcpb::RawCasRequest, kvrpcpb::RawCasResponse);
378352
shardable_key!(kvrpcpb::RawCasRequest);
379353
collect_first!(kvrpcpb::RawCasResponse);
380354
impl SingleKey for kvrpcpb::RawCasRequest {
@@ -397,7 +371,7 @@ impl Process<kvrpcpb::RawCasResponse> for DefaultProcessor {
397371
}
398372

399373
type RawCoprocessorRequestDataBuilder =
400-
Arc<dyn Fn(metapb::Region, Vec<kvrpcpb::KeyRange>) -> Vec<u8> + Send + Sync>;
374+
Arc<dyn Fn(metapb::Region, Vec<kvrpcpb::KeyRange>) -> Vec<u8> + Send + Sync>;
401375

402376
pub fn new_raw_coprocessor_request(
403377
copr_name: String,
@@ -467,8 +441,8 @@ impl Shardable for RawCoprocessorRequest {
467441

468442
#[allow(clippy::type_complexity)]
469443
impl
470-
Process<Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>>
471-
for DefaultProcessor
444+
Process<Vec<Result<ResponseWithShard<kvrpcpb::RawCoprocessorResponse, Vec<kvrpcpb::KeyRange>>>>>
445+
for DefaultProcessor
472446
{
473447
type Out = Vec<(Vec<u8>, Vec<Range<Key>>)>;
474448

@@ -517,29 +491,43 @@ impl_raw_rpc_request!(RawDeleteRangeRequest);
517491
impl_raw_rpc_request!(RawCasRequest);
518492

519493
impl HasLocks for kvrpcpb::RawGetResponse {}
494+
520495
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
496+
521497
impl HasLocks for kvrpcpb::RawPutResponse {}
498+
522499
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
500+
523501
impl HasLocks for kvrpcpb::RawDeleteResponse {}
502+
524503
impl HasLocks for kvrpcpb::RawBatchDeleteResponse {}
504+
525505
impl HasLocks for kvrpcpb::RawScanResponse {}
506+
526507
impl HasLocks for kvrpcpb::RawBatchScanResponse {}
508+
527509
impl HasLocks for kvrpcpb::RawDeleteRangeResponse {}
510+
528511
impl HasLocks for kvrpcpb::RawCasResponse {}
512+
529513
impl HasLocks for kvrpcpb::RawCoprocessorResponse {}
530514

531515
#[cfg(test)]
532516
mod test {
533-
use super::*;
517+
use std::any::Any;
518+
519+
use futures::executor;
520+
521+
use tikv_client_proto::kvrpcpb;
522+
534523
use crate::{
535524
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
525+
Key,
536526
mock::{MockKvClient, MockPdClient},
537527
request::Plan,
538-
Key,
539528
};
540-
use futures::executor;
541-
use std::any::Any;
542-
use tikv_client_proto::kvrpcpb;
529+
530+
use super::*;
543531

544532
#[test]
545533
#[ignore]

src/region_cache.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
135135
let mut r = self
136136
.inner_client
137137
.clone()
138-
.get_region(self.codec.encode_pd_query(key).into())
138+
.get_region(self.codec.encode_pd_query(key.into()))
139139
.await?;
140140
r.region = self.codec.decode_region(r.region)?;
141141
self.add_region(r.clone()).await;

0 commit comments

Comments
 (0)