Skip to content

Commit 7d80f59

Browse files
authored
transaction: Resolve locks using "kv_resolve_lock" interface and handle txn file locks (#519)
Signed-off-by: Ping Yu <yuping@pingcap.com>
1 parent a5d247e commit 7d80f59

File tree

12 files changed

+343
-102
lines changed

12 files changed

+343
-102
lines changed

proto/kvrpcpb.proto

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ message PrewriteRequest {
132132
// for_update_ts constriants that should be checked when prewriting a pessimistic transaction.
133133
// See https://github.com/tikv/tikv/issues/14311
134134
repeated ForUpdateTSConstraint for_update_ts_constraints = 16;
135+
136+
// Reserved for file based transaction.
137+
repeated uint64 txn_file_chunks = 100;
135138
}
136139

137140
message PrewriteResponse {
@@ -271,6 +274,9 @@ message TxnHeartBeatRequest {
271274
uint64 start_version = 3;
272275
// The new TTL the sender would like.
273276
uint64 advise_lock_ttl = 4;
277+
278+
// Reserved for file based transaction.
279+
bool is_txn_file = 100;
274280
}
275281

276282
message TxnHeartBeatResponse {
@@ -315,6 +321,9 @@ message CheckTxnStatusRequest {
315321
// because the old versions of clients cannot handle some results returned from TiKV correctly.
316322
// For new versions, this field should always be set to true.
317323
bool verify_is_primary = 9;
324+
325+
// Reserved for file based transaction.
326+
bool is_txn_file = 100;
318327
}
319328

320329
message CheckTxnStatusResponse {
@@ -369,6 +378,9 @@ message CommitRequest {
369378
repeated bytes keys = 3;
370379
// Timestamp for the end of the transaction. Must be greater than `start_version`.
371380
uint64 commit_version = 4;
381+
382+
// Reserved for file based transaction.
383+
bool is_txn_file = 100;
372384
}
373385

374386
message CommitResponse {
@@ -437,6 +449,9 @@ message BatchRollbackRequest {
437449
uint64 start_version = 2;
438450
// The keys to rollback.
439451
repeated bytes keys = 3;
452+
453+
// Reserved for file based transaction.
454+
bool is_txn_file = 100;
440455
}
441456

442457
message BatchRollbackResponse {
@@ -480,6 +495,9 @@ message ResolveLockRequest {
480495
repeated TxnInfo txn_infos = 4;
481496
// Only resolve specified keys.
482497
repeated bytes keys = 5;
498+
499+
// Reserved for file based transaction.
500+
bool is_txn_file = 100;
483501
}
484502

485503
message ResolveLockResponse {
@@ -971,6 +989,9 @@ message LockInfo {
971989
// It's used in timeout errors. 0 means unknown or not applicable.
972990
// It can be used to help the client decide whether to try resolving the lock.
973991
uint64 duration_to_last_update_ms = 11;
992+
993+
// Reserved for file based transaction.
994+
bool is_txn_file = 100;
974995
}
975996

976997
message KeyError {
@@ -1320,6 +1341,9 @@ message MvccInfo {
13201341
message TxnInfo {
13211342
uint64 txn = 1;
13221343
uint64 status = 2;
1344+
1345+
// Reserved for file based transaction.
1346+
bool is_txn_file = 100;
13231347
}
13241348

13251349
enum Action {

src/common/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ pub enum Error {
112112
},
113113
#[error("Keyspace not found: {0}")]
114114
KeyspaceNotFound(String),
115+
#[error("Transaction not found error: {:?}", _0)]
116+
TxnNotFound(kvrpcpb::TxnNotFound),
115117
}
116118

117119
impl From<crate::proto::errorpb::Error> for Error {

src/generated/kvrpcpb.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ pub struct PrewriteRequest {
134134
pub for_update_ts_constraints: ::prost::alloc::vec::Vec<
135135
prewrite_request::ForUpdateTsConstraint,
136136
>,
137+
/// Reserved for file based transaction.
138+
#[prost(uint64, repeated, tag = "100")]
139+
pub txn_file_chunks: ::prost::alloc::vec::Vec<u64>,
137140
}
138141
/// Nested message and enum types in `PrewriteRequest`.
139142
pub mod prewrite_request {
@@ -363,6 +366,9 @@ pub struct TxnHeartBeatRequest {
363366
/// The new TTL the sender would like.
364367
#[prost(uint64, tag = "4")]
365368
pub advise_lock_ttl: u64,
369+
/// Reserved for file based transaction.
370+
#[prost(bool, tag = "100")]
371+
pub is_txn_file: bool,
366372
}
367373
#[allow(clippy::derive_partial_eq_without_eq)]
368374
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -422,6 +428,9 @@ pub struct CheckTxnStatusRequest {
422428
/// For new versions, this field should always be set to true.
423429
#[prost(bool, tag = "9")]
424430
pub verify_is_primary: bool,
431+
/// Reserved for file based transaction.
432+
#[prost(bool, tag = "100")]
433+
pub is_txn_file: bool,
425434
}
426435
#[allow(clippy::derive_partial_eq_without_eq)]
427436
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -497,6 +506,9 @@ pub struct CommitRequest {
497506
/// Timestamp for the end of the transaction. Must be greater than `start_version`.
498507
#[prost(uint64, tag = "4")]
499508
pub commit_version: u64,
509+
/// Reserved for file based transaction.
510+
#[prost(bool, tag = "100")]
511+
pub is_txn_file: bool,
500512
}
501513
#[allow(clippy::derive_partial_eq_without_eq)]
502514
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -597,6 +609,9 @@ pub struct BatchRollbackRequest {
597609
/// The keys to rollback.
598610
#[prost(bytes = "vec", repeated, tag = "3")]
599611
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
612+
/// Reserved for file based transaction.
613+
#[prost(bool, tag = "100")]
614+
pub is_txn_file: bool,
600615
}
601616
#[allow(clippy::derive_partial_eq_without_eq)]
602617
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -661,6 +676,9 @@ pub struct ResolveLockRequest {
661676
/// Only resolve specified keys.
662677
#[prost(bytes = "vec", repeated, tag = "5")]
663678
pub keys: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec<u8>>,
679+
/// Reserved for file based transaction.
680+
#[prost(bool, tag = "100")]
681+
pub is_txn_file: bool,
664682
}
665683
#[allow(clippy::derive_partial_eq_without_eq)]
666684
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1308,6 +1326,9 @@ pub struct LockInfo {
13081326
/// It can be used to help the client decide whether to try resolving the lock.
13091327
#[prost(uint64, tag = "11")]
13101328
pub duration_to_last_update_ms: u64,
1329+
/// Reserved for file based transaction.
1330+
#[prost(bool, tag = "100")]
1331+
pub is_txn_file: bool,
13111332
}
13121333
#[allow(clippy::derive_partial_eq_without_eq)]
13131334
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -1781,6 +1802,9 @@ pub struct TxnInfo {
17811802
pub txn: u64,
17821803
#[prost(uint64, tag = "2")]
17831804
pub status: u64,
1805+
/// Reserved for file based transaction.
1806+
#[prost(bool, tag = "100")]
1807+
pub is_txn_file: bool,
17841808
}
17851809
#[allow(clippy::derive_partial_eq_without_eq)]
17861810
#[derive(Clone, PartialEq, ::prost::Message)]

src/raw/requests.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,6 @@ mod test {
659659

660660
use super::*;
661661
use crate::backoff::DEFAULT_REGION_BACKOFF;
662-
use crate::backoff::OPTIMISTIC_BACKOFF;
663662
use crate::mock::MockKvClient;
664663
use crate::mock::MockPdClient;
665664
use crate::proto::kvrpcpb;
@@ -700,7 +699,6 @@ mod test {
700699
..Default::default()
701700
};
702701
let plan = crate::request::PlanBuilder::new(client, keyspace, scan)
703-
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
704702
.retry_multi_region(DEFAULT_REGION_BACKOFF)
705703
.merge(Collect)
706704
.plan();
@@ -756,7 +754,6 @@ mod test {
756754
new_raw_batch_put_request(pairs.clone(), ttls.clone(), Some(cf), false);
757755
let keyspace = Keyspace::Enable { keyspace_id: 0 };
758756
let plan = crate::request::PlanBuilder::new(client, keyspace, batch_put_request)
759-
.resolve_lock(OPTIMISTIC_BACKOFF, keyspace)
760757
.retry_multi_region(DEFAULT_REGION_BACKOFF)
761758
.plan();
762759
let _ = plan.execute().await;

src/request/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ mod test {
197197
)));
198198

199199
let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, request)
200-
.resolve_lock(Backoff::no_jitter_backoff(1, 1, 3), Keyspace::Disable)
201200
.retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
202201
.extract_error()
203202
.plan();
@@ -224,14 +223,12 @@ mod test {
224223
// does not extract error
225224
let plan =
226225
crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req.clone())
227-
.resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable)
228226
.retry_multi_region(OPTIMISTIC_BACKOFF)
229227
.plan();
230228
assert!(plan.execute().await.is_ok());
231229

232230
// extract error
233231
let plan = crate::request::PlanBuilder::new(pd_client.clone(), Keyspace::Disable, req)
234-
.resolve_lock(OPTIMISTIC_BACKOFF, Keyspace::Disable)
235232
.retry_multi_region(OPTIMISTIC_BACKOFF)
236233
.extract_error()
237234
.plan();

src/request/plan.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::pd::PdClient;
1717
use crate::proto::errorpb;
1818
use crate::proto::errorpb::EpochNotMatch;
1919
use crate::proto::kvrpcpb;
20+
use crate::proto::pdpb::Timestamp;
2021
use crate::region::StoreId;
2122
use crate::region::{RegionVerId, RegionWithLeader};
2223
use crate::request::shard::HasNextBatch;
@@ -597,6 +598,7 @@ pub struct DefaultProcessor;
597598

598599
pub struct ResolveLock<P: Plan, PdC: PdClient> {
599600
pub inner: P,
601+
pub timestamp: Timestamp,
600602
pub pd_client: Arc<PdC>,
601603
pub backoff: Backoff,
602604
pub keyspace: Keyspace,
@@ -606,6 +608,7 @@ impl<P: Plan, PdC: PdClient> Clone for ResolveLock<P, PdC> {
606608
fn clone(&self) -> Self {
607609
ResolveLock {
608610
inner: self.inner.clone(),
611+
timestamp: self.timestamp.clone(),
609612
pd_client: self.pd_client.clone(),
610613
backoff: self.backoff.clone(),
611614
keyspace: self.keyspace,
@@ -634,7 +637,13 @@ where
634637
}
635638

636639
let pd_client = self.pd_client.clone();
637-
let live_locks = resolve_locks(locks, pd_client.clone(), self.keyspace).await?;
640+
let live_locks = resolve_locks(
641+
locks,
642+
self.timestamp.clone(),
643+
pd_client.clone(),
644+
self.keyspace,
645+
)
646+
.await?;
638647
if live_locks.is_empty() {
639648
result = self.inner.execute().await?;
640649
} else {
@@ -953,6 +962,7 @@ mod test {
953962
let plan = RetryableMultiRegion {
954963
inner: ResolveLock {
955964
inner: ErrPlan,
965+
timestamp: Timestamp::default(),
956966
backoff: Backoff::no_backoff(),
957967
pd_client: Arc::new(MockPdClient::default()),
958968
keyspace: Keyspace::Disable,

src/request/plan_builder.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use crate::transaction::HasLocks;
3030
use crate::transaction::ResolveLocksContext;
3131
use crate::transaction::ResolveLocksOptions;
3232
use crate::Result;
33+
use crate::Timestamp;
3334

3435
/// Builder type for plans (see that module for more).
3536
pub struct PlanBuilder<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> {
@@ -72,6 +73,7 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
7273
/// If there is a lock error, then resolve the lock and retry the request.
7374
pub fn resolve_lock(
7475
self,
76+
timestamp: Timestamp,
7577
backoff: Backoff,
7678
keyspace: Keyspace,
7779
) -> PlanBuilder<PdC, ResolveLock<P, PdC>, Ph>
@@ -82,6 +84,7 @@ impl<PdC: PdClient, P: Plan, Ph: PlanBuilderPhase> PlanBuilder<PdC, P, Ph> {
8284
pd_client: self.pd_client.clone(),
8385
plan: ResolveLock {
8486
inner: self.plan,
87+
timestamp,
8588
backoff,
8689
pd_client: self.pd_client,
8790
keyspace,

src/store/errors.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ has_region_error!(kvrpcpb::PessimisticLockResponse);
4343
has_region_error!(kvrpcpb::ImportResponse);
4444
has_region_error!(kvrpcpb::BatchRollbackResponse);
4545
has_region_error!(kvrpcpb::PessimisticRollbackResponse);
46-
has_region_error!(kvrpcpb::CleanupResponse);
4746
has_region_error!(kvrpcpb::BatchGetResponse);
4847
has_region_error!(kvrpcpb::ScanLockResponse);
4948
has_region_error!(kvrpcpb::ResolveLockResponse);
@@ -79,7 +78,6 @@ macro_rules! has_key_error {
7978
has_key_error!(kvrpcpb::GetResponse);
8079
has_key_error!(kvrpcpb::CommitResponse);
8180
has_key_error!(kvrpcpb::BatchRollbackResponse);
82-
has_key_error!(kvrpcpb::CleanupResponse);
8381
has_key_error!(kvrpcpb::ScanLockResponse);
8482
has_key_error!(kvrpcpb::ResolveLockResponse);
8583
has_key_error!(kvrpcpb::GcResponse);

src/store/request.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ impl_request!(GetRequest, kv_get, "kv_get");
8989
impl_request!(ScanRequest, kv_scan, "kv_scan");
9090
impl_request!(PrewriteRequest, kv_prewrite, "kv_prewrite");
9191
impl_request!(CommitRequest, kv_commit, "kv_commit");
92-
impl_request!(CleanupRequest, kv_cleanup, "kv_cleanup");
9392
impl_request!(BatchGetRequest, kv_batch_get, "kv_batch_get");
9493
impl_request!(BatchRollbackRequest, kv_batch_rollback, "kv_batch_rollback");
9594
impl_request!(

0 commit comments

Comments
 (0)