Skip to content

Commit 4f30df0

Browse files
committed
Add ConsistentRead client config to DDB Enhanced Client
1 parent 8966845 commit 4f30df0

21 files changed

+510
-89
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "DynamoDB Enhanced Client",
4+
"contributor": "",
5+
"description": "Adds consistent read client configuration to DDB Enhanced Client"
6+
}

services-custom/dynamodb-enhanced/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,10 @@
233233
<type>so</type>
234234
<scope>test</scope>
235235
</dependency>
236+
<dependency>
237+
<groupId>org.mockito</groupId>
238+
<artifactId>mockito-junit-jupiter</artifactId>
239+
<scope>test</scope>
240+
</dependency>
236241
</dependencies>
237242
</project>

services-custom/dynamodb-enhanced/src/it/java/software/amazon/awssdk/enhanced/dynamodb/AsyncCrudWithResponseIntegrationTest.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@
1515

1616
package software.amazon.awssdk.enhanced.dynamodb;
1717

18-
import static org.assertj.core.api.Assertions.as;
1918
import static org.assertj.core.api.Assertions.assertThat;
2019
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2120

2221
import java.util.concurrent.CompletionException;
23-
import org.junit.After;
24-
import org.junit.AfterClass;
25-
import org.junit.BeforeClass;
26-
import org.junit.Test;
22+
import org.junit.jupiter.api.AfterEach;
23+
import org.junit.jupiter.api.AfterAll;
24+
import org.junit.jupiter.api.BeforeAll;
25+
import org.junit.jupiter.api.Test;
2726
import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest;
2827
import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedResponse;
2928
import software.amazon.awssdk.enhanced.dynamodb.model.EnhancedLocalSecondaryIndex;
@@ -56,25 +55,37 @@ public class AsyncCrudWithResponseIntegrationTest extends DynamoDbEnhancedIntegr
5655
private static DynamoDbAsyncClient dynamoDbClient;
5756
private static DynamoDbEnhancedAsyncClient enhancedClient;
5857
private static DynamoDbAsyncTable<Record> mappedTable;
58+
private static CapturingInterceptor capturingInterceptor;
5959

60-
@BeforeClass
60+
@BeforeAll
6161
public static void beforeClass() {
62-
dynamoDbClient = createAsyncDynamoDbClient();
63-
enhancedClient = DynamoDbEnhancedAsyncClient.builder().dynamoDbClient(dynamoDbClient).build();
62+
capturingInterceptor = new CapturingInterceptor();
63+
dynamoDbClient = dynamoDbAsyncClientBuilder()
64+
.overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
65+
.build();
66+
enhancedClient = DynamoDbEnhancedAsyncClient.builder()
67+
.consistentRead(true)
68+
.dynamoDbClient(dynamoDbClient)
69+
.build();
6470
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
6571
mappedTable.createTable(r -> r.localSecondaryIndices(LOCAL_SECONDARY_INDEX)).join();
6672
dynamoDbClient.waiter().waitUntilTableExists(r -> r.tableName(TABLE_NAME)).join();
6773
}
6874

69-
@After
75+
@AfterEach
7076
public void tearDown() {
7177
mappedTable.scan()
7278
.items()
7379
.subscribe(record -> mappedTable.deleteItem(record).join())
7480
.join();
7581
}
7682

