Skip to content

Commit 56c9933

Browse files
authored
Merge pull request #233 from ziyi-yan/support-mutation-insert-checkne
Support mutation Insert and CheckNotExists
2 parents c59911a + 59919f9 commit 56c9933

File tree

4 files changed

+141
-1
lines changed

4 files changed

+141
-1
lines changed

src/transaction/buffer.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ impl Buffer {
4444
self.mutations.lock().await.get_primary_key_or(key).clone()
4545
}
4646

47+
/// Get a value from the buffer.
48+
/// If the returned value is None, it means the key doesn't exist in buffer yet.
49+
pub async fn get(&self, key: &Key) -> Option<Value> {
50+
match self.get_from_mutations(key).await {
51+
MutationValue::Determined(value) => value,
52+
MutationValue::Undetermined => None,
53+
}
54+
}
55+
4756
/// Get a value from the buffer. If the value is not present, run `f` to get
4857
/// the value.
4958
pub async fn get_or_else<F, Fut>(&self, key: Key, f: F) -> Result<Option<Value>>
@@ -192,9 +201,30 @@ impl Buffer {
192201
.insert(key, BufferEntry::Put(value));
193202
}
194203

204+
/// Mark a value as Insert mutation into the buffer (does not write through).
205+
pub async fn insert(&self, key: Key, value: Value) {
206+
self.mutations
207+
.lock()
208+
.await
209+
.insert(key, BufferEntry::Insert(value));
210+
}
211+
195212
/// Mark a value as deleted.
196213
pub async fn delete(&self, key: Key) {
197-
self.mutations.lock().await.insert(key, BufferEntry::Del);
214+
let mut mutations = self.mutations.lock().await;
215+
let value = mutations
216+
.entry_map
217+
.entry(key.clone())
218+
.or_insert(BufferEntry::Del);
219+
220+
let new_value: BufferEntry;
221+
if let BufferEntry::Insert(_) = value {
222+
new_value = BufferEntry::CheckNotExist
223+
} else {
224+
new_value = BufferEntry::Del
225+
}
226+
227+
mutations.insert(key, new_value);
198228
}
199229

200230
/// Converts the buffered mutations to the proto buffer version
@@ -237,6 +267,12 @@ impl Buffer {
237267
Some(BufferEntry::Del) => {
238268
assert!(value.is_none());
239269
}
270+
Some(BufferEntry::Insert(v)) => {
271+
assert!(value.as_ref() == Some(v))
272+
}
273+
Some(BufferEntry::CheckNotExist) => {
274+
assert!(value.is_none());
275+
}
240276
}
241277
}
242278
}
@@ -247,6 +283,8 @@ impl Buffer {
247283
// Mutations:
248284
// - `Put`
249285
// - `Del`
286+
// - `Insert`
287+
// - `CheckNotExist`, a constraint to ensure the key doesn't exist. See https://github.com/pingcap/tidb/pull/14968.
250288
// Cache of read requests:
251289
// - `Cached`, generated by normal read requests
252290
// - `ReadLockCached`, generated by lock commands (`lock_keys`, `get_for_update`) and optionally read requests
@@ -274,6 +312,10 @@ enum BufferEntry {
274312
Put(Value),
275313
// Value has been deleted.
276314
Del,
315+
// Key should be check not exists before.
316+
Insert(Value),
317+
// Key should be check not exists before.
318+
CheckNotExist,
277319
}
278320

279321
impl BufferEntry {
@@ -287,6 +329,11 @@ impl BufferEntry {
287329
}
288330
BufferEntry::Del => pb.set_op(kvrpcpb::Op::Del),
289331
BufferEntry::Locked(_) => pb.set_op(kvrpcpb::Op::Lock),
332+
BufferEntry::Insert(v) => {
333+
pb.set_op(kvrpcpb::Op::Insert);
334+
pb.set_value(v.clone());
335+
}
336+
BufferEntry::CheckNotExist => pb.set_op(kvrpcpb::Op::CheckNotExists),
290337
};
291338
pb.set_key(key.clone().into());
292339
Some(pb)
@@ -299,6 +346,8 @@ impl BufferEntry {
299346
BufferEntry::Del => MutationValue::Determined(None),
300347
BufferEntry::Locked(None) => MutationValue::Undetermined,
301348
BufferEntry::Locked(Some(value)) => MutationValue::Determined(value.clone()),
349+
BufferEntry::Insert(value) => MutationValue::Determined(Some(value.clone())),
350+
BufferEntry::CheckNotExist => MutationValue::Determined(None),
302351
}
303352
}
304353
}
@@ -359,6 +408,38 @@ mod tests {
359408
);
360409
}
361410

