Skip to content

Commit c3d587b

Browse files
committed
CAS implementation
Signed-off-by: Darwin Boersma <[email protected]>
1 parent 2460378 commit c3d587b

File tree

1 file changed

+100
-36
lines changed

1 file changed

+100
-36
lines changed

crates/key-value-aws/src/store.rs

Lines changed: 100 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
88
use aws_credential_types::Credentials;
99
use aws_sdk_dynamodb::{
1010
config::{ProvideCredentials, SharedCredentialsProvider},
11-
operation::{batch_get_item::BatchGetItemOutput, update_item::UpdateItemOutput},
11+
operation::{batch_get_item::BatchGetItemOutput, get_item::GetItemOutput},
1212
primitives::Blob,
1313
types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest},
1414
Client,
@@ -17,8 +17,8 @@ use spin_core::async_trait;
1717
use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError};
1818

1919
pub struct KeyValueAwsDynamo {
20-
table: String,
21-
region: String,
20+
table: Arc<String>,
21+
region: Arc<String>,
2222
client: async_once_cell::Lazy<
2323
Client,
2424
std::pin::Pin<Box<dyn std::future::Future<Output = Client> + Send>>,
@@ -96,8 +96,8 @@ impl KeyValueAwsDynamo {
9696
});
9797

9898
Ok(Self {
99-
table,
100-
region,
99+
table: Arc::new(table),
100+
region: Arc::new(region),
101101
client: async_once_cell::Lazy::from_future(client_fut),
102102
})
103103
}
@@ -128,18 +128,23 @@ impl StoreManager for KeyValueAwsDynamo {
128128
struct AwsDynamoStore {
129129
_name: String,
130130
client: Client,
131-
table: String,
131+
table: Arc<String>,
132132
}
133133

134134
struct CompareAndSwap {
135135
key: String,
136136
client: Client,
137+
table: Arc<String>,
137138
bucket_rep: u32,
138139
etag: Mutex<Option<String>>,
139140
}
140141

142+
/// Primary key in DynamoDB items used for querying items
141143
const PK: &str = "PK";
144+
/// Value key in DynamoDB items storing item value as binary
142145
const VAL: &str = "val";
146+
/// Version key in DynamoDB items used for optimistic locking
147+
const VER: &str = "ver";
143148

144149
#[async_trait]
145150
impl Store for AwsDynamoStore {
@@ -151,7 +156,7 @@ impl Store for AwsDynamoStore {
151156
async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error> {
152157
self.client
153158
.put_item()
154-
.table_name(&self.table)
159+
.table_name(self.table.as_str())
155160
.item(PK, AttributeValue::S(key.to_string()))
156161
.item(VAL, AttributeValue::B(Blob::new(value)))
157162
.send()
@@ -164,7 +169,7 @@ impl Store for AwsDynamoStore {
164169
if self.exists(key).await? {
165170
self.client
166171
.delete_item()
167-
.table_name(&self.table)
172+
.table_name(self.table.as_str())
168173
.key(PK, AttributeValue::S(key.to_string()))
169174
.send()
170175
.await
@@ -192,11 +197,11 @@ impl Store for AwsDynamoStore {
192197
)]))
193198
}
194199
let mut request_items = Some(HashMap::from_iter([(
195-
self.table.clone(),
200+
self.table.to_string(),
196201
keys_and_attributes_builder.build().map_err(log_error)?,
197202
)]));
198203

199-
loop {
204+
while request_items.is_some() {
200205
let BatchGetItemOutput {
201206
responses: Some(mut responses),
202207
unprocessed_keys,
@@ -212,7 +217,7 @@ impl Store for AwsDynamoStore {
212217
return Err(Error::Other("No results".into()));
213218
};
214219

215-
if let Some(items) = responses.remove(&self.table) {
220+
if let Some(items) = responses.remove(self.table.as_str()) {
216221
for mut item in items {
217222
let Some(AttributeValue::S(pk)) = item.remove(PK) else {
218223
return Err(Error::Other(
@@ -229,12 +234,10 @@ impl Store for AwsDynamoStore {
229234
}
230235
}
231236

232-
match unprocessed_keys {
233-
None => return Ok(results),
234-
// TODO: break out if we have retried 10+ times?
235-
remaining_keys => request_items = remaining_keys,
236-
}
237+
request_items = unprocessed_keys;
237238
}
239+
240+
Ok(results)
238241
}
239242

240243
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
@@ -253,9 +256,9 @@ impl Store for AwsDynamoStore {
253256
)
254257
}
255258

256-
let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)]));
259+
let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)]));
257260

258-
loop {
261+
while request_items.is_some() {
259262
let results = self
260263
.client
261264
.batch_write_item()
@@ -264,12 +267,10 @@ impl Store for AwsDynamoStore {
264267
.await
265268
.map_err(log_error)?;
266269

267-
match results.unprocessed_items {
268-
None => return Ok(()),
269-
// TODO: break out if we have retried 10+ times?
270-
remaining_items => request_items = remaining_items,
271-
}
270+
request_items = results.unprocessed_items;
272271
}
272+
273+
Ok(())
273274
}
274275

275276
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
@@ -287,30 +288,28 @@ impl Store for AwsDynamoStore {
287288
)
288289
}
289290