77-
@AfterClass
83+
@AfterEach
84+
public void reset() {
85+
capturingInterceptor.reset();
86+
}
87+
88+
@AfterAll
7889
public static void afterClass() {
7990
try {
8091
dynamoDbClient.deleteTable(r -> r.tableName(TABLE_NAME)).join();
@@ -340,5 +351,22 @@ public void getItem_withoutReturnConsumedCapacity() {
340351

341352
GetItemEnhancedResponse<Record> response = mappedTable.getItemWithResponse(req -> req.key(key)).join();
342353
assertThat(response.consumedCapacity()).isNull();
354+
355+
assertThat(capturingInterceptor.getItemRequests.size()).isEqualTo(1);
356+
assertThat(capturingInterceptor.getItemRequests.get(0).consistentRead()).isTrue();
357+
}
358+
359+
@Test
360+
public void getItem_consistentReadSetOnRequest_overridesClientValue() {
361+
Record record = new Record().setId("101").setSort(102).setStringAttribute(getStringAttrValue(80_000));
362+
Key key = Key.builder()
363+
.partitionValue(record.getId())
364+
.sortValue(record.getSort())
365+
.build();
366+
367+
mappedTable.getItemWithResponse(req -> req.consistentRead(false).key(key)).join();
368+
369+
assertThat(capturingInterceptor.getItemRequests.size()).isEqualTo(1);
370+
assertThat(capturingInterceptor.getItemRequests.get(0).consistentRead()).isFalse();
343371
}
344372
}

services-custom/dynamodb-enhanced/src/it/java/software/amazon/awssdk/enhanced/dynamodb/DynamoDbEnhancedIntegrationTestBase.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,25 @@
2020
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.secondaryPartitionKey;
2121
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.secondarySortKey;
2222

23+
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.List;
2526
import java.util.UUID;
2627
import java.util.stream.Collectors;
2728
import java.util.stream.IntStream;
29+
import software.amazon.awssdk.core.SdkRequest;
30+
import software.amazon.awssdk.core.interceptor.Context;
31+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
32+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
2833
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
2934
import software.amazon.awssdk.enhanced.dynamodb.model.Record;
3035
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
36+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
3137
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
38+
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
39+
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
40+
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
41+
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
3242
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;
3343

3444
public abstract class DynamoDbEnhancedIntegrationTestBase extends AwsIntegrationTestBase {
@@ -37,15 +47,21 @@ protected static String createTestTableName() {
3747
}
3848

3949
protected static DynamoDbClient createDynamoDbClient() {
40-
return DynamoDbClient.builder()
41-
.credentialsProvider(getCredentialsProvider())
42-
.build();
50+
return dynamoDbClientBuilder().build();
4351
}
4452

4553
protected static DynamoDbAsyncClient createAsyncDynamoDbClient() {
54+
return dynamoDbAsyncClientBuilder().build();
55+
}
56+
57+
protected static DynamoDbClientBuilder dynamoDbClientBuilder() {
58+
return DynamoDbClient.builder()
59+
.credentialsProvider(getCredentialsProvider());
60+
}
61+
62+
protected static DynamoDbAsyncClientBuilder dynamoDbAsyncClientBuilder() {
4663
return DynamoDbAsyncClient.builder()
47-
.credentialsProvider(getCredentialsProvider())
48-
.build();
64+
.credentialsProvider(getCredentialsProvider());
4965
}
5066

5167
protected static final TableSchema<Record> TABLE_SCHEMA =
@@ -102,4 +118,29 @@ protected static String getStringAttrValue(int numChars) {
102118
return new String(chars);
103119
}
104120

121+
protected static class CapturingInterceptor implements ExecutionInterceptor {
122+
public final List<GetItemRequest> getItemRequests = new ArrayList<>();
123+
public final List<ScanRequest> scanRequests = new ArrayList<>();
124+
public final List<QueryRequest> queryRequests = new ArrayList<>();
125+
126+
@Override
127+
public void beforeExecution(Context.BeforeExecution context, ExecutionAttributes executionAttributes) {
128+
SdkRequest sdkRequest = context.request();
129+
if (sdkRequest instanceof GetItemRequest) {
130+
getItemRequests.add((GetItemRequest) sdkRequest);
131+
}
132+
if (sdkRequest instanceof ScanRequest) {
133+
scanRequests.add((ScanRequest) sdkRequest);
134+
}
135+
if (sdkRequest instanceof QueryRequest) {
136+
queryRequests.add((QueryRequest) sdkRequest);
137+
}
138+
}
139+
140+
public void reset() {
141+
getItemRequests.clear();
142+
scanRequests.clear();
143+
queryRequests.clear();
144+
}
145+
}
105146
}

services-custom/dynamodb-enhanced/src/it/java/software/amazon/awssdk/enhanced/dynamodb/ScanQueryIntegrationTest.java

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@
3030
import java.util.HashMap;
3131
import java.util.Iterator;
3232
import java.util.Map;
33-
import org.junit.AfterClass;
34-
import org.junit.BeforeClass;
35-
import org.junit.Test;
33+
import org.junit.jupiter.api.AfterAll;
34+
import org.junit.jupiter.api.AfterEach;
35+
import org.junit.jupiter.api.BeforeAll;
36+
import org.junit.jupiter.api.Test;
3637
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
3738
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
3839
import software.amazon.awssdk.enhanced.dynamodb.model.Record;
@@ -49,17 +50,39 @@ public class ScanQueryIntegrationTest extends DynamoDbEnhancedIntegrationTestBas
4950
private static DynamoDbClient dynamoDbClient;
5051
private static DynamoDbEnhancedClient enhancedClient;
5152
private static DynamoDbTable<Record> mappedTable;
53+
private static CapturingInterceptor capturingInterceptor;
5254

53-
@BeforeClass
55+
public static void main(String[] args) {
56+
DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
57+
.consistentRead(true)
58+
.dynamoDbClient(dynamoDbClient)
59+
.build();
60+
61+
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
62+
63+
}
64+
65+
@BeforeAll
5466
public static void setup() {
55-
dynamoDbClient = createDynamoDbClient();
56-
enhancedClient = DynamoDbEnhancedClient.builder().dynamoDbClient(dynamoDbClient).build();
67+
capturingInterceptor = new CapturingInterceptor();
68+
dynamoDbClient = dynamoDbClientBuilder()
69+
.overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
70+
.build();
71+
enhancedClient = DynamoDbEnhancedClient.builder()
72+
.consistentRead(true)
73+
.dynamoDbClient(dynamoDbClient)
74+
.build();
5775
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
5876
mappedTable.createTable();
5977
dynamoDbClient.waiter().waitUntilTableExists(r -> r.tableName(TABLE_NAME));
6078
}
6179

62-
@AfterClass
80+
@AfterEach
81+
public void reset() {
82+
capturingInterceptor.reset();
83+
}
84+
85+
@AfterAll
6386
public static void teardown() {
6487
try {
6588
dynamoDbClient.deleteTable(r -> r.tableName(TABLE_NAME));
@@ -100,7 +123,10 @@ public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksCon
100123
insertRecords();
101124

102125
Iterator<Page<Record>> eventualConsistencyResult =
103-
mappedTable.scan(ScanEnhancedRequest.builder().returnConsumedCapacity(ReturnConsumedCapacity.TOTAL).build())
126+
mappedTable.scan(ScanEnhancedRequest.builder()
127+
.consistentRead(false)
128+
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
129+
.build())
104130
.iterator();
105131

106132
Page<Record> page = eventualConsistencyResult.next();
@@ -109,7 +135,10 @@ public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksCon
109135
assertThat(eventualConsumedCapacity, is(notNullValue()));
110136

111137
Iterator<Page<Record>> strongConsistencyResult =
112-
mappedTable.scan(ScanEnhancedRequest.builder().returnConsumedCapacity(ReturnConsumedCapacity.TOTAL).build())
138+
mappedTable.scan(ScanEnhancedRequest.builder()
139+
.consistentRead(true)
140+
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
141+
.build())
113142
.iterator();
114143

115144
page = strongConsistencyResult.next();
@@ -155,6 +184,7 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo
155184

156185
Iterator<Page<Record>> eventualConsistencyResult =
157186
mappedTable.query(QueryEnhancedRequest.builder()
187+
.consistentRead(false)
158188
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
159189
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
160190
.build())
@@ -167,6 +197,7 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo
167197

168198
Iterator<Page<Record>> strongConsistencyResult =
169199
mappedTable.query(QueryEnhancedRequest.builder()
200+
.consistentRead(true)
170201
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
171202
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
172203
.build())
@@ -180,6 +211,29 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo
180211
assertThat(strongConsumedCapacity.capacityUnits(), is(greaterThanOrEqualTo(eventualConsumedCapacity.capacityUnits())));
181212
}
182213

214+
@Test
215+
public void scan_consistentReadNotSetOnRequest_usesClientValue() {
216+
mappedTable.scan(ScanEnhancedRequest.builder().build())
217+
.items().stream().count();
218+
219+
assertThat(capturingInterceptor.scanRequests.size(), is(1));
220+
Boolean consistentRead = capturingInterceptor.scanRequests.get(0).consistentRead();
221+
assertThat(consistentRead, is(true));
222+
}
223+
224+
@Test
225+
public void query_consistentReadSetOnRequest_overridesClientValue() {
226+
mappedTable.query(QueryEnhancedRequest.builder()
227+
.consistentRead(false)
228+
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
229+
.build())
230+
.items().stream().count();
231+
232+
assertThat(capturingInterceptor.queryRequests.size(), is(1));
233+
Boolean consistentRead = capturingInterceptor.queryRequests.get(0).consistentRead();
234+
assertThat(consistentRead, is(false));
235+
}
236+
183237
private Map<String, AttributeValue> getKeyMap(int sort) {
184238
Map<String, AttributeValue> result = new HashMap<>();
185239
result.put("id", stringValue(RECORDS.get(sort).getId()));

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/DynamoDbEnhancedAsyncClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,9 @@ interface Builder extends DynamoDbEnhancedResource.Builder {
560560
@Override
561561
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);
562562

563+
@Override
564+
Builder consistentRead(Boolean consistentRead);
565+
563566
/**
564567
* Builds an enhanced client based on the settings supplied to this builder
565568
*

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/DynamoDbEnhancedClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,9 @@ interface Builder extends DynamoDbEnhancedResource.Builder {
569569
@Override
570570
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);
571571

572+
@Override
573+
Builder consistentRead(Boolean consistentRead);
574+
572575
/**
573576
* Builds an enhanced client based on the settings supplied to this builder
574577
*

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/DynamoDbEnhancedResource.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,28 @@ interface Builder {
5454
* @param dynamoDbEnhancedClientExtensions a list of extensions to load with the enhanced client
5555
*/
5656
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);
57+
58+
/**
59+
* Sets the default read consistency model for single read operations (GetItem, Query, Scan). When set to true, these
60+
* operations will use strongly consistent reads. By default, this is set to null/false, i.e., eventually consistent
61+
* reads.
62+
* <p>
63+
* If set at the request level, e.g., {@code QueryEnhancedRequest}, the request level value will take precedence.
64+
* <p>
65+
* Note: This setting applies to single read operations only:
66+
* <ul>
67+
* <li>BatchGetItem: Eventually consistent by default, consistent read setting must be configured at the individual
68+
* request level, i.e. {@code GetItemEnhancedRequest}s passed to {@code ReachBatch} on the {@code
69+
* BatchGetItemEnhancedRequest}, when performing batch GET.
70+
* </li>
71+
* <li>TransactGetItems: Always uses strongly consistent reads by design, so this setting is not applicable.</li>
72+
* </ul>
73+
*
74+
* @param consistentRead true for strongly consistent reads, null/false (default value) for eventually consistent reads
75+
* @return this builder for method chaining
76+
*/
77+
default Builder consistentRead(Boolean consistentRead) {
78+
throw new UnsupportedOperationException();
79+
}
5780
}
5881
}

