Skip to content

Add ConsistentRead client config to DDB Enhanced Client #6337

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "DynamoDB Enhanced Client",
"contributor": "",
"description": "Adds consistent read client configuration to DDB Enhanced Client"
}
5 changes: 5 additions & 0 deletions services-custom/dynamodb-enhanced/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,10 @@
<type>so</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@

package software.amazon.awssdk.enhanced.dynamodb;

import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.CompletionException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedResponse;
import software.amazon.awssdk.enhanced.dynamodb.model.EnhancedLocalSecondaryIndex;
Expand Down Expand Up @@ -56,25 +55,37 @@ public class AsyncCrudWithResponseIntegrationTest extends DynamoDbEnhancedIntegr
private static DynamoDbAsyncClient dynamoDbClient;
private static DynamoDbEnhancedAsyncClient enhancedClient;
private static DynamoDbAsyncTable<Record> mappedTable;
private static CapturingInterceptor capturingInterceptor;

@BeforeClass
@BeforeAll
public static void beforeClass() {
dynamoDbClient = createAsyncDynamoDbClient();
enhancedClient = DynamoDbEnhancedAsyncClient.builder().dynamoDbClient(dynamoDbClient).build();
capturingInterceptor = new CapturingInterceptor();
dynamoDbClient = dynamoDbAsyncClientBuilder()
.overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
.build();
enhancedClient = DynamoDbEnhancedAsyncClient.builder()
.consistentRead(true)
.dynamoDbClient(dynamoDbClient)
.build();
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
mappedTable.createTable(r -> r.localSecondaryIndices(LOCAL_SECONDARY_INDEX)).join();
dynamoDbClient.waiter().waitUntilTableExists(r -> r.tableName(TABLE_NAME)).join();
}

@After
@AfterEach
public void tearDown() {
mappedTable.scan()
.items()
.subscribe(record -> mappedTable.deleteItem(record).join())
.join();
}

@AfterClass
@AfterEach
public void reset() {
capturingInterceptor.reset();
}

