Skip to content

Commit 47478cb

Browse files
authored
Merge pull request #245 from ekexium/fix-result-haserror
Fix HasError and HasLocks
2 parents 3fad149 + 69ae1b8 commit 47478cb

File tree

9 files changed

+223
-43
lines changed

9 files changed

+223
-43
lines changed

src/raw/client.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ impl Client {
179179
.await?
180180
.resolve_lock(OPTIMISTIC_BACKOFF)
181181
.retry_region(DEFAULT_REGION_BACKOFF)
182+
.extract_error()
182183
.plan();
183184
plan.execute().await?;
184185
Ok(())
@@ -210,6 +211,7 @@ impl Client {
210211
.resolve_lock(OPTIMISTIC_BACKOFF)
211212
.multi_region()
212213
.retry_region(DEFAULT_REGION_BACKOFF)
214+
.extract_error()
213215
.plan();
214216
plan.execute().await?;
215217
Ok(())
@@ -239,6 +241,7 @@ impl Client {
239241
.await?
240242
.resolve_lock(OPTIMISTIC_BACKOFF)
241243
.retry_region(DEFAULT_REGION_BACKOFF)
244+
.extract_error()
242245
.plan();
243246
plan.execute().await?;
244247
Ok(())
@@ -268,6 +271,7 @@ impl Client {
268271
.resolve_lock(OPTIMISTIC_BACKOFF)
269272
.multi_region()
270273
.retry_region(DEFAULT_REGION_BACKOFF)
274+
.extract_error()
271275
.plan();
272276
plan.execute().await?;
273277
Ok(())
@@ -294,6 +298,7 @@ impl Client {
294298
.resolve_lock(OPTIMISTIC_BACKOFF)
295299
.multi_region()
296300
.retry_region(DEFAULT_REGION_BACKOFF)
301+
.extract_error()
297302
.plan();
298303
plan.execute().await?;
299304
Ok(())

src/request/mod.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use tikv_client_store::{HasError, Request};
1010

1111
pub use self::{
1212
plan::{
13-
Collect, CollectError, DefaultProcessor, Dispatch, Merge, MergeResponse, MultiRegion, Plan,
14-
Process, ProcessResponse, ResolveLock, RetryRegion,
13+
Collect, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, MergeResponse,
14+
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion,
1515
},
1616
plan_builder::{PlanBuilder, SingleKey},
1717
shard::Shardable,
@@ -66,15 +66,17 @@ mod test {
6666
use crate::{
6767
mock::{MockKvClient, MockPdClient},
6868
store::store_stream_for_keys,
69+
transaction::lowering::new_commit_request,
6970
Error, Key, Result,
7071
};
7172
use futures::executor;
7273
use grpcio::CallOption;
7374
use std::{
7475
any::Any,
76+
iter,
7577
sync::{Arc, Mutex},
7678
};
77-
use tikv_client_proto::{kvrpcpb, tikvpb::TikvClient};
79+
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp, tikvpb::TikvClient};
7880
use tikv_client_store::HasRegionError;
7981

8082
#[test]
@@ -167,10 +169,54 @@ mod test {
167169
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
168170
.multi_region()
169171
.retry_region(Backoff::no_jitter_backoff(1, 1, 3))
172+
.extract_error()
170173
.plan();
171174
let _ = executor::block_on(async { plan.execute().await });
172175

173176
// Original call plus the 3 retries
174177
assert_eq!(*invoking_count.lock().unwrap(), 4);
175178
}
179+
180+
#[test]
181+
fn test_extract_error() {
182+
let pd_client = Arc::new(MockPdClient::new(MockKvClient::with_dispatch_hook(
183+
|_: &dyn Any| {
184+
Ok(Box::new(kvrpcpb::CommitResponse {
185+
region_error: None,
186+
error: Some(kvrpcpb::KeyError {
187+
locked: None,
188+
retryable: String::new(),
189+
abort: String::new(),
190+
conflict: None,
191+
already_exist: None,
192+
deadlock: None,
193+
commit_ts_expired: None,
194+
txn_not_found: None,
195+
commit_ts_too_large: None,
196+
}),
197+
commit_version: 0,
198+
}) as Box<dyn Any>)
199+
},
200+
)));
201+
202+
let key: Key = "key".to_owned().into();
203+
let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());
204+
205+
// does not extract error
206+
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone())
207+
.resolve_lock(OPTIMISTIC_BACKOFF)
208+
.multi_region()
209+
.retry_region(OPTIMISTIC_BACKOFF)
210+
.plan();
211+
assert!(executor::block_on(async { plan.execute().await }).is_ok());
212+
213+
// extract error
214+
let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
215+
.resolve_lock(OPTIMISTIC_BACKOFF)
216+
.multi_region()
217+
.retry_region(OPTIMISTIC_BACKOFF)
218+
.extract_error()
219+
.plan();
220+
assert!(executor::block_on(async { plan.execute().await }).is_err());
221+
}
176222
}

src/request/plan.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,42 @@ where
249249
}
250250
}
251251

252+
pub struct ExtractError<P: Plan> {
253+
pub inner: P,
254+
}
255+
256+
impl<P: Plan> Clone for ExtractError<P> {
257+
fn clone(&self) -> Self {
258+
ExtractError {
259+
inner: self.inner.clone(),
260+
}
261+
}
262+
}
263+
264+
/// When executed, the plan extracts errors from its inner plan, and
265+
/// returns an `Err` wrapping the error.
266+
///
267+
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
268+
/// where `response` contains unresolved errors (`error` and `region_error`).
269+
#[async_trait]
270+
impl<P: Plan> Plan for ExtractError<P>
271+
where
272+
P::Result: HasError,
273+
{
274+
type Result = P::Result;
275+
276+
async fn execute(&self) -> Result<Self::Result> {
277+
let mut result = self.inner.execute().await?;
278+
if let Some(error) = result.error() {
279+
Err(error)
280+
} else if let Some(error) = result.region_error() {
281+
Err(error)
282+
} else {
283+
Ok(result)
284+
}
285+
}
286+
}
287+
252288
#[cfg(test)]
253289
mod test {
254290
use super::*;

src/request/plan_builder.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use crate::{
44
backoff::Backoff,
55
pd::PdClient,
66
request::{
7-
DefaultProcessor, Dispatch, KvRequest, Merge, MergeResponse, MultiRegion, Plan, Process,
8-
ProcessResponse, ResolveLock, RetryRegion, Shardable,
7+
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion,
8+
Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
99
},
1010
store::Store,
1111
transaction::HasLocks,
@@ -161,6 +161,19 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
161161
}
162162
}
163163

164+
impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
165+
where
166+
P::Result: HasError,
167+
{
168+
pub fn extract_error(self) -> PlanBuilder<PdC, ExtractError<P>, Targetted> {
169+
PlanBuilder {
170+
pd_client: self.pd_client,
171+
plan: ExtractError { inner: self.plan },
172+
phantom: self.phantom,
173+
}
174+
}
175+
}
176+
164177
fn set_single_region_store<PdC: PdClient, R: KvRequest>(
165178
mut plan: Dispatch<R>,
166179
store: Store,

src/transaction/lock.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ async fn resolve_lock_with_retry(
104104
.await?
105105
.resolve_lock(Backoff::no_backoff())
106106
.retry_region(Backoff::no_backoff())
107+
.extract_error()
107108
.plan();
108109
match plan.execute().await {
109110
Ok(_) => {

src/transaction/requests.rs

Lines changed: 68 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,47 @@ use futures::stream::BoxStream;
1313
use std::{collections::HashMap, iter, sync::Arc};
1414
use tikv_client_proto::{kvrpcpb, pdpb::Timestamp};
1515

16+
// implement HasLocks for a response type that has a `pairs` field,
17+
// where locks can be extracted from both the `pairs` and `error` fields
18+
macro_rules! pair_locks {
19+
($response_type:ty) => {
20+
impl HasLocks for $response_type {
21+
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
22+
if self.pairs.is_empty() {
23+
self.error
24+
.as_mut()
25+
.and_then(|error| error.locked.take())
26+
.into_iter()
27+
.collect()
28+
} else {
29+
self.pairs
30+
.iter_mut()
31+
.filter_map(|pair| {
32+
pair.error.as_mut().and_then(|error| error.locked.take())
33+
})
34+
.collect()
35+
}
36+
}
37+
}
38+
};
39+
}
40+
41+
// implement HasLocks for a response type that does not have a `pairs` field,
42+
// where locks are only extracted from the `error` field
43+
macro_rules! error_locks {
44+
($response_type:ty) => {
45+
impl HasLocks for $response_type {
46+
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
47+
self.error
48+
.as_mut()
49+
.and_then(|error| error.locked.take())
50+
.into_iter()
51+
.collect()
52+
}
53+
}
54+
};
55+
}
56+
1657
pub fn new_get_request(key: Vec<u8>, timestamp: u64) -> kvrpcpb::GetRequest {
1758
let mut req = kvrpcpb::GetRequest::default();
1859
req.set_key(key);
@@ -43,16 +84,6 @@ impl Process<kvrpcpb::GetResponse> for DefaultProcessor {
4384
}
4485
}
4586

46-
impl HasLocks for kvrpcpb::GetResponse {
47-
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
48-
self.error
49-
.as_mut()
50-
.and_then(|error| error.locked.take())
51-
.into_iter()
52-
.collect()
53-
}
54-
}
55-
5687
pub fn new_batch_get_request(keys: Vec<Vec<u8>>, timestamp: u64) -> kvrpcpb::BatchGetRequest {
5788
let mut req = kvrpcpb::BatchGetRequest::default();
5889
req.set_keys(keys);
@@ -77,15 +108,6 @@ impl Merge<kvrpcpb::BatchGetResponse> for Collect {
77108
}
78109
}
79110

80-
impl HasLocks for kvrpcpb::BatchGetResponse {
81-
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
82-
self.pairs
83-
.iter_mut()
84-
.filter_map(|pair| pair.error.as_mut().and_then(|error| error.locked.take()))
85-
.collect()
86-
}
87-
}
88-
89111
pub fn new_scan_request(
90112
start_key: Vec<u8>,
91113
end_key: Vec<u8>,
@@ -119,15 +141,6 @@ impl Merge<kvrpcpb::ScanResponse> for Collect {
119141
}
120142
}
121143

122-
impl HasLocks for kvrpcpb::ScanResponse {
123-
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
124-
self.pairs
125-
.iter_mut()
126-
.filter_map(|pair| pair.error.as_mut().and_then(|error| error.locked.take()))
127-
.collect()
128-
}
129-
}
130-
131144
pub fn new_resolve_lock_request(
132145
start_version: u64,
133146
commit_version: u64,
@@ -547,13 +560,32 @@ pub struct SecondaryLocksStatus {
547560
pub commit_ts: Option<Timestamp>,
548561
}
549562

550-
impl HasLocks for kvrpcpb::CommitResponse {}
563+
impl HasLocks for kvrpcpb::PessimisticRollbackResponse {
564+
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
565+
self.errors
566+
.iter_mut()
567+
.filter_map(|error| error.locked.take())
568+
.collect()
569+
}
570+
}
571+
572+
impl HasLocks for kvrpcpb::PessimisticLockResponse {
573+
fn take_locks(&mut self) -> Vec<kvrpcpb::LockInfo> {
574+
self.errors
575+
.iter_mut()
576+
.filter_map(|error| error.locked.take())
577+
.collect()
578+
}
579+
}
580+
581+
pair_locks!(kvrpcpb::BatchGetResponse);
582+
pair_locks!(kvrpcpb::ScanResponse);
583+
error_locks!(kvrpcpb::GetResponse);
584+
error_locks!(kvrpcpb::ResolveLockResponse);
585+
error_locks!(kvrpcpb::CommitResponse);
586+
error_locks!(kvrpcpb::BatchRollbackResponse);
587+
error_locks!(kvrpcpb::TxnHeartBeatResponse);
588+
error_locks!(kvrpcpb::CheckTxnStatusResponse);
589+
error_locks!(kvrpcpb::CheckSecondaryLocksResponse);
551590
impl HasLocks for kvrpcpb::CleanupResponse {}
552-
impl HasLocks for kvrpcpb::BatchRollbackResponse {}
553-
impl HasLocks for kvrpcpb::PessimisticRollbackResponse {}
554-
impl HasLocks for kvrpcpb::ResolveLockResponse {}
555591
impl HasLocks for kvrpcpb::ScanLockResponse {}
556-
impl HasLocks for kvrpcpb::PessimisticLockResponse {}
557-
impl HasLocks for kvrpcpb::TxnHeartBeatResponse {}
558-
impl HasLocks for kvrpcpb::CheckTxnStatusResponse {}
559-
impl HasLocks for kvrpcpb::CheckSecondaryLocksResponse {}

