Skip to content

Commit 317116b

Browse files
committed
decode in place
Signed-off-by: iosmanthus <[email protected]>
1 parent 434e10d commit 317116b

File tree

5 files changed

+157
-116
lines changed

5 files changed

+157
-116
lines changed

src/raw/requests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ pub fn new_raw_put_request(
9696
req
9797
}
9898

99-
impl_kv_request_for_single_key_op!(kvrpcpb::RawPutRequest, kvrpcpb::RawPutResponse);
99+
impl_kv_request!(kvrpcpb::RawPutRequest, kvrpcpb::RawPutResponse; encode=key);
100100
shardable_key!(kvrpcpb::RawPutRequest);
101101
collect_first!(kvrpcpb::RawPutResponse);
102102

src/region_cache.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
137137
.clone()
138138
.get_region(self.codec.encode_pd_query(key.into()))
139139
.await?;
140-
r.region = self.codec.decode_region(r.region)?;
140+
self.codec.decode_region(&mut r.region)?;
141141
self.add_region(r.clone()).await;
142142
Ok(r)
143143
}
@@ -152,7 +152,7 @@ impl<C: RequestCodec, R: RetryClientTrait> RegionCache<C, R> {
152152
}
153153

154154
let mut region = self.inner_client.clone().get_region_by_id(id).await?;
155-
region.region = self.codec.decode_region(region.region)?;
155+
self.codec.decode_region(&mut region.region)?;
156156
self.add_region(region.clone()).await;
157157

158158
// notify others

src/request/codec.rs

Lines changed: 109 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
use core::intrinsics::copy;
12
use std::ops::{Deref, DerefMut};
23
use tikv_client_common::Error;
3-
use tikv_client_proto::{kvrpcpb, metapb::Region};
4+
use tikv_client_proto::{errorpb, kvrpcpb, metapb::Region};
45

56
use crate::{kv::codec::decode_bytes_in_place, Key, Result};
67

@@ -45,22 +46,85 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
4546
pairs
4647
}
4748

