@@ -8,7 +8,10 @@ use aws_config::{BehaviorVersion, Region, SdkConfig};
88use aws_credential_types:: Credentials ;
99use aws_sdk_dynamodb:: {
1010 config:: { ProvideCredentials , SharedCredentialsProvider } ,
11- operation:: { batch_get_item:: BatchGetItemOutput , get_item:: GetItemOutput } ,
11+ operation:: {
12+ batch_get_item:: BatchGetItemOutput , batch_write_item:: BatchWriteItemOutput ,
13+ get_item:: GetItemOutput ,
14+ } ,
1215 primitives:: Blob ,
1316 types:: { AttributeValue , DeleteRequest , KeysAndAttributes , PutRequest , WriteRequest } ,
1417 Client ,
@@ -136,7 +139,7 @@ struct CompareAndSwap {
136139 client : Client ,
137140 table : Arc < String > ,
138141 bucket_rep : u32 ,
139- etag : Mutex < Option < String > > ,
142+ version : Mutex < Option < String > > ,
140143}
141144
142145/// Primary key in DynamoDB items used for querying items
@@ -234,7 +237,7 @@ impl Store for AwsDynamoStore {
234237 }
235238 }
236239
237- request_items = unprocessed_keys;
240+ request_items = unprocessed_keys. filter ( |unprocessed| !unprocessed . is_empty ( ) ) ;
238241 }
239242
240243 Ok ( results)
@@ -259,15 +262,17 @@ impl Store for AwsDynamoStore {
259262 let mut request_items = Some ( HashMap :: from_iter ( [ ( self . table . to_string ( ) , data) ] ) ) ;
260263
261264 while request_items. is_some ( ) {
262- let results = self
265+ let BatchWriteItemOutput {
266+ unprocessed_items, ..
267+ } = self
263268 . client
264269 . batch_write_item ( )
265270 . set_request_items ( request_items)
266271 . send ( )
267272 . await
268273 . map_err ( log_error) ?;
269274
270- request_items = results . unprocessed_items ;
275+ request_items = unprocessed_items. filter ( |unprocessed| !unprocessed . is_empty ( ) ) ;
271276 }
272277
273278 Ok ( ( ) )
@@ -291,15 +296,17 @@ impl Store for AwsDynamoStore {
291296 let mut request_items = Some ( HashMap :: from_iter ( [ ( self . table . to_string ( ) , data) ] ) ) ;
292297
293298 while request_items. is_some ( ) {
294- let results = self
299+ let BatchWriteItemOutput {
300+ unprocessed_items, ..
301+ } = self
295302 . client
296303 . batch_write_item ( )
297304 . set_request_items ( request_items)
298305 . send ( )
299306 . await
300307 . map_err ( log_error) ?;
301308
302- request_items = results . unprocessed_items ;
309+ request_items = unprocessed_items. filter ( |unprocessed| !unprocessed . is_empty ( ) ) ;
303310 }
304311
305312 Ok ( ( ) )
@@ -337,7 +344,7 @@ impl Store for AwsDynamoStore {
337344 key : key. to_string ( ) ,
338345 client : self . client . clone ( ) ,
339346 table : self . table . clone ( ) ,
340- etag : Mutex :: new ( None ) ,
347+ version : Mutex :: new ( None ) ,
341348 bucket_rep,
342349 } ) )
343350 }
@@ -365,12 +372,12 @@ impl Cas for CompareAndSwap {
365372 } ;
366373
367374 if let Some ( AttributeValue :: B ( val) ) = current_item. remove ( VAL ) {
368- let version = if let Some ( AttributeValue :: N ( ver) ) = current_item. remove ( VER ) {
369- Some ( ver)
370- } else {
371- Some ( String :: from ( "0" ) )
375+ let version = match current_item. remove ( VER ) {
376+ Some ( AttributeValue :: N ( ver) ) => Some ( ver) ,
377+ _ => None ,
372378 } ;
373- self . etag . lock ( ) . unwrap ( ) . clone_from ( & version) ;
379+
380+ self . version . lock ( ) . unwrap ( ) . clone_from ( & version) ;
374381 Ok ( Some ( val. into_inner ( ) ) )
375382 } else {
376383 Ok ( None )
@@ -385,38 +392,35 @@ impl Cas for CompareAndSwap {
385392 . update_item ( )
386393 . table_name ( self . table . as_str ( ) )
387394 . key ( PK , AttributeValue :: S ( self . key . clone ( ) ) )
388- . update_expression ( "SET #val=:val, ADD #ver :increment" )
389395 . expression_attribute_names ( "#val" , VAL )
390- . expression_attribute_names ( "#ver" , VER )
391396 . expression_attribute_values ( ":val" , AttributeValue :: B ( Blob :: new ( value) ) )
397+ . expression_attribute_names ( "#ver" , VER )
392398 . expression_attribute_values ( ":increment" , AttributeValue :: N ( "1" . to_owned ( ) ) )
393- . return_values ( aws_sdk_dynamodb:: types:: ReturnValue :: None ) ;
399+ . return_values ( aws_sdk_dynamodb:: types:: ReturnValue :: UpdatedNew ) ;
394400
395- let current_version = self . etag . lock ( ) . unwrap ( ) . clone ( ) ;
401+ let current_version = self . version . lock ( ) . unwrap ( ) . clone ( ) ;
396402 match current_version {
397- // Existing item with no version key, update under condition that version key still does not exist in Dynamo when operation is executed
398- Some ( version) if version == "0" => {
399- update_item = update_item. condition_expression ( "attribute_not_exists(#ver)" ) ;
400- }
401- // Existing item with version key, update under condition that version in Dynamo matches stored version
403+ // Existing item with version key, update under condition that version in DynamoDB matches stored version (optimistic lock)
402404 Some ( version) => {
403405 update_item = update_item
406+ . update_expression ( "SET #val=:val ADD #ver :increment" )
404407 . condition_expression ( "#ver = :ver" )
405408 . expression_attribute_values ( ":ver" , AttributeValue :: N ( version) ) ;
406409 }
407- // Assume new item, insert under condition that item does not already exist
410+ // Assume new/unversioned item, upsert under condition that item does not already have a version -- if it does, another atomic operation has already started
408411 None => {
409412 update_item = update_item
410- . condition_expression ( "attribute_not_exists(#pk )" )
411- . expression_attribute_names ( "#pk" , PK ) ;
413+ . condition_expression ( "attribute_not_exists(#ver )" )
414+ . update_expression ( "SET #val=:val, #ver=:increment" ) ;
412415 }
413- }
416+ } ;
414417
415418 update_item
416419 . send ( )
417420 . await
418- . map ( |_| ( ) )
419- . map_err ( |e| SwapError :: CasFailed ( format ! ( "{e:?}" ) ) )
421+ . map_err ( |e| SwapError :: CasFailed ( format ! ( "{e:?}" ) ) ) ?;
422+
423+ Ok ( ( ) )
420424 }
421425
422426 async fn bucket_rep ( & self ) -> u32 {
0 commit comments