@AfterAll
public static void afterClass() {
try {
dynamoDbClient.deleteTable(r -> r.tableName(TABLE_NAME)).join();
Expand Down Expand Up @@ -340,5 +351,22 @@ public void getItem_withoutReturnConsumedCapacity() {

GetItemEnhancedResponse<Record> response = mappedTable.getItemWithResponse(req -> req.key(key)).join();
assertThat(response.consumedCapacity()).isNull();

assertThat(capturingInterceptor.getItemRequests.size()).isEqualTo(1);
assertThat(capturingInterceptor.getItemRequests.get(0).consistentRead()).isTrue();
}

@Test
public void getItem_consistentReadSetOnRequest_overridesClientValue() {
Record record = new Record().setId("101").setSort(102).setStringAttribute(getStringAttrValue(80_000));
Key key = Key.builder()
.partitionValue(record.getId())
.sortValue(record.getSort())
.build();

mappedTable.getItemWithResponse(req -> req.consistentRead(false).key(key)).join();

assertThat(capturingInterceptor.getItemRequests.size()).isEqualTo(1);
assertThat(capturingInterceptor.getItemRequests.get(0).consistentRead()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,25 @@
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.secondaryPartitionKey;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.secondarySortKey;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
import software.amazon.awssdk.enhanced.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClientBuilder;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.QueryRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;

public abstract class DynamoDbEnhancedIntegrationTestBase extends AwsIntegrationTestBase {
Expand All @@ -37,15 +47,21 @@ protected static String createTestTableName() {
}

protected static DynamoDbClient createDynamoDbClient() {
return DynamoDbClient.builder()
.credentialsProvider(getCredentialsProvider())
.build();
return dynamoDbClientBuilder().build();
}

protected static DynamoDbAsyncClient createAsyncDynamoDbClient() {
return dynamoDbAsyncClientBuilder().build();
}

protected static DynamoDbClientBuilder dynamoDbClientBuilder() {
return DynamoDbClient.builder()
.credentialsProvider(getCredentialsProvider());
}

protected static DynamoDbAsyncClientBuilder dynamoDbAsyncClientBuilder() {
return DynamoDbAsyncClient.builder()
.credentialsProvider(getCredentialsProvider())
.build();
.credentialsProvider(getCredentialsProvider());
}

protected static final TableSchema<Record> TABLE_SCHEMA =
Expand Down Expand Up @@ -102,4 +118,29 @@ protected static String getStringAttrValue(int numChars) {
return new String(chars);
}

protected static class CapturingInterceptor implements ExecutionInterceptor {
public final List<GetItemRequest> getItemRequests = new ArrayList<>();
public final List<ScanRequest> scanRequests = new ArrayList<>();
public final List<QueryRequest> queryRequests = new ArrayList<>();

@Override
public void beforeExecution(Context.BeforeExecution context, ExecutionAttributes executionAttributes) {
SdkRequest sdkRequest = context.request();
if (sdkRequest instanceof GetItemRequest) {
getItemRequests.add((GetItemRequest) sdkRequest);
}
if (sdkRequest instanceof ScanRequest) {
scanRequests.add((ScanRequest) sdkRequest);
}
if (sdkRequest instanceof QueryRequest) {
queryRequests.add((QueryRequest) sdkRequest);
}
}

public void reset() {
getItemRequests.clear();
scanRequests.clear();
queryRequests.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest;
import software.amazon.awssdk.enhanced.dynamodb.model.Record;
Expand All @@ -49,17 +50,39 @@ public class ScanQueryIntegrationTest extends DynamoDbEnhancedIntegrationTestBas
private static DynamoDbClient dynamoDbClient;
private static DynamoDbEnhancedClient enhancedClient;
private static DynamoDbTable<Record> mappedTable;
private static CapturingInterceptor capturingInterceptor;

@BeforeClass
public static void main(String[] args) {
DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
.consistentRead(true)
.dynamoDbClient(dynamoDbClient)
.build();

mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);

}

@BeforeAll
public static void setup() {
dynamoDbClient = createDynamoDbClient();
enhancedClient = DynamoDbEnhancedClient.builder().dynamoDbClient(dynamoDbClient).build();
capturingInterceptor = new CapturingInterceptor();
dynamoDbClient = dynamoDbClientBuilder()
.overrideConfiguration(o -> o.addExecutionInterceptor(capturingInterceptor))
.build();
enhancedClient = DynamoDbEnhancedClient.builder()
.consistentRead(true)
.dynamoDbClient(dynamoDbClient)
.build();
mappedTable = enhancedClient.table(TABLE_NAME, TABLE_SCHEMA);
mappedTable.createTable();
dynamoDbClient.waiter().waitUntilTableExists(r -> r.tableName(TABLE_NAME));
}

@AfterClass
@AfterEach
public void reset() {
capturingInterceptor.reset();
}

@AfterAll
public static void teardown() {
try {
dynamoDbClient.deleteTable(r -> r.tableName(TABLE_NAME));
Expand Down Expand Up @@ -100,7 +123,10 @@ public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksCon
insertRecords();

Iterator<Page<Record>> eventualConsistencyResult =
mappedTable.scan(ScanEnhancedRequest.builder().returnConsumedCapacity(ReturnConsumedCapacity.TOTAL).build())
mappedTable.scan(ScanEnhancedRequest.builder()
.consistentRead(false)
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build())
.iterator();

Page<Record> page = eventualConsistencyResult.next();
Expand All @@ -109,7 +135,10 @@ public void scan_withReturnConsumedCapacityAndDifferentReadConsistency_checksCon
assertThat(eventualConsumedCapacity, is(notNullValue()));

Iterator<Page<Record>> strongConsistencyResult =
mappedTable.scan(ScanEnhancedRequest.builder().returnConsumedCapacity(ReturnConsumedCapacity.TOTAL).build())
mappedTable.scan(ScanEnhancedRequest.builder()
.consistentRead(true)
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build())
.iterator();

page = strongConsistencyResult.next();
Expand Down Expand Up @@ -155,6 +184,7 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo

Iterator<Page<Record>> eventualConsistencyResult =
mappedTable.query(QueryEnhancedRequest.builder()
.consistentRead(false)
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build())
Expand All @@ -167,6 +197,7 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo

Iterator<Page<Record>> strongConsistencyResult =
mappedTable.query(QueryEnhancedRequest.builder()
.consistentRead(true)
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build())
Expand All @@ -180,6 +211,29 @@ public void query_withReturnConsumedCapacityAndDifferentReadConsistency_checksCo
assertThat(strongConsumedCapacity.capacityUnits(), is(greaterThanOrEqualTo(eventualConsumedCapacity.capacityUnits())));
}

@Test
public void scan_consistentReadNotSetOnRequest_usesClientValue() {
mappedTable.scan(ScanEnhancedRequest.builder().build())
.items().stream().count();

assertThat(capturingInterceptor.scanRequests.size(), is(1));
Boolean consistentRead = capturingInterceptor.scanRequests.get(0).consistentRead();
assertThat(consistentRead, is(true));
}

@Test
public void query_consistentReadSetOnRequest_overridesClientValue() {
mappedTable.query(QueryEnhancedRequest.builder()
.consistentRead(false)
.queryConditional(sortGreaterThan(k -> k.partitionValue("id-value").sortValue(3)))
.build())
.items().stream().count();

assertThat(capturingInterceptor.queryRequests.size(), is(1));
Boolean consistentRead = capturingInterceptor.queryRequests.get(0).consistentRead();
assertThat(consistentRead, is(false));
}

private Map<String, AttributeValue> getKeyMap(int sort) {
Map<String, AttributeValue> result = new HashMap<>();
result.put("id", stringValue(RECORDS.get(sort).getId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,9 @@ interface Builder extends DynamoDbEnhancedResource.Builder {
@Override
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);

@Override
Builder consistentRead(Boolean consistentRead);

/**
* Builds an enhanced client based on the settings supplied to this builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ interface Builder extends DynamoDbEnhancedResource.Builder {
@Override
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);

@Override
Builder consistentRead(Boolean consistentRead);

/**
* Builds an enhanced client based on the settings supplied to this builder
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,28 @@ interface Builder {
* @param dynamoDbEnhancedClientExtensions a list of extensions to load with the enhanced client
*/
Builder extensions(List<DynamoDbEnhancedClientExtension> dynamoDbEnhancedClientExtensions);

/**
* Sets the default read consistency model for single read operations (GetItem, Query, Scan). When set to true, these
* operations will use strongly consistent reads. By default, this is set to null/false, i.e., eventually consistent
* reads.
* <p>
* If set at the request level, e.g., {@code QueryEnhancedRequest}, the request level value will take precedence.
* <p>
* Note: This setting applies to single read operations only:
* <ul>
* <li>BatchGetItem: Eventually consistent by default, consistent read setting must be configured at the individual
* request level, i.e. {@code GetItemEnhancedRequest}s passed to {@code ReachBatch} on the {@code
* BatchGetItemEnhancedRequest}, when performing batch GET.
* </li>
* <li>TransactGetItems: Always uses strongly consistent reads by design, so this setting is not applicable.</li>
* </ul>
*
* @param consistentRead true for strongly consistent reads, null/false (default value) for eventually consistent reads
* @return this builder for method chaining
*/
default Builder consistentRead(Boolean consistentRead) {
throw new UnsupportedOperationException();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,25 @@ public interface MappedTableResource<T> {
* @return A key that has been initialized with the index values extracted from the modelled object.
*/
Key keyFrom(T item);

/**
* The default read consistency model for single read operations (GetItem, Query, Scan). When set to true, these
* operations will use strongly consistent reads. By default, this is set to null/false, i.e., eventually consistent reads.
* <p>
* If set at the request level, e.g., {@code QueryEnhancedRequest}, the request level value will take precedence.
* <p>
* Note: This setting applies to single read operations only:
* <ul>
* <li>BatchGetItem: Eventually consistent by default, consistent read setting must be configured at the individual
* request level, i.e. {@code GetItemEnhancedRequest}s passed to {@code ReachBatch} on the {@code
* BatchGetItemEnhancedRequest}, when performing batch GET.
* </li>
* <li>TransactGetItems: Always uses strongly consistent reads by design, so this setting is not applicable.</li>
* </ul>
*
* @return The default consistent read setting on the table.
*/
default Boolean consistentRead() {
throw new UnsupportedOperationException();
}
}
Loading
Loading