48-
fn decode_key(&self, key: Vec<u8>) -> Result<Vec<u8>> {
49-
Ok(key)
49+
fn decode_key(&self, _key: &mut Vec<u8>) -> Result<()> {
50+
Ok(())
5051
}
5152

52-
fn decode_keys(&self, keys: Vec<Vec<u8>>) -> Result<Vec<Vec<u8>>> {
53-
keys.into_iter()
54-
.map(|key| self.decode_key(key))
55-
.collect::<Result<Vec<Vec<u8>>>>()
53+
fn decode_keys(&self, keys: &mut [Vec<u8>]) -> Result<()> {
54+
for key in keys.iter_mut() {
55+
self.decode_key(key)?;
56+
}
57+
58+
Ok(())
59+
}
60+
61+
fn decode_key_error(&self, err: &mut kvrpcpb::KeyError) -> Result<()> {
62+
if err.has_locked() {
63+
let locked = err.mut_locked();
64+
self.decode_lock(locked)?;
65+
}
66+
67+
if err.has_conflict() {
68+
let conflict = err.mut_conflict();
69+
self.decode_key(conflict.mut_key())?;
70+
self.decode_key(conflict.mut_primary())?;
71+
}
72+
73+
if err.has_already_exist() {
74+
let already_exist = err.mut_already_exist();
75+
self.decode_key(already_exist.mut_key())?;
76+
}
77+
78+
// We do not decode key in `Deadlock` since there is no use for the key right now in client side.
79+
// All we need is the key hash to detect deadlock.
80+
// TODO: while we check the keys against the deadlock key hash, we need to encode the key.
81+
82+
if err.has_commit_ts_expired() {
83+
let commit_ts_expired = err.mut_commit_ts_expired();
84+
self.decode_key(commit_ts_expired.mut_key())?;
85+
}
86+
87+
if err.has_txn_not_found() {
88+
let txn_not_found = err.mut_txn_not_found();
89+
self.decode_key(txn_not_found.mut_primary_key())?;
90+
}
91+
92+
if err.has_assertion_failed() {
93+
let assertion_failed = err.mut_assertion_failed();
94+
self.decode_key(assertion_failed.mut_key())?;
95+
}
96+
97+
Ok(())
98+
}
99+
100+
fn decode_key_errors(&self, errors: &mut [kvrpcpb::KeyError]) -> Result<()> {
101+
for err in errors.iter_mut() {
102+
self.decode_key_error(err)?;
103+
}
104+
105+
Ok(())
106+
}
107+
108+
fn decode_lock(&self, lock: &mut kvrpcpb::LockInfo) -> Result<()> {
109+
self.decode_key(lock.mut_primary_lock())?;
110+
self.decode_key(lock.mut_key())?;
111+
self.decode_keys(lock.mut_secondaries())
112+
}
113+
114+
fn decode_locks(&self, locks: &mut [kvrpcpb::LockInfo]) -> Result<()> {
115+
for lock in locks.iter_mut() {
116+
self.decode_lock(lock)?;
117+
}
118+
119+
Ok(())
56120
}
57121

58-
fn decode_pairs(&self, mut pairs: Vec<kvrpcpb::KvPair>) -> Result<Vec<kvrpcpb::KvPair>> {
122+
fn decode_pairs(&self, pairs: &mut [kvrpcpb::KvPair]) -> Result<()> {
59123
for pair in pairs.iter_mut() {
60-
*pair.mut_key() = self.decode_key(pair.take_key())?;
124+
self.decode_key(pair.mut_key())?;
61125
}
62126

63-
Ok(pairs)
127+
Ok(())
64128
}
65129

66130
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
@@ -81,8 +145,22 @@ pub trait RequestCodec: Sized + Clone + Sync + Send + 'static {
81145
key
82146
}
83147

84-
fn decode_region(&self, region: Region) -> Result<Region> {
85-
Ok(region)
148+
fn decode_region(&self, _region: &mut Region) -> Result<()> {
149+
Ok(())
150+
}
151+
152+
fn decode_regions(&self, regions: &mut [Region]) -> Result<()> {
153+
for region in regions.iter_mut() {
154+
self.decode_region(region)?;
155+
}
156+
Ok(())
157+
}
158+
159+
fn decode_region_error(&self, err: &mut errorpb::Error) -> Result<()> {
160+
if err.has_epoch_not_match() {
161+
self.decode_regions(err.mut_epoch_not_match().mut_current_regions())?;
162+
}
163+
Ok(())
86164
}
87165

88166
fn is_plain(&self) -> bool {
@@ -107,10 +185,10 @@ impl RequestCodec for TxnApiV1 {
107185
Key::from(key).to_encoded().into()
108186
}
109187

110-
fn decode_region(&self, mut region: Region) -> Result<Region> {
188+
fn decode_region(&self, region: &mut Region) -> Result<()> {
111189
decode_bytes_in_place(region.mut_start_key(), false)?;
112190
decode_bytes_in_place(region.mut_end_key(), false)?;
113-
Ok(region)
191+
Ok(())
114192
}
115193
}
116194

@@ -151,7 +229,7 @@ impl KeyMode {
151229

152230
type Prefix = [u8; KEYSPACE_PREFIX_LEN];
153231

154-
#[derive(Clone, Copy, Default, PartialEq)]
232+
#[derive(Clone, Copy, Default, PartialEq, Eq)]
155233
pub struct KeySpaceId([u8; 3]);
156234

157235
impl Deref for KeySpaceId {
@@ -189,18 +267,27 @@ impl RequestCodec for KeySpaceCodec {
189267
encoded
190268
}
191269

192-
fn decode_key(&self, mut key: Vec<u8>) -> Result<Vec<u8>> {
270+
fn decode_key(&self, key: &mut Vec<u8>) -> Result<()> {
193271
let prefix: Prefix = (*self).into();
194272

195273
if !key.starts_with(&prefix) {
196274
return Err(Error::CorruptedKeyspace {
197275
expected: prefix.to_vec(),
198276
actual: key[..KEYSPACE_PREFIX_LEN].to_vec(),
199-
key,
277+
key: key.to_vec(),
200278
});
201279
}
202280

203-
Ok(key.split_off(KEYSPACE_PREFIX_LEN))
281+
unsafe {
282+
let trimmed_len = key.len() - KEYSPACE_PREFIX_LEN;
283+
let ptr = key.as_mut_ptr();
284+
let trimmed = key[KEYSPACE_PREFIX_LEN..].as_mut_ptr();
285+
286+
copy(trimmed, ptr, trimmed_len);
287+
288+
key.set_len(trimmed_len);
289+
}
290+
Ok(())
204291
}
205292

206293
fn encode_range(&self, start: Vec<u8>, end: Vec<u8>) -> (Vec<u8>, Vec<u8>) {
@@ -215,25 +302,25 @@ impl RequestCodec for KeySpaceCodec {
215302
Key::from(self.encode_key(key)).to_encoded().into()
216303
}
217304

218-
fn decode_region(&self, mut region: Region) -> Result<Region> {
305+
fn decode_region(&self, region: &mut Region) -> Result<()> {
219306
decode_bytes_in_place(region.mut_start_key(), false)?;
220307
decode_bytes_in_place(region.mut_end_key(), false)?;
221308

222309
// Map the region's start key to the keyspace's start key.
223310
if region.get_start_key() < self.mode.min_key().as_slice() {
224311
*region.mut_start_key() = vec![];
225312
} else {
226-
*region.mut_start_key() = self.decode_key(region.get_start_key().to_vec())?;
313+
self.decode_key(region.mut_start_key())?;
227314
}
228315

229316
// Map the region's end key to the keyspace's end key.
230317
if region.get_end_key() > self.mode.max_key().as_slice() {
231318
*region.mut_end_key() = vec![];
232319
} else {
233-
*region.mut_end_key() = self.decode_key(region.get_end_key().to_vec())?;
320+
self.decode_key(region.mut_end_key())?;
234321
}
235322

236-
Ok(region)
323+
Ok(())
237324
}
238325

239326
fn is_plain(&self) -> bool {

src/request/mod.rs

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,35 @@ pub trait KvRequest<C>: Request + Sized + Clone + Sync + Send + 'static {
4141
}
4242
}
4343

44+
#[macro_export]
45+
macro_rules! impl_kv_request {
46+
($req:ty, $resp:ty; encode=$($i:ident),*) => {
47+
impl<C> KvRequest<C> for $req
48+
where C: RequestCodec
49+
{
50+
type Response = $resp;
51+
52+
fn encode_request(mut self, codec: &C) -> Self {
53+
$(
54+
paste::paste! {
55+
*self.[<mut_ $i>]() = codec.[<encode_ $i>](self.[<take_ $i>]());
56+
}
57+
58+
self
59+
)*
60+
}
61+
62+
fn decode_response(&self, codec: &C, mut resp: Self::Response) -> Result<Self::Response> {
63+
if resp.has_region_error() {
64+
codec.decode_region_error(resp.mut_region_error())?;
65+
}
66+
67+
Ok(resp)
68+
}
69+
}
70+
};
71+
}
72+
4473
#[macro_export]
4574
macro_rules! impl_kv_request_for_single_key_op {
4675
($req: ty, $resp: ty) => {
@@ -79,7 +108,7 @@ macro_rules! impl_kv_request_for_batch_get {
79108
codec: &C,
80109
mut resp: Self::Response,
81110
) -> $crate::Result<Self::Response> {
82-
*resp.mut_pairs() = codec.decode_pairs(resp.take_pairs())?;
111+
codec.decode_pairs(resp.mut_pairs())?;
83112

84113
Ok(resp)
85114
}
@@ -112,9 +141,9 @@ macro_rules! impl_kv_request_for_scan_op {
112141
mut resp: Self::Response,
113142
) -> $crate::Result<Self::Response> {
114143
paste::paste! {
115-
let pairs = resp.[<take_ $pairs>]();
144+
let pairs = resp.[<mut_ $pairs>]();
116145

117-
*resp.[<mut_ $pairs>]() = codec.encode_pairs(pairs);
146+
codec.decode_pairs(pairs)?;
118147

119148
Ok(resp)
120149
}

0 commit comments

Comments
 (0)