Skip to content

Commit de4f334

Browse files
committed
Removed arcs, adjusted atomic functions to use updateItem, minimized returned values for getItem calls
Signed-off-by: Darwin Boersma <[email protected]>
1 parent e805726 commit de4f334

File tree

2 files changed

+79
-99
lines changed

2 files changed

+79
-99
lines changed

crates/key-value-aws/src/store.rs

Lines changed: 78 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,24 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
88
use aws_credential_types::Credentials;
99
use aws_sdk_dynamodb::{
1010
config::{ProvideCredentials, SharedCredentialsProvider},
11-
operation::{batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput},
11+
operation::{
12+
batch_get_item::BatchGetItemOutput, batch_write_item::BatchWriteItemOutput,
13+
get_item::GetItemOutput, update_item::UpdateItemOutput,
14+
},
1215
primitives::Blob,
1316
types::{
14-
AttributeValue, DeleteRequest, Get, KeysAndAttributes, PutRequest, TransactGetItem,
15-
TransactWriteItem, Update, WriteRequest,
17+
AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, TransactWriteItem, Update,
18+
WriteRequest,
1619
},
1720
Client,
1821
};
1922
use spin_core::async_trait;
2023
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError};
2124

2225
pub struct KeyValueAwsDynamo {
26+
region: String,
27+
// Needs to be cloned when getting a store
2328
table: Arc<String>,
24-
region: Arc<String>,
2529
client: async_once_cell::Lazy<
2630
Client,
2731
std::pin::Pin<Box<dyn std::future::Future<Output = Client> + Send>>,
@@ -99,18 +103,17 @@ impl KeyValueAwsDynamo {
99103
});
100104

101105
Ok(Self {
106+
region,
102107
table: Arc::new(table),
103-
region: Arc::new(region),
104108
client: async_once_cell::Lazy::from_future(client_fut),
105109
})
106110
}
107111
}
108112

109113
#[async_trait]
110114
impl StoreManager for KeyValueAwsDynamo {
111-
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
115+
async fn get(&self, _name: &str) -> Result<Arc<dyn Store>, Error> {
112116
Ok(Arc::new(AwsDynamoStore {
113-
_name: name.to_owned(),
114117
client: self.client.get_unpin().await.clone(),
115118
table: self.table.clone(),
116119
}))
@@ -122,14 +125,14 @@ impl StoreManager for KeyValueAwsDynamo {
122125

123126
fn summary(&self, _store_name: &str) -> Option<String> {
124127
Some(format!(
125-
"AWS DynamoDB region: {:?}, table: {}",
128+
"AWS DynamoDB region: {}, table: {}",
126129
self.region, self.table
127130
))
128131
}
129132
}
130133

131134
struct AwsDynamoStore {
132-
_name: String,
135+
// Client wraps an Arc so should be low cost to clone
133136
client: Client,
134137
table: Arc<String>,
135138
}
@@ -139,20 +142,40 @@ struct CompareAndSwap {
139142
client: Client,
140143
table: Arc<String>,
141144
bucket_rep: u32,
142-
version: Mutex<Option<String>>,
145+
has_lock: Mutex<bool>,
143146
}
144147

145148
/// Primary key in DynamoDB items used for querying items
146149
const PK: &str = "PK";
147150
/// Value key in DynamoDB items storing item value as binary
148151
const VAL: &str = "val";
149-
/// Version key in DynamoDB items used for optimistic locking
150-
const VER: &str = "ver";
152+
/// Lock key in DynamoDB items used for atomic operations
153+
const LOCK: &str = "lock";
151154

152155
#[async_trait]
153156
impl Store for AwsDynamoStore {
154157
async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
155-
let item = self.get_item(key).await?;
158+
let response = self
159+
.client
160+
.get_item()
161+
.table_name(self.table.as_str())
162+
.key(
163+
PK,
164+
aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()),
165+
)
166+
.projection_expression(VAL)
167+
.send()
168+
.await
169+
.map_err(log_error)?;
170+
171+
let item = response.item.and_then(|mut item| {
172+
if let Some(AttributeValue::B(val)) = item.remove(VAL) {
173+
Some(val.into_inner())
174+
} else {
175+
None
176+
}
177+
});
178+
156179
Ok(item)
157180
}
158181

@@ -182,7 +205,20 @@ impl Store for AwsDynamoStore {
182205
}
183206

184207
async fn exists(&self, key: &str) -> Result<bool, Error> {
185-
Ok(self.get_item(key).await?.is_some())
208+
let GetItemOutput { item, .. } = self
209+
.client
210+
.get_item()
211+
.table_name(self.table.as_str())
212+
.key(
213+
PK,
214+
aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()),
215+
)
216+
.projection_expression(PK)
217+
.send()
218+
.await
219+
.map_err(log_error)?;
220+
221+
Ok(item.map(|item| item.contains_key(PK)).unwrap_or(false))
186222
}
187223