411+
#[tokio::test]
412+
#[allow(unreachable_code)]
413+
async fn insert_and_get_from_buffer() {
414+
let buffer = Buffer::default();
415+
buffer
416+
.insert(b"key1".to_vec().into(), b"value1".to_vec())
417+
.await;
418+
buffer
419+
.insert(b"key2".to_vec().into(), b"value2".to_vec())
420+
.await;
421+
assert_eq!(
422+
block_on(buffer.get_or_else(b"key1".to_vec().into(), move |_| ready(panic!())))
423+
.unwrap()
424+
.unwrap(),
425+
b"value1".to_vec()
426+
);
427+
428+
buffer.delete(b"key2".to_vec().into()).await;
429+
buffer
430+
.insert(b"key1".to_vec().into(), b"value".to_vec())
431+
.await;
432+
assert_eq!(
433+
block_on(buffer.batch_get_or_else(
434+
vec![b"key2".to_vec().into(), b"key1".to_vec().into()].into_iter(),
435+
move |_| ready(Ok(vec![])),
436+
))
437+
.unwrap()
438+
.collect::<Vec<_>>(),
439+
vec![KvPair(Key::from(b"key1".to_vec()), b"value".to_vec()),]
440+
);
441+
}
442+
362443
#[test]
363444
#[allow(unreachable_code)]
364445
fn repeat_reads_are_cached() {

src/transaction/transaction.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,37 @@ impl Transaction {
362362
Ok(())
363363
}
364364

365+
/// Inserts the value associated with the given key.
366+
/// It has a constraint that key should not exist before.
367+
///
368+
/// # Examples
369+
/// ```rust,no_run
370+
/// # use tikv_client::{Key, Value, Config, TransactionClient};
371+
/// # use futures::prelude::*;
372+
/// # futures::executor::block_on(async {
373+
/// # let client = TransactionClient::new(vec!["192.168.0.100", "192.168.0.101"]).await.unwrap();
374+
/// let mut txn = client.begin_optimistic().await.unwrap();
375+
/// let key = "TiKV".to_owned();
376+
/// let val = "TiKV".to_owned();
377+
/// txn.insert(key, val);
378+
/// // Finish the transaction...
379+
/// txn.commit().await.unwrap();
380+
/// # });
381+
/// ```
382+
pub async fn insert(&mut self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
383+
self.check_allow_operation()?;
384+
let key = key.into();
385+
if self.buffer.get(&key).await.is_some() {
386+
return Err(Error::DuplicateKeyInsertion);
387+
}
388+
if self.is_pessimistic() {
389+
self.pessimistic_lock(iter::once(key.clone()), false)
390+
.await?;
391+
}
392+
self.buffer.insert(key, value.into()).await;
393+
Ok(())
394+
}
395+
365396
/// Deletes the given key.
366397
///
367398
/// Deleting a non-existent key will not result in an error.

tests/integration_tests.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,31 @@ async fn crud() -> Result<()> {
132132
Ok(())
133133
}
134134

135+
// Tests transactional insert and delete-your-writes cases
136+
#[tokio::test]
137+
#[serial]
138+
async fn insert_duplicate_keys() -> Result<()> {
139+
clear_tikv().await?;
140+
141+
let client = TransactionClient::new(pd_addrs()).await?;
142+
// Initialize TiKV store with {foo => bar}
143+
let mut txn = client.begin_optimistic().await?;
144+
txn.put("foo".to_owned(), "bar".to_owned()).await?;
145+
txn.commit().await?;
146+
// Try insert foo again
147+
let mut txn = client.begin_optimistic().await?;
148+
txn.insert("foo".to_owned(), "foo".to_owned()).await?;
149+
assert!(txn.commit().await.is_err());
150+
151+
// Delete-your-writes
152+
let mut txn = client.begin_optimistic().await?;
153+
txn.insert("foo".to_owned(), "foo".to_owned()).await?;
154+
txn.delete("foo".to_owned()).await?;
155+
assert!(txn.commit().await.is_err());
156+
157+
Ok(())
158+
}
159+
135160
#[tokio::test]
136161
#[serial]
137162
async fn pessimistic() -> Result<()> {

tikv-client-common/src/errors.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ pub enum Error {
1010
/// Feature is not implemented.
1111
#[error("Unimplemented feature")]
1212
Unimplemented,
13+
/// Duplicate key insertion happens.
14+
#[error("Duplicate key insertion")]
15+
DuplicateKeyInsertion,
1316
/// Failed to resolve a lock
1417
#[error("Failed to resolve lock")]
1518
ResolveLockError,

0 commit comments

Comments
 (0)