@@ -16,6 +16,7 @@ use async_lock::{Semaphore, SemaphoreGuard};
1616use aws_sdk_dynamodb:: {
1717 error:: SdkError ,
1818 operation:: {
19+ batch_get_item:: BatchGetItemError ,
1920 create_table:: CreateTableError ,
2021 delete_table:: DeleteTableError ,
2122 get_item:: GetItemError ,
@@ -25,7 +26,7 @@ use aws_sdk_dynamodb::{
2526 } ,
2627 primitives:: Blob ,
2728 types:: {
28- AttributeDefinition , AttributeValue , Delete , KeySchemaElement , KeyType ,
29+ AttributeDefinition , AttributeValue , Delete , KeySchemaElement , KeyType , KeysAndAttributes ,
2930 ProvisionedThroughput , Put , ScalarAttributeType , TransactWriteItem ,
3031 } ,
3132 Client ,
@@ -152,6 +153,11 @@ const TEST_DYNAMO_DB_MAX_STREAM_QUERIES: usize = 10;
152153/// See <https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html>
153154const MAX_TRANSACT_WRITE_ITEM_SIZE : usize = 100 ;
154155
156+ /// Maximum number of entries that can be obtained in a [`BatchGetItem`] operation.
157+ /// The two constraints are at most 100 operations and at most 16M in total.
158+ /// Since the maximum size of a value is 400K, this gets us 40 as upper limit
159+ const MAX_BATCH_GET_ITEM_SIZE : usize = 40 ;
160+
155161/// Builds the key attributes for a table item.
156162///
157163/// The key is composed of two attributes that are both binary blobs. The first attribute is a
@@ -687,6 +693,84 @@ impl DynamoDbStoreInternal {
687693 responses,
688694 } )
689695 }
696+
697+ async fn read_batch_values_bytes (
698+ & self ,
699+ keys : & [ Vec < u8 > ] ,
700+ ) -> Result < Vec < Option < Vec < u8 > > > , DynamoDbStoreInternalError > {
701+ // Early return for empty keys
702+ if keys. is_empty ( ) {
703+ return Ok ( Vec :: new ( ) ) ;
704+ }
705+ let mut results = vec ! [ None ; keys. len( ) ] ;
706+
707+ // Build the request keys
708+ let mut request_keys = Vec :: new ( ) ;
709+ let mut key_to_index = HashMap :: < Vec < u8 > , Vec < usize > > :: new ( ) ;
710+
711+ for ( i, key) in keys. iter ( ) . enumerate ( ) {
712+ check_key_size ( key) ?;
713+ let key_attrs = build_key ( & self . start_key , key. clone ( ) ) ;
714+ key_to_index. entry ( key. clone ( ) ) . or_default ( ) . push ( i) ;
715+ request_keys. push ( key_attrs) ;
716+ }
717+
718+ let keys_and_attributes = KeysAndAttributes :: builder ( )
719+ . set_keys ( Some ( request_keys) )
720+ . build ( ) ?;
721+
722+ let mut request_items = HashMap :: new ( ) ;
723+ request_items. insert ( self . namespace . clone ( ) , keys_and_attributes) ;
724+
725+ // Execute batch get item request with retry for unprocessed keys
726+ let mut remaining_request_items = Some ( request_items) ;
727+
728+ while let Some ( request_items) = remaining_request_items {
729+ // Skip if the request items are empty
730+ if request_items. is_empty ( ) {
731+ break ;
732+ }
733+
734+ let _guard = self . acquire ( ) . await ;
735+ let response = self
736+ . client
737+ . batch_get_item ( )
738+ . set_request_items ( Some ( request_items) )
739+ . send ( )
740+ . boxed_sync ( )
741+ . await ?;
742+
743+ // Process returned items
744+ if let Some ( mut responses) = response. responses {
745+ if let Some ( items) = responses. remove ( & self . namespace ) {
746+ for mut item in items {
747+ // Extract key to find the original index
748+ let key_attr = item
749+ . get ( KEY_ATTRIBUTE )
750+ . ok_or ( DynamoDbStoreInternalError :: MissingKey ) ?;
751+
752+ if let AttributeValue :: B ( blob) = key_attr {
753+ let key = blob. as_ref ( ) ;
754+ if let Some ( indices) = key_to_index. get ( key) {
755+ if let Some ( ( & last, rest) ) = indices. split_last ( ) {
756+ let value = extract_value_owned ( & mut item) ?;
757+ for index in rest {
758+ results[ * index] = Some ( value. clone ( ) ) ;
759+ }
760+ results[ last] = Some ( value) ;
761+ }
762+ }
763+ }
764+ }
765+ }
766+ }
767+
768+ // Handle unprocessed keys
769+ remaining_request_items = response. unprocessed_keys ;
770+ }
771+
772+ Ok ( results)
773+ }
690774}
691775
692776struct QueryResponses {
@@ -759,17 +843,18 @@ impl ReadableKeyValueStore for DynamoDbStoreInternal {
759843 & self ,
760844 keys : Vec < Vec < u8 > > ,
761845 ) -> Result < Vec < Option < Vec < u8 > > > , DynamoDbStoreInternalError > {
762- let mut handles = Vec :: new ( ) ;
763- for key in keys {
764- check_key_size ( & key) ?;
765- let key_db = build_key ( & self . start_key , key) ;
766- let handle = self . read_value_bytes_general ( key_db) ;
767- handles. push ( handle) ;
846+ if keys. is_empty ( ) {
847+ return Ok ( Vec :: new ( ) ) ;
768848 }
769- join_all ( handles)
849+
850+ let handles = keys
851+ . chunks ( MAX_BATCH_GET_ITEM_SIZE )
852+ . map ( |key_batch| self . read_batch_values_bytes ( key_batch) ) ;
853+ let results: Vec < _ > = join_all ( handles)
770854 . await
771855 . into_iter ( )
772- . collect :: < Result < _ , _ > > ( )
856+ . collect :: < Result < _ , _ > > ( ) ?;
857+ Ok ( results. into_iter ( ) . flatten ( ) . collect ( ) )
773858 }
774859
775860 async fn find_keys_by_prefix (
@@ -861,6 +946,10 @@ pub enum DynamoDbStoreInternalError {
861946 #[ error( transparent) ]
862947 Get ( #[ from] Box < SdkError < GetItemError > > ) ,
863948
949+ /// An error occurred while batch getting items.
950+ #[ error( transparent) ]
951+ BatchGet ( #[ from] Box < SdkError < BatchGetItemError > > ) ,
952+
864953 /// An error occurred while writing a transaction of items.
865954 #[ error( transparent) ]
866955 TransactWriteItem ( #[ from] Box < SdkError < TransactWriteItemsError > > ) ,
0 commit comments