188224
async fn get_keys(&self) -> Result<Vec<String>, Error> {
@@ -192,7 +228,8 @@ impl Store for AwsDynamoStore {
192228
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
193229
let mut results = Vec::with_capacity(keys.len());
194230

195-
let mut keys_and_attributes_builder = KeysAndAttributes::builder();
231+
let mut keys_and_attributes_builder =
232+
KeysAndAttributes::builder().projection_expression(format!("{PK},{VAL}"));
196233
for key in keys {
197234
keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([(
198235
PK.to_owned(),
@@ -344,7 +381,7 @@ impl Store for AwsDynamoStore {
344381
key: key.to_string(),
345382
client: self.client.clone(),
346383
table: self.table.clone(),
347-
version: Mutex::new(None),
384+
has_lock: Mutex::new(false),
348385
bucket_rep,
349386
}))
350387
}
@@ -353,47 +390,28 @@ impl Store for AwsDynamoStore {
353390
#[async_trait]
354391
impl Cas for CompareAndSwap {
355392
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
356-
// TransactGetItems fails if concurrent writes are in progress on an item
357-
let output = self
393+
let UpdateItemOutput { attributes, .. } = self
358394
.client
359-
.transact_get_items()
360-
.transact_items(
361-
TransactGetItem::builder()
362-
.get(
363-
Get::builder()
364-
.table_name(self.table.as_str())
365-
.key(
366-
PK,
367-
aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()),
368-
)
369-
.build()
370-
.map_err(log_error)?,
371-
)
372-
.build(),
373-
)
395+
.update_item()
396+
.table_name(self.table.as_str())
397+
.key(PK, AttributeValue::S(self.key.clone()))
398+
.update_expression("SET #lock=:lock")
399+
.expression_attribute_names("#lock", LOCK)
400+
.expression_attribute_values(":lock", AttributeValue::Null(true))
401+
.condition_expression("attribute_not_exists (#lock)")
402+
.return_values(aws_sdk_dynamodb::types::ReturnValue::AllNew)
374403
.send()
375404
.await
376405
.map_err(log_error)?;
377406

378-
let item = output
379-
.responses
380-
.and_then(|responses| responses.into_iter().next())
381-
.and_then(|response| response.item);
382-
383-
let Some(mut current_item) = item else {
384-
return Ok(None);
385-
};
386-
387-
if let Some(AttributeValue::B(val)) = current_item.remove(VAL) {
388-
let version = match current_item.remove(VER) {
389-
Some(AttributeValue::N(ver)) => Some(ver),
390-
_ => None,
391-
};
407+
self.has_lock.lock().unwrap().clone_from(&true);
392408

393-
self.version.lock().unwrap().clone_from(&version);
394-
Ok(Some(val.into_inner()))
395-
} else {
396-
Ok(None)
409+
match attributes {
410+
Some(mut item) => match item.remove(VAL) {
411+
Some(AttributeValue::B(val)) => Ok(Some(val.into_inner())),
412+
_ => Ok(None),
413+
},
414+
None => Ok(None),
397415
}
398416
}
399417

@@ -403,30 +421,18 @@ impl Cas for CompareAndSwap {
403421
let mut update_item = Update::builder()
404422
.table_name(self.table.as_str())
405423
.key(PK, AttributeValue::S(self.key.clone()))
424+
.update_expression("SET #val=:val REMOVE #lock")
406425
.expression_attribute_names("#val", VAL)
407426
.expression_attribute_values(":val", AttributeValue::B(Blob::new(value)))
408-
.expression_attribute_names("#ver", VER)
409-
.expression_attribute_values(":increment", AttributeValue::N("1".to_owned()))
410-
.return_values_on_condition_check_failure(
411-
aws_sdk_dynamodb::types::ReturnValuesOnConditionCheckFailure::None,
412-
);
413-
414-
let current_version = self.version.lock().unwrap().clone();
415-
match current_version {
416-
// Existing item with version, update under condition that version in DynamoDB matches cached version
417-
Some(version) => {
418-
update_item = update_item
419-
.update_expression("SET #val=:val ADD #ver :increment")
420-
.condition_expression("#ver = :ver")
421-
.expression_attribute_values(":ver", AttributeValue::N(version));
422-
}
423-
// New/unversioned item, upsert atomically but without optimistic locking guarantee
424-
None => {
425-
update_item = update_item.update_expression("SET #val=:val, #ver=:increment");
426-
}
427-
};
427+
.expression_attribute_names("#lock", LOCK);
428+
429+
let has_lock = *self.has_lock.lock().unwrap();
430+
// Ensure exclusive access between fetching the current value of the item and swapping
431+
if has_lock {
432+
update_item = update_item.condition_expression("attribute_exists (#lock)");
433+
}
428434

429-
// TransactWriteItems fails if concurrent writes are in progress on an item.
435+
// TransactWriteItems fails if concurrent writes are in progress on an item, so even without locking, we get atomicity in overwriting
430436
self.client
431437
.transact_write_items()
432438
.transact_items(
@@ -455,30 +461,6 @@ impl Cas for CompareAndSwap {
455461
}
456462

457463
impl AwsDynamoStore {
458-
async fn get_item(&self, key: &str) -> Result<Option<Vec<u8>>, Error> {
459-
let response = self
460-
.client
461-
.get_item()
462-
.table_name(self.table.as_str())
463-
.key(
464-
PK,
465-
aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()),
466-
)
467-
.send()
468-
.await
469-
.map_err(log_error)?;
470-
471-
let val = response.item.and_then(|mut item| {
472-
if let Some(AttributeValue::B(val)) = item.remove(VAL) {
473-
Some(val.into_inner())
474-
} else {
475-
None
476-
}
477-
});
478-
479-
Ok(val)
480-
}
481-
482464
async fn get_keys(&self) -> Result<Vec<String>, Error> {
483465
let mut primary_keys = Vec::new();
484466
let mut last_evaluated_key = None;

crates/key-value-azure/src/store.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,8 @@ impl KeyValueAzureCosmos {
9292

9393
#[async_trait]
9494
impl StoreManager for KeyValueAzureCosmos {
95-
async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error> {
95+
async fn get(&self, _name: &str) -> Result<Arc<dyn Store>, Error> {
9696
Ok(Arc::new(AzureCosmosStore {
97-
_name: name.to_owned(),
9897
client: self.client.clone(),
9998
}))
10099
}
@@ -114,7 +113,6 @@ impl StoreManager for KeyValueAzureCosmos {
114113

115114
#[derive(Clone)]
116115
struct AzureCosmosStore {
117-
_name: String,
118116
client: CollectionClient,
119117
}
120118

0 commit comments

Comments
 (0)