Skip to content

Commit ef2e26e

Browse files
committed
Implemented first draft batch operations
Signed-off-by: Darwin Boersma <[email protected]>
1 parent 74cfa2b commit ef2e26e

File tree

1 file changed

+118
-4
lines changed

1 file changed

+118
-4
lines changed

crates/key-value-aws/src/store.rs

Lines changed: 118 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
55
use aws_credential_types::Credentials;
66
use aws_sdk_dynamodb::{
77
config::{ProvideCredentials, SharedCredentialsProvider},
8+
operation::batch_get_item::BatchGetItemOutput,
89
primitives::Blob,
9-
types::{AttributeValue, KeysAndAttributes},
10+
types::{AttributeValue, DeleteRequest, KeysAndAttributes, PutRequest, WriteRequest},
1011
Client,
1112
};
1213
use spin_core::async_trait;
@@ -178,15 +179,128 @@ impl Store for AwsDynamoStore {
178179
}
179180

180181
async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error> {
181-
todo!()
182+
let mut results = Vec::with_capacity(keys.len());
183+
184+
let mut keys_and_attributes_builder = KeysAndAttributes::builder();
185+
for key in keys {
186+
keys_and_attributes_builder = keys_and_attributes_builder.keys(HashMap::from_iter([(
187+
PK.to_owned(),
188+
AttributeValue::S(key),
189+
)]))
190+
}
191+
let mut request_items = Some(HashMap::from_iter([(
192+
self.table.clone(),
193+
keys_and_attributes_builder.build().map_err(log_error)?,
194+
)]));
195+
196+
loop {
197+
let BatchGetItemOutput {
198+
responses: Some(mut responses),
199+
unprocessed_keys,
200+
..
201+
} = self
202+
.client
203+
.batch_get_item()
204+
.set_request_items(request_items)
205+
.send()
206+
.await
207+
.map_err(log_error)?
208+
else {
209+
return Err(Error::Other("No results".into()));
210+
};
211+
212+
if let Some(items) = responses.remove(&self.table) {
213+
for mut item in items {
214+
let Some(AttributeValue::S(pk)) = item.remove(PK) else {
215+
return Err(Error::Other(
216+
"Could not find 'PK' key on DynamoDB item".into(),
217+
));
218+
};
219+
let Some(AttributeValue::B(val)) = item.remove(VAL) else {
220+
return Err(Error::Other(
221+
"Could not find 'val' key on DynamoDB item".into(),
222+
));
223+
};
224+
225+
results.push((pk, Some(val.into_inner())));
226+
}
227+
}
228+
229+
match unprocessed_keys {
230+
None => return Ok(results),
231+
// TODO: break out if we have retried 10+ times?
232+
remaining_keys => request_items = remaining_keys,
233+
}
234+
}
182235
}
183236

184237
async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error> {
185-
todo!()
238+
let mut data = Vec::with_capacity(key_values.len());
239+
for (key, val) in key_values {
240+
data.push(
241+
WriteRequest::builder()
242+
.put_request(
243+
PutRequest::builder()
244+
.item(PK, AttributeValue::S(key))
245+
.item(VAL, AttributeValue::B(Blob::new(val)))
246+
.build()
247+
.map_err(log_error)?,
248+
)
249+
.build(),
250+
)
251+
}
252+
253+
let mut request_items = Some(HashMap::from_iter([(self.table.clone(), data)]));
254+
255+
loop {
256+
let results = self
257+
.client
258+
.batch_write_item()
259+
.set_request_items(request_items)
260+
.send()
261+
.await
262+
.map_err(log_error)?;
263+
264+
match results.unprocessed_items {
265+
None => return Ok(()),
266+
// TODO: break out if we have retried 10+ times?
267+
remaining_items => request_items = remaining_items,
268+
}
269+
}
186270
}
187271

188272
async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error> {
189-
todo!()
273+
let mut data = Vec::with_capacity(keys.len());
274+
for key in keys {
275+
data.push(
276+
WriteRequest::builder()
277+
.delete_request(
278+
DeleteRequest::builder()
279+
.key(PK, AttributeValue::S(key))
280+
.build()
281+
.map_err(log_error)?,
282+
)
283+
.build(),
284+
)
285+
}
286+
287+
let mut input = Some(HashMap::from_iter([(self.table.clone(), data)]));
288+
289+
loop {
290+
let results = self
291+
.client
292+
.batch_write_item()
293+
.set_request_items(input)
294+
.send()
295+
.await
296+
.map_err(log_error)?;
297+
298+
match results.unprocessed_items {
299+
None => return Ok(()),
300+
// TODO: break out if we have retried 10+ times?
301+
remaining_items => input = remaining_items,
302+
}
303+
}
190304
}
191305

192306
async fn increment(&self, key: String, delta: i64) -> Result<i64, Error> {

0 commit comments

Comments
 (0)