Skip to content

Commit 606c48a

Browse files
authored
Merge pull request #2883 from ogghead/dynamo-key-value-store
Implement AWS key value store
2 parents 509d8d1 + 6d8570b commit 606c48a

File tree

13 files changed

+1216
-63
lines changed

13 files changed

+1216
-63
lines changed

Cargo.lock

Lines changed: 463 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-key-value/src/host.rs

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -283,10 +283,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
283283
keys: Vec<String>,
284284
) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
285285
let store = self.get_store_wasi(bucket)?;
286-
store
287-
.get_many(keys.iter().map(|k| k.to_string()).collect())
288-
.await
289-
.map_err(to_wasi_err)
286+
if keys.is_empty() {
287+
return Ok(vec![]);
288+
}
289+
store.get_many(keys).await.map_err(to_wasi_err)
290290
}
291291

292292
#[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
@@ -296,6 +296,9 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
296296
key_values: Vec<(String, Vec<u8>)>,
297297
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
298298
let store = self.get_store_wasi(bucket)?;
299+
if key_values.is_empty() {
300+
return Ok(());
301+
}
299302
store.set_many(key_values).await.map_err(to_wasi_err)
300303
}
301304

@@ -306,10 +309,10 @@ impl wasi_keyvalue::batch::Host for KeyValueDispatch {
306309
keys: Vec<String>,
307310
) -> std::result::Result<(), wasi_keyvalue::store::Error> {
308311
let store = self.get_store_wasi(bucket)?;
309-
store
310-
.delete_many(keys.iter().map(|k| k.to_string()).collect())
311-
.await
312-
.map_err(to_wasi_err)
312+
if keys.is_empty() {
313+
return Ok(());
314+
}
315+
store.delete_many(keys).await.map_err(to_wasi_err)
313316
}
314317
}
315318

@@ -355,6 +358,13 @@ impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
355358

356359
#[async_trait]
357360
impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
361+
fn convert_cas_error(
362+
&mut self,
363+
error: spin_world::wasi::keyvalue::atomics::CasError,
364+
) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
365+
Ok(error)
366+
}
367+
358368
#[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
359369
async fn increment(
360370
&mut self,
@@ -371,27 +381,29 @@ impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
371381
&mut self,
372382
cas_res: Resource<atomics::Cas>,
373383
value: Vec<u8>,
374-
) -> Result<std::result::Result<(), CasError>> {
384+
) -> Result<(), CasError> {
375385
let cas_rep = cas_res.rep();
376386
let cas = self
377387
.get_cas(Resource::<Bucket>::new_own(cas_rep))
378388
.map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
379389

380390
match cas.swap(value).await {
381-
Ok(_) => Ok(Ok(())),
391+
Ok(_) => Ok(()),
382392
Err(err) => match err {
383393
SwapError::CasFailed(_) => {
384394
let bucket = Resource::new_own(cas.bucket_rep().await);
385-
let new_cas = self.new(bucket, cas.key().await).await?;
395+
let new_cas = self
396+
.new(bucket, cas.key().await)
397+
.await
398+
.map_err(CasError::StoreError)?;
386399
let new_cas_rep = new_cas.rep();
387-
self.current(Resource::new_own(new_cas_rep)).await?;
388-
Err(anyhow::Error::new(CasError::CasFailed(Resource::new_own(
389-
new_cas_rep,
390-
))))
400+
self.current(Resource::new_own(new_cas_rep))
401+
.await
402+
.map_err(CasError::StoreError)?;
403+
let res = Resource::new_own(new_cas_rep);
404+
Err(CasError::CasFailed(res))
391405
}
392-
SwapError::Other(msg) => Err(anyhow::Error::new(CasError::StoreError(
393-
atomics::Error::Other(msg),
394-
))),
406+
SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
395407
},
396408
}
397409
}

