Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "DynamoDB Enhanced Client",
"contributor": "",
"description": "Allow new records to be initialized with version=0 by supporting startAt=-1 in VersionedRecordExtension"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "DynamoDB Enhanced Client",
"contributor": "",
"description": "modify VersionedRecordExtension to support updating existing records with version=0 using OR condition"
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTag;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableMetadata;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.utils.Validate;

/**
* This extension implements optimistic locking on record writes by means of a 'record version number' that is used
Expand All @@ -55,6 +54,10 @@
* Then, whenever a record is written the write operation will only succeed if the version number of the record has not
* been modified since it was last read by the application. Every time a new version of the record is successfully
* written to the database, the record version number will be automatically incremented.
* <p>
* <b>Version Calculation:</b> The first version written to a new record is calculated as {@code startAt + incrementBy}.
* For example, with {@code startAt=0} and {@code incrementBy=1} (defaults), the first version is 1.
* To start versioning from 0, use {@code startAt=-1} and {@code incrementBy=1}, which produces first version = 0.
*/
@SdkPublicApi
@ThreadSafe
Expand All @@ -68,7 +71,9 @@ public final class VersionedRecordExtension implements DynamoDbEnhancedClientExt
private final long incrementBy;

private VersionedRecordExtension(Long startAt, Long incrementBy) {
Validate.isNotNegativeOrNull(startAt, "startAt");
if (startAt != null && startAt < -1) {
throw new IllegalArgumentException("startAt must be -1 or greater");
}

if (incrementBy != null && incrementBy < 1) {
throw new IllegalArgumentException("incrementBy must be greater than 0.");
Expand Down Expand Up @@ -121,7 +126,9 @@ public Consumer<StaticTableMetadata.Builder> modifyMetadata(String attributeName
"is supported.", attributeName, attributeValueType.name()));
}

Validate.isNotNegativeOrNull(startAt, "startAt");
if (startAt != null && startAt < -1) {
throw new IllegalArgumentException("startAt must be -1 or greater.");
}

if (incrementBy != null && incrementBy < 1) {
throw new IllegalArgumentException("incrementBy must be greater than 0.");
Expand Down Expand Up @@ -158,7 +165,7 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
.orElse(this.incrementBy);


if (isInitialVersion(existingVersionValue, versionStartAtFromAnnotation)) {
if (existingVersionValue == null || isNullAttributeValue(existingVersionValue)) {
newVersionValue = AttributeValue.builder()
.n(Long.toString(versionStartAtFromAnnotation + versionIncrementByFromAnnotation))
.build();
Expand All @@ -175,7 +182,6 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex

long existingVersion = Long.parseLong(existingVersionValue.n());
String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER.apply(versionAttributeKey.get());

long increment = versionIncrementByFromAnnotation;

/*
Expand All @@ -190,12 +196,25 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex

newVersionValue = AttributeValue.builder().n(Long.toString(existingVersion + increment)).build();

condition = Expression.builder()
.expression(String.format("%s = %s", attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue))
.build();
// When version equals startAt, we can't distinguish between new and existing records
// Use OR condition to handle both cases
if (existingVersion == versionStartAtFromAnnotation) {
condition = Expression.builder()
.expression(String.format("attribute_not_exists(%s) OR %s = %s",
attributeKeyRef, attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue))
.build();
} else {
// Normal case - version doesn't equal startAt, must be existing record
condition = Expression.builder()
.expression(String.format("%s = %s", attributeKeyRef, existingVersionValueKey))
.expressionNames(Collections.singletonMap(attributeKeyRef, versionAttributeKey.get()))
.expressionValues(Collections.singletonMap(existingVersionValueKey,
existingVersionValue))
.build();
}
}

itemToTransform.put(versionAttributeKey.get(), newVersionValue);
Expand All @@ -206,21 +225,6 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
.build();
}

private boolean isInitialVersion(AttributeValue existingVersionValue, Long versionStartAtFromAnnotation) {
if (existingVersionValue == null || isNullAttributeValue(existingVersionValue)) {
return true;
}

if (existingVersionValue.n() != null) {
long currentVersion = Long.parseLong(existingVersionValue.n());
// If annotation value is present, use it, otherwise fall back to the extension's value
Long effectiveStartAt = versionStartAtFromAnnotation != null ? versionStartAtFromAnnotation : this.startAt;
return currentVersion == effectiveStartAt;
}

return false;
}

@NotThreadSafe
public static final class Builder {
private Long startAt;
Expand All @@ -231,9 +235,10 @@ private Builder() {

/**
* Sets the startAt used to compare if a record is the initial version of a record.
* The first version written to a new record is calculated as {@code startAt + incrementBy}.
* Default value - {@code 0}.
*
* @param startAt the starting value for version comparison, must not be negative
* @param startAt the starting value for version comparison, must be -1 or greater
* @return the builder instance
*/
public Builder startAt(Long startAt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
* Denotes this attribute as recording the version record number to be used for optimistic locking. Every time a record
* with this attribute is written to the database it will be incremented and a condition added to the request to check
* for an exact match of the old version.
* <p>
* <b>Version Calculation:</b> The first version written to a new record is calculated as {@code startAt + incrementBy}.
* For example, with {@code startAt=0} and {@code incrementBy=1} (defaults), the first version is 1.
* To start versioning from 0, use {@code startAt=-1} and {@code incrementBy=1}, which produces first version = 0.
*/
@SdkPublicApi
@Target({ElementType.METHOD})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void beforeWrite_versionEqualsStartAt_treatedAsInitialVersion() {
.operationContext(PRIMARY_CONTEXT).build());

assertThat(result.additionalConditionalExpression().expression(),
is("attribute_not_exists(#AMZN_MAPPED_version)"));
is("attribute_not_exists(#AMZN_MAPPED_version) OR #AMZN_MAPPED_version = :old_version_value"));
}

@ParameterizedTest
Expand Down Expand Up @@ -321,7 +321,7 @@ public void beforeWrite_versionEqualsAnnotationStartAt_isTreatedAsInitialVersion
.operationContext(PRIMARY_CONTEXT).build());

assertThat(result.additionalConditionalExpression().expression(),
is("attribute_not_exists(#AMZN_MAPPED_version)"));
is("attribute_not_exists(#AMZN_MAPPED_version) OR #AMZN_MAPPED_version = :old_version_value"));
}


