Skip to content

Commit 7d68247

Browse files
committed
fix api version context
Signed-off-by: iosmanthus <[email protected]>
1 parent 3660ca1 commit 7d68247

File tree

5 files changed

+80
-98
lines changed

5 files changed

+80
-98
lines changed

src/raw/requests.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub fn new_raw_batch_get_request(
6666
req
6767
}
6868

69-
impl_kv_request!(kvrpcpb::RawBatchGetRequest, keys; kvrpcpb::RawBatchGetResponse);
69+
impl_kv_request!(kvrpcpb::RawBatchGetRequest, keys; kvrpcpb::RawBatchGetResponse, pairs);
7070
shardable_keys!(kvrpcpb::RawBatchGetRequest);
7171

7272
impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
@@ -362,6 +362,10 @@ impl Request for RawCoprocessorRequest {
362362
fn set_context(&mut self, context: kvrpcpb::Context) {
363363
self.inner.set_context(context);
364364
}
365+
366+
fn mut_context(&mut self) -> &mut kvrpcpb::Context {
367+
self.inner.mut_context()
368+
}
365369
}
366370

367371
impl<C: RequestCodec> KvRequest<C> for RawCoprocessorRequest {

src/request/codec.rs

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
4040
Ok(())
4141
}
4242

43-
fn is_plain(&self) -> bool {
44-
true
43+
fn version(&self) -> kvrpcpb::ApiVersion {
44+
kvrpcpb::ApiVersion::V1
4545
}
4646
}
4747

@@ -331,7 +331,8 @@ impl RequestCodec for KeySpaceCodec {
331331
}
332332

333333
// Map the region's end key to the keyspace's end key.
334-
if region.get_end_key() > self.mode.max_key().as_slice() {
334+
if region.get_end_key().is_empty() || region.get_end_key() > self.mode.max_key().as_slice()
335+
{
335336
*region.mut_end_key() = vec![];
336337
} else {
337338
self.decode_key(region.mut_end_key())?;
@@ -340,8 +341,8 @@ impl RequestCodec for KeySpaceCodec {
340341
Ok(())
341342
}
342343

343-
fn is_plain(&self) -> bool {
344-
false
344+
fn version(&self) -> kvrpcpb::ApiVersion {
345+
kvrpcpb::ApiVersion::V2
345346
}
346347
}
347348

@@ -357,7 +358,31 @@ impl RawKeyspaceCodec {
357358
}
358359
}
359360

360-
impl RequestCodec for RawKeyspaceCodec {}
361+
impl RequestCodec for RawKeyspaceCodec {
362+
fn encode_key(&self, key: Vec<u8>) -> Vec<u8> {
363+
self.0.encode_key(key)
364+
}
365+
366+
fn decode_key(&self, key: &mut Vec<u8>) -> Result<()> {
367+
self.0.decode_key(key)
368+
}
369+
370+
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
371+
self.0.encode_range(start, end)
372+
}
373+
374+
fn encode_pd_query(&self, key: Vec<u8>) -> Vec<u8> {
375+
self.0.encode_pd_query(key)
376+
}
377+
378+
fn decode_region(&self, region: &mut Region) -> Result<()> {
379+
self.0.decode_region(region)
380+
}
381+
382+
fn version(&self) -> kvrpcpb::ApiVersion {
383+
self.0.version()
384+
}
385+
}
361386

362387
impl RawCodec for RawKeyspaceCodec {}
363388

@@ -373,6 +398,30 @@ impl TxnKeyspaceCodec {
373398
}
374399
}
375400

376-
impl RequestCodec for TxnKeyspaceCodec {}
401+
impl RequestCodec for TxnKeyspaceCodec {
402+
fn encode_key(&self, key: Vec<u8>) -> Vec<u8> {
403+
self.0.encode_key(key)
404+
}
405+
406+
fn decode_key(&self, key: &mut Vec<u8>) -> Result<()> {
407+
self.0.decode_key(key)
408+
}
409+
410+
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
411+
self.0.encode_range(start, end)
412+
}
413+
414+
fn encode_pd_query(&self, key: Vec<u8>) -> Vec<u8> {
415+
self.0.encode_pd_query(key)
416+
}
417+
418+
fn decode_region(&self, region: &mut Region) -> Result<()> {
419+
self.0.decode_region(region)
420+
}
421+
422+
fn version(&self) -> kvrpcpb::ApiVersion {
423+
self.0.version()
424+
}
425+
}
377426

378427
impl TxnCodec for TxnKeyspaceCodec {}

src/request/mod.rs

Lines changed: 4 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -119,88 +119,6 @@ macro_rules! impl_kv_request {
119119
};
120120
}
121121