crates/factor-key-value/src/util.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,11 @@ impl Store for CachingStore {
251251
keys: Vec<String>,
252252
) -> anyhow::Result<Vec<(String, Option<Vec<u8>>)>, Error> {
253253
let mut state = self.state.lock().await;
254+
255+
// Flush any outstanding writes first in case entries have been popped off the end of the LRU cache prior
256+
// to their corresponding writes reaching the backing store.
257+
state.flush().await?;
258+
254259
let mut found: Vec<(String, Option<Vec<u8>>)> = Vec::new();
255260
let mut not_found: Vec<String> = Vec::new();
256261
for key in keys {
@@ -260,10 +265,12 @@ impl Store for CachingStore {
260265
}
261266
}
262267

263-
let keys_and_values = self.inner.get_many(not_found).await?;
264-
for (key, value) in keys_and_values {
265-
found.push((key.clone(), value.clone()));
266-
state.cache.put(key, value);
268+
if !not_found.is_empty() {
269+
let keys_and_values = self.inner.get_many(not_found).await?;
270+
for (key, value) in keys_and_values {
271+
found.push((key.clone(), value.clone()));
272+
state.cache.put(key, value);
273+
}
267274
}
268275

269276
Ok(found)

crates/key-value-aws/Cargo.toml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "spin-key-value-aws"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
homepage.workspace = true
8+
repository.workspace = true
9+
rust-version.workspace = true
10+
11+
[dependencies]
12+
anyhow = { workspace = true }
13+
async-once-cell = "0.5.4"
14+
aws-config = "1.1.7"
15+
aws-credential-types = "1.1.7"
16+
aws-sdk-dynamodb = "1.49.0"
17+
serde = { workspace = true }
18+
spin-core = { path = "../core" }
19+
spin-factor-key-value = { path = "../factor-key-value" }
20+
21+
[lints]
22+
workspace = true

crates/key-value-aws/src/lib.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
mod store;
2+
3+
use serde::Deserialize;
4+
use spin_factor_key_value::runtime_config::spin::MakeKeyValueStore;
5+
use store::{
6+
KeyValueAwsDynamo, KeyValueAwsDynamoAuthOptions, KeyValueAwsDynamoRuntimeConfigOptions,
7+
};
8+
9+
/// A key-value store that uses AWS Dynamo as the backend.
10+
#[derive(Default)]
11+
pub struct AwsDynamoKeyValueStore {
12+
_priv: (),
13+
}
14+
15+
impl AwsDynamoKeyValueStore {
16+
/// Creates a new `AwsKeyValueStore`.
17+
pub fn new() -> Self {
18+
Self::default()
19+
}
20+
}
21+
22+
/// Runtime configuration for the AWS Dynamo key-value store.
23+
#[derive(Deserialize)]
24+
pub struct AwsDynamoKeyValueRuntimeConfig {
25+
/// The access key for the AWS Dynamo DB account role.
26+
access_key: Option<String>,
27+
/// The secret key for authorization on the AWS Dynamo DB account.
28+
secret_key: Option<String>,
29+
/// The token for authorization on the AWS Dynamo DB account.
30+
token: Option<String>,
31+
/// The AWS region where the database is located
32+
region: String,
33+
/// Boolean determining whether to use strongly consistent reads.
34+
/// Defaults to `false` but can be set to `true` to improve atomicity
35+
consistent_read: Option<bool>,
36+
/// The AWS Dynamo DB table.
37+
table: String,
38+
}
39+
40+
impl MakeKeyValueStore for AwsDynamoKeyValueStore {
41+
const RUNTIME_CONFIG_TYPE: &'static str = "aws_dynamo";
42+
43+
type RuntimeConfig = AwsDynamoKeyValueRuntimeConfig;
44+
45+
type StoreManager = KeyValueAwsDynamo;
46+
47+
fn make_store(
48+
&self,
49+
runtime_config: Self::RuntimeConfig,
50+
) -> anyhow::Result<Self::StoreManager> {
51+
let AwsDynamoKeyValueRuntimeConfig {
52+
access_key,
53+
secret_key,
54+
token,
55+
region,
56+
consistent_read,
57+
table,
58+
} = runtime_config;
59+
let auth_options = match (access_key, secret_key) {
60+
(Some(access_key), Some(secret_key)) => {
61+
KeyValueAwsDynamoAuthOptions::RuntimeConfigValues(
62+
KeyValueAwsDynamoRuntimeConfigOptions::new(access_key, secret_key, token),
63+
)
64+
}
65+
_ => KeyValueAwsDynamoAuthOptions::Environmental,
66+
};
67+
KeyValueAwsDynamo::new(
68+
region,
69+
consistent_read.unwrap_or(false),
70+
table,
71+
auth_options,
72+
)
73+
}
74+
}

0 commit comments

Comments
 (0)