Expand Down Expand Up @@ -634,6 +634,46 @@ public void isInitialVersion_shouldPrioritizeAnnotationValueOverBuilderValue() {
is("#AMZN_MAPPED_version = :old_version_value"));
}

@Test
public void updateItem_existingRecordWithVersionEqualToStartAt_shouldSucceed() {
VersionedRecordExtension recordExtension = VersionedRecordExtension.builder().build();
FakeItem item = createUniqueFakeItem();
item.setVersion(0);

Map<String, AttributeValue> inputMap = new HashMap<>(FakeItem.getTableSchema().itemToMap(item, true));

WriteModification result =
recordExtension.beforeWrite(DefaultDynamoDbExtensionContext
.builder()
.items(inputMap)
.tableMetadata(FakeItem.getTableMetadata())
.operationContext(PRIMARY_CONTEXT).build());

assertThat(result.additionalConditionalExpression().expression(),
is("attribute_not_exists(#AMZN_MAPPED_version) OR #AMZN_MAPPED_version = :old_version_value"));
}

@Test
public void beforeWrite_startAtNegativeOne_firstVersionIsZero() {
VersionedRecordExtension extension = VersionedRecordExtension.builder()
.startAt(-1L)
.incrementBy(1L)
.build();
FakeItem fakeItem = createUniqueFakeItem();
Map<String, AttributeValue> expectedItem =
new HashMap<>(FakeItem.getTableSchema().itemToMap(fakeItem, true));
expectedItem.put("version", AttributeValue.builder().n("0").build());

WriteModification result =
extension.beforeWrite(DefaultDynamoDbExtensionContext
.builder()
.items(FakeItem.getTableSchema().itemToMap(fakeItem, true))
.tableMetadata(FakeItem.getTableMetadata())
.operationContext(PRIMARY_CONTEXT).build());

assertThat(result.transformedItem(), is(expectedItem));
}

public static Stream<Arguments> customIncrementForExistingVersionValues() {
return Stream.of(
Arguments.of(0L, 1L, 5L, "6"),
Expand Down
Loading
Loading