34
34
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticAttributeTag ;
35
35
import software .amazon .awssdk .enhanced .dynamodb .mapper .StaticTableMetadata ;
36
36
import software .amazon .awssdk .services .dynamodb .model .AttributeValue ;
37
+ import software .amazon .awssdk .utils .Validate ;
37
38
38
39
/**
39
40
* This extension implements optimistic locking on record writes by means of a 'record version number' that is used
@@ -61,7 +62,18 @@ public final class VersionedRecordExtension implements DynamoDbEnhancedClientExt
61
62
private static final String CUSTOM_METADATA_KEY = "VersionedRecordExtension:VersionAttribute" ;
62
63
private static final VersionAttribute VERSION_ATTRIBUTE = new VersionAttribute ();
63
64
64
- private VersionedRecordExtension () {
65
+ private final long startAt ;
66
+ private final long incrementBy ;
67
+
68
+ private VersionedRecordExtension (Long startAt , Long incrementBy ) {
69
+ Validate .isNotNegativeOrNull (startAt , "startAt" );
70
+
71
+ if (incrementBy != null && incrementBy < 1 ) {
72
+ throw new IllegalArgumentException ("incrementBy must be greater than 0." );
73
+ }
74
+
75
+ this .startAt = startAt != null ? startAt : 0L ;
76
+ this .incrementBy = incrementBy != null ? incrementBy : 1L ;
65
77
}
66
78
67
79
public static Builder builder () {
@@ -75,19 +87,47 @@ private AttributeTags() {
75
87
public static StaticAttributeTag versionAttribute () {
76
88
return VERSION_ATTRIBUTE ;
77
89
}
90
+
91
+ public static StaticAttributeTag versionAttribute (Long startAt , Long incrementBy ) {
92
+ return new VersionAttribute (startAt , incrementBy );
93
+ }
78
94
}
79
95
80
- private static class VersionAttribute implements StaticAttributeTag {
96
+ private static final class VersionAttribute implements StaticAttributeTag {
97
+ private static final String START_AT_METADATA_KEY = "VersionedRecordExtension:StartAt" ;
98
+ private static final String INCREMENT_BY_METADATA_KEY = "VersionedRecordExtension:IncrementBy" ;
99
+
100
+ private final Long startAt ;
101
+ private final Long incrementBy ;
102
+
103
+ private VersionAttribute () {
104
+ this .startAt = null ;
105
+ this .incrementBy = null ;
106
+ }
107
+
108
+ private VersionAttribute (Long startAt , Long incrementBy ) {
109
+ this .startAt = startAt ;
110
+ this .incrementBy = incrementBy ;
111
+ }
112
+
81
113
@ Override
82
114
public Consumer <StaticTableMetadata .Builder > modifyMetadata (String attributeName ,
83
115
AttributeValueType attributeValueType ) {
84
116
if (attributeValueType != AttributeValueType .N ) {
85
117
throw new IllegalArgumentException (String .format (
86
118
"Attribute '%s' of type %s is not a suitable type to be used as a version attribute. Only type 'N' " +
87
- "is supported." , attributeName , attributeValueType .name ()));
119
+ "is supported." , attributeName , attributeValueType .name ()));
120
+ }
121
+
122
+ Validate .isNotNegativeOrNull (startAt , "startAt" );
123
+
124
+ if (incrementBy != null && incrementBy < 1 ) {
125
+ throw new IllegalArgumentException ("incrementBy must be greater than 0." );
88
126
}
89
127
90
128
return metadata -> metadata .addCustomMetadataObject (CUSTOM_METADATA_KEY , attributeName )
129
+ .addCustomMetadataObject (START_AT_METADATA_KEY , startAt )
130
+ .addCustomMetadataObject (INCREMENT_BY_METADATA_KEY , incrementBy )
91
131
.markAttributeAsKey (attributeName , attributeValueType );
92
132
}
93
133
}
@@ -106,31 +146,53 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
106
146
String attributeKeyRef = keyRef (versionAttributeKey .get ());
107
147
AttributeValue newVersionValue ;
108
148
Expression condition ;
109
- Optional <AttributeValue > existingVersionValue =
110
- Optional .ofNullable (itemToTransform .get (versionAttributeKey .get ()));
111
149
112
- if (!existingVersionValue .isPresent () || isNullAttributeValue (existingVersionValue .get ())) {
113
- // First version of the record
114
- newVersionValue = AttributeValue .builder ().n ("1" ).build ();
150
+ AttributeValue existingVersionValue = itemToTransform .get (versionAttributeKey .get ());
151
+ Long versionStartAtFromAnnotation = context .tableMetadata ()
152
+ .customMetadataObject (VersionAttribute .START_AT_METADATA_KEY , Long .class )
153
+ .orElse (this .startAt );
154
+ Long versionIncrementByFromAnnotation = context .tableMetadata ()
155
+ .customMetadataObject (VersionAttribute .INCREMENT_BY_METADATA_KEY , Long .class )
156
+ .orElse (this .incrementBy );
157
+
158
+
159
+ if (isInitialVersion (existingVersionValue , versionStartAtFromAnnotation )) {
160
+ newVersionValue = AttributeValue .builder ()
161
+ .n (Long .toString (versionStartAtFromAnnotation + versionIncrementByFromAnnotation ))
162
+ .build ();
115
163
condition = Expression .builder ()
116
164
.expression (String .format ("attribute_not_exists(%s)" , attributeKeyRef ))
117
165
.expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
118
166
.build ();
119
167
} else {
120
168
// Existing record, increment version
121
- if (existingVersionValue .get (). n () == null ) {
169
+ if (existingVersionValue .n () == null ) {
122
170
// In this case a non-null version attribute is present, but it's not an N
123
171
throw new IllegalArgumentException ("Version attribute appears to be the wrong type. N is required." );
124
172
}
125
173
126
- int existingVersion = Integer . parseInt (existingVersionValue . get () .n ());
174
+ long existingVersion = Long . parseLong (existingVersionValue .n ());
127
175
String existingVersionValueKey = VERSIONED_RECORD_EXPRESSION_VALUE_KEY_MAPPER .apply (versionAttributeKey .get ());
128
- newVersionValue = AttributeValue .builder ().n (Integer .toString (existingVersion + 1 )).build ();
176
+
177
+ long increment = versionIncrementByFromAnnotation ;
178
+
179
+ /*
180
+ Since the new incrementBy and StartAt functionality can now accept any positive number, though unlikely
181
+ to happen in a real life scenario, we should add overflow protection.
182
+ */
183
+ if (existingVersion > Long .MAX_VALUE - increment ) {
184
+ throw new IllegalStateException (
185
+ String .format ("Version overflow detected. Current version %d + increment %d would exceed Long.MAX_VALUE" ,
186
+ existingVersion , increment ));
187
+ }
188
+
189
+ newVersionValue = AttributeValue .builder ().n (Long .toString (existingVersion + increment )).build ();
190
+
129
191
condition = Expression .builder ()
130
192
.expression (String .format ("%s = %s" , attributeKeyRef , existingVersionValueKey ))
131
193
.expressionNames (Collections .singletonMap (attributeKeyRef , versionAttributeKey .get ()))
132
194
.expressionValues (Collections .singletonMap (existingVersionValueKey ,
133
- existingVersionValue . get () ))
195
+ existingVersionValue ))
134
196
.build ();
135
197
}
136
198
@@ -142,13 +204,55 @@ public WriteModification beforeWrite(DynamoDbExtensionContext.BeforeWrite contex
142
204
.build ();
143
205
}
144
206
207
+ private boolean isInitialVersion (AttributeValue existingVersionValue , Long versionStartAtFromAnnotation ) {
208
+ if (existingVersionValue == null || isNullAttributeValue (existingVersionValue )) {
209
+ return true ;
210
+ }
211
+
212
+ if (existingVersionValue .n () != null ) {
213
+ long currentVersion = Long .parseLong (existingVersionValue .n ());
214
+ // If annotation value is present, use it, otherwise fall back to the extension's value
215
+ Long effectiveStartAt = versionStartAtFromAnnotation != null ? versionStartAtFromAnnotation : this .startAt ;
216
+ return currentVersion == effectiveStartAt ;
217
+ }
218
+
219
+ return false ;
220
+ }
221
+
145
222
@ NotThreadSafe
146
223
public static final class Builder {
224
+ private Long startAt ;
225
+ private Long incrementBy ;
226
+
147
227
private Builder () {
148
228
}
149
229
230
+ /**
231
+ * Sets the startAt used to compare if a record is the initial version of a record.
232
+ * Default value - {@code 0}.
233
+ *
234
+ * @param startAt the starting value for version comparison, must not be negative
235
+ * @return the builder instance
236
+ */
237
+ public Builder startAt (Long startAt ) {
238
+ this .startAt = startAt ;
239
+ return this ;
240
+ }
241
+
242
+ /**
243
+ * Sets the amount to increment the version by with each subsequent update.
244
+ * Default value - {@code 1}.
245
+ *
246
+ * @param incrementBy the amount to increment the version by, must be greater than 0
247
+ * @return the builder instance
248
+ */
249
+ public Builder incrementBy (Long incrementBy ) {
250
+ this .incrementBy = incrementBy ;
251
+ return this ;
252
+ }
253
+
150
254
public VersionedRecordExtension build () {
151
- return new VersionedRecordExtension ();
255
+ return new VersionedRecordExtension (this . startAt , this . incrementBy );
152
256
}
153
257
}
154
258
}
0 commit comments