Skip to content

Commit e805726

Browse files
committed
Final updates to ensure better atomicity
Signed-off-by: Darwin Boersma <[email protected]>
1 parent 2400dc0 commit e805726

File tree

1 file changed

+49
-27
lines changed

1 file changed

+49
-27
lines changed

crates/key-value-aws/src/store.rs

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
88
use aws_credential_types::Credentials;
99
use aws_sdk_dynamodb::{
1010
config::{ProvideCredentials, SharedCredentialsProvider},
11-
operation::{
12-
batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput,
13-
get_item::GetItemOutput,
14-
},
11+
operation::{batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput},
1512
primitives::Blob,
16-
types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest},
13+
types::{
14+
AttributeValue, DeleteRequest, Get, KeysAndAttributes, PutRequest, TransactGetItem,
15+
TransactWriteItem, Update, WriteRequest,
16+
},
1717
Client,
1818
};
1919
use spin_core::async_trait;
@@ -353,21 +353,34 @@ impl Store for AwsDynamoStore {
353353
#[async_trait]
354354
impl Cas for CompareAndSwap {
355355
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
356-
let GetItemOutput {
357-
item: Some(mut current_item),
358-
..
359-
} = self
356+
// TransactGetItems fails if concurrent writes are in progress on an item
357+
let output = self
360358
.client
361-
.get_item()
362-
.table_name(self.table.as_str())
363-
.key(
364-
PK,
365-
aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()),
359+
.transact_get_items()
360+
.transact_items(
361+
TransactGetItem::builder()
362+
.get(
363+
Get::builder()
364+
.table_name(self.table.as_str())
365+
.key(
366+
PK,
367+
aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()),
368+
)
369+
.build()
370+
.map_err(log_error)?,
371+
)
372+
.build(),
366373
)
367374
.send()
368375
.await
369-
.map_err(log_error)?
370-
else {
376+
.map_err(log_error)?;
377+
378+
let item = output
379+
.responses
380+
.and_then(|responses| responses.into_iter().next())
381+
.and_then(|response| response.item);
382+
383+
let Some(mut current_item) = item else {
371384
return Ok(None);
372385
};
373386

@@ -384,38 +397,47 @@ impl Cas for CompareAndSwap {
384397
}
385398
}
386399

387-
/// `swap` updates the value for the key using the etag saved in the `current` function for
400+
/// `swap` updates the value for the key using the version saved in the `current` function for
388401
/// optimistic concurrency.
389402
async fn swap(&self, value: Vec<u8>) -> Result<(), SwapError> {
390-
let mut update_item = self
391-
.client
392-
.update_item()
403+
let mut update_item = Update::builder()
393404
.table_name(self.table.as_str())
394405
.key(PK, AttributeValue::S(self.key.clone()))
395406
.expression_attribute_names("#val", VAL)
396407
.expression_attribute_values(":val", AttributeValue::B(Blob::new(value)))
397408
.expression_attribute_names("#ver", VER)
398409
.expression_attribute_values(":increment", AttributeValue::N("1".to_owned()))
399-
.return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew);
410+
.return_values_on_condition_check_failure(
411+
aws_sdk_dynamodb::types::ReturnValuesOnConditionCheckFailure::None,
412+
);
400413

401414
let current_version = self.version.lock().unwrap().clone();
402415
match current_version {
403-
// Existing item with version key, update under condition that version in DynamoDB matches stored version (optimistic lock)
416+
// Existing item with version, update under condition that version in DynamoDB matches cached version
404417
Some(version) => {
405418
update_item = update_item
406419
.update_expression("SET #val=:val ADD #ver :increment")
407420
.condition_expression("#ver = :ver")
408421
.expression_attribute_values(":ver", AttributeValue::N(version));
409422
}
410-
// Assume new/unversioned item, upsert under condition that item does not already have a version -- if it does, another atomic operation has already started
423+
// New/unversioned item, upsert atomically but without optimistic locking guarantee
411424
None => {
412-
update_item = update_item
413-
.condition_expression("attribute_not_exists(#ver)")
414-
.update_expression("SET #val=:val, #ver=:increment");
425+
update_item = update_item.update_expression("SET #val=:val, #ver=:increment");
415426
}
416427
};
417428

418-
update_item
429+
// TransactWriteItems fails if concurrent writes are in progress on an item.
430+
self.client
431+
.transact_write_items()
432+
.transact_items(
433+
TransactWriteItem::builder()
434+
.update(
435+
update_item
436+
.build()
437+
.map_err(|e| SwapError::Other(format!("{e:?}")))?,
438+
)
439+
.build(),
440+
)
419441
.send()
420442
.await
421443
.map_err(|e| SwapError::CasFailed(format!("{e:?}")))?;

0 commit comments

Comments
 (0)