src/transaction/transaction.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,7 @@ impl Committer {
901901
.multi_region()
902902
.retry_region(self.options.retry_options.region_backoff.clone())
903903
.merge(CollectError)
904+
.extract_error()
904905
.plan();
905906
let response = plan.execute().await?;
906907

@@ -939,6 +940,7 @@ impl Committer {
939940
.resolve_lock(self.options.retry_options.lock_backoff.clone())
940941
.multi_region()
941942
.retry_region(self.options.retry_options.region_backoff.clone())
943+
.extract_error()
942944
.plan();
943945
plan.execute()
944946
.inspect_err(|e| {
@@ -975,6 +977,7 @@ impl Committer {
975977
.resolve_lock(self.options.retry_options.lock_backoff)
976978
.multi_region()
977979
.retry_region(self.options.retry_options.region_backoff)
980+
.extract_error()
978981
.plan();
979982
plan.execute().await?;
980983
Ok(())
@@ -995,6 +998,7 @@ impl Committer {
995998
.resolve_lock(self.options.retry_options.lock_backoff)
996999
.multi_region()
9971000
.retry_region(self.options.retry_options.region_backoff)
1001+
.extract_error()
9981002
.plan();
9991003
plan.execute().await?;
10001004
}
@@ -1004,6 +1008,7 @@ impl Committer {
10041008
.resolve_lock(self.options.retry_options.lock_backoff)
10051009
.multi_region()
10061010
.retry_region(self.options.retry_options.region_backoff)
1011+
.extract_error()
10071012
.plan();
10081013
plan.execute().await?;
10091014
}

tikv-client-common/src/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ pub enum Error {
6969
KvError { message: String },
7070
#[error("{}", message)]
7171
InternalError { message: String },
72+
#[error("{0}")]
73+
StringError(String),
7274
}
7375

7476
impl From<tikv_client_proto::errorpb::Error> for Error {

0 commit comments

Comments
 (0)