122-
#[macro_export]
123-
macro_rules! impl_kv_request_for_single_key_op {
124-
($req: ty, $resp: ty) => {
125-
impl<C> KvRequest<C> for $req
126-
where
127-
C: RequestCodec,
128-
{
129-
type Response = $resp;
130-
131-
fn encode_request(mut self, codec: &C) -> Self {
132-
*self.mut_key() = codec.encode_key(self.take_key());
133-
134-
self
135-
}
136-
}
137-
};
138-
}
139-
140-
#[macro_export]
141-
macro_rules! impl_kv_request_for_batch_get {
142-
($req: ty, $resp: ty) => {
143-
impl<C> KvRequest<C> for $req
144-
where
145-
C: RequestCodec,
146-
{
147-
type Response = $resp;
148-
149-
fn encode_request(mut self, codec: &C) -> Self {
150-
*self.mut_keys() = codec.encode_keys(self.take_keys());
151-
152-
self
153-
}
154-
155-
fn decode_response(
156-
&self,
157-
codec: &C,
158-
mut resp: Self::Response,
159-
) -> $crate::Result<Self::Response> {
160-
codec.decode_pairs(resp.mut_pairs())?;
161-
162-
Ok(resp)
163-
}
164-
}
165-
};
166-
}
167-
168-
#[macro_export]
169-
macro_rules! impl_kv_request_for_scan_op {
170-
($req: ty, $resp: ty, $pairs: ident) => {
171-
impl<C> KvRequest<C> for $req
172-
where
173-
C: RequestCodec,
174-
{
175-
type Response = $resp;
176-
177-
fn encode_request(mut self, codec: &C) -> Self {
178-
let (start, end) =
179-
codec.encode_range(self.take_start_key().into(), self.take_end_key().into());
180-
181-
self.set_start_key(start);
182-
self.set_end_key(end);
183-
184-
self
185-
}
186-
187-
fn decode_response(
188-
&self,
189-
codec: &C,
190-
mut resp: Self::Response,
191-
) -> $crate::Result<Self::Response> {
192-
paste::paste! {
193-
let pairs = resp.[<mut_ $pairs>]();
194-
195-
codec.decode_pairs(pairs)?;
196-
197-
Ok(resp)
198-
}
199-
}
200-
}
201-
};
202-
}
203-
204122
#[derive(Clone, Debug, new, Eq, PartialEq)]
205123
pub struct RetryOptions {
206124
/// How to retry when there is a region error and we need to resolve regions with PD.
@@ -296,6 +214,10 @@ mod test {
296214
fn set_context(&mut self, _: kvrpcpb::Context) {
297215
unreachable!();
298216
}
217+
218+
fn mut_context(&mut self) -> &mut kvrpcpb::Context {
219+
unreachable!()
220+
}
299221
}
300222

301223
#[async_trait]

src/request/plan.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,13 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
5555
type Result = Req::Response;
5656

5757
async fn execute(&self) -> Result<Self::Result> {
58-
let req = if self.codec.is_plain() {
59-
Cow::Borrowed(&self.request)
60-
} else {
61-
Cow::Owned(self.request.clone().encode_request(&self.codec))
58+
let req = match self.codec.version() {
59+
kvrpcpb::ApiVersion::V2 => {
60+
let mut req = self.request.clone();
61+
req.mut_context().set_api_version(self.codec.version());
62+
Cow::Owned(req.encode_request(&self.codec))
63+
}
64+
_ => Cow::Borrowed(&self.request),
6265
};
6366

6467
let stats = tikv_stats(self.request.label());
@@ -75,10 +78,9 @@ impl<C: RequestCodec, Req: KvRequest<C>> Plan for Dispatch<C, Req> {
7578
.downcast()
7679
.expect("Downcast failed: request and response type mismatch");
7780

78-
if self.codec.is_plain() {
79-
Ok(resp)
80-
} else {
81-
req.decode_response(&self.codec, resp)
81+
match self.codec.version() {
82+
kvrpcpb::ApiVersion::V2 => req.decode_response(&self.codec, resp),
83+
_ => Ok(resp),
8284
}
8385
})
8486
}

tikv-client-store/src/request.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ pub trait Request: Any + Sync + Send + 'static {
1212
fn label(&self) -> &'static str;
1313
fn as_any(&self) -> &dyn Any;
1414
fn set_context(&mut self, context: kvrpcpb::Context);
15+
fn mut_context(&mut self) -> &mut kvrpcpb::Context;
1516
}
1617

1718
macro_rules! impl_request {
@@ -41,6 +42,10 @@ macro_rules! impl_request {
4142
fn set_context(&mut self, context: kvrpcpb::Context) {
4243
kvrpcpb::$name::set_context(self, context)
4344
}
45+
46+
fn mut_context(&mut self) -> &mut kvrpcpb::Context {
47+
kvrpcpb::$name::mut_context(self)
48+
}
4449
}
4550
};
4651
}

0 commit comments

Comments
 (0)