|
1 | | -use std::{collections::HashMap, sync::Arc}; |
| 1 | +use std::{ |
| 2 | + collections::HashMap, |
| 3 | + sync::{Arc, Mutex}, |
| 4 | +}; |
2 | 5 |
|
3 | 6 | use anyhow::Result; |
4 | 7 | use aws_config::{BehaviorVersion, Region, SdkConfig}; |
5 | 8 | use aws_credential_types::Credentials; |
6 | 9 | use aws_sdk_dynamodb::{ |
7 | 10 | config::{ProvideCredentials, SharedCredentialsProvider}, |
8 | | - operation::batch_get_item::BatchGetItemOutput, |
| 11 | + operation::{batch_get_item::BatchGetItemOutput, update_item::UpdateItemOutput}, |
9 | 12 | primitives::Blob, |
10 | 13 | types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest}, |
11 | 14 | Client, |
12 | 15 | }; |
13 | 16 | use spin_core::async_trait; |
14 | | -use spin_factor_key_value::{log_error, Error, Store, StoreManager}; |
| 17 | +use spin_factor_key_value::{log_error, Cas, Error, Store, StoreManager, SwapError}; |
15 | 18 |
|
16 | 19 | pub struct KeyValueAwsDynamo { |
17 | 20 | table: String, |
@@ -128,12 +131,12 @@ struct AwsDynamoStore { |
128 | 131 | table: String, |
129 | 132 | } |
130 | 133 |
|
131 | | -// struct CompareAndSwap { |
132 | | -// key: String, |
133 | | -// client: CollectionClient, |
134 | | -// bucket_rep: u32, |
135 | | -// etag: Mutex<Option<String>>, |
136 | | -// } |
| 134 | +struct CompareAndSwap { |
| 135 | + key: String, |
| 136 | + client: Client, |
| 137 | + bucket_rep: u32, |
| 138 | + etag: Mutex<Option<String>>, |
| 139 | +} |
137 | 140 |
|
138 | 141 | const PK: &str = "PK"; |
139 | 142 | const VAL: &str = "val"; |
@@ -304,15 +307,60 @@ impl Store for AwsDynamoStore { |
304 | 307 | } |
305 | 308 |
|
306 | 309 | async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> { |
307 | | - todo!() |
| 310 | + let result = self |
| 311 | + .client |
| 312 | + .update_item() |
| 313 | + .table_name(&self.table) |
| 314 | + .key(PK, AttributeValue::S(key)) |
| 315 | + .update_expression("ADD #val :delta") |
| 316 | + .expression_attribute_names("#val", VAL) |
| 317 | + .expression_attribute_values(":delta", AttributeValue::N(delta.to_string())) |
| 318 | + .return_values(aws_sdk_dynamodb::types::ReturnValue::UpdatedNew) |
| 319 | + .send() |
| 320 | + .await |
| 321 | + .map_err(log_error)?; |
| 322 | + |
| 323 | + if let Some(updated_attributes) = result.attributes { |
| 324 | + if let Some(AttributeValue::N(new_value)) = updated_attributes.get(VAL) { |
| 325 | + return Ok(new_value.parse::<i64>().map_err(log_error))?; |
| 326 | + } |
| 327 | + } |
| 328 | + |
| 329 | + Err(Error::Other("Failed to increment value".into())) |
308 | 330 | } |
309 | 331 |
|
310 | 332 | async fn new_compare_and_swap( |
311 | 333 | &self, |
312 | 334 | bucket_rep: u32, |
313 | 335 | key: &str, |
314 | 336 | ) -> Result<Arc<dyn spin_factor_key_value::Cas>, Error> { |
315 | | - todo!() |
| 337 | + Ok(Arc::new(CompareAndSwap { |
| 338 | + key: key.to_string(), |
| 339 | + client: self.client.clone(), |
| 340 | + etag: Mutex::new(None), |
| 341 | + bucket_rep, |
| 342 | + })) |
| 343 | + } |
| 344 | +} |
| 345 | + |
| 346 | +#[async_trait] |
| 347 | +impl Cas for CompareAndSwap { |
| 348 | + async fn current(&self) -> Result<Option<Vec<u8>>, Error> { |
| 349 | + todo!(); |
| 350 | + } |
| 351 | + |
| 352 | + /// `swap` updates the value for the key using the etag saved in the `current` function for |
| 353 | + /// optimistic concurrency. |
| 354 | + async fn swap(&self, value: Vec<u8>) -> Result<(), SwapError> { |
| 355 | + todo!(); |
| 356 | + } |
| 357 | + |
| 358 | + async fn bucket_rep(&self) -> u32 { |
| 359 | + self.bucket_rep |
| 360 | + } |
| 361 | + |
| 362 | + async fn key(&self) -> String { |
| 363 | + self.key.clone() |
316 | 364 | } |
317 | 365 | } |
318 | 366 |
|
|
0 commit comments