Skip to content

Commit 1824288

Browse files
authored
Merge branch 'master' into unify-locking
2 parents ef5f170 + 87d5cbb commit 1824288

File tree

8 files changed

+89
-38
lines changed

8 files changed

+89
-38
lines changed

mock-tikv/src/server.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,4 +513,13 @@ impl Tikv for MockTikv {
513513
) {
514514
todo!()
515515
}
516+
517+
fn coprocessor_v2(
518+
&mut self,
519+
_: grpcio::RpcContext,
520+
_: tikv_client_proto::coprocessor_v2::RawCoprocessorRequest,
521+
_: grpcio::UnarySink<tikv_client_proto::coprocessor_v2::RawCoprocessorResponse>,
522+
) {
523+
todo!()
524+
}
516525
}

rust-toolchain

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/raw/client.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use tikv_client_common::Error;
44

55
use crate::{
6-
backoff::{DEFAULT_REGION_BACKOFF, OPTIMISTIC_BACKOFF},
6+
backoff::DEFAULT_REGION_BACKOFF,
77
config::Config,
88
pd::PdRpcClient,
99
raw::lowering::*,
@@ -115,7 +115,6 @@ impl Client {
115115
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
116116
.single_region()
117117
.await?
118-
.resolve_lock(OPTIMISTIC_BACKOFF)
119118
.retry_region(DEFAULT_REGION_BACKOFF)
120119
.post_process_default()
121120
.plan();
@@ -146,7 +145,6 @@ impl Client {
146145
) -> Result<Vec<KvPair>> {
147146
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
148147
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
149-
.resolve_lock(OPTIMISTIC_BACKOFF)
150148
.multi_region()
151149
.retry_region(DEFAULT_REGION_BACKOFF)
152150
.merge(Collect)
@@ -177,7 +175,6 @@ impl Client {
177175
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
178176
.single_region()
179177
.await?
180-
.resolve_lock(OPTIMISTIC_BACKOFF)
181178
.retry_region(DEFAULT_REGION_BACKOFF)
182179
.extract_error()
183180
.plan();
@@ -208,7 +205,6 @@ impl Client {
208205
) -> Result<()> {
209206
let request = new_raw_batch_put_request(pairs.into_iter().map(Into::into), self.cf.clone());
210207
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
211-
.resolve_lock(OPTIMISTIC_BACKOFF)
212208
.multi_region()
213209
.retry_region(DEFAULT_REGION_BACKOFF)
214210
.extract_error()
@@ -239,7 +235,6 @@ impl Client {
239235
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
240236
.single_region()
241237
.await?
242-
.resolve_lock(OPTIMISTIC_BACKOFF)
243238
.retry_region(DEFAULT_REGION_BACKOFF)
244239
.extract_error()
245240
.plan();
@@ -268,7 +263,6 @@ impl Client {
268263
let request =
269264
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
270265
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
271-
.resolve_lock(OPTIMISTIC_BACKOFF)
272266
.multi_region()
273267
.retry_region(DEFAULT_REGION_BACKOFF)
274268
.extract_error()
@@ -295,7 +289,6 @@ impl Client {
295289
pub async fn delete_range(&self, range: impl Into<BoundRange>) -> Result<()> {
296290
let request = new_raw_delete_range_request(range.into(), self.cf.clone());
297291
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
298-
.resolve_lock(OPTIMISTIC_BACKOFF)
299292
.multi_region()
300293
.retry_region(DEFAULT_REGION_BACKOFF)
301294
.extract_error()
@@ -437,7 +430,6 @@ impl Client {
437430

438431
let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
439432
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
440-
.resolve_lock(OPTIMISTIC_BACKOFF)
441433
.multi_region()
442434
.retry_region(DEFAULT_REGION_BACKOFF)
443435
.merge(Collect)
@@ -469,7 +461,6 @@ impl Client {
469461
self.cf.clone(),
470462
);
471463
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
472-
.resolve_lock(OPTIMISTIC_BACKOFF)
473464
.multi_region()
474465
.retry_region(DEFAULT_REGION_BACKOFF)
475466
.merge(Collect)

src/transaction/buffer.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,38 @@ struct InnerBuffer {
2121
impl InnerBuffer {
2222
fn insert(&mut self, key: impl Into<Key>, entry: BufferEntry) {
2323
let key = key.into();
24-
if !matches!(entry, BufferEntry::Cached(_)) {
24+
if !matches!(entry, BufferEntry::Cached(_) | BufferEntry::CheckNotExist) {
2525
self.primary_key.get_or_insert_with(|| key.clone());
2626
}
2727
self.entry_map.insert(key, entry);
2828
}
2929

30-
pub fn get_primary_key_or(&mut self, key: &Key) -> &Key {
31-
self.primary_key.get_or_insert(key.clone())
30+
/// Set the primary key if it is not set
31+
pub fn primary_key_or(&mut self, key: &Key) {
32+
self.primary_key.get_or_insert(key.clone());
3233
}
3334
}
3435

3536
/// A caching layer which buffers reads and writes in a transaction.
3637
pub struct Buffer {
37-
mutations: Mutex<InnerBuffer>,
38+
inner: Mutex<InnerBuffer>,
3839
}
3940

4041
impl Buffer {
4142
pub fn new(is_pessimistic: bool) -> Buffer {
4243
Buffer {
43-
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
44+
inner: Mutex::new(InnerBuffer::new(is_pessimistic)),
4445
}
4546
}
4647

4748
/// Get the primary key of the buffer.
4849
pub async fn get_primary_key(&self) -> Option<Key> {
49-
self.mutations.lock().await.primary_key.clone()
50+
self.inner.lock().await.primary_key.clone()
5051
}
5152

52-
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
53-
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
54-
self.mutations.lock().await.get_primary_key_or(key).clone()
53+
/// Set the primary key if it is not set
54+
pub async fn primary_key_or(&self, key: &Key) {
55+
self.inner.lock().await.primary_key_or(key);
5556
}
5657

5758
/// Get a value from the buffer.
@@ -74,7 +75,7 @@ impl Buffer {
7475
MutationValue::Determined(value) => Ok(value),
7576
MutationValue::Undetermined => {
7677
let value = f(key.clone()).await?;
77-
let mut mutations = self.mutations.lock().await;
78+
let mut mutations = self.inner.lock().await;
7879
Self::update_cache(&mut mutations, key, value.clone());
7980
Ok(value)
8081
}
@@ -95,7 +96,7 @@ impl Buffer {
9596
Fut: Future<Output = Result<Vec<KvPair>>>,
9697
{
9798
let (cached_results, undetermined_keys) = {
98-
let mutations = self.mutations.lock().await;
99+
let mutations = self.inner.lock().await;
99100
// Partition the keys into those we have buffered and those we have to
100101
// get from the store.
101102
let (undetermined_keys, cached_results): (
@@ -121,7 +122,7 @@ impl Buffer {
121122
};
122123

123124
let fetched_results = f(Box::new(undetermined_keys)).await?;
124-
let mut mutations = self.mutations.lock().await;
125+
let mut mutations = self.inner.lock().await;
125126
for kvpair in &fetched_results {
126127
let key = kvpair.0.clone();
127128
let value = Some(kvpair.1.clone());
@@ -144,7 +145,7 @@ impl Buffer {
144145
Fut: Future<Output = Result<Vec<KvPair>>>,
145146
{
146147
// read from local buffer
147-
let mut mutations = self.mutations.lock().await;
148+
let mut mutations = self.inner.lock().await;
148149
let mutation_range = mutations.entry_map.range(range.clone());
149150

150151
// fetch from TiKV
@@ -190,8 +191,8 @@ impl Buffer {
190191

191192
/// Lock the given key if necessary.
192193
pub async fn lock(&self, key: Key) {
193-
let mutations = &mut self.mutations.lock().await;
194-
mutations.primary_key.get_or_insert(key.clone());
194+
let mutations = &mut self.inner.lock().await;
195+
mutations.primary_key.get_or_insert_with(|| key.clone());
195196
let value = mutations
196197
.entry_map
197198
.entry(key)
@@ -205,15 +206,12 @@ impl Buffer {
205206

206207
/// Insert a value into the buffer (does not write through).
207208
pub async fn put(&self, key: Key, value: Value) {
208-
self.mutations
209-
.lock()
210-
.await
211-
.insert(key, BufferEntry::Put(value));
209+
self.inner.lock().await.insert(key, BufferEntry::Put(value));
212210
}
213211

214212
/// Mark a value as Insert mutation into the buffer (does not write through).
215213
pub async fn insert(&self, key: Key, value: Value) {
216-
let mut mutations = self.mutations.lock().await;
214+
let mut mutations = self.inner.lock().await;
217215
let mut entry = mutations.entry_map.entry(key.clone());
218216
match entry {
219217
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
@@ -225,7 +223,7 @@ impl Buffer {
225223

226224
/// Mark a value as deleted.
227225
pub async fn delete(&self, key: Key) {
228-
let mut mutations = self.mutations.lock().await;
226+
let mut mutations = self.inner.lock().await;
229227
let is_pessimistic = mutations.is_pessimistic;
230228
let mut entry = mutations.entry_map.entry(key.clone());
231229

@@ -241,7 +239,7 @@ impl Buffer {
241239

242240
/// Converts the buffered mutations to the proto buffer version
243241
pub async fn to_proto_mutations(&self) -> Vec<kvrpcpb::Mutation> {
244-
self.mutations
242+
self.inner
245243
.lock()
246244
.await
247245
.entry_map
@@ -251,7 +249,7 @@ impl Buffer {
251249
}
252250

253251
async fn get_from_mutations(&self, key: &Key) -> MutationValue {
254-
self.mutations
252+
self.inner
255253
.lock()
256254
.await
257255
.entry_map

src/transaction/transaction.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,9 @@ impl<PdC: PdClient> Transaction<PdC> {
549549
let status = self.status.read().await;
550550
if !matches!(
551551
*status,
552-
TransactionStatus::StartedRollback | TransactionStatus::Active
552+
TransactionStatus::StartedRollback
553+
| TransactionStatus::Active
554+
| TransactionStatus::StartedCommit
553555
) {
554556
return Err(Error::OperationAfterCommitError);
555557
}
@@ -655,7 +657,13 @@ impl<PdC: PdClient> Transaction<PdC> {
655657
}
656658

657659
let first_key = keys[0].clone().key();
658-
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
660+
// we do not set the primary key here, because pessimistic lock request
661+
// can fail, in which case the keys may not be part of the transaction.
662+
let primary_lock = self
663+
.buffer
664+
.get_primary_key()
665+
.await
666+
.unwrap_or_else(|| first_key.clone());
659667
let for_update_ts = self.rpc.clone().get_timestamp().await?;
660668
self.options.push_for_update_ts(for_update_ts.clone());
661669
let request = new_pessimistic_lock_request(
@@ -675,6 +683,9 @@ impl<PdC: PdClient> Transaction<PdC> {
675683
.plan();
676684
let pairs = plan.execute().await;
677685

686+
// primary key will be set here if needed
687+
self.buffer.primary_key_or(&first_key).await;
688+
678689
self.start_auto_heartbeat().await;
679690

680691
for key in keys {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
syntax = "proto3";
2+
package coprocessor_v2;
3+
4+
import "errorpb.proto";
5+
import "kvrpcpb.proto";
6+
import "gogoproto/gogo.proto";
7+
import "rustproto.proto";
8+
9+
option (gogoproto.marshaler_all) = true;
10+
option (gogoproto.sizer_all) = true;
11+
option (gogoproto.unmarshaler_all) = true;
12+
option (rustproto.lite_runtime_all) = true;
13+
14+
option java_package = "org.tikv.kvproto";
15+
16+
message RawCoprocessorRequest {
17+
kvrpcpb.Context context = 1;
18+
19+
string copr_name = 2;
20+
string copr_version_constraint = 3;
21+
22+
bytes data = 4;
23+
}
24+
25+
message RawCoprocessorResponse {
26+
bytes data = 1;
27+
28+
errorpb.Error region_error = 2;
29+
30+
// Error message for cases like if no coprocessor with a matching name is found
31+
// or on a version mismatch between plugin_api and the coprocessor.
32+
string other_error = 4;
33+
}

tikv-client-proto/proto/tikvpb.proto

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ syntax = "proto3";
22
package tikvpb;
33

44
import "coprocessor.proto";
5+
import "coprocessor_v2.proto";
56
import "kvrpcpb.proto";
67
import "mpp.proto";
78
import "raft_serverpb.proto";
@@ -69,6 +70,9 @@ service Tikv {
6970
rpc CoprocessorStream(coprocessor.Request) returns (stream coprocessor.Response) {}
7071
rpc BatchCoprocessor(coprocessor.BatchRequest) returns (stream coprocessor.BatchResponse) {}
7172

73+
// Command for executing custom user requests in TiKV coprocessor_v2.
74+
rpc CoprocessorV2(coprocessor_v2.RawCoprocessorRequest) returns (coprocessor_v2.RawCoprocessorResponse) {}
75+
7276
// Raft commands (sent between TiKV nodes).
7377
rpc Raft(stream raft_serverpb.RaftMessage) returns (raft_serverpb.Done) {}
7478
rpc BatchRaft(stream BatchRaftMessage) returns (raft_serverpb.Done) {}
@@ -92,7 +96,7 @@ service Tikv {
9296
rpc EstablishMPPConnection(mpp.EstablishMPPConnectionRequest) returns (stream mpp.MPPDataPacket) {}
9397

9498
/// CheckLeader sends all information (includes region term and epoch) to other stores.
95-
/// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose
99+
/// Once a store receives a request, it checks term and epoch for each region, and sends the regions whose
96100
/// term and epoch match with local information in the store.
97101
/// After the client collected all responses from all stores, it checks if got a quorum of responses from
98102
/// other stores for every region, and decides to advance resolved ts from these regions.
@@ -144,6 +148,8 @@ message BatchCommandsRequest {
144148

145149
kvrpcpb.CheckSecondaryLocksRequest CheckSecondaryLocks = 33;
146150

151+
coprocessor_v2.RawCoprocessorRequest CoprocessorV2 = 34;
152+
147153
// For some test cases.
148154
BatchCommandsEmptyRequest Empty = 255;
149155
}
@@ -197,6 +203,8 @@ message BatchCommandsResponse {
197203

198204
kvrpcpb.CheckSecondaryLocksResponse CheckSecondaryLocks = 33;
199205

206+
coprocessor_v2.RawCoprocessorResponse CoprocessorV2 = 34;
207+
200208
// For some test cases.
201209
BatchCommandsEmptyResponse Empty = 255;
202210
}

tikv-client-proto/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use protos::*;
4-
pub use protos::{coprocessor, errorpb, kvrpcpb, metapb, mpp, pdpb, raft_serverpb, tikvpb};
4+
pub use protos::{
5+
coprocessor, coprocessor_v2, errorpb, kvrpcpb, metapb, mpp, pdpb, raft_serverpb, tikvpb,
6+
};
57

68
#[allow(dead_code)]
79
#[allow(clippy::all)]

0 commit comments

Comments
 (0)