services-custom/dynamodb-enhanced/src/main/java/software/amazon/awssdk/enhanced/dynamodb/MappedTableResource.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,25 @@ public interface MappedTableResource<T> {
5454
* @return A key that has been initialized with the index values extracted from the modelled object.
5555
*/
5656
Key keyFrom(T item);
57+
58+
/**
59+
* The default read consistency model for single read operations (GetItem, Query, Scan). When set to true, these
60+
* operations will use strongly consistent reads. By default, this is set to null/false, i.e., eventually consistent reads.
61+
* <p>
62+
* If set at the request level, e.g., {@code QueryEnhancedRequest}, the request level value will take precedence.
63+
* <p>
64+
* Note: This setting applies to single read operations only:
65+
* <ul>
66+
* <li>BatchGetItem: Eventually consistent by default, consistent read setting must be configured at the individual
67+
* request level, i.e. {@code GetItemEnhancedRequest}s passed to {@code ReachBatch} on the {@code
68+
* BatchGetItemEnhancedRequest}, when performing batch GET.
69+
* </li>
70+
* <li>TransactGetItems: Always uses strongly consistent reads by design, so this setting is not applicable.</li>
71+
* </ul>
72+
*
73+
* @return The default consistent read setting on the table.
74+
*/
75+
default Boolean consistentRead() {
76+
throw new UnsupportedOperationException();
77+
}
5778
}

0 commit comments

Comments
 (0)