290-
let mut input = Some(HashMap::from_iter([(self.table.clone(), data)]));
291+
let mut request_items = Some(HashMap::from_iter([(self.table.to_string(), data)]));
291292

292-
loop {
293+
while request_items.is_some() {
293294
let results = self
294295
.client
295296
.batch_write_item()
296-
.set_request_items(input)
297+
.set_request_items(request_items)
297298
.send()
298299
.await
299300
.map_err(log_error)?;
300301

301-
match results.unprocessed_items {
302-
None => return Ok(()),
303-
// TODO: break out if we have retried 10+ times?
304-
remaining_items => input = remaining_items,
305-
}
302+
request_items = results.unprocessed_items;
306303
}
304+
305+
Ok(())
307306
}
308307

309308
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {
310309
let result = self
311310
.client
312311
.update_item()
313-
.table_name(&self.table)
312+
.table_name(self.table.as_str())
314313
.key(PK, AttributeValue::S(key))
315314
.update_expression("ADD #val :delta")
316315
.expression_attribute_names("#val", VAL)
@@ -337,6 +336,7 @@ impl Store for AwsDynamoStore {
337336
Ok(Arc::new(CompareAndSwap {
338337
key: key.to_string(),
339338
client: self.client.clone(),
339+
table: self.table.clone(),
340340
etag: Mutex::new(None),
341341
bucket_rep,
342342
}))
@@ -346,13 +346,77 @@ impl Store for AwsDynamoStore {
346346
#[async_trait]
347347
impl Cas for CompareAndSwap {
348348
async fn current(&self) -> Result<Option<Vec<u8>>, Error> {
349-
todo!();
349+
let GetItemOutput {
350+
item: Some(mut current_item),
351+
..
352+
} = self
353+
.client
354+
.get_item()
355+
.table_name(self.table.as_str())
356+
.key(
357+
PK,
358+
aws_sdk_dynamodb::types::AttributeValue::S(self.key.clone()),
359+
)
360+
.send()
361+
.await
362+
.map_err(log_error)?
363+
else {
364+
return Ok(None);
365+
};
366+
367+
if let Some(AttributeValue::B(val)) = current_item.remove(VAL) {
368+
let version = if let Some(AttributeValue::N(ver)) = current_item.remove(VER) {
369+
Some(ver)
370+
} else {
371+
Some(String::from("0"))
372+
};
373+
self.etag.lock().unwrap().clone_from(&version);
374+
Ok(Some(val.into_inner()))
375+
} else {
376+
Ok(None)
377+
}
350378
}
351379

352380
/// `swap` updates the value for the key using the etag saved in the `current` function for
353381
/// optimistic concurrency.
354382
async fn swap(&self, value: Vec<u8>) -> Result<(), SwapError> {
355-
todo!();
383+
let mut update_item = self
384+
.client
385+
.update_item()
386+
.table_name(self.table.as_str())
387+
.key(PK, AttributeValue::S(self.key.clone()))
388+
.update_expression("SET #val=:val, ADD #ver :increment")
389+
.expression_attribute_names("#val", VAL)
390+
.expression_attribute_names("#ver", VER)
391+
.expression_attribute_values(":val", AttributeValue::B(Blob::new(value)))
392+
.expression_attribute_values(":increment", AttributeValue::N("1".to_owned()))
393+
.return_values(aws_sdk_dynamodb::types::ReturnValue::None);
394+
395+
let current_version = self.etag.lock().unwrap().clone();
396+
match current_version {
397+
// Existing item with no version key, update under condition that version key still does not exist in Dynamo when operation is executed
398+
Some(version) if version == "0" => {
399+
update_item = update_item.condition_expression("attribute_not_exists(#ver)");
400+
}
401+
// Existing item with version key, update under condition that version in Dynamo matches stored version
402+
Some(version) => {
403+
update_item = update_item
404+
.condition_expression("#ver = :ver")
405+
.expression_attribute_values(":ver", AttributeValue::N(version));
406+
}
407+
// Assume new item, insert under condition that item does not already exist
408+
None => {
409+
update_item = update_item
410+
.condition_expression("attribute_not_exists(#pk)")
411+
.expression_attribute_names("#pk", PK);
412+
}
413+
}
414+
415+
update_item
416+
.send()
417+
.await
418+
.map(|_| ())
419+
.map_err(|e| SwapError::CasFailed(format!("{e:?}")))
356420
}
357421

358422
async fn bucket_rep(&self) -> u32 {
@@ -369,7 +433,7 @@ impl AwsDynamoStore {
369433
let response = self
370434
.client
371435
.get_item()
372-
.table_name(&self.table)
436+
.table_name(self.table.as_str())
373437
.key(
374438
PK,
375439
aws_sdk_dynamodb::types::AttributeValue::S(key.to_string()),
@@ -397,7 +461,7 @@ impl AwsDynamoStore {
397461
let mut scan_builder = self
398462
.client
399463
.scan()
400-
.table_name(&self.table)
464+
.table_name(self.table.as_str())
401465
.projection_expression(PK);
402466

403467
if let Some(keys) = last_evaluated_key {

0 commit comments

